Source code for bsxplorer.BamReader

from __future__ import annotations

import dataclasses
import datetime
import itertools
import queue
import time
from collections import Counter, UserDict
from dataclasses import dataclass
from math import ceil
from pathlib import Path
from threading import Thread
from typing import Any, Literal

import numba as nb
import numpy as np

import polars as pl
import pysam
from matplotlib import pyplot as plt
from numba import jit, prange
from progress.bar import Bar
from pyarrow import dataset as pads, compute as pc

from .UniversalReader_batches import UniversalBatch
from .utils import AvailableBAM


# noinspection PyMissingOrEmptyDocstring
def check_path(path: str | Path):
    path = Path(path).expanduser().absolute()
    if not path.exists():
        raise FileNotFoundError(path)
    return path


# noinspection PyMissingOrEmptyDocstring
class QualsCounter(UserDict):
    def __init__(self, data=None):
        UserDict.__init__(self, data if data is not None else dict())

    def __add__(self, other: dict):
        for key in other.keys():
            if key in self.data.keys():
                self.data[key] += other[key]
            else:
                self.data[key] = other[key]
        return self

    def total(self):
        return sum(self.values())

    def weighted_sum(self):
        return sum(key * value for key, value in self.data.items())

    def mean_qual(self):
        return self.weighted_sum() / self.total()

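# A minimal usage sketch (hypothetical values): merging two quality
# histograms and taking their weighted mean quality.
#
#     merged = QualsCounter({30: 10, 40: 5}) + {30: 2, 20: 1}
#     merged.total()         # 18 observations
#     merged.weighted_sum()  # 30*12 + 40*5 + 20*1 = 580
#     merged.mean_qual()     # 580 / 18 ≈ 32.2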

# noinspection PyMissingOrEmptyDocstring
@dataclass
class ReadTask:
    chrom: str
    start: int
    end: int
    filters: list[dict]
    context: str


# noinspection PyMissingOrEmptyDocstring
@dataclass
class AlignmentResult:
    data: Any
    chrom: str
    start: int
    end: int
    alignments: int

    @property
    def is_empty(self):
        return self.data is None


# noinspection PyMissingOrEmptyDocstring
@dataclass
class QCResult:
    quals_count: QualsCounter
    pos_count: list[QualsCounter]
    chrom: str
    start: int
    end: int

    @property
    def is_empty(self):
        return self.quals_count is None


class BAMOptions:
    def __init__(self, bamtype: AvailableBAM):
        self._bamtype = bamtype

    @property
    def strand_conv(self):
        if self._bamtype == "bismark":
            return [3, 4]

    @property
    def strand_conv_df(self) -> pl.DataFrame:
        return pl.DataFrame(
            [
                ["CTCT", "CTGA", "GACT", "GAGA"],
                [True, False, True, False],
                [True, True, False, False]
            ],
            schema=dict(strand_conv=pl.String, strand=pl.Boolean, converted=pl.Boolean)
        )

    @property
    def calls_df(self) -> pl.DataFrame:
        return pl.DataFrame(
            [
                ['z', 'Z', 'x', 'X', 'h', 'H', 'u', 'U'],
                ['CG', 'CG', 'CHG', 'CHG', 'CHH', 'CHH', 'U', 'U'],
                [False, True, False, True, False, True, False, True]
            ],
            schema=dict(call=pl.String, context=pl.String, m=pl.Boolean)
        )
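
# For reference: with ``bamtype == "bismark"``, ``strand_conv`` holds the
# indices of the read/genome conversion tags (Bismark's XR/XG) in the
# alignment's tag list, and ``calls_df`` mirrors the Bismark XM-tag
# alphabet: z/Z = CpG, x/X = CHG, h/H = CHH, u/U = unknown context, with
# the uppercase letter marking a methylated call. Joining a call column
# ["Z", "h"] against ``calls_df`` therefore yields contexts ["CG", "CHH"]
# and m = [True, False].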


class BAMBar(Bar):
    suffix = "%(index_kb)d/%(max_kb)d kb (File reading time %(elapsed_fmt)s, ETA: %(eta_fmt)s)"
    fill = "@"

    def __init__(self, reads_total: int = 0, **kwargs):
        self.reads = 0
        self.reads_total = reads_total
        super().__init__(**kwargs)

    @property
    def index_kb(self):
        return round(self.index // 1000)

    @property
    def max_kb(self):
        return round(self.max // 1000)

    @property
    def elapsed_fmt(self):
        return str(datetime.timedelta(seconds=int(self.elapsed)))

    @property
    def eta_fmt(self):
        return str(datetime.timedelta(seconds=self.eta))

    @property
    def eta(self):
        if self.reads:
            return int(ceil((self.elapsed / self.reads) * (self.reads_total - self.reads)))
        else:
            return 0

    @property
    def elapsed(self):
        return time.monotonic() - self.start_ts


@jit(
    nb.types.Tuple((nb.types.Array(nb.int64, 2, "C"), nb.types.Array(nb.float64, 1, "C")))(
        nb.types.Array(nb.int8, 2, "F"),
        nb.types.Array(nb.int64, 1, "C"),
        nb.int64,
        nb.int64
    ),
    nopython=True,
    parallel=True,
    fastmath=True
)
def _jit_methylation_entropy(matrix, columns, window_length, min_depth):
    positions_matrix = np.zeros((matrix.shape[1] - window_length, window_length), dtype=np.int64)
    ME_array = np.zeros((matrix.shape[1] - window_length), dtype=np.float64)

    # Sliding window loop
    for i in prange(matrix.shape[1] - window_length):
        window = matrix[:, i:(i + window_length)]

        none_idx = np.not_equal(window, -1).sum(axis=1)

        # Keep only reads that fully cover the window
        window = window[none_idx == window.shape[1], :]
        # Check minimal depth (number of remaining reads)
        if window.shape[0] < min_depth:
            continue

        # Initialize signatures array
        signatures = np.zeros(window.shape[0], dtype=np.int64)
        # Fill signatures array
        for col in prange(window.shape[1]):
            signatures = np.multiply(signatures, 2)
            signatures += window[:, col]

        # Sort and find unique values
        sort_signatures = np.sort(signatures)
        nonzero_indicies = np.nonzero(sort_signatures - np.roll(sort_signatures, -1))[0]
        counts = nonzero_indicies + 1
        if nonzero_indicies.size != 0:
            counts_shifted = np.roll(counts, 1)
            counts_shifted[0] = 0

            # Unique counts
            counts = counts - counts_shifted
            total_counts = counts.sum()
            ME_value = abs((1 / window_length) * sum([(count / total_counts) * np.log2(count / total_counts) for count in counts]))
        else:
            ME_value = 0

        positions_matrix[i, :] = columns[i:(i + window_length)]
        ME_array[i] = ME_value

    return positions_matrix, ME_array
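
# Worked example of the ME formula above (values are illustrative): with
# window_length = 4 and four fully covered reads forming two distinct
# signatures with counts [2, 2], each epiallele frequency is 0.5, so
# ME = |(1/4) * (0.5*log2(0.5) + 0.5*log2(0.5))| = |(1/4) * (-1)| = 0.25.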


@jit(
    nb.types.Tuple((nb.types.Array(nb.int64, 2, "C"), nb.types.Array(nb.float64, 1, "C")))(
        nb.types.Array(nb.int8, 2, "F"),
        nb.types.Array(nb.int64, 1, "C"),
        nb.int64,
        nb.int64
    ),
    nopython=True,
    parallel=True,
    fastmath=True
)
def _jit_epipolymorphism(matrix, columns, window_length, min_depth):
    positions_matrix = np.zeros((matrix.shape[1] - window_length, window_length), dtype=np.int64)
    PM_array = np.zeros((matrix.shape[1] - window_length), dtype=np.float64)

    # Sliding window loop
    for i in prange(matrix.shape[1] - window_length):
        window = matrix[:, i:(i + window_length)]

        none_idx = np.not_equal(window, -1).sum(axis=1)

        # Keep only reads that fully cover the window
        window = window[none_idx == window.shape[1], :]
        # Check minimal depth (number of remaining reads)
        if window.shape[0] < min_depth:
            continue

        # Initialize signatures array
        signatures = np.zeros(window.shape[0], dtype=np.int64)
        # Fill signatures array
        for col in prange(window.shape[1]):
            signatures = np.multiply(signatures, 2)
            signatures += window[:, col]

        # Sort and find unique values
        sort_signatures = np.sort(signatures)
        nonzero_indicies = np.nonzero(sort_signatures - np.roll(sort_signatures, -1))[0]
        counts = nonzero_indicies + 1
        if nonzero_indicies.size != 0:
            counts_shifted = np.roll(counts, 1)
            counts_shifted[0] = 0

            # Unique counts
            counts = counts - counts_shifted
            total_counts = counts.sum()

            # Epipolymorphism: 1 minus the sum of squared epiallele frequencies
            PM_value = 1 - sum([(count / total_counts) ** 2 for count in counts])
        else:
            PM_value = 0

        positions_matrix[i, :] = columns[i:(i + window_length)]
        PM_array[i] = PM_value

    return positions_matrix, PM_array
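
# Worked example of the epipolymorphism formula above (illustrative):
# with epiallele counts [3, 1] (total 4), PM = 1 - ((3/4)**2 + (1/4)**2)
# = 1 - 10/16 = 0.375, while a single epiallele gives PM = 1 - 1 = 0.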


@jit(
    nb.types.Tuple((nb.types.Array(nb.int64, 1, "C"), nb.types.Array(nb.float64, 1, "C"), nb.types.Array(nb.int64, 2, "C")))(
        nb.types.Array(nb.int8, 2, "F"),
        nb.types.Array(nb.int64, 1, "C"),
        nb.int64,
        nb.int64
    ),
    nopython=True,
    parallel=True,
    fastmath=True
)
def _jit_PDR(matrix, columns, min_cyt: int, min_depth: int):
    # Filter out reads covering fewer than min_cyt cytosines
    low_cyt = np.sum(matrix != -1, axis=1) >= min_cyt
    matrix = matrix[low_cyt, :]

    n_cols = matrix.shape[1]
    position_array = np.zeros(n_cols, dtype=np.int64)
    pdr_array = np.zeros(n_cols, dtype=np.float64)
    count_matrix = np.zeros((n_cols, 2), dtype=np.int64)

    for i in prange(n_cols):
        covering_reads = matrix[:, i] != -1

        # Skip if fewer than min_depth reads cover this cytosine
        if covering_reads.sum() < min_depth:
            continue

        window = matrix[covering_reads, :]

        concordant_reads = np.logical_xor((window == 1).sum(axis=1), (window == 0).sum(axis=1))

        total = len(concordant_reads)
        concordant_count = concordant_reads.sum()
        discordant_count = total - concordant_count
        PDR_value = discordant_count / total

        pdr_array[i] = PDR_value
        count_matrix[i] = concordant_count, discordant_count
        position_array[i] = columns[i]

    non_empty = position_array != 0
    position_array = position_array[non_empty]
    pdr_array = pdr_array[non_empty]
    count_matrix = count_matrix[non_empty, :]

    return position_array, pdr_array, count_matrix
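
# Worked example of the PDR logic above (illustrative): a read is
# concordant when its calls are all methylated or all unmethylated (the
# XOR is true when exactly one of the two counts is nonzero). If a
# position is covered by four reads and one of them mixes methylated and
# unmethylated calls, PDR = 1 / 4 = 0.25 at that position.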


class PivotRegion:
    """
    Class for storing and calculating methylation entropy, epipolymorphism
    and PDR stats.

    This class should not be initialized by the user, but by :class:`BAMReader`.
    """
    def __init__(self, df: pl.DataFrame, calls: pl.DataFrame, chr: str):
        self.calls = calls
        self.pivot = df
        self.chr = chr
    @classmethod
    def from_calls(cls, calls, chr):
        if len(calls["strand"].unique()) > 1:
            indexes = ["qname", "converted", "strand"]
        else:
            indexes = ["qname", "converted"]
        pivoted = (
            calls
            .sort("position")
            .cast(dict(m=pl.Int8))
            .pivot(index=indexes, columns="position", values="m")
            .fill_null(-1)
        )
        if not pivoted.is_empty():
            return cls(pivoted, calls, chr)
        else:
            return None
    def filter(self, **kwargs):
        return self.from_calls(self.calls.filter(**kwargs), self.chr)
    def matrix_df(self):
        return self.pivot.select(pl.all().exclude(["qname", "converted", "strand"]))
    def _jit_compatitable(self):
        df = self.matrix_df()
        matrix = df.to_numpy().astype(np.int8)
        columns = np.array(list(map(int, df.columns)), dtype=np.int64)
        return matrix, columns
    def methylation_entropy(self, window_length: int = 4, min_depth: int = 4):
        """
        Wrapper for :func:`_jit_methylation_entropy` – a JIT-compiled ME calculation function.

        Parameters
        ----------
        window_length
            Length of the sliding window.
        min_depth
            Minimal depth of reads for a window to be considered for calculation.

        Returns
        -------
        Matrix with positions of cytosines in each window and an array with their ME values.
        """
        matrix, columns = self._jit_compatitable()
        if matrix.shape[1] > window_length and matrix.shape[0] > min_depth:
            try:
                return _jit_methylation_entropy(matrix, columns, window_length, min_depth)
            except TypeError:
                return np.zeros((0, window_length)), np.zeros((0, 1))
        else:
            return np.zeros((0, window_length)), np.zeros((0, 1))
    def epipolymorphism(self, window_length: int = 4, min_depth: int = 4):
        """
        Wrapper for :func:`_jit_epipolymorphism` – a JIT-compiled EPM calculation function.

        Parameters
        ----------
        window_length
            Length of the sliding window.
        min_depth
            Minimal depth of reads for a window to be considered for calculation.

        Returns
        -------
        Matrix with positions of cytosines in each window and an array with their EPM values.
        """
        matrix, columns = self._jit_compatitable()
        if matrix.shape[1] > window_length and matrix.shape[0] > min_depth:
            try:
                return _jit_epipolymorphism(matrix, columns, window_length, min_depth)
            except TypeError:
                return np.zeros((0, window_length)), np.zeros((0, 1))
        else:
            return np.zeros((0, window_length)), np.zeros((0, 1))
    def PDR(self, min_cyt: int = 5, min_depth: int = 4):
        """
        Wrapper for :func:`_jit_PDR` – a JIT-compiled PDR calculation function.

        Parameters
        ----------
        min_cyt
            Minimal number of cytosines in the region.
        min_depth
            Minimal depth of reads for a position to be considered for calculation.

        Returns
        -------
        Array with cytosine positions, array of PDR values, matrix with
        concordant/discordant read counts.
        """
        matrix, columns = self._jit_compatitable()
        if matrix.shape[1] > min_cyt and matrix.shape[0] > min_depth:
            return _jit_PDR(matrix, columns, min_cyt, min_depth)
        else:
            return np.zeros((0, 1)), np.zeros((0, 1)), np.zeros((0, 2))
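
# A hypothetical usage sketch (assumes ``reader`` is a configured
# BAMReader, see below; parameter values are illustrative):
#
#     for region in reader.stats_iter():
#         if region is None:
#             continue
#         positions, me = region.methylation_entropy(window_length=4, min_depth=4)
#         positions, pm = region.epipolymorphism(window_length=4, min_depth=4)
#         pos, pdr, counts = region.PDR(min_cyt=5, min_depth=4)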

class BAMThread(Thread):
    def __init__(
            self,
            queue: queue.Queue,
            results_queue: queue.Queue,
            options: BAMOptions,
            sequence_ds: pads.Dataset,
            min_qual,
            keep_converted=False,
            mode="report",
            daemon=True,
    ):
        Thread.__init__(self, daemon=daemon)
        self.alignments_queue = queue
        self.results_queue = results_queue
        self.sequence_ds = sequence_ds
        self.options = options
        self._min_qual = min_qual
        self._keep_converted = keep_converted
        self._mode = mode
        self.finished = False

    def finish(self):
        self.finished = True

    def run(self):
        while not self.finished:
            try:
                task, alignments = self.alignments_queue.get()
                chrom, start, end, filters, context = dataclasses.astuple(task)
                if alignments:
                    # Parse alignments
                    parsed = list(filter(
                        lambda parsed: parsed is not None,
                        iter(self.parse_alignment(alignment) for alignment in alignments)
                    ))
                    data_lazy = self.calls_to_df(parsed, start, end)

                    if context != "all":
                        data_lazy = data_lazy.filter(context=context)

                    if filters:
                        data = data_lazy.collect()
                        filtered_regions = [data.filter(self.filters_to_expr(f)) for f in filters]
                    else:
                        filtered_regions = None

                    if self._mode == "report":
                        merged = pl.concat(filtered_regions).lazy() if filters else data_lazy
                        merged = self.group_counts(chrom, start, end, context, merged.lazy())
                        final = self.mutate(merged)
                    else:
                        if filtered_regions is not None:
                            final = [PivotRegion.from_calls(region, chrom) for region in filtered_regions]
                        else:
                            final = PivotRegion.from_calls(data_lazy.collect(), chrom)
                else:
                    final = None

                self.results_queue.put(AlignmentResult(final, chrom, start, end, len(alignments)))
                self.alignments_queue.task_done()
            except Exception as e:
                print("Got exception in Report Thread")
                print(e)

    parsed_schema_pl = dict(
        ref_position=pl.List(pl.UInt32),
        call=pl.List(pl.String),
        qual=pl.List(pl.UInt8),
        position=pl.List(pl.UInt32),
        qname=pl.String,
        strand_conv=pl.String
    )

    @staticmethod
    def filters_to_expr(f):
        # No chromosome filter: all regions in a task are assumed to be on the same chromosome
        expr = (pl.col("position") >= f["start"]) & (pl.col("position") <= f["end"])
        if f["strand"] in ["+", "-"]:
            expr = expr & (pl.col("strand") == (f["strand"] == "+"))
        return expr

    def calls_to_df(self, parsed: list[tuple], start, end) -> pl.LazyFrame:
        # Convert to polars:
        # explode ref_positions, calls and quals,
        # filter by region bounds,
        # map strand_conv values and drop the strand_conv column
        parsed_df = (
            pl.LazyFrame(parsed, schema=self.parsed_schema_pl)
            .explode(["ref_position", "call", "qual", "position"])
            .filter((pl.col("position") >= start) & (pl.col("position") <= end))
            .join(self.options.strand_conv_df.lazy(), on="strand_conv", how="left")
            .drop("strand_conv")
        )

        # If specified, filter by converted
        if not self._keep_converted:
            parsed_df = parsed_df.filter(converted=False)
        # If specified, filter by quality
        if self._min_qual is not None:
            parsed_df = parsed_df.filter(pl.col("qual") > self._min_qual)

        # Convert call values to context and methylation status
        parsed_df = (
            parsed_df
            .join(self.options.calls_df.lazy(), on="call", how="left")
            .drop("call")
            .filter(pl.col("context") != "U")
        )

        # Sort by quality so an optional dedup keeps the best call
        data = (
            parsed_df
            .sort("qual", descending=True)
            # .unique(["qname", "position"], keep="first")
        )
        return data

    def parse_alignment(self, alignment):
        if alignment.alen == 0:
            return None
        calls_string = alignment.tags[2][1]

        strand_conv = ""
        for tag_idx in self.options.strand_conv:
            strand_conv += alignment.tags[tag_idx][1]

        # parsed: tuple (      0      ,     1    ,    2    ,       3      ,     4     ,      5     )
        # parsed: tuple (read_pos_arr, calls_arr, qual_arr, reference_pos, query_name, strand_conv)
        parsed = self.parse_calls(
            calls_string,
            alignment.query_alignment_qualities,
            alignment.get_reference_positions(full_length=True),
            alignment.qlen
        ) + (alignment.query_name, strand_conv)
        return parsed

    @staticmethod
    def parse_calls(calls_string: str, qual_arr: list[int], positions: list[int], qlen: int):
        ref_positions = []
        calls = []
        quals = []
        abs_positions = []
        for read_pos in range(qlen):
            if calls_string[read_pos] == "." or positions[read_pos] is None:
                continue
            ref_positions.append(read_pos)
            calls.append(calls_string[read_pos])
            quals.append(qual_arr[read_pos])
            abs_positions.append(positions[read_pos] + 1)
        return ref_positions, calls, quals, abs_positions

    def mutate(self, batch_lazy: pl.LazyFrame):
        if self._mode == "report":
            if "trinuc" not in batch_lazy.columns:
                batch_lazy = batch_lazy.with_columns(pl.col("context").alias("trinuc"))
            batch = (
                batch_lazy
                .with_columns((pl.col("count_m") / pl.col("count_total")).alias("density"))
                .fill_nan(0)
                .collect()
            )
            return UniversalBatch(batch, raw=None)

    def group_counts(self, chrom, start, end, context, data_lazy: pl.LazyFrame):
        if self.sequence_ds is not None:
            sequence_df = pl.from_arrow(
                self.sequence_ds.filter(
                    (pc.field("chr") == chrom) &
                    (pc.field("position") > start) &
                    (pc.field("position") < end)
                ).to_table()
            )
        else:
            sequence_df = None
        if context != "all" and sequence_df is not None:
            sequence_df = sequence_df.filter(context=context)

        new_batch = (
            data_lazy
            .group_by("position")
            .agg([
                pl.sum("m").alias("count_m"),
                pl.count("m").alias("count_total"),
                pl.first("context"),
                pl.first("strand")
            ])
        )
        if sequence_df is not None:
            new_batch = (
                sequence_df.lazy()
                .cast(dict(position=new_batch.schema["position"]))
                .sort("position")
                .join(
                    new_batch.select(["position", "count_m", "count_total"]),
                    on="position",
                    how="left"
                )
                .fill_null(0)
            )
        else:
            new_batch = (
                new_batch
                .with_columns(pl.lit(chrom, pl.String).alias("chr"))
                .sort("position")
            )
        return new_batch


class QCThread(Thread):
    def __init__(self, quals_queue: queue.Queue, result_queue: queue.Queue, daemon=True):
        Thread.__init__(self, daemon=daemon)
        self.finished = False
        self.quals_queue = quals_queue
        self.result_queue = result_queue

    def run(self):
        while not self.finished:
            try:
                task, alignments = self.quals_queue.get()
                chrom, start, end, filters, context = dataclasses.astuple(task)
                if alignments:
                    quals = [
                        alignment.query_alignment_qualities
                        for alignment in alignments
                        if alignment.alen != 0
                    ]
                    quals_count, pos_count = self._qc_stats(quals)
                    self.result_queue.put(QCResult(
                        QualsCounter(dict(quals_count)),
                        [QualsCounter(dict(counter)) for counter in pos_count],
                        chrom, start, end
                    ))
            except Exception as e:
                print("Got exception in QC Thread")
                print(e)

    def finish(self):
        self.finished = True

    @staticmethod
    def _qc_stats(quals):
        # Gather batch stats
        quals_count = Counter(itertools.chain(*quals))
        pos_count = [Counter() for _ in range(max(map(lambda seq: len(seq), quals)))]
        for qual_seq, counter in zip(itertools.zip_longest(*quals, fillvalue=-1), pos_count):
            counter += Counter(list(qual_seq))
            if -1 in counter.keys():
                del counter[-1]
        return quals_count, pos_count

class BAMReader:
    """
    Class for reading sorted and indexed BAM files.

    Parameters
    ----------
    bam_filename
        Path to the SORTED BAM file.
    index_filename
        Path to the BAM index (.bai) file.
    cytosine_file
        Cytosine file, preprocessed with the :class:`BinomialData` class.
        Reads will be aligned to cytosine coordinates (None to disable).
    bamtype
        Aligner type which was used to generate the BAM file.
    regions
        DataFrame from :class:`Genome` with genomic regions to align to (None to disable).
    threads
        Number of threads used to decompress BAM.
    batch_num
        Number of reads per batch.
    min_qual
        Filter reads by quality (None to disable).
    context
        Filter reads by context. Possible options: "CG", "CHG", "CHH", "all".
        Set "all" for no filtering.
    keep_converted
        Keep converted reads. Default: True.
    qc
        Calculate QC data. Default: True.
    readahead
        Number of batches to read ahead of processing.
    pysam_kwargs
        Keyword arguments for :class:`pysam.AlignmentFile`.
    """
    def __init__(
            self,
            bam_filename: str | Path,
            index_filename: str | Path,
            cytosine_file: str | Path = None,
            bamtype: AvailableBAM = "bismark",
            regions: pl.DataFrame = None,
            threads: int = 1,
            batch_num: int = 1e4,
            min_qual: int = None,
            context: Literal["CG", "CHG", "CHH", "all"] = "all",
            keep_converted: bool = True,
            qc: bool = True,
            readahead=5,
            **pysam_kwargs
    ):
        # Init BAM
        bam_filename = check_path(bam_filename)
        index_filename = check_path(index_filename)
        self.bamfile = pysam.AlignmentFile(
            filename=str(bam_filename),
            index_filename=str(index_filename),
            threads=threads,
            **pysam_kwargs
        )

        # Init reference path
        self.sequence_ds = pads.dataset(cytosine_file) if cytosine_file is not None else None

        # Init inner attributes
        self.regions = regions
        self._batch_num = int(batch_num) * threads
        self._min_qual = min_qual
        self._options = BAMOptions(bamtype)
        self._keep_converted = keep_converted
        self._readahead = readahead
        self._mode = "report"
        self._context = context

        self.qc = qc
        if qc:
            self.quals_count = QualsCounter()
            self.pos_count = []
            self.reg_qual = []
    def report_iter(self):
        """
        Iterate over the BAM file, yielding :class:`UniversalBatch` objects.

        Returns
        -------
        iterator
        """
        self._mode = "report"
        return self._iter()
    def stats_iter(self):
        """
        Iterate over the BAM file, yielding :class:`PivotRegion` objects.

        Returns
        -------
        iterator
        """
        self._mode = "stats"
        return self._iter()
    def _setup_threads(self):
        self.alignments_queue = queue.Queue(maxsize=self._readahead)
        self.results_queue = queue.Queue()
        self._result_iterator = iter([])

        self.thread = BAMThread(
            self.alignments_queue,
            self.results_queue,
            self._options,
            self.sequence_ds,
            self._min_qual,
            self._keep_converted,
            self._mode,
            daemon=True
        )
        self.thread.start()

        if self.qc:
            self.qc_queue = queue.Queue(maxsize=self._readahead)
            self.qc_results_queue = queue.Queue()
            self.qc_thread = QCThread(self.qc_queue, self.qc_results_queue, daemon=True)
            self.qc_thread.start()

    def _setup_tasks(self):
        # Init tasks for threads
        # Task: tuple (chrom, start, end)
        chr_step = sum(self.bamfile.lengths) // (self.bamfile.mapped // self._batch_num) + 1
        tasks = []
        for chrom, length in zip(self.bamfile.references, self.bamfile.lengths):
            for start in range(0, length, chr_step):
                end = start + chr_step if start + chr_step < length else length

                if self.regions is None:
                    filters = []
                else:
                    filters = self.regions.filter(
                        (pl.col("chr") == chrom) &
                        (pl.col("upstream") >= start) &
                        (pl.col("downstream") <= end)
                    ).select([
                        "chr",
                        pl.col("upstream").alias("start"),
                        pl.col("downstream").alias("end"),
                        "strand"
                    ])
                    if filters.is_empty():
                        continue
                    else:
                        filters = filters.to_dicts()

                tasks.append(ReadTask(chrom, start, end, filters, self._context))
        self._tasks_iterator = iter(tasks)

    def _iter(self):
        # Init threads
        self._setup_threads()
        self._setup_tasks()

        # Bar
        self._bar = BAMBar(
            message=f"{self.bamfile.references[0]} ({1}/{len(self.bamfile.references)})",
            max=self.bamfile.lengths[0],
            reads_total=self.bamfile.mapped
        )
        self._bar.start()

        tasks_ended = False
        while True:
            if tasks_ended and self.results_queue.qsize() == 0:
                self._finish()
                break
            try:
                while not self.alignments_queue.full():
                    task: ReadTask = next(self._tasks_iterator)
                    alignments = list(self.bamfile.fetch(contig=task.chrom, start=task.start, end=task.end))
                    self.alignments_queue.put((task, alignments))
                    if self.qc:
                        self.qc_queue.put((task, alignments))
            except StopIteration:
                tasks_ended = True

            alignment_result: AlignmentResult = self.results_queue.get()
            self._upd_bar(alignment_result.chrom, alignment_result.end, alignment_result.alignments)

            if isinstance(alignment_result.data, list):
                for result in alignment_result.data:
                    yield result
            else:
                yield alignment_result.data
    def qc_data(self) -> tuple[QualsCounter, list[QualsCounter], list[tuple]]:
        """
        Returns QC data gathered from read fragments: a tuple of
        (counts of Phred scores, counts of Phred scores by read position,
        total quality-score count per region).

        Returns
        -------
        tuple
        """
        quals_stat = QualsCounter()
        pos_stat = []
        reg_stat = []

        while not self.qc_results_queue.all_tasks_done:
            time.sleep(1e-5)

        for qc_result in list(self.qc_results_queue.queue).copy():
            assert isinstance(qc_result, QCResult)
            quals_stat += qc_result.quals_count
            for index, pos_count in enumerate(qc_result.pos_count):
                if index > len(pos_stat) - 1:
                    pos_stat.append(QualsCounter())
                pos_stat[index] += pos_count
            reg_avg = qc_result.quals_count.total()
            reg_stat.append((qc_result.chrom, qc_result.end, reg_avg))
        return quals_stat, pos_stat, reg_stat
    def plot_qc(self):
        """
        Plot QC stats.

        Returns
        -------
        ``matplotlib.figure.Figure``

        See Also
        --------
        `matplotlib.figure.Figure <https://matplotlib.org/stable/api/figure_api.html#matplotlib.figure.Figure>`_
        """
        quals_stat, pos_stat, reg_stat = self.qc_data()

        fig = plt.figure(constrained_layout=True)
        gs = fig.add_gridspec(2, 2)

        # Chr
        chr_ax = fig.add_subplot(gs[0, :])
        chr_ax.set_title("lg(Reads count) by position")
        chr_ax.set_xlabel("Position")
        chr_ax.set_ylabel("lg(Count)")
        chr_ticks = list(itertools.accumulate(self.bamfile.lengths))
        x_data, y_data = list(zip(*[
            (chr_ticks[self.bamfile.references.index(chrom)] + end, qual)
            for chrom, end, qual in reg_stat
        ]))
        y_data = np.log10(np.array(y_data))
        chr_ax.plot(x_data, y_data, 'b')
        chr_ax.set_xticks(chr_ticks, labels=self.bamfile.references, rotation=-90, fontsize=8)

        # Quals
        qual_ax = fig.add_subplot(gs[1, 0])
        qual_ax.set_title("Hist. of map qualities")
        qual_ax.set_xlabel("Quality")
        qual_ax.set_ylabel("Density")
        qual_ax.hist(
            x=list(quals_stat.keys()),
            weights=list(quals_stat.values()),
            color='b',
            linewidth=1,
            edgecolor="white",
        )

        # Pos
        pos_ax = fig.add_subplot(gs[1, 1])
        pos_ax.set_title("Read quality by position")
        pos_ax.set_xlabel("Read position")
        pos_ax.set_ylabel("Quality")
        y_data = np.array([s.mean_qual() for s in pos_stat])
        pos_ax.plot(y_data, 'b')

        return fig
    def _upd_bar(self, chrom, end, reads):
        self._bar.message = f"{chrom} ({self.bamfile.references.index(chrom) + 1}/{len(self.bamfile.references)})"
        self._bar.max = self.bamfile.lengths[self.bamfile.references.index(chrom)]
        self._bar.index = end
        self._bar.reads += reads
        self._bar.update()

    def _finish(self):
        self.thread.finish()
        if self.qc:
            self.qc_thread.finish()
        self._bar.goto(self._bar.max)
        self._bar.update()
        self._bar.finish()

    def __iter__(self):
        raise NotImplementedError(
            "BAMReader can't be iterated itself, use BAMReader.report_iter() "
            "or BAMReader.stats_iter() instead."
        )
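
# A hypothetical end-to-end sketch (paths and parameter values are
# placeholders, not shipped defaults):
#
#     reader = BAMReader(
#         "sample.bam", "sample.bam.bai",
#         bamtype="bismark", threads=4, min_qual=20, context="CG",
#     )
#     for batch in reader.report_iter():  # yields UniversalBatch objects
#         ...
#     quals_stat, pos_stat, reg_stat = reader.qc_data()
#     fig = reader.plot_qc()              # matplotlib Figure with QC panels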