0

0

确保芹菜的公平加工——第一部分

DDD

DDD

发布时间:2024-11-15 18:40:32

|

645人浏览过

|

来源于dev.to

转载

确保芹菜的公平加工——第一部分

如果您熟悉 python,您很可能听说过 celery。它通常是异步处理任务的首选,例如图像处理或发送电子邮件。

与一些人交谈时,我开始注意到许多开发人员一开始都觉得 celery 令人印象深刻,但随着他们的项目规模和复杂性的增加,他们的兴奋开始消退。虽然有些人出于正当原因放弃了 celery,但其他人可能只是没有深入探索其核心,无法根据自己的需求进行定制。

在这篇博客中,我想讨论一些开发人员开始寻找替代方案甚至构建自定义后台工作框架的原因之一:公平处理。在用户/租户提交不同大小任务的环境中,一个租户的繁重工作量影响其他租户的风险可能会造成瓶颈并导致挫败感。

我将引导您了解在 celery 中实现公平处理的策略,确保平衡的任务分配,以便没有任何一个租户可以支配您的资源。

问题

让我们深入探讨多租户应用程序面临的常见挑战,特别是那些处理批处理的应用程序。想象一下,您有一个系统,用户可以将其图像处理任务排队,允许他们在短暂等待后收到处理后的图像。此设置不仅可以使您的 api 保持响应,还可以让您根据需要扩展工作线程以有效地处理负载。

一切都运行顺利 - 直到一个租户决定提交大量图像进行处理。您拥有多名工作人员,他们甚至可以自动扩展以满足不断增长的需求,因此您对您的基础设施充满信心。然而,当其他租户尝试对较小的批次(可能只是几张图像)进行排队并突然发现自己面临长时间的等待而没有任何更新时,麻烦就开始了。在您不知不觉中,支持请求开始涌入,用户抱怨您的服务速度缓慢甚至没有响应。

这种情况太常见了,因为 celery 默认情况下按照接收到的顺序处理任务。当一个租户因大量涌入的任务而让您的工作人员不堪重负时,即使是最好的自动扩展策略也可能不足以防止其他租户出现延误。因此,这些用户体验到的服务水平可能达不到承诺或预期的水平。

使用 celery 进行速率限制

确保公平处理的一个有效策略是实施速率限制。它允许您控制每个租户在特定时间范围内可以提交的任务数量。这可以防止任何单个租户垄断您的工人,并确保所有租户都有公平的机会来处理他们的任务。

celery 具有内置的任务级别速率限制功能:

# app.py
from celery import celery

app = celery("app", broker="redis://localhost:6379/0")

@app.task(rate_limit="10/m") # limit to 10 tasks per minute
def process_data(data):
    print(f"processing data: {data}")

# call the task
if __name__ == "__main__":
    for i in range(20):
        process_data.delay(f"data_{i}")

您可以通过执行以下命令来运行工作线程:

celery -a app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1

现在,运行app.py脚本来触发20个任务:

python app.py

如果您设法在本地运行它,您会注意到每个任务之间存在延迟,以确保执行速率限制。现在您可能认为这并不能真正帮助我们解决问题,您完全正确。 celery 的内置速率限制对于我们的任务可能涉及调用具有严格速率限制的外部服务的场景非常有用。

这个示例强调了内置功能对于复杂场景来说可能过于简单。然而,我们可以通过更深入地探索 celery 的框架来克服这个限制。让我们看看如何为每个租户设置适当的速率限制和自动重试。

我们将使用 redis 来跟踪每个租户的速率限制。 redis 是 celery 的流行数据库和代理,因此让我们利用这个可能已经在您的堆栈中的组件。

让我们导入几个库:

import time
import redis
from celery import celery, task

现在我们将为我们的速率限制任务实现一个自定义基任务类:

Audo Studio
Audo Studio

AI音频清洗工具(噪音消除、声音平衡、音量调节)

下载
# initialize a redis client
redis_client = redis.strictredis(host="localhost", port=6379, db=0)

class ratelimitedtask(task):
    def __init__(self, *args, **kwargs):
        # set default rate limit
        if not hasattr(self, "custom_rate_limit"):
            self.custom_rate_limit = 10

        super().__init__(*args, **kwargs)

    def __call__(self, tenant_id, *args, **kwargs):
        # rate limiting logic
        key = f"rate_limit:{tenant_id}:{self.name}"

        # increment the count for this minute
        current_count = redis_client.incr(key)

        if current_count == 1:
            # set expiration for the key if it's the first request
            redis_client.expire(key, 10)

        if current_count > self.custom_rate_limit:
            print(f"rate limit exceeded for tenant {tenant_id}. retrying...")
            raise self.retry(countdown=10)

        return super().__call__(tenant_id, *args, **kwargs)

这个自定义类将跟踪特定租户使用 redis 触发的任务量,并将 ttl 设置为 10 秒。如果超出速率限制,任务将在 10 秒后重试。所以基本上我们的默认速率限制是 10 秒内完成 10 个任务。

让我们定义一个模拟处理的示例任务:

@app.task(base=ratelimitedtask, custom_rate_limit=5)
def process(tenant_id: int, data):
    """
    mock processing task that takes 0.3 seconds to complete.
    """
    print(f"processing data: {data} for tenant: {tenant_id}")
    time.sleep(0.3)

这里我们定义了一个流程任务,你可以看到我可以在任务级别更改custom_rate_limit。如果我们不指定 custom_rate_limit,则将分配默认值 10。 现在我们的速率限制已更改为 10 秒内完成 5 个任务。

现在让我们为不同的租户触发一些任务:

