pg-main from prod server added

This commit is contained in:
2025-08-18 12:03:55 +02:00
parent add456f0e7
commit f66cd01b21
956 changed files with 934400 additions and 0 deletions

View File

@@ -0,0 +1,77 @@
#!/usr/bin/env python3
import os
import sys
def get_backgroud_alleles(database, core_vars):
    """Pick the database diplotype whose core variants best match the sample.

    Args:
        database: Path to a tab-separated file. Column 0 is the diplotype
            identifier (e.g. ``1.v1_2.v1``) and column 1 its ``;``-separated
            core variants.
        core_vars: ``;``-separated core variants of the sample. The last four
            characters of each entry are stripped before matching, so each
            entry is expected to end in a genotype suffix such as ``~0/1``.

    Returns:
        ``[allele_res, diplo1]`` where ``diplo1`` is the chosen diplotype
        identifier and ``allele_res`` its ``*A/*B`` rendering. When no record
        passes the pre-filter the result is ``['*1/*1', '1.v1_1.v1']``.
    """
    core_vars_list = core_vars.split(";")
    core_temp1 = core_vars_list[-1][:-4]
    core_temp2 = core_vars_list[0][:-4]

    # Context manager so the database handle is closed deterministically.
    with open(database, "r") as handle:
        dbs = [line.strip().split("\t") for line in handle]

    dbs_temp = []
    for record in dbs:
        temp_rec = record[1]
        # NOTE(review): because of operator precedence this only tests that
        # core_temp1 is non-empty and that core_temp2 occurs in the record;
        # core_temp1 is never searched for.  The likely intent was
        # `core_temp1 in temp_rec and core_temp2 in temp_rec`.  Behaviour is
        # kept as-is here because changing the filter changes the calls and
        # needs re-validation.
        if core_temp1 and core_temp2 in temp_rec:
            dbs_temp.append(record)

    scores = []
    candidates = []
    cand_vars = []
    for elem in dbs_temp:
        record_core_var = elem[1].split(";")
        candidates.append(elem[0])
        cand_vars.append(record_core_var)

        counter = 0
        for variant in record_core_var:
            if variant in core_vars_list:
                # Same variant with the same genotype suffix.
                counter += 3
            elif variant[:-4] in core_vars:
                # Variant present in the sample string but suffix differs
                # (substring test against the whole core_vars string).
                counter += 1
            else:
                counter -= 2
        scores.append(counter)

    if not scores:
        diplo1 = '1.v1_1.v1'
    else:
        max_score = max(scores)
        # Keep the top score and anything one point below it, then prefer the
        # first candidate that has the fewest core variants.
        shortlisted = [idx for idx, value in enumerate(scores)
                       if value == max_score or value == max_score - 1]
        diplo1 = candidates[min(shortlisted, key=lambda idx: len(cand_vars[idx]))]

    # "A.vX_B.vY" -> "*A/*B": text before the first "." and text between the
    # first "_" and the second ".".
    dot_positions = [pos for pos, char in enumerate(diplo1) if char == "."]
    underscore_positions = [pos for pos, char in enumerate(diplo1) if char == "_"]
    hap1 = "*" + diplo1[:dot_positions[0]]
    hap2 = "*" + diplo1[underscore_positions[0] + 1:dot_positions[1]]
    allele_res = hap1 + "/" + hap2

    return [allele_res, diplo1]

View File

@@ -0,0 +1,469 @@
#!/usr/bin/env python3
import os
import sys
def get_core_variants(infile, cn):
    """Return the sample's core variants as one sorted, ``;``-joined string.

    Args:
        infile: Path to a text file with one core variant per line.
        cn: Copy number (anything ``int()`` accepts). When it equals 1 every
            ``~0/1`` in the joined string is rewritten to ``~1/1``.

    Returns:
        The stripped lines, sorted and joined with ``;`` (empty string for an
        empty file).
    """
    # Context manager so the handle is closed instead of leaked.
    with open(infile, "r") as handle:
        variants = sorted(line.strip() for line in handle)

    core_vars = ";".join(variants)
    if int(cn) == 1:
        core_vars = core_vars.replace("~0/1", "~1/1")
    return core_vars
def get_all_vars_gt(infile_full_gt):
    """Return every line of ``infile_full_gt`` as one sorted, ``;``-joined string.

    Args:
        infile_full_gt: Path to a text file with one entry per line.

    Returns:
        The stripped lines, sorted and joined with ``;`` (empty string for an
        empty file).
    """
    # Context manager so the handle is closed instead of leaked.
    with open(infile_full_gt, "r") as handle:
        entries = sorted(line.strip() for line in handle)
    return ";".join(entries)
def format_allele(diplo_n):
    """Render a diplotype identifier such as ``1.v1_36.v1`` as ``*1/*36``.

    The first haplotype is the text before the first ``.``; the second is the
    text between the first ``_`` and the second ``.``.  An ``IndexError`` is
    raised when the identifier lacks those separators.
    """
    underscore_at = [pos for pos, char in enumerate(diplo_n) if char == "_"]
    dot_at = [pos for pos, char in enumerate(diplo_n) if char == "."]
    first_hap = diplo_n[:dot_at[0]]
    second_hap = diplo_n[underscore_at[0] + 1:dot_at[1]]
    return "*" + first_hap + "/" + "*" + second_hap
