from __future__ import annotations

from pathlib import Path
from typing import Literal

import numpy as np
import polars as pl
import seaborn as sns
from scipy import stats

from .Plots import (
    BoxPlot, savgol_line, LinePlotData, LinePlot, plot_stat_expr, HeatMapData, HeatMap,
from .SeqMapper import CytosinesFileCM, SequenceFile
from .Base import (
from .Clusters import ClusterSingle, ClusterMany
from .UniversalReader_batches import ReportTypes
from .utils import MetageneSchema, AvailableSumfunc, CONTEXTS
from .GenomeClass import Genome
from .UniversalReader_classes import UniversalReader

[docs]class Metagene(MetageneBase): """ Stores Metagene data. """
[docs] def __init__(self, report_df: pl.DataFrame, **kwargs): super().__init__(report_df, **kwargs)
[docs] @classmethod def from_bismark( cls, file: str | Path, genome: pl.DataFrame, up_windows: int = 0, body_windows: int = 2000, down_windows: int = 0, block_size_mb: int = 100, use_threads: bool = True, sumfunc: AvailableSumfunc = "wmean" ): """ Constructor for Metagene class from Bismark ``coverage2cytosine`` report. Parameters ---------- file Path to bismark genomeWide report. genome ``polars.Dataframe`` with gene ranges (from :class:`Genome`) up_windows Number of windows upstream region to split into body_windows Number of windows body region to split into down_windows Number of windows downstream region to split into block_size_mb Block size for reading. (Block size ≠ amount of RAM used. Reader allocates approx. Block size * 20 memory for reading.) use_threads Do multi-threaded or single-threaded reading. If multi-threaded option is used, number of threads is defined by `multiprocessing.cpu_count()` sumfunc Summary function to calculate density for window with. Returns ------- Metagene Examples -------- >>> genome = Genome.from_gff("path/to/genome.gff").gene_body() >>> >>> path = 'path/to/bismark.CX_report.txt' >>> metagene = Metagene.from_bismark(path, genome, up_windows=500, body_windows=1000, down_windows=500) """ reader = UniversalReader(**(locals() | dict(report_type="bismark"))) args = validate_metagene_args(genome, up_windows, body_windows, down_windows, sumfunc) report_df = read_metagene(**(locals() | args)) return cls(report_df, upstream_windows=args["upstream_windows"], gene_windows=args["body_windows"], downstream_windows=args["downstream_windows"])
[docs] @classmethod def from_cgmap( cls, file: str | Path, genome: pl.DataFrame, up_windows: int = 0, body_windows: int = 2000, down_windows: int = 0, block_size_mb: int = 100, use_threads: bool = True, sumfunc: AvailableSumfunc = "wmean" ): """ Constructor for Metagene class from BSSeeker2 CGmap file. Parameters ---------- file Path to CGmap file report. genome ``polars.Dataframe`` with gene ranges (from :class:`Genome`) up_windows Number of windows upstream region to split into body_windows Number of windows body region to split into down_windows Number of windows downstream region to split into block_size_mb Block size for reading. (Block size ≠ amount of RAM used. Reader allocates approx. Block size * 20 memory for reading.) use_threads Do multi-threaded or single-threaded reading. If multi-threaded option is used, number of threads is defined by `multiprocessing.cpu_count()` sumfunc Summary function to calculate density for window with. Returns ------- Metagene Examples -------- >>> genome = Genome.from_gff("path/to/genome.gff").gene_body() >>> >>> path = 'path/to/CGmap.txt' >>> metagene = Metagene.from_cgmap(path, genome, up_windows=500, body_windows=1000, down_windows=500) """ reader = UniversalReader(**(locals() | dict(report_type="cgmap"))) args = validate_metagene_args(genome, up_windows, body_windows, down_windows, sumfunc) report_df = read_metagene(**(locals() | args)) return cls(report_df, upstream_windows=args["upstream_windows"], gene_windows=args["body_windows"], downstream_windows=args["downstream_windows"])
[docs] @classmethod def from_binom( cls, file: str | Path, genome: pl.DataFrame, up_windows: int = 0, body_windows: int = 2000, down_windows: int = 0, p_value: float = .05, use_threads=True, ): """ Constructor for Metagene class from :meth:`BinomialData.preprocess` ``.parquet`` file. Only ``"mean"`` summary function is supported for construction :class:`Metagene` from binom data. Parameters ---------- p_value P-value of cytosine methylation for it to be considered methylated. file Path to preprocessed `.parquet` file. genome ``polars.Dataframe`` with gene ranges (from :class:`Genome`) up_windows Number of windows upstream region to split into body_windows Number of windows body region to split into down_windows Number of windows downstream region to split into use_threads Do multi-threaded or single-threaded reading. If multi-threaded option is used, number of threads is defined by `multiprocessing.cpu_count()` Returns ------- Metagene Examples -------- >>> save_name = "preprocessed.parquet" >>> BinomialData.preprocess("path/to/bismark.CX_report.txt", report_type="bismark", name=save_name) >>> >>> genome = Genome.from_gff("path/to/genome.gff").gene_body() >>> metagene = Metagene.from_binom(save_name, genome, up_windows=500, body_windows=1000, down_windows=500) """ reader = UniversalReader(file, "binom", methylation_pvalue=p_value, use_threads=use_threads) args = validate_metagene_args(genome, up_windows, body_windows, down_windows, "wmean") report_df = read_metagene(**(locals() | args)) return cls(report_df, upstream_windows=args["upstream_windows"], gene_windows=args["body_windows"], downstream_windows=args["downstream_windows"])
[docs] @classmethod def from_bedGraph( cls, file: str | Path, genome: pl.DataFrame, fasta: str | Path, up_windows: int = 0, body_windows: int = 2000, down_windows: int = 0, sumfunc: AvailableSumfunc = "wmean", block_size_mb: int = 30, use_threads: bool = True, save_preprocessed: bool = False, temp_dir: str = Path.cwd() ): """ Constructor for Metagene class from ``.bedGraph`` file. Parameters ---------- file Path to ``.bedGraph`` file. genome ``polars.Dataframe`` with gene ranges (from :class:`Genome`) fasta Path to FASTA genome sequence file or path to preprocessed with :class:`Sequence` cytosine file. up_windows Number of windows upstream region to split into body_windows Number of windows body region to split into down_windows Number of windows downstream region to split into sumfunc Summary function to calculate density for window with. temp_dir Directory for temporary files. save_preprocessed Does preprocessed sequence file need to be saved block_size_mb Block size for reading. (Block size ≠ amount of RAM used. Reader allocates approx. Block size * 20 memory for reading.) use_threads Do multi-threaded or single-threaded reading. If multi-threaded option is used, number of threads is defined by `multiprocessing.cpu_count()` Returns ------- Metagene Examples -------- >>> genome = Genome.from_gff("path/to/genome.gff").gene_body() >>> >>> path = 'path/to/report.bedGraph' >>> fasta = 'path/to/sequence.fa' >>> metagene = Metagene.from_bedGraph(path, genome, fasta, up_windows=500, body_windows=1000, down_windows=500) """ with CytosinesFileCM(fasta) as cm: cytosine_file = cm.cytosine_path if not cm.is_cytosine: SequenceFile(fasta).preprocess_cytosines(cytosine_file) reader = UniversalReader(file, report_type="bedgraph", use_threads=use_threads, cytosine_file=cytosine_file, block_size_mb=block_size_mb) args = validate_metagene_args(genome, up_windows, body_windows, down_windows, sumfunc) report_df = read_metagene(reader, genome, up_windows, body_windows, down_windows, sumfunc) return cls(report_df, upstream_windows=args["upstream_windows"], gene_windows=args["body_windows"], downstream_windows=args["downstream_windows"])
[docs] @classmethod def from_coverage( cls, file: str | Path, genome: pl.DataFrame, fasta: str | Path, up_windows: int = 0, body_windows: int = 2000, down_windows: int = 0, sumfunc: AvailableSumfunc = "wmean", block_size_mb: int = 30, use_threads: bool = True, save_preprocessed: bool = False, temp_dir: str = "./" ): """ Constructor for Metagene class from ``.cov`` file. Parameters ---------- file Path to ``.cov`` file. genome ``polars.Dataframe`` with gene ranges (from :class:`Genome`) fasta Path to FASTA genome sequence file or path to preprocessed with :class:`Sequence` cytosine file. up_windows Number of windows upstream region to split into body_windows Number of windows body region to split into down_windows Number of windows downstream region to split into sumfunc Summary function to calculate density for window with. temp_dir Directory for temporary files. save_preprocessed Does preprocessed sequence file need to be saved block_size_mb Block size for reading. (Block size ≠ amount of RAM used. Reader allocates approx. Block size * 20 memory for reading.) use_threads Do multi-threaded or single-threaded reading. If multi-threaded option is used, number of threads is defined by `multiprocessing.cpu_count()` Returns ------- Metagene Examples -------- >>> path = 'path/to/report.cov' >>> genome = Genome.from_gff("path/to/genome.gff").gene_body() >>> >>> fasta = 'path/to/sequence.fa' >>> metagene = Metagene.from_coverage(path, genome, fasta, up_windows=500, body_windows=1000, down_windows=500) """ with CytosinesFileCM(fasta) as cm: cytosine_file = cm.cytosine_path if not cm.is_cytosine: SequenceFile(fasta).preprocess_cytosines(cytosine_file) reader = UniversalReader(file, report_type="bedgraph", use_threads=use_threads, cytosine_file=cytosine_file, block_size_mb=block_size_mb) args = validate_metagene_args(genome, up_windows, body_windows, down_windows, sumfunc) report_df = read_metagene(reader, genome, up_windows, body_windows, down_windows, sumfunc) return cls(report_df, upstream_windows=args["upstream_windows"], gene_windows=args["body_windows"], downstream_windows=args["downstream_windows"])
[docs] def filter( self, context: Literal["CG", "CHG", "CHH", None] = None, strand: Literal["+", "-", None] = None, chr: str = None, genome: pl.DataFrame = None, id: list[str] = None, coords: list[str] = None ) -> Metagene: """ Method for filtering metagene. Parameters ---------- context Methylation context (CG, CHG, CHH) to filter (only one). strand Strand to filter (+ or -). chr Chromosome name to filter. genome DataFrame with annotation to filter with (from :class:`Genome`) Returns ------- Filtered :class:`Metagene`. Examples -------- >>> metagene shape: (3, 9) ┌─────────────┬────────┬───────┬────────────────┬─────────┬───────────────┬──────────┬─────┬───────┐ │ chr ┆ strand ┆ start ┆ gene ┆ context ┆ id ┆ fragment ┆ sum ┆ count │ │ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ │ cat ┆ cat ┆ u64 ┆ cat ┆ cat ┆ cat ┆ u32 ┆ f32 ┆ u32 │ ╞═════════════╪════════╪═══════╪════════════════╪═════════╪═══════════════╪══════════╪═════╪═══════╡ │ NC_003070.9 ┆ + ┆ 3631 ┆ NC_003070.9:36 ┆ CG ┆ gene-AT1G0101 ┆ 4 ┆ 0.0 ┆ 1 │ │ ┆ ┆ ┆ 31-5899 ┆ ┆ 0 ┆ ┆ ┆ │ │ NC_003070.9 ┆ + ┆ 3631 ┆ NC_003070.9:36 ┆ CHH ┆ gene-AT1G0101 ┆ 19 ┆ 0.0 ┆ 2 │ │ ┆ ┆ ┆ 31-5899 ┆ ┆ 0 ┆ ┆ ┆ │ │ NC_003070.9 ┆ + ┆ 3631 ┆ NC_003070.9:36 ┆ CHH ┆ gene-AT1G0101 ┆ 20 ┆ 0.0 ┆ 1 │ │ ┆ ┆ ┆ 31-5899 ┆ ┆ 0 ┆ ┆ ┆ │ └─────────────┴────────┴───────┴────────────────┴─────────┴───────────────┴──────────┴─────┴───────┘ >>> metagene.filter(context = "CG", strand = "+") shape: (3, 9) ┌─────────────┬────────┬───────┬────────────────┬─────────┬───────────────┬──────────┬─────┬───────┐ │ chr ┆ strand ┆ start ┆ gene ┆ context ┆ id ┆ fragment ┆ sum ┆ count │ │ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ │ cat ┆ cat ┆ u64 ┆ cat ┆ cat ┆ cat ┆ u32 ┆ f32 ┆ u32 │ ╞═════════════╪════════╪═══════╪════════════════╪═════════╪═══════════════╪══════════╪═════╪═══════╡ │ NC_003070.9 ┆ + ┆ 3631 ┆ NC_003070.9:36 ┆ CG ┆ gene-AT1G0101 ┆ 4 ┆ 0.0 ┆ 1 │ │ ┆ ┆ ┆ 31-5899 ┆ ┆ 0 ┆ ┆ ┆ │ │ NC_003070.9 ┆ + ┆ 3631 ┆ NC_003070.9:36 ┆ CG ┆ gene-AT1G0101 ┆ 166 ┆ 0.0 ┆ 1 │ │ ┆ ┆ ┆ 31-5899 ┆ ┆ 0 ┆ ┆ ┆ │ │ NC_003070.9 ┆ + ┆ 3631 ┆ NC_003070.9:36 ┆ CG ┆ gene-AT1G0101 ┆ 264 ┆ 0.0 ┆ 1 │ │ ┆ ┆ ┆ 31-5899 ┆ ┆ 0 ┆ ┆ ┆ │ └─────────────┴────────┴───────┴────────────────┴─────────┴───────────────┴──────────┴─────┴───────┘ """ context_filter = self.report_df["context"] == context if context is not None else True strand_filter = self.report_df["strand"] == strand if strand is not None else True chr_filter = self.report_df["chr"] == chr if chr is not None else True metadata = self.metadata metadata["context"] = context metadata["strand"] = strand if genome is not None: def genome_filter(df: pl.DataFrame): cast_dtypes = {"chr": pl.Utf8, "strand": pl.Utf8} cast_cat_dtypes = {"chr": pl.Categorical, "strand": pl.Categorical} return df.cast(cast_dtypes).join(genome.cast(cast_dtypes).select(["chr", "start"]), on=["chr", "start"]).cast(cast_cat_dtypes) else: genome_filter = lambda df: df if id is not None: def id_filter(df: pl.DataFrame): return df.filter(pl.col("id").cast(pl.String).is_in(id)) else: id_filter = lambda df: df if coords is not None: def coords_filter(df: pl.DataFrame): return df.filter(pl.col("gene").is_in(coords)) else: coords_filter = lambda df: df if context_filter is None and strand_filter is None and chr_filter is None: return self else: return self.__class__( coords_filter(id_filter(genome_filter(self.report_df.filter(context_filter & strand_filter & chr_filter)))), **metadata)
[docs] def resize(self, to_fragments: int = None) -> Metagene: """ Mutate DataFrame to fewer fragments. Parameters ---------- to_fragments Number of TOTAL (including flanking regions) fragments per gene. Returns ------- Resized :class:`Metagene`. Examples -------- >>> genome = Genome.from_gff("path/to/genome.gff").gene_body() >>> >>> path = 'path/to/bismark.CX_report.txt' >>> metagene = Metagene.from_bismark(path, genome, up_windows=500, body_windows=1000, down_windows=500) >>> metagene Metagene with 20223666 windows total. Filtered by None context and None strand. Upstream windows: 500. Body windows: 1000. Downstream windows: 500. >>> metagene.resize(100) Metagene with 4946800 windows total. Filtered by None context and None strand. Upstream windows: 25. Body windows: 50. Downstream windows: 25. """ if self.upstream_windows is not None and self.gene_windows is not None and self.downstream_windows is not None: from_fragments = self.total_windows else: from_fragments = self.report_df["fragment"].max() + 1 if to_fragments is None or from_fragments <= to_fragments: return self resized = ( self.report_df.lazy() .with_columns( ((pl.col("fragment") / from_fragments) * to_fragments).floor() .cast(MetageneSchema.fragment) ) .group_by( by=['chr', 'strand', 'start', 'gene', 'id', 'context', 'fragment'] ).agg([ pl.sum('sum').alias('sum'), pl.sum('count').alias('count') ]) ).collect() metadata = self.metadata metadata["upstream_windows"] = metadata["upstream_windows"] // (from_fragments // to_fragments) metadata["downstream_windows"] = metadata["downstream_windows"] // (from_fragments // to_fragments) metadata["gene_windows"] = metadata["gene_windows"] // (from_fragments // to_fragments) return self.__class__(resized, **metadata)
[docs] def trim_flank(self, upstream=True, downstream=True) -> Metagene: """ Trim Metagene flanking regions. Parameters ---------- upstream Trim upstream region? downstream Trim downstream region? Returns ------- Trimmed :class:`Metagene` Examples -------- >>> metagene Metagene with 20223666 windows total. Filtered by None context and None strand. Upstream windows: 500. Body windows: 1000. Downstream windows: 500. >>> metagene.trim_flank() Metagene with 7750085 windows total. Filtered by None context and None strand. Upstream windows: 0. Body windows: 1000. Downstream windows: 0. Or if you do not need to trim one of flanking regions (e.g. upstream). >>> metagene.trim_flank(upstream=False) Metagene with 16288550 windows total. Filtered by None context and None strand. Upstream windows: 500. Body windows: 1000. Downstream windows: 0. """ trimmed = self.report_df.lazy() metadata = self.metadata.copy() if downstream: trimmed = ( trimmed .filter(pl.col("fragment") < self.upstream_windows + self.gene_windows) ) metadata["downstream_windows"] = 0 if upstream: trimmed = ( trimmed .filter(pl.col("fragment") > self.upstream_windows - 1) .with_columns(pl.col("fragment") - self.upstream_windows) ) metadata["upstream_windows"] = 0 return self.__class__(trimmed.collect(), **metadata)
[docs] def cluster( self, count_threshold: int = 5, na_rm: float | None = None) -> ClusterSingle: """ Cluster regions with hierarchical clustering, by their methylation pattern. Parameters ---------- na_rm Replace empty windows by specified value. count_threshold Minimum counts per window Returns ------- :class:`ClusterSingle` See Also ------- ClusterSingle : For possible analysis options """ return ClusterSingle(self, count_threshold, na_rm)
@staticmethod def _reverse_strand(df, max_fragment): return ( df .filter(pl.col("strand") == "-") .with_columns((max_fragment - pl.col("fragment")).alias("fragment")) .sort("fragment") ) def line_plot_data( self, resolution: int = None, smooth: int = 50, confidence: float = 0., stat: str = "wmean", merge_strands: bool = True, label="" ): """ Parameters ---------- confidence Probability for confidence bands (e.g. 0.95) smooth Number of windows for `SavGol <>`_ filter (set 0 for no smoothing). Applied only if `flank_windows` and `body_windows` params are specified. resolution Number of fragments to resize to. Keep None if no resize is needed. stat Summary function to use for plot. Possible options: ``mean``, ``wmean``, ``log``, ``wlog``, ``qN`` merge_strands Does negative strand need to be reversed label Label for the data. Returns ------- :class:`LinePlotData` """ resized = self if resolution is None else self.resize(resolution) df = resized.report_df # Merge strands if merge_strands: df = df.filter(pl.col("strand") != "-").extend(self._reverse_strand(df, df["fragment"].max())) # Apply stats expr res = ( df .group_by("fragment") .agg([ plot_stat_expr(stat).alias("density"), pl.col("sum"), pl.col("count"), pl.sum("count").alias("n"), (pl.col("sum") / pl.col("count")).mean().alias("average"), (pl.col("sum") - (pl.col("sum") / pl.col("count"))).mean().pow(2).alias("variance") ]) .sort("fragment") ) if 0 < confidence < 1 and stat in ["mean", "wmean"]: res = ( res .with_columns( (pl.col("variance") / pl.col("n")).sqrt().alias("scale") ) .with_columns( pl.struct(["n", "average", "scale"]) .map_elements( lambda field: stats.t.interval(confidence, df=field["n"] - 1, loc=field["average"], scale=field["scale"]), return_dtype=pl.List(pl.Float64) ).alias("interval") ) .with_columns(pl.col("interval").list.to_struct(fields=["lower", "upper"])) .unnest("interval") ) elif 0 < confidence < 1 and not (stat in ["mean", "wmean"]): raise ValueError("Confidence bands available only for mean and wmean stat parameters.") # Fill empty windows template = pl.DataFrame( dict(fragment=list(range(self.total_windows))), schema=dict(fragment=res.schema["fragment"]) ) joined = template.join(res, on="fragment", how="left") # Calculate CI lower = None upper = None if 0 < confidence < 1 and stat in ["mean", "wmean"]: upper = savgol_line(joined["upper"].to_numpy(), smooth) * 100 lower = savgol_line(joined["lower"].to_numpy(), smooth) * 100 # Smooth and convert to percents y = savgol_line(joined["density"].to_numpy(), smooth) * 100 x = np.arange(len(y)) return LinePlotData(x, y, resized._x_ticks, resized._borders, lower, upper, label, ["Upstream", "", "Body", "", "Downstream"])
[docs] def line_plot( self, resolution: int = None, stat="wmean", smooth: int = 50, confidence: float = 0., merge_strands: bool = True ) -> LinePlot: """ Create :class:`LinePlot` method. Parameters ---------- confidence Probability for confidence bands (e.g. 0.95) smooth Number of windows for `SavGol <>`_ filter (set 0 for no smoothing). Applied only if `flank_windows` and `body_windows` params are specified. resolution Number of fragments to resize to. Keep None if no resize is needed. stat Summary function to use for plot. Possible options: ``mean``, ``wmean``, ``log``, ``wlog``, ``qN`` merge_strands Does negative strand need to be reversed Returns ------- :class:`LinePlot` Notes ----- - ``mean`` – Default mean between bins, when count of cytosine residues in bin IS NOT taken into account - ``wmean`` – Weighted mean between bins. Cytosine residues in bin IS taken into account - ``log`` – NOT weighted geometric mean. - ``wlog`` - Weighted geometric mean. - ``qN`` – Return quantile by ``N`` percent (e.g. "``q50``") See Also -------- :doc:`LinePlot example<../../markdowns/test>` Examples -------- Firstly we need to initialize Metagene class >>> genome = Genome.from_gff("path/to/genome.gff").gene_body() >>> >>> path = 'path/to/bismark.CX_report.txt' >>> metagene = Metagene.from_bismark(path, genome, up_windows=500, body_windows=1000, down_windows=500) Next we can optionally filter metagene by context and strand. >>> filtered = metagene.filter(context = "CG", strand = "-") And LinePlot can be created >>> lp = filtered.line_plot() >>> figure = lp.draw_mpl() >>> .. image:: ../../images/lineplot/lp_ara_mpl.png No filtering is suitable too. Then LinePlot will visualize all methylation contexts. >>> lp = metagene.line_plot() >>> figure = lp.draw_mpl() >>> .. image:: ../../images/lineplot/ara_multi_mpl.png You can use Plotly version for all plots as well. >>> figure = lp.draw_plotly() >>> .. image:: ../../images/lineplot/ara_multi_plotly.png See Also -------- LinePlot : For more information about plottling parameters. """ resized = self.resize(resolution) lp_data = resized.line_plot_data(resolution, smooth, confidence, stat, merge_strands) return LinePlot(lp_data)
[docs] def contexts_line_plot( self, resolution: int = None, stat="wmean", smooth: int = 50, confidence: float = 0., merge_strands: bool = True ) -> LinePlot: resized = self.resize(resolution) lp_data = [ filtered.line_plot_data(resolution, smooth, confidence, stat, merge_strands, context) for context, filtered in zip(CONTEXTS, map(lambda context: resized.filter(context=context), CONTEXTS)) if len(filtered) > 0 ] return LinePlot(lp_data)
def heat_map_data( self, nrow: int = 100, ncol: int = 100, label=None ): resized = self.resize(ncol) report_df = resized.report_df # Merge strands report_df = report_df.filter(pl.col("strand") != "-").extend(self._reverse_strand(report_df, report_df["fragment"].max())) # sort by rows and add row numbers hm_data = ( report_df.lazy() .group_by("gene") .agg([ pl.col("fragment"), (pl.col("sum") / pl.col("count")).alias("density") ]) .with_columns( (pl.col("density").list.sum() / (resized.total_windows + 1)).alias("order") ) .sort('order', descending=True) .with_row_count(name='row') .with_columns( (pl.col('row') / (pl.max("row") + 1) * nrow).floor().alias('row').cast(pl.UInt16) ) .explode(['fragment', 'density']) .group_by(["row", "fragment"]) .agg(pl.mean("density")) ) # prepare full template template = ( pl.LazyFrame(data={"row": list(range(nrow))}) .with_columns(pl.lit(list(range(0, resized.total_windows))).alias("fragment")) .explode("fragment") .with_columns([ pl.col("fragment").cast(MetageneSchema.fragment), pl.col("row").cast(pl.UInt16) ]) ) # join template with actual data hm_data = ( # template join with orig template.join(hm_data, on=['row', 'fragment'], how='left') .fill_null(0) .sort(['row', 'fragment']) .group_by('row', maintain_order=True) .agg(pl.col('density')) .collect() ['density'] .to_list() ) # convert to matrix and percents hm_data = np.array(hm_data, dtype=np.float32) * 100 return HeatMapData(hm_data, resized._x_ticks, resized._borders, label if label is not None else "", ["Upstream", "", "Body", "", "Downstream"])
[docs] def heat_map( self, nrow: int = 100, ncol: int = 100, ) -> HeatMap: """ Create :class:`HeatMap` method. Parameters ---------- nrow Number of fragments to resize to. Keep None if no resize is needed. ncol Number of columns in the resulting heat-map. stat Summary function to use for plot. Possible options: ``mean``, ``wmean``, ``log``, ``wlog``, ``qN`` Returns ------- :class:`HeatMap` Examples -------- Firstly we need to initialize Metagene class >>> genome = Genome.from_gff("path/to/genome.gff").gene_body() >>> >>> path = 'path/to/bismark.CX_report.txt' >>> metagene = Metagene.from_bismark(path, genome, up_windows=500, body_windows=1000, down_windows=500) Next we need to (in contrast with :meth:`Metagene.line_plot`) filter metagene by context and strand. >>> filtered = metagene.filter(context = "CG", strand = "-") And HeatMap can be created. Let it's width be 200 columns and height – 100 rows. >>> hm = filtered.heat_map(nrow=100, ncol=200) >>> figure = hm.draw_mpl() >>> .. image:: ../../images/heatmap/ara_mpl.png Plotly version is also available: >>> figure = hm.draw_plotly() >>> .. image:: ../../images/heatmap/ara_plotly.png See Also -------- Metagene.line_plot : For more information about `stat` parameter. HeatMap : For more information about plotting parameters. """ hm_data = self.heat_map_data(nrow, ncol) return HeatMap(hm_data)
def __str__(self): representation = (f'Metagene with {len(self)} windows total.\n' f'Filtered by {self.context} context and {self.strand} strand.\n' f'Upstream windows: {self.upstream_windows}.\n' f'Body windows: {self.gene_windows}.\n' f'Downstream windows: {self.downstream_windows}.\n') return representation def __repr__(self): return self.__str__() def box_plot_data(self, filter_context: Literal["CG", "CHG", "CHH"] | None = None, label=""): df = self.filter(context=filter_context).report_df if filter_context is not None else self.report_df if not df.is_empty(): data = ( df .group_by(["chr", "start"]) .agg([ pl.first("gene").alias("locus"), pl.first("strand"), pl.first("id"), (pl.sum("sum") / pl.sum("count")).alias("density") ]) ) return BoxPlotData( data["density"].to_list(), label, data["locus"].to_list(), data["id"].to_list() ) else: return BoxPlotData.empty(label)
[docs] def context_box_plot(self): data = [self.box_plot_data(context, context) for context in CONTEXTS] return BoxPlot(data)
[docs]class MetageneFiles(MetageneFilesBase): """ Stores and plots multiple data for :class:`Metagene`. If you want to compare Bismark data with different genomes, create this class with a list of :class:`Bismark` classes. """
[docs] @classmethod def from_list( cls, filenames: list[str | Path], genomes: pl.DataFrame | list[pl.DataFrame], labels: list[str] = None, up_windows: int = 0, body_windows: int = 2000, down_windows: int = 0, report_type: ReportTypes = "bismark", block_size_mb: int = 50, use_threads: bool = True, sumfunc: AvailableSumfunc = "wmean", **kwargs ) -> MetageneFiles: """ Create istance of :class:`MetageneFiles` from list of paths. Parameters ---------- filenames List of filenames to read from genomes Annotation DataFrame or list of Annotations (may be different annotations) labels Labels for plots for Metagenes up_windows Number of windows upstream region to split into body_windows Number of windows body region to split into down_windows Number of windows downstream region to split into report_type Type of input report. Possible options: ``bismark``, ``cgmap``, ``binom``, ``bedgraph``, ``coverage``. block_size_mb Block size for reading. (Block size ≠ amount of RAM used. Reader allocates approx. Block size * 20 memory for reading.) use_threads Do multi-threaded or single-threaded reading. If multi-threaded option is used, number of threads is defined by `multiprocessing.cpu_count()` sumfunc Summary function to calculate density for window with. Returns ------- MetageneFiles Instance of :class:`MetageneFiles` See Also ________ Metagene Examples -------- Initialization using :meth:`MetageneFiles.from_list` >>> ara_genome = Genome.from_gff("/path/to/arath.gff").gene_body(min_length=2000) >>> brd_genome = Genome.from_gff("/path/to/bradi.gff").gene_body(min_length=2000) >>> >>> metagenes = MetageneFiles.from_list( >>> ["path/to/bradi.txt", "path/to/ara.txt"], >>> [brd_genome, ara_genome], >>> ["BraDi", "AraTh"], >>> report_type="bismark", >>> up_windows=250, body_windows=500, down_windows=250 >>> ) :class:`MetageneFiles` can be initialized explicitly: >>> metagene_ara = Metagene.from_bismark("path/to/ara.txt", ara_genome, up_windows=250, body_windows=500, down_windows=250) >>> metagene_brd = Metagene.from_bismark("path/to/ara.txt", ara_genome, up_windows=250, body_windows=500, down_windows=250) >>> metagenes = MetageneFiles(samples=[metagene_brd, metagene_ara], labels=["BraDi", "AraTh"]) The resulting objects will be identical. Warnings -------- When :class:`MetageneFiles` is initialized explicitly, number of windows needs ot be the same in evety sample """ read_fnc = { "bismark": Metagene.from_bismark, "binom": Metagene.from_binom, "cgmap": Metagene.from_cgmap, "bedgraph": Metagene.from_bedGraph, "coverage": Metagene.from_coverage } if not isinstance(genomes, list): genomes = [genomes] * len(filenames) else: if len(genomes) != len(filenames): raise AttributeError("Number of genomes and filenames provided does not match") default_args = dict( up_windows=up_windows, body_windows=body_windows, down_windows=down_windows, use_threads=use_threads ) if report_type not in ["binom"]: default_args |= dict( block_size_mb=block_size_mb, sumfunc=sumfunc ) samples: list[Metagene] = [] for file, genome in zip(filenames, genomes): sample = read_fnc[report_type](file=file, genome=genome, **default_args, **kwargs) samples.append(sample) return cls(samples, labels)
[docs] def filter(self, context: str = None, strand: str = None, chr: str = None, genome: pl.DataFrame = None, id: list[str] = None, coords: list[str] = None) -> MetageneFiles: """ :meth:`Metagene.filter` all metagenes. Parameters ---------- context Methylation context (CG, CHG, CHH) to filter (only one). strand Strand to filter (+ or -). chr Chromosome name to filter. genome DataFrame with annotation to filter with (e.g. from :class:`Genome`) Returns ------- Filtered :class:`MetageneFiles`. See Also -------- Metagene.filter : For examples. """ return self.__class__([sample.filter(context, strand, chr, genome, id, coords) for sample in self.samples], self.labels)
[docs] def trim_flank(self, upstream=True, downstream=True): """ :meth:`Metagene.trim_flank` all metagenes. Parameters ---------- upstream Trim upstream region? downstream Trim downstream region? Returns ------- Trimmed :class:`Metagene` See Also -------- Metagene.trim_flank : For examples. """ return self.__class__([sample.trim_flank(upstream, downstream) for sample in self.samples], self.labels)
[docs] def resize(self, to_fragments: int): """ :meth:`Metagene.resize` all metagenes. Parameters ---------- to_fragments Number of TOTAL (including flanking regions) fragments per gene. Returns ------- Resized :class:`Metagene`. See Also -------- Metagene.resize : For examples. """ return self.__class__([sample.resize(to_fragments) for sample in self.samples], self.labels)
[docs] def merge(self): """ Merge :class:`MetageneFiles` into single :class:`Metagene` Warnings -------- **ALL** metagenes in :class:`MetageneFiles` need to be aligned to the same annotation. Parameters ---------- Returns ------- Instance of :class:`Metagene`, with merged replicates. """ pl.enable_string_cache() metadata = [sample.metadata for sample in self.samples] upstream_windows = set([md.get("upstream_windows") for md in metadata]) gene_windows = set([md.get("gene_windows") for md in metadata]) downstream_windows = set([md.get("downstream_windows") for md in metadata]) if len(upstream_windows) == len(downstream_windows) == len(gene_windows) == 1: merged = ( pl.concat([sample.report_df for sample in self.samples]).lazy() .group_by(["strand", "context", "chr", "gene", "start", "id", "fragment"], maintain_order=True) .agg([pl.sum("sum").alias("sum"), pl.sum("count").alias("count")]) .select(self.samples[0].report_df.columns) ).collect() return Metagene(merged, upstream_windows=list(upstream_windows)[0], downstream_windows=list(downstream_windows)[0], gene_windows=list(gene_windows)[0]) else: raise Exception("Metadata for merge DataFrames does not match!")
[docs] def line_plot( self, resolution: int = None, smooth: int = 50, confidence: float = 0., stat: str = "wmean", merge_strands: bool = True ): """ Create :class:`LinePlotFiles` method. Parameters ---------- resolution Number of fragments to resize to. Keep None if no resize is needed. stat Summary function to use for plot. Possible options: ``mean``, ``wmean``, ``log``, ``wlog``, ``qN`` merge_strands Does negative strand need to be reversed confidence Probability for confidence bands (e.g. 0.95) smooth Number of windows for `SavGol <>`_ filter (set 0 for no smoothing). Applied only if `flank_windows` and `body_windows` params are specified. Returns ------- :class:`LinePlot` See Also -------- Metagene.line_plot : for more information about `stat` parameter. Examples -------- >>> filtered = metagenes.filter("CG") >>> lp = metagenes.line_plot() Matplotlib version: >>> figure = lp.draw_mpl() >>> .. image:: ../../images/lineplot/ara_bradi_mpl.png Plotly version >>> figure = lp.draw_plotly() >>> Now when we have metagene initlaized .. image:: ../../images/lineplot/ara_bradi_plotly.png """ lp_data = [ sample.line_plot_data(resolution, smooth, confidence, stat, merge_strands, label) for sample, label in zip(self.samples, self.labels) ] return LinePlot(lp_data)
[docs] def heat_map(self, nrow: int = 100, ncol: int = None) -> HeatMap: """ Create :class:`HeatMapNew` method. Parameters ---------- nrow Number of fragments to resize to. Keep None if no resize is needed. ncol Number of columns in the resulting heat-map. stat Summary function to use for plot. Possible options: ``mean``, ``wmean``, ``log``, ``wlog``, ``qN`` Returns ------- :class:`HeatMap` See Also -------- Metagene.line_plot : for more information about `stat` parameter. Metagene.heat_map Examples -------- >>> filtered = metagenes.filter("CG") >>> hm = metagenes.heat_map(nrow=100, ncol=100) Matplotlib version: >>> figure = hm.draw_mpl(major_labels=None) >>> .. image:: ../../images/heatmap/ara_bradi_mpl.png Plotly version >>> figure = hm.draw_plotly(major_labels=None) >>> Now when we have metagene initlaized .. image:: ../../images/heatmap/ara_bradi_plotly.png """ hm_data = [ sample.heat_map_data(nrow, ncol, label) for sample, label in zip(self.samples, self.labels) ] return HeatMap(hm_data)
def box_plot_data(self, filter_context: Literal["CG", "CHG", "CHH"] | None = None): return [sample.box_plot_data(filter_context, label) for label, sample in zip(self.labels, self.samples)] # Todo add fixes from issue14-16#fix
[docs] def box_plot(self): return BoxPlot(self.box_plot_data())
[docs] def dendrogram(self, q: float = .75, **kwargs): """ Cluster samples by total methylation level of regions and draw dendrogram. Parameters ---------- q Quantile of regions, which will be clustered kwargs Keyword arguments for `seaborn.clustermap <>`_ Returns ------- ``matplotlib.pyplot.Figure`` """ gene_stats = [] for sample, label in zip(self.samples, self.labels): gene_stats.append( sample.report_df .group_by(["chr", "strand", "start", "gene"]) .agg((pl.sum("sum") / pl.sum("count")).alias("density")) .with_columns(pl.lit(label).alias("label")) ) gene_set = set.intersection(*[set(stat["gene"].to_list()) for stat in gene_stats]) if len(gene_set) < 1: raise ValueError("Region set intersection is empty. Are Metagenes read with same genome?") gene_stats = [stat.filter(pl.col("gene").is_in(list(gene_set))) for stat in gene_stats] dendro_data = pl.concat(gene_stats).pivot(values="density", columns="label", index="gene", aggregate_function="mean") matrix ="gene")).to_pandas() # Filter by variance if q > 0: var = matrix.to_numpy().var(1) matrix = matrix[var > np.quantile(var, q)] fig = sns.clustermap(matrix, **kwargs) return fig
[docs] def cluster(self, count_threshold=5, na_rm: float | None = None): """ Cluster samples regions by methylation pattern. Parameters ---------- count_threshold Minimum number of reads per fragment to include it in analysis. na_rm Value to fill empty fragments. (None if not full regions need to be deleted) Returns ------- ClusterMany """ return ClusterMany(self, count_threshold, na_rm)