from __future__ import annotations
from pathlib import Path
import numpy as np
import polars as pl
from pyreadr import write_rds
from .Base import read_chromosomes, validate_chromosome_args
from .Plots import savgol_line, LinePlotData, LinePlot, BoxPlotData, BoxPlot
from .UniversalReader_classes import UniversalReader
[docs]class ChrLevels:
"""
Read report and visualize chromosome methylation levels
"""
[docs] def __init__(self, df: pl.DataFrame) -> None:
"""
Read report and visualize chromosome methylation levels
Parameters
----------
df
"""
self.report = df
# delete this in future and change to calculation of plot data
# when plot is drawn
self.plot_data = self.__calculate_plot_data(df)
@staticmethod
def __calculate_plot_data(df: pl.DataFrame):
group_cols = [pl.sum("sum"), pl.sum("count"), pl.min("start").alias("chr_pos")]
mut_cols = [(pl.col("sum") / pl.col("count")).alias("density")]
if "upper" in df.columns:
group_cols += [pl.mean("upper"), pl.mean("lower")]
return (
df
.sort(["chr", "window"])
.group_by(["chr", "window", "context"], maintain_order=True)
.agg(group_cols)
.group_by(["chr", "window"], maintain_order=True)
.agg(pl.all().exclude(["chr", "window"]))
.with_row_count("fragment")
.explode(pl.all().exclude(["chr", "window", "fragment"]))
.with_columns(mut_cols)
)
[docs] @classmethod
def from_bismark(
cls,
file: str | Path,
chr_min_length=10 ** 6,
window_length: int = 10 ** 6,
block_size_mb: int = 100,
use_threads: bool = False,
confidence: float = None
):
"""
Initialize ChrLevels with CX_report file
Parameters
----------
file
Path to the report file.
chr_min_length
Minimum length of chromosome to be analyzed
window_length
Length of windows in bp
block_size_mb
Size of batch in bytes, which will be read from report file (for report types other than "binom").
use_threads
Will reading be multithreaded.
confidence
Pvalue for confidence bands of the LinePlot.
"""
reader = UniversalReader(**(locals() | dict(report_type="bismark")))
args = validate_chromosome_args(chr_min_length, window_length, confidence)
report_df = read_chromosomes(**(locals() | args))
return cls(report_df)
[docs] @classmethod
def from_cgmap(
cls,
file: str | Path,
chr_min_length=10 ** 6,
window_length: int = 10 ** 6,
block_size_mb: int = 100,
use_threads: bool = False,
confidence: float = None
):
"""
Initialize ChrLevels with CGMap file
Parameters
----------
file
Path to the report file.
chr_min_length
Minimum length of chromosome to be analyzed
window_length
Length of windows in bp
block_size_mb
Size of batch in bytes, which will be read from report file (for report types other than "binom").
use_threads
Will reading be multithreaded.
confidence
Pvalue for confidence bands of the LinePlot.
"""
reader = UniversalReader(**(locals() | dict(report_type="cgmap")))
args = validate_chromosome_args(chr_min_length, window_length, confidence)
report_df = read_chromosomes(**(locals() | args))
return cls(report_df)
[docs] @classmethod
def from_bedGraph(
cls,
file: str | Path,
cytosine_file: str | Path,
chr_min_length=10 ** 6,
window_length: int = 10 ** 6,
block_size_mb: int = 100,
use_threads: bool = False,
confidence: float = None
):
"""
Initialize ChrLevels with CGMap file
Parameters
----------
file
Path to the report file.
cytosine_file
Path to preprocessed by :class:`Sequence` cytosine file.
chr_min_length
Minimum length of chromosome to be analyzed
window_length
Length of windows in bp
block_size_mb
Size of batch in bytes, which will be read from report file (for report types other than "binom").
use_threads
Will reading be multithreaded.
confidence
Pvalue for confidence bands of the LinePlot.
"""
reader = UniversalReader(**(locals() | dict(report_type="bedgraph")))
args = validate_chromosome_args(chr_min_length, window_length, confidence)
report_df = read_chromosomes(**(locals() | args))
return cls(report_df)
[docs] @classmethod
def from_coverage(
cls,
file: str | Path,
cytosine_file: str | Path,
chr_min_length=10 ** 6,
window_length: int = 10 ** 6,
block_size_mb: int = 100,
use_threads: bool = False,
confidence: float = None
):
"""
Initialize ChrLevels with CGMap file
Parameters
----------
file
Path to the report file.
cytosine_file
Path to preprocessed by :class:`Sequence` cytosine file.
chr_min_length
Minimum length of chromosome to be analyzed
window_length
Length of windows in bp
block_size_mb
Size of batch in bytes, which will be read from report file (for report types other than "binom").
use_threads
Will reading be multithreaded.
confidence
Pvalue for confidence bands of the LinePlot.
"""
reader = UniversalReader(**(locals() | dict(report_type="coverage")))
args = validate_chromosome_args(chr_min_length, window_length, confidence)
report_df = read_chromosomes(**(locals() | args))
return cls(report_df)
[docs] @classmethod
def from_binom(
cls,
file: str | Path,
chr_min_length=10 ** 6,
window_length: int = 10 ** 6,
confidence: float = None,
p_value: float = .05
):
"""
Initialize ChrLevels with .parquet file from :class:`Binom`.
Parameters
----------
file
Path to the report file.
chr_min_length
Minimum length of chromosome to be analyzed
window_length
Length of windows in bp
confidence
Pvalue for confidence bands of the LinePlot.
p_value
Pvalue with which cytosine will be considered methylated.
"""
reader = UniversalReader(**(locals() | dict(report_type="binom", methylation_pvalue=p_value)))
args = validate_chromosome_args(chr_min_length, window_length, confidence)
report_df = read_chromosomes(**(locals() | args))
return cls(report_df)
[docs] def save_plot_rds(self, path, compress: bool = False):
"""
Saves plot data in a rds DataFrame with columns:
+----------+---------+
| fragment | density |
+==========+=========+
| Int | Float |
+----------+---------+
"""
write_rds(path, self.plot_data.to_pandas(),
compress="gzip" if compress else None)
[docs] def filter(self, context: str = None, strand: str = None, chr: str = None):
"""
Filter chromosome methylation levels data.
Parameters
----------
context
Methylation context (CG, CHG, CHH) to filter (only one).
strand
Strand to filter (+ or -).
chr
Chromosome name to filter.
Returns
-------
:class:`ChrLevels`
"""
context_filter = self.report["context"] == context if context is not None else True
strand_filter = self.report["strand"] == strand if strand is not None else True
chr_filter = self.report["chr"] == chr if chr is not None else True
if context_filter is None and strand_filter is None and chr_filter is None:
return self
else:
return self.__class__(self.report.filter(context_filter & strand_filter & chr_filter))
@property
def _ticks_data(self):
ticks_data = self.plot_data.group_by("chr", maintain_order=True).agg(pl.min("fragment"))
x_lines = ticks_data["fragment"].to_numpy()
x_lines = np.append(x_lines, self.plot_data["fragment"].max())
x_ticks = (x_lines[1:] + x_lines[:-1]) // 2
# get middle ticks
x_labels = ticks_data["chr"].to_list()
return x_ticks, x_labels, x_lines
def line_plot_data(self, smooth: int = 0):
y = self.plot_data["density"].to_numpy()
upper, lower = None, None
if "upper" in self.plot_data.columns and np.isnan(self.plot_data["upper"].to_numpy()).sum() == 0:
upper = savgol_line(self.plot_data["upper"].to_numpy(), smooth) * 100
lower = savgol_line(self.plot_data["lower"].to_numpy(), smooth) * 100
y = savgol_line(y, smooth) * 100
x = np.arange(len(y))
x_ticks, x_labels, x_lines = self._ticks_data
return LinePlotData(x, y, x_ticks, x_lines, lower, upper, x_labels=x_labels)
[docs] def line_plot(self, smooth: int = 0):
return LinePlot(self.line_plot_data(smooth))
def box_plot_data(self):
pd_df = (
self.report
.group_by(["chr"])
.agg((pl.col("sum") / pl.col("count")).alias("density"))
.sort("chr")
)
data = pd_df.to_dict()
chroms = data["chr"]
values = data["density"]
return [BoxPlotData(value, chrom) for chrom, value in zip(chroms, values)]
[docs] def box_plot(self):
"""
Returns
-------
BoxPlot
"""
return BoxPlot(self.box_plot_data())