ConcurrentQueue无法替代环形缓冲区,因其链表实现导致内存不连续、缺乏原子批次操作、不支持预分配与零拷贝;环形缓冲区在SPSC场景下凭借固定容量、缓存友好和无锁特性,更适用于高吞吐低延迟场景。

为什么不能直接用 ConcurrentQueue 代替环形缓冲区
因为 ConcurrentQueue 是链表实现,内存不连续,无法保证写入/读取的原子性批次,也不支持预分配固定大小和零拷贝访问。环形缓冲区核心价值在于:确定容量、缓存友好、单生产者/单消费者(SPSC)场景下免锁、支持指针快速读写。如果你需要的是高吞吐低延迟的日志暂存、网络包收发或实时音频流缓冲,ConcurrentQueue 的 GC 压力和间接寻址开销会成为瓶颈。
如何用 Interlocked 实现 SPSC 无锁环形缓冲区
关键不是“完全不用锁”,而是避免 lock 语句阻塞线程;SPSC 场景下,仅用 Interlocked.CompareExchange 和 Interlocked.Add 即可协调读写索引。必须满足:一个线程只写、一个线程只读,且不允许多对一或一对多。
- 缓冲区底层数组用
T[]预分配,长度为 2 的幂(便于位运算取模) - 写索引(
_writeIndex)和读索引(_readIndex)均为long类型,避免 32 位溢出导致误判 - 实际位置用
index & (_capacity - 1)计算,比% _capacity快且安全 - 写操作前先用
Interlocked.CompareExchange检查是否有足够空位,失败则返回false(不阻塞) - 读操作同理,检查是否有数据可读,再用
Interlocked.Add批量推进读索引
public sealed class RingBuffer{ private readonly T[] _buffer; private readonly int _capacity; private readonly int _mask; private long _writeIndex; private long _readIndex; public RingBuffer(int capacity) { _capacity = RoundUpToPowerOfTwo(capacity); _mask = _capacity - 1; _buffer = new T[_capacity]; } public bool TryWrite(T item) { long writePos = Interlocked.Read(ref _writeIndex); long readPos = Interlocked.Read(ref _readIndex); long available = _capacity - (writePos - readPos); if (available zuojiankuohaophpcn= 0) return false; _buffer[writePos & _mask] = item; Interlocked.Increment(ref _writeIndex); return true; } public bool TryRead(out T item) { item = default!; long writePos = Interlocked.Read(ref _writeIndex); long readPos = Interlocked.Read(ref _readIndex); if (writePos == readPos) return false; item = _buffer[readPos & _mask]; Interlocked.Increment(ref _readIndex); return true; } private static int RoundUpToPowerOfTwo(int v) { v--; v |= v youjiankuohaophpcnyoujiankuohaophpcn 1; v |= v youjiankuohaophpcnyoujiankuohaophpcn 2; v |= v youjiankuohaophpcnyoujiankuohaophpcn 4; v |= v youjiankuohaophpcnyoujiankuohaophpcn 8; v |= v youjiankuohaophpcnyoujiankuohaophpcn 16; return v + 1; }}
为什么
volatile不够,而必须用Interlocked.Read在 x86/x64 上,
volatile字段读写会插入内存屏障,但不能保证“读-改-写”操作的原子性。比如两个线程同时执行_writeIndex++,即使字段是volatile,仍可能丢失一次自增。而Interlocked.Read(ref _writeIndex)不仅保证读取最新值,还强制刷新 CPU 缓存行,确保你看到的是其他线程写入后的结果。尤其在 ARM 平台上,缺少Interlocked会导致读写索引严重错乱。容易被忽略的边界:批量读写与内存可见性
上面示例是单元素读写,实际中常需
TryWriteBatch或TryReadBatch。这时不能简单循环调用TryWrite,否则每轮都重复检查可用空间,效率低且逻辑错乱。正确做法是:一次计算最大可写数量,用Interlocked.CompareExchange原子预留位置,再逐个赋值,最后用Interlocked.Add提交写索引偏移。同样,读端也要先确认数据量,再批量复制,最后提交读索引——否则中间被写端覆盖就丢数据了。另外,如果
T是引用类型,写入时只是存引用,不触发对象复制;但若T是结构体且较大(如超过 16 字节),要考虑缓存行对齐和复制开销。无锁结构体写入本身没问题,但频繁大结构体搬运会抵消无锁带来的性能优势。










