from functools import cached_property
import logging
import re
from typing import Iterable, NamedTuple
from urllib.parse import quote
from pysam import TabixFile, VariantRecord # pylint: disable=E0611
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("Annotate CNV")
class Region(NamedTuple):
start: int
end: int
name: str
class Transcript:
def __init__(self, row: list[str]) -> None:
self.transcript = row[1]
self.chrom = row[2]
self.strand = row[3]
# self.tx_start = int(row[4])
# self.tx_end = int(row[5])
# self.cds_start = int(row[6])
# self.cds_end = int(row[7])
self.exon_count = int(row[8])
self.exon_starts = list(map(int, row[9].strip(",").split(",")))
self.exon_ends = list(map(int, row[10].strip(",").split(",")))
self.gene = row[12]
self.is_mrna = "Y" if row[4] == row[5] else "N"
@cached_property
def gene_pk(self) -> str:
return f"{self.chrom}:{self.gene}"
@cached_property
def regions(self) -> list[Region]:
regions: list[Region] = []
for i in range(self.exon_count):
name = f"exon{i + 1}" if self.strand == "+" else f"exon{self.exon_count - i}"
regions.append(Region(start=self.exon_starts[i], end=self.exon_ends[i], name=name))
if i > 0:
name = f"intron{i}" if self.strand == "+" else f"intron{self.exon_count - i}"
regions.append(Region(start=self.exon_ends[i - 1], end=self.exon_starts[i], name=name))
return regions
def calc_detail(self, start: int, end: int) -> str:
regions = sorted(filter(lambda x: x.start < end and x.end >= start, self.regions), key=lambda x: x.start, reverse=self.strand == "-")
region = regions[0].name if len(regions) == 1 else f"{regions[0].name}_{regions[1].name}"
return f"{self.gene}:{self.transcript}:{region}:{self.is_mrna}"
class CNV:
def __init__(self, record: VariantRecord): # pylint: disable=W0231
self.record = record
def validate_overlap(self, start: int, end: int, overlap: float) -> int:
overlap_size = min(self.record.stop, end) - max(self.record.start, start)
return overlap_size >= min(self.record.stop - self.record.start, end - start) * overlap
@property
def pk(self) -> str:
return f"{self.record.chrom}:{self.record.start}-{self.record.stop}"
def query_tabix(self, tbx: TabixFile, chrom_with_chr: bool = False) -> Iterable[str]:
chrom_without_chr = re.sub(r"^[Cc][Hh][Rr]", "", self.record.chrom)
chrom = f"chr{chrom_without_chr}" if chrom_with_chr else chrom_without_chr
try:
return tbx.fetch(chrom, self.record.start, self.record.stop)
except Exception: # pylint: disable=W0718
return []
def anno_gene(self, tbx: TabixFile, gene_ids: dict):
data = {}
for line in self.query_tabix(tbx, True):
transcript = Transcript(line.strip().split("\t"))
gene_id = gene_ids.get(transcript.gene_pk, ".")
detail = transcript.calc_detail(self.record.start, self.record.stop)
data.setdefault(transcript.gene_pk, {"gene": transcript.gene, "gene_id": gene_id, "detail": ""})
sep = "|" if data[transcript.gene_pk]["detail"] else ""
data[transcript.gene_pk]["detail"] += f"{sep}{detail}"
if data:
genes, gene_ids, details = list(zip(*map(lambda x: (x["gene"], x["gene_id"], x["detail"]), data.values())))
self.record.info["GENE"] = genes
self.record.info["GENE_ID"] = gene_ids
self.record.info["DETAIL"] = details
def anno_cytoband(self, tbx: TabixFile):
start_name, end_name = "", ""
records = [line.rstrip().split("\t") for line in self.query_tabix(tbx, True)]
if records:
start_name = records[0][3]
end_name = records[-1][3]
name = start_name if start_name == end_name else f"{start_name}{end_name}"
self.record.info["LOCATION"] = re.sub("[Cc][Hh][Rr]", "", f"{self.record.chrom}{name}")
def anno_dgv(self, tbx: TabixFile, overlap: float):
texts = []
for line in self.query_tabix(tbx):
row = line.rstrip().split("\t")
accession, start, end, an, gain_ac, loss_ac = row[0], int(row[2]), int(row[3]), int(row[14]), row[15], row[16]
if gain_ac and loss_ac:
gain_ac, loss_ac = int(gain_ac), int(loss_ac)
if self.validate_overlap(start, end, overlap):
gain_af = gain_ac / an if an else 0
loss_af = loss_ac / an if an else 0
texts.append(f"{accession}:{self.record.chrom}:{start}:{end}:{gain_af}:{loss_af}")
if texts:
self.record.info["DGV"] = "|".join(texts)
def anno_decipher(self, tbx: TabixFile, overlap: float):
texts = []
for line in self.query_tabix(tbx):
row = line.rstrip().split("\t")
accession, start, end, an, gain_ac, loss_ac = row[0], int(row[2]), int(row[3]), int(row[4]), row[7], row[14]
if gain_ac and loss_ac:
gain_ac, loss_ac = int(gain_ac), int(loss_ac)
if self.validate_overlap(start, end, overlap):
gain_af = gain_ac / an if an else 0
loss_af = loss_ac / an if an else 0
texts.append(f"{accession}:{self.record.chrom}:{start}:{end}:{gain_af}:{loss_af}")
if texts:
self.record.info["DECIPHER"] = "|".join(texts)
def anno_clinvar(self, tbx: TabixFile):
texts = []
for line in self.query_tabix(tbx):
row = line.rstrip().split("\t")
typo, clnsig, assembly, start, end, revstat, variant_id = (row[1], row[6], row[16], int(row[19]), int(row[20]), row[24], row[30])
if assembly == "GRCh38" and typo.startswith("copy_number"):
clnsig = quote(clnsig)
texts.append(f"{variant_id}:{self.record.chrom}:{start}:{end}:{revstat}:{clnsig}")
if texts:
self.record.info["CLINVAR"] = "|".join(texts)
def anno_clingen(self, tbx: TabixFile):
texts = []
for line in self.query_tabix(tbx):
row = line.rstrip().split("\t")
start, end, hi_score, ts_score = int(row[1]), int(row[2]), row[3], row[4]
texts.append(f"{self.record.chrom}:{start}:{end}:{hi_score}:{ts_score}")
if texts:
self.record.info["CLINGEN"] = "|".join(texts)
def anno_local(self, tbx: TabixFile, overlap: float):
texts = []
for line in self.query_tabix(tbx):
row = line.rstrip().split("\t")
start, end, gain_ac, loss_ac, an, gain_af, loss_af = int(row[1]), int(row[2]), row[3], row[4], row[5], row[6], row[7]
if gain_ac and loss_ac:
if self.validate_overlap(start, end, overlap):
texts.append(f"{self.record.chrom}:{start}:{end}:{gain_ac}:{loss_ac}:{an}:{gain_af}:{loss_af}")
if texts:
self.record.info["LOCAL"] = "|".join(texts)
LeonDL168
- 粉丝: 2935
- 资源: 779
最新资源
- 基于树莓派的人脸识别全部资料+详细文档+高分项目.zip
- 基于树莓派的甲醛,二氧化碳等环境监控全部资料+详细文档+高分项目.zip
- 基于树莓派的实时图传&数传(天空端)全部资料+详细文档+高分项目.zip
- 基于树莓派的食堂点餐系统嵌入式课设,全部资料+详细文档+高分项目.zip
- 基于树莓派的双目视觉智能小车全部资料+详细文档+高分项目.zip
- 基于树莓派的延时摄影程序全部资料+详细文档+高分项目.zip
- 基于树莓派和NODE的智能镜子项目全部资料+详细文档+高分项目.zip
- 基于树莓派的医疗语音识别应用全部资料+详细文档+高分项目.zip
- 基于树莓派使用运营商网络的免流量WIFI路由器全部资料+详细文档+高分项目.zip
- 基于树莓派网页控制LED和视频监控的项目全部资料+详细文档+高分项目.zip
- 基于树莓派实现ADIS16505 IMU的数据采集全部资料+详细文档+高分项目.zip
- 基于腾讯云IOT平台实现树莓派上面的蜂鸣器控制全部资料+详细文档+高分项目.zip
- 焊接机器人的分类及应用 - .pdf
- 焊接机器人工作站系统中焊接工艺的设计 - .pdf
- 焊接机器人工作站系统设计原则探讨 - .pdf
- 焊接机器人工作站在VHS高速列车转向架构架生产中的应用 - .pdf
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