incomplete download of pgx-main

This commit is contained in:
2025-08-18 12:12:55 +02:00
parent fe48df8676
commit 75c821b975
984 changed files with 0 additions and 878657 deletions

View File

@@ -1,78 +0,0 @@
#!/usr/bin/env python3
import os
import sys
def get_backgroud_alleles(database, core_vars):
    """Pick the database diplotype that best explains the sample's core variants.

    Used when no database record matches the core variants exactly.

    Args:
        database: Path to a tab-separated file. Column 0 is a diplotype key
            such as ``"1.v1_3.v1"``; column 1 is that diplotype's
            ``";"``-joined core variants.
        core_vars: The sample's core variants as one ``";"``-joined string.

    Returns:
        ``[allele_res, diplo1]``: the star-allele string (``"*1/*3"``) and the
        diplotype key it was derived from. When no record qualifies the
        default ``["*1/*1", "1.v1_1.v1"]`` is returned.

    Raises:
        IndexError: if a shortlisted record has no second column, or the
            chosen diplotype key lacks an ``"_"`` or two ``"."`` characters.
    """
    core_vars_list = core_vars.split(";")
    # The last four characters of a variant are dropped to get its anchor
    # (presumably the "~0/1"-style genotype suffix -- confirm against the
    # variant file format).
    last_anchor = core_vars_list[-1][:-4]
    first_anchor = core_vars_list[0][:-4]

    # Read under a context manager so the handle is closed deterministically.
    with open(database, "r") as handle:
        records = [row.strip().split("\t") for row in handle]

    # Keep records whose core-variant string contains BOTH anchors.
    # The original condition was `core_temp1 and core_temp2 in temp_rec`,
    # which Python parses as `core_temp1 and (core_temp2 in temp_rec)`: the
    # last anchor was only checked for being non-empty, never for membership.
    # NOTE(review): this narrows the shortlist; re-validate on known samples.
    shortlisted = [
        rec for rec in records
        if last_anchor and last_anchor in rec[1] and first_anchor in rec[1]
    ]

    candidates = []
    cand_vars = []
    scores = []
    for rec in shortlisted:
        rec_vars = rec[1].split(";")
        candidates.append(rec[0])
        cand_vars.append(rec_vars)

        tally = 0
        for var in rec_vars:
            if var in core_vars_list:
                # Exact variant + genotype match.
                tally += 3
            elif var[:-4] in core_vars:
                # Same variant with a different suffix. This is a substring
                # test against the raw joined string, as in the original.
                tally += 1
            else:
                # Record carries a variant the sample does not have.
                tally -= 2
        scores.append(tally)

    if not scores:
        return ["*1/*1", "1.v1_1.v1"]

    best = max(scores)
    near_best = [i for i, s in enumerate(scores) if s == best or s == best - 1]
    # Among (near-)top scorers prefer the record with the fewest core
    # variants; min() keeps the first one on ties, like list.index(min(...)).
    chosen = min(near_best, key=lambda i: len(cand_vars[i]))
    diplo1 = candidates[chosen]

    # "A.vX_B.vY" -> "*A/*B": text before the first "." and the text between
    # the first "_" and the second ".".
    underscores = [i for i, ch in enumerate(diplo1) if ch == "_"]
    dots = [i for i, ch in enumerate(diplo1) if ch == "."]
    hap1 = "*" + diplo1[:dots[0]]
    hap2 = "*" + diplo1[underscores[0] + 1:dots[1]]
    allele_res = hap1 + "/" + hap2

    return [allele_res, diplo1]

View File

@@ -1,560 +0,0 @@
#!/usr/bin/env python3
import os
import sys
def get_core_variants(infile, cn):
    """Return the sample's core variants as one sorted, ";"-joined string.

    Args:
        infile: Path to a text file with one variant per line.
        cn: Value convertible with ``int()``. When it equals 1, every
            ``"~0/1"`` in the joined string is rewritten to ``"~1/1"``.

    Returns:
        The stripped lines of ``infile``, sorted and joined with ``";"``
        (an empty string for an empty file).
    """
    # Context manager closes the handle; the original left it to the GC.
    with open(infile, "r") as handle:
        variants = sorted(row.strip() for row in handle)

    core_vars = ";".join(variants)

    if int(cn) == 1:
        core_vars = core_vars.replace("~0/1", "~1/1")

    return core_vars
