美文网首页
统计sv结果

统计sv结果

作者: 花生学生信 | 来源:发表于2025-09-24 15:04 被阅读0次

注释

# Example of the underlying one-off command this script automates:
#   java -Xmx10G -jar ~/snpEff/snpEff.jar eff -c ~/soft/snpeff/snpEff/snpEff.config NIP pop3.vcf > snpann.vcf -csvStats snpann.csv -stats snpann.html
#!/bin/bash

# snpEff annotation driver: annotate every .vcf file in the current directory,
# writing <name>_ann.vcf plus CSV/HTML stats next to each input.
SNPEFF_JAR=~/snpEff/snpEff.jar
SNPEFF_CONFIG=~/soft/snpeff/snpEff/snpEff.config
GENOME=NIP

for vcf_file in *.vcf; do
    # If no .vcf exists, the glob stays literal; the -f guard skips it.
    [ -f "$vcf_file" ] || continue

    # File name without the .vcf extension, used as the output prefix.
    base_name=$(basename "$vcf_file" .vcf)

    echo "正在处理文件: $vcf_file"
    # Annotate; stdout carries the annotated VCF, stats go to csv/html files.
    java -Xmx10G -jar "$SNPEFF_JAR" eff -c "$SNPEFF_CONFIG" \
        -csvStats "${base_name}_ann.csv" \
        -stats "${base_name}_ann.html" \
        "$GENOME" "$vcf_file" > "${base_name}_ann.vcf"

    echo "处理完成,输出文件: ${base_name}_ann.vcf"
    echo "统计文件: ${base_name}_ann.csv 和 ${base_name}_ann.html"
    echo "----------------------------------------"
done

echo "所有VCF文件处理完毕"

统计

import argparse
import glob
import os
from typing import List, Dict