# Call a diplotype by exact match of the sample's sorted core-variant string
# against the diplotype database, with tie-breaking when two or three database
# records match.
#
# NOTE(review): indentation is missing in this copy of the file; the comments
# below describe the flow that the statement order implies and should be
# confirmed against the properly indented source.
#
# Parameters:
#   database       - tab-separated file; column 0 is used as the diplotype id,
#                    column 1 as its ";"-separated core variants, column 2 as
#                    its ";"-separated supporting variant list.
#   infile         - sample core-variant file (passed to get_core_variants).
#   infile_full    - sample file read line by line into all_variants.
#   infile_full_gt - sample file read line by line into all_var_gt.
#   infile_spec    - whitespace-separated file used only in the tie-break
#                    branches; columns 0-3 are read.
#   cn             - copy number, forwarded to get_core_variants.
#
# Returns a list whose first element is the candidate information and whose
# last element is the star-allele string.  Most branches return three elements
# [candidates, chosen, alleles]; the ambiguous ("X or Y") branches return only
# two [candidates, alleles].  When the number of matching database records is
# neither 1, 2 nor 3, no return statement is reached and the result is None.
def cand_snv_allele_calling(database, infile, infile_full, infile_full_gt, infile_spec, cn):
# NOTE(review): this handle is never closed; it is iterated in the tie-break
# branches below.
f = open(infile_spec, "r")
all_variants = []
for line in open(infile_full, "r"):
# NOTE(review): the result of strip() is discarded, so every entry appended to
# all_variants keeps its trailing newline.
line.strip()
all_variants.append(line)
# No core variants at all: report the reference diplotype.
if os.stat(infile).st_size == 0:
cand_res = ['1.v1_1.v1']
allele_res = "*1/*1"
return ["".join(cand_res), "".join(cand_res), allele_res];
# NOTE(review): unreachable, the line above already returned.
sys.exit()
core_variants = get_core_variants(infile, cn)
all_var_gt = []
for line in open(infile_full_gt, "r"):
line = line.strip()
all_var_gt.append(line)
dbs = []
for line in open(database, "r"):
line = line.strip().split("\t")
dbs.append(line)
# soln_list1 collects diplotype ids (column 0) and soln_list2 the matching
# supporting-variant strings (column 2) of every record whose sorted core
# variants equal the sample's core-variant string.
soln_list1 = []
soln_list2 = []
for record in dbs:
record_core_var = record[1].split(";")
record_core_var = ";".join(sorted(record_core_var))
if record_core_var == core_variants:
diplo = record[0]
full_dip = record[2]
soln_list1.append(record[0])
soln_list2.append(record[2])
else:
pass
diff_alleles_check = False
# Helper: returns "Equal" when every element of lst equals lst[0] (also for an
# empty list, where all() is vacuously true), otherwise "Not equal".
# NOTE(review): `len(lst) < 0` can never be true, and the assignments inside
# the helper bind a local name, not the outer diff_alleles_check.
def chkList(lst):
if len(lst) < 0 :
diff_alleles_check = True
diff_alleles_check = all(ele == lst[0] for ele in lst)
if(diff_alleles_check):
return("Equal")
else:
return("Not equal")
# --- Exactly one matching record: report it directly. ---
if len(soln_list1) == 1:
diplo = "".join(soln_list1)
# Same "A.vX_B.vY" -> "*A/*B" parsing as format_allele().
res1 = [i for i in range(len(diplo)) if diplo.startswith("_", i)]
res2 = [i for i in range(len(diplo)) if diplo.startswith(".", i)]
hap1 = "*" + str (diplo[:res2[0]])
hap2 = "*" + str (diplo[res1[0]+1:res2[1]])
allele_res = hap1 + "/" + hap2
return [soln_list1, diplo, allele_res];
# --- Two matching records. ---
elif len(soln_list1) == 2:
# Writes the candidate list to stdout.
print(soln_list1)
diplo1 = soln_list1[0]
diplo2 = soln_list1[1]
diplo1_supp_var = soln_list2[0].split(";")
diplo2_supp_var = soln_list2[1].split(";")
# Sample lines from infile_full that are not listed for each candidate.
uniq_diplo1 = []
uniq_diplo2 = []
for i in all_variants:
if i not in diplo1_supp_var:
uniq_diplo1.append(i)
if i not in diplo2_supp_var:
uniq_diplo2.append(i)
# The candidate leaving fewer sample lines unexplained wins outright.
if len(uniq_diplo1) < len(uniq_diplo2):
res1 = [i for i in range(len(diplo1)) if diplo1.startswith("_", i)]
res2 = [i for i in range(len(diplo1)) if diplo1.startswith(".", i)]
hap1 = "*" + str (diplo1[:res2[0]])
hap2 = "*" + str (diplo1[res1[0]+1:res2[1]])
allele_res = hap1 + "/" + hap2
return [soln_list1, diplo1, allele_res];
elif len(uniq_diplo1) > len(uniq_diplo2):
res1 = [i for i in range(len(diplo2)) if diplo2.startswith("_", i)]
res2 = [i for i in range(len(diplo2)) if diplo2.startswith(".", i)]
hap1 = "*" + str (diplo2[:res2[0]])
hap2 = "*" + str (diplo2[res1[0]+1:res2[1]])
allele_res = hap1 + "/" + hap2
return [soln_list1, diplo2, allele_res];
else:
# Tie: consult infile_spec.  For rows whose column 2 equals the sample core
# variants, column 1 goes to tiebreak1 (diplotype id), column 3 to tiebreak2
# (";"-separated entries compared with all_var_gt) and column 0 to tiebreak3.
tiebreak1 = []
tiebreak2 = []
tiebreak3 = []
score = []
score2 = []
test1 = []
for line in f:
line = line.strip().split()
if line[2] == core_variants:
tiebreak1.append(line[1])
tiebreak2.append(line[3])
tiebreak3.append(line[0])
# score  : number of sample genotype lines missing from the row's entries.
# score2 : number of the row's entries missing from the sample genotype lines.
for full_dip in tiebreak2:
diplo_supp_gt = full_dip.split(";")
uniq_gt = []
uniq_gt1 = []
for i in all_var_gt:
if i not in diplo_supp_gt:
uniq_gt.append(i)
score_dip = len(uniq_gt)
score.append(score_dip)
for j in diplo_supp_gt:
if j not in all_var_gt:
uniq_gt1.append(j)
score_dip2 = len(uniq_gt1)
score2.append(score_dip2)
# min() raises ValueError when no infile_spec row matched.
min_score = min(score)
min_score2 = min(score2)
# Indices of the rows with the smallest score2.
res_list = [i for i in range(len(score2)) if score2[i] == min_score2]
# All rows score the same: report an ambiguous "X or Y" call.
if chkList(score) == "Equal":
amb_soln_set = []
amb_set1 = []
# NOTE(review): when len(res_list) == 3 neither branch assigns soln_list_1,
# so the loop below would fail with an unbound-name error.
if len(res_list) > 3:
soln_list_1 = soln_list1
elif len(res_list) < 3:
amb_set1.append(tiebreak1[res_list[0]])
amb_set1.append(tiebreak1[res_list[-1]])
soln_list_1 = amb_set1
for elem in soln_list_1:
res1 = [i for i in range(len(elem)) if elem.startswith("_", i)]
res2 = [i for i in range(len(elem)) if elem.startswith(".", i)]
hap1 = "*" + str (elem[:res2[0]])
hap2 = "*" + str (elem[res1[0]+1:res2[1]])
result_dip = hap1 + "/" + hap2
amb_soln_set.append(result_dip)
if amb_soln_set[0] != amb_soln_set[1]:
allele_res = " or ".join(amb_soln_set)
else:
allele_res = amb_soln_set[0]
# Two-element return (no chosen-diplotype element).
return [soln_list1, allele_res];
# Hard-coded special case for the 18.v1_7.v1 candidate.
# NOTE(review): score[-3] needs at least three scored rows.
elif score.count(min_score) > 1 and soln_list1[1] == "18.v1_7.v1" and score[-3] == min_score:
return[soln_list1, ['18.v1_7.v1'], '*18/*7'];
# Several rows share the minimum score.
elif score.count(min_score) > 1:
index_scores = []
amb_soln_set = []
index_scores = [i for i in range(len(score)) if score[i] == min_score]
alt_solns = []
for j in index_scores:
elem = tiebreak1[j]
res1 = [i for i in range(len(elem)) if elem.startswith("_", i)]
res2 = [i for i in range(len(elem)) if elem.startswith(".", i)]
hap1 = "*" + str (elem[:res2[0]])
hap2 = "*" + str (elem[res1[0]+1:res2[1]])
result_dip = hap1 + "/" + hap2
alt_solns.append(result_dip)
# All minimum-score rows render to the same star alleles: return the database
# candidate with that rendering.
# NOTE(review): diplo1 keeps its earlier value (soln_list1[0]) if no candidate
# matches the rendering.
if chkList(alt_solns) == "Equal":
for i in soln_list1:
if format_allele(i) == alt_solns[0]:
diplo1 = i
return [soln_list1, diplo1, alt_solns[0]];
# Hard-coded special case for *1/*36.
elif chkList(alt_solns) != "Equal" and alt_solns[0] == '*1/*36':
return[soln_list1, ['1.v1_36.v1'], '*1/*36'];
else:
# Report the lexicographically first and last renderings as alternatives.
alt_solns = sorted(alt_solns)
amb_soln_set.append(alt_solns[0])
amb_soln_set.append(alt_solns[-1])
if amb_soln_set[0] != amb_soln_set[1]:
allele_res = " or ".join(amb_soln_set)
else:
allele_res = amb_soln_set[0]
# Two-element return (no chosen-diplotype element).
return [soln_list1, allele_res];
else:
# A single row has the minimum score: it is the call.
minpos = score.index(min_score)
best_diplo = tiebreak1[minpos]
best_cand_haps = tiebreak3[minpos]
res1 = [i for i in range(len(best_diplo)) if best_diplo.startswith("_", i)]
res2 = [i for i in range(len(best_diplo)) if best_diplo.startswith(".", i)]
hap1 = "*" + str (best_diplo[:res2[0]])
hap2 = "*" + str (best_diplo[res1[0]+1:res2[1]])
allele_res = hap1 + "/" + hap2
return [soln_list1, best_cand_haps, allele_res];
# --- Three matching records: same scheme as above with three candidates. ---
elif len(soln_list1) == 3:
diplo1 = soln_list1[0]
diplo2 = soln_list1[1]
diplo3 = soln_list1[2]
diplo1_supp_var = soln_list2[0].split(";")
diplo2_supp_var = soln_list2[1].split(";")
diplo3_supp_var = soln_list2[2].split(";")
uniq_diplo1 = []
uniq_diplo2 = []
uniq_diplo3 = []
for i in all_variants:
if i not in diplo1_supp_var:
uniq_diplo1.append(i)
if i not in diplo2_supp_var:
uniq_diplo2.append(i)
if i not in diplo3_supp_var:
uniq_diplo3.append(i)
# Candidate 1 leaves strictly the fewest sample lines unexplained.
if len(uniq_diplo1) < len(uniq_diplo2) and len(uniq_diplo1) < len(uniq_diplo3):
res1 = [i for i in range(len(diplo1)) if diplo1.startswith("_", i)]
res2 = [i for i in range(len(diplo1)) if diplo1.startswith(".", i)]
hap1 = "*" + str (diplo1[:res2[0]])
hap2 = "*" + str (diplo1[res1[0]+1:res2[1]])
allele_res = hap1 + "/" + hap2
return [soln_list1, diplo1, allele_res];
# Candidate 2 is strictly better than both others.
elif len(uniq_diplo1) > len(uniq_diplo2) and len(uniq_diplo2) < len(uniq_diplo3):
res1 = [i for i in range(len(diplo2)) if diplo2.startswith("_", i)]
res2 = [i for i in range(len(diplo2)) if diplo2.startswith(".", i)]
hap1 = "*" + str (diplo2[:res2[0]])
hap2 = "*" + str (diplo2[res1[0]+1:res2[1]])
allele_res = hap1 + "/" + hap2
return [soln_list1, diplo2, allele_res]
# Strictly decreasing counts 1 > 2 > 3: candidate 3 wins.
elif len(uniq_diplo1) > len(uniq_diplo2) and len(uniq_diplo2) > len(uniq_diplo3):
res1 = [i for i in range(len(diplo3)) if diplo3.startswith("_", i)]
res2 = [i for i in range(len(diplo3)) if diplo3.startswith(".", i)]
hap1 = "*" + str (diplo3[:res2[0]])
hap2 = "*" + str (diplo3[res1[0]+1:res2[1]])
allele_res = hap1 + "/" + hap2
return [soln_list1, diplo3, allele_res]
# Ties (all equal, or two adjacent counts equal): fall back to infile_spec.
# NOTE(review): orderings not covered by any branch of this chain (for example
# counts 1 < 2 > 3 with 1 >= 3) reach no return statement, i.e. yield None.
elif len(uniq_diplo1) == len(uniq_diplo2) == len(uniq_diplo3) or (len(uniq_diplo1) != len(uniq_diplo2) == len(uniq_diplo3)) or (len(uniq_diplo1) == len(uniq_diplo2) != len(uniq_diplo3)):
tiebreak1 = []
tiebreak2 = []
tiebreak3 = []
score = []
score2 = []
test1 = []
# Same infile_spec column usage as in the two-candidate tie-break.
for line in f:
line = line.strip().split()
if line[2] == core_variants:
tiebreak1.append(line[1])
tiebreak2.append(line[3])
tiebreak3.append(line[0])
for full_dip in tiebreak2:
diplo_supp_gt = full_dip.split(";")
uniq_gt = []
uniq_gt1 = []
for i in all_var_gt:
if i not in diplo_supp_gt:
uniq_gt.append(i)
score_dip = len(uniq_gt)
score.append(score_dip)
for j in diplo_supp_gt:
if j not in all_var_gt:
uniq_gt1.append(j)
score_dip2 = len(uniq_gt1)
score2.append(score_dip2)
min_score = min(score)
min_score2 = min(score2)
res_list = [i for i in range(len(score2)) if score2[i] == min_score2]
# All rows score the same: ambiguous "X or Y" call.
if chkList(score) == "Equal":
amb_soln_set = []
amb_set1 = []
# NOTE(review): len(res_list) == 3 leaves soln_list_1 unassigned (see the
# identical construct in the two-candidate branch).
if len(res_list) > 3:
soln_list_1 = soln_list1
elif len(res_list) < 3:
amb_set1.append(tiebreak1[res_list[0]])
amb_set1.append(tiebreak1[res_list[-1]])
soln_list_1 = amb_set1
for elem in soln_list_1:
res1 = [i for i in range(len(elem)) if elem.startswith("_", i)]
res2 = [i for i in range(len(elem)) if elem.startswith(".", i)]
hap1 = "*" + str (elem[:res2[0]])
hap2 = "*" + str (elem[res1[0]+1:res2[1]])
result_dip = hap1 + "/" + hap2
amb_soln_set.append(result_dip)
if amb_soln_set[0] != amb_soln_set[1]:
allele_res = " or ".join(amb_soln_set)
else:
allele_res = amb_soln_set[0]
# Two-element return (no chosen-diplotype element).
return [soln_list1, allele_res];
# Hard-coded special cases for the 10.v1_35.v1 candidate.
elif score.count(min_score) > 1 and soln_list1[1] == "10.v1_35.v1" and score[-3] == min_score:
return[soln_list1, ['10.v1_35.v1'], '*10/*35'];
elif score.count(min_score) > 1 and soln_list1[1] == "10.v1_35.v1" and score.count(min_score) >= 4 and sum(score[-2:]) == min_score:
# return[soln_list1, ['10.v1_35.v1'], '*10/*35'];
# Render the second and third candidates as alternatives.
amb_soln_set = []
for elem in soln_list1[1:]:
res1 = [i for i in range(len(elem)) if elem.startswith("_", i)]
res2 = [i for i in range(len(elem)) if elem.startswith(".", i)]
hap1 = "*" + str (elem[:res2[0]])
hap2 = "*" + str (elem[res1[0]+1:res2[1]])
result_dip = hap1 + "/" + hap2
amb_soln_set.append(result_dip)
if amb_soln_set[0] != amb_soln_set[1]:
allele_res = " or ".join(amb_soln_set)
else:
allele_res = amb_soln_set[0]
return [soln_list1, allele_res];
# elif score.count(min_score) > 1 and soln_list1[2] == "36.v1_8.v1" and score.count(min_score) >= 4 and sum(score[-2:]) != min_score:
# return[soln_list1, ['36.v1_8.v1'], '*36/*8'];
# Hard-coded special case for the 1.v1_37.v1 candidate.
elif score.count(min_score) > 1 and soln_list1[0] == "1.v1_37.v1":
return[soln_list1, ['1.v1_37.v1'], '*1/*37'];
# Several rows share the minimum score.
elif score.count(min_score) > 1:
index_scores = []
amb_soln_set = []
index_scores = [i for i in range(len(score)) if score[i] == min_score]
# alt_solns : renderings of the rows with the minimum score.
# alt_solns1: renderings of the rows with the minimum score2.
alt_solns = []
alt_solns1 = []
for j in index_scores:
elem = tiebreak1[j]
res1 = [i for i in range(len(elem)) if elem.startswith("_", i)]
res2 = [i for i in range(len(elem)) if elem.startswith(".", i)]
hap1 = "*" + str (elem[:res2[0]])
hap2 = "*" + str (elem[res1[0]+1:res2[1]])
result_dip = hap1 + "/" + hap2
alt_solns.append(result_dip)
for n in res_list:
elem = tiebreak1[n]
res1 = [i for i in range(len(elem)) if elem.startswith("_", i)]
res2 = [i for i in range(len(elem)) if elem.startswith(".", i)]
hap1 = "*" + str (elem[:res2[0]])
hap2 = "*" + str (elem[res1[0]+1:res2[1]])
result_dip = hap1 + "/" + hap2
alt_solns1.append(result_dip)
if chkList(alt_solns) == "Equal":
for i in soln_list1:
if format_allele(i) == alt_solns[0]:
diplo1 = i
return[soln_list1, diplo1, alt_solns[0]];
else:
# Keep only the renderings that are minimal under both scores and join them
# with " or " (the result is an empty string when there is no overlap).
alt_solns = sorted(alt_solns)
for i in alt_solns:
if i in alt_solns1:
amb_soln_set.append(i)
allele_res = " or ".join(amb_soln_set)
return [soln_list1, allele_res];
else:
# A single row has the minimum score: it is the call.
minpos = score.index(min_score)
best_diplo = tiebreak1[minpos]
best_cand_haps = tiebreak3[minpos]
res1 = [i for i in range(len(best_diplo)) if best_diplo.startswith("_", i)]
res2 = [i for i in range(len(best_diplo)) if best_diplo.startswith(".", i)]
hap1 = "*" + str (best_diplo[:res2[0]])
hap2 = "*" + str (best_diplo[res1[0]+1:res2[1]])
allele_res = hap1 + "/" + hap2
return [soln_list1, best_cand_haps, allele_res];

