直接用 Channel 做批处理易丢数据,因其无“凑够N条”或“超时提交”语义;需封装定时器+批量读取逻辑,并确保 FlushBatchAsync 支持取消、不吞异常、清空列表,且生命周期与 IHostedService 对齐。

为什么直接用 Channel 做批处理容易丢数据
因为 Channel 本身不提供“等凑够 N 条再发”或“超时强制提交”的语义。你如果只靠 Channel.Reader.ReadAsync() 一条条读,就退化成单条处理;如果自己加循环 TryRead 拼批次,又得手动管超时、取消、边界条件——稍不留神,Writer 关闭时未读完的数据就丢了,或者批次卡住不触发。
核心矛盾在于:Channel 是流式传输原语,不是批处理原语。必须在它之上封装一层协调逻辑。
用 Channel + Timer 实现可靠批处理的关键点
推荐用一个后台 Task 持续从 Channel.Reader 尝试批量读取,同时用 System.Threading.Timer 触发“兜底提交”。注意三点:
-
Timer的回调必须是线程安全的,且不能阻塞(比如别在里面 await 或调 long-running 方法) - 每次读取前检查
Reader.Completion.IsCompleted,避免在 Channel 关闭后还尝试读 - 批次收集必须用
List而非数组,且每次提交后要清空,不能复用引用(否则并发写会乱)
private async Task BatchingLoopAsync(CancellationToken ct)
{
var batch = new List();
var timer = new Timer(_ => { _ = FlushBatchAsync(batch, ct); }, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
try
{
while (!ct.IsCancellationRequested && await _channel.Reader.WaitToReadAsync(ct).ConfigureAwait(false))
{
while (_channel.Reader.TryRead(out var item))
{
batch.Add(item);
if (batch.Count >= _batchSize)
{
await FlushBatchAsync(batch, ct).ConfigureAwait(false);
batch.Clear();
timer.Change(_flushInterval, Timeout.InfiniteTimeSpan);
}
}
// 每次有新数据进来,重置定时器(实现“最后一条进来后等 flushInterval 再提交”)
timer.Change(_flushInterval, Timeout.InfiniteTimeSpan);
}
}
finally
{
timer.Dispose();
if (batch.Count > 0)
await FlushBatchAsync(batch, ct).ConfigureAwait(false);
}
}
FlushBatchAsync 必须支持取消且不能吞异常
这是最容易出问题的一环:如果 FlushBatchAsync 里调用的是外部 HTTP API 或数据库写入,它可能耗时、可能失败、可能被取消。必须显式传递 CancellationToken,并在 catch 块中区分 OperationCanceledException 和其他异常。
- 遇到
OperationCanceledException:直接 return,不要重试,因为上层已要求停止 - 遇到其他异常:记录日志,但不要 throw —— 否则整个
BatchingLoopAsync会退出,后续数据全丢 - 如果需要重试,应在
FlushBatchAsync内部做(比如用Polly),而不是让外层循环崩溃
private async Task FlushBatchAsync(Listbatch, CancellationToken ct) { try { await _httpClient.PostAsJsonAsync("/api/batch", batch, ct).ConfigureAwait(false); } catch (OperationCanceledException) when (ct.IsCancellationRequested) { // 正常退出路径,不记日志 return; } catch (Exception ex) when (!(ex is OperationCanceledException)) { _logger.LogError(ex, "Failed to flush batch of {Count} items", batch.Count); // 不 throw,继续下一轮 } finally { batch.Clear(); // 确保清空,避免引用残留 } }
注册为 IHostedService 时要注意生命周期绑定
Channel 的 Writer 和 Reader 都需要和宿主生命周期对齐。常见错误是把 Channel.CreateBounded 放在构造函数里,但没在 StopAsync 中显式调用 Writer.Complete(),导致 BatchingLoopAsync 永远等在 WaitToReadAsync 上,服务无法正常退出。
-
StartAsync中启动BatchingLoopAsync并用Task.Run或BackgroundService托管 -
StopAsync中先调_channel.Writer.Complete(),再await _batchingTask等它自然结束 - 别在
Dispose里做任何异步清理——IHostedService 的 Dispose 是同步的
真正难的是边界情况:比如 StopAsync 被调用时,FlushBatchAsync 正在发请求,这时 CancelToken 触发,你得确保那个 HTTP 请求真能被取消(HttpClient 默认支持),而不是留下悬挂连接。










