
在基于Celery构建分布式任务系统时,我们经常会遇到需要严格顺序执行的业务流程。然而,当这些流程中的某个环节需要根据运行时数据动态生成并调度多个子任务,并且主任务必须等待所有这些动态子任务完成后才能继续时,Celery内置的编排原语(如chain、chord)往往显得力不从心。这是因为chain和chord通常要求在它们被创建时,所有参与任务的签名(signatures)都已明确定义。对于在父任务执行过程中才动态产生的子任务,这种静态编排模式无法有效支持。
尽管apply_async方法提供了add_to_parent参数(默认为True),它确实能够在结果后端(如Redis)中建立父子任务的关联。然而,这主要是一种元数据层面的记录,Celery并不会利用这一信息来动态调整已调度任务的依赖关系,也无法自动阻塞父任务的执行以等待动态子任务的完成。因此,为了实现动态子任务的同步等待,我们需要采取一种更手动、更精细的控制策略。
解决动态子任务同步等待问题的核心思路是:
这种方法绕过了Celery编排的静态限制,赋予了开发者对动态依赖关系更细粒度的控制权。
以下是一个具体的Python/Celery实现示例,演示了如何在一个主任务中动态创建子任务,并通过一个辅助函数等待它们的完成。
假设我们有一个主任务task_dummy_task1,它会创建多个task_dummy_subtask,有些直接创建,有些通过一个中间函数intermediary_dummy_subtask_function创建。所有这些子任务都必须在task_dummy_task1继续其最终逻辑之前完成。
主任务task_dummy_task1负责协调整个流程。它会直接或间接地调度子任务,并收集它们的异步结果ID。
import time
from celery import Celery, Task
from celery.result import AsyncResult
from typing import List
# 假设 app 已经初始化,并且配置了 Redis 作为 broker 和 result backend
app = Celery('my_app', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')
# 假设 JobMaster 和 consts 是用于自定义日志和状态管理的模块
# 在实际应用中,您可以替换为自己的日志系统或直接使用 print
class JobMaster:
@staticmethod
def get_job(job_id, job_title):
# 模拟获取一个任务对象,用于记录日志
print(f"[{job_title}] Getting job {job_id if job_id else 'new'}")
return type('Job', (object,), {'log_message': lambda self, log_message, **kwargs: print(f"[{job_title}] {log_message}")})(), job_id if job_id else 1 # 模拟返回一个job对象和job_id
class consts:
IN_PROGRESS = "IN_PROGRESS"
COMPLETED = "COMPLETED"
ERRORS_FOUND = "ERRORS_FOUND"
@app.task(bind=True)
def task_dummy_task1(self: Task, part_number: int, job_id: int = None):
job, job_id = JobMaster.get_job(job_id, job_title="dummy task")
sleeping_duration = 1 # 缩短等待时间以便测试
subtask_ids = []
job.log_message(log_message=f"Entered dummy task 1 with sleeping duration of {sleeping_duration}")
job.log_message(log_message="In dummy task1, creating subtask a")
subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_a", "job_id": job_id},
add_to_parent=True)
subtask_ids.append(subtask.id)
job.log_message(log_message="In dummy task1, creating subtask b")
subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_b", "job_id": job_id},
add_to_parent=True)
subtask_ids.append(subtask.id)
job.log_message(log_message="In dummy task1, creating subtask c")
subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_c", "job_id": job_id},
add_to_parent=True)
subtask_ids.append(subtask.id)
job.log_message(log_message="In dummy task1, creating intermediary subtask d")
subtask = intermediary_dummy_subtask_function(parent_task_name="task1_d", job_id=job_id)
subtask_ids.append(subtask.id)
job.log_message(log_message="In dummy task1, creating intermediary subtask e")
subtask = intermediary_dummy_subtask_function(parent_task_name="task1_e", job_id=job_id)
subtask_ids.append(subtask.id)
time.sleep(sleeping_duration) # 主任务执行一些自己的逻辑
# 等待所有动态子任务完成
wait_for_tasks_to_complete(async_ids=subtask_ids, job_id=job_id,
msg="Waiting in dummy task1 for subtasks to complete")
job.log_message(log_message="Finished dummy task1 main body")
return part_number
@app.task
def task_dummy_subtask(parent_task_name: str, job_id: int):
job, _ = JobMaster.get_job(job_id, job_title=f"subtask-{parent_task_name}")
sleep_time = 2 # 模拟子任务耗时
job.log_message(log_message=f"Subtask {parent_task_name} started, will sleep for {sleep_time}s")
time.sleep(sleep_time)
job.log_message(log_message=f"Subtask {parent_task_name} finished")
return f"Result from {parent_task_name}"在上述代码中:
有时,子任务的创建逻辑可能封装在另一个辅助函数中。这并不影响我们的核心策略,只要该辅助函数能返回子任务的AsyncResult对象即可。
def intermediary_dummy_subtask_function(parent_task_name, job_id) -> AsyncResult:
job, _ = JobMaster.get_job(job_id, job_title="dummy task")
job.log_message(
log_message=f"Intermediary function for {parent_task_name} has been reached, will now make a task")
r = task_dummy_subtask.apply_async(kwargs={"parent_task_name": parent_task_name, "job_id": job_id},
add_to_parent=True)
return r这个intermediary_dummy_subtask_function函数只是简单地封装了task_dummy_subtask.apply_async的调用,并返回了AsyncResult对象,其ID随后被主任务收集。
wait_for_tasks_to_complete函数是实现同步等待的核心。它会循环检查所有待完成子任务的状态。
def wait_for_tasks_to_complete(async_ids: List[str], job_id: int = None, msg: str = None, timeout: int = 300):
job, _ = JobMaster.get_job(job_id, job_title="waiting for refresh data")
job.log_message(log_message=f"Waiting for {len(async_ids)} tasks to complete, {msg}", status=consts.IN_PROGRESS,
job_score=0)
job.log_message(log_message=f"tasks: {async_ids}", status=consts.IN_PROGRESS, job_score=0)
# 创建一个可变的列表用于跟踪未完成的任务ID
remaining_async_ids = list(async_ids)
count_down = timeout
while count_down > 0:
# 遍历 remaining_async_ids 的副本,因为我们可能在循环中修改它
for async_id in list(remaining_async_ids):
result = app.AsyncResult(async_id) # 获取任务结果对象
status = result.status
if status == "SUCCESS":
# 任务成功完成
returned_value = result.result
job.log_message(log_message=f"Confirmed status SUCCESS for task {async_id} with {returned_value=}")
remaining_async_ids.remove(async_id) # 从待处理列表中移除
elif status in ["PENDING", "STARTED", "RETRY"]:
# 任务仍在进行中或等待执行
pass
elif status in ["FAILURE", "REVOKED"]:
# 任务失败或被撤销,需要根据业务逻辑处理
job.log_message(log_message=f"Task {async_id} failed or revoked with status {status}. Error: {result.info}",
status=consts.ERRORS_FOUND)
# 可以在这里选择抛出异常,或将失败任务从列表中移除并继续等待其他任务
remaining_async_ids.remove(async_id)
# 示例:如果一个失败就认为整体失败,可以立即返回或抛出异常
# raise Exception(f"Subtask {async_id} failed!")
if not remaining_async_ids: # 所有任务都已完成或处理完毕
job.log_message(log_message="Finished waiting for refresh data, all tasks succeeded or handled",
status=consts.COMPLETED, job_score=100)
return
count_down -= 1
if count_down % 10 == 0 or count_down == timeout -1: # 每隔一段时间或首次轮询时打印进度
job.log_message(log_message=f"There are {len(remaining_async_ids)} tasks remaining. Timeout in {count_down}s")
time.sleep(1) # 每秒轮询一次,避免CPU空转
# 超时处理
job.log_message(log_message=f"After waiting for {timeout=}s, some tasks did not complete on time. Remaining tasks: {remaining_async_ids}",
status=consts.ERRORS_FOUND, job_score=100)
# 可以在这里抛出异常或返回特定状态此等待函数的核心逻辑如下:
阻塞性影响: 这种手动轮询的方法会阻塞父任务所在的Celery worker进程,直到所有子任务完成或超时。这意味着在等待期间,该worker无法处理其他任务。如果父任务的等待时间很长,这可能会影响系统的吞吐量。对于对响应时间要求极高的场景,可能需要考虑更复杂的非阻塞模式(如使用Celery的callbacks、errbacks或外部状态机)。
错误处理: 上述wait_for_tasks_to_complete函数中增加了对FAILURE和REVOKED状态的初步处理。在实际应用中,您需要根据业务需求细化错误处理逻辑:
性能考量:
非阻塞替代方案(高级): 对于需要完全非阻塞的场景,可以考虑以下模式:
尽管Celery的内置编排工具在处理静态任务流时非常强大,但在面对动态生成的子任务并需要同步等待其完成的场景时,开发者需要手动实现一套轮询机制。通过收集子任务ID并在父任务中主动查询这些任务的状态,我们可以有效地突破Celery编排的限制,确保业务逻辑的正确性和数据完整性。在实现过程中,务必关注阻塞性、错误处理和性能优化等关键因素,以构建健壮且高效的分布式任务系统。
以上就是Celery动态子任务的同步等待机制:突破编排限制的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号