if __name__ == "__main__":
    for i in range(20):
        process.apply_async(args=(1, f"data_{i}"))

    for i in range(10):
        process.apply_async(args=(2, f"data_{i}"))

我们为租户 id 1 定义 20 个任务,为租户 id 2 定义 10 个任务。

所以我们完整的代码将如下所示:

# app.py
import time
import redis
from celery import celery, task

app = celery(
    "app",
    broker="redis://localhost:6379/0",
    broker_connection_retry_on_startup=false,
)

# initialize a redis client
redis_client = redis.strictredis(host="localhost", port=6379, db=0)


class ratelimitedtask(task):
    def __init__(self, *args, **kwargs):
        if not hasattr(self, "custom_rate_limit"):
            self.custom_rate_limit = 10

        super().__init__(*args, **kwargs)

    def __call__(self, tenant_id, *args, **kwargs):
        # rate limiting logic
        key = f"rate_limit:{tenant_id}:{self.name}"

        # increment the count for this minute
        current_count = redis_client.incr(key)

        if current_count == 1:
            # set expiration for the key if it's the first request
            redis_client.expire(key, 10)

        if current_count > self.custom_rate_limit:
            print(f"rate limit exceeded for tenant {tenant_id}. retrying...")
            raise self.retry(countdown=10)

        return super().__call__(tenant_id, *args, **kwargs)


@app.task(base=ratelimitedtask, custom_rate_limit=5)
def process(tenant_id: int, data):
    """
    mock processing task that takes 0.3 seconds to complete.
    """
    print(f"processing data: {data} for tenant: {tenant_id}")
    time.sleep(0.3)

if __name__ == "__main__":
    for i in range(20):
        process.apply_async(args=(1, f"data_{i}"))

    for i in range(10):
        process.apply_async(args=(2, f"data_{i}"))

让我们运行我们的工作线程:

celery -a app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1

现在,运行 app.py 脚本来触发任务:

python app.py

如您所见,工作人员处理了第一个租户的 5 个任务,并为所有其他任务设置了重试。然后,它会执行第二个租户的 5 个任务,并为其他任务设置重试,然后继续进行。

这种方法允许您定义每个租户的速率限制,但正如您在我们的示例中看到的,对于运行速度非常快的任务,对速率限制过于严格最终会让工作人员在一段时间内无所事事。微调速率限制参数至关重要,并且取决于具体的任务和数量。不要犹豫,不断尝试,直到找到最佳平衡。

结论

我们探讨了 celery 的默认任务处理如何导致多租户环境中的不公平,以及速率限制如何帮助解决此问题。通过实施特定于租户的速率限制,我们可以防止任何单个租户垄断资源,并确保更公平地分配处理能力。

这种方法为在 celery 中实现公平处理提供了坚实的基础。然而,还有其他值得探索的技术来进一步优化多租户应用程序中的任务处理。虽然我最初计划在一篇文章中涵盖所有内容,但事实证明这个主题非常广泛!为了确保清晰度并保持本文的重点,我决定将其分为两部分。

在本系列的下一部分中,我们将深入研究任务优先级作为增强公平性和效率的另一种机制。这种方法允许您根据不同的标准为任务分配不同的优先级,确保即使在高需求时期也能及时处理关键任务。

敬请期待下期!

相关专题

更多
python开发工具
python开发工具

php中文网为大家提供各种python开发工具,好的开发工具,可帮助开发者攻克编程学习中的基础障碍,理解每一行源代码在程序执行时在计算机中的过程。php中文网还为大家带来python相关课程以及相关文章等内容,供大家免费下载使用。

772

2023.06.15

python打包成可执行文件
python打包成可执行文件

本专题为大家带来python打包成可执行文件相关的文章,大家可以免费的下载体验。

661

2023.07.20

python能做什么
python能做什么

python能做的有:可用于开发基于控制台的应用程序、多媒体部分开发、用于开发基于Web的应用程序、使用python处理数据、系统编程等等。本专题为大家提供python相关的各种文章、以及下载和课程。

764

2023.07.25

format在python中的用法
format在python中的用法

Python中的format是一种字符串格式化方法,用于将变量或值插入到字符串中的占位符位置。通过format方法,我们可以动态地构建字符串,使其包含不同值。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

679

2023.07.31

python教程
python教程

Python已成为一门网红语言,即使是在非编程开发者当中,也掀起了一股学习的热潮。本专题为大家带来python教程的相关文章,大家可以免费体验学习。

1345

2023.08.03

python环境变量的配置
python环境变量的配置

Python是一种流行的编程语言,被广泛用于软件开发、数据分析和科学计算等领域。在安装Python之后,我们需要配置环境变量,以便在任何位置都能够访问Python的可执行文件。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

549

2023.08.04

python eval
python eval

eval函数是Python中一个非常强大的函数,它可以将字符串作为Python代码进行执行,实现动态编程的效果。然而,由于其潜在的安全风险和性能问题,需要谨慎使用。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

579

2023.08.04

scratch和python区别
scratch和python区别

scratch和python的区别:1、scratch是一种专为初学者设计的图形化编程语言,python是一种文本编程语言;2、scratch使用的是基于积木的编程语法,python采用更加传统的文本编程语法等等。本专题为大家提供scratch和python相关的文章、下载、课程内容,供大家免费下载体验。

730

2023.08.11

菜鸟裹裹入口以及教程汇总
菜鸟裹裹入口以及教程汇总

本专题整合了菜鸟裹裹入口地址及教程分享,阅读专题下面的文章了解更多详细内容。

0

2026.01.22

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 13.7万人学习

Django 教程
Django 教程

共28课时 | 3.4万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号