注释(Shell 脚本:批量对当前目录下所有 VCF 文件运行 snpEff 注释)
#java -Xmx10G -jar ~/snpEff/snpEff.jar eff -c ~/soft/snpeff/snpEff/snpEff.config NIP pop3.vcf> snpann.vcf -csvStats snpann.csv -stats snpann.html
#!/bin/bash
# Annotate every .vcf file in the current directory with snpEff.
# For each input <name>.vcf this produces:
#   <name>_ann.vcf (annotated VCF), <name>_ann.csv and <name>_ann.html (stats).

SNPEFF_JAR=~/snpEff/snpEff.jar
SNPEFF_CONFIG=~/soft/snpeff/snpEff/snpEff.config
GENOME=NIP

for vcf_file in *.vcf; do
    # When nothing matches, the glob stays literal ("*.vcf"), so make sure
    # we are looking at a real file before invoking snpEff.
    if [ -f "$vcf_file" ]; then
        # Output prefix = input file name without the .vcf extension.
        base_name=$(basename "$vcf_file" .vcf)
        echo "正在处理文件: $vcf_file"
        java -Xmx10G -jar "$SNPEFF_JAR" eff -c "$SNPEFF_CONFIG" \
            -csvStats "${base_name}_ann.csv" \
            -stats "${base_name}_ann.html" \
            "$GENOME" "$vcf_file" > "${base_name}_ann.vcf"
        echo "处理完成,输出文件: ${base_name}_ann.vcf"
        echo "统计文件: ${base_name}_ann.csv 和 ${base_name}_ann.html"
        echo "----------------------------------------"
    fi
