BackgroundTasks 默认不传播异常,必须在任务函数内手动 try/except 捕获并记录(需 exc_info=True),全局异常处理器和外层 try/except 均无效;重要任务应封装为带重试、超时的类。

BackgroundTasks 默认不传播异常
FastAPI 的 BackgroundTasks 本质是把函数扔进事件循环后台执行,**不会阻塞请求,也不会把异常抛给主请求上下文**。这意味着:任务里 raise ValueError("oops") 不会触发 HTTP 错误响应,也不会被日志自动捕获(除非你手动处理),更不会让调用方感知到失败。
必须手动 try/except 包裹任务函数
最直接可靠的方式是在传给 add_task() 的函数内部做异常兜底:
def send_email_async(to: str, subject: str):
try:
# 实际发信逻辑
smtp.send_message(...)
except Exception as e:
# 记录完整 traceback,否则日志里只有 "Exception"
logger.error("Email task failed", exc_info=True)
# 可选:发告警、写 DB 失败记录、重试等
注意:exc_info=True 是关键,否则 logger.error() 只打一行,看不到堆栈。
不能依赖全局异常处理器
FastAPI 的 @app.exception_handler() 对 BackgroundTasks 无效——它只拦截路由函数抛出的异常。后台任务是独立协程,脱离了请求生命周期,异常不会冒泡到那里。
- 别写
@app.exception_handler(Exception)期待它捕获后台错误 - 别在任务外层用
try/except包add_task()调用本身——那只能捕获“加任务失败”(比如传了非可调用对象),不是任务执行失败
需要重试或监控时,用封装类替代裸函数
当任务重要性升高(如支付回调通知、关键数据同步),建议封装成带重试、超时、状态追踪的类,而不是每次手写 try/except:
class ReliableTask:
def __init__(self, max_retries=3):
self.max_retries = max_retries
async def run(self, *args, **kwargs):
for i in range(self.max_retries + 1):
try:
return await self._execute(*args, **kwargs)
except Exception as e:
if i == self.max_retries:
logger.critical("Task exhausted retries", exc_info=True)
raise
logger.warning(f"Task retry {i+1}/{self.max_retries}", exc_info=True)
await asyncio.sleep(2 ** i) # 指数退避
这类封装容易漏掉的是:异步函数必须用 await 调用,且 BackgroundTasks.add_task() 不支持直接传 awaitable,所以得包装一层 asyncio.create_task() 或改用 asyncio.to_thread()(Python 3.9+)。
后台任务异常捕获的核心就一条:它不自动发生,你得在任务函数体内亲手接住、记清、处置好——没有捷径,也没有默认行为可依赖。










