答案是推荐使用concurrent.futures.ThreadPoolExecutor。Python标准库中无官方threadpool模块,常用的是concurrent.futures.ThreadPoolExecutor,支持submit提交任务和map批量处理,适用于I/O密集型任务,如网络请求,并发下载等,而第三方threadpool库已过时不推荐使用。

Python 中并没有一个官方的 threadpool 模块作为标准库的一部分。你可能指的是第三方库 threadpool,或者更常见的是 Python 标准库中的 concurrent.futures.ThreadPoolExecutor。下面分别介绍这两种用法,重点推荐使用标准库方式。
使用 concurrent.futures.ThreadPoolExecutor(推荐)
这是 Python 内置的线程池模块,功能强大且易于使用。
基本用法示例:
import concurrent.futures import timedef task(name): print(f"任务 {name} 开始") time.sleep(2) return f"任务 {name} 完成"
创建线程池,最大3个线程
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
立即学习“Python免费学习笔记(深入)”;
提交多个任务
futures = [executor.submit(task, i) for i in range(5)] # 获取结果 for future in concurrent.futures.as_completed(futures): print(future.result())说明:
- max_workers:指定线程池中最多同时运行的线程数
- submit():提交单个任务,返回 Future 对象
- as_completed():可以实时获取已完成的任务结果
- 支持 map() 方法批量提交任务
使用 map 的简洁写法:
with concurrent.futures.ThreadPoolExecutor() as executor:
results = executor.map(task, range(5))
for result in results:
print(result)
使用第三方 threadpool 模块(已过时)
这是一个较老的第三方库,不推荐在新项目中使用。如果你确实需要:
dmSOBC SHOP网店系统由北京时代胜腾信息技术有限公司(http://www.webzhan.com)历时6个月开发完成,本着简单实用的理念,商城在功能上摒弃了外在装饰的一些辅助功能,尽可能的精简各项模块开发,做到有用的才开发,网店V1.0.0版本开发完成后得到了很多用户的使用并获得了好评,公司立即对网店进行升级,其中包括修正客户提出的一些意见和建议,现对广大用户提供免费试用版本,如您在使用
安装:
pip install threadpool
简单示例:
import threadpool import timedef task(name): print(f"执行任务 {name}") time.sleep(1)
pool = threadpool.ThreadPool(3) # 3个线程 requests = threadpool.makeRequests(task, [1, 2, 3, 4, 5]) for req in requests: pool.putRequest(req) pool.wait()
注意:该库已多年未更新,兼容性和维护性较差。
常见使用场景和建议
线程池适合用于 I/O 密集型任务,比如网络请求、文件读写等。
实际例子:并发下载网页
import concurrent.futures import requestsdef fetch_url(url): response = requests.get(url) return len(response.text)
urls = [ "https://www.php.cn/link/5f69e19efaba426d62faeab93c308f5c", "https://www.php.cn/link/ef246753a70fce661e16668898810624", "https://www.php.cn/link/5f69e19efaba426d62faeab93c308f5c" ]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: results = executor.map(fetch_url, urls) for length in results: print(f"页面大小: {length}")
基本上就这些。对于大多数情况,直接使用 concurrent.futures.ThreadPoolExecutor 就足够了,无需额外依赖。它简洁、安全,且是 Python 官方推荐的方式。










