0

0

c# 如何用 Channel 实现一个批处理(Batching)的后台服务

月夜之吻

月夜之吻

发布时间:2026-01-04 09:45:59

|

948人浏览过

|

来源于php中文网

原创

直接用 Channel 做批处理易丢数据,因其无“凑够N条”或“超时提交”语义;需封装定时器+批量读取逻辑,并确保 FlushBatchAsync 支持取消、不吞异常、清空列表,且生命周期与 IHostedService 对齐。

c# 如何用 channel 实现一个批处理(batching)的后台服务

为什么直接用 Channel 做批处理容易丢数据

因为 Channel 本身不提供“等凑够 N 条再发”或“超时强制提交”的语义。你如果只靠 Channel.Reader.ReadAsync() 一条条读,就退化成单条处理;如果自己加循环 TryRead 拼批次,又得手动管超时、取消、边界条件——稍不留神,Writer 关闭时未读完的数据就丢了,或者批次卡住不触发。

核心矛盾在于:Channel 是流式传输原语,不是批处理原语。必须在它之上封装一层协调逻辑。

Channel + Timer 实现可靠批处理的关键点

推荐用一个后台 Task 持续从 Channel.Reader 尝试批量读取,同时用 System.Threading.Timer 触发“兜底提交”。注意三点:

  • Timer 的回调必须是线程安全的,且不能阻塞(比如别在里面 await 或调 long-running 方法)
  • 每次读取前检查 Reader.Completion.IsCompleted,避免在 Channel 关闭后还尝试读
  • 批次收集必须用 List 而非数组,且每次提交后要清空,不能复用引用(否则并发写会乱)
private async Task BatchingLoopAsync(CancellationToken ct)
{
    var batch = new List();
    var timer = new Timer(_ => { _ = FlushBatchAsync(batch, ct); }, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);

    try
    {
        while (!ct.IsCancellationRequested && await _channel.Reader.WaitToReadAsync(ct).ConfigureAwait(false))
        {
            while (_channel.Reader.TryRead(out var item))
            {
                batch.Add(item);
                if (batch.Count >= _batchSize)
                {
                    await FlushBatchAsync(batch, ct).ConfigureAwait(false);
                    batch.Clear();
                    timer.Change(_flushInterval, Timeout.InfiniteTimeSpan);
                }
            }

            // 每次有新数据进来,重置定时器(实现“最后一条进来后等 flushInterval 再提交”)
            timer.Change(_flushInterval, Timeout.InfiniteTimeSpan);
        }
    }
    finally
    {
        timer.Dispose();
        if (batch.Count > 0)
            await FlushBatchAsync(batch, ct).ConfigureAwait(false);
    }
}

FlushBatchAsync 必须支持取消且不能吞异常

这是最容易出问题的一环:如果 FlushBatchAsync 里调用的是外部 HTTP API 或数据库写入,它可能耗时、可能失败、可能被取消。必须显式传递 CancellationToken,并在 catch 块中区分 OperationCanceledException 和其他异常。

叮当好记-AI音视频转图文
叮当好记-AI音视频转图文

AI音视频转录与总结,内容学习效率 x10!

下载
  • 遇到 OperationCanceledException:直接 return,不要重试,因为上层已要求停止
  • 遇到其他异常:记录日志,但不要 throw —— 否则整个 BatchingLoopAsync 会退出,后续数据全丢
  • 如果需要重试,应在 FlushBatchAsync 内部做(比如用 Polly),而不是让外层循环崩溃
private async Task FlushBatchAsync(List batch, CancellationToken ct)
{
    try
    {
        await _httpClient.PostAsJsonAsync("/api/batch", batch, ct).ConfigureAwait(false);
    }
    catch (OperationCanceledException) when (ct.IsCancellationRequested)
    {
        // 正常退出路径,不记日志
        return;
    }
    catch (Exception ex) when (!(ex is OperationCanceledException))
    {
        _logger.LogError(ex, "Failed to flush batch of {Count} items", batch.Count);
        // 不 throw,继续下一轮
    }
    finally
    {
        batch.Clear(); // 确保清空,避免引用残留
    }
}

