gRPC双向流服务端方法必须返回IAsyncEnumerable,否则抛InvalidOperationException;需用IServerStreamWriter.WriteAsync主动推送并监听context.CancellationToken,客户端须调用CompleteAsync且处理异常与超时。

gRPC 双向流方法签名必须返回 IAsyncEnumerable
gRPC .NET 6+ 生成的双向流(bidi streaming)服务端方法,其返回类型固定为 IAsyncEnumerable,客户端调用时也需用 await foreach 消费。这不是可选项——如果手写服务端方法返回 Task 或 IObservable,gRPC 运行时会直接抛出 InvalidOperationException:“Method does not return IAsyncEnumerable”。
生成代码示例(由 .proto 文件编译而来):
public virtual async Task BidirectionalStreamingCall(
IAsyncEnumerable requestStream,
IServerStreamWriter responseStream,
ServerCallContext context)
{
await foreach (var req in requestStream)
{
await responseStream.WriteAsync(new HelloReply { Message = $"Hello, {req.Name}!" });
}
}
注意:这里服务端逻辑是「读请求流 → 写响应流」,但实际双向交互中,你往往需要在写响应的同时监听新请求,或按条件提前退出。此时不能只依赖单层 await foreach。
服务端主动推送需配合 CancellationToken 和手动 WriteAsync
IAsyncEnumerable 本身是拉取模型(pull-based),而双向流的真实需求常是推模型(push-based):比如后台任务持续产出数据、事件总线触发通知、或定时心跳。这时不能把所有逻辑塞进 await foreach (req in requestStream) 里——它会阻塞直到客户端发来下一条请求。
正确做法是:启动独立任务处理推送逻辑,并用 IServerStreamWriter 主动写入;同时监听 context.CancellationToken 确保连接断开时及时取消。
- 不要用
yield return模拟推送(性能差、无法响应取消) - 必须检查
context.CancellationToken.IsCancellationRequested或await context.CancellationToken.WaitHandle避免写入已关闭的流 -
WriteAsync是线程安全的,多个任务可并发调用,但 gRPC 不保证顺序——若需严格序,加锁或串行化写入
客户端消费时 await foreach 会隐式等待流结束
客户端调用双向流后,await foreach (var reply in call.ResponseStream) 会一直挂起,直到服务端完成(即 BidirectionalStreamingCall 方法返回)、或连接异常中断、或显式调用 call.RequestStream.CompleteAsync() 关闭请求流。
常见误操作:
- 忘记调用
await call.RequestStream.CompleteAsync(),导致服务端永远等不到请求流结束,也就不会退出await foreach循环 - 在循环中抛出未捕获异常,使
await foreach提前退出,但服务端仍在写入 → 触发RpcException:“Status(StatusCode=Unknown, Detail="Stream removed"") - 未设置
CallOptions的Deadline,长连接无超时,网络闪断后客户端卡死
安全写法示例:
using var call = client.BidirectionalStreamingCall();
_ = Task.Run(async () =>
{
await foreach (var req in requests)
{
await call.RequestStream.WriteAsync(req);
}
await call.RequestStream.CompleteAsync(); // 必须!
});
await foreach (var reply in call.ResponseStream)
{
Console.WriteLine(reply.Message);
}
序列化与背压问题:避免 WriteAsync 积压引发 OOM
gRPC 默认不启用背压(backpressure):服务端连续调用 WriteAsync,数据会在内存中排队,直到 TCP 窗口满或客户端消费过慢。极端情况下,大量未确认消息堆积在 IServerStreamWriter 内部缓冲区,触发 OutOfMemoryException。
缓解方式只有两种:
- 在写入前检查
context.CancellationToken,并在循环中加入轻量级await Task.Yield()让出控制权(不推荐高频使用) - 改用
WriteAsync(T, CancellationToken)重载,并传入一个能响应客户端消费速度的 token —— 但 .NET gRPC 目前不暴露底层流的可写信号,所以**实际唯一可控手段是业务层限速**:例如每秒最多推送 N 条,或根据上一条WriteAsync的耗时动态调整间隔
没有银弹。如果你的场景要求高吞吐低延迟,且客户端处理能力波动大,得在协议层加应答机制(如客户端每收到 K 条就发一个 Ack),服务端据此控制发送节奏。










