生产者消费者模式是解耦任务生成与执行的并发模型,由生产者线程向线程安全队列put任务、消费者线程get并处理,配合task_done和join实现同步,适用于日志处理等高吞吐场景。

什么是生产者消费者模式
生产者消费者模式是一种经典的并发编程模型,用于解耦任务的生成与执行。在Python任务调度场景中,它表现为:一个或多个线程(或进程)负责生成任务(生产者),把任务放入共享队列;另一个或多个工作线程(消费者)持续从队列中取出任务并执行。这种结构天然适合异步、批量、高吞吐的任务调度系统,比如日志处理、定时爬虫、消息推送等。
核心组件:队列 + 线程/进程协作
Python标准库中的 queue.Queue 是实现该模式的关键——它线程安全,内置阻塞与超时机制,无需手动加锁。搭配 threading 或 multiprocessing 模块即可快速搭建调度骨架。
- 使用 queue.Queue(maxsize=0) 创建无限长队列;设为正整数可控制缓冲区大小,避免内存溢出
- 生产者调用 put(item) 入队,可选 block=True, timeout=2 防止无限等待
- 消费者调用 get() 出队,配合 task_done() 和 join() 实现任务完成同步
- 推荐用守护线程(daemon=True)运行消费者,主程序退出时自动终止
一个轻量可用的调度示例
下面是一个基于多线程的最小可行调度器,模拟“每秒生成3个任务,由2个工人并发处理”:
import queue import threading import time import random共享任务队列
task_queue = queue.Queue()
立即学习“Python免费学习笔记(深入)”;
def producer(): for i in range(10): task = f"task-{i}" print(f"[生产] {task}") task_queue.put(task) time.sleep(1) # 模拟不均匀生产节奏
def worker(worker_id): while True: try: task = task_queue.get(timeout=3) # 3秒无任务则退出 print(f"[工人{worker_id}] 正在处理 {task}") time.sleep(random.uniform(0.5, 2)) # 模拟耗时操作 task_queue.task_done() except queue.Empty: break
启动1个生产者、2个消费者
threading.Thread(target=producer).start() for i in range(2): threading.Thread(target=worker, args=(i+1,), daemon=True).start()
等待所有任务完成
task_queue.join() print("全部任务执行完毕")
进阶建议:应对真实生产环境
简单示例满足学习,但上线需考虑健壮性与可观测性:










