
本文详解在 sqlalchemy 中使用 `bulk_save_objects` 进行高效批量插入时,如何确保 `created_at`/`updated_at` 等数据库默认时间字段被正确回填并返回,同时对比 `add_all` 与 `bulk_save_objects` 的适用场景及性能差异。
在 Flask + MySQL 应用中,批量插入大量记录(如学生成绩)时,为兼顾性能与数据完整性,开发者常面临一个关键矛盾:bulk_save_objects 虽然执行快、网络开销小,但默认不触发 ORM 层的默认值处理逻辑(如 server_default 和 onupdate),导致 created_at/updated_at 字段在 Python 对象中仍为 None;而 add_all() + commit() 虽能自动填充这些字段,却因逐条注册和 flush 开销,在大数据量下显著拖慢性能。
✅ 正确解法:bulk_save_objects + return_defaults=True + 显式 flush
核心要点是:仅设置 return_defaults=True 不够,必须配合 session.flush()(而非 commit())才能将数据库生成的默认值同步回 Python 对象实例。
db.session.commit() 会提交事务,但 return_defaults 的值回填发生在 flush 阶段 —— 即 SQL 发送至数据库并收到响应后立即完成。若跳过 flush 直接 commit,对象状态未更新,created_at/updated_at 仍为空。
以下是修正后的推荐实现:
from sqlalchemy import exc
@staticmethod
def create_student_scores(
student_scores: list[StudentScore],
) -> list[StudentScore]:
# 构建模型实例列表(不设 created_at/updated_at,交由 DB 填充)
student_scores_model: list[StudentScoresModel] = [
StudentScoresModel(
id=str(uuid.uuid4()),
student_id=ss.student_id,
attribute_id=ss.attribute_id,
score=ss.score,
) for ss in student_scores
]
try:
# 关键步骤:bulk_save_objects + return_defaults=True + flush()
db.session.bulk_save_objects(student_scores_model, return_defaults=True)
db.session.flush() # ← 必须!使 server_default 值写回对象属性
# 此时所有 student_scores_model[i].created_at / updated_at 已为 datetime 实例
return [
model.convert_to_entity()
for model in student_scores_model
]
except exc.SQLAlchemyError as e:
db.session.rollback()
raise e⚠️ 注意事项:return_defaults=True 仅对 server_default(如 func.now())、default(Python 端默认值)及主键自增生效;onupdate 仅在后续更新时触发,首次插入时 updated_at 与 created_at 均由 server_default 填充。MySQL 中 func.now() 在 server_default 下是服务端时间,确保一致性;避免使用 default=func.now()(客户端时间,易偏差)。bulk_save_objects 不调用 __init__ 或事件钩子(如 @event.listens_for(..., 'before_insert')),如有依赖 ORM 事件的逻辑,需改用 add_all + bulk_insert_mappings 或分批处理。
❌ 为什么 add_all() + commit() 不是最佳选择?
虽然以下写法能自动填充时间字段:
db.session.add_all(student_scores_model) db.session.commit() # created_at/updated_at 自动可用
但它本质是 N 条 INSERT 语句(或受 bulk_insert_mappings 优化为多值 INSERT),且每条记录都经历完整 ORM 生命周期(state tracking、dirty checking、event dispatch)。当插入 10,000+ 记录时,内存占用和 CPU 开销远高于 bulk_save_objects,实测性能可能下降 3–5 倍。
✅ 替代高性能方案:bulk_insert_mappings
若模型字段较固定、无需实例方法(如 convert_to_entity),可进一步提升性能:
mappings = [
{
"id": str(uuid.uuid4()),
"student_id": ss.student_id,
"attribute_id": ss.attribute_id,
"score": ss.score,
}
for ss in student_scores
]
db.session.bulk_insert_mappings(StudentScoresModel, mappings, return_defaults=True)
db.session.flush() # 同样必须 flush 才能取回默认值bulk_insert_mappings 绕过模型实例化,直接构造 SQL,是纯批量插入的最快路径,适合 ETL 或日志类场景。
总结
| 方法 | 性能 | 默认值回填 | ORM 事件支持 | 推荐场景 |
|---|---|---|---|---|
| bulk_save_objects(..., return_defaults=True) + flush() | ★★★★★ | ✅(需 flush) | ❌ | 通用首选:需实例方法 + 高性能 |
| bulk_insert_mappings(..., return_defaults=True) + flush() | ★★★★★★ | ✅(需 flush) | ❌ | 纯数据导入、字段简单 |
| add_all() + commit() | ★★☆ | ✅(自动) | ✅ | 小批量( |
牢记:return_defaults=True 是开关,flush() 是执行器——二者缺一不可,方能在高性能前提下,完整获得数据库生成的时间戳。










