
当snakemake工作流在slurm集群上运行时,用户可能会发现,与直接在本地执行或运行外部非python程序(如star)不同,python脚本中的print()语句输出并不会实时显示在slurm的输出文件中,而是在脚本完成或失败后才一次性输出。这通常是由于python的标准输出(stdout)默认是行缓冲或块缓冲的,当输出被重定向到文件(如slurm的.out文件)时,只有缓冲区满、程序结束或显式刷新时,内容才会被写入文件。
为了强制Python实时输出,可以在print()语句后显式地刷新标准输出缓冲区。
使用print()函数的flush参数: 从Python 3.3开始,print()函数支持flush=True参数,可以直接强制刷新。
print("========RUNNING JOB SPLADDER=========", flush=True)
print("\n\n\n", flush=True)
# ... 其他print语句 ...
print(f"running spladder for {genome} with {bam_files}", flush=True)手动导入sys模块并刷新: 对于更复杂的场景或兼容性考虑,可以使用sys.stdout.flush()。
import sys
# ... 在需要立即输出的地方 ...
print("========RUNNING JOB SPLADDER=========")
sys.stdout.flush()
print("\n\n\n")
sys.stdout.flush()
# ...
print(f"running spladder for {genome} with {bam_files}")
sys.stdout.flush()尽管刷新标准输出可以解决实时显示的问题,但这通常只是治标不治本。更深层次的问题可能在于Snakemake规则的设计不符合最佳实践,导致工作流难以管理和扩展。
原始规则将多个基因组的处理逻辑封装在一个Snakemake规则的run块中,这与Snakemake的声明式、基于通配符的并行化设计理念相悖。以下是针对此类问题的重构建议和最佳实践。
Snakemake的核心思想是让每个规则处理一个“单元”或“样本”,通过通配符(wildcards)来定义输入和输出模式,从而让Snakemake调度器自动处理并行化。原始规则在一个run块内循环处理所有基因组,这有以下缺点:
立即学习“Python免费学习笔记(深入)”;
建议: 将规则设计为处理单个基因组或单个样本。
为了实现单单元处理,我们需要将动态的输入文件列表和参数构建逻辑从run块中提取出来。
使用输入函数(Input Functions): Snakemake允许使用Python函数来动态生成规则的输入文件列表。这些函数接收wildcards作为参数,可以根据当前规则实例的通配符值来查找和构建输入。
使用params指令: params指令可以定义规则运行时所需的额外参数,这些参数可以基于通配符或输入文件动态生成,并在shell或run块中通过{params.param_name}访问。
对于执行外部命令行工具(如spladder、STAR等),强烈建议使用shell指令而不是在run块中手动构建命令字符串并调用shell()函数。shell指令提供了更简洁、更安全的方式来执行外部命令,并且Snakemake会自动处理变量替换。
rule all是Snakemake工作流的入口点,它定义了最终需要生成的所有文件。通过expand()函数,可以根据所有可能的通配符组合来生成完整的目标文件列表。在定义rule all时,应确保只请求那些能够被实际生成的输出,避免因某些输入条件不满足而导致Snakemake尝试生成不存在的输出。
以下是根据上述最佳实践重构后的Snakefile示例。
import re
from pathlib import Path
# 假设 accessions 是一个预先加载的 pandas DataFrame
# 例如:
# import pandas as pd
# accessions = pd.DataFrame({
# 'genome_id': ['genomeA', 'genomeB', 'genomeA', 'genomeC'],
# 'rsa_id': ['rsa1', 'rsa2', 'rsa3', 'rsa4']
# }, index=['rsa1', 'rsa2', 'rsa3', 'rsa4'])
# 1. 定义最终目标:rule all
rule all:
'''
定义工作流的最终目标。
这里使用列表推导式和expand函数,
确保只请求那些实际存在rsa_ids的基因组的输出。
'''
input:
expand(
"data/spladder/{genome}/merge_graphs_mutex_exons_C3.pickle",
genome = [
genome_id
for genome_id in accessions['genome_id'].unique()
if len(accessions[accessions['genome_id'] == genome_id]) > 0
]
)
# 2. 定义动态输入函数
def spladder_input(wildcards):
'''
根据通配符 {genome} 动态查找对应的bam文件和基因组注释文件。
'''
filtered_accessions = accessions[accessions['genome_id'] == wildcards.genome]
rsa_ids = filtered_accessions.index.values
return {
'genome_annotation': f"../ressources/genomes/{wildcards.genome}/genomic.gtf",
'bams': expand("data/alignments/{rsa}/{rsa}_Aligned.sortedByCoord.out.bam", rsa=rsa_ids),
}
# 3. 重构 spladder 规则,使其处理单个基因组
rule spladder:
input:
# 使用 unpack 解包 spladder_input 函数返回的字典
unpack(spladder_input)
output:
# 输出文件只包含一个基因组的通配符
"data/spladder/{genome}/merge_graphs_mutex_exons_C3.pickle"
threads: 20 # 根据实际资源情况调整,有时减少线程数增加作业数更优
resources:
mem_mb=1024*20,
runtime=60*8
params:
# 将bams列表转换为逗号分隔的字符串,供命令行使用
bams=lambda wildcards, input: ','.join(input.bams),
# 提取输出文件路径的父目录作为输出目录
outdir=lambda wildcards, output: Path(output).parent
shell:
# 使用 shell 指令,结构清晰,参数通过 {input.key} 和 {params.key} 引用
'mkdir -p {params.outdir} && ' # 确保输出目录存在
'spladder build '
'--set-mm-tag nM '
'--bams {params.bams} '
'--annotation {input.genome_annotation} '
'--outdir {params.outdir} '
'--parallel {threads}'重构说明:
通过上述重构,Snakemake工作流将变得更加健壮、可扩展,并且能更好地利用集群的并行计算能力,同时也能更清晰地管理每个步骤的输入和输出。
以上就是Snakemake规则在Slurm模式下Python输出实时显示与最佳实践的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号