View File

@@ -0,0 +1,359 @@
#!/usr/bin/env python3
import os
import sys
import subprocess
from snv_def_modules import *
from sv_modules import *
from bkg_modules import *
# Driver script: reads ten positional command-line arguments, computes the
# CYP2A6 copy number from the coverage file, obtains an SNV-based diplotype
# call and then adjusts it for deletions, duplications and hybrids depending
# on the copy number.  Results are written to stdout.
#
# NOTE(review): indentation is missing in this copy of the file; the nesting
# described in the comments below is what the statement order implies and
# should be confirmed against the properly indented source.
print("--------------------------------------------\n")
print("CYP2A6 Star Allele Calling with StellarPGx\n")
print("--------------------------------------------\n")
# Positional arguments (an IndexError is raised if fewer than ten are given).
database = sys.argv[1]
infile = sys.argv[2]
infile_full = sys.argv[3]
infile_full_gt = sys.argv[4]
infile_spec = sys.argv[5]
sv_del = sys.argv[6]
sv_dup = sys.argv[7]
cov_file = sys.argv[8]
hap_dbs = sys.argv[9]
# act_score is read here but not referenced again in this script.
act_score = sys.argv[10]
# get_total_CN returns the copy number as a string in element 0, which is why
# cn is compared with '0', '1', '2' below and converted with int() elsewhere.
cn = get_total_CN(cov_file)[0]
print("Initially computed CN = {}".format(cn))
supp_core_vars = get_core_variants(infile, cn)
print("\nSample core variants:")
print(supp_core_vars)
snv_def_calls = cand_snv_allele_calling(database, infile, infile_full, infile_full_gt, infile_spec, cn)
# No exact database match: report likely background alleles and stop.
# NOTE(review): `is None` is the idiomatic test.
if snv_def_calls == None:
bac_alleles = get_backgroud_alleles(database, supp_core_vars)
if int(cn) == 0:
print("\nResult:")
print("*4/*4")
# NOTE(review): get_backgroud_alleles as shown in bkg_modules always returns a
# list, so this branch looks unreachable.
elif bac_alleles == None:
print("\nResult:")
print("Possible novel allele or suballele present: interpret with caution")
elif bac_alleles != None and int(cn) < 2:
# Single copy: pair the first background haplotype with the *4 deletion.
bac_alleles = bac_alleles[0].split("/")
bac_alleles1 = bac_alleles[0] + "/" + "*4"
print("\nResult:")
print("Possible novel allele or suballele present: interpret with caution; experimental validation and expert review through PharmVar is recommended")
print("\nLikely background alleles:")
print("[" + bac_alleles1 + "]")
else:
print("\nCandidate alleles:")
print("[" + bac_alleles[-1] + "]")
print("\nResult:")
print("Possible novel allele or suballele present: interpret with caution; experimental validation and expert review through PharmVar is recommended")
print("\nLikely background alleles:")
print("[" + bac_alleles[0] + "]")
# Presumably still inside the `snv_def_calls == None` block: nothing below
# can run without an SNV call.
sys.exit()
best_diplos = snv_def_calls[0]
print("\nCandidate alleles:")
print(best_diplos)
# The star-allele string is always the last element of the call list.
snv_def_alleles = snv_def_calls[-1]
# Ambiguous calls ("X or Y") come back as two-element lists, so element 1 is
# only read as the chosen diplotype when the call is unambiguous.
# NOTE(review): this is a substring test for "or" on the allele string.
if "or" in snv_def_alleles:
pass
else:
snv_cand_alleles = snv_def_calls[1]
dip_variants = get_all_vars_gt(infile_full_gt)
print("\nResult:")
# NOTE(review): get_total_CN re-reads and re-parses cov_file on every call;
# a single call whose result is indexed would be equivalent.
av_cov = get_total_CN(cov_file)[1]
cov_e1_e2 = get_total_CN(cov_file)[3]
cov_e3_e9 = get_total_CN(cov_file)[4]
cov_3p_utr = get_total_CN(cov_file)[5]
cov_ctrl = get_total_CN(cov_file)[2]
cov_e1_e4 = get_total_CN(cov_file)[6]
cov_e5_e9 = get_total_CN(cov_file)[7]
cov_e3_e4 = get_total_CN(cov_file)[8]
cov_e9_3pr = get_total_CN(cov_file)[9]
cov_e7_e8 = get_total_CN(cov_file)[10]
gene_alleles = ""
# Star alleles checked against the hom_1B / het_1B results of star_1b_test
# below.
conv_3p_utr = ['*5','*7','*8','*10','*19','*24','*28','*35','*36','*37']
# in_list is only assigned under this condition; it is consumed by the
# duplication branches (cn 3/4 and cn > 4) further down.
if snv_def_alleles != '*1/*1' and cn != '0':
in_list = dup_test_init(sv_dup, av_cov)
# --- Copy number 2 ---
if cn == '2':
if 'or' in snv_def_alleles:
print (snv_def_alleles)
else:
# From here on snv_def_alleles is a two-element list, not a string.
snv_def_alleles = snv_def_alleles.split("/")
# One allele called *46: re-evaluate it with the 3'UTR coverage test.
# NOTE(review): combinations not covered by the three branches below print
# nothing.
if snv_def_alleles[0] == '*46' or snv_def_alleles[1] == '*46':
ind_star2 = snv_def_alleles.index('*46')
ind_other = 1 - ind_star2
test_1b = star_1b_test(cov_3p_utr, cov_ctrl)
if test_1b == 'no_1B' and (snv_def_alleles.count('*46') == 2):
gene_alleles = "*1" + "/" + "*1"
print(gene_alleles)
elif test_1b == 'het_1B' and (snv_def_alleles[ind_other] not in conv_3p_utr):
gene_alleles = snv_def_alleles[ind_other] + "/" + "*46"
print(gene_alleles)
elif test_1b =='hom_1B' and (snv_def_alleles[ind_other] in conv_3p_utr):
gene_alleles = snv_def_alleles[ind_other] + "/" + "*46"
print(gene_alleles)
# One allele called *1: it may really be *46, *12 or *34 according to the
# coverage-ratio tests.
elif snv_def_alleles[0] == '*1' or snv_def_alleles[1] == '*1':
ind_star2 = snv_def_alleles.index('*1')
ind_other = 1 - ind_star2
# test_12 = hybrid_12_test1(cov_e1_e2, cov_e3_e9)
test_12_34 = hybrid_12_34(cov_e1_e2, cov_e3_e9, cov_e1_e4, cov_e5_e9, cov_e3_e4)
test_1b = star_1b_test(cov_3p_utr, cov_ctrl)
if test_12_34 == 'norm_var':
if test_1b == 'no_1B':
gene_alleles = "/".join(snv_def_alleles)
print(gene_alleles)
elif test_1b == 'het_1B' and (snv_def_alleles[ind_other] not in conv_3p_utr):
gene_alleles = snv_def_alleles[ind_other] + "/" + "*46"
print(gene_alleles)
elif test_1b == 'hom_1B' and (snv_def_alleles.count('*1') == 2):
gene_alleles = "*46/*46"
print(gene_alleles)
elif test_1b =='hom_1B' and (snv_def_alleles[ind_other] in conv_3p_utr):
gene_alleles = snv_def_alleles[ind_other] + "/" + "*46"
print(gene_alleles)
elif test_1b =='hom_1B':
gene_alleles = snv_def_alleles[ind_other] + "/" + "*46"
print(gene_alleles)
else:
gene_alleles = "/".join(snv_def_alleles)
print(gene_alleles)
elif test_12_34 == 'hyb_12':
gene_alleles = snv_def_alleles[ind_other] + "/" + "*12"
print(gene_alleles)
# NOTE(review): snv_def_alleles is a list at this point, so comparing it with
# the string "*1/*1" is always False and this branch cannot be taken.
elif test_12_34 == 'hyb_12_2' and snv_def_alleles == "*1/*1":
gene_alleles = "*12/*12"
print(gene_alleles)
elif test_12_34 == 'hyb_34':
gene_alleles = snv_def_alleles[ind_other] + "/" + "*34"
print(gene_alleles)
# NOTE(review): same list-vs-string comparison as above; always False.
elif test_12_34 == 'hyb_34_2' and snv_def_alleles == "*1/*1":
gene_alleles = "*34/*34"
print(gene_alleles)
else:
gene_alleles = "/".join(snv_def_alleles)
print(gene_alleles)
# --- Copy number 0: homozygous deletion, optionally with the *47 hybrid ---
elif cn == '0':
del_confirm = del_test(sv_del)
test_47_1 = hybrid_47_test1(cov_e9_3pr, cov_e7_e8)
test_47_2 = hybrid_47_test2(cov_e9_3pr, cov_e7_e8, cov_ctrl)
if del_confirm == '*4/*4' and test_47_2 == 'no_hyb_47':
gene_alleles = '*4/*4'
elif del_confirm == '*4/*4' and test_47_2 == 'het_47':
gene_alleles = '*4/*47'
elif del_confirm == '*4/*4' and test_47_2 == 'hom_47':
gene_alleles = '*47/*47'
elif del_confirm == '*4' and test_47_1 == 'no_hyb_47':
gene_alleles = '*4' + "/" + "*other"
elif del_confirm == '*4' and test_47_1 == 'hyb_47':
gene_alleles = '*47' + "/" + "*other"
else:
gene_alleles = "*4/*4"
print(gene_alleles)
# --- Copy number 1: one copy deleted ---
elif cn == '1':
# del_test returns the string 'None' for an empty sv_del file.
del_confirm = del_test(sv_del)
test_47_1 = hybrid_47_test1(cov_e9_3pr, cov_e7_e8)
if "or" in snv_def_alleles and del_confirm == 'None':
print (snv_def_alleles + "\t" + "Possible CYP2A6 gene deletion (*4) present")
elif "or" not in snv_def_alleles and del_confirm == 'None':
snv_def_alleles = snv_def_alleles.split("/")
snv_cand_alleles = "".join(snv_cand_alleles)
snv_cand_alleles = snv_cand_alleles.split("_")
if snv_def_alleles[0] == snv_def_alleles[1] and test_47_1 == 'no_hyb_47':
gene_alleles = snv_def_alleles[0] + "/" + "*4"
print(gene_alleles)
elif snv_def_alleles[0] == snv_def_alleles[1] and test_47_1 == 'hyb_47':
gene_alleles = snv_def_alleles[0] + "/" + "*47"
print(gene_alleles)
elif snv_def_alleles[0] != snv_def_alleles[1]:
# Decide which of the two called alleles is the one actually present.
samp_allele1 = del_adv_test(hap_dbs, snv_cand_alleles[0], snv_cand_alleles[1], snv_def_alleles[0], snv_def_alleles[1], supp_core_vars)
gene_alleles = samp_allele1 + "/" + "*4"
if test_47_1 == 'no_hyb_47':
pass
elif test_47_1 == 'hyb_47':
gene_alleles = samp_allele1 + "/" + "*47"
print(gene_alleles)
else:
# sv_del was not empty.
# NOTE(review): an ambiguous "X or Y" call also reaches this branch, where
# snv_cand_alleles was never assigned.
snv_def_alleles = snv_def_alleles.split("/")
snv_cand_alleles = "".join(snv_cand_alleles)
snv_cand_alleles = snv_cand_alleles.split("_")
test_1b = star_1b_test(cov_3p_utr, cov_ctrl)
test_47_1 = hybrid_47_test1(cov_e9_3pr, cov_e7_e8)
if snv_def_alleles[0] == snv_def_alleles[1]:
del_confirm = "*4"
if snv_def_alleles[0] == '*1' and test_1b == 'hom_1B':
snv_def_alleles[0] = '*46'
gene_alleles = del_confirm + "/" + snv_def_alleles[0]
print(gene_alleles)
elif snv_def_alleles[0] in conv_3p_utr and test_47_1 == 'no_hyb_47':
gene_alleles = del_confirm + "/" + snv_def_alleles[0]
print(gene_alleles)
elif snv_def_alleles[0] in conv_3p_utr and test_47_1 == 'hyb_47':
gene_alleles = snv_def_alleles[0] + "/" + '*47'
print(gene_alleles)
else:
gene_alleles = del_confirm + "/" + snv_def_alleles[0]
print(gene_alleles)
# gene_alleles = del_confirm + "/" + snv_def_alleles[0]
# print(gene_alleles)
elif snv_def_alleles[0] != snv_def_alleles[1]:
samp_allele1 = del_adv_test(hap_dbs, snv_cand_alleles[0], snv_cand_alleles[1], snv_def_alleles[0], snv_def_alleles[1], supp_core_vars)
# NOTE(review): this is a comparison whose result is discarded, not an
# assignment; del_confirm keeps whatever del_test returned ("*4", "*4/*4" or
# None).  `del_confirm = "*4"` was probably intended, as in the branch above.
del_confirm == "*4"
gene_alleles = del_confirm + "/" + samp_allele1
print(gene_alleles)
# --- Copy number 3 or 4: duplication ---
# NOTE(review): snv_def_alleles cannot be None here (it was read from a
# non-None call list), so the `!= None` guards on these branches are always
# true.
elif (int(cn) == 3 or int(cn) == 4) and snv_def_alleles != None:
orig = snv_def_alleles
if "or" in snv_def_alleles:
print (snv_def_alleles + "\t" + "Duplication present")
else:
snv_def_alleles = snv_def_alleles.split("/")
snv_cand_alleles = "".join(snv_cand_alleles)
snv_cand_alleles = snv_cand_alleles.split("_")
if snv_def_alleles[0] != snv_def_alleles[1]:
# Heterozygous call: work out which allele carries the extra copies.
phased_dup = dup_test_cn_3_4(sv_dup, hap_dbs, snv_cand_alleles[0], snv_cand_alleles[1], snv_def_alleles[0], snv_def_alleles[1], cn, av_cov, in_list)
elif snv_def_alleles[0] == snv_def_alleles[1]:
# Homozygous call: all extra copies are of the same allele.
rt_2 = int(cn) - 1
phased_dup = (snv_def_alleles[0] + "/" + snv_def_alleles[1] + "x" + str(rt_2))
gene_alleles = phased_dup
print(gene_alleles)
# --- Copy number above 4: duplication ---
elif int(cn) > 4 and snv_def_alleles != None:
if "or" in snv_def_alleles:
print (snv_def_alleles + "\t" + "Duplication present")
else:
snv_def_alleles = snv_def_alleles.split("/")
snv_cand_alleles = "".join(snv_cand_alleles)
snv_cand_alleles = snv_cand_alleles.split("_")
if snv_def_alleles[0] != snv_def_alleles[1]:
phased_dup = dup_test_cn_n(sv_dup, hap_dbs, snv_cand_alleles[0], snv_cand_alleles[1], snv_def_alleles[0], snv_def_alleles[1], cn, av_cov, in_list)
elif snv_def_alleles[0] == snv_def_alleles[1]:
rt_2 = int(cn) - 1
phased_dup = (snv_def_alleles[0] + "/" + snv_def_alleles[1] + "x" + str(rt_2))
gene_alleles = phased_dup
print(phased_dup)
# NOTE(review): looks unreachable for the same reason as the `!= None` guards
# above.
elif int(cn) > 2 and snv_def_alleles == None:
print("Possible rare CYP2A6/2A7 hybrid present")