done
echo "所有VCF文件处理完毕"
统计(Python 脚本:按基因统计所有注释 VCF 中 INS/DEL/DUP/INV 的出现次数)
import argparse
import glob
import os
from typing import List, Dict
def read_gid_file(gid_path: str) -> Dict[str, Dict[str, int]]:
    """Read a gene-id file and initialise a per-gene SV-type counter.

    Args:
        gid_path: Path to a text file with one gene identifier per line;
            blank lines are ignored.

    Returns:
        Mapping of gene id -> {'INS'|'DEL'|'DUP'|'INV'|'OTHER': 0}.

    Raises:
        FileNotFoundError: if *gid_path* does not exist.
        ValueError: if the file contains no non-empty lines.
        Exception: wrapping any other read error with the last line number.
    """
    gene_counters: Dict[str, Dict[str, int]] = {}
    # BUG FIX: initialise before the loop so the error message in the
    # generic handler cannot hit an unbound name (NameError) when the
    # failure happens before/without any iteration (e.g. empty file).
    line_num = 0
    try:
        with open(gid_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                gene_id = line.strip()
                if not gene_id:
                    continue
                # One independent counter dict per gene.
                gene_counters[gene_id] = {
                    'INS': 0,
                    'DEL': 0,
                    'DUP': 0,
                    'INV': 0,
                    'OTHER': 0,
                }
        if not gene_counters:
            raise ValueError("gid文件中未找到有效基因标识")
        print(f"✅ 从 {gid_path} 读取到 {len(gene_counters)} 个基因标识")
        return gene_counters
    except FileNotFoundError:
        raise FileNotFoundError(f"❌ 未找到gid文件:{gid_path}")
    except ValueError:
        # Propagate the "no valid ids" error as-is instead of re-wrapping
        # it with a meaningless line number.
        raise
    except Exception as e:
        raise Exception(f"❌ 读取gid文件失败(行号 {line_num}):{str(e)}")
def count_svtypes_per_gene(vcf_input: str, gene_counters: Dict[str, Dict[str, int]]) -> None:
    """Tally SVTYPE occurrences per target gene for one annotated VCF file.

    Scans each variant record of *vcf_input*; when any target gene id
    appears in the ANN INFO entry, that gene's counter for the record's
    SVTYPE (INS/DEL/DUP/INV, anything else -> OTHER) is incremented once
    per record. Mutates *gene_counters* in place and prints a summary.

    Raises:
        FileNotFoundError: if *vcf_input* does not exist.
        Exception: wrapping any other processing error.
    """
    target_genes = set(gene_counters.keys())
    matched_line_count = 0
    variant_line_count = 0
    try:
        with open(vcf_input, 'r', encoding='utf-8') as handle:
            for raw_line in handle:
                record = raw_line.strip()
                # Header lines start with '#'.
                if record.startswith('#'):
                    continue
                variant_line_count += 1
                columns = record.split('\t')
                # A valid VCF data line has at least 8 columns (INFO is #8).
                if len(columns) < 8:
                    print(f"⚠️ 跳过无效变异行(列数不足):{record[:50]}...")
                    continue
                # Single pass over the INFO entries for both keys.
                ann_entry = None
                svtype_entry = None
                for entry in columns[7].split(';'):
                    if ann_entry is None and entry.startswith('ANN='):
                        ann_entry = entry
                    if svtype_entry is None and entry.startswith('SVTYPE='):
                        svtype_entry = entry
                if not ann_entry:
                    continue  # no annotation on this record
                ann_fields = ann_entry[len('ANN='):].split('|')
                svtype = "OTHER"
                if svtype_entry:
                    svtype = svtype_entry.split('=')[1].strip()
                    # Collapse anything outside the four tracked types.
                    if svtype not in ('INS', 'DEL', 'DUP', 'INV'):
                        svtype = "OTHER"
                # Unique genes mentioned anywhere in the ANN fields, so one
                # record increments each gene at most once.
                hits = {gene for field in ann_fields
                        for gene in target_genes if gene in field}
                if hits:
                    matched_line_count += 1
                    for gene in hits:
                        gene_counters[gene][svtype] += 1
        print(f"\n📊 {vcf_input} 统计完成!")
        print(f" - 总变异行数(不含头部):{variant_line_count}")
        print(f" - 匹配到的行数:{matched_line_count}")
    except FileNotFoundError:
        raise FileNotFoundError(f"❌ 未找到VCF文件:{vcf_input}")
    except Exception as e:
        raise Exception(f"❌ 统计失败:{str(e)}")
def print_summary(gene_counters: Dict[str, Dict[str, int]], vcf_files: List[str]) -> None:
    """Print an aligned table of per-gene SV-type counts plus a totals row."""
    svtype_order = ('INS', 'DEL', 'DUP', 'INV', 'OTHER')
    print("\n" + "=" * 60)
    print(f"📋 所有 {len(vcf_files)} 个VCF文件的汇总统计结果:")
    print(f"{'基因ID':<15} {'INS':<5} {'DEL':<5} {'DUP':<5} {'INV':<5} {'OTHER':<5} {'总计':<5}")
    print("-" * 60)
    grand_total = 0
    for gene_id, counts in gene_counters.items():
        row_total = sum(counts.values())
        grand_total += row_total
        cells = " ".join(f"{counts[t]:<5}" for t in svtype_order)
        print(f"{gene_id:<15} {cells} {row_total:<5}")
    print("-" * 60)
    # Column totals across all genes, in the same order as the header.
    column_cells = " ".join(
        f"{sum(c[t] for c in gene_counters.values()):<5}" for t in svtype_order
    )
    print(f"{'总计':<15} {column_cells} {grand_total:<5}")
def main():
    """CLI entry point: parse arguments, tally SV types per gene, print summary."""
    parser = argparse.ArgumentParser(
        description="为每个基因统计当前文件夹下所有VCF文件中INS、DEL、DUP、INV的出现次数",
        epilog="示例:python count_svtypes_all_vcf.py -g gid.txt"
    )
    parser.add_argument('-g', '--gid', required=True, help="gid文件路径(每行一个基因标识)")
    parser.add_argument('-p', '--pattern', default='*_ann.vcf',
                        help="VCF文件匹配模式,默认为'*_ann.vcf'")
    args = parser.parse_args()
    try:
        # Build one shared counter structure from the gene-id list.
        counters = read_gid_file(args.gid)
        # All matching VCF files in the current directory.
        vcf_files = glob.glob(args.pattern)
        if not vcf_files:
            raise FileNotFoundError(f"❌ 未找到匹配的VCF文件,模式:{args.pattern}")
        print(f"✅ 找到 {len(vcf_files)} 个VCF文件,开始处理...")
        file_count = len(vcf_files)
        for index, vcf_file in enumerate(vcf_files, 1):
            print(f"\n{'='*20} 处理文件 {index}/{file_count} {'='*20}")
            count_svtypes_per_gene(vcf_file, counters)
        # Aggregated table over every processed file.
        print_summary(counters, vcf_files)
        print("\n🎉 所有文件处理完成!")
    except Exception as e:
        print(f"\n❌ 任务失败:{str(e)}")
# Run the CLI only when executed as a script, not on import.
if __name__ == "__main__":
    main()
合并结果(Python 脚本:把当前目录下所有 txt 文件去掉前 7 行后合并为一个文件)
import os
def merge_txt_files(output_filename="merged_files.txt", delimiter="\t"):
    """Merge every .txt file in the current directory into one file.

    For each input file the first 7 lines (assumed header) are dropped and
    every remaining line is written to *output_filename*, prefixed with the
    source file's basename (without .txt) and *delimiter*.

    Args:
        output_filename: Path of the merged output file.
        delimiter: Column separator inserted after the file identifier.
    """
    # Collect the inputs; exclude the output file itself so that a re-run
    # does not merge the previous result back in (it also ends in .txt).
    txt_files = [
        f for f in os.listdir('.')
        if f.endswith('.txt') and os.path.isfile(f) and f != output_filename
    ]
    if not txt_files:
        print("未找到任何txt文件")
        return
    # Deterministic processing order.
    txt_files.sort()
    with open(output_filename, 'w', encoding='utf-8') as outfile:
        total_files = len(txt_files)
        print(f"找到 {total_files} 个txt文件,开始合并...")
        for i, filename in enumerate(txt_files, 1):
            # Basename without the .txt extension, used as the first column.
            file_identifier = os.path.splitext(filename)[0]
            # BUG FIX: these messages previously printed the literal text
            # "(unknown)" instead of the actual file name.
            print(f"正在处理 {i}/{total_files}: {filename}")
            try:
                with open(filename, 'r', encoding='utf-8') as infile:
                    lines = infile.readlines()
                # Skip the 7-line header (indices 0-6); if the file is
                # shorter, keep everything and warn.
                if len(lines) >= 8:
                    content_lines = lines[7:]
                else:
                    content_lines = lines
                    print(f"警告: {filename} 不足8行,已将所有内容加入")
                # Prefix each kept line with the file identifier column.
                for line in content_lines:
                    cleaned_line = line.rstrip('\n')
                    outfile.write(f"{file_identifier}{delimiter}{cleaned_line}\n")
            except Exception as e:
                print(f"处理 {filename} 时出错: {str(e)}")
    print(f"合并完成!结果已保存到 {output_filename}")
# Script entry point.
if __name__ == "__main__":
    # Tab is the column separator; change here if another is needed (e.g. ",").
    merge_txt_files(delimiter="\t")
合并 group(Python 脚本:按 sample 列把 group 表与合并结果表做连接)
import csv
import sys
def merge_files(group_file="group", merged_file="merged_file.txt", output_file="merged_result.txt", delimiter="\t"):
    """Join *merged_file* onto *group_file* by the 'sample' column.

    Only rows whose sample value exists in *group_file* are written.
    Output columns are *group_file*'s columns first, followed by the
    remaining columns of *merged_file*.

    Args:
        group_file: Path of the group table (must contain a 'sample' column).
        merged_file: Path of the data table (must contain a 'sample' column).
        output_file: Path the joined result is written to.
        delimiter: Field separator of all three files (default: tab).
    """
    # sample value -> its full row from the group file.
    group_data = {}
    group_headers = []
    try:
        with open(group_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f, delimiter=delimiter)
            group_headers = reader.fieldnames
            # BUG FIX: fieldnames is None for an empty file; the old
            # membership test then raised TypeError instead of reporting
            # a missing 'sample' column.
            if not group_headers or 'sample' not in group_headers:
                print(f"错误: {group_file} 中未找到'sample'列")
                return
            for row in reader:
                sample = row['sample']
                if sample in group_data:
                    print(f"警告: {group_file} 中'sample'列存在重复值: {sample},仅保留最后一行")
                group_data[sample] = row
        print(f"成功读取 {group_file},共 {len(group_data)} 条记录")
    except FileNotFoundError:
        print(f"错误: 找不到文件 {group_file}")
        return
    except Exception as e:
        print(f"读取 {group_file} 时出错: {str(e)}")
        return
    total_merged = 0
    try:
        with open(merged_file, 'r', encoding='utf-8') as infile, \
                open(output_file, 'w', encoding='utf-8', newline='') as outfile:
            reader = csv.DictReader(infile, delimiter=delimiter)
            merged_headers = reader.fieldnames
            if not merged_headers or 'sample' not in merged_headers:
                print(f"错误: {merged_file} 中未找到'sample'列")
                return
            # Output header: group columns first, then any new merged columns.
            output_headers = group_headers.copy()
            for header in merged_headers:
                if header not in output_headers:
                    output_headers.append(header)
            writer = csv.DictWriter(outfile, fieldnames=output_headers, delimiter=delimiter)
            writer.writeheader()
            for row in reader:
                sample = row['sample']
                if sample in group_data:
                    # On duplicate keys the merged_file value wins (the
                    # original comment claimed group_data took priority,
                    # which contradicted the code; in practice only
                    # 'sample' overlaps and the values are equal).
                    merged_row = {**group_data[sample], **row}
                    writer.writerow(merged_row)
                    total_merged += 1
                # Samples present only in merged_file are dropped.
        print(f"成功处理 {merged_file},共合并 {total_merged} 条匹配记录")
        print(f"合并结果已保存到 {output_file}")
    except FileNotFoundError:
        print(f"错误: 找不到文件 {merged_file}")
        return
    except Exception as e:
        print(f"处理 {merged_file} 时出错: {str(e)}")
        return
# Script entry point: optional positional CLI arguments override the
# default input paths (argv[1] = group file, argv[2] = merged file).
if __name__ == "__main__":
    # Allow input files to be specified on the command line.
    group_file = sys.argv[1] if len(sys.argv) > 1 else "group"
    merged_file = sys.argv[2] if len(sys.argv) > 2 else "merged_file.txt"
    merge_files(group_file, merged_file)
```
替换(Python 脚本:将表格第四列及之后所有大于 1 的数值替换为 1)
import csv
import os
def process_file(input_file="merged_result.txt", output_file="processed_result.txt", delimiter="\t"):
    """Cap numeric values at 1 from the 4th column onward.

    Copies *input_file* to *output_file*, keeping the header row as-is.
    In every data row, any cell from the 4th column onward (0-based index 3)
    that parses as a number greater than 1 is replaced with the string "1".
    Non-numeric cells are left untouched.

    Args:
        input_file: Path of the table to process.
        output_file: Path the processed table is written to.
        delimiter: Field separator (default: tab).
    """
    if not os.path.exists(input_file):
        print(f"错误: 找不到输入文件 {input_file}")
        return
    processed_rows = 0
    modified_cells = 0
    try:
        with open(input_file, 'r', encoding='utf-8') as infile, \
                open(output_file, 'w', encoding='utf-8', newline='') as outfile:
            reader = csv.reader(infile, delimiter=delimiter)
            writer = csv.writer(outfile, delimiter=delimiter)
            # Copy the header through unchanged; next(reader, None) avoids
            # an opaque StopIteration error on a completely empty file.
            headers = next(reader, None)
            if headers is None:
                print(f"错误: 输入文件 {input_file} 为空")
                return
            writer.writerow(headers)
            processed_rows += 1  # NOTE: the header counts toward this total
            for row in reader:
                processed_rows += 1
                # Rows shorter than 4 columns are passed through unchanged.
                if len(row) < 4:
                    print(f"警告: 第 {processed_rows} 行列数不足4列,将原样保留")
                    writer.writerow(row)
                    continue
                # Columns from the 4th onward: 0-based index 3 (the original
                # comment incorrectly said index 4).
                for i in range(3, len(row)):
                    try:
                        value = float(row[i])
                        if value > 1:
                            row[i] = "1"
                            modified_cells += 1
                    except (ValueError, TypeError):
                        # Not numeric: leave the cell as-is.
                        pass
                writer.writerow(row)
        print(f"处理完成!")
        print(f"共处理 {processed_rows} 行数据")
        print(f"共修改 {modified_cells} 个大于1的数值")
        print(f"处理结果已保存到 {output_file}")
    except Exception as e:
        print(f"处理文件时出错: {str(e)}")
# Script entry point.
if __name__ == "__main__":
    # Adjust the input/output paths here if needed.
    process_file(
        input_file="merged_result.txt",
        output_file="processed_result.txt",
        delimiter="\t"
    )
统计结果(Python 脚本:按 Group 和 gene 分组汇总 INS/DEL/DUP/INV 计数)
import csv
import os
from collections import defaultdict
def count_variants(input_file="processed_result.txt", output_file="variant_counts.txt", delimiter="\t"):
    """Sum the INS/DEL/DUP/INV columns grouped by (Group, gene).

    Args:
        input_file: Table containing at least the columns Group, gene,
            INS, DEL, DUP and INV.
        output_file: Path the per-(Group, gene) totals are written to,
            sorted by Group then gene.
        delimiter: Field separator (default: tab).
    """
    if not os.path.exists(input_file):
        print(f"错误: 找不到输入文件 {input_file}")
        return
    # Columns whose values are summed per group.
    count_columns = ["INS", "DEL", "DUP", "INV"]
    # (Group, gene) -> {column: running total}
    stats = defaultdict(lambda: defaultdict(int))
    try:
        with open(input_file, 'r', encoding='utf-8') as infile:
            reader = csv.DictReader(infile, delimiter=delimiter)
            # BUG FIX: fieldnames is None for an empty file; default to []
            # so the missing-column report fires instead of a TypeError.
            headers = reader.fieldnames or []
            required_columns = ["Group", "gene"] + count_columns
            missing_columns = [col for col in required_columns if col not in headers]
            if missing_columns:
                print(f"错误: 输入文件缺少必要的列: {', '.join(missing_columns)}")
                return
            total_rows = 0
            for row in reader:
                total_rows += 1
                key = (row["Group"], row["gene"])
                for col in count_columns:
                    try:
                        stats[key][col] += int(row[col])
                    except (ValueError, TypeError):
                        # Non-numeric cell: report and skip it.
                        print(f"警告: 第 {total_rows} 行的 {col} 列值 '{row[col]}' 不是有效数字,已跳过")
            print(f"成功读取 {total_rows} 行数据")
        with open(output_file, 'w', encoding='utf-8', newline='') as outfile:
            output_headers = ["Group", "gene"] + count_columns
            writer = csv.DictWriter(outfile, fieldnames=output_headers, delimiter=delimiter)
            writer.writeheader()
            # Deterministic output order: sorted by (Group, gene).
            for (group, gene), counts in sorted(stats.items()):
                result_row = {"Group": group, "gene": gene}
                for col in count_columns:
                    result_row[col] = counts[col]
                writer.writerow(result_row)
        print(f"统计完成!共统计 {len(stats)} 个不同的(Group, gene)组合")
        print(f"统计结果已保存到 {output_file}")
    except Exception as e:
        print(f"处理文件时出错: {str(e)}")
# Script entry point: aggregate the processed table into per-group counts.
if __name__ == "__main__":
    count_variants(
        input_file="processed_result.txt",
        output_file="variant_counts.txt",
        delimiter="\t"
    )
```











网友评论