使用Queue、共享变量加锁或concurrent.futures结合回调可实现Python多线程进度监控,推荐根据任务结构和更新频率选择线程安全的方案。

在Python中使用多线程执行耗时任务时,常需要实时监控任务进度并回调通知主线程。由于GIL的存在,Python的多线程适合I/O密集型场景,但实现进度回调的核心在于线程间通信机制。
1. 使用 Queue 实现线程安全的进度回调
Queue 是线程安全的,非常适合用于从工作线程向主线程传递进度信息。
示例:模拟文件下载任务的进度更新
立即学习“Python免费学习笔记(深入)”;
import threading import time import queuedef download_task(task_id, total_steps, progress_queue): for step in range(1, total_steps + 1): time.sleep(0.1) # 模拟 I/O 延迟 progress = int(step / total_steps * 100) progress_queue.put({ 'task_id': task_id, 'progress': progress })
def monitor_progress(progress_queue, total_tasks): completed = 0 while completed < total_tasks: try: update = progress_queue.get(timeout=1) print(f"任务 {update['task_id']} 进度: {update['progress']}%") if update['progress'] == 100: completed += 1 except queue.Empty: continue print("所有任务完成")
启动任务
if name == "main": q = queue.Queue() threads = []
for i in range(3): t = threading.Thread(target=download_task, args=(i, 10, q)) t.start() threads.append(t) monitor_thread = threading.Thread(target=monitor_progress, args=(q, 3)) monitor_thread.start() for t in threads: t.join() monitor_thread.join()2. 使用共享变量 + 回调函数
通过共享数据结构(如字典)记录各任务进度,并配合锁保证线程安全。
import threading import timeclass ProgressTracker: def init(self, task_count, callback): self.lock = threading.Lock() self.progress = {i: 0 for i in range(task_count)} self.callback = callback
def update(self, task_id, value): with self.lock: self.progress[task_id] = value self.callback(task_id, value)def progress_callback(task_id, progress): print(f"[回调] 任务 {task_id} 当前进度: {progress}%")
def worker(task_id, steps, tracker): for i in range(1, steps + 1): time.sleep(0.1) progress = int(i / steps * 100) tracker.update(task_id, progress)
使用示例
if name == "main": tracker = ProgressTracker(3, progress_callback) threads = []
for i in range(3): t = threading.Thread(target=worker, args=(i, 10, tracker)) t.start() threads.append(t) for t in threads: t.join()3. 结合 concurrent.futures 的异步回调
使用 ThreadPoolExecutor 可以更方便地管理线程池,并通过
add_done_callback监控任务完成状态。虽然不能直接获取中间进度,但可通过共享对象间接实现。
from concurrent.futures import ThreadPoolExecutor import timeshared_status = {}
def long_task(task_id, steps): for i in range(1, steps + 1): time.sleep(0.1) shared_status[task_id] = int(i / steps * 100) return f"任务 {task_id} 完成"
def done_callback(future): print(future.result())
with ThreadPoolExecutor(max_workers=3) as executor: futures = [executor.submit(long_task, i, 10) for i in range(3)]
for f in futures: f.add_done_callback(done_callback) # 实时监控进度 while any(status < 100 for status in shared_status.values()): for tid, p in shared_status.items(): print(f"任务 {tid}: {p}%", end=" | ") print() time.sleep(0.5)4. 实际应用建议
根据场景选择合适的方案:
- 需要高频率更新进度 → 使用 Queue 避免频繁加锁
- 需结构化管理多个任务 → 使用 共享对象 + 锁
- 任务数量固定且关注完成状态 → concurrent.futures 更简洁
- 避免使用全局变量裸奔,务必保护共享数据
- GUI或Web应用中,回调可触发界面刷新
基本上就这些方法,核心是线程安全的数据传递。选哪种取决于你的任务结构和更新频率需求。










