Python程序设置超时机制可通过signal、threading、multiprocessing或第三方库实现,其中signal仅限Unix系统且无法中断CPU密集型任务,而threading和multiprocessing提供跨平台支持,通过线程或进程隔离实现更可靠超时控制。

Python程序设置超时机制,本质上是给一段代码执行设定一个时间上限,一旦超出这个时间,程序就会中断执行或抛出异常。这在处理外部服务调用、耗时计算或防止无限循环时非常有用。在Python中,我们可以利用操作系统级别的信号(
signal
要实现Python程序的超时控制,主要有以下几种策略:
利用signal
SIGALRM
import signal
import time
class TimeoutException(Exception):
pass
def timeout_handler(signum, frame):
raise TimeoutException("Function execution timed out!")
def long_running_function():
print("Starting long running function...")
# 模拟一个耗时操作,例如网络请求或复杂计算
time.sleep(5)
print("Long running function completed.")
return "Success"
# 设置信号处理器
signal.signal(signal.SIGALRM, timeout_handler)
try:
# 设置3秒的超时
signal.alarm(3)
result = long_running_function()
print(f"Result: {result}")
except TimeoutException as e:
print(f"Error: {e}")
finally:
# 清除定时器,避免影响后续代码
signal.alarm(0)这种方式简单高效,但它的局限性在于只能在Unix-like系统(如Linux, macOS)上工作,并且它只能中断那些可以被信号中断的系统调用(如
time.sleep()
signal.alarm
通过threading
multiprocessing
使用threading
立即学习“Python免费学习笔记(深入)”;
import threading
import time
def worker_function(event, result_list):
print("Worker thread starting...")
try:
for i in range(1, 6): # 模拟一个5秒的任务
if event.is_set(): # 检查是否被主线程要求停止
print("Worker thread received stop signal.")
break
print(f"Worker processing {i}...")
time.sleep(1)
else:
result_list.append("Worker finished successfully.")
except Exception as e:
result_list.append(f"Worker encountered error: {e}")
finally:
print("Worker thread exiting.")
# 共享列表用于存储结果,Event用于线程间通信
results = []
stop_event = threading.Event()
worker_thread = threading.Thread(target=worker_function, args=(stop_event, results))
worker_thread.start()
# 主线程等待子线程完成,设置4秒超时
worker_thread.join(timeout=4)
if worker_thread.is_alive():
print("Function timed out! Attempting to signal worker to stop.")
stop_event.set() # 通知worker停止
worker_thread.join() # 再次等待worker,确保其退出
print("Worker thread terminated (or attempted to).")
results.append("Timeout occurred.")
else:
print("Function completed within timeout.")
print(f"Final results: {results}")使用multiprocessing
multiprocessing
import multiprocessing
import time
def cpu_bound_task(queue, duration):
print(f"Process started for {duration} seconds...")
start_time = time.time()
# 模拟CPU密集型计算
while True:
if time.time() - start_time > duration:
break
_ = [i*i for i in range(10000)] # 耗时操作
queue.put("Task completed successfully.")
print("Process finished.")
result_queue = multiprocessing.Queue()
process_timeout = 3 # 秒
task_duration = 5 # 秒,模拟一个会超时的任务
p = multiprocessing.Process(target=cpu_bound_task, args=(result_queue, task_duration))
p.start()
p.join(timeout=process_timeout)
if p.is_alive():
print(f"Process timed out after {process_timeout} seconds. Terminating...")
p.terminate() # 强制终止进程
p.join() # 等待进程彻底终止
print("Process terminated.")
result_queue.put("Timeout occurred.")
else:
print("Process completed within timeout.")
try:
final_result = result_queue.get_nowait()
print(f"Final result: {final_result}")
except multiprocessing.queues.Empty:
print("No result from process (might have terminated abruptly).")利用第三方库 Python社区提供了许多优秀的第三方库,它们封装了上述复杂逻辑,提供了更简洁、更易用的API来设置超时。
func_timeout
multiprocessing
from func_timeout import func_timeout, FunctionTimedOut
import time
def my_long_function(duration):
print(f"Function starting for {duration} seconds...")
time.sleep(duration)
print("Function finished.")
return "Done"
try:
# 尝试执行一个5秒的函数,但只给3秒的超时
result = func_timeout(3, my_long_function, args=(5,))
print(f"Result: {result}")
except FunctionTimedOut:
print("Error: Function timed out!")
except Exception as e:
print(f"An unexpected error occurred: {e}")timeout_decorator
from timeout_decorator import timeout, TimeoutError
import time
@timeout(3) # 设置3秒超时
def another_long_function(duration):
print(f"Another function starting for {duration} seconds...")
time.sleep(duration)
print("Another function finished.")
return "Completed"
try:
result = another_long_function(5) # 实际执行5秒
print(f"Result: {result}")
except TimeoutError:
print("Error: Another function timed out!")
except Exception as e:
print(f"An unexpected error occurred: {e}")signal
signal
signal
它的局限性主要体现在以下几个方面:
signal.alarm
SIGALRM
signal
SIGALRM
signal
time.sleep()
while True: pass
signal.alarm
signal.alarm
signal
signal.alarm
总的来说,
signal
在我看来,跨平台的问题往往是Python开发者不得不面对的“甜蜜负担”。
threading
multiprocessing
1. 基于threading
threading
Thread.join(timeout=...)
join
join
is_alive()
优雅之处在于,我们不能粗暴地“杀死”一个线程(Python没有提供这样的API),而是应该通过一个共享的
Event
import threading
import time
import queue # 用于线程间传递结果
def timed_task_thread(task_id, stop_event, result_queue):
"""一个模拟耗时任务的线程函数,会检查停止信号"""
print(f"Thread {task_id}: Starting task...")
try:
for i in range(1, 10):
if stop_event.is_set():
print(f"Thread {task_id}: Stop event received, exiting early.")
result_queue.put(f"Task {task_id} interrupted.")
return
print(f"Thread {task_id}: Working on step {i}...")
time.sleep(0.7) # 模拟I/O或计算
result_queue.put(f"Task {task_id} completed successfully.")
except Exception as e:
result_queue.put(f"Task {task_id} failed: {e}")
finally:
print(f"Thread {task_id}: Exiting.")
def run_with_timeout_thread(func, timeout_seconds, *args, **kwargs):
"""
使用线程实现带超时的函数执行。
func: 要执行的函数
timeout_seconds: 超时时间(秒)
*args, **kwargs: 传递给func的参数
"""
stop_event = threading.Event()
result_queue = queue.Queue() # 用于获取线程的执行结果
# 创建并启动线程
task_thread = threading.Thread(target=func, args=(stop_event, result_queue, *args), kwargs=kwargs)
task_thread.start()
# 等待线程完成,设置超时
task_thread.join(timeout=timeout_seconds)
if task_thread.is_alive():
print(f"Timeout: Task exceeded {timeout_seconds} seconds. Signaling to stop...")
stop_event.set() # 发送停止信号
task_thread.join() # 再次join,等待线程响应停止信号并退出
# 如果线程没有响应停止信号(例如CPU密集型循环没有检查event),这里可能仍会阻塞或需要更复杂的处理
raise TimeoutError(f"Function timed out after {timeout_seconds} seconds.")
else:
print("Task completed within timeout.")
# 获取线程执行结果
try:
return result_queue.get_nowait()
except queue.Empty:
# 如果线程异常退出或没有放入结果,这里可能为空
return "No explicit result from thread."
# 示例调用
print("\n--- Threading Timeout Example (Success) ---")
try:
thread_result = run_with_timeout_thread(timed_task_thread, 8, "A") # 任务预计6.3秒,超时8秒
print(f"Main received: {thread_result}")
except TimeoutError as e:
print(f"Main caught error: {e}")
print("\n--- Threading Timeout Example (Timeout) ---")
try:
thread_result = run_with_timeout_thread(timed_task_thread, 3, "B") # 任务预计6.3秒,超时3秒
print(f"Main received: {thread_result}")
except TimeoutError as e:
print(f"Main caught error: {e}")2. 基于multiprocessing
multiprocessing
process.terminate()
import multiprocessing
import time
def timed_task_process(task_id, duration, result_queue):
"""一个模拟CPU密集型任务的进程函数"""
print(f"Process {task_id}: Starting CPU-bound task for {duration} seconds...")
start_time = time.time()
count = 0
while True:
if time.time() - start_time > duration:
break
# 模拟CPU密集型计算
_ = [i*i for i in range(100000)]
count += 1
# print(f"Process {task_id}: Iteration {count}") # 不打印太多,避免I/O影响CPU模拟
result_queue.put(f"Task {task_id} completed {count} iterations.")
print(f"Process {task_id}: Exiting.")
def run_with_timeout_process(func, timeout_seconds, *args, **kwargs):
"""
使用进程实现带超时的函数执行。
func: 要执行的函数
timeout_seconds: 超时时间(秒)
*args, **kwargs: 传递给func的参数
"""
result_queue = multiprocessing.Queue()
# 将result_queue作为第一个参数传递给func,因为它需要在子进程中被写入
process_args = (result_queue, *args)
task_process = multiprocessing.Process(target=func, args=process_args, kwargs=kwargs)
task_process.start()
task_process.join(timeout=timeout_seconds)
if task_process.is_alive():
print(f"Timeout: Process exceeded {timeout_seconds} seconds. Terminating...")
task_process.terminate() # 强制终止进程
task_process.join() # 等待进程彻底终止
raise TimeoutError(f"Function timed out after {timeout_seconds} seconds.")
else:
print("Process以上就是Python怎么给程序设置超时_signal模块与第三方库实现程序超时的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号