Source code for bsxplorer.Binom

from __future__ import annotations

from pathlib import Path
from typing import Literal

import polars as pl
from pyarrow import parquet as pq
from scipy.stats import binom

from .UniversalReader_batches import ARROW_SCHEMAS, ReportTypes
from .UniversalReader_classes import UniversalReader
from .utils import arrow2polars_convert


class BinomialData:
    """Calculates P-value for cytosine residues.

    Attributes
    ----------
    preprocessed_path : Path
        Path to the preprocessed ``.binom.pq`` parquet file
        (as written by :meth:`from_report`).
    """

    def __init__(self, path):
        self.preprocessed_path = Path(path)

    @classmethod
    def read_total_stats(
            cls,
            file: str | Path,
            report_type: ReportTypes,
            block_size_mb: int = 20,
            use_threads: bool = True,
            **kwargs
    ):
        """
        Get methylation stats from methylation reports file.

        Parameters
        ----------
        file
            Path to cytosine report.
        report_type
            Type of report. Possible types: "bismark", "cgmap", "bedgraph", "coverage".
        block_size_mb
            Block size for reading. (Block size ≠ amount of RAM used. Reader allocates
            approx. Block size * 20 memory for reading.)
        use_threads
            Do multi-threaded or single-threaded reading. If multi-threaded option is
            used, number of threads is defined by `multiprocessing.cpu_count()`
        kwargs
            Keyword arguments for reader (e.g. sequence)

        Returns
        -------
        dict
            ``cytosine_residues`` — number of non-null cytosine records read,
            ``density_sum`` — sum of per-cytosine methylation densities.
        """
        with UniversalReader(file, report_type, use_threads,
                             block_size_mb=block_size_mb, **kwargs) as reader:
            metadata = dict(cytosine_residues=0, density_sum=0)
            for batch in reader:
                # Drop records with missing values before counting.
                batch.filter_not_none()
                metadata["cytosine_residues"] += len(batch)
                metadata["density_sum"] += batch.data["density"].sum()
        return metadata

    @classmethod
    def from_report(
            cls,
            file: str | Path,
            report_type: ReportTypes,
            block_size_mb: int = 20,
            use_threads: bool = True,
            min_coverage: int = 2,
            save: str | Path | bool | None = None,
            dir: str | Path | None = None,
            **kwargs
    ):
        """
        Preprocess BS-seq cytosine report data by calculating methylation P-value for
        every cytosine (assuming distribution is binomial) that passes the
        `min_coverage` threshold.

        Parameters
        ----------
        file
            Path to cytosine report.
        report_type
            Type of report. Possible types: "bismark", "cgmap", "bedgraph", "coverage".
        save
            Name with which preprocessed file will be saved. If not provided - input
            file name is being used. If ``False``, a ``tmp_``-prefixed file is written.
        min_coverage
            Minimal coverage for cytosine.
        block_size_mb
            Block size for reading. (Block size ≠ amount of RAM used. Reader allocates
            approx. Block size * 20 memory for reading.)
        use_threads
            Do multi-threaded or single-threaded reading. If multi-threaded option is
            used, number of threads is defined by `multiprocessing.cpu_count()`
        dir
            Path to working dir, where file will be saved. Defaults to the current
            working directory at call time.
        kwargs
            Keyword arguments for reader (e.g. sequence)

        Returns
        -------
        BinomialData
            Instance of Binom class.
        """
        file = Path(file)
        # Fix: resolve cwd at call time. The original default ``dir=Path.cwd()``
        # was evaluated once at import, silently ignoring later chdir calls.
        dir = Path.cwd() if dir is None else Path(dir)
        filename = "{tmp}{name}.binom.pq".format(
            tmp="tmp_" if save is False else "",
            name=file.stem if save is None else save
        )
        save_path = dir / filename

        # Update metadata with total distribution stats
        metadata = cls.read_total_stats(file, report_type, block_size_mb,
                                        use_threads, **kwargs)

        # Calculating total p_values
        print("Writing p_values file into:", save_path.absolute())
        total_probability = metadata["density_sum"] / metadata["cytosine_residues"]

        arrow_pvalue_schema = ARROW_SCHEMAS["binom"]
        polars_pvalue_schema = arrow2polars_convert(arrow_pvalue_schema)
        with pq.ParquetWriter(save_path, arrow_pvalue_schema) as pq_writer:
            with UniversalReader(file, report_type, use_threads,
                                 block_size_mb=block_size_mb, **kwargs) as reader:
                for batch in reader:
                    filtered = batch.data.filter(pl.col("count_total") >= min_coverage)

                    # Binomial test for cytosine methylation:
                    # P(X >= count_m) = 1 - CDF(count_m - 1)
                    cdf_col = 1 - binom.cdf(filtered["count_m"].cast(pl.Int64) - 1,
                                            filtered["count_total"],
                                            total_probability)

                    # Write to p_valued file
                    p_valued = (
                        filtered
                        .with_columns(pl.lit(cdf_col).cast(pl.Float64).alias("p_value"))
                        .select(arrow_pvalue_schema.names)
                        .cast(polars_pvalue_schema)
                    )
                    pq_writer.write(p_valued.to_arrow().cast(arrow_pvalue_schema))

        print("DONE")
        print(f"\nTotal cytosine residues: {metadata['cytosine_residues']}.\nAverage proportion of methylated reads to total reads for cytosine residue: {round(metadata['density_sum'] / metadata['cytosine_residues'] * 100, 3)}%")
        return cls(save_path)

    def region_pvalue(
            self,
            genome: pl.DataFrame,
            methylation_pvalue: float = .05,
            use_threads: bool = True,
            save: str | Path | bool | None = None,
            dir: str | Path | None = None
    ):
        """
        Map cytosines with provided annotation and calculate region methylation
        P-value (assuming distribution is binomial).

        Parameters
        ----------
        genome
            DataFrame with annotation (e.g. from `Genome` class). Must contain
            ``chr``, ``strand``, ``id``, ``start``, ``end`` and ``downstream``
            columns — these are all read below.
        methylation_pvalue
            P-value of cytosine methylation for it to be considered methylated.
        use_threads
            Do multi-threaded or single-threaded reading. If multi-threaded option is
            used, number of threads is defined by `multiprocessing.cpu_count()`
        save
            Name with which preprocessed file will be saved. If not provided - input
            file name is being used.
        dir
            Path to working dir, where file will be saved. Defaults to the current
            working directory at call time.

        Returns
        -------
        RegionStat
            Instance of RegionStat class.

        Raises
        ------
        ValueError
            If the preprocessed file contains no cytosine records.

        Examples
        --------
        >>> c_binom = bsxplorer.BinomialData.from_report("report.txt", report_type="bismark")
        >>> genome = bsxplorer.Genome.from_gff("genome.gff").gene_body()
        >>> data = c_binom.region_pvalue(genome)

        Or, if a preprocessed ``.binom.pq`` file already exists:

        >>> c_binom = bsxplorer.BinomialData("preprocessed.binom.pq")
        >>> data = c_binom.region_pvalue(genome)

        The result holds one row per region with ``p_value_context_*`` and
        ``total_context_*`` columns for each methylation context.
        """
        dir = Path.cwd() if dir is None else Path(dir)

        polars_pvalue_schema = arrow2polars_convert(ARROW_SCHEMAS["binom"])
        genome = genome.cast({k: v for k, v in polars_pvalue_schema.items()
                              if k in genome.columns})
        # Avoid clashing with the cytosine "strand" column after the join.
        genome = genome.rename({"strand": "gene_strand"})

        context_metadata = pl.DataFrame(
            schema={"context": polars_pvalue_schema["context"],
                    "count_m": pl.Int64,
                    "count_total": pl.Int64}
        )
        gene_stats = None
        with UniversalReader(
                self.preprocessed_path, "binom", use_threads,
                methylation_pvalue=methylation_pvalue
        ) as reader:
            for full_batch in reader:
                # Extend context metadata
                context_metadata.extend(
                    full_batch.data.group_by("context").agg([
                        pl.sum("count_m").cast(pl.Int64),
                        pl.sum("count_total").cast(pl.Int64)
                    ])
                )

                # Map on genome
                mapped = (
                    full_batch.data.lazy()
                    .join_asof(
                        genome.lazy(),
                        left_on="position", right_on="start",
                        strategy="backward", by="chr"
                    )
                    .filter(
                        # Limit by end of region
                        pl.col('position') <= pl.col('downstream'),
                        # Filter by strand if it is defined
                        ((pl.col("gene_strand") != ".") & (pl.col("gene_strand") == pl.col("strand"))) | (pl.col("gene_strand") == "."),
                        # Fix: join_asof misses yield *null*, not NaN — the
                        # original ``is_not_nan()`` is invalid for an integer
                        # column and never removed unmatched rows.
                        pl.col("start").is_not_null()
                    )
                    .group_by(["chr", "start", "context"], maintain_order=True)
                    .agg([
                        pl.first("gene_strand"), pl.first("end"), pl.first("id"),
                        pl.sum("count_m"), pl.sum("count_total")
                    ])
                    .rename({"gene_strand": "strand"})
                    .collect()
                )

                if gene_stats is None:
                    gene_stats = mapped
                else:
                    gene_stats.extend(mapped)

        if gene_stats is None:
            # Fix: the original crashed with an opaque AttributeError here.
            raise ValueError(
                "No cytosine records were read from "
                f"{self.preprocessed_path}"
            )

        context_metadata = context_metadata.group_by("context").agg(
            [pl.sum("count_m"), pl.sum("count_total")]
        )

        # Calculate region pvalues per context
        result = pl.DataFrame(schema=gene_stats.schema | {"p_value": pl.Float64})
        for metadata_row in context_metadata.iter_rows(named=True):
            total_probability = metadata_row["count_m"] / metadata_row["count_total"]
            filtered = gene_stats.filter(pl.col("context") == metadata_row["context"])
            cdf = 1 - binom.cdf(filtered["count_m"].cast(pl.Int64) - 1,
                                filtered["count_total"],
                                total_probability)
            result.extend(filtered.with_columns(pl.lit(cdf, pl.Float64).alias("p_value")))
            print(f"{metadata_row['context']}\tTotal sites: {metadata_row['count_total']}, methylated: {metadata_row['count_m']}\t({round(total_probability * 100, 2)}%)")

        result = result.select(["chr", "start", "end", "id", "strand", "context",
                                "p_value", "count_m", "count_total"])

        filename = "{tmp}{name}.tsv".format(
            tmp="tmp_" if save is False else "",
            name=self.preprocessed_path.stem if save is None else save
        )
        save_path = dir / filename
        if save:
            result.write_csv(save_path.absolute(), include_header=False, separator="\t")
            print("Saved into:", save_path)

        return RegionStat.from_expanded(result)
