Source code for bsxplorer.GenomeClass

from __future__ import annotations

import itertools
from dataclasses import dataclass

import numpy as np
import polars as pl
from pathlib import Path

from matplotlib import pyplot as plt

from .Plots import savgol_line
from .utils import MetageneSchema


[docs]class Genome: """ Storing and manipulating annotation data. """ def __init__(self, genome: pl.LazyFrame): """ Constructor for :class:`Genome`. Shouldn't be called directly. Parameters ---------- genome `polars.LazyFrame <https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html>`_ """ self.genome = self.validate(genome)
[docs] def raw(self): """ This method returns raw Genome DataFrame without any filtering and ranges manipulation. Returns ------- ``pl.DataFrame`` """ return self.genome.collect()
[docs] @classmethod def validate(cls, genome): if "id" not in genome.columns: genome = genome.with_columns(id=pl.lit("")) if "type" not in genome.columns: genome = genome.with_columns(type=pl.lit("")) if "strand" not in genome.columns: genome = genome.with_columns(strand=pl.lit(".")) return genome.select(list(cls._schema.keys())).cast(cls._schema)
_schema = { "chr": pl.Utf8, "type": pl.Utf8, "start": MetageneSchema.position, "end": MetageneSchema.position, "strand": pl.Utf8, "id": pl.Utf8, }
[docs] @classmethod def from_custom(cls, file: str | Path, chr_col: int = 0, start_col: int = 1, end_col: int = 2, id_col: int = None, strand_col: int | None = None, type_col: int = None, comment_char: str = "#", has_header: bool = False, read_filters: pl.Expr = None ) -> Genome: """ Create :class:`Genome` from custom tab separated file with genomic regions. Warnings -------- Index for columns starts from 0! Parameters ---------- file Path to file. chr_col Index of chromosome column. start_col Index of region start column. end_col Index of region end column. id_col Index of id column (if exists). strand_col Index of strand column. type_col Index of type column (if exists). comment_char Character for comments in file. has_header Does file have header. read_filters Filter annotation by `polars.Expr <https://docs.pola.rs/py-polars/html/reference/expressions/index.html>`_ Returns ------- Instance of :class:`Genome` """ if any(val is None for val in [chr_col, start_col, end_col]): raise Exception("All position columns need to be specified!") genes = ( pl.scan_csv( file, comment_char=comment_char, has_header=has_header, separator='\t' ) ) cols = genes.columns select_cols = [ pl.col(cols[chr_col]).alias("chr"), (pl.col(cols[type_col]) if type_col is not None else pl.lit(None)).alias("type"), pl.col(cols[start_col]).alias("start"), pl.col(cols[end_col]).alias("end"), (pl.col(cols[strand_col]) if strand_col is not None else pl.lit(".")).alias("strand"), (pl.col(cols[id_col]) if id_col is not None else pl.lit("")).alias("id"), ] genes = ( genes .with_columns(select_cols) .select(["chr", "type", "start", "end", "strand", "id"]) .sort(["chr", "start"]) .filter(True if read_filters is None else read_filters) ) print(f"Genome read from {file}") return cls(genes)
[docs] @classmethod def from_gff(cls, file: str | Path) -> Genome: """ Constructor for :class:`Genome` class from .gff file. Parameters ---------- file Path to .gff file. Returns ------- Instance of :class:`Genome`. """ id_regex = "^ID=([^;]+)" genome = cls.from_custom(file, 0, 3, 4, 8, 6, 2, "#", False) genome.genome = genome.genome.with_columns( pl.col("id").str.extract(id_regex) ) return genome
[docs] def all(self, min_length: int = 0, flank_length: int = 0) -> pl.DataFrame: """ Filter annotation and calculate positions of flanking regions. Parameters ---------- min_length Region length threshold. flank_length Length of flanking regions. Returns ------- Return :class:`polars.DataFrame` for downstream usage. Examples -------- >>> path = "/path/to/genome.gff" >>> genome = genome.from_gff(path) >>> genome.all(min_length = 0) shape: (710_650, 7) ┌─────────────┬────────┬────────┬────────┬──────────┬────────────┬─────────────────┐ │ chr ┆ strand ┆ start ┆ end ┆ upstream ┆ downstream ┆ id │ │ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ │ str ┆ str ┆ u64 ┆ u64 ┆ u64 ┆ u64 ┆ str │ ╞═════════════╪════════╪════════╪════════╪══════════╪════════════╪═════════════════╡ │ NC_003070.9 ┆ + ┆ 3631 ┆ 5899 ┆ 1631 ┆ 7899 ┆ gene-AT1G01010 │ │ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │ │ NC_000932.1 ┆ + ┆ 153878 ┆ 154312 ┆ 151878 ┆ 156312 ┆ cds-NP_051123.1 │ │ NC_000932.1 ┆ ? ┆ 69611 ┆ 140650 ┆ 67611 ┆ 142650 ┆ rna-ArthCp047 │ └─────────────┴────────┴────────┴────────┴──────────┴────────────┴─────────────────┘ """ genes = self.__filter_genes( self.genome, None, min_length, flank_length).collect() genes = self.__trim_genes(genes, flank_length) return self.__check_empty(genes)
[docs] def gene_body(self, min_length: int = 0, flank_length: int = 2000) -> pl.DataFrame: """ Filter annotation by type == gene and calculate positions of flanking regions. Warnings -------- This method will have empty output, if type is not specified in input file. Parameters ---------- min_length Region length threshold. flank_length Length of flanking regions. Returns ------- Return :class:`polars.DataFrame` for downstream usage. Examples -------- >>> path = "/path/to/genome.gff" >>> genome = genome.from_gff(path) >>> genome.gene_body(min_length=2000, flank_length=2000) shape: (14_644, 7) ┌─────────────┬────────┬────────┬────────┬──────────┬────────────┬────────────────┐ │ chr ┆ strand ┆ start ┆ end ┆ upstream ┆ downstream ┆ id │ │ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ │ str ┆ str ┆ u64 ┆ u64 ┆ u64 ┆ u64 ┆ str │ ╞═════════════╪════════╪════════╪════════╪══════════╪════════════╪════════════════╡ │ NC_003070.9 ┆ + ┆ 3631 ┆ 5899 ┆ 1631 ┆ 7899 ┆ gene-AT1G01010 │ │ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │ │ NC_000932.1 ┆ + ┆ 104691 ┆ 107500 ┆ 102691 ┆ 109500 ┆ gene-ArthCr087 │ │ NC_000932.1 ┆ + ┆ 141485 ┆ 143708 ┆ 139485 ┆ 145708 ┆ gene-ArthCp086 │ └─────────────┴────────┴────────┴────────┴──────────┴────────────┴────────────────┘ """ genes = self.__filter_genes(self.genome, 'gene', min_length, flank_length).collect() genes = self.__trim_genes(genes, flank_length) return self.__check_empty(genes)
[docs] def exon(self, min_length: int = 100) -> pl.DataFrame: """ Filter annotation by type == exon and length threshold. Warnings -------- This method will have empty output, if type is not specified in input file. Parameters ---------- min_length Region length threshold. Returns ------- Return :class:`polars.DataFrame` for downstream usage. Examples -------- >>> path = "/path/to/genome.gff" >>> genome = genome.from_gff(path) >>> genome.exon(min_length=200) shape: (132_822, 7) ┌─────────────┬────────┬────────┬────────┬──────────┬────────────┬────────────────────┐ │ chr ┆ strand ┆ start ┆ end ┆ upstream ┆ downstream ┆ id │ │ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ │ str ┆ str ┆ u64 ┆ u64 ┆ u64 ┆ u64 ┆ str │ ╞═════════════╪════════╪════════╪════════╪══════════╪════════════╪════════════════════╡ │ NC_003070.9 ┆ + ┆ 3631 ┆ 3913 ┆ 3631 ┆ 3913 ┆ exon-NM_099983.2-1 │ │ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │ │ NC_000932.1 ┆ + ┆ 152806 ┆ 153195 ┆ 152806 ┆ 153195 ┆ exon-ArthCp085-1 │ │ NC_000932.1 ┆ + ┆ 153878 ┆ 154312 ┆ 153878 ┆ 154312 ┆ exon-ArthCp085-2 │ └─────────────┴────────┴────────┴────────┴──────────┴────────────┴────────────────────┘ """ flank_length = 0 genes = self.__filter_genes( self.genome, 'exon', min_length, flank_length).collect() genes = self.__trim_genes(genes, flank_length) return self.__check_empty(genes)
[docs] def cds(self, min_length: int = 100) -> pl.DataFrame: """ Filter annotation by type == CDS and length threshold. Warnings -------- This method will have empty output, if type is not specified in input file. Parameters ---------- min_length Region length threshold. Returns ------- Return :class:`polars.DataFrame` for downstream usage. Examples -------- >>> path = "/path/to/genome.gff" >>> genome = genome.from_gff(path) >>> genome.cds(min_length=200) shape: (81_837, 7) ┌─────────────┬────────┬────────┬────────┬──────────┬────────────┬─────────────────┐ │ chr ┆ strand ┆ start ┆ end ┆ upstream ┆ downstream ┆ id │ │ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ │ str ┆ str ┆ u64 ┆ u64 ┆ u64 ┆ u64 ┆ str │ ╞═════════════╪════════╪════════╪════════╪══════════╪════════════╪═════════════════╡ │ NC_003070.9 ┆ + ┆ 3996 ┆ 4276 ┆ 3996 ┆ 4276 ┆ cds-NP_171609.1 │ │ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │ │ NC_000932.1 ┆ + ┆ 152806 ┆ 153195 ┆ 152806 ┆ 153195 ┆ cds-NP_051123.1 │ │ NC_000932.1 ┆ + ┆ 153878 ┆ 154312 ┆ 153878 ┆ 154312 ┆ cds-NP_051123.1 │ └─────────────┴────────┴────────┴────────┴──────────┴────────────┴─────────────────┘ """ flank_length = 0 genes = self.__filter_genes( self.genome, 'CDS', min_length, flank_length).collect() genes = self.__trim_genes(genes, flank_length) return self.__check_empty(genes)
[docs] def near_TSS(self, min_length: int = 4000, flank_length: int = 2000): """ Filter annotation by type == gene and calculate positions of near TSS regions. Warnings -------- This method will have empty output, if type is not specified in input file. Use down_windows=0 for Metagene. Parameters ---------- min_length Region length threshold. flank_length Length of flanking regions. Returns ------- Return :class:`polars.DataFrame` for downstream usage. Examples -------- >>> path = "/path/to/genome.gff" >>> genome = genome.from_gff(path) >>> genome.near_TSS(min_length=2000, flank_length=2000) shape: (14_644, 7) ┌─────────────┬────────┬────────┬────────┬──────────┬────────────┬────────────────┐ │ chr ┆ strand ┆ start ┆ end ┆ upstream ┆ downstream ┆ id │ │ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ │ str ┆ str ┆ u64 ┆ u64 ┆ u64 ┆ u64 ┆ str │ ╞═════════════╪════════╪════════╪════════╪══════════╪════════════╪════════════════╡ │ NC_003070.9 ┆ + ┆ 3631 ┆ 5631 ┆ 1631 ┆ 5631 ┆ gene-AT1G01010 │ │ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │ │ NC_000932.1 ┆ + ┆ 104691 ┆ 106691 ┆ 102691 ┆ 106691 ┆ gene-ArthCr087 │ │ NC_000932.1 ┆ + ┆ 141485 ┆ 143485 ┆ 139485 ┆ 143485 ┆ gene-ArthCp086 │ └─────────────┴────────┴────────┴────────┴──────────┴────────────┴────────────────┘ """ ''' upstream_length = ( # when before length is enough # we set upstream length to specified pl.when(pl.col('upstream') >= flank_length).then(flank_length) # when genes are intersecting (current start < previous end) # we don't take this as upstream region .when(pl.col('upstream') < 0).then(0) # when length between genes is not enough for full specified length # we divide it into half .otherwise((pl.col('upstream') - (pl.col('upstream') % 2)) // 2) ) ''' upstream_length = flank_length gene_type = "gene" genes = self.__filter_genes( self.genome, gene_type, min_length, flank_length) genes = ( genes .groupby(['chr', 'strand'], maintain_order=True) .agg([ pl.col('start'), # upstream shift (pl.col('start').shift(-1) - pl.col('end')).shift(1) .fill_null(flank_length) .alias('upstream'), pl.col('id') ]) .explode(['start', 'upstream', 'id']) .with_columns([ (pl.col('start') - upstream_length).alias('upstream'), (pl.col("start") + flank_length).alias("end") ]) .with_columns(pl.col("end").alias("downstream")) .select(['chr', 'strand', 'start', 'end', 'upstream', 'downstream', 'id']) ).collect() return self.__check_empty(genes)
[docs] def near_TES(self, min_length: int = 4000, flank_length: int = 2000): """ Filter annotation by type == gene and calculate positions of near TES regions. Warnings -------- This method will have empty output, if type is not specified in input file. Use down_windows=0 for Metagene. Parameters ---------- min_length Region length threshold. flank_length Length of flanking regions. Returns ------- Return :class:`polars.DataFrame` for downstream usage. Examples -------- >>> path = "/path/to/genome.gff" >>> genome = genome.from_gff(path) >>> genome.near_TES(min_length=2000, flank_length=2000) shape: (14_644, 7) ┌─────────────┬────────┬────────┬────────┬──────────┬────────────┬────────────────┐ │ chr ┆ strand ┆ start ┆ end ┆ upstream ┆ downstream ┆ id │ │ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ │ str ┆ str ┆ u64 ┆ u64 ┆ u64 ┆ u64 ┆ str │ ╞═════════════╪════════╪════════╪════════╪══════════╪════════════╪════════════════╡ │ NC_003070.9 ┆ + ┆ 3899 ┆ 5899 ┆ 3899 ┆ 7899 ┆ gene-AT1G01010 │ │ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │ │ NC_000932.1 ┆ + ┆ 105500 ┆ 107500 ┆ 105500 ┆ 109500 ┆ gene-ArthCr087 │ │ NC_000932.1 ┆ + ┆ 141708 ┆ 143708 ┆ 141708 ┆ 145708 ┆ gene-ArthCp086 │ └─────────────┴────────┴────────┴────────┴──────────┴────────────┴────────────────┘ """ # decided not to use this ''' downstream_length = ( # when before length is enough # we set upstream length to specified pl.when(pl.col('downstream') >= flank_length).then(flank_length) # when genes are intersecting (current start < previous end) # we don't take this as upstream region .when(pl.col('downstream') < 0).then(0) # when length between genes is not enough for full specified length # we divide it into half .otherwise((pl.col('downstream') - pl.col('downstream') % 2) // 2) ) ''' downstream_length = flank_length gene_type = "gene" genes = self.__filter_genes( self.genome, gene_type, min_length, flank_length) genes = ( genes .groupby(['chr', 'strand'], maintain_order=True).agg([ pl.col('end'), # downstream shift (pl.col('start').shift(-1) - pl.col('end')) .fill_null(flank_length) .alias('downstream'), pl.col('id') ]) .explode(['end', 'downstream', 'id']) .with_columns([ (pl.col('end') + downstream_length).alias('downstream'), (pl.col("end") - flank_length).alias("start") ]) .with_columns(pl.col("start").alias("upstream")) .select(['chr', 'strand', 'start', 'end', 'upstream', 'downstream', 'id']) ).collect() return self.__check_empty(genes)
[docs] def other(self, region_type: str, min_length: int = 1000, flank_length: int = 100) -> pl.DataFrame: """ Filter annotation by selected type and calculate positions of nflanking regions. Warnings -------- This method will have empty output, if type is not specified in input file. If no flanking regions are needed – enter flank_length=0. Parameters ---------- region_type Filter annotation by region type from gff. min_length Region length threshold. flank_length Length of flanking regions. Returns ------- Return :class:`polars.DataFrame` for downstream usage. Examples -------- >>> path = "/path/to/genome.gff" >>> genome = genome.from_gff(path) >>> genome.other("region") shape: (45_888, 7) ┌─────────────┬────────┬────────┬────────┬──────────┬────────────┬─────────────────┐ │ chr ┆ strand ┆ start ┆ end ┆ upstream ┆ downstream ┆ id │ │ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ │ str ┆ str ┆ u64 ┆ u64 ┆ u64 ┆ u64 ┆ str │ ╞═════════════╪════════╪════════╪════════╪══════════╪════════════╪═════════════════╡ │ NC_003070.9 ┆ + ┆ 3631 ┆ 5899 ┆ 3531 ┆ 5999 ┆ rna-NM_099983.2 │ │ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │ │ NC_000932.1 ┆ + ┆ 152806 ┆ 154312 ┆ 152706 ┆ 154412 ┆ rna-ArthCp085 │ │ NC_000932.1 ┆ ? ┆ 69611 ┆ 140650 ┆ 69511 ┆ 140750 ┆ rna-ArthCp047 │ └─────────────┴────────┴────────┴────────┴──────────┴────────────┴─────────────────┘ """ genes = self.__filter_genes( self.genome, region_type, min_length, flank_length).collect() genes = self.__trim_genes(genes, flank_length) return self.__check_empty(genes)
@staticmethod def __filter_genes(genes, gene_type, min_length, flank_length): if gene_type is not None: genes = genes.filter(pl.col('type') == gene_type).drop('type') else: genes = genes.drop("type") # filter genes, which start < flank_length if flank_length > 0: genes = genes.filter(pl.col('start') > flank_length) # filter genes which don't pass length threshold if min_length > 0: genes = genes.filter((pl.col('end') - pl.col('start')) > min_length) return genes @staticmethod def __trim_genes(genes, flank_length) -> pl.DataFrame: # upstream shift # calculates length to previous gene on same chr_strand length_before = (pl.col('start').shift(-1) - pl.col('end')).shift(1).fill_null(flank_length) # downstream shift # calculates length to next gene on same chr_strand length_after = (pl.col('start').shift(-1) - pl.col('end')).fill_null(flank_length) # decided not to use this conditions ''' upstream_length_conditioned = ( # when before length is enough # we set upstream length to specified pl.when(pl.col('upstream') >= flank_length).then(flank_length) # when genes are intersecting (current start < previous end) # we don't take this as upstream region .when(pl.col('upstream') < 0).then(0) # when length between genes is not enough for full specified length # we divide it into half .otherwise((pl.col('upstream') - (pl.col('upstream') % 2)) // 2) ) downstream_length_conditioned = ( # when before length is enough # we set upstream length to specified pl.when(pl.col('downstream') >= flank_length).then(flank_length) # when genes are intersecting (current start < previous end) # we don't take this as upstream region .when(pl.col('downstream') < 0).then(0) # when length between genes is not enough for full specified length # we divide it into half .otherwise((pl.col('downstream') - pl.col('downstream') % 2) // 2) ) ''' if (genes["end"] < genes["start"]).sum() > 0: forward = ( genes.filter(pl.col("start") <= pl.col("end")) .with_columns(pl.lit("+").alias("strand")) ) reverse = ( genes.filter(pl.col("start") > pl.col("end")) .with_columns(pl.lit("-").alias("strand")) .rename({"start": "end", "end": "start"}) .select(forward.columns) ) genes = pl.concat([forward, reverse]).sort(["chr", "start"]) trimmed = ( genes .groupby(['chr', 'strand'], maintain_order=True).agg([ pl.col('start'), pl.col('end'), length_before.alias('upstream'), length_after.alias('downstream'), pl.col('id') ]) .explode(['start', 'end', 'upstream', 'downstream', 'id']) .with_columns([ # calculates length of region (pl.col('start') - flank_length).alias('upstream'), # calculates length of region (pl.col('end') + flank_length).alias('downstream') ]) ) return trimmed @staticmethod def __check_empty(genes): if len(genes) > 0: return genes else: raise Exception("Genome DataFrame is empty. Are you sure input file is valid?")
@dataclass class RegAlignResult: """ Class for storing regions aligned to outer set of regions Attributes ---------- gene_body Regions, which middle coordinate is inside outer region. upstream Regions, which middle coordinate is in upstream of outer region. downstream Regions, which middle coordinate is in downstream of outer region. intergene Regions, which middle coordinate not in flanking regions nor gene body. """ gene_body: pl.DataFrame upstream: pl.DataFrame downstream: pl.DataFrame intergene: pl.DataFrame flank_length: int def ref_positions(self): """ This function calculates the coordinates normalized with respect to the length of the outer region Returns ------- polars.DataFrame for upstream, gene_body and downstream relative positions. """ def ref_pos_expr(for_column: str): expr = ( pl.when( pl.col(for_column) < pl.col("areg_start") ).then( (pl.col("areg_start") - pl.col(for_column)) / self.flank_length * -1 ).when( pl.col(for_column) > pl.col("areg_end") ).then( (pl.col(for_column) - pl.col("areg_end")) / self.flank_length + 1 ).otherwise( (pl.col(for_column) - pl.col("areg_start")) / pl.col("length") ) ) return expr ref_positions = ( self.near_reg() .cast(dict(areg_start=pl.Int64, areg_end=pl.Int64, start=pl.Int64, end=pl.Int64)) .with_columns([ (pl.col("areg_end") - pl.col("areg_start")).alias("length"), pl.when(pl.col("strand") == "-").then(0 - pl.col("end")).otherwise(pl.col("start")).alias("start"), pl.when(pl.col("strand") == "-").then(0 - pl.col("areg_end")).otherwise(pl.col("areg_start")).alias( "areg_start"), pl.when(pl.col("strand") == "-").then(0 - pl.col("start")).otherwise(pl.col("end")).alias("end"), pl.when(pl.col("strand") == "-").then(0 - pl.col("areg_start")).otherwise(pl.col("areg_end")).alias( "areg_end"), ]) .with_columns([ ref_pos_expr("start").alias("ref_start"), ref_pos_expr("end").alias("ref_end"), ]) # .select(["ref_start", "ref_end"]) .with_columns([ pl.concat_list([pl.col("ref_start"), pl.col("ref_end")]).alias("ref_pos"), pl.lit([1, -1]).alias("is_start") ]) .explode(["ref_pos", "is_start"]) .sort("ref_pos") .with_columns(pl.col("is_start").cum_sum().alias("coverage")) ) return ref_positions def metagene_coverage(self): """ This function calculates the coverage of the metagene by the aligned regions. Returns ------- tuple(relative_positions, coverage) """ ref_positions = self.ref_positions() pos = ref_positions["ref_pos"].to_numpy() cov = ref_positions["coverage"].to_numpy() interval_values = (-1 <= pos) & (pos <= 2) return pos[interval_values], cov[interval_values] def plot_density_mpl( self, fig_axes: tuple = None, flank_windows: int = None, body_windows: int = None, smooth: int = None, norm: bool = False, tick_labels: list[str] = None, label: str = None, **mpl_kwargs ): """ Plot coverage, returned by :func:`RegAlignResult.metagene_coverage` Parameters ---------- fig_axes Tuple of ( `matplotlib.pyplot.Figure <https://matplotlib.org/stable/api/figure_api.html#matplotlib.figure.Figure>`_, `matplotlib.axes.Axes <https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.html#matplotlib.axes.Axes>`_). New are created if ``None`` flank_windows Number of windows for flanking regions (set None for no resampling). body_windows Number of windows for body region (set None for no resampling). smooth Number of windows for `SavGol <https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.savgol_filter.html>`_ filter (set 0 for no smoothing). Applied only if `flank_windows` and `body_windows` params are specified. norm Should the output plot be normalized by maximum coverage. tick_labels Labels for upstream, body region start and end, downstream (e.g. TSS, TES). **Exactly 5** need to be provided. Set ``None`` to disable. label Label of line on line-plot mpl_kwargs Keyword arguments for `matplotlib.plot <https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.plot.html>`_ Returns ------- ``matplotlib.pyplot.Figure`` """ fig, axes = plt.subplots() if fig_axes is None else fig_axes tick_labels = ["Upstream", "TSS", "Body", "TES", "Downstream"] if tick_labels is None else tick_labels pos, cov = self.metagene_coverage() xticks = [-1, 0, .5, 1, 2] if flank_windows is not None and body_windows is not None: resampled_pos = [] resampled_cov = [] for start, stop, windows in zip([-1, 0, 1], [0, 1, 2], [flank_windows, body_windows, flank_windows]): points, step = np.linspace(start, stop, windows + 1, retstep=True) indexes = [(start <= pos) & (pos < (start + step)) for start in points[:-1]] resampled_pos.append(points[:-1]) resampled_cov.append(np.array([cov[index].mean() if cov[index].size != 0 else 0 for index in indexes])) cov = np.concatenate(resampled_cov) pos = np.array(list(range(len(cov)))) valid = np.isnan(cov) cov = np.interp(pos, pos[~valid], cov[~valid]) if (~valid).sum() > 1 else np.full_like(pos, 0) cov = savgol_line(cov, smooth) xticks = [0, flank_windows, flank_windows + body_windows / 2, flank_windows + body_windows, flank_windows * 2 + body_windows] else: new_cov = [] new_pos = [] for p_prev, p_next, c_prev, c_next in zip(pos[:-1], pos[1:], cov[:-1], cov[1:]): if c_next > c_prev: new_pos += [p_prev, p_prev] new_cov += [c_prev, c_next] elif c_next < c_prev: new_pos += [p_next, p_next] new_cov += [c_prev, c_next] else: new_pos += [p_prev] new_cov += [c_prev] pos = np.array(new_pos) cov = np.array(new_cov) if norm and cov.size > 0: cov = cov / cov.max() axes.plot(pos, cov, label=label, **mpl_kwargs) axes.set_xticks(xticks, labels=tick_labels) axes.set_title("Сoverage of genes and flanking regions by DMRs") axes.set_xlabel('Position') axes.set_ylabel('Density') axes.axvline(x=xticks[1], linestyle='--', color='k', alpha=.3) axes.axvline(x=xticks[3], linestyle='--', color='k', alpha=.3) return fig def near_reg(self) -> pl.DataFrame: return pl.concat([self.upstream, self.gene_body, self.downstream]) def areg_ids(self) -> list[str]: return self.near_reg()["id"].unique().to_list() def areg_stats(self) -> pl.DataFrame: return ( self.near_reg() .group_by(["chr", "areg_start", "areg_end"]) .agg([ pl.first("id"), pl.len().alias("count"), (pl.col("end") - pl.col("start")).mean().alias("mean_length") ]) .sort("count", descending=True) ) def align_regions( regions: pl.DataFrame, along_regions: pl.DataFrame, flank_length: int = 2000 ): total = [] DeprecationWarning("This method will be removed in future versions. Please use Enrichment class.") # Join by middle for chrom in regions["chr"].unique(): chr_left = ( regions.filter(chr=chrom) .with_columns(((pl.col('end') + pl.col('start')) / 2).floor().cast(pl.UInt32).alias('mid')) .sort('mid') ) chr_right = ( along_regions.filter(chr=chrom) .with_columns(((pl.col('end') + pl.col('start')) / 2).floor().cast(pl.UInt32).alias('mid')) .sort('mid') ) joined = ( chr_left .join_asof(chr_right, on='mid', strategy='nearest') .with_columns(((pl.col('end') + pl.col('start')) / 2).floor().cast(pl.UInt32).alias('mid')) .select(["chr", "start", "mid", "end", pl.col("start_right").alias("areg_start"), pl.col("end_right").alias("areg_end"), "id", "strand"]) ) total.append(joined) total = pl.concat(total) in_gb = total.filter( (pl.col("mid") >= pl.col("areg_start")) & (pl.col("mid") <= pl.col("areg_end")) ) in_up = total.filter( ( (pl.col("strand") != "-") & (pl.col("end") >= (pl.col("areg_start") - flank_length)) & (pl.col("mid") < pl.col("areg_start")) ) | ( (pl.col("strand") == "-") & (pl.col("mid") > pl.col("areg_end")) & (pl.col("start") <= (pl.col("areg_end") + flank_length)) ) ) in_down = total.filter( ( (pl.col("strand") != "-") & (pl.col("mid") > pl.col("areg_end")) & (pl.col("start") <= (pl.col("areg_end") + flank_length)) ) | ( (pl.col("strand") == "-") & (pl.col("end") >= (pl.col("areg_start") - flank_length)) & (pl.col("mid") < pl.col("areg_start")) ) ) intergene = total.filter( (pl.col("end") < (pl.col("areg_start") - flank_length)) | (pl.col("start") > (pl.col("areg_end") + flank_length)) ) return RegAlignResult(in_gb, in_up, in_down, intergene, flank_length)
[docs]class EnrichmentResult: """ Class for storing and visualizing enrichment results. Warnings -------- This class SHOULD NOT be called directly. To create it call :func:`Enrichment.enrich` instead. """ def __init__( self, aligned: pl.DataFrame, enrich_stats: pl.DataFrame, is_gff: bool = False, ): self.aligned = aligned self.enrich_stats = enrich_stats self._is_gff = is_gff
[docs] def ref_positions(self): """ This method calculates relative normalized positions of region start and end to gene region respectively. Coordinates (-1, 0) refer to the upstream region, [0, 1] refer to the gene body, (1, 2) refer to the downstream region. Returns ------- pl.DataFrame """ ref_positions = ( self.aligned.lazy() .filter( pl.col("type").is_in(["upstream", "gene", "downstream"]) if self._is_gff else True ) .select(["gstart", "gend", "afrag_start", "afrag_end", "strand", "type"]) .cast({key: pl.Int64 for key in ["gstart", "gend", "afrag_start", "afrag_end"]}) .with_columns( ref_start=( pl.when(pl.col("strand") == "-") .then(pl.col("gend") - pl.col("afrag_end")) .otherwise(pl.col("afrag_start") - pl.col("gstart")) ), ref_end=( pl.when(pl.col("strand") == "-") .then(pl.col("gend") - pl.col("afrag_start")) .otherwise(pl.col("afrag_end") - pl.col("gstart")) ), glength=(pl.col("gend") - pl.col("gstart")) ) .with_columns( ref_start=pl.col("ref_start") / pl.col("glength"), ref_end=pl.col("ref_end") / pl.col("glength"), ) .with_columns( ref_start=( pl.when(pl.col("type") == "upstream") .then(pl.col("ref_start") - 1) .when(pl.col("type") == "downstream") .then(pl.col("ref_start") + 1) .otherwise(pl.col("ref_start")) ), ref_end=( pl.when(pl.col("type") == "upstream") .then(pl.col("ref_end") - 1) .when(pl.col("type") == "downstream") .then(pl.col("ref_end") + 1) .otherwise(pl.col("ref_end")) ) ) .with_columns([ pl.concat_list([pl.col("ref_start"), pl.col("ref_end")]).alias("ref_pos"), pl.lit([1, -1]).alias("is_start") ]) .explode(["ref_pos", "is_start"]) .sort("ref_pos") .filter(((pl.col("ref_start") < -1) | (pl.col("ref_end") > 2)).not_()) .with_columns(pl.col("is_start").cum_sum().alias("coverage")) .collect() ) return ref_positions
[docs] def metagene_coverage(self): """ This function calculates the coverage of the metagene by the aligned regions. Returns ------- tuple(relative_positions, coverage) """ ref_positions = self.ref_positions() pos = ref_positions["ref_pos"].to_numpy() cov = ref_positions["coverage"].to_numpy() interval_values = (-1 <= pos) & (pos <= 2) return pos[interval_values], cov[interval_values]
[docs] def plot_enrich_mpl( self, fig_axes: tuple = None, exclude: list = None, label: str = "", **mpl_kwargs ): """ Visualize enrichment results as a scatterplot, where x – genomic region type, y – enrichment value. Parameters ---------- fig_axes Tuple of ( `matplotlib.pyplot.Figure <https://matplotlib.org/stable/api/figure_api.html#matplotlib.figure.Figure>`_, `matplotlib.axes.Axes <https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.html#matplotlib.axes.Axes>`_). New are created if ``None`` exclude List of names of genomic region types to exclude from the final plot. Set None if no exclusion should be performed. label Label for the points on the scatterplot (useful, when several groups of points should be bisualized on the same plot). mpl_kwargs Keyword arguemtnts for plt.scatter function of the matplotlib API. Returns ------- ``matplotlib.pyplot.Figure`` """ exclude = list() if exclude is None else exclude fig, axes = plt.subplots() if fig_axes is None else fig_axes assert isinstance(fig, plt.Figure) assert isinstance(axes, plt.Axes) plot_df = self.enrich_stats.filter(pl.col("type").is_in(exclude).not_()).sort("enrichment") axes.scatter(plot_df["type"].to_list(), plot_df["enrichment"].to_list(), label=label, marker='o', alpha=.90, s=50, **mpl_kwargs) return fig
[docs] def plot_density_mpl( self, fig_axes: tuple = None, flank_windows: int = None, body_windows: int = None, smooth: int = None, norm: bool = False, tick_labels: list[str] = None, label: str = None, **mpl_kwargs ): """ Plot coverage, returned by :func:`RegAlignResult.metagene_coverage` Parameters ---------- fig_axes Tuple of ( `matplotlib.pyplot.Figure <https://matplotlib.org/stable/api/figure_api.html#matplotlib.figure.Figure>`_, `matplotlib.axes.Axes <https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.html#matplotlib.axes.Axes>`_). New are created if ``None`` flank_windows Number of windows for flanking regions (set None for no resampling). body_windows Number of windows for body region (set None for no resampling). smooth Number of windows for `SavGol <https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.savgol_filter.html>`_ filter (set 0 for no smoothing). Applied only if `flank_windows` and `body_windows` params are specified. norm Should the output plot be normalized by maximum coverage. tick_labels Labels for upstream, body region start and end, downstream (e.g. TSS, TES). **Exactly 5** need to be provided. Set ``None`` to disable. label Label of line on line-plot mpl_kwargs Keyword arguments for `matplotlib.plot <https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.plot.html>`_ Returns ------- ``matplotlib.pyplot.Figure`` """ fig, axes = plt.subplots() if fig_axes is None else fig_axes tick_labels = ["Upstream", "TSS", "Body", "TES", "Downstream"] if tick_labels is None else tick_labels pos, cov = self.metagene_coverage() goodones = (pos > np.quantile(pos, q=.005)) & (pos < np.quantile(pos, q=.995)) pos = pos[goodones] cov = cov[goodones] xticks = [-1, 0, .5, 1, 2] if flank_windows is not None and body_windows is not None: resampled_pos = [] resampled_cov = [] for start, stop, windows in zip([-1, 0, 1], [0, 1, 2], [flank_windows, body_windows, flank_windows]): points, step = np.linspace(start, stop, windows + 1, retstep=True) indexes = [(start <= pos) & (pos < (start + step)) for start in points[:-1]] resampled_pos.append(points[:-1]) resampled_cov.append(np.array([cov[index].mean() if cov[index].size != 0 else 0 for index in indexes])) cov = np.concatenate(resampled_cov) pos = np.array(list(range(len(cov)))) valid = np.isnan(cov) cov = np.interp(pos, pos[~valid], cov[~valid]) if (~valid).sum() > 1 else np.full_like(pos, 0) cov = savgol_line(cov, smooth) xticks = [0, flank_windows, flank_windows + body_windows / 2, flank_windows + body_windows, flank_windows * 2 + body_windows] else: new_cov = [] new_pos = [] for p_prev, p_next, c_prev, c_next in zip(pos[:-1], pos[1:], cov[:-1], cov[1:]): if c_next > c_prev: new_pos += [p_prev, p_prev] new_cov += [c_prev, c_next] elif c_next < c_prev: new_pos += [p_next, p_next] new_cov += [c_prev, c_next] else: new_pos += [p_prev] new_cov += [c_prev] pos = np.array(new_pos) cov = np.array(new_cov) if norm and cov.size > 0: cov = cov / cov.max() axes.plot(pos, cov, label=label, **mpl_kwargs) axes.set_xticks(xticks, labels=tick_labels) axes.set_title("Сoverage of genes and flanking regions by DMRs") axes.set_xlabel('Position') axes.set_ylabel('Density') axes.axvline(x=xticks[1], linestyle='--', color='k', alpha=.3) axes.axvline(x=xticks[3], linestyle='--', color='k', alpha=.3) return fig
[docs]class Enrichment: """ Class for performing logFC enrichment of specified regions over genome. Parameters ---------- regions polars.DataFrame of region coordinates, which is validated by :func:`Genome.validate`. Obligatory columns are: chr (chromosome name), start, end. genome polars.DataFrame of genome, which is generated by :func:`Genome.raw` method of an :class:`Genome` instance. flank_length Length in bp of flanking regions for genes. If no flanking regions should be added, set this parameter to 0. """ def __init__(self, regions: pl.DataFrame, genome: pl.DataFrame, flank_length: int = 0): self.regions = Genome.validate(regions) genome = Genome.validate(genome) self.genome = genome if not flank_length else self._add_flank_regions(genome, flank_length) self._flank_length = flank_length @property def _type_lengths(self) -> pl.DataFrame: return ( self.genome .group_by("type") .agg([(pl.col("end") - pl.col("start")).sum().alias("total")]) ) @property def _chr_lengths(self) -> pl.DataFrame: # Use predefined chromosome lengths if available if "region" not in self._type_lengths["type"]: return self.genome.group_by("chr").agg([(pl.last("end") - pl.first("start")).alias("length")]) # Otherwise, take last gene coord for chromosome end else: return (self.genome.vstack(self.regions).filter(type="region").select( ["type", (pl.col("end")).alias("length")])) @property def _is_gff(self) -> bool: return "gene" in self._type_lengths["type"] @property def _total_rlen(self) -> int: return self.regions.with_columns(length=pl.col("end") - pl.col("start"))["length"].sum() @staticmethod def _add_flank_regions(genome, flank_length) -> pl.DataFrame: gene_bodies = Genome(genome.lazy()).gene_body(flank_length=flank_length) return pl.concat( [ genome, Genome.validate( gene_bodies .select(["chr", pl.col("upstream").alias("start"), pl.col("start").alias("end"), "strand"]) .with_columns( type=pl.when(pl.col("strand") == "-").then(pl.lit("downstream")).otherwise(pl.lit("upstream")) ) ), Genome.validate( gene_bodies .select(["chr", pl.col("end").alias("start"), pl.col("downstream").alias("end"), "strand"]) .with_columns( type=pl.when(pl.col("strand") == "-").then(pl.lit("upstream")).otherwise(pl.lit("downstream")) ) ) ] )
[docs] def enrich(self) -> EnrichmentResult: """ This method performs an alignment of regions to the genome, calculating genomic coordinates of genome regions and user-defined regions intersections, and runs calculates logFC enrichment metric. Returns ------- :class:`EnrichmentResult` """ if not self._is_gff: raise ValueError('"gene" region type must be included in the genome DataFrame!') type_lengths = self._type_lengths chr_lengths = self._chr_lengths # Add intergene type to type_lengths type_lengths.extend( pl.DataFrame({ "type": "intergene", "total": chr_lengths["length"].sum() - type_lengths.filter(type="gene").row(0)[1] }).cast(type_lengths.schema) ) res = [] # Aligning for chrom in chr_lengths["chr"]: filtered_genome = self.genome.filter(chr=chrom).sort("start") filtered_dmrs = self.regions.filter(chr=chrom) gpos = list(filtered_genome[["start", "end"]].iter_rows()) dpos = list(filtered_dmrs[["start", "end"]].iter_rows()) for gstart, gend in gpos: aligned = [] i = 0 while dpos: try: dstart, dend = dpos[i] except IndexError: break if dend < gstart: dpos.pop(0) else: if dend > gstart > dstart: aligned.append((chrom, gstart, gend, dstart, dend, gstart, dend)) elif dstart >= gstart and dend <= gend: aligned.append((chrom, gstart, gend, dstart, dend, dstart, dend)) elif dstart < gend < dend: aligned.append((chrom, gstart, gend, dstart, dend, dstart, gend)) else: break i += 1 if aligned: res.append(aligned) res_df = pl.DataFrame( list(itertools.chain(*res)), schema={ "chr": self.genome.schema["chr"], "gstart": self.genome.schema["start"], "gend": self.genome.schema["end"], "dmr_start": self.genome.schema["start"], "dmr_end": self.genome.schema["end"], "afrag_start": self.genome.schema["start"], "afrag_end": self.genome.schema["end"], } ) joined = res_df.join(self.genome, left_on=["chr", "gstart", "gend"], right_on=["chr", "start", "end"], how="left") len_stats = ( joined .group_by("type") .agg([ (pl.col("afrag_end") - pl.col("afrag_start")).sum().alias("alen"), ]) ) # Intergene row len_stats.extend( self.regions .join(res_df, right_on=["chr", "dmr_start", "dmr_end"], left_on=["chr", "start", "end"], how="left") .filter(pl.col("gstart").is_null()) .group_by(pl.lit(True)) .agg((pl.col("end") - pl.col("start")).sum().alias("alen")) .with_columns(type=pl.lit("intergene")) .select(["type", "alen"]) ) len_stats = len_stats.join(type_lengths, on="type") # Non-CDS row len_stats.extend( pl.DataFrame({ "type": "NCDS", "alen": len_stats.filter(type="gene").row(0)[1] - len_stats.filter(type="CDS").row(0)[1], "total": len_stats.filter(type="gene").row(0)[2] - len_stats.filter(type="CDS").row(0)[2], }).cast(len_stats.schema) ) enrichment = ( len_stats .with_columns( enrichment=( (pl.col("alen") / self._total_rlen).log(2) - (pl.col("total") / chr_lengths["length"].sum()).log(2) ) ) ) return EnrichmentResult(aligned=joined, enrich_stats=enrichment, is_gff=self._is_gff)