feat: addition of coverage analysis

This commit is contained in:
2025-10-17 11:45:43 +02:00
parent 331ccf066e
commit c5a390638f

View File

@@ -0,0 +1,156 @@
import marimo
__generated_with = "0.16.4"
app = marimo.App(width="medium")
@app.cell
def _():
import os
import json
import statistics
import marimo as mo
import pandas as pd
import plotly.express as px
return json, mo, os, pd, statistics
@app.cell
def _(mo):
mo.md(
r"""
# EDA of LB Assay Coverage
Data was generated by `get_panel_coverage.py` that uses the LB panels LB covered bed (with flankning sequence added, i.e. the variant calling bed) with the deduplicated bam/cram.
The output json from this analysis for each sample has the following form🥇
```json
{
"chr<#>_<start>_<stop>": [
"depth_pos1",
"depth_pos2",
"depth_pos3",
...
"depth_posN"
]
}
```
Analysis Plan:
- Poorly covered regions (i.e. bad probes/challenging areas to align)
- Overly covered regions (i.e. potential areas that are challenging for aligners or have low complexity)
- Comparison of Serenomica and Quantgene in overlap areas
"""
)
return
@app.cell
def _():
# data
se_path = "../data/coverage_serenomica_QG/se1_se2/"
qg_path = "../data/coverage_serenomica_QG/QG_historical/"
return qg_path, se_path
@app.cell
def _(json, os):
def parse_data(dpath):
files = [f for f in os.listdir(dpath) if f.endswith(".json")]
parsed_data, mean_data = {}
for file in files:
name = file.split("_cov")[0]
with open(os.path.join(dpath, file), "r") as handle:
parsed_data[name] = json.load(handle)
return parsed_data
return (parse_data,)
@app.cell
def _(parse_data, qg_path, se_path):
se_data = parse_data(se_path)
qg_data = parse_data(qg_path)
return qg_data, se_data
@app.cell
def _(statistics):
def summerize_list(lst):
vals = list(map(int, lst))
return (
statistics.mean(vals),
statistics.median(vals),
statistics.stdev(vals),
min(vals),
max(vals),
sum(vals),
len(vals),
)
def get_summary_data(sample_data):
out_data = {}
sample_sum, sample_pos_count = 0, 0
for region, depths in sample_data.items():
(mean, median, std, min_val, max_val, sum_vals, count) = (
summerize_list(depths)
)
out_data[region] = {
"min": min_val,
"max": max_val,
"mean": mean,
"median": median,
"std": std,
}
sample_sum += sum_vals
sample_pos_count += count
out_data["overall"] = {"mean": sample_sum / sample_pos_count}
return out_data
return (get_summary_data,)
@app.cell
def _(get_summary_data, qg_data, se_data):
se_summary_data = {}
for se_sample, se_sample_data in se_data.items():
se_summary_data[se_sample] = get_summary_data(se_sample_data)
qg_summary_data = {}
for qg_sample, qg_sample_data in qg_data.items():
qg_summary_data[qg_sample] = get_summary_data(qg_sample_data)
return (se_summary_data,)
@app.cell
def _():
return
@app.cell
def _(pd):
def extract_stat_as_df(data_dict, access_key):
tmp_dict = {}
for name, sample_data in data_dict.items():
tmp_dict[name] = {
region: data[access_key]
for region, data in sample_data.items()
if region != "overall"
}
return pd.DataFrame.from_dict(tmp_dict, orient="index")
return (extract_stat_as_df,)
@app.cell
def _(extract_stat_as_df, se_summary_data):
extract_stat_as_df(se_summary_data, access_key="std")
return
@app.cell
def _():
return
if __name__ == "__main__":
app.run()