View File

@@ -0,0 +1,383 @@
#!/usr/bin/env python3
import os
import sys
import math
def get_total_CN(cov_file):
    """Estimate the gene copy number and per-region mean depths.

    Args:
        cov_file: Path to a whitespace-separated file with at least eleven
            rows. In each row, columns 1 and 2 are used as the interval start
            and end and column 3 as the summed depth. Row 0 is the whole gene,
            rows 1 and 2 are the two control regions, rows 3-10 are the
            sub-regions returned as strings.

    Returns:
        An eleven-element list:
        ``[copy number (str), rounded gene depth (int), rounded control depth
        (int), mean depth of rows 3..10 as str]``.  The copy number is
        ``round(2 * gene depth / mean control depth)``.

    Raises:
        IndexError: if the file has fewer than eleven rows or a used row has
            fewer than four columns.
        ZeroDivisionError: if an interval has zero length or the control
            depth is zero.
    """
    # Context manager so the handle is closed instead of leaked.
    with open(cov_file, "r") as handle:
        all_reg = [line.strip().split() for line in handle]

    def mean_depth(row):
        # Summed depth divided by interval length.
        return float(row[3]) / (float(row[2]) - float(row[1]))

    # Index explicitly so that a short file still raises IndexError.
    depths = [mean_depth(all_reg[idx]) for idx in range(11)]
    av_2a6_cov, av_vdr_cov, av_egfr_cov = depths[:3]

    av_ctrl_cov = (av_vdr_cov + av_egfr_cov) / 2
    total_cn = round(2 * (av_2a6_cov / av_ctrl_cov))

    summary = [str(int(total_cn)), round(av_2a6_cov), round(av_ctrl_cov)]
    summary.extend(str(depth) for depth in depths[3:])
    return summary