def get_all_vars_gt(infile_full_gt):
    """Return every line of ``infile_full_gt`` as one sorted, ";"-joined string.

    Args:
        infile_full_gt: Path to a text file with one entry per line.

    Returns:
        The stripped lines, sorted and joined with ``";"`` (an empty string
        for an empty file).
    """
    # Context manager closes the handle; the original left it to the GC.
    with open(infile_full_gt, "r") as handle:
        entries = sorted(row.strip() for row in handle)

    return ";".join(entries)
def cand_snv_allele_calling(database, infile, infile_full, infile_full_gt, infile_spec, cn):
# Call a star-allele diplotype by exact match of the sample's core variants
# against the diplotype database, tie-breaking on supporting variants when
# two or three database records share the same core-variant set.
#
# NOTE(review): every line of this function sits at column 0 here; block
# nesting has to be restored before it can run. Comments below that depend
# on nesting are marked as such.
#
# Parameters
#   database        path to a tab-separated file; column 0 is a diplotype key
#                   (e.g. "1.v1_1.v1"), column 1 its ";"-joined core variants,
#                   column 2 its ";"-joined supporting variants.
#   infile          path to the sample's core-variant file; a zero-byte file
#                   short-circuits to the default call.
#   infile_full     path whose raw lines are collected into all_variants.
#   infile_full_gt  path whose stripped lines are collected into all_var_gt.
#   infile_spec     path to a whitespace-separated table; fields [0]..[3] are
#                   read during tie-breaking.
#   cn              forwarded to get_core_variants (presumably the gene copy
#                   number -- confirm with the caller).
#
# Returns a list whose LAST element is always the star-allele string; the
# leading elements differ by branch:
#   zero-byte infile           [diplotype_key, allele]
#   single / clear winner      [soln_list1, key_or_haplotypes, allele]
#   ambiguous ("A or B")       [soln_list1, allele]
# No branch handles zero or more than three matching records, so those cases
# produce no explicit return (i.e. None).
#
# Handle on the tie-break table; it is iterated in the tie-break branches and
# is never closed anywhere in this function.
f = open(infile_spec, "r")
all_variants = []
for line in open(infile_full, "r"):
# NOTE(review): the result of strip() is discarded, so the appended entry is
# the raw line, including any trailing newline. The commented-out line below
# suggests the entries were once meant to be ";"-split variants.
line.strip()
all_variants.append(line)
# all_variants = line.strip().split(";")
# print(all_variants)
# Empty core-variant file: report the default diplotype.
if os.stat(infile).st_size == 0:
cand_res = ['1.v1_1.v1']
allele_res = "*1/*1"
return ["".join(cand_res), allele_res];
#print("\nSupporting variants")
#print("\n" + "".join(all_variants))
# NOTE(review): this follows a return; it is unreachable if it belongs to the
# same branch -- confirm nesting.
sys.exit()
# core_variants = []
# for line in open(infile, "r"):
# line = line.strip()
# core_variants.append(line)
# core_variants = ";".join(sorted(core_variants))
# Sorted, ";"-joined core variants of the sample.
core_variants = get_core_variants(infile, cn)
# if int(cn) == 1:
# core_variants = core_variants.replace("~0/1", "~1/1")
# else:
# pass
# Stripped lines of infile_full_gt, used for scoring during tie-breaks.
all_var_gt = []
for line in open(infile_full_gt, "r"):
line = line.strip()
all_var_gt.append(line)
# Database rows as lists of tab-separated fields.
dbs = []
for line in open(database, "r"):
line = line.strip().split("\t")
dbs.append(line)
# soln_list1: diplotype keys whose sorted core variants equal the sample's.
# soln_list2: the matching rows' column 2, index-aligned with soln_list1.
soln_list1 = []
soln_list2 = []
for record in dbs:
record_core_var = record[1].split(";")
record_core_var = ";".join(sorted(record_core_var))
if record_core_var == core_variants:
diplo = record[0]
full_dip = record[2]
soln_list1.append(record[0])
soln_list2.append(record[2])
else:
pass
#return soln_list1
#print("\nResult:")
diff_alleles_check = False
# Returns "Equal" when every element of lst equals the first one, otherwise
# "Not equal". An empty list yields "Equal" because all() of nothing is True.
# NOTE(review): len(lst) < 0 can never be true. The assignments below bind a
# name local to chkList (there is no nonlocal declaration), so the outer
# diff_alleles_check is not modified.
def chkList(lst):
if len(lst) < 0 :
diff_alleles_check = True
diff_alleles_check = all(ele == lst[0] for ele in lst)
if(diff_alleles_check):
return("Equal")
else:
return("Not equal")
# --- Exactly one matching record: convert its key to a star-allele call. ---
if len(soln_list1) == 1:
diplo = "".join(soln_list1)
# Key layout "A.vX_B.vY": res1 holds the positions of "_", res2 those of ".".
# hap1 is the text before the first "."; hap2 is the text between the first
# "_" and the second ".". The same conversion is repeated in every branch.
res1 = [i for i in range(len(diplo)) if diplo.startswith("_", i)]
res2 = [i for i in range(len(diplo)) if diplo.startswith(".", i)]
hap1 = "*" + str (diplo[:res2[0]])
hap2 = "*" + str (diplo[res1[0]+1:res2[1]])
allele_res = hap1 + "/" + hap2
return [soln_list1, diplo, allele_res];
#print ("\nSupporting variants:")
#print ("\n" + core_variants + "\n")
# --- Two matching records. ---
elif len(soln_list1) == 2:
# Writes the candidate list to stdout.
print(soln_list1)
diplo1 = soln_list1[0]
diplo2 = soln_list1[1]
diplo1_supp_var = soln_list2[0].split(";")
diplo2_supp_var = soln_list2[1].split(";")
# uniq_diploN collects the entries of all_variants that are absent from
# candidate N's supporting variants; the candidate with fewer wins.
uniq_diplo1 = []
uniq_diplo2 = []
for i in all_variants:
if i not in diplo1_supp_var:
uniq_diplo1.append(i)
if i not in diplo2_supp_var:
uniq_diplo2.append(i)
#print("\nUnique variants in soln 1: {}".format(len(uniq_diplo1)))
#print("\nUnique variants in soln 2: {}".format(len(uniq_diplo2)))
if len(uniq_diplo1) < len(uniq_diplo2):
res1 = [i for i in range(len(diplo1)) if diplo1.startswith("_", i)]
res2 = [i for i in range(len(diplo1)) if diplo1.startswith(".", i)]
hap1 = "*" + str (diplo1[:res2[0]])
hap2 = "*" + str (diplo1[res1[0]+1:res2[1]])
allele_res = hap1 + "/" + hap2
return [soln_list1, diplo1, allele_res];
#print ("Supporting variants:")
#print ("\n" + core_variants + "\n")
elif len(uniq_diplo1) > len(uniq_diplo2):
res1 = [i for i in range(len(diplo2)) if diplo2.startswith("_", i)]
res2 = [i for i in range(len(diplo2)) if diplo2.startswith(".", i)]
hap1 = "*" + str (diplo2[:res2[0]])
hap2 = "*" + str (diplo2[res1[0]+1:res2[1]])
allele_res = hap1 + "/" + hap2
return [soln_list1, diplo2, allele_res];
#print ("Supporting variants:")
#print ("\n" + core_variants + "\n")
# elif len(uniq_diplo1) == len(uniq_diplo2) and (diplo1 == "4.v11_74.v1" and diplo2 == "4.v12_1.v1"):
# res1 = [i for i in range(len(diplo2)) if diplo2.startswith("_", i)]
# res2 = [i for i in range(len(diplo2)) if diplo2.startswith(".", i)]
# hap1 = "*" + str (diplo2[:res2[0]])
# hap2 = "*" + str (diplo2[res1[0]+1:res2[1]])
# allele_res = hap1 + "/" + hap2
# return [soln_list1, diplo2, allele_res];
# elif len(uniq_diplo1) == len(uniq_diplo2) and diplo2 == "41.v1_65.v1":
# res1 = [i for i in range(len(diplo2)) if diplo2.startswith("_", i)]
# res2 = [i for i in range(len(diplo2)) if diplo2.startswith(".", i)]
# hap1 = "*" + str (diplo2[:res2[0]])
# hap2 = "*" + str (diplo2[res1[0]+1:res2[1]])
# allele_res = hap1 + "/" + hap2
# return [soln_list1, diplo2, allele_res];
#print ("Supporting variants:")
#print ("\n" + core_variants + "\n")
# elif len(uniq_diplo1) == len(uniq_diplo2) and (diplo1 == "4.v1_6.v1" and diplo2 == "4.v4_6.v2") :
# res1 = [i for i in range(len(diplo1)) if diplo1.startswith("_", i)]
# res2 = [i for i in range(len(diplo1)) if diplo1.startswith(".", i)]
# hap1 = "*" + str (diplo1[:res2[0]])
# hap2 = "*" + str (diplo1[res1[0]+1:res2[1]])
# allele_res = hap1 + "/" + hap2
# return [soln_list1, diplo1, allele_res];
#print ("Supporting variants:")
#print ("\n" + core_variants + "\n")
# Tie on the unexplained-variant counts: fall back to the tie-break table.
else:
# Index-aligned columns of the infile_spec rows whose field [2] equals the
# sample's core variants: tiebreak1 <- field [1] (a diplotype key, parsed
# below), tiebreak2 <- field [3] (";"-joined entries), tiebreak3 <- field [0].
tiebreak1 = []
tiebreak2 = []
tiebreak3 = []
score = []
score2 = []
test1 = []
for line in f:
line = line.strip().split()
#print(line)
if line[2] == core_variants:
tiebreak1.append(line[1])
tiebreak2.append(line[3])
tiebreak3.append(line[0])
# score  : entries of all_var_gt that the candidate does not list.
# score2 : entries the candidate lists that are not in all_var_gt.
# NOTE(review): both lists are later indexed in step with tiebreak1, so each
# append presumably runs once per full_dip, after its inner loop -- confirm
# when restoring indentation.
for full_dip in tiebreak2:
diplo_supp_gt = full_dip.split(";")
uniq_gt = []
uniq_gt1 = []
for i in all_var_gt:
if i not in diplo_supp_gt:
uniq_gt.append(i)
score_dip = len(uniq_gt)
score.append(score_dip)
for j in diplo_supp_gt:
if j not in all_var_gt:
uniq_gt1.append(j)
score_dip2 = len(uniq_gt1)
score2.append(score_dip2)
# min() raises ValueError if no infile_spec row matched (empty score lists).
min_score = min(score)
min_score2 = min(score2)
# Positions of the candidates that achieve the lowest score2.
res_list = [i for i in range(len(score2)) if score2[i] == min_score2]
# return [tiebreak1, res_list];
# if chkList(score) == "Equal" and soln_list1[0] == "17.v1_4.v1":
# elem = "17.v1_4.v1"
# res1 = [i for i in range(len(elem)) if elem.startswith("_", i)]
# res2 = [i for i in range(len(elem)) if elem.startswith(".", i)]
# hap1 = "*" + str (elem[:res2[0]])
# hap2 = "*" + str (elem[res1[0]+1:res2[1]])
# result_dip = hap1 + "/" + hap2
# return [soln_list1, elem, result_dip];
# All candidates have the same score: report an ambiguous call.
if chkList(score) == "Equal":
amb_soln_set = []
amb_set1 = []
# NOTE(review): neither branch assigns soln_list_1 when len(res_list) == 3,
# so the loop below would hit an unassigned name in that case.
if len(res_list) > 3:
soln_list_1 = soln_list1
elif len(res_list) < 3:
# First and last of the lowest-score2 candidates.
amb_set1.append(tiebreak1[res_list[0]])
amb_set1.append(tiebreak1[res_list[-1]])
soln_list_1 = amb_set1
# return [tiebreak1, 'true'];
for elem in soln_list_1:
res1 = [i for i in range(len(elem)) if elem.startswith("_", i)]
res2 = [i for i in range(len(elem)) if elem.startswith(".", i)]
hap1 = "*" + str (elem[:res2[0]])
hap2 = "*" + str (elem[res1[0]+1:res2[1]])
result_dip = hap1 + "/" + hap2
amb_soln_set.append(result_dip)
#elem_pos = tiebreak1.index(elem)
#print ("Solution " + str(elem_pos) + ": " + result_dip)
# Collapse to a single call when the first two alternatives read the same.
if amb_soln_set[0] != amb_soln_set[1]:
allele_res = " or ".join(amb_soln_set)
else:
allele_res = amb_soln_set[0]
# Two-element return (no diplotype key) on this path.
return [soln_list1, allele_res];
# elif score.count(min_score) > 1 and soln_list1[0] == "11.v1_2.v2":
# elem = "11.v1_2.v2"
# res1 = [i for i in range(len(elem)) if elem.startswith("_", i)]
# res2 = [i for i in range(len(elem)) if elem.startswith(".", i)]
# hap1 = "*" + str (elem[:res2[0]])
# hap2 = "*" + str (elem[res1[0]+1:res2[1]])
# result_dip = hap1 + "/" + hap2
# return [soln_list1, elem, result_dip];
# Several (but not all) candidates share the minimum score.
elif score.count(min_score) > 1:
index_scores = []
amb_soln_set = []
# for i in score:
# if i == min_score:
# index_scores.append(score.index(i))
index_scores = [i for i in range(len(score)) if score[i] == min_score]
# Star-allele strings of every minimum-score candidate.
alt_solns = []
for j in index_scores:
elem = tiebreak1[j]
res1 = [i for i in range(len(elem)) if elem.startswith("_", i)]
res2 = [i for i in range(len(elem)) if elem.startswith(".", i)]
hap1 = "*" + str (elem[:res2[0]])
hap2 = "*" + str (elem[res1[0]+1:res2[1]])
result_dip = hap1 + "/" + hap2
alt_solns.append(result_dip)
# return [index_scores, 'true']
if chkList(alt_solns) == "Equal":
return[soln_list1, alt_solns[0]];
# Hard-coded override: when the first alternative is '*10/*1B10' the call is
# reported as '*10/*1'. The reason is not visible in this file.
elif chkList(alt_solns) != "Equal" and alt_solns[0] == '*10/*1B10':
return[soln_list1, '*10/*1'];
else:
# Report the lexicographically first and last alternatives.
alt_solns = sorted(alt_solns)
amb_soln_set.append(alt_solns[0])
amb_soln_set.append(alt_solns[-1])
if amb_soln_set[0] != amb_soln_set[1]:
allele_res = " or ".join(amb_soln_set)
else:
allele_res = amb_soln_set[0]
# allele_res = " or ".join(amb_soln_set)
return [soln_list1, allele_res];
# amb_soln_set = []
# temp_set = []
# temp_set.append(tiebreak1[0])
# temp_set.append(tiebreak1[-1])
# for elem in temp_set:
# res1 = [i for i in range(len(elem)) if elem.startswith("_", i)]
# res2 = [i for i in range(len(elem)) if elem.startswith(".", i)]
# hap1 = "*" + str (elem[:res2[0]])
# hap2 = "*" + str (elem[res1[0]+1:res2[1]])
# result_dip = hap1 + "/" + hap2
# amb_soln_set.append(result_dip)
#elem_pos = tiebreak1.index(elem)
#print ("Solution " + str(elem_pos) + ": " + result_dip)
# allele_res = " or ".join(amb_soln_set)
# return [soln_list1, allele_res];
#print ("\nSupporting core variants:")
#print ("\n" + core_variants + "\n")
# A single candidate has the minimum score: it wins outright. The middle
# element returned here is the row's field [0], not the diplotype key.
else:
minpos = score.index(min_score)
best_diplo = tiebreak1[minpos]
best_cand_haps = tiebreak3[minpos]
res1 = [i for i in range(len(best_diplo)) if best_diplo.startswith("_", i)]
res2 = [i for i in range(len(best_diplo)) if best_diplo.startswith(".", i)]
hap1 = "*" + str (best_diplo[:res2[0]])
hap2 = "*" + str (best_diplo[res1[0]+1:res2[1]])
allele_res = hap1 + "/" + hap2
return [soln_list1, best_cand_haps, allele_res];
#print ("Supporting core variants:")
#print ("\n" + core_variants + "\n")
# --- Three matching records: same scheme as the two-record case. ---
elif len(soln_list1) == 3:
diplo1 = soln_list1[0]
diplo2 = soln_list1[1]
diplo3 = soln_list1[2]
diplo1_supp_var = soln_list2[0].split(";")
diplo2_supp_var = soln_list2[1].split(";")
diplo3_supp_var = soln_list2[2].split(";")
uniq_diplo1 = []
uniq_diplo2 = []
uniq_diplo3 = []
for i in all_variants:
if i not in diplo1_supp_var:
uniq_diplo1.append(i)
if i not in diplo2_supp_var:
uniq_diplo2.append(i)
if i not in diplo3_supp_var:
uniq_diplo3.append(i)
# NOTE(review): the comparisons below do not cover every ordering. For
# example counts (2, 3, 1) satisfy none of the four conditions, in which
# case nothing is returned from this branch -- confirm whether intended.
# Candidate 1 has strictly the fewest unexplained entries.
if len(uniq_diplo1) < len(uniq_diplo2) and len(uniq_diplo1) < len(uniq_diplo3):
res1 = [i for i in range(len(diplo1)) if diplo1.startswith("_", i)]
res2 = [i for i in range(len(diplo1)) if diplo1.startswith(".", i)]
hap1 = "*" + str (diplo1[:res2[0]])
hap2 = "*" + str (diplo1[res1[0]+1:res2[1]])
allele_res = hap1 + "/" + hap2
return [soln_list1, diplo1, allele_res];
#print ("Supporting variants:")
#print ("\n" + core_variants + "\n")
# Candidate 2 has strictly the fewest.
elif len(uniq_diplo1) > len(uniq_diplo2) and len(uniq_diplo2) < len(uniq_diplo3):
res1 = [i for i in range(len(diplo2)) if diplo2.startswith("_", i)]
res2 = [i for i in range(len(diplo2)) if diplo2.startswith(".", i)]
hap1 = "*" + str (diplo2[:res2[0]])
hap2 = "*" + str (diplo2[res1[0]+1:res2[1]])
allele_res = hap1 + "/" + hap2
return [soln_list1, diplo2, allele_res]
#print ("Supporting variants:")
#print ("\n" + core_variants + "\n")
# Counts strictly decrease from candidate 1 to 3, so candidate 3 wins.
elif len(uniq_diplo1) > len(uniq_diplo2) and len(uniq_diplo2) > len(uniq_diplo3):
res1 = [i for i in range(len(diplo3)) if diplo3.startswith("_", i)]
res2 = [i for i in range(len(diplo3)) if diplo3.startswith(".", i)]
hap1 = "*" + str (diplo3[:res2[0]])
hap2 = "*" + str (diplo3[res1[0]+1:res2[1]])
allele_res = hap1 + "/" + hap2
return [soln_list1, diplo3, allele_res]
#print ("Supporting variants:")
#print ("\n" + core_variants + "\n")
# elif len(uniq_diplo1) == len(uniq_diplo2) == len(uniq_diplo3) and diplo3 == "39.v1_4.v4":
# res1 = [i for i in range(len(diplo3)) if diplo3.startswith("_", i)]
# res2 = [i for i in range(len(diplo3)) if diplo3.startswith(".", i)]
# hap1 = "*" + str (diplo3[:res2[0]])
# hap2 = "*" + str (diplo3[res1[0]+1:res2[1]])
# allele_res = hap1 + "/" + hap2
# return [soln_list1, diplo3, allele_res]
#print ("Supporting variants:")
#print ("\n" + core_variants + "\n")
# Ties (all three equal, or 2 == 3, or 1 == 2): use the tie-break table.
elif len(uniq_diplo1) == len(uniq_diplo2) == len(uniq_diplo3) or (len(uniq_diplo1) != len(uniq_diplo2) == len(uniq_diplo3)) or (len(uniq_diplo1) == len(uniq_diplo2) != len(uniq_diplo3)):
# Same column extraction and scoring as in the two-record tie-break.
tiebreak1 = []
tiebreak2 = []
tiebreak3 = []
score = []
score2 = []
test1 = []
for line in f:
line = line.strip().split()
#print(line)
if line[2] == core_variants:
tiebreak1.append(line[1])
tiebreak2.append(line[3])
tiebreak3.append(line[0])
for full_dip in tiebreak2:
diplo_supp_gt = full_dip.split(";")
uniq_gt = []
uniq_gt1 = []
for i in all_var_gt:
if i not in diplo_supp_gt:
uniq_gt.append(i)
score_dip = len(uniq_gt)
score.append(score_dip)
# return [tiebreak1, score];
for j in diplo_supp_gt:
if j not in all_var_gt:
uniq_gt1.append(j)
score_dip2 = len(uniq_gt1)
score2.append(score_dip2)
# min() raises ValueError if no infile_spec row matched (empty score lists).
min_score = min(score)
min_score2 = min(score2)
res_list = [i for i in range(len(score2)) if score2[i] == min_score2]
# return[tiebreak1, res_list];
if chkList(score) == "Equal":
amb_soln_set = []
amb_set1 = []
# NOTE(review): as above, soln_list_1 stays unassigned when
# len(res_list) == 3.
if len(res_list) > 3:
soln_list_1 = soln_list1
elif len(res_list) < 3:
amb_set1.append(tiebreak1[res_list[0]])
amb_set1.append(tiebreak1[res_list[-1]])
soln_list_1 = amb_set1
for elem in soln_list_1:
res1 = [i for i in range(len(elem)) if elem.startswith("_", i)]
res2 = [i for i in range(len(elem)) if elem.startswith(".", i)]
hap1 = "*" + str (elem[:res2[0]])
hap2 = "*" + str (elem[res1[0]+1:res2[1]])
result_dip = hap1 + "/" + hap2
amb_soln_set.append(result_dip)
#elem_pos = tiebreak1.index(elem)
#print ("Solution " + str(elem_pos) + ": " + result_dip)
# Only the first two alternatives are compared; with three entries that
# differ, all of them are joined into the reported call.
if amb_soln_set[0] != amb_soln_set[1]:
allele_res = " or ".join(amb_soln_set)
else:
allele_res = amb_soln_set[0]
return [soln_list1, allele_res];
# allele_res = " or ".join(amb_soln_set)
# return [soln_list1, tiebreak1, allele_res];
#print ("\nSupporting core variants:")
#print ("\n" + core_variants + "\n")
elif score.count(min_score) > 1:
index_scores = []
amb_soln_set = []
index_scores = [i for i in range(len(score)) if score[i] == min_score]
# return[tiebreak1, index_scores];
# alt_solns : calls of the minimum-score candidates.
# alt_solns1: calls of the minimum-score2 candidates.
alt_solns = []
alt_solns1 = []
for j in index_scores:
elem = tiebreak1[j]
res1 = [i for i in range(len(elem)) if elem.startswith("_", i)]
res2 = [i for i in range(len(elem)) if elem.startswith(".", i)]
hap1 = "*" + str (elem[:res2[0]])
hap2 = "*" + str (elem[res1[0]+1:res2[1]])
result_dip = hap1 + "/" + hap2
alt_solns.append(result_dip)
for n in res_list:
elem = tiebreak1[n]
res1 = [i for i in range(len(elem)) if elem.startswith("_", i)]
res2 = [i for i in range(len(elem)) if elem.startswith(".", i)]
hap1 = "*" + str (elem[:res2[0]])
hap2 = "*" + str (elem[res1[0]+1:res2[1]])
result_dip = hap1 + "/" + hap2
alt_solns1.append(result_dip)
if chkList(alt_solns) == "Equal":
return[soln_list1, alt_solns[0]];
else:
# Keep the calls that are minimal on both scores, in sorted order. If the
# two sets do not overlap, the joined result is an empty string.
alt_solns = sorted(alt_solns)
for i in alt_solns:
if i in alt_solns1:
amb_soln_set.append(i)
# amb_soln_set.append(alt_solns[0])
# amb_soln_set.append(alt_solns[-1])
allele_res = " or ".join(amb_soln_set)
return [soln_list1, allele_res];
# A single candidate has the minimum score: it wins outright.
else:
minpos = score.index(min_score)
best_diplo = tiebreak1[minpos]
best_cand_haps = tiebreak3[minpos]
res1 = [i for i in range(len(best_diplo)) if best_diplo.startswith("_", i)]
res2 = [i for i in range(len(best_diplo)) if best_diplo.startswith(".", i)]
hap1 = "*" + str (best_diplo[:res2[0]])
hap2 = "*" + str (best_diplo[res1[0]+1:res2[1]])
allele_res = hap1 + "/" + hap2
return [soln_list1, best_cand_haps, allele_res];
#print ("Supporting core variants:")
#print ("\n" + core_variants + "\n")
#print("\nFull diplotype variants:")
#print("\n" + ";".join(all_var_gt))

