直接用 queue.Queue 易卡死,因其 get() 默认无限阻塞且无超时/异常穿透机制;asyncio.Queue 需配 timeout 和 task_done;Redis 用 zset + bzpopmin 支持优先级与持久化;须通过 full() 或 zcard 实现反压控制。

为什么直接用 queue.Queue 在爬虫里容易卡死
多线程爬虫中,如果直接用标准库的 queue.Queue 做任务分发,常出现消费者线程全部阻塞在 get()、生产者却因异常退出而不再放新任务——队列既没满也没空,但整个调度就僵住了。根本原因是它默认的阻塞行为缺乏超时兜底和异常穿透机制。
-
get(block=True)会无限等待,一旦上游断流,线程就挂起不响应中断 - 没有内置重试计数或失败归档逻辑,单个坏 URL 可能导致任务永久滞留
- 无法跨进程共享,后续想加分布式调度就得重写整套队列层
用 asyncio.Queue 实现轻量异步调度的关键配置
对中小规模 HTTP 爬取(比如每秒 10–50 请求),asyncio.Queue 比线程队列更省资源,但必须显式控制生命周期,否则协程会泄漏。
import asyncioasync def worker(queue: asyncio.Queue, session): while True: try: url = await asyncio.wait_for(queue.get(), timeout=3.0) # 必须设超时 async with session.get(url) as resp:
处理响应...
queue.task_done() # 必须调用,否则 join() 不返回 except asyncio.TimeoutError: break # 超时即退出,避免死循环 except Exception as e: print(f"Worker error on {url}: {e}") queue.task_done() # 错误也要标记完成,否则队列卡住
-
asyncio.wait_for(..., timeout=...)是刚需,不能依赖get_nowait()—— 它抛queue.Empty异常,但协程里没地方 catch - 每个
get()后必须配对task_done(),哪怕出错也要调,否则queue.join()永远不结束 - 不要在 worker 里用
await queue.put(...)回填重试任务——容易引发循环等待,应由独立的 retry manager 处理
需要持久化或扩缩容?绕过内存队列直连 Redis 的最小可行方案
当爬虫要跑几天、或需横向加机器时,内存队列不可靠。用 redis-py 的 lpop/rpush 组合比引入 Celery 更轻,且天然支持失败重入队。
import redis import jsonr = redis.Redis()
立即学习“Python免费学习笔记(深入)”;
def add_task(url: str, priority: int = 0): payload = json.dumps({"url": url, "retry": 0}) r.zadd("pending_tasks", {payload: priority}) # 用有序集合支持优先级
def get_task(timeout=1) -> dict | None:
阻塞式取一个,超时返回 None
result = r.bzpopmin("pending_tasks", timeout=timeout) if result: return json.loads(result[1]) return None
- 别用
list类型的lpop—— 无法去重、不支持优先级、无超时原语;zset或stream更稳妥 -
bzpopmin是原子操作,避免“取到但崩溃未处理”导致任务丢失 - 任务体里必须带
retry字段,失败时r.zadd("pending_tasks", {payload: time.time() + 60})实现指数退避
调度器里最容易被忽略的「反压」信号:如何让生产者感知下游拥堵
很多爬虫把 URL 批量塞进队列就不管了,结果内存暴涨 OOM。真正的调度必须让生产者知道“慢点来”。
- 用
queue.qsize()做阈值判断不可靠(多线程下非原子),改用queue.full()+time.sleep()组合 - 异步场景下,在
put()前加if queue.qsize() > MAX_SIZE: await asyncio.sleep(0.1) - Redis 方案中,用
r.zcard("pending_tasks")监控积压量,超过阈值则暂停解析新页面链接
队列不是管道,是缓冲区;缓冲区满了还硬塞,系统就从调度问题变成运维事故。










