0

0

GolangRPC服务端流与客户端流应用案例

P粉602998670

P粉602998670

发布时间:2025-09-06 10:20:03

|

691人浏览过

|

来源于php中文网

原创

服务端流适用于实时数据订阅、传感器监控和任务进度更新,客户端流适合大文件分块上传和日志批量上报,双向流支持实时交互场景如聊天和在线协作,三者基于HTTP/2和Protobuf stream实现,提供高效、持续的双向通信能力。

golangrpc服务端流与客户端流应用案例

在Golang gRPC的实践中,服务端流和客户端流是两种非常强大的通信模式,它们打破了传统RPC请求-响应的单一循环,允许客户端与服务端之间进行更灵活、更持续的数据交换。简单来说,服务端流允许服务器对一个客户端请求发送多个响应,而客户端流则允许客户端向服务器发送多个请求,最终由服务器返回一个单一的响应。这两种模式极大地扩展了gRPC的应用场景,使得构建实时、高效的分布式系统成为可能。

解决方案

理解Golang gRPC的流式通信,核心在于其底层的HTTP/2协议以及Protobuf定义中

stream
关键字的应用。当你需要在客户端和服务端之间进行连续的数据传输时,无论是单向还是双向,流都是首选。

首先,在

.proto
文件中定义流式方法,你需要在参数类型或返回类型前加上
stream
关键字。例如:

syntax = "proto3";

package mypackage;

service MyService {
  // 服务端流:客户端发送一个请求,服务端发送多个响应
  rpc ServerStreamExample(Request) returns (stream Response);

  // 客户端流:客户端发送多个请求,服务端发送一个响应
  rpc ClientStreamExample(stream Request) returns (Response);

  // 双向流:客户端和服务端都可以发送多个消息
  rpc BidirectionalStreamExample(stream Request) returns (stream Response);
}

message Request {
  string data = 1;
}

message Response {
  string result = 1;
}

在Golang中,

protoc
工具会为这些流式方法生成相应的接口。对于服务端,你实现的接口方法会接收到一个
stream
上下文,你可以通过它发送或接收消息。对于客户端,生成的客户端存根会提供类似的方法,允许你创建并管理一个流。

立即学习go语言免费学习笔记(深入)”;

服务端流的实现通常涉及在一个循环中多次调用

Send()
方法,直到所有数据发送完毕。客户端流则是在一个循环中多次调用
Send()
,并在发送结束后调用
CloseAndRecv()
来获取最终响应。双向流则更为复杂,需要同时管理发送和接收逻辑,通常通过独立的goroutine来处理接收,而主goroutine处理发送。

一个常见的挑战是流的生命周期管理和错误处理。无论是哪种流,当一方关闭流或发生错误时,另一方都应该能够优雅地检测到并做出响应。例如,服务端在发送数据时,如果客户端断开连接,

Send()
操作会返回一个错误,你需要捕获并处理。同样,客户端在接收数据时,如果服务端关闭流或出错,
Recv()
也会返回
io.EOF
或具体错误。

Golang gRPC服务端流适用于哪些场景?

服务端流(Server Streaming)在Golang gRPC中,就好比服务端打开了一个数据水龙头,客户端请求一次,服务端就能源源不断地推送数据。这种模式特别适合那些需要实时更新或大量数据一次性传输的场景。

我个人觉得,最典型的应用就是实时数据订阅。想象一下股票交易系统,客户端只需要订阅一次某个股票代码,服务端就能持续推送最新的价格变动,而不需要客户端反复轮询。传感器数据监控也是一个很好的例子,比如工业物联网中,一个边缘设备不断向云端推送环境数据,云端的一个gRPC服务就可以通过服务端流将这些数据实时分发给多个监控面板。

再比如,长时间运行任务的进度更新。如果有一个非常耗时的计算任务,客户端发起请求后,服务端可以每隔一段时间通过流发送任务的当前进度百分比或阶段性结果,而不是让客户端干等一个最终响应。这不仅提升了用户体验,也使得系统在处理复杂任务时更加透明。

在实际操作中,服务端流的实现逻辑相对直观。服务端接收到客户端的初始请求后,会进入一个循环,不断地向客户端发送

Response
消息。当所有数据发送完毕,或者发生特定事件需要终止流时,服务端会关闭流。客户端则会持续地从流中接收消息,直到收到
io.EOF
(表示流已结束)或遇到其他错误。

家政清洁钟点服务网站源码1.7.1
家政清洁钟点服务网站源码1.7.1

家政清洁钟点服务网站源码是基于易优cms内核开发的家政保洁网站系统,页面采用适合家政保洁行业的设计风格,增强了行业的针对性和适应性,除了精美的首页还加了关于我们、施工流程、新闻资讯、成功案例等栏目页面,主要包含电脑端及移动端需要小程序端可以安装易优cms小程序插件。