def read_gid_file(gid_path: str) -> Dict[str, Dict[str, int]]:
    """Read the gid file and initialise an SV-type counter for every gene.

    Args:
        gid_path: Path to a text file with one gene identifier per line;
            blank lines are ignored.

    Returns:
        Mapping of gene id -> zeroed counters for INS/DEL/DUP/INV/OTHER.

    Raises:
        FileNotFoundError: if the gid file does not exist.
        Exception: wrapping any other read/parse failure (incl. the
            ValueError raised when no valid gene id is found).
    """
    gene_counters: Dict[str, Dict[str, int]] = {}
    # BUG FIX: initialise line_num so the error message in the generic except
    # clause cannot raise UnboundLocalError when open() itself fails.
    line_num = 0
    try:
        with open(gid_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                gene_id = line.strip()
                if not gene_id:
                    continue  # skip blank lines
                # One independent counter per gene, one slot per SV type.
                gene_counters[gene_id] = {
                    'INS': 0,
                    'DEL': 0,
                    'DUP': 0,
                    'INV': 0,
                    'OTHER': 0
                }

        if not gene_counters:
            raise ValueError("gid文件中未找到有效基因标识")
        print(f"✅ 从 {gid_path} 读取到 {len(gene_counters)} 个基因标识")
        return gene_counters
    except FileNotFoundError:
        raise FileNotFoundError(f"❌ 未找到gid文件:{gid_path}")
    except Exception as e:
        raise Exception(f"❌ 读取gid文件失败(行号 {line_num}):{str(e)}")


def count_svtypes_per_gene(vcf_input: str, gene_counters: Dict[str, Dict[str, int]]) -> None:
    """Accumulate per-gene SVTYPE counts from one snpEff-annotated VCF file.

    A variant line credits each gene at most once: the line's SVTYPE counter
    for a gene is incremented if the gene id appears (as a substring) in any
    '|'-separated field of the line's ANN annotation.

    Args:
        vcf_input: path to an annotated VCF file.
        gene_counters: per-gene counters (see read_gid_file); mutated in place.

    Raises:
        FileNotFoundError: if the VCF file does not exist.
        Exception: wrapping any other processing failure.
    """
    target_genes = set(gene_counters.keys())
    total_matched_lines = 0
    total_variant_count = 0

    try:
        with open(vcf_input, 'r', encoding='utf-8') as in_f:
            for line in in_f:
                line_strip = line.strip()
                # Skip header lines.
                if line_strip.startswith('#'):
                    continue

                total_variant_count += 1
                vcf_fields = line_strip.split('\t')

                # A valid VCF body line has at least 8 columns (INFO is 8th).
                if len(vcf_fields) < 8:
                    print(f"⚠️  跳过无效变异行(列数不足):{line_strip[:50]}...")
                    continue

                info_entries = vcf_fields[7].split(';')

                # Lines without a snpEff ANN entry carry no gene information.
                ann_entry = next((entry for entry in info_entries if entry.startswith('ANN=')), None)
                if not ann_entry:
                    continue

                ann_parts = ann_entry[4:].split('|')  # strip the "ANN=" prefix

                # Resolve SVTYPE; anything outside the four tracked types is OTHER.
                svtype = "OTHER"
                svtype_entry = next((entry for entry in info_entries if entry.startswith('SVTYPE=')), None)
                if svtype_entry:
                    svtype = svtype_entry.split('=')[1].strip()
                    if svtype not in ['INS', 'DEL', 'DUP', 'INV']:
                        svtype = "OTHER"

                # Substring-match every target gene against the ANN fields.
                # Collecting into a set directly replaces the previous
                # append-then-list(set(...)) dance and dedupes genes hit by
                # several annotation fields on the same line.
                matched_genes = {
                    gene
                    for part in ann_parts
                    for gene in target_genes
                    if gene in part
                }

                if matched_genes:
                    total_matched_lines += 1
                    for gene in matched_genes:
                        gene_counters[gene][svtype] += 1

        print(f"\n📊 {vcf_input} 统计完成!")
        print(f"   - 总变异行数(不含头部):{total_variant_count}")
        print(f"   - 匹配到的行数:{total_matched_lines}")

    except FileNotFoundError:
        raise FileNotFoundError(f"❌ 未找到VCF文件:{vcf_input}")
    except Exception as e:
        raise Exception(f"❌ 统计失败:{str(e)}")


def print_summary(gene_counters: Dict[str, Dict[str, int]], vcf_files: List[str]) -> None:
    """Print an aligned per-gene SV-type table followed by column totals."""
    sv_keys = ('INS', 'DEL', 'DUP', 'INV', 'OTHER')

    print("\n" + "=" * 60)
    print(f"📋 所有 {len(vcf_files)} 个VCF文件的汇总统计结果:")
    print(f"{'基因ID':<15} {'INS':<5} {'DEL':<5} {'DUP':<5} {'INV':<5} {'OTHER':<5} {'总计':<5}")
    print("-" * 60)

    grand_total = 0
    for gid, cnt in gene_counters.items():
        row_total = sum(cnt.values())
        grand_total += row_total
        cells = " ".join(f"{cnt[key]:<5}" for key in sv_keys)
        print(f"{gid:<15} {cells} {row_total:<5}")

    print("-" * 60)
    # Per-column totals across all genes, then the grand total.
    col_cells = " ".join(
        f"{sum(c[key] for c in gene_counters.values()):<5}" for key in sv_keys
    )
    print(f"{'总计':<15} {col_cells} {grand_total:<5}")


def main():
    """CLI entry point: read the gene list, process every matching VCF, print totals."""
    arg_parser = argparse.ArgumentParser(
        description="为每个基因统计当前文件夹下所有VCF文件中INS、DEL、DUP、INV的出现次数",
        epilog="示例:python count_svtypes_all_vcf.py -g gid.txt"
    )
    arg_parser.add_argument('-g', '--gid', required=True, help="gid文件路径(每行一个基因标识)")
    arg_parser.add_argument('-p', '--pattern', default='*_ann.vcf',
                            help="VCF文件匹配模式,默认为'*_ann.vcf'")
    opts = arg_parser.parse_args()

    try:
        # Zeroed per-gene counters, shared across every input file.
        counters = read_gid_file(opts.gid)

        # Candidate VCFs in the working directory.
        matched_files = glob.glob(opts.pattern)
        if not matched_files:
            raise FileNotFoundError(f"❌ 未找到匹配的VCF文件,模式:{opts.pattern}")

        print(f"✅ 找到 {len(matched_files)} 个VCF文件,开始处理...")

        for idx, path in enumerate(matched_files, 1):
            print(f"\n{'='*20} 处理文件 {idx}/{len(matched_files)} {'='*20}")
            count_svtypes_per_gene(path, counters)

        print_summary(counters, matched_files)
        print("\n🎉 所有文件处理完成!")
    except Exception as e:
        # Any failure from the helpers is reported here, not propagated.
        print(f"\n❌ 任务失败:{str(e)}")


if __name__ == "__main__":
    main()


合并结果

import os

def merge_txt_files(output_filename="merged_files.txt", delimiter="\t"):
    """Merge every .txt file in the current directory into one file.

    For each input file the first 7 lines are skipped (assumed header block)
    and every remaining line is written to the output, prefixed with the
    source file's base name (extension stripped) as an extra first column.

    NOTE(review): a pre-existing output file from an earlier run also ends in
    .txt and would be re-merged — remove it first if that matters.

    Args:
        output_filename: path of the merged output file.
        delimiter: separator between the file-identifier column and the
            original line content.
    """
    # All regular .txt files in the current directory.
    txt_files = [f for f in os.listdir('.') if f.endswith('.txt') and os.path.isfile(f)]

    if not txt_files:
        print("未找到任何txt文件")
        return

    # Deterministic processing order.
    txt_files.sort()

    with open(output_filename, 'w', encoding='utf-8') as outfile:
        total_files = len(txt_files)
        print(f"找到 {total_files} 个txt文件,开始合并...")

        for i, filename in enumerate(txt_files, 1):
            # Base name without the .txt suffix, used as the first column.
            file_identifier = os.path.splitext(filename)[0]
            # BUG FIX: these messages previously printed a literal
            # "(unknown)" placeholder instead of the actual file name.
            print(f"正在处理 {i}/{total_files}: {filename}")

            try:
                with open(filename, 'r', encoding='utf-8') as infile:
                    lines = infile.readlines()

                    # Keep content from line 8 onwards (0-based index 7).
                    if len(lines) >= 8:
                        content_lines = lines[7:]
                    else:
                        # Short file: keep everything and warn.
                        content_lines = lines
                        print(f"警告: {filename} 不足8行,已将所有内容加入")

                    for line in content_lines:
                        # Strip the trailing newline, then re-emit the line
                        # with the file identifier prepended.
                        cleaned_line = line.rstrip('\n')
                        outfile.write(f"{file_identifier}{delimiter}{cleaned_line}\n")

            except Exception as e:
                print(f"处理 {filename} 时出错: {str(e)}")

    print(f"合并完成!结果已保存到 {output_filename}")

if __name__ == "__main__":
    # Tab is the default column separator; change here if needed (e.g. ",").
    merge_txt_files(delimiter="\t")

合并group

import csv
import sys

def merge_files(group_file="group", merged_file="merged_file.txt", output_file="merged_result.txt", delimiter="\t"):
    """Join merged_file onto group_file by the 'sample' column.

    Only rows of merged_file whose sample also appears in group_file are
    written out (an inner join keyed on merged_file's rows). For columns
    present in both files the value from merged_file wins, because its row
    is spread last into the merged dict.

    Args:
        group_file: path of the group table (must contain a 'sample' column).
        merged_file: path of the data table (must contain a 'sample' column).
        output_file: path the merged result is written to.
        delimiter: column separator of all three files (default: tab).
    """
    # Group table indexed by sample; on duplicate samples the last row wins.
    group_data = {}
    group_headers = []

    try:
        with open(group_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f, delimiter=delimiter)
            group_headers = reader.fieldnames

            # ROBUSTNESS: fieldnames is None for an empty file; guard that
            # alongside the missing-column case instead of raising TypeError.
            if not group_headers or 'sample' not in group_headers:
                print(f"错误: {group_file} 中未找到'sample'列")
                return

            for row in reader:
                sample = row['sample']
                if sample in group_data:
                    print(f"警告: {group_file} 中'sample'列存在重复值: {sample},仅保留最后一行")
                group_data[sample] = row

        print(f"成功读取 {group_file},共 {len(group_data)} 条记录")

    except FileNotFoundError:
        print(f"错误: 找不到文件 {group_file}")
        return
    except Exception as e:
        print(f"读取 {group_file} 时出错: {str(e)}")
        return

    # Stream merged_file and write matching rows to output.
    total_merged = 0

    try:
        with open(merged_file, 'r', encoding='utf-8') as infile, \
             open(output_file, 'w', encoding='utf-8', newline='') as outfile:

            reader = csv.DictReader(infile, delimiter=delimiter)
            merged_headers = reader.fieldnames

            if not merged_headers or 'sample' not in merged_headers:
                print(f"错误: {merged_file} 中未找到'sample'列")
                return

            # Output header: group columns first, then any new merged columns.
            output_headers = group_headers.copy()
            for header in merged_headers:
                if header not in output_headers:
                    output_headers.append(header)

            writer = csv.DictWriter(outfile, fieldnames=output_headers, delimiter=delimiter)
            writer.writeheader()

            for row in reader:
                sample = row['sample']
                if sample in group_data:
                    # For shared columns the merged_file value overrides the
                    # group value (**row is spread last). The original comment
                    # claimed the opposite.
                    merged_row = {**group_data[sample], **row}
                    writer.writerow(merged_row)
                    total_merged += 1
                # Samples absent from the group table are silently dropped.

            print(f"成功处理 {merged_file},共合并 {total_merged} 条匹配记录")
            print(f"合并结果已保存到 {output_file}")

    except FileNotFoundError:
        print(f"错误: 找不到文件 {merged_file}")
        return
    except Exception as e:
        print(f"处理 {merged_file} 时出错: {str(e)}")
        return

if __name__ == "__main__":
    # Allow the input files to be given on the command line.
    group_file = sys.argv[1] if len(sys.argv) > 1 else "group"
    merged_file = sys.argv[2] if len(sys.argv) > 2 else "merged_file.txt"
    merge_files(group_file, merged_file)

替换

import csv
import os

def process_file(input_file="merged_result.txt", output_file="processed_result.txt", delimiter="\t"):
    """Cap numeric values at 1 in every column from the 4th one onwards.

    Each cell in column 4+ that parses as a number and is greater than 1 is
    replaced with the string "1"; non-numeric cells pass through unchanged.

    Args:
        input_file: path of the delimited input table (first row = header).
        output_file: path the processed table is written to.
        delimiter: column separator (default: tab).
    """
    if not os.path.exists(input_file):
        print(f"错误: 找不到输入文件 {input_file}")
        return

    processed_rows = 0
    modified_cells = 0

    try:
        with open(input_file, 'r', encoding='utf-8') as infile, \
             open(output_file, 'w', encoding='utf-8', newline='') as outfile:

            reader = csv.reader(infile, delimiter=delimiter)
            writer = csv.writer(outfile, delimiter=delimiter)

            # ROBUSTNESS: an empty input file previously raised StopIteration
            # from next(reader); report it and bail out instead.
            headers = next(reader, None)
            if headers is None:
                print(f"错误: 输入文件 {input_file} 为空")
                return
            writer.writerow(headers)
            processed_rows += 1

            for row in reader:
                processed_rows += 1
                # Rows with fewer than 4 columns have nothing to cap.
                if len(row) < 4:
                    print(f"警告: 第 {processed_rows} 行列数不足4列,将原样保留")
                    writer.writerow(row)
                    continue

                # Columns from the 4th onwards, i.e. 0-based index 3+.
                # (The original comment wrongly said the index starts at 4.)
                for i in range(3, len(row)):
                    try:
                        value = float(row[i])
                        if value > 1:
                            row[i] = "1"  # cap at 1
                            modified_cells += 1
                    except (ValueError, TypeError):
                        # Not a number: keep the cell as-is.
                        pass

                writer.writerow(row)

        print(f"处理完成!")
        print(f"共处理 {processed_rows} 行数据")
        print(f"共修改 {modified_cells} 个大于1的数值")
        print(f"处理结果已保存到 {output_file}")

    except Exception as e:
        print(f"处理文件时出错: {str(e)}")

if __name__ == "__main__":
    # Adjust the input/output paths here when running as a script.
    process_file(
        input_file="merged_result.txt",
        output_file="processed_result.txt",
        delimiter="\t"
    )

统计结果

import csv
import os
from collections import defaultdict

def count_variants(input_file="processed_result.txt", output_file="variant_counts.txt", delimiter="\t"):
    """Sum the INS/DEL/DUP/INV columns grouped by (Group, gene).

    Args:
        input_file: delimited table containing at least the columns
            Group, gene, INS, DEL, DUP and INV.
        output_file: path the per-(Group, gene) totals are written to.
        delimiter: column separator (default: tab).
    """
    if not os.path.exists(input_file):
        print(f"错误: 找不到输入文件 {input_file}")
        return

    # SV-type columns to accumulate.
    count_columns = ["INS", "DEL", "DUP", "INV"]

    # (Group, gene) -> {column: running total}
    stats = defaultdict(lambda: defaultdict(int))

    try:
        with open(input_file, 'r', encoding='utf-8') as infile:
            reader = csv.DictReader(infile, delimiter=delimiter)
            headers = reader.fieldnames

            # ROBUSTNESS: fieldnames is None for an empty file; treating it
            # as an empty header list reports every column as missing instead
            # of raising TypeError on the membership test.
            required_columns = ["Group", "gene"] + count_columns
            missing_columns = [col for col in required_columns if col not in (headers or [])]
            if missing_columns:
                print(f"错误: 输入文件缺少必要的列: {', '.join(missing_columns)}")
                return

            total_rows = 0
            for row in reader:
                total_rows += 1
                group = row["Group"]
                gene = row["gene"]

                for col in count_columns:
                    try:
                        # Accumulate only cells that parse as integers.
                        value = int(row[col])
                        stats[(group, gene)][col] += value
                    except (ValueError, TypeError):
                        # Non-numeric cell: warn and skip it.
                        print(f"警告: 第 {total_rows} 行的 {col} 列值 '{row[col]}' 不是有效数字,已跳过")

            print(f"成功读取 {total_rows} 行数据")

            with open(output_file, 'w', encoding='utf-8', newline='') as outfile:
                output_headers = ["Group", "gene"] + count_columns
                writer = csv.DictWriter(outfile, fieldnames=output_headers, delimiter=delimiter)
                writer.writeheader()

                # Sorted by (Group, gene) for a stable, readable output.
                for (group, gene), counts in sorted(stats.items()):
                    writer.writerow({
                        "Group": group,
                        "gene": gene,
                        "INS": counts["INS"],
                        "DEL": counts["DEL"],
                        "DUP": counts["DUP"],
                        "INV": counts["INV"]
                    })

            print(f"统计完成!共统计 {len(stats)} 个不同的(Group, gene)组合")
            print(f"统计结果已保存到 {output_file}")

    except Exception as e:
        print(f"处理文件时出错: {str(e)}")

if __name__ == "__main__":
    count_variants(
        input_file="processed_result.txt",
        output_file="variant_counts.txt",
        delimiter="\t"
    )

相关文章

网友评论

      本文标题:统计sv结果

      本文链接:https://www.haomeiwen.com/subject/ckgwzjtx.html