Python数据清洗流水线分为读取、校验、转换、输出四阶段:结构化读取并提取元数据;字段级校验生成问题报告;按YAML配置动态执行转换;原子化输出+变更留痕+日志追踪,强调元数据传递、错误隔离与配置解耦。

Python可以通过分阶段、可复用的函数组合,配合配置驱动和日志追踪,构建稳定可控的自动化数据清洗流水线。核心不是写一个大脚本,而是把清洗逻辑拆成“读取→校验→转换→输出”四个可独立测试、按需启用的阶段。
阶段一:结构化读取与元数据感知
避免直接用 pandas.read_csv() 硬编码路径。改用统一入口函数,自动识别文件类型、编码、分隔符,并提取表名、时间戳、版本等元数据,存入上下文字典(如 ctx = {"source": "sales_202405.csv", "dt": "2024-05-01", "schema": {...}})。
- 支持 Excel / CSV / Parquet / JSON 多格式,用
pathlib.Path.suffix判断 - 对 CSV 尝试 utf-8 → gbk → utf-8-sig 编码回退
- 读取时跳过明显乱码行(正则匹配含大量或控制字符的行)
阶段二:字段级规则校验与问题标记
不直接删数据,而是生成清洗报告(DataFrame 或 dict),记录每列的空值率、异常值数量、格式错误位置、业务规则冲突行索引。例如:
- 手机号列:用正则
r"^1[3-9]\d{9}$"校验,标出不匹配的行号 - 金额列:检查是否全为数字+小数点,且值 ≥ 0;负数单独归类为“待复核”
- 日期列:用
pd.to_datetime(..., errors="coerce")转换后检查 NaT 比例
结果存为 ctx["report"],后续阶段可据此决定是否中断流程或启用修复策略。
立即学习“Python免费学习笔记(深入)”;
阶段三:按配置执行转换与填充
把清洗动作参数化:哪些列要标准化(如大小写、去空格)、哪些缺失值用均值/前向填充/固定值补、哪些枚举字段映射别名。配置存在 YAML 文件中,例如:
columns:
user_name: {strip: true, upper: false}
status: {map: {"A": "active", "I": "inactive"}}
amount: {fillna: "median"}
清洗函数根据该配置动态调用对应方法,不写死逻辑,方便 QA 修改规则而不动代码。
阶段四:原子化输出与变更留痕
清洗后数据不直接覆盖原文件。而是:
— 输出清洗后 CSV/Parquet 到 output/cleaned/,带时间戳命名
— 同步生成 diff/ 目录,存每列的变更统计(如 “phone: 12 行格式修正,3 行设为空”)
— 记录完整执行日志(含输入哈希、各阶段耗时、报告摘要),便于回溯和监控
- 使用
shutil.copy2()保留原始文件修改时间 - 关键步骤加
try/except+logging.error,失败时仍输出当前中间结果供排查 - 支持 dry-run 模式:只生成报告和 diff,不写入最终文件
基本上就这些。流程本身不复杂,但容易忽略元数据传递、错误隔离和配置解耦——这三点才是让清洗流水线真正可维护的关键。