def del_test(sv_del):
    """Classify a deletion call file.

    Args:
        sv_del: Path to the deletion call file.  Only lines containing the
            text ``COVERAGE`` are inspected; their last whitespace-separated
            field is read as a float.

    Returns:
        * the string ``"None"`` when the file is empty (zero bytes),
        * ``"*4/*4"`` for the first ``COVERAGE`` line whose last field is 1.0,
        * ``"*4"`` for the first ``COVERAGE`` line whose last field is -1.0,
        * the object ``None`` when no line matches either value.
    """
    if os.stat(sv_del).st_size == 0:
        # Callers compare against the literal string "None".
        return "None"

    # Context manager so the handle is closed even on an early return.
    with open(sv_del, "r") as handle:
        for line in handle:
            if "COVERAGE" not in line:
                continue
            ab_hom = float(line.strip().split()[-1])
            if ab_hom == 1.0:
                return "*4/*4"
            if ab_hom == -1.0:
                return "*4"
    return None
# Module-level views of the most recent del_adv_test() call.  They are kept
# for backward compatibility and are overwritten (not extended) on each call.
hap_adv_list = []
hap_t1 = []


def del_adv_test(hap_dbs, cand_allele1, cand_allele2, test_allele1, test_allele2, core_vars):
    """Decide which of two candidate haplotypes carries a heterozygous variant.

    Args:
        hap_dbs: Path to a whitespace-separated haplotype file; column 1 is
            the haplotype identifier and column 2 its ``;``-separated variants.
        cand_allele1: Haplotype identifier of the first candidate.
        cand_allele2: Haplotype identifier of the second candidate.
        test_allele1: Value returned when the variant belongs to candidate 1.
        test_allele2: Value returned when the variant belongs to candidate 2.
        core_vars: ``;``-separated sample core variants.  Entries ending in
            ``0/1`` are considered; their last four characters are dropped.

    Returns:
        ``test_allele1`` or ``test_allele2`` according to which candidate's
        variant list contains the first ``0/1`` core variant, or ``None`` when
        neither does.  A candidate missing from ``hap_dbs`` is treated as
        having no variants.

    Raises:
        IndexError: if ``core_vars`` holds no entry ending in ``0/1``.
    """
    with open(hap_dbs, "r") as handle:
        rows = [line.strip().split() for line in handle]

    # Replace the shared state in place so a second call never sees data left
    # over from an earlier one (the first stale entry used to win).
    hap_adv_list[:] = rows
    hap_t1[:] = [var[:-4] for var in core_vars.split(";") if var[-3:] == "0/1"]

    list_t1 = []
    list_t2 = []
    for elem in rows:
        # The last matching row wins, as in the original loop.
        if elem[1] == cand_allele1:
            list_t1 = elem[2].split(';')
        if elem[1] == cand_allele2:
            list_t2 = elem[2].split(';')

    if not hap_t1:
        raise IndexError("core_vars contains no heterozygous (0/1) variant")

    marker = hap_t1[0]
    if marker in list_t1:
        return test_allele1
    if marker in list_t2:
        return test_allele2
    return None
