Python中应使用multiprocessing.Queue而非queue.Queue进行进程间通信,因其基于管道或共享内存并内置序列化与同步机制,而普通队列不支持跨进程内存共享。

Python 中的 queue 模块本身**不是为多进程设计的**,直接在多个进程间使用普通 queue.Queue 会导致数据丢失、阻塞异常甚至程序崩溃。真正用于进程间通信(IPC)的是 multiprocessing.Queue —— 它是专为跨进程安全传输数据而实现的底层队列。
为什么不能用 threading.Queue 做进程通信?
因为进程拥有独立内存空间,threading.Queue 依赖线程共享内存和锁机制,在多进程中无法同步状态。两个进程各自持有一个 Queue 实例,彼此完全隔离,消息根本传不过去。
而 multiprocessing.Queue 底层基于 pipe(匿名管道)或 spawned process + shared memory(取决于平台和 Python 版本),并自带序列化(pickle)、锁、条件变量等机制,确保跨进程读写安全。
multiprocessing.Queue 的基本用法
它接口与 queue.Queue 高度兼容,常用方法包括:
立即学习“Python免费学习笔记(深入)”;
本文档主要讲述的是Android AsyncChannel源码分析;AsyncChannel类用于处理两个Handler之间的异步消息传递,消息传递的Handler可以出于同一进程,也可以处于不同进程,不同进程之间的Handler消息传递使用Android的Binder通信机制来实现。希望本文档会给有需要的朋友带来帮助;感兴趣的朋友可以过来看看
- put(item, block=True, timeout=None):向队列插入对象(自动 pickle 序列化)
- get(block=True, timeout=None):取出对象(自动反序列化)
- empty() 和 qsize():注意——这两个方法在多进程下red">不保证实时准确,仅作参考,不可用于同步逻辑
- close() 和 join_thread():建议在进程退出前调用,确保后台线程清理完毕
实际使用中的关键注意事项
避免踩坑,需牢记以下几点:
- 所有通过
Queue传递的对象必须是可被pickle序列化的(如 dict、list、str、int,但不能是 lambda、嵌套函数、未导入模块的类实例等) - 子进程消费完数据后,父进程不要立即关闭 Queue;应等待子进程
join()或使用queue.close() + queue.join_thread()显式清理 - 若需双向通信,不要复用同一个 Queue,而是创建两个(一个 for send,一个 for recv),或改用
multiprocessing.Pipe - 大量小消息频繁收发时,性能可能受限于 pickle 开销和 IPC 延迟,此时可考虑
multiprocessing.Array或mmap等更底层方式
一个可靠的工作示例
以下代码演示父子进程间安全传递任务与结果:
from multiprocessing import Process, Queue import timedef worker(task_q, result_q): while True: try: task = task_q.get(timeout=1) if task is None: # 退出信号 break result_q.put(f"processed: {task}") except: continue
if name == "main": task_q = Queue() result_q = Queue()
p = Process(target=worker, args=(task_q, result_q)) p.start() for i in range(3): task_q.put(i) time.sleep(0.1) if not result_q.empty(): print(result_q.get()) task_q.put(None) # 发送结束信号 p.join() task_q.close() result_q.close() task_q.join_thread() result_q.join_thread()这段代码展示了典型生产者-消费者模式,且包含了资源清理的完整流程。









