Airflow企业级ETL核心在于可追溯、可重试、可监控、可维护,需聚焦任务设计、依赖表达、错误隔离与生产配置;DAG须声明业务逻辑而非线性脚本,各task应独立且明确定义IO边界,禁用catchup、限制并发、配置重试、关闭手动触发、埋点上报指标、统一SQL管理、封装业务逻辑、敏感信息走Secrets。

用 Airflow 搭建企业级 ETL 管道,核心不是写多少 DAG,而是让数据流动可追溯、可重试、可监控、可维护。重点在任务设计逻辑、依赖表达方式、错误隔离策略和生产就绪配置。
用 DAG 表达真实业务流,不是把脚本串起来
Airflow 的 DAG 是业务逻辑的声明式映射,不是执行顺序的线性列表。比如“每天同步订单库 → 清洗订单字段 → 关联用户画像 → 写入数仓宽表”,每个环节应独立成 task,且明确输入输出边界。
- 清洗任务不直接连数据库,而是读取上游 task 产出的临时 Parquet 文件路径(通过 XCom 或命名约定)
- 关联任务用 Spark 或 DuckDB 执行,避免在 Python 中做大数据量 join
- 写入宽表前加校验 task:检查行数波动、空值率、关键字段非空比例,失败则中止后续,触发告警
生产环境必须关闭的默认行为
Airflow 开箱即用的配置适合学习,上线前这几项必须改:
- catchup=False:避免补跑历史导致资源打满或重复写入
- max_active_runs=1:同一 DAG 不允许多次并发运行,防止时间窗口错乱(如今天任务还没跑完,明天调度又触发)
- default_args 中设 retries=2, retry_delay=timedelta(minutes=5):网络抖动、临时锁表等瞬时故障自动恢复
- 关闭 UI 上的“Trigger DAG”按钮(用 RBAC 控制),所有触发走 CI/CD 或运维平台
让 ETL 可观测:不只是看绿色圆点
绿色 success 不代表数据正确。要在关键节点埋点:
立即学习“Python免费学习笔记(深入)”;
- 每个 task 结束时,用 PythonOperator 调用内部指标服务,上报处理记录数、耗时、空值字段列表
- 用 SlackAlertOperator 替代默认 email,失败消息带 DAG 名、task_id、log URL、最近 3 行报错堆栈
- 定期用 SQLSensor 检查目标表最新分区是否已生成、行数是否达标,作为下游 DAG 的上游依赖
避免踩坑的三个硬约束
这些不是最佳实践,是血泪教训换来的强制规则:
- 所有 SQL 脚本统一放 dags/sql/ 目录,用 Jinja 模板注入 ds、ds_nodash,禁止在 Python 里拼接 SQL 字符串
- DAG 文件只负责编排,不写业务逻辑;清洗、转换逻辑封装成独立 Python 包,pip install 到 Airflow worker 环境
- 敏感配置(数据库密码、API key)全部走 Airflow Connections + AWS Secrets Manager 后端,DAG 文件里只写 conn_id










