python中绕过gil实现真正并行计算的最直接方式是使用multiprocessing模块;2. 该模块通过创建独立进程,每个进程拥有自己的解释器和内存空间,从而实现多核cpu并行计算;3. multiprocessing提供了process类创建和管理进程、queue/pipe实现进程间通信、以及pool用于高效管理大量任务;4. 多进程适用于cpu密集型任务,而多线程受限于gil更适合i/o密集型任务;5. 进程间通信可通过队列(queue)、管道(pipe)和共享内存(shared memory)实现,各自适用于不同场景;6. 使用pool能有效管理大量并发任务,简化资源调度与结果收集,提升代码可维护性和执行效率。
Python中要实现真正的并行计算,绕过全局解释器锁(GIL)的限制,最直接有效的方式就是使用multiprocessing模块。它允许我们创建独立的进程,每个进程都有自己的Python解释器和内存空间,从而能够充分利用多核CPU的计算能力。
使用multiprocessing模块的核心是Process类。你可以创建一个Process对象,指定它要执行的目标函数,然后启动它。
import multiprocessing import os import time def worker_function(name, delay): """一个简单的模拟工作函数""" pid = os.getpid() print(f"进程 {pid} ({name}) 开始工作...") time.sleep(delay) # 模拟耗时操作 print(f"进程 {pid} ({name}) 完成工作。") if __name__ == '__main__': print("主进程开始。") # 创建两个进程 process1 = multiprocessing.Process(target=worker_function, args=("任务A", 2)) process2 = multiprocessing.Process(target=worker_function, args=("任务B", 1)) # 启动进程 process1.start() process2.start() # 等待所有子进程完成 # join() 方法会阻塞主进程,直到对应的子进程执行完毕 process1.join() process2.join() print("所有子进程已完成,主进程结束。")
在这个例子里,worker_function会在独立的进程中运行。process1.start()和process2.start()会立即返回,而不会等待函数执行完毕,这就是并行执行的体现。join()方法则确保主进程在所有子进程结束之前不会退出。值得注意的是,if __name__ == '__main__':这行代码是必须的,尤其是在Windows系统上,它能确保子进程在启动时不会重复导入模块,避免无限循环创建进程的问题。
立即学习“Python免费学习笔记(深入)”;
这个问题,我个人觉得,是理解Python并发编程的关键。很多人刚开始接触时会混淆多线程和多进程,甚至觉得多线程就足够了。但实际上,对于CPU密集型任务,Python的多线程因为全局解释器锁(GIL)的存在,并不能实现真正的并行计算。GIL一次只允许一个线程执行Python字节码,这意味着即便你有多个线程,它们也只能轮流执行,无法同时利用多核CPU。
而多进程则完全不同。每个进程都有自己独立的内存空间和Python解释器实例,它们之间是相互隔离的。这意味着,一个进程的执行不会受到另一个进程中GIL的限制。所以,当你的任务是计算密集型(比如大量数学运算、数据处理等),需要榨干CPU的性能时,多进程就是你的不二之选。
简单来说:
所以,选择多进程还是多线程,很大程度上取决于你的任务类型。
一开始接触多进程,最头疼的就是数据共享。你不能像多线程那样直接改个全局变量就完事儿,那样会出大问题。因为每个进程都有自己的内存副本,直接修改是无效的。得换个思路,比如把数据“扔”进队列里,让另一个进程去“捞”出来,这才是正道。multiprocessing模块提供了几种机制来处理进程间通信(IPC):
队列(Queue): 这是最常用、最灵活的方式,适用于任何类型的数据。Queue是进程安全的,可以用来在生产者-消费者模式中传递消息。
import multiprocessing import time def producer(queue): for i in range(5): msg = f"消息 {i}" queue.put(msg) print(f"生产者发送: {msg}") time.sleep(0.5) queue.put(None) # 发送结束信号 def consumer(queue): while True: msg = queue.get() if msg is None: # 收到结束信号 break print(f"消费者接收: {msg}") time.sleep(0.8) if __name__ == '__main__': q = multiprocessing.Queue() p1 = multiprocessing.Process(target=producer, args=(q,)) p2 = multiprocessing.Process(target=consumer, args=(q,)) p1.start() p2.start() p1.join() p2.join() print("队列通信示例结束。")
管道(Pipe): Pipe用于两个进程之间的双向或单向通信。它比Queue更轻量级,但只能用于两个进程。
import multiprocessing def sender(conn): conn.send("你好,我是发送方!") conn.close() def receiver(conn): msg = conn.recv() print(f"接收方收到: {msg}") conn.close() if __name__ == '__main__': parent_conn, child_conn = multiprocessing.Pipe() # 创建管道 p1 = multiprocessing.Process(target=sender, args=(child_conn,)) p2 = multiprocessing.Process(target=receiver, args=(parent_conn,)) p1.start() p2.start() p1.join() p2.join() print("管道通信示例结束。")
共享内存(Shared Memory): 对于简单的数值类型或数组,可以使用Value和Array来在进程间共享。但需要注意的是,它们本身不提供同步机制,你可能还需要Lock来避免竞态条件。
import multiprocessing def increment(num, arr, lock): lock.acquire() # 获取锁 try: num.value += 1 for i in range(len(arr)): arr[i] += 1 print(f"进程 {multiprocessing.current_process().name} 修改后: num={num.value}, arr={list(arr)}") finally: lock.release() # 释放锁 if __name__ == '__main__': num = multiprocessing.Value('i', 0) # 共享整数 arr = multiprocessing.Array('i', [0, 0, 0]) # 共享整数数组 lock = multiprocessing.Lock() # 共享锁 processes = [] for i in range(3): p = multiprocessing.Process(target=increment, args=(num, arr, lock), name=f"Worker-{i}") processes.append(p) p.start() for p in processes: p.join() print(f"最终结果: num={num.value}, arr={list(arr)}")
选择哪种方式取决于你的具体需求:Queue是最通用的,Pipe适用于点对点通信,而共享内存则适用于需要直接访问共享数据且对性能有较高要求的场景,但需要手动处理同步。
说实话,一开始我总想着自己手动Process().start()、join(),但任务一多起来,比如要处理成百上千个文件,或者对大量数据进行并行计算,那代码就没法看了,而且手动管理进程的生命周期、资源限制也变得非常麻烦。Pool这东西,简直是“解放双手”的神器,它把那些烦人的进程生命周期管理、任务调度都给封装好了,我们只需要关注任务本身就行。
Pool类提供了一种更高级的方式来管理一组工作进程,它可以自动分配任务给这些进程,并在任务完成后回收资源。
主要优势:
最常用的方法是map()和apply_async():
pool.map(func, iterable): 类似于内置的map()函数,它将iterable中的每个元素作为参数传递给func,并并行执行。它会阻塞直到所有结果都返回。
import multiprocessing import time def square(x): time.sleep(0.1) # 模拟计算 return x * x if __name__ == '__main__': with multiprocessing.Pool(processes=4) as pool: # 创建一个包含4个工作进程的进程池 # 使用map并行计算列表元素的平方 results = pool.map(square, range(10)) print(f"使用map计算结果: {results}")
pool.apply_async(func, args=(), kwds={}): 提交一个任务到进程池,并立即返回一个AsyncResult对象。你可以通过AsyncResult.get()方法获取结果(会阻塞直到结果可用),或者通过AsyncResult.ready()、AsyncResult.successful()检查任务状态。这适用于需要异步处理结果,或者任务参数不适合map的情况。
import multiprocessing import time def complex_task(a, b): time.sleep(0.5) return f"计算 {a} + {b} = {a + b}" if __name__ == '__main__': with multiprocessing.Pool(processes=3) as pool: # 提交异步任务 async_result1 = pool.apply_async(complex_task, args=(1, 2)) async_result2 = pool.apply_async(complex_task, args=(3, 4)) async_result3 = pool.apply_async(complex_task, args=(5, 6)) # 可以做其他事情,然后获取结果 print("主进程在等待结果...") result1 = async_result1.get() result2 = async_result2.get() result3 = async_result3.get() print(f"结果1: {result1}") print(f"结果2: {result2}") print(f"结果3: {result3}")
使用with语句管理Pool是非常推荐的,它能确保进程池在代码块执行完毕后被正确关闭,释放所有资源。Pool是处理大量独立任务的理想选择,它极大地简化了并行编程的复杂性。
以上就是Python中如何使用多进程?multiprocessing模块详解的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号