要支持golang环境中的grpc流式通信,核心在于正确定义proto文件并实现服务端与客户端的处理逻辑。一、在.proto文件中使用stream关键字定义流式接口,如双向流需在请求和响应前均添加stream;二、服务端通过recv()接收消息,通过send()发送响应,并循环处理直到收到eof;三、客户端应分goroutine处理recv()和send(),发送完毕调用closesend();四、注意处理超时、断开连接、流关闭及性能优化等问题。

Golang环境要支持gRPC流式通信,核心在于定义好proto文件中的流式接口,并在服务端和客户端正确实现对应的处理逻辑。本文主要讲两个场景:服务端推送(Server Streaming)和双向流(Bidirectional Streaming)的配置方法与注意事项。

一、proto文件中定义流式接口
gRPC的流式通信是通过在
.proto文件中使用
stream关键字来声明的。常见的几种流式方式包括:

- Server streaming:服务端返回多个响应
- Client streaming:客户端发送多个请求
- Bidirectional streaming:双方都收发多个消息
比如一个双向流的接口可以这样写:
立即学习“go语言免费学习笔记(深入)”;
syntax = "proto3";
package example;
service Greeter {
rpc Chat (stream MessageRequest) returns (stream MessageResponse);
}
message MessageRequest {
string content = 1;
}
message MessageResponse {
string reply = 1;
}这个例子中,
Chat方法允许客户端和服务端持续发送和接收消息,适用于聊天类或实时通知类场景。

定义proto时要注意:stream只能用于参数或返回值双向流需要两边都加stream使用protoc生成代码时确保安装了正确的插件,如protoc-gen-go-grpc
二、服务端实现流式处理逻辑
以Go语言为例,实现上述双向流的服务端逻辑大致如下:
type server struct{}
func (s *server) Chat(stream pb.Greeter_ChatServer) error {
for {
req, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// 根据收到的消息构造回复
resp := &pb.MessageResponse{
Reply: "Received: " + req.Content,
}
// 向客户端发送响应
if err := stream.Send(resp); err != nil {
return err
}
}
}这段代码的核心点在于:
- 使用
Recv()
接收客户端发来的每一条消息 - 使用
Send()
向客户端发送每条响应 - 循环处理直到客户端关闭连接(收到EOF)
注意:不要在goroutine里直接使用stream.Send或Recv,除非你做了同步控制。因为gRPC的流不是并发安全的。
三、客户端发起双向流请求
客户端的实现也很简单,主要流程是启动一个流,然后在一个goroutine中持续读取服务端的返回,另一个goroutine用来发送请求。
示例代码如下:
stream, err := client.Chat(context.Background())
if err != nil {
log.Fatalf("Failed to open stream: %v", err)
}
// 单独协程读取服务端返回
go func() {
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("Error receiving message: %v", err)
break
}
fmt.Println("Server says:", resp.Reply)
}
}()
// 主协程发送消息给服务端
for i := 0; i < 5; i++ {
msg := &pb.MessageRequest{
Content: fmt.Sprintf("Message %d", i),
}
if err := stream.Send(msg); err != nil {
log.Printf("Error sending message: %v", err)
break
}
time.Sleep(time.Second)
}
stream.CloseSend()这里的关键点:
stream.Recv()
和stream.Send()
应该分开处理,避免阻塞- 发送完所有消息后记得调用
CloseSend()
,否则服务端会一直等待 - 处理错误和EOF的情况很重要,否则容易导致程序卡死
四、常见问题与优化建议
实际开发中可能会遇到一些坑,比如:
- 连接超时或断开:可以在上下文中设置合理的超时时间,或加入重连机制
-
流被意外关闭:检查是否提前调用了
CloseSend()
,或者服务端主动结束 - 性能瓶颈:如果消息量很大,考虑压缩数据或调整gRPC的传输限制(max-message-size)
另外,如果你要做服务端主动推送(不等客户端发消息),也可以用Server Streaming的方式,原理类似,只是proto中只对返回值加
stream即可。
基本上就这些。流式通信虽然比普通RPC复杂一点,但结构清晰、功能强大,适合实时性要求高的场景。只要proto定义清楚,服务端和客户端逻辑配合得当,用起来还是很顺手的。