# Module-level views of the most recent dup_test_init() call.  They are kept
# for backward compatibility and are overwritten (not extended) on each call.
het_hom_list = []
het_hom_list_new = []


def dup_test_init(sv_dup, av_cov):
    """Load the duplication call file and annotate each row with read support.

    Args:
        sv_dup: Path to a whitespace-separated file.  Lines containing
            ``COVERAGE`` or ``AGGREGATED`` and blank lines are skipped.  In the
            remaining rows column 2 is read as an integer depth and the
            second-to-last column as a float fraction.
        av_cov: Unused; kept so existing callers keep working.

    Returns:
        The module-level ``het_hom_list_new``: every kept row with two values
        appended, ``round(supporting_reads / mean_depth, 4)`` and
        ``supporting_reads``, where ``supporting_reads`` is
        ``round(fraction * depth)`` and ``mean_depth`` is the mean of column 2
        over the kept rows.  Empty when the file has no usable rows.
    """
    with open(sv_dup, "r") as handle:
        rows = [
            line.strip().split()
            for line in handle
            if "COVERAGE" not in line and "AGGREGATED" not in line
        ]
    # Blank lines carry no record.
    rows = [row for row in rows if row]

    # Reset the shared state in place: a repeated call used to re-append and
    # re-annotate the rows of every earlier call.
    het_hom_list[:] = rows
    het_hom_list_new[:] = []

    if not rows:
        # Nothing to annotate; avoids dividing by a zero row count.
        return het_hom_list_new

    depths = [int(row[2]) for row in rows]
    av_read_cov = sum(depths) / len(depths)

    for row, depth in zip(rows, depths):
        # row[-2] must be read before the two values are appended.
        supp_reads = round(float(row[-2]) * depth)
        row.append(round(supp_reads / av_read_cov, 4))
        row.append(supp_reads)
        het_hom_list_new.append(row)

    return het_hom_list_new
