multiprocessing.Pool不能直接传带状态的类实例,因pickle无法序列化实例属性(如session、锁),需改用纯函数+参数传入、进程内独立初始化资源、队列分发代理、PID哈希选UA、连接池限流、指数退避重试、异步回调落地结果。

为什么 multiprocessing.Pool 不能直接传可调用对象(如带状态的类实例)
Python 的 multiprocessing 模块底层依赖 pickle 序列化函数和参数,在子进程中反序列化执行。而带实例状态的类方法(比如 self.session 或 self.proxy_pool)无法被安全 pickle,会抛出 AttributeError: Can't pickle local object 或 TypeError: cannot pickle '_thread.RLock' object。
- 解决思路:把任务逻辑拆成纯函数,所有依赖通过参数传入(如 URL、headers、超时值)
- 避免在 worker 函数里新建
requests.Session()或全局共享锁——每个进程应独立初始化自己的会话和资源 - 若需共享配置(如 User-Agent 列表),用
multiprocessing.Manager().list()或只读的模块级常量,不要传可变对象引用
如何安全地给每个进程分配独立代理和请求头
多个进程共用同一代理池或随机 UA 生成器会导致冲突或重复使用失效代理。关键不是“共享”,而是“隔离初始化”。
- 在每个 worker 函数开头调用
init_worker(),内部创建独立requests.Session()并设置专属proxies和headers - 代理来源建议用队列分发:主进程预加载代理列表,用
multiprocessing.Queue分配,每个 worker 启动时 pop 一个,用完再放回(配合queue.task_done()) - UA 不要靠全局 random.choice(),改用
secrets.choice()或基于进程 PID 生成哈希后取模,避免多进程下随机种子相同导致 UA 雷同
遇到 requests.exceptions.ConnectionError: Max retries exceeded 怎么办
这不是代码写错了,而是并发量超过目标站承受能力或本地端口耗尽。multiprocessing 下每个进程都可能开几十个 TCP 连接,叠加后极易触发连接拒绝或超时。
- 必须限制总并发数:用
Pool(processes=4)控制进程数,再在每个进程内用session.mount('http://', HTTPAdapter(pool_connections=10, pool_maxsize=10))控制连接池大小 - 加指数退避重试:捕获
ConnectionError后用time.sleep((2 ** attempt) + random.uniform(0, 1)),最多重试 3 次 - 检查系统限制:
ulimit -n(Linux/macOS)或 Windows 的动态端口范围,必要时调高;避免短连接高频发起
def fetch_with_retry(url, timeout=10, max_retries=3):
session = requests.Session()
adapter = HTTPAdapter(pool_connections=5, pool_maxsize=5)
session.mount('http://', adapter)
session.mount('https://', adapter)
for i in range(max_retries):
try:
resp = session.get(url, timeout=timeout)
resp.raise_for_status()
return resp.text
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
if i == max_retries - 1:
raise e
time.sleep((2 ** i) + random.uniform(0, 1))
return None结果怎么汇总又不卡死主进程
别用 pool.map() 直接返回大 HTML 字符串——内存爆炸风险高,且无法实时监控进度。要用异步回调 + 中间存储。
立即学习“Python免费学习笔记(深入)”;
- 用
pool.apply_async(func, args, callback=save_result),让每个 worker 执行完立刻调save_result写入文件或数据库,而不是等全部结束 -
回调函数里用线程安全方式记录状态:例如
manager.dict()存 {url: 'success'},或写入 SQLite(加 WAL 模式 + timeout 参数) - 主进程用
pool.close(); pool.join()等待完成,但期间可通过轮询中间状态判断是否异常中断
真正难的不是并发本身,是让每个进程像一个封闭的、自洽的小系统:自己管连接、自己选代理、自己重试、自己落地结果。一旦试图共享状态,问题就从网络转移到进程间同步上了。