class Filter:
    """Mutable builder for polars filter expressions over context columns.

    Every comparison/logical operator mutates ``expr`` in place and returns
    ``self`` so filters can be chained, e.g.
    ``(PValueFilter("CG") < 0.05) & (CountFilter("CG") >= 20)``.
    Defining ``__eq__`` makes instances unhashable — intentional, since ``==``
    builds an expression rather than testing equality.
    """

    __slots__ = ["expr"]

    def __le__(self, other):
        self.expr = self.expr.le(other)
        return self

    def __lt__(self, other):
        self.expr = self.expr.lt(other)
        return self

    def __ge__(self, other):
        self.expr = self.expr.ge(other)
        return self

    def __gt__(self, other):
        self.expr = self.expr.gt(other)
        return self

    def __eq__(self, other):
        self.expr = self.expr.eq(other)
        return self

    def __and__(self, other: PValueFilter | CountFilter):
        self.expr = self.expr.and_(other.expr)
        return self

    def __or__(self, other: PValueFilter | CountFilter):
        self.expr = self.expr.or_(other.expr)
        return self


class PValueFilter(Filter):
    """Filter on the ``p_value_context_<context>`` column."""

    # Fix: without __slots__ here, subclass instances silently get a
    # __dict__ and the base-class __slots__ declaration saves nothing.
    __slots__ = ["context"]

    def __init__(self, context: str):
        self.context = context
        self.expr = pl.col(f"p_value_context_{context}")