# Module-level views of the most recent duplication-phasing call.  They are
# kept for backward compatibility and are overwritten (not extended) per call.
hap_def_list = []
allele_cn_list = []


def _format_dup_call(dup_allele, dup_cn, other_allele, other_cn, max_explicit_cn):
    """Render two alleles and their copy counts as a duplication call string."""
    if dup_cn == 0:
        return dup_allele + "/" + other_allele + "x" + str(other_cn - 1)
    if other_cn == 0:
        return other_allele + "/" + dup_allele + "x" + str(dup_cn - 1)
    for count in range(1, max_explicit_cn + 1):
        # A single copy carries no "xN" suffix.
        suffix = "" if count == 1 else "x" + str(count)
        if dup_cn == count:
            return dup_allele + suffix + "/" + other_allele + "x" + str(other_cn)
        if other_cn == count:
            return other_allele + suffix + "/" + dup_allele + "x" + str(dup_cn)
    return 'check'


def _phase_duplication(hap_dbs, cand_allele1, cand_allele2, test_allele1,
                       test_allele2, c_num, in_list, rounding_offset,
                       max_explicit_cn):
    """Assign copy counts to two alleles from the best-supported het variant."""
    with open(hap_dbs, "r") as handle:
        rows = [line.strip().split() for line in handle]
    hap_def_list[:] = rows

    het_rows = [row for row in in_list if row[1] == "0/1"]
    if not het_rows:
        raise ValueError("no heterozygous (0/1) variant available to phase the duplication")
    # First row with the largest support ratio (second-to-last column).
    top_row = max(het_rows, key=lambda row: row[-2])
    max_het = top_row[-2]
    var = top_row[0]

    vars_allele1 = None
    vars_allele2 = None
    for elem in rows:
        # The last matching row wins, as in the original loop.
        if elem[1] == cand_allele1:
            vars_allele1 = elem[2].split(';')
        if elem[1] == cand_allele2:
            vars_allele2 = elem[2].split(';')
    if vars_allele1 is None or vars_allele2 is None:
        raise ValueError("candidate haplotype not found in " + str(hap_dbs))

    # ValueError here means the variant belongs to neither candidate.
    index_var = (vars_allele1 + vars_allele2).index(var)
    dup_allele = test_allele1 if index_var < len(vars_allele1) else test_allele2
    dup_cn = int(round(max_het * int(c_num) - rounding_offset))

    other_allele = test_allele2 if dup_allele == test_allele1 else test_allele1
    other_cn = int(c_num) - dup_cn

    # Overwrite in place: appending made every later call read the first
    # call's four entries.
    allele_cn_list[:] = [dup_allele, dup_cn, other_allele, other_cn]

    return _format_dup_call(dup_allele, dup_cn, other_allele, other_cn, max_explicit_cn)