注册为 IHostedService 时要注意生命周期绑定

Channel 的 WriterReader 都需要和宿主生命周期对齐。常见错误是把 Channel.CreateBounded() 放在构造函数里,但没在 StopAsync 中显式调用 Writer.Complete(),导致 BatchingLoopAsync 永远等在 WaitToReadAsync 上,服务无法正常退出。

  • StartAsync 中启动 BatchingLoopAsync 并用 Task.RunBackgroundService 托管
  • StopAsync 中先调 _channel.Writer.Complete(),再 await _batchingTask 等它自然结束
  • 别在 Dispose 里做任何异步清理——IHostedService 的 Dispose 是同步的

真正难的是边界情况:比如 StopAsync 被调用时,FlushBatchAsync 正在发请求,这时 CancelToken 触发,你得确保那个 HTTP 请求真能被取消(HttpClient 默认支持),而不是留下悬挂连接。

相关专题

更多
线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

475

2023.08.10

Golang channel原理
Golang channel原理

本专题整合了Golang channel通信相关介绍,阅读专题下面的文章了解更多详细内容。

242

2025.11.14

golang channel相关教程
golang channel相关教程

本专题整合了golang处理channel相关教程,阅读专题下面的文章了解更多详细内容。

341

2025.11.17

数据库三范式
数据库三范式

数据库三范式是一种设计规范,用于规范化关系型数据库中的数据结构,它通过消除冗余数据、提高数据库性能和数据一致性,提供了一种有效的数据库设计方法。本专题提供数据库三范式相关的文章、下载和课程。

339

2023.06.29

如何删除数据库
如何删除数据库

删除数据库是指在MySQL中完全移除一个数据库及其所包含的所有数据和结构,作用包括:1、释放存储空间;2、确保数据的安全性;3、提高数据库的整体性能,加速查询和操作的执行速度。尽管删除数据库具有一些好处,但在执行任何删除操作之前,务必谨慎操作,并备份重要的数据。删除数据库将永久性地删除所有相关数据和结构,无法回滚。

2069

2023.08.14

vb怎么连接数据库
vb怎么连接数据库

在VB中,连接数据库通常使用ADO(ActiveX 数据对象)或 DAO(Data Access Objects)这两个技术来实现:1、引入ADO库;2、创建ADO连接对象;3、配置连接字符串;4、打开连接;5、执行SQL语句;6、处理查询结果;7、关闭连接即可。

346

2023.08.31

MySQL恢复数据库
MySQL恢复数据库

MySQL恢复数据库的方法有使用物理备份恢复、使用逻辑备份恢复、使用二进制日志恢复和使用数据库复制进行恢复等。本专题为大家提供MySQL数据库相关的文章、下载、课程内容,供大家免费下载体验。

253

2023.09.05

vb中怎么连接access数据库
vb中怎么连接access数据库

vb中连接access数据库的步骤包括引用必要的命名空间、创建连接字符串、创建连接对象、打开连接、执行SQL语句和关闭连接。本专题为大家提供连接access数据库相关的文章、下载、课程内容,供大家免费下载体验。

321

2023.10.09

从零到实战:Python 编程系统入门专题
从零到实战:Python 编程系统入门专题

本专题面向零编程基础及初学者,系统讲解 Python 编程语言的核心知识与实战技巧。内容涵盖 Python 基础语法、数据结构、函数与模块、常用标准库、简单算法思维,以及真实应用场景下的小项目实战。通过循序渐进的学习路径,帮助读者快速建立编程思维,掌握 Python 在数据处理、自动化脚本及日常开发中的实际应用能力,为后续深入学习 Web 开发、数据分析或人工智能打下坚实基础。

2

2026.01.05

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
WEB前端教程【HTML5+CSS3+JS】
WEB前端教程【HTML5+CSS3+JS】

共101课时 | 8.1万人学习

JS进阶与BootStrap学习
JS进阶与BootStrap学习

共39课时 | 3.1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号