Python异常默认不跨线程传播,因各线程有独立栈和异常上下文;子线程未捕获异常时仅触发threading.excepthook,默认打印traceback后退出,不会通知主线程。

Python 中的异常默认不会跨线程传播。主线程无法直接捕获子线程中发生的异常,子线程崩溃也不会中断主线程或其他线程。
为什么异常不自动传播?
每个线程有独立的执行栈和异常处理上下文。当子线程抛出未捕获异常时,Python 会调用 threading.excepthook(默认打印 traceback 并退出该线程),但不会向上通知创建它的线程。
手动捕获并传递异常的常用方法
需要显式将异常信息从子线程“带出来”,常见做法有:
-
使用
queue.Queue:子线程把异常对象(或 (type, value, traceback) 元组)放入队列,主线程定时检查并重新抛出 -
保存到共享变量 + 标志位:用
threading.Event或普通变量标记失败,并将sys.exc_info()结果存入线程安全容器(如threading.local()或加锁的 dict) -
使用
concurrent.futures.ThreadPoolExecutor:调用future.result()时,若子线程出错,会原样抛出该异常(推荐,封装了传播逻辑)
ThreadPoolExecutor 是最简洁的方案
它内部通过 _result 和 _exception 属性保存执行结果或异常,并在 result() 调用时触发重抛:
立即学习“Python免费学习笔记(深入)”;
from concurrent.futures import ThreadPoolExecutor import timedef risky_task(): time.sleep(0.1) raise ValueError("子线程出错了")
with ThreadPoolExecutor() as executor: future = executor.submit(risky_task) try: future.result() # 这里会抛出 ValueError except ValueError as e: print(f"捕获到:{e}") # 输出:捕获到:子线程出错了
自定义 excepthook 可用于日志或调试
如果不想让线程静默退出,可设置全局钩子:
import threading import sysdef custom_hook(args): print(f"[线程异常] {args.thread.name}: {args.exc_value}")
threading.excepthook = custom_hook
def bad_func(): raise RuntimeError("boom")
threading.Thread(target=bad_func).start() # 触发 custom_hook
注意:这仅用于记录,不能替代异常传播逻辑。