class CountFilter(Filter):
    """Filter on the ``count_total_context_<context>`` column."""

    __slots__ = ["context"]

    def __init__(self, context: str):
        # Dropped the redundant super().__init__() (object no-op) that only
        # one of the two sibling subclasses called.
        self.context = context
        self.expr = pl.col(f"count_total_context_{context}")
class RegionStat:
    """
    Class for manipulation with region P-value data.

    Attributes
    ----------
    data : polars.DataFrame
        Region stats DataFrame
    """

    # Column schema of the expanded (one row per region/context) TSV format.
    schema = {
        'chr': pl.String,
        'start': pl.UInt64,
        'end': pl.UInt64,
        'id': pl.String,
        'strand': pl.String,
        'context': pl.String,
        'p_value': pl.Float64,
        'count_m': pl.UInt32,
        'count_total': pl.UInt32
    }

    def __init__(self, data: pl.DataFrame = None):
        """
        Class for manipulation with region P-value data.

        Warnings
        --------
        Do not call this method directly.

        Parameters
        ----------
        data : polars.DataFrame
            Region stats DataFrame
        """
        self.data = data

    @classmethod
    def from_expanded(cls, df: pl.DataFrame):
        """
        Generate an instance of RegionStat class from a DataFrame with the context
        column expanded (e.g. output of `Binom.gene_pvalue()` function)

        Parameters
        ----------
        df
            DataFrame with context column expanded

        Returns
        -------
        RegionStat
            Instance of RegionStat class
        """
        gene_stats = (
            df
            # Duplicated (region, context) rows would break the pivot.
            .filter(df.select(["chr", "strand", "start", "end", "context"])
                    .is_duplicated().not_())
            .pivot(
                values=["p_value", "count_m", "count_total"],
                columns="context",
                index=list(set(df.columns) - {"p_value", "context", "count_m", "count_total"})
            )
        )
        return cls(gene_stats)

    @classmethod
    def from_csv(cls, file: str | Path):
        """
        Read RegionStat data from a file preprocessed and saved with
        :method:`RegionStat.save`

        Parameters
        ----------
        file
            Path to file.

        Raises
        ------
        FileNotFoundError
            If ``file`` does not exist.
        """
        # Fix: coerce to Path first — the parameter is typed str | Path and a
        # plain str has no .exists(), so the original raised AttributeError.
        file = Path(file)
        if not file.exists():
            raise FileNotFoundError(file)
        df = pl.read_csv(file, has_header=False, separator="\t", schema=cls.schema)
        return cls.from_expanded(df)

    def filter(
            self,
            context: Literal["CG", "CHG", "CHH"] = None,
            op: Literal["<=", "<", ">", ">="] = None,
            p_value: float = .05,
            min_n: int = 0
    ):
        r"""
        Filter RegionStat class by P-value of methylation in selected context or
        minimal region counts. E.g. :math:`P_{CG}\leq0.05` or :math:`N_{CG}\geq20`

        Minimal counts are compared only with the :math:`\geq` operation.

        Parameters
        ----------
        context
            Methylation context (CG, CHG, CHH).
        op
            Comparative operation (<=, <, >, >=).
        p_value
            P-value for operation.
        min_n
            Minimal counts for cytosines methylated in selected context.

        Returns
        -------
        RegionStat
            Filtered class.

        Examples
        --------
        >>> data = BinomialData("./A_thaliana.binom.pq").region_pvalue(genome)
        >>> filtered = data.filter("CG", "<", 0.05, 20)
        """
        if context is not None and op is not None:
            column = pl.col(f"p_value_context_{context}")
            if op == "<":
                expr = column < p_value
            elif op == "<=":
                expr = column <= p_value
            elif op == ">=":
                expr = column >= p_value
            else:
                expr = column > p_value
        else:
            expr = True

        filtered = self.data.filter(expr)
        # Fix: only apply the count threshold when a context is given — the
        # original unconditionally referenced ``count_total_context_None`` and
        # crashed when called with the documented default ``context=None``.
        if context is not None:
            filtered = filtered.filter(
                pl.col(f"count_total_context_{context}") >= min_n
            )
        return self.__class__(filtered)

    def save(self, path: str | Path):
        """
        Save regions as BED-like file.

        Parameters
        ----------
        path
            Path where file needs to be saved.
        """
        path = Path(path)
        (
            self.data
            .select(["chr", "start", "end", "id", "strand"])
            .write_csv(path, include_header=False, separator="\t")
        )

    def categorise(
            self,
            context: Literal["CG", "CHG", "CHH"] = None,
            p_value: float = .05,
            min_n: int = 0,
            save: str | Path = None
    ) -> tuple[pl.DataFrame, pl.DataFrame, pl.DataFrame]:
        r"""
        Categorise regions as BM (Body Methylation), IM (Intermediate Methylated)
        and UM (Undermethylated) according to
        `Takuno and Gaut <https://doi.org/10.1073/pnas.1215380110>`_

        E.g. for CG:
        :math:`P_{CG}<0.05, P_{CG}\geq0.05, P_{CG}\geq0.05\ and\ N_{CG}\geq20`

        Parameters
        ----------
        context
            Methylation context (CG, CHG, CHH).
        p_value
            P-value for operation.
        min_n
            Minimal counts for cytosines methylated in selected context.
        save
            Path where files with BM, IM, UM genes will be saved (None if saving
            not needed).

        Returns
        -------
        tuple[pl.DataFrame, pl.DataFrame, pl.DataFrame]
            BM, IM, UM :class:`pl.DataFrame`
        """
        other_contexts = [
            c for c in map(
                lambda name: name.replace("p_value_context_", ""),
                self.data.select("^p_value_context_.+$").columns
            )
            if c != context
        ]
        # NOTE(review): assert is stripped under ``python -O``; kept for
        # backward compatibility (callers may rely on AssertionError).
        assert isinstance(context, str)

        bm_filter = (PValueFilter(context) < p_value) & (CountFilter(context) >= min_n)
        im_filter = (PValueFilter(context) >= p_value) & (PValueFilter(context) < 1 - p_value) & (CountFilter(context) >= min_n)
        um_filter = (PValueFilter(context) >= 1 - p_value) & (CountFilter(context) >= min_n)

        # A region must be unmethylated in every other context to qualify.
        for other_context in other_contexts:
            bm_filter = bm_filter & (PValueFilter(other_context) >= p_value)
            im_filter = im_filter & (PValueFilter(other_context) >= p_value)
            um_filter = um_filter & (PValueFilter(other_context) >= p_value)

        bm = self.__class__(self.data.filter(bm_filter.expr))
        im = self.__class__(self.data.filter(im_filter.expr))
        um = self.__class__(self.data.filter(um_filter.expr))

        if save is not None:
            for df, df_type in zip([bm, im, um], ["BM", "IM", "UM"]):
                save_path = Path(save)
                df.save(save_path.with_name(save_path.name + "_" + df_type).with_suffix(".tsv"))

        return bm.data, im.data, um.data

    def __len__(self):
        return len(self.data)

    def __repr__(self):
        return self.data.__repr__()

    def __str__(self):
        return self.data.__str__()