View File

@@ -1,70 +0,0 @@
#!/usr/bin/env python3
import os
import sys
import subprocess
from snv_def_modules import *
from bkg_modules import *
# Command-line driver: prints the CYP3A5 star-allele call for one sample.
print("--------------------------------------------\n")
print("CYP3A5 Star Allele Calling with StellarPGx\n")
print("--------------------------------------------\n")

# Five positional arguments, all file paths (an IndexError is raised if any
# is missing): diplotype database, core variants, full variants, full
# variants with genotypes, and the tie-break table.
database = sys.argv[1]
infile = sys.argv[2]
infile_full = sys.argv[3]
infile_full_gt = sys.argv[4]
infile_spec = sys.argv[5]

# Hard-coded; get_core_variants only rewrites genotypes when cn == 1.
cn = 2

supp_core_vars = get_core_variants(infile, cn)

print("\nSample core variants:")
print(supp_core_vars)

snv_def_calls = cand_snv_allele_calling(database, infile, infile_full, infile_full_gt, infile_spec, cn)

# No exact database match: report the closest background alleles and stop.
# `is None` is the identity test PEP 8 prescribes for None.
if snv_def_calls is None:
    bac_alleles = get_backgroud_alleles(database, supp_core_vars)

    if bac_alleles is None:
        print("\nResult:")
        print("Possible novel allele or suballele present: interpret with caution")
    else:
        # bac_alleles is [star_allele_string, diplotype_key].
        print("\nCandidate alleles:")
        print("[" + bac_alleles[-1] + "]")
        print("\nResult:")
        print("Possible novel allele or suballele present: interpret with caution; experimental validation and expert review through PharmVar is recommended")
        print("\nLikely background alleles:")
        print("[" + bac_alleles[0] + "]")

    sys.exit()

# The first element holds the candidates and the last element the final call,
# whichever return shape cand_snv_allele_calling produced.
snv_cand_alleles = snv_def_calls[0]

print("\nCandidate alleles:")
print(snv_cand_alleles)

snv_def_alleles = snv_def_calls[-1]

# NOTE(review): dip_variants is never used below; the call is kept so the
# script still reads infile_full_gt at this point exactly as before.
dip_variants = get_all_vars_gt(infile_full_gt)

print("\nResult:")
print(snv_def_alleles)