
future 是一个容器,可以保存计算结果或计算期间发生的错误。创建 future 时,它以 pending 状态开始。该库不打算手动创建此对象,除非出于测试目的。
import concurrent.futures as futures f = futures.future() assert(f._result is none) assert(f._exception is none) assert(f._state == 'pending')
pending 状态表示用户请求的计算已注册到线程池中并放入队列中,但尚未被任何线程拾取执行。一旦空闲线程从队列中获取任务(回调),future 就会转换为 running 状态。 future 只能在处于 pending 状态时被取消。因此,在 pending 和 running 状态之间存在一个时间窗口,在此期间可以取消请求的计算。
import concurrent.futures as futures
def should_cancel_pending_future():
f = futures.future()
assert(f._state == 'pending')
assert(f.cancel())
assert(f._state == 'cancelled')
def should_not_cancel_running_future():
f = futures.future()
f.set_running_or_notify_cancel()
assert(f._state == 'running')
assert(not f.cancel())
def cancel_is_idempotent():
f = futures.future()
assert(f.cancel())
assert(f.cancel())
should_cancel_pending_future()
should_not_cancel_running_future()
cancel_is_idempotent()
线程池中请求的操作可以完成计算值或导致错误。无论结果如何,未来都会过渡到 finished 状态。然后结果或错误将存储在相应的字段中。
import concurrent.futures as futures
def future_completed_with_result():
f = futures.future()
f.set_result('foo')
assert(f._state == 'finished')
assert(f._result == 'foo')
assert(f._exception is none)
def future_completed_with_exception():
f = futures.future()
f.set_exception(nameerror())
assert(f._state == 'finished')
assert(f._result is none)
assert(isinstance(f._exception, nameerror))
future_completed_with_result()
future_completed_with_exception()
要检索计算结果,请使用 result 方法。如果计算尚未完成,此方法将阻塞当前线程(从中调用结果),直到计算完成或等待超时。
如果计算成功完成且没有错误,则 result 方法返回计算值。
import concurrent.futures as futures
import time
import threading
f = futures.future()
def target():
time.sleep(1)
f.set_result('foo')
threading.thread(target=target).start()
assert(f.result() == 'foo')
如果计算过程中发生异常,结果将引发该异常。
import concurrent.futures as futures
import time
import threading
f = futures.future()
def target():
time.sleep(1)
f.set_exception(nameerror)
threading.thread(target=target).start()
try:
f.result()
raise exception()
except nameerror:
assert(true)
如果方法在等待时超时,则会引发 timeouterror。
import concurrent.futures as futures
f = futures.future()
try:
f.result(1)
raise exception()
except timeouterror:
assert(f._result is none)
assert(f._exception is none)
尝试获取已取消的计算结果将引发 cancellederror。
import concurrent.futures as futures
f = futures.future()
assert(f.cancel())
try:
f.result()
raise exception()
except futures.cancellederror:
assert(true)
在开发过程中,需要在线程池上运行n次计算并等待其完成是很常见的。为了实现这一点,该库提供了等待函数。有几种等待策略:first_completed、first_exception、all_completed。
所有等待策略的共同点是,如果传递给 wait 方法的 future 已经完成,则无论选择何种策略,都会返回传递的 future 的集合。无论它们是如何完成的,无论是有错误、结果还是被取消,都无关紧要。
import concurrent.futures as futures
def test(return_when):
f1, f2, f3 = futures.future(), futures.future(), futures.future()
f1.cancel()
f1.set_running_or_notify_cancel() # required
f2.set_result('foo')
f3.set_exception(nameerror)
r = futures.wait([f1, f2, f3], return_when=return_when)
assert(len(r.done) == 3)
assert(len(r.not_done) == 0)
for return_when in [futures.all_completed, futures.first_exception, futures.first_completed]:
test(return_when)
all_completed 策略保证等待所有传递的 future 完成,或者在超时后退出,并收集截至该时刻完成的 future,这可能是不完整的。
import concurrent.futures as futures
import threading
import time
def should_wait_for_all_futures_to_complete():
f1 = futures.future()
f1.set_result('foo')
f2 = futures.future()
def target():
time.sleep(1)
f2.set_result('bar')
threading.thread(target=target).start()
r = futures.wait([f1, f2], return_when=futures.all_completed)
assert(len(r.done) == 2)
def should_exit_on_timeout():
f1 = futures.future()
f1.set_result('foo')
f2 = futures.future()
r = futures.wait(fs=[f1, f2], timeout=1, return_when=futures.all_completed)
assert(len(r.done) == 1)
should_wait_for_all_futures_to_complete()
should_exit_on_timeout()
first_completed 策略保证返回至少有一个已完成的 future 的集合,或者在超时的情况下返回空集合。 此策略并不意味着返回的集合不能包含多个元素。
import concurrent.futures as futures
import threading
import time
f1 = futures.future()
f2 = futures.future()
def target():
time.sleep(1)
f1.set_result(true)
threading.thread(target=target).start()
r = futures.wait([f1, f2], return_when=futures.first_completed)
assert(len(r.done) == 1)
assert(len(r.not_done) == 1)
如果其中一个计算完成时出现错误,first_exception 策略会中断等待。如果没有发生异常,则行为与 all_completed 未来相同。
import concurrent.futures as futures
import threading
import time
f1 = futures.Future()
f1.set_result('foo')
f2, f3 = futures.Future(), futures.Future()
def target():
time.sleep(1)
f2.set_exception(NameError())
threading.Thread(target=target).start()
r = futures.wait(fs=[f1, f2, f3], return_when=futures.FIRST_EXCEPTION)
assert(len(r.done) == 2)
该对象负责创建线程池。与该对象交互的主要方法是 submit 方法。它允许在线程池中注册计算。作为响应,返回一个 future 对象,用于监控计算状态并获取最终结果。
属性
立即学习“Python免费学习笔记(深入)”;
以上就是python 并发期货的详细内容,更多请关注php中文网其它相关文章!
python怎么学习?python怎么入门?python在哪学?python怎么学才快?不用担心,这里为大家提供了python速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号