推荐用 Channel 实现线程安全异步生产者消费者队列,它无锁、轻量、原生支持 async/await;避免误用 ConcurrentQueue 或 BlockingCollection 的同步阻塞操作。

用 Channel 实现线程安全的异步生产者消费者队列
直接上结论:C# 6+(.NET Core 2.1+)推荐用 System.Threading.Channels.Channel,它专为高并发异步场景设计,比手写 BlockingCollection + Task 组合更轻量、无锁、原生支持 async/await。
常见错误是试图用 ConcurrentQueue 自己封装 awaitable 操作——它本身不提供异步等待能力,强行加 Task.Delay 或轮询会浪费 CPU;也有人误用 BlockingCollection 的 Take(),它会同步阻塞线程,破坏 async 上下文。
-
Channel.CreateBounded创建有界通道,溢出时可配置拒绝策略(如() DropWrite或抛异常) -
Channel.CreateUnbounded适合写入吞吐优先、内存可控的场景() - 生产端调用
Writer.WriteAsync(item),消费端用Reader.ReadAsync()—— 两者都返回ValueTask,无分配开销 - 通道关闭后,
Reader.ReadAsync()会完成并返回default(T),需配合TryRead或检查WaitToReadAsync().IsCompletedSuccessfully
var channel = Channel.CreateBounded(100); // 生产者 _ = Task.Run(async () => { for (int i = 0; i < 5; i++) { await channel.Writer.WriteAsync($"msg-{i}"); await Task.Delay(100); } channel.Writer.Complete(); });
// 消费者 _ = Task.Run(async () => { await foreach (var msg in channel.Reader.ReadAllAsync()) { Console.WriteLine($"Received: {msg}"); } });
如何正确处理消费者异常与通道终止
消费者任务崩溃或未捕获异常会导致 ReadAllAsync() 提前退出,但通道可能仍有未读项;更隐蔽的问题是:生产者调用 Complete() 后,若消费者没读完就退出,剩余数据会丢失。
- 必须在
await foreach外层包try/catch,否则异常会静默终止迭代 - 若需确保所有已入队消息被处理,不要依赖
channel.Writer.Complete()触发消费者退出——应另设信号(如CancellationToken)协调停机 -
Reader.Completion是Task,反映消费者侧是否完成(包括异常终止),可用于await channel.Reader.Completion等待消费结束 - 避免在消费者中直接
await channel.Reader.Completion—— 它不会等未读项,只等迭代器退出
什么时候不该用 Channel?
不是所有“队列”需求都适合 Channel。它定位是“流式数据传输”,不提供随机访问、计数查询、中间件插拔等能力。
- 需要实时获取当前队列长度?
Channel.Reader.Count只在有界通道且未被并发写入时可靠;无界通道返回 -1 - 要支持多个消费者竞争消费同一消息?
Channel是单消费者语义;此时该用ServiceBus或RabbitMQ - 消息需持久化、重试、死信?
Channel纯内存,崩溃即丢;必须外接存储层 - 项目还在 .NET Framework 4.8?
Channel不可用,只能降级用BlockingCollection+GetConsumingEnumerable()(但无法真正异步)
替代方案:BlockingCollection 的异步包装陷阱
有人用 Task.Run(() => collection.Take()) 伪异步,这本质是线程池抢占,增加调度开销且无法取消;正确做法是仅在必须兼容旧框架时,用 TryTake 配合短时 Task.Delay 循环,但务必设超时和取消令牌。
- 永远不要写
await Task.Run(() => collection.Take())—— 这违背 async/await 减少线程占用的初衷 - 若坚持用
BlockingCollection,消费循环应类似:while (collection.TryTake(out var item, 10, token)) { ... } -
BlockingCollection的Add在有界模式下可能阻塞线程,而Channel.Writer.WriteAsync在满时默认返回ValueTask并挂起,更可控
实际落地时,最易忽略的是消费者异常传播路径和通道生命周期管理——写个 await foreach 很容易,但谁负责捕获异常?谁决定何时 Complete()?这些边界不厘清,上线后就会出现消息静默丢失或消费者假死。