def dup_test_cn_3_4(sv_dup, hap_dbs, cand_allele1, cand_allele2, test_allele1, test_allele2, c_num, av_cov, in_list):
    """Phase a duplication for a total copy number of 3 or 4.

    Args:
        sv_dup: Unused; kept for interface compatibility.
        hap_dbs: Path to a whitespace-separated haplotype file; column 1 is
            the haplotype identifier and column 2 its ``;``-separated variants.
        cand_allele1: Haplotype identifier of the first called allele.
        cand_allele2: Haplotype identifier of the second called allele.
        test_allele1: Star-allele label of the first called allele.
        test_allele2: Star-allele label of the second called allele.
        c_num: Total copy number (anything ``int()`` accepts).
        av_cov: Unused; kept for interface compatibility.
        in_list: Rows as returned by ``dup_test_init``: column 0 is the variant,
            column 1 the genotype and the second-to-last column the support
            ratio.

    Returns:
        A call such as ``"*1/*2x2"`` or ``"*1x2/*2x2"``, or ``'check'`` when
        neither allele ends up with 0, 1 or 2 copies.

    Raises:
        ValueError: if ``in_list`` has no ``0/1`` row, a candidate is missing
            from ``hap_dbs``, or the chosen variant belongs to neither
            candidate.
    """
    return _phase_duplication(hap_dbs, cand_allele1, cand_allele2, test_allele1,
                              test_allele2, c_num, in_list,
                              rounding_offset=0, max_explicit_cn=2)


def dup_test_cn_n(sv_dup, hap_dbs, cand_allele1, cand_allele2, test_allele1, test_allele2, c_num, av_cov, in_list):
    """Phase a duplication for a total copy number above 4.

    Same contract as ``dup_test_cn_3_4`` with two differences: 0.15 is
    subtracted from ``ratio * c_num`` before rounding, and copy counts up to 4
    are rendered explicitly before falling back to ``'check'``.
    """
    return _phase_duplication(hap_dbs, cand_allele1, cand_allele2, test_allele1,
                              test_allele2, c_num, in_list,
                              rounding_offset=0.15, max_explicit_cn=4)
def hybrid_12_test1(cov_e1_e2, cov_e3_e9):
    """Classify the exon 1-2 / exon 3-9 depth ratio.

    Both arguments are converted with ``float()``.  Returns ``'hyb_12'`` for a
    ratio strictly between 0.15 and 0.65, ``'hyb_12_2'`` for a ratio below
    0.15 and ``'norm_var'`` for everything else (including ratios exactly
    equal to 0.15 or 0.65).
    """
    ratio = float(cov_e1_e2) / float(cov_e3_e9)
    if 0.15 < ratio < 0.65:
        return 'hyb_12'
    if ratio < 0.15:
        return 'hyb_12_2'
    return 'norm_var'
def hybrid_12_34(cov_e1_e2, cov_e3_e9, cov_e1_e4, cov_e5_e9, cov_e3_e4):
    """Classify exon depth ratios as normal, *12-like or *34-like.

    All arguments are converted with ``float()``.  Three ratios are used:
    e1-e4 / e5-e9, e1-e2 / e3-e9 and e1-e2 / e3-e4.  The last one is only
    evaluated when the first lies strictly between 0.15 and 0.65.

    Returns one of ``'norm_var'``, ``'hyb_34'``, ``'hyb_12'``, ``'hyb_34_2'``
    or ``'hyb_12_2'``.
    """
    ratio_14_59 = float(cov_e1_e4) / float(cov_e5_e9)
    if 0.65 < ratio_14_59 < 1.25:
        return 'norm_var'

    ratio_12_39 = float(cov_e1_e2) / float(cov_e3_e9)
    if 0.65 < ratio_12_39 < 1.25:
        return 'norm_var'

    if 0.15 < ratio_14_59 < 0.65:
        # Reduced e1-e4 depth: the e1-e2 / e3-e4 ratio tells *34 from *12.
        ratio_12_34 = float(cov_e1_e2) / float(cov_e3_e4)
        if 0.65 < ratio_12_34 < 1.25:
            return 'hyb_34'
        if 0.15 < ratio_12_34 < 0.65:
            return 'hyb_12'

    if ratio_14_59 < 0.15:
        return 'hyb_34_2'
    if 0.15 < ratio_12_39 < 0.65:
        return 'hyb_12'
    if ratio_12_39 < 0.15:
        return 'hyb_12_2'
    return 'norm_var'
def hybrid_47_test1(cov_e9_3pr, cov_e7_e8):
    """Return ``'hyb_47'`` when e7-e8 depth / e9-3' depth is strictly between
    0.25 and 0.65, otherwise ``'no_hyb_47'``.

    Both arguments are converted with ``float()``.
    """
    ratio = float(cov_e7_e8) / float(cov_e9_3pr)
    return 'hyb_47' if 0.25 < ratio < 0.65 else 'no_hyb_47'
# For *4/*4
def hybrid_47_test2(cov_e9_3pr, cov_e7_e8, cov_ctrl):
    """Classify the e9-3' depth relative to the control depth.

    ``cov_e9_3pr`` and ``cov_ctrl`` are converted with ``float()``;
    ``cov_e7_e8`` is accepted but not used.  Returns ``'hom_47'`` for a ratio
    above 0.75, ``'het_47'`` for a ratio strictly between 0.25 and 0.75, and
    ``'no_hyb_47'`` otherwise (including a ratio of exactly 0.75).
    """
    ratio = float(cov_e9_3pr) / float(cov_ctrl)
    if ratio > 0.75:
        return 'hom_47'
    if 0.25 < ratio < 0.75:
        return 'het_47'
    return 'no_hyb_47'
def star_1b_test(cov_3p_utr, cov_ctrl):
    """Classify the 3'UTR depth relative to the control depth.

    Both arguments are converted with ``float()``.  Returns ``'hom_1B'`` for a
    ratio below 0.25, ``'no_1B'`` for a ratio above 0.65 and ``'het_1B'`` for
    anything in between (bounds included).
    """
    ratio = float(cov_3p_utr) / float(cov_ctrl)
    if ratio < 0.25:
        return 'hom_1B'
    return 'no_1B' if ratio > 0.65 else 'het_1B'