
本文探讨了在 celery 中处理动态创建子任务并等待其完成的挑战,尤其是在传统 celery 编排(如 `chain` 或 `chord`)不适用的场景。由于 celery 的内置编排机制要求任务签名在创建时已知,对于运行时动态生成的子任务,需要一种自定义的解决方案。文章提供了一种基于手动收集子任务 id 和轮询其状态的实现方法,以确保父任务在所有动态子任务完成后才继续执行。
在构建复杂的异步任务流时,Celery 提供了强大的编排工具,如 chain、group 和 chord。然而,当业务逻辑需要在父任务执行过程中动态生成并调度子任务,并且父任务必须等待所有这些动态子任务完成后才能继续时,传统的编排方式便显得力不从心。本文将深入探讨这一问题,并提供一种实用的解决方案。
设想一个场景:一个主任务负责从外部 API 分页获取数据。每获取一页数据,都需要触发一个子任务来处理该页数据并写入数据库。由于 API 响应时间不确定,以及处理和写入数据库的时间可能较长,我们希望将每页数据的处理卸载到独立的子任务中,以提高整体的墙钟时间效率。关键在于,主任务必须确保所有这些动态生成的数据库写入子任务完成后,才能执行下一个顶层操作,以维护数据完整性。
Celery 的 chain、group 和 chord 等编排原语,其核心设计理念是基于预先定义的任务签名(signature)。这意味着,当一个 chain 或 chord 被创建时,它所包含的所有任务及其依赖关系都必须是已知的。
因此,对于在父任务运行时动态创建子任务并要求父任务等待其完成的需求,Celery 的原生编排机制无法直接满足。我们需要一种手动管理依赖关系的方法。
解决此问题的核心思路是:在父任务中动态创建子任务时,收集这些子任务的唯一标识符(AsyncResult.id)。然后,父任务进入一个循环,周期性地检查这些子任务的执行状态,直到所有子任务都完成(成功或失败),父任务才能继续执行后续逻辑。
这种方法将任务间的同步控制从 Celery 的编排层下放到了应用程序代码层。
以下是一个详细的 Python 示例,演示了如何在 Celery 中实现动态子任务的等待机制。
首先,假设我们定义了两个 Celery 任务:一个主任务 task_dummy_task1 和一个子任务 task_dummy_subtask,以及一个用于创建子任务的中间函数。
import time
from celery import Celery, Task
from celery.result import AsyncResult
from typing import List, Tuple
# 假设 app 已经配置好,broker 和 backend 都已设置
app = Celery('my_app', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
# 模拟一个 JobMaster 类用于日志记录,实际应用中替换为您的日志系统
class JobMaster:
def __init__(self, job_id, job_title):
self.job_id = job_id
self.job_title = job_title
@staticmethod
def get_job(job_id: int, job_title: str) -> Tuple['JobMaster', int]:
# 实际应用中可能从数据库获取或创建 Job 实例
if job_id is None:
job_id = int(time.time()) # 简单模拟一个 ID
return JobMaster(job_id, job_title), job_id
def log_message(self, log_message: str, status=None, job_score=None):
print(f"[{self.job_title} - {self.job_id}] {log_message}")
# 假设 consts 包含状态常量
class consts:
IN_PROGRESS = "IN_PROGRESS"
COMPLETED = "COMPLETED"
ERRORS_FOUND = "ERRORS_FOUND"
@app.task(bind=True)
def task_dummy_subtask(self: Task, parent_task_name: str, job_id: int = None):
job, _ = JobMaster.get_job(job_id, job_title="dummy subtask")
job.log_message(log_message=f"Entered subtask for {parent_task_name}. Simulating work...")
time.sleep(2) # 模拟耗时操作
job.log_message(log_message=f"Finished subtask for {parent_task_name}.")
return f"Subtask {parent_task_name} completed successfully."
def intermediary_dummy_subtask_function(parent_task_name: str, job_id: int) -> AsyncResult:
job, _ = JobMaster.get_job(job_id, job_title="intermediary task")
job.log_message(
log_message=f"Intermediary function for {parent_task_name} has been reached, will now make a task")
# 注意:add_to_parent=True 仅用于在结果后端建立父子关系,不影响阻塞行为
r = task_dummy_subtask.apply_async(kwargs={"parent_task_name": parent_task_name, "job_id": job_id},
add_to_parent=True)
return r
def wait_for_tasks_to_complete(async_ids: List[str], job_id: int = None, msg: str = None, timeout: int = 300):
"""
等待一组 Celery 任务完成。
Args:
async_ids: 需要等待的子任务 ID 列表。
job_id: 关联的作业 ID,用于日志记录。
msg: 等待过程中的提示信息。
timeout: 最长等待时间(秒)。
"""
job, _ = JobMaster.get_job(job_id, job_title="waiting for tasks")
job.log_message(log_message=f"等待 {len(async_ids)} 个任务完成, {msg}", status=consts.IN_PROGRESS)
job.log_message(log_message=f"任务ID: {async_ids}", status=consts.IN_PROGRESS)
remaining_ids = list(async_ids) # 复制一份,因为会修改
start_time = time.time()
while remaining_ids and (time.time() - start_time < timeout):
# 遍历剩余任务,检查状态
tasks_to_remove = []
for async_id in remaining_ids:
result = app.AsyncResult(async_id)
status = result.status
if status == "SUCCESS":
returned_value = result.result
job.log_message(log_message=f"任务 {async_id} 状态: SUCCESS, 返回值: {returned_value}")
tasks_to_remove.append(async_id)
elif status in ("FAILURE", "REVOKED", "RETRY"):
job.log_message(log_message=f"任务 {async_id} 状态: {status}. 错误信息: {result.traceback if status == 'FAILURE' else 'N/A'}", status=consts.ERRORS_FOUND)
tasks_to_remove.append(async_id) # 视为完成,但可能需要进一步处理错误
# else: PENDING, STARTED 等状态,继续等待
# 移除已完成的任务
for tid in tasks_to_remove:
if tid in remaining_ids: # 避免重复移除或并发问题
remaining_ids.remove(tid)
if not remaining_ids:
job.log_message(log_message="所有任务均已完成。", status=consts.COMPLETED, job_score=100)
return
job.log_message(log_message=f"仍有 {len(remaining_ids)} 个任务待完成。")
time.sleep(1) # 每秒检查一次
# 超时处理
job.log_message(
log_message=f"等待超时 ({timeout}s)。仍有 {len(remaining_ids)} 个任务未完成。",
status=consts.ERRORS_FOUND,
job_score=100
)
@app.task(bind=True)
def task_dummy_task1(self: Task, part_number: int, job_id: int = None):
job, job_id = JobMaster.get_job(job_id, job_title="dummy task")
sleeping_duration = 1 # 模拟一些前期工作
subtask_ids = []
job.log_message(log_message=f"进入主任务 1,模拟前期工作 {sleeping_duration} 秒。")
# 直接创建子任务
job.log_message(log_message="主任务 1: 创建子任务 a")
subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_a", "job_id": job_id}, add_to_parent=True)
subtask_ids.append(subtask.id)
job.log_message(log_message="主任务 1: 创建子任务 b")
subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_b", "job_id": job_id}, add_to_parent=True)
subtask_ids.append(subtask.id)
job.log_message(log_message="主任务 1: 创建子任务 c")
subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_c", "job_id": job_id}, add_to_parent=True)
subtask_ids.append(subtask.id)
# 通过中间函数创建子任务
job.log_message(log_message="主任务 1: 通过中间函数创建子任务 d")
subtask = intermediary_dummy_subtask_function(parent_task_name="task1_d", job_id=job_id)
subtask_ids.append(subtask.id)
job.log_message(log_message="主任务 1: 通过中间函数创建子任务 e")
subtask = intermediary_dummy_subtask_function(parent_task_name="task1_e", job_id=job_id)
subtask_ids.append(subtask.id)
time.sleep(sleeping_duration) # 模拟主任务在创建子任务后继续做一些工作
# 等待所有子任务完成
job.log_message(log_message=f"主任务 1: 开始等待 {len(subtask_ids)} 个子任务完成。")
wait_for_tasks_to_complete(async_ids=subtask_ids, job_id=job_id,
msg="等待所有动态子任务完成", timeout=60) # 设置一个合理的超时时间
job.log_message(log_message="主任务 1: 所有子任务已完成或超时,继续执行主任务的后续逻辑。")
return part_number
# 如何调用主任务 (在另一个脚本或 Celery worker 启动后)
# if __name__ == '__main__':
# task_dummy_task1.delay(part_number=123)task_dummy_task1 (父任务):
wait_for_tasks_to_complete (等待函数):
intermediary_dummy_subtask_function (中间函数):
当 Celery 的静态编排原语无法满足动态创建子任务并等待其完成的需求时,手动收集子任务 ID 并实现轮询等待机制是一个有效且直接的解决方案。这种方法虽然会在父任务执行期间阻塞 Worker 进程,但在许多需要严格顺序和数据完整性的场景中是可接受的。在实际应用中,应根据业务需求和系统负载,合理配置轮询间隔和超时时间,并完善错误处理逻辑,以确保系统的健壮性和效率。
以上就是Celery 动态子任务的等待机制:绕过静态编排限制的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号