下载
// 服务端实现示例 (伪代码)
func (s *myServiceServer) ServerStreamExample(req *pb.Request, stream pb.MyService_ServerStreamExampleServer) error {
    log.Printf("Received request from client: %s", req.GetData())
    for i := 0; i < 5; i++ {
        resp := &pb.Response{Result: fmt.Sprintf("Data chunk %d", i)}
        if err := stream.Send(resp); err != nil {
            log.Printf("Failed to send: %v", err)
            return err // 客户端可能已断开
        }
        time.Sleep(time.Second) // 模拟数据生成延迟
    }
    log.Println("Server stream finished.")
    return nil
}

// 客户端调用示例 (伪代码)
func callServerStream(client pb.MyServiceClient) {
    req := &pb.Request{Data: "Start streaming"}
    stream, err := client.ServerStreamExample(context.Background(), req)
    if err != nil {
        log.Fatalf("could not start stream: %v", err)
    }
    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            log.Println("Server stream closed.")
            break
        }
        if err != nil {
            log.Fatalf("Error receiving: %v", err)
        }
        log.Printf("Received from server: %s", resp.GetResult())
    }
}

这里需要注意,客户端如果提前关闭连接,服务端在

Send()
时会立即收到错误,这是一种自然的错误处理机制。

Golang gRPC客户端流在实际项目中如何发挥作用?

客户端流(Client Streaming)的应用场景,在我看来,更多是关于聚合客户端数据。客户端不再是发送一个请求就等待一个响应,而是可以分批次、分块地将大量数据发送给服务端,服务端在接收完全部数据后,再进行统一处理并返回一个结果。

一个非常实用的例子就是大文件分块上传。设想你需要上传一个几个GB的文件,如果一次性作为

[]byte
发送,不仅内存压力大,网络波动也容易导致失败。通过客户端流,你可以将文件切分成小块,客户端在一个循环中逐块发送,服务端则负责将这些块重新组装。最后,当所有块都发送完毕,服务端才返回一个上传成功的消息或文件哈希值。这大大提高了上传的鲁棒性和效率。

另一个常见的场景是日志或指标数据的批量上报。客户端可能在本地积累了大量的应用日志或性能指标数据,与其为每条日志都发起一个独立的RPC请求,不如通过客户端流将一段时间内收集到的所有数据一次性批量发送给服务端进行处理(比如存储到数据库或分析系统)。这显著减少了网络开销和连接建立的成本。

// 服务端实现示例 (伪代码)
func (s *myServiceServer) ClientStreamExample(stream pb.MyService_ClientStreamExampleServer) error {
    var receivedData []string
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            log.Println("Client stream finished.")
            // 所有数据接收完毕,进行处理
            finalResult := fmt.Sprintf("Processed %d data chunks: %v", len(receivedData), receivedData)
            return stream.SendAndClose(&pb.Response{Result: finalResult})
        }
        if err != nil {
            log.Printf("Error receiving from client: %v", err)
            return err
        }
        receivedData = append(receivedData, req.GetData())
        log.Printf("Received chunk from client: %s", req.GetData())
    }
}

// 客户端调用示例 (伪代码)
func callClientStream(client pb.MyServiceClient) {
    stream, err := client.ClientStreamExample(context.Background())
    if err != nil {
        log.Fatalf("could not start stream: %v", err)
    }

    for i := 0; i < 3; i++ {
        req := &pb.Request{Data: fmt.Sprintf("Client chunk %d", i)}
        if err := stream.Send(req); err != nil {
            log.Fatalf("Failed to send: %v", err)
        }
        time.Sleep(time.Millisecond * 500) // 模拟数据生成延迟
    }

    resp, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatalf("Error closing stream or receiving response: %v", err)
    }
    log.Printf("Final response from server: %s", resp.GetResult())
}

这里客户端通过

Send()
方法连续发送数据,最后调用
CloseAndRecv()
来关闭流并等待服务端的最终响应。服务端则在循环中接收数据,直到
io.EOF
表示客户端已完成发送,然后进行最终处理并返回。

Golang gRPC双向流(Bidirectional Streaming)的实现与优势是什么?

双向流(Bidirectional Streaming)是gRPC流模式中最灵活,也通常是最复杂的。它允许客户端和服务端在同一个TCP连接上独立地、并发地发送一系列消息。你可以把它想象成一个双向的实时聊天室,双方都可以随时发言,并且对方也能随时听到。

实现上,双向流结合了服务端流和客户端流的特点。在

.proto
文件中,
rpc
方法的参数和返回类型前都带有
stream
关键字。在Golang代码中,客户端和服务端都会获得一个
stream
接口,这个接口同时具备
Send()
Recv()
方法。这意味着,你可以同时进行发送和接收操作。

通常,为了避免死锁或复杂的同步问题,我们会将发送和接收逻辑分别放在不同的goroutine中。例如,客户端启动一个goroutine专门负责从流中接收服务端的消息,而主goroutine则负责向服务端发送消息。服务端也是类似,一个goroutine处理客户端发来的消息,另一个(或主goroutine)则负责向客户端发送消息。

// 双向流服务端实现示例 (伪代码)
func (s *myServiceServer) BidirectionalStreamExample(stream pb.MyService_BidirectionalStreamExampleServer) error {
    waitc := make(chan struct{})
    go func() {
        for {
            req, err := stream.Recv()
            if err == io.EOF {
                // 客户端已关闭发送端
                close(waitc)
                return
            }
            if err != nil {
                log.Printf("Error receiving from client: %v", err)
                return
            }
            log.Printf("Server received: %s", req.GetData())
            // 收到消息后,服务端也可以立即回复
            resp := &pb.Response{Result: "Server processed: " + req.GetData()}
            if err := stream.Send(resp); err != nil {
                log.Printf("Error sending to client: %v", err)
                return
            }
        }
    }()

    // 服务端也可以主动向客户端发送消息
    for i := 0; i < 3; i++ {
        resp := &pb.Response{Result: fmt.Sprintf("Server initiated message %d", i)}
        if err := stream.Send(resp); err != nil {
            log.Printf("Error sending initial message: %v", err)
            break
        }
        time.Sleep(time.Second)
    }

    <-waitc // 等待客户端关闭发送端
    log.Println("Bidirectional stream finished.")
    return nil
}

// 双向流客户端调用示例 (伪代码)
func callBidirectionalStream(client pb.MyServiceClient) {
    stream, err := client.BidirectionalStreamExample(context.Background())
    if err != nil {
        log.Fatalf("could not start stream: %v", err)
    }

    waitc := make(chan struct{})
    go func() {
        for {
            resp, err := stream.Recv()
            if err == io.EOF {
                // 服务端已关闭发送端
                close(waitc)
                return
            }
            if err != nil {
                log.Fatalf("Error receiving from server: %v", err)
            }
            log.Printf("Client received: %s", resp.GetResult())
        }
    }()

    for i := 0; i < 5; i++ {
        req := &pb.Request{Data: fmt.Sprintf("Client message %d", i)}
        if err := stream.Send(req); err != nil {
            log.Fatalf("Failed to send: %v", err)
        }
        time.Sleep(time.Millisecond * 500)
    }
    stream.CloseSend() // 客户端关闭发送端
    <-waitc // 等待服务端关闭发送端
    log.Println("Bidirectional stream finished.")
}

优势上,双向流最显著的特点是其实时交互性。它非常适合构建需要低延迟、高并发、双向通信的应用,例如:

  • 实时聊天应用:用户发送消息,同时也能即时接收其他用户的消息。
  • 在线协作工具:多个用户同时编辑文档,他们的操作可以实时同步给其他参与者。
  • 游戏服务器:客户端发送玩家操作,服务器发送游戏状态更新。
  • 命令行工具的交互式会话:客户端发送命令,服务端返回结果,并且可以根据结果继续发送新的命令。

双向流的挑战在于其状态管理和并发控制。因为双方都可以独立发送和接收,所以需要仔细设计消息协议和处理逻辑,确保消息的顺序性(如果需要)和正确的错误处理。同时,由于流的生命周期可能很长,需要考虑心跳机制来检测不活跃的连接,以及优雅的关闭机制来避免资源泄露。这种模式虽然复杂,但它带来的强大实时交互能力是其他模式无法比拟的。

相关专题

更多
golang如何定义变量
golang如何定义变量

golang定义变量的方法:1、声明变量并赋予初始值“var age int =值”;2、声明变量但不赋初始值“var age int”;3、使用短变量声明“age :=值”等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

174

2024.02.23

golang有哪些数据转换方法
golang有哪些数据转换方法

golang数据转换方法:1、类型转换操作符;2、类型断言;3、字符串和数字之间的转换;4、JSON序列化和反序列化;5、使用标准库进行数据转换;6、使用第三方库进行数据转换;7、自定义数据转换函数。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

225

2024.02.23

golang常用库有哪些
golang常用库有哪些

golang常用库有:1、标准库;2、字符串处理库;3、网络库;4、加密库;5、压缩库;6、xml和json解析库;7、日期和时间库;8、数据库操作库;9、文件操作库;10、图像处理库。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

335

2024.02.23

golang和python的区别是什么
golang和python的区别是什么

golang和python的区别是:1、golang是一种编译型语言,而python是一种解释型语言;2、golang天生支持并发编程,而python对并发与并行的支持相对较弱等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

206

2024.03.05

golang是免费的吗
golang是免费的吗

golang是免费的。golang是google开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的开源编程语言,采用bsd开源协议。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

388

2024.05.21

golang结构体相关大全
golang结构体相关大全

本专题整合了golang结构体相关大全,想了解更多内容,请阅读专题下面的文章。

193

2025.06.09

golang相关判断方法
golang相关判断方法

本专题整合了golang相关判断方法,想了解更详细的相关内容,请阅读下面的文章。

188

2025.06.10

golang数组使用方法
golang数组使用方法

本专题整合了golang数组用法,想了解更多的相关内容,请阅读专题下面的文章。

191

2025.06.17

php源码安装教程大全
php源码安装教程大全

本专题整合了php源码安装教程,阅读专题下面的文章了解更多详细内容。

7

2025.12.31

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Go 教程
Go 教程

共32课时 | 3.2万人学习

Go语言实战之 GraphQL
Go语言实战之 GraphQL

共10课时 | 0.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号