Source code for bsxplorer.MetageneClasses
from __future__ import annotations
from pathlib import Path
from typing import Literal
import numpy as np
import polars as pl
import seaborn as sns
from scipy import stats
from .Plots import (
BoxPlot, savgol_line, LinePlotData, LinePlot, plot_stat_expr, HeatMapData, HeatMap,
BoxPlotData
)
from .SeqMapper import CytosinesFileCM, SequenceFile
from .Base import (
MetageneBase,
MetageneFilesBase,
read_metagene,
validate_metagene_args
)
from .Clusters import ClusterSingle, ClusterMany
from .UniversalReader_batches import ReportTypes
from .utils import MetageneSchema, AvailableSumfunc, CONTEXTS
from .GenomeClass import Genome
from .UniversalReader_classes import UniversalReader
[docs]class Metagene(MetageneBase):
"""
Stores Metagene data.
"""
[docs] @classmethod
def from_bismark(
cls,
file: str | Path,
genome: pl.DataFrame,
up_windows: int = 0,
body_windows: int = 2000,
down_windows: int = 0,
block_size_mb: int = 100,
use_threads: bool = True,
sumfunc: AvailableSumfunc = "wmean"
):
"""
Constructor for Metagene class from Bismark ``coverage2cytosine`` report.
Parameters
----------
file
Path to bismark genomeWide report.
genome
``polars.Dataframe`` with gene ranges (from :class:`Genome`)
up_windows
Number of windows upstream region to split into
body_windows
Number of windows body region to split into
down_windows
Number of windows downstream region to split into
block_size_mb
Block size for reading. (Block size ≠ amount of RAM used. Reader allocates approx. Block size * 20 memory for reading.)
use_threads
Do multi-threaded or single-threaded reading. If multi-threaded option is used, number of threads is defined by `multiprocessing.cpu_count()`
sumfunc
Summary function to calculate density for window with.
Returns
-------
Metagene
Examples
--------
>>> genome = Genome.from_gff("path/to/genome.gff").gene_body()
>>>
>>> path = 'path/to/bismark.CX_report.txt'
>>> metagene = Metagene.from_bismark(path, genome, up_windows=500, body_windows=1000, down_windows=500)
"""
reader = UniversalReader(**(locals() | dict(report_type="bismark")))
args = validate_metagene_args(genome, up_windows, body_windows, down_windows, sumfunc)
report_df = read_metagene(**(locals() | args))
return cls(report_df,
upstream_windows=args["upstream_windows"],
gene_windows=args["body_windows"],
downstream_windows=args["downstream_windows"])
[docs] @classmethod
def from_cgmap(
cls,
file: str | Path,
genome: pl.DataFrame,
up_windows: int = 0,
body_windows: int = 2000,
down_windows: int = 0,
block_size_mb: int = 100,
use_threads: bool = True,
sumfunc: AvailableSumfunc = "wmean"
):
"""
Constructor for Metagene class from BSSeeker2 CGmap file.
Parameters
----------
file
Path to CGmap file report.
genome
``polars.Dataframe`` with gene ranges (from :class:`Genome`)
up_windows
Number of windows upstream region to split into
body_windows
Number of windows body region to split into
down_windows
Number of windows downstream region to split into
block_size_mb
Block size for reading. (Block size ≠ amount of RAM used. Reader allocates approx. Block size * 20 memory for reading.)
use_threads
Do multi-threaded or single-threaded reading. If multi-threaded option is used, number of threads is defined by `multiprocessing.cpu_count()`
sumfunc
Summary function to calculate density for window with.
Returns
-------
Metagene
Examples
--------
>>> genome = Genome.from_gff("path/to/genome.gff").gene_body()
>>>
>>> path = 'path/to/CGmap.txt'
>>> metagene = Metagene.from_cgmap(path, genome, up_windows=500, body_windows=1000, down_windows=500)
"""
reader = UniversalReader(**(locals() | dict(report_type="cgmap")))
args = validate_metagene_args(genome, up_windows, body_windows, down_windows, sumfunc)
report_df = read_metagene(**(locals() | args))
return cls(report_df,
upstream_windows=args["upstream_windows"],
gene_windows=args["body_windows"],
downstream_windows=args["downstream_windows"])
[docs] @classmethod
def from_binom(
cls,
file: str | Path,
genome: pl.DataFrame,
up_windows: int = 0,
body_windows: int = 2000,
down_windows: int = 0,
p_value: float = .05,
use_threads=True,
):
"""
Constructor for Metagene class from :meth:`BinomialData.preprocess` ``.parquet`` file.
Only ``"mean"`` summary function is supported for construction :class:`Metagene` from binom data.
Parameters
----------
p_value
P-value of cytosine methylation for it to be considered methylated.
file
Path to preprocessed `.parquet` file.
genome
``polars.Dataframe`` with gene ranges (from :class:`Genome`)
up_windows
Number of windows upstream region to split into
body_windows
Number of windows body region to split into
down_windows
Number of windows downstream region to split into
use_threads
Do multi-threaded or single-threaded reading. If multi-threaded option is used, number of threads is defined by `multiprocessing.cpu_count()`
Returns
-------
Metagene
Examples
--------
>>> save_name = "preprocessed.parquet"
>>> BinomialData.preprocess("path/to/bismark.CX_report.txt", report_type="bismark", name=save_name)
>>>
>>> genome = Genome.from_gff("path/to/genome.gff").gene_body()
>>> metagene = Metagene.from_binom(save_name, genome, up_windows=500, body_windows=1000, down_windows=500)
"""
reader = UniversalReader(file, "binom", methylation_pvalue=p_value, use_threads=use_threads)
args = validate_metagene_args(genome, up_windows, body_windows, down_windows, "wmean")
report_df = read_metagene(**(locals() | args))
return cls(report_df,
upstream_windows=args["upstream_windows"],
gene_windows=args["body_windows"],
downstream_windows=args["downstream_windows"])
[docs] @classmethod
def from_bedGraph(
cls,
file: str | Path,
genome: pl.DataFrame,
fasta: str | Path,
up_windows: int = 0,
body_windows: int = 2000,
down_windows: int = 0,
sumfunc: AvailableSumfunc = "wmean",
block_size_mb: int = 30,
use_threads: bool = True,
save_preprocessed: bool = False,
temp_dir: str = Path.cwd()
):
"""
Constructor for Metagene class from ``.bedGraph`` file.
Parameters
----------
file
Path to ``.bedGraph`` file.
genome
``polars.Dataframe`` with gene ranges (from :class:`Genome`)
fasta
Path to FASTA genome sequence file or path to preprocessed with :class:`Sequence` cytosine file.
up_windows
Number of windows upstream region to split into
body_windows
Number of windows body region to split into
down_windows
Number of windows downstream region to split into
sumfunc
Summary function to calculate density for window with.
temp_dir
Directory for temporary files.
save_preprocessed
Does preprocessed sequence file need to be saved
block_size_mb
Block size for reading. (Block size ≠ amount of RAM used. Reader allocates approx. Block size * 20 memory for reading.)
use_threads
Do multi-threaded or single-threaded reading. If multi-threaded option is used, number of threads is defined by `multiprocessing.cpu_count()`
Returns
-------
Metagene
Examples
--------
>>> genome = Genome.from_gff("path/to/genome.gff").gene_body()
>>>
>>> path = 'path/to/report.bedGraph'
>>> fasta = 'path/to/sequence.fa'
>>> metagene = Metagene.from_bedGraph(path, genome, fasta, up_windows=500, body_windows=1000, down_windows=500)
"""
with CytosinesFileCM(fasta) as cm:
cytosine_file = cm.cytosine_path
if not cm.is_cytosine:
SequenceFile(fasta).preprocess_cytosines(cytosine_file)
reader = UniversalReader(file, report_type="bedgraph", use_threads=use_threads, cytosine_file=cytosine_file,
block_size_mb=block_size_mb)
args = validate_metagene_args(genome, up_windows, body_windows, down_windows, sumfunc)
report_df = read_metagene(reader, genome, up_windows, body_windows, down_windows, sumfunc)
return cls(report_df,
upstream_windows=args["upstream_windows"],
gene_windows=args["body_windows"],
downstream_windows=args["downstream_windows"])
[docs] @classmethod
def from_coverage(
cls,
file: str | Path,
genome: pl.DataFrame,
fasta: str | Path,
up_windows: int = 0,
body_windows: int = 2000,
down_windows: int = 0,
sumfunc: AvailableSumfunc = "wmean",
block_size_mb: int = 30,
use_threads: bool = True,
save_preprocessed: bool = False,
temp_dir: str = "./"
):
"""
Constructor for Metagene class from ``.cov`` file.
Parameters
----------
file
Path to ``.cov`` file.
genome
``polars.Dataframe`` with gene ranges (from :class:`Genome`)
fasta
Path to FASTA genome sequence file or path to preprocessed with :class:`Sequence` cytosine file.
up_windows
Number of windows upstream region to split into
body_windows
Number of windows body region to split into
down_windows
Number of windows downstream region to split into
sumfunc
Summary function to calculate density for window with.
temp_dir
Directory for temporary files.
save_preprocessed
Does preprocessed sequence file need to be saved
block_size_mb
Block size for reading. (Block size ≠ amount of RAM used. Reader allocates approx. Block size * 20 memory for reading.)
use_threads
Do multi-threaded or single-threaded reading. If multi-threaded option is used, number of threads is defined by `multiprocessing.cpu_count()`
Returns
-------
Metagene
Examples
--------
>>> path = 'path/to/report.cov'
>>> genome = Genome.from_gff("path/to/genome.gff").gene_body()
>>>
>>> fasta = 'path/to/sequence.fa'
>>> metagene = Metagene.from_coverage(path, genome, fasta, up_windows=500, body_windows=1000, down_windows=500)
"""
with CytosinesFileCM(fasta) as cm:
cytosine_file = cm.cytosine_path
if not cm.is_cytosine:
SequenceFile(fasta).preprocess_cytosines(cytosine_file)
reader = UniversalReader(file, report_type="bedgraph", use_threads=use_threads, cytosine_file=cytosine_file,
block_size_mb=block_size_mb)
args = validate_metagene_args(genome, up_windows, body_windows, down_windows, sumfunc)
report_df = read_metagene(reader, genome, up_windows, body_windows, down_windows, sumfunc)
return cls(report_df,
upstream_windows=args["upstream_windows"],
gene_windows=args["body_windows"],
downstream_windows=args["downstream_windows"])
# Todo Check and update
[docs] def filter(
self,
context: Literal["CG", "CHG", "CHH", None] = None,
strand: Literal["+", "-", None] = None,
chr: str = None,
genome: pl.DataFrame = None,
id: list[str] = None,
coords: list[str] = None
) -> Metagene:
"""
Method for filtering metagene.
Parameters
----------
context
Methylation context (CG, CHG, CHH) to filter (only one).
strand
Strand to filter (+ or -).
chr
Chromosome name to filter.
genome
DataFrame with annotation to filter with (from :class:`Genome`)
Returns
-------
Filtered :class:`Metagene`.
Examples
--------
>>> metagene
shape: (3, 9)
┌─────────────┬────────┬───────┬────────────────┬─────────┬───────────────┬──────────┬─────┬───────┐
│ chr ┆ strand ┆ start ┆ gene ┆ context ┆ id ┆ fragment ┆ sum ┆ count │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ cat ┆ cat ┆ u64 ┆ cat ┆ cat ┆ cat ┆ u32 ┆ f32 ┆ u32 │
╞═════════════╪════════╪═══════╪════════════════╪═════════╪═══════════════╪══════════╪═════╪═══════╡
│ NC_003070.9 ┆ + ┆ 3631 ┆ NC_003070.9:36 ┆ CG ┆ gene-AT1G0101 ┆ 4 ┆ 0.0 ┆ 1 │
│ ┆ ┆ ┆ 31-5899 ┆ ┆ 0 ┆ ┆ ┆ │
│ NC_003070.9 ┆ + ┆ 3631 ┆ NC_003070.9:36 ┆ CHH ┆ gene-AT1G0101 ┆ 19 ┆ 0.0 ┆ 2 │
│ ┆ ┆ ┆ 31-5899 ┆ ┆ 0 ┆ ┆ ┆ │
│ NC_003070.9 ┆ + ┆ 3631 ┆ NC_003070.9:36 ┆ CHH ┆ gene-AT1G0101 ┆ 20 ┆ 0.0 ┆ 1 │
│ ┆ ┆ ┆ 31-5899 ┆ ┆ 0 ┆ ┆ ┆ │
└─────────────┴────────┴───────┴────────────────┴─────────┴───────────────┴──────────┴─────┴───────┘
>>> metagene.filter(context = "CG", strand = "+")
shape: (3, 9)
┌─────────────┬────────┬───────┬────────────────┬─────────┬───────────────┬──────────┬─────┬───────┐
│ chr ┆ strand ┆ start ┆ gene ┆ context ┆ id ┆ fragment ┆ sum ┆ count │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ cat ┆ cat ┆ u64 ┆ cat ┆ cat ┆ cat ┆ u32 ┆ f32 ┆ u32 │
╞═════════════╪════════╪═══════╪════════════════╪═════════╪═══════════════╪══════════╪═════╪═══════╡
│ NC_003070.9 ┆ + ┆ 3631 ┆ NC_003070.9:36 ┆ CG ┆ gene-AT1G0101 ┆ 4 ┆ 0.0 ┆ 1 │
│ ┆ ┆ ┆ 31-5899 ┆ ┆ 0 ┆ ┆ ┆ │
│ NC_003070.9 ┆ + ┆ 3631 ┆ NC_003070.9:36 ┆ CG ┆ gene-AT1G0101 ┆ 166 ┆ 0.0 ┆ 1 │
│ ┆ ┆ ┆ 31-5899 ┆ ┆ 0 ┆ ┆ ┆ │
│ NC_003070.9 ┆ + ┆ 3631 ┆ NC_003070.9:36 ┆ CG ┆ gene-AT1G0101 ┆ 264 ┆ 0.0 ┆ 1 │
│ ┆ ┆ ┆ 31-5899 ┆ ┆ 0 ┆ ┆ ┆ │
└─────────────┴────────┴───────┴────────────────┴─────────┴───────────────┴──────────┴─────┴───────┘
"""
context_filter = self.report_df["context"] == context if context is not None else True
strand_filter = self.report_df["strand"] == strand if strand is not None else True
chr_filter = self.report_df["chr"] == chr if chr is not None else True
metadata = self.metadata
metadata["context"] = context
metadata["strand"] = strand
if genome is not None:
def genome_filter(df: pl.DataFrame):
cast_dtypes = {"chr": pl.Utf8, "strand": pl.Utf8}
cast_cat_dtypes = {"chr": pl.Categorical, "strand": pl.Categorical}
return df.cast(cast_dtypes).join(genome.cast(cast_dtypes).select(["chr", "start"]), on=["chr", "start"]).cast(cast_cat_dtypes)
else:
genome_filter = lambda df: df
if id is not None:
def id_filter(df: pl.DataFrame):
return df.filter(pl.col("id").cast(pl.String).is_in(id))
else:
id_filter = lambda df: df
if coords is not None:
def coords_filter(df: pl.DataFrame):
return df.filter(pl.col("gene").is_in(coords))
else:
coords_filter = lambda df: df
if context_filter is None and strand_filter is None and chr_filter is None:
return self
else:
return self.__class__(
coords_filter(id_filter(genome_filter(self.report_df.filter(context_filter & strand_filter & chr_filter)))),
**metadata)
[docs] def resize(self, to_fragments: int = None) -> Metagene:
"""
Mutate DataFrame to fewer fragments.
Parameters
----------
to_fragments
Number of TOTAL (including flanking regions) fragments per gene.
Returns
-------
Resized :class:`Metagene`.
Examples
--------
>>> genome = Genome.from_gff("path/to/genome.gff").gene_body()
>>>
>>> path = 'path/to/bismark.CX_report.txt'
>>> metagene = Metagene.from_bismark(path, genome, up_windows=500, body_windows=1000, down_windows=500)
>>> metagene
Metagene with 20223666 windows total.
Filtered by None context and None strand.
Upstream windows: 500.
Body windows: 1000.
Downstream windows: 500.
>>> metagene.resize(100)
Metagene with 4946800 windows total.
Filtered by None context and None strand.
Upstream windows: 25.
Body windows: 50.
Downstream windows: 25.
"""
if self.upstream_windows is not None and self.gene_windows is not None and self.downstream_windows is not None:
from_fragments = self.total_windows
else:
from_fragments = self.report_df["fragment"].max() + 1
if to_fragments is None or from_fragments <= to_fragments:
return self
resized = (
self.report_df.lazy()
.with_columns(
((pl.col("fragment") / from_fragments) * to_fragments).floor()
.cast(MetageneSchema.fragment)
)
.group_by(
by=['chr', 'strand', 'start', 'gene', 'id', 'context', 'fragment']
).agg([
pl.sum('sum').alias('sum'),
pl.sum('count').alias('count')
])
).collect()
metadata = self.metadata
metadata["upstream_windows"] = metadata["upstream_windows"] // (from_fragments // to_fragments)
metadata["downstream_windows"] = metadata["downstream_windows"] // (from_fragments // to_fragments)
metadata["gene_windows"] = metadata["gene_windows"] // (from_fragments // to_fragments)
return self.__class__(resized, **metadata)
[docs] def trim_flank(self, upstream=True, downstream=True) -> Metagene:
"""
Trim Metagene flanking regions.
Parameters
----------
upstream
Trim upstream region?
downstream
Trim downstream region?
Returns
-------
Trimmed :class:`Metagene`
Examples
--------
>>> metagene
Metagene with 20223666 windows total.
Filtered by None context and None strand.
Upstream windows: 500.
Body windows: 1000.
Downstream windows: 500.
>>> metagene.trim_flank()
Metagene with 7750085 windows total.
Filtered by None context and None strand.
Upstream windows: 0.
Body windows: 1000.
Downstream windows: 0.
Or if you do not need to trim one of flanking regions (e.g. upstream).
>>> metagene.trim_flank(upstream=False)
Metagene with 16288550 windows total.
Filtered by None context and None strand.
Upstream windows: 500.
Body windows: 1000.
Downstream windows: 0.
"""
trimmed = self.report_df.lazy()
metadata = self.metadata.copy()
if downstream:
trimmed = (
trimmed
.filter(pl.col("fragment") < self.upstream_windows + self.gene_windows)
)
metadata["downstream_windows"] = 0
if upstream:
trimmed = (
trimmed
.filter(pl.col("fragment") > self.upstream_windows - 1)
.with_columns(pl.col("fragment") - self.upstream_windows)
)
metadata["upstream_windows"] = 0
return self.__class__(trimmed.collect(), **metadata)
# TODO finish annotation
[docs] def cluster(
self,
count_threshold: int = 5,
na_rm: float | None = None) -> ClusterSingle:
"""
Cluster regions with hierarchical clustering, by their methylation pattern.
Parameters
----------
na_rm
Replace empty windows by specified value.
count_threshold
Minimum counts per window
Returns
-------
:class:`ClusterSingle`
See Also
-------
ClusterSingle : For possible analysis options
"""
return ClusterSingle(self, count_threshold, na_rm)
@staticmethod
def _reverse_strand(df, max_fragment):
return (
df
.filter(pl.col("strand") == "-")
.with_columns((max_fragment - pl.col("fragment")).alias("fragment"))
.sort("fragment")
)
def line_plot_data(
self,
resolution: int = None,
smooth: int = 50,
confidence: float = 0.,
stat: str = "wmean",
merge_strands: bool = True,
label=""
):
"""
Parameters
----------
confidence
Probability for confidence bands (e.g. 0.95)
smooth
Number of windows for
`SavGol <https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.savgol_filter.html>`_ filter
(set 0 for no smoothing). Applied only if `flank_windows` and `body_windows` params are specified.
resolution
Number of fragments to resize to. Keep None if no resize is needed.
stat
Summary function to use for plot. Possible options: ``mean``, ``wmean``, ``log``, ``wlog``, ``qN``
merge_strands
Does negative strand need to be reversed
label
Label for the data.
Returns
-------
:class:`LinePlotData`
"""
resized = self if resolution is None else self.resize(resolution)
df = resized.report_df
# Merge strands
if merge_strands:
df = df.filter(pl.col("strand") != "-").extend(self._reverse_strand(df, df["fragment"].max()))
# Apply stats expr
res = (
df
.group_by("fragment")
.agg([
plot_stat_expr(stat).alias("density"),
pl.col("sum"),
pl.col("count"),
pl.sum("count").alias("n"),
(pl.col("sum") / pl.col("count")).mean().alias("average"),
(pl.col("sum") - (pl.col("sum") / pl.col("count"))).mean().pow(2).alias("variance")
])
.sort("fragment")
)
if 0 < confidence < 1 and stat in ["mean", "wmean"]:
res = (
res
.with_columns(
(pl.col("variance") / pl.col("n")).sqrt().alias("scale")
)
.with_columns(
pl.struct(["n", "average", "scale"])
.map_elements(
lambda field: stats.t.interval(confidence, df=field["n"] - 1, loc=field["average"], scale=field["scale"]),
return_dtype=pl.List(pl.Float64)
).alias("interval")
)
.with_columns(pl.col("interval").list.to_struct(fields=["lower", "upper"]))
.unnest("interval")
)
elif 0 < confidence < 1 and not (stat in ["mean", "wmean"]):
raise ValueError("Confidence bands available only for mean and wmean stat parameters.")
# Fill empty windows
template = pl.DataFrame(
dict(fragment=list(range(self.total_windows))),
schema=dict(fragment=res.schema["fragment"])
)
joined = template.join(res, on="fragment", how="left")
# Calculate CI
lower = None
upper = None
if 0 < confidence < 1 and stat in ["mean", "wmean"]:
upper = savgol_line(joined["upper"].to_numpy(), smooth) * 100
lower = savgol_line(joined["lower"].to_numpy(), smooth) * 100
# Smooth and convert to percents
y = savgol_line(joined["density"].to_numpy(), smooth) * 100
x = np.arange(len(y))
return LinePlotData(x, y, resized._x_ticks, resized._borders, lower, upper, label,
["Upstream", "", "Body", "", "Downstream"])
[docs] def line_plot(
self,
resolution: int = None,
stat="wmean",
smooth: int = 50,
confidence: float = 0.,
merge_strands: bool = True
) -> LinePlot:
"""
Create :class:`LinePlot` method.
Parameters
----------
confidence
Probability for confidence bands (e.g. 0.95)
smooth
Number of windows for
`SavGol <https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.savgol_filter.html>`_ filter
(set 0 for no smoothing). Applied only if `flank_windows` and `body_windows` params are specified.
resolution
Number of fragments to resize to. Keep None if no resize is needed.
stat
Summary function to use for plot. Possible options: ``mean``, ``wmean``, ``log``, ``wlog``, ``qN``
merge_strands
Does negative strand need to be reversed
Returns
-------
:class:`LinePlot`
Notes
-----
- ``mean`` – Default mean between bins, when count of cytosine residues in bin IS NOT taken into account
- ``wmean`` – Weighted mean between bins. Cytosine residues in bin IS taken into account
- ``log`` – NOT weighted geometric mean.
- ``wlog`` - Weighted geometric mean.
- ``qN`` – Return quantile by ``N`` percent (e.g. "``q50``")
See Also
--------
:doc:`LinePlot example<../../markdowns/test>`
Examples
--------
Firstly we need to initialize Metagene class
>>> genome = Genome.from_gff("path/to/genome.gff").gene_body()
>>>
>>> path = 'path/to/bismark.CX_report.txt'
>>> metagene = Metagene.from_bismark(path, genome, up_windows=500, body_windows=1000, down_windows=500)
Next we can optionally filter metagene by context and strand.
>>> filtered = metagene.filter(context = "CG", strand = "-")
And LinePlot can be created
>>> lp = filtered.line_plot()
>>> figure = lp.draw_mpl()
>>> figure.show()
.. image:: ../../images/lineplot/lp_ara_mpl.png
No filtering is suitable too. Then LinePlot will visualize all methylation contexts.
>>> lp = metagene.line_plot()
>>> figure = lp.draw_mpl()
>>> figure.show()
.. image:: ../../images/lineplot/ara_multi_mpl.png
You can use Plotly version for all plots as well.
>>> figure = lp.draw_plotly()
>>> figure.show()
.. image:: ../../images/lineplot/ara_multi_plotly.png
See Also
--------
LinePlot : For more information about plottling parameters.
"""
resized = self.resize(resolution)
lp_data = resized.line_plot_data(resolution, smooth, confidence, stat, merge_strands)
return LinePlot(lp_data)
[docs] def contexts_line_plot(
self,
resolution: int = None,
stat="wmean",
smooth: int = 50,
confidence: float = 0.,
merge_strands: bool = True
) -> LinePlot:
resized = self.resize(resolution)
lp_data = [
filtered.line_plot_data(resolution, smooth, confidence, stat, merge_strands, context) for context, filtered
in zip(CONTEXTS, map(lambda context: resized.filter(context=context), CONTEXTS))
if len(filtered) > 0
]
return LinePlot(lp_data)
def heat_map_data(
self,
nrow: int = 100,
ncol: int = 100,
label=None
):
resized = self.resize(ncol)
report_df = resized.report_df
# Merge strands
report_df = report_df.filter(pl.col("strand") != "-").extend(self._reverse_strand(report_df, report_df["fragment"].max()))
# sort by rows and add row numbers
hm_data = (
report_df.lazy()
.group_by("gene")
.agg([
pl.col("fragment"),
(pl.col("sum") / pl.col("count")).alias("density")
])
.with_columns(
(pl.col("density").list.sum() / (resized.total_windows + 1)).alias("order")
)
.sort('order', descending=True)
.with_row_count(name='row')
.with_columns(
(pl.col('row') / (pl.max("row") + 1) * nrow).floor().alias('row').cast(pl.UInt16)
)
.explode(['fragment', 'density'])
.group_by(["row", "fragment"])
.agg(pl.mean("density"))
)
# prepare full template
template = (
pl.LazyFrame(data={"row": list(range(nrow))})
.with_columns(pl.lit(list(range(0, resized.total_windows))).alias("fragment"))
.explode("fragment")
.with_columns([
pl.col("fragment").cast(MetageneSchema.fragment),
pl.col("row").cast(pl.UInt16)
])
)
# join template with actual data
hm_data = (
# template join with orig
template.join(hm_data, on=['row', 'fragment'], how='left')
.fill_null(0)
.sort(['row', 'fragment'])
.group_by('row', maintain_order=True)
.agg(pl.col('density'))
.collect()
['density']
.to_list()
)
# convert to matrix and percents
hm_data = np.array(hm_data, dtype=np.float32) * 100
return HeatMapData(hm_data, resized._x_ticks, resized._borders, label if label is not None else "", ["Upstream", "", "Body", "", "Downstream"])
[docs] def heat_map(
self,
nrow: int = 100,
ncol: int = 100,
) -> HeatMap:
"""
Create :class:`HeatMap` method.
Parameters
----------
nrow
Number of fragments to resize to. Keep None if no resize is needed.
ncol
Number of columns in the resulting heat-map.
stat
Summary function to use for plot. Possible options: ``mean``, ``wmean``, ``log``, ``wlog``, ``qN``
Returns
-------
:class:`HeatMap`
Examples
--------
Firstly we need to initialize Metagene class
>>> genome = Genome.from_gff("path/to/genome.gff").gene_body()
>>>
>>> path = 'path/to/bismark.CX_report.txt'
>>> metagene = Metagene.from_bismark(path, genome, up_windows=500, body_windows=1000, down_windows=500)
Next we need to (in contrast with :meth:`Metagene.line_plot`) filter metagene by context and strand.
>>> filtered = metagene.filter(context = "CG", strand = "-")
And HeatMap can be created. Let it's width be 200 columns and height – 100 rows.
>>> hm = filtered.heat_map(nrow=100, ncol=200)
>>> figure = hm.draw_mpl()
>>> figure.show()
.. image:: ../../images/heatmap/ara_mpl.png
Plotly version is also available:
>>> figure = hm.draw_plotly()
>>> figure.show()
.. image:: ../../images/heatmap/ara_plotly.png
See Also
--------
Metagene.line_plot : For more information about `stat` parameter.
HeatMap : For more information about plotting parameters.
"""
hm_data = self.heat_map_data(nrow, ncol)
return HeatMap(hm_data)
def __str__(self):
representation = (f'Metagene with {len(self)} windows total.\n'
f'Filtered by {self.context} context and {self.strand} strand.\n'
f'Upstream windows: {self.upstream_windows}.\n'
f'Body windows: {self.gene_windows}.\n'
f'Downstream windows: {self.downstream_windows}.\n')
return representation
def __repr__(self):
return self.__str__()
def box_plot_data(self, filter_context: Literal["CG", "CHG", "CHH"] | None = None, label=""):
df = self.filter(context=filter_context).report_df if filter_context is not None else self.report_df
if not df.is_empty():
data = (
df
.group_by(["chr", "start"])
.agg([
pl.first("gene").alias("locus"),
pl.first("strand"),
pl.first("id"),
(pl.sum("sum") / pl.sum("count")).alias("density")
])
)
return BoxPlotData(
data["density"].to_list(),
label,
data["locus"].to_list(),
data["id"].to_list()
)
else:
return BoxPlotData.empty(label)
[docs] def context_box_plot(self):
data = [self.box_plot_data(context, context) for context in CONTEXTS]
return BoxPlot(data)
[docs]class MetageneFiles(MetageneFilesBase):
"""
Stores and plots multiple data for :class:`Metagene`.
If you want to compare Bismark data with different genomes, create this class with a list of :class:`Bismark` classes.
"""
[docs] @classmethod
def from_list(
cls,
filenames: list[str | Path],
genomes: pl.DataFrame | list[pl.DataFrame],
labels: list[str] = None,
up_windows: int = 0,
body_windows: int = 2000,
down_windows: int = 0,
report_type: ReportTypes = "bismark",
block_size_mb: int = 50,
use_threads: bool = True,
sumfunc: AvailableSumfunc = "wmean",
**kwargs
) -> MetageneFiles:
"""
Create istance of :class:`MetageneFiles` from list of paths.
Parameters
----------
filenames
List of filenames to read from
genomes
Annotation DataFrame or list of Annotations (may be different annotations)
labels
Labels for plots for Metagenes
up_windows
Number of windows upstream region to split into
body_windows
Number of windows body region to split into
down_windows
Number of windows downstream region to split into
report_type
Type of input report. Possible options: ``bismark``, ``cgmap``, ``binom``, ``bedgraph``, ``coverage``.
block_size_mb
Block size for reading. (Block size ≠ amount of RAM used. Reader allocates approx. Block size * 20 memory for reading.)
use_threads
Do multi-threaded or single-threaded reading. If multi-threaded option is used, number of threads is defined by `multiprocessing.cpu_count()`
sumfunc
Summary function to calculate density for window with.
Returns
-------
MetageneFiles
Instance of :class:`MetageneFiles`
See Also
________
Metagene
Examples
--------
Initialization using :meth:`MetageneFiles.from_list`
>>> ara_genome = Genome.from_gff("/path/to/arath.gff").gene_body(min_length=2000)
>>> brd_genome = Genome.from_gff("/path/to/bradi.gff").gene_body(min_length=2000)
>>>
>>> metagenes = MetageneFiles.from_list(
>>> ["path/to/bradi.txt", "path/to/ara.txt"],
>>> [brd_genome, ara_genome],
>>> ["BraDi", "AraTh"],
>>> report_type="bismark",
>>> up_windows=250, body_windows=500, down_windows=250
>>> )
:class:`MetageneFiles` can be initialized explicitly:
>>> metagene_ara = Metagene.from_bismark("path/to/ara.txt", ara_genome, up_windows=250, body_windows=500, down_windows=250)
>>> metagene_brd = Metagene.from_bismark("path/to/ara.txt", ara_genome, up_windows=250, body_windows=500, down_windows=250)
>>> metagenes = MetageneFiles(samples=[metagene_brd, metagene_ara], labels=["BraDi", "AraTh"])
The resulting objects will be identical.
Warnings
--------
When :class:`MetageneFiles` is initialized explicitly, number of windows needs ot be the same in evety sample
"""
read_fnc = {
"bismark": Metagene.from_bismark,
"binom": Metagene.from_binom,
"cgmap": Metagene.from_cgmap,
"bedgraph": Metagene.from_bedGraph,
"coverage": Metagene.from_coverage
}
if not isinstance(genomes, list):
genomes = [genomes] * len(filenames)
else:
if len(genomes) != len(filenames):
raise AttributeError("Number of genomes and filenames provided does not match")
default_args = dict(
up_windows=up_windows,
body_windows=body_windows,
down_windows=down_windows,
use_threads=use_threads
)
if report_type not in ["binom"]:
default_args |= dict(
block_size_mb=block_size_mb,
sumfunc=sumfunc
)
samples: list[Metagene] = []
for file, genome in zip(filenames, genomes):
sample = read_fnc[report_type](file=file, genome=genome, **default_args, **kwargs)
samples.append(sample)
return cls(samples, labels)
[docs] def filter(self, context: str = None, strand: str = None, chr: str = None, genome: pl.DataFrame = None,
id: list[str] = None, coords: list[str] = None) -> MetageneFiles:
"""
:meth:`Metagene.filter` all metagenes.
Parameters
----------
context
Methylation context (CG, CHG, CHH) to filter (only one).
strand
Strand to filter (+ or -).
chr
Chromosome name to filter.
genome
DataFrame with annotation to filter with (e.g. from :class:`Genome`)
Returns
-------
Filtered :class:`MetageneFiles`.
See Also
--------
Metagene.filter : For examples.
"""
return self.__class__([sample.filter(context, strand, chr, genome, id, coords) for sample in self.samples], self.labels)
[docs] def trim_flank(self, upstream=True, downstream=True):
"""
:meth:`Metagene.trim_flank` all metagenes.
Parameters
----------
upstream
Trim upstream region?
downstream
Trim downstream region?
Returns
-------
Trimmed :class:`Metagene`
See Also
--------
Metagene.trim_flank : For examples.
"""
return self.__class__([sample.trim_flank(upstream, downstream) for sample in self.samples], self.labels)
[docs] def resize(self, to_fragments: int):
"""
:meth:`Metagene.resize` all metagenes.
Parameters
----------
to_fragments
Number of TOTAL (including flanking regions) fragments per gene.
Returns
-------
Resized :class:`Metagene`.
See Also
--------
Metagene.resize : For examples.
"""
return self.__class__([sample.resize(to_fragments) for sample in self.samples], self.labels)
[docs] def merge(self):
"""
Merge :class:`MetageneFiles` into single :class:`Metagene`
Warnings
--------
**ALL** metagenes in :class:`MetageneFiles` need to be aligned to the same annotation.
Parameters
----------
Returns
-------
Instance of :class:`Metagene`, with merged replicates.
"""
pl.enable_string_cache()
metadata = [sample.metadata for sample in self.samples]
upstream_windows = set([md.get("upstream_windows") for md in metadata])
gene_windows = set([md.get("gene_windows") for md in metadata])
downstream_windows = set([md.get("downstream_windows") for md in metadata])
if len(upstream_windows) == len(downstream_windows) == len(gene_windows) == 1:
merged = (
pl.concat([sample.report_df for sample in self.samples]).lazy()
.group_by(["strand", "context", "chr", "gene", "start", "id", "fragment"], maintain_order=True)
.agg([pl.sum("sum").alias("sum"), pl.sum("count").alias("count")])
.select(self.samples[0].report_df.columns)
).collect()
return Metagene(merged,
upstream_windows=list(upstream_windows)[0],
downstream_windows=list(downstream_windows)[0],
gene_windows=list(gene_windows)[0])
else:
raise Exception("Metadata for merge DataFrames does not match!")
[docs] def line_plot(
self,
resolution: int = None,
smooth: int = 50,
confidence: float = 0.,
stat: str = "wmean",
merge_strands: bool = True
):
"""
Create :class:`LinePlotFiles` method.
Parameters
----------
resolution
Number of fragments to resize to. Keep None if no resize is needed.
stat
Summary function to use for plot. Possible options: ``mean``, ``wmean``, ``log``, ``wlog``, ``qN``
merge_strands
Does negative strand need to be reversed
confidence
Probability for confidence bands (e.g. 0.95)
smooth
Number of windows for
`SavGol <https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.savgol_filter.html>`_ filter
(set 0 for no smoothing). Applied only if `flank_windows` and `body_windows` params are specified.
Returns
-------
:class:`LinePlot`
See Also
--------
Metagene.line_plot : for more information about `stat` parameter.
Examples
--------
>>> filtered = metagenes.filter("CG")
>>> lp = metagenes.line_plot()
Matplotlib version:
>>> figure = lp.draw_mpl()
>>> figure.show()
.. image:: ../../images/lineplot/ara_bradi_mpl.png
Plotly version
>>> figure = lp.draw_plotly()
>>> figure.show()
Now when we have metagene initlaized
.. image:: ../../images/lineplot/ara_bradi_plotly.png
"""
lp_data = [
sample.line_plot_data(resolution, smooth, confidence, stat, merge_strands, label)
for sample, label in zip(self.samples, self.labels)
]
return LinePlot(lp_data)
[docs] def heat_map(self, nrow: int = 100, ncol: int = None) -> HeatMap:
"""
Create :class:`HeatMapNew` method.
Parameters
----------
nrow
Number of fragments to resize to. Keep None if no resize is needed.
ncol
Number of columns in the resulting heat-map.
stat
Summary function to use for plot. Possible options: ``mean``, ``wmean``, ``log``, ``wlog``, ``qN``
Returns
-------
:class:`HeatMap`
See Also
--------
Metagene.line_plot : for more information about `stat` parameter.
Metagene.heat_map
Examples
--------
>>> filtered = metagenes.filter("CG")
>>> hm = metagenes.heat_map(nrow=100, ncol=100)
Matplotlib version:
>>> figure = hm.draw_mpl(major_labels=None)
>>> figure.show()
.. image:: ../../images/heatmap/ara_bradi_mpl.png
Plotly version
>>> figure = hm.draw_plotly(major_labels=None)
>>> figure.show()
Now when we have metagene initlaized
.. image:: ../../images/heatmap/ara_bradi_plotly.png
"""
hm_data = [
sample.heat_map_data(nrow, ncol, label)
for sample, label in zip(self.samples, self.labels)
]
return HeatMap(hm_data)
def box_plot_data(self, filter_context: Literal["CG", "CHG", "CHH"] | None = None):
return [sample.box_plot_data(filter_context, label) for label, sample in zip(self.labels, self.samples)]
# Todo add fixes from issue14-16#fix
[docs] def dendrogram(self, q: float = .75, **kwargs):
"""
Cluster samples by total methylation level of regions and draw dendrogram.
Parameters
----------
q
Quantile of regions, which will be clustered
kwargs
Keyword arguments for `seaborn.clustermap <https://seaborn.pydata.org/generated/seaborn.clustermap.html>`_
Returns
-------
``matplotlib.pyplot.Figure``
"""
gene_stats = []
for sample, label in zip(self.samples, self.labels):
gene_stats.append(
sample.report_df
.group_by(["chr", "strand", "start", "gene"])
.agg((pl.sum("sum") / pl.sum("count")).alias("density"))
.with_columns(pl.lit(label).alias("label"))
)
gene_set = set.intersection(*[set(stat["gene"].to_list()) for stat in gene_stats])
if len(gene_set) < 1:
raise ValueError("Region set intersection is empty. Are Metagenes read with same genome?")
gene_stats = [stat.filter(pl.col("gene").is_in(list(gene_set))) for stat in gene_stats]
dendro_data = pl.concat(gene_stats).pivot(values="density", columns="label", index="gene", aggregate_function="mean")
matrix = dendro_data.select(pl.all().exclude("gene")).to_pandas()
# Filter by variance
if q > 0:
var = matrix.to_numpy().var(1)
matrix = matrix[var > np.quantile(var, q)]
fig = sns.clustermap(matrix, **kwargs)
return fig
[docs] def cluster(self, count_threshold=5, na_rm: float | None = None):
"""
Cluster samples regions by methylation pattern.
Parameters
----------
count_threshold
Minimum number of reads per fragment to include it in analysis.
na_rm
Value to fill empty fragments. (None if not full regions need to be deleted)
Returns
-------
ClusterMany
"""
return ClusterMany(self, count_threshold, na_rm)