
gRPC streams allow protobuf messages to flow from the client to the server, from the server to the client, or in both directions.
This powerful feature can be used to build real-time applications such as chat apps and live monitoring dashboards.
In this article we will look at how to use gRPC streams properly.
Let's go over some good practices for working with gRPC streams.
A common mistake is to use streaming for unary requests.
For example, consider the following gRPC service definition:
service MyService {
  rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {}
}
If the client only needs to send one request and receive one response,
there is no need for streaming. Instead, we can define the service as follows:
service MyService {
  rpc GetSomething (SomethingRequest) returns (SomethingResponse) {}
}
By using streaming for a unary request we add unnecessary complexity to the code, which makes it harder to understand and maintain without gaining any of the benefits of streaming.
Let's compare the Go code for the unary and the streaming versions.
Unary request:
type somethingUnary struct {
	pb.UnimplementedSomethingUnaryServer
}

func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) {
	return &pb.SomethingResponse{
		Message: "hello " + req.Name,
	}, nil
}

func TestSomethingUnary(t *testing.T) {
	conn := newServer(t, func(s grpc.ServiceRegistrar) {
		pb.RegisterSomethingUnaryServer(s, &somethingUnary{})
	})

	client := pb.NewSomethingUnaryClient(conn)

	response, err := client.GetSomething(
		context.Background(),
		&pb.SomethingRequest{
			Name: "test",
		},
	)
	if err != nil {
		t.Fatalf("failed to get something: %v", err)
	}

	if response.Message != "hello test" {
		t.Errorf("unexpected response: %v", response.Message)
	}
}
Streaming version of the same unary request:
type somethingStream struct {
	pb.UnimplementedSomethingStreamServer
}

func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error {
	if err := stream.Send(&pb.SomethingResponse{
		Message: "hello " + req.Name,
	}); err != nil {
		return err
	}
	return nil
}

func TestSomethingStream(t *testing.T) {
	conn := newServer(t, func(s grpc.ServiceRegistrar) {
		pb.RegisterSomethingStreamServer(s, &somethingStream{})
	})

	client := pb.NewSomethingStreamClient(conn)

	stream, err := client.GetSomething(
		context.Background(),
		&pb.SomethingRequest{
			Name: "test",
		},
	)
	if err != nil {
		t.Fatalf("failed to get something stream: %v", err)
	}

	response, err := stream.Recv()
	if err != nil {
		t.Fatalf("failed to receive response: %v", err)
	}

	if response.Message != "hello test" {
		t.Errorf("unexpected response: %v", response.Message)
	}
}
As we can see, the code for the unary request is simpler and easier to follow than the code for the streaming request.
Now let's compare these two service definitions:
service BookStore {
  rpc ListBooks(ListBooksRequest) returns (stream Book) {}
}

service BookStoreBatch {
  rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {}
}

message ListBooksResponse {
  repeated Book books = 1;
}
BookStore streams one book at a time, while BookStoreBatch streams multiple books in each message.
If the client needs to list all the books, BookStoreBatch is more efficient because it reduces the number of round trips between the client and the server.
Let's look at the Go code for BookStore and BookStoreBatch.
BookStore:
type bookStore struct {
	pb.UnimplementedBookStoreServer
}

func (s *bookStore) ListBooks(req *pb.ListBooksRequest, stream pb.BookStore_ListBooksServer) error {
	for _, b := range bookStoreData {
		if b.Author == req.Author {
			if err := stream.Send(&pb.Book{
				Title:           b.Title,
				Author:          b.Author,
				PublicationYear: int32(b.PublicationYear),
				Genre:           b.Genre,
			}); err != nil {
				return err
			}
		}
	}
	return nil
}

func TestBookStore_ListBooks(t *testing.T) {
	conn := newServer(t, func(s grpc.ServiceRegistrar) {
		pb.RegisterBookStoreServer(s, &bookStore{})
	})

	client := pb.NewBookStoreClient(conn)

	stream, err := client.ListBooks(
		context.Background(),
		&pb.ListBooksRequest{
			Author: charlesDickens,
		},
	)
	if err != nil {
		t.Fatalf("failed to list books: %v", err)
	}

	books := []*pb.Book{}
	for {
		book, err := stream.Recv()
		if err != nil {
			break
		}
		books = append(books, book)
	}

	if len(books) != charlesDickensBooks {
		t.Errorf("unexpected number of books: %d", len(books))
	}
}
BookStoreBatch:
type bookStoreBatch struct {
	pb.UnimplementedBookStoreBatchServer
}

func (s *bookStoreBatch) ListBooks(req *pb.ListBooksRequest, stream pb.BookStoreBatch_ListBooksServer) error {
	const batchSize = 10
	books := make([]*pb.Book, 0, batchSize)
	for _, b := range bookStoreData {
		if b.Author == req.Author {
			books = append(books, &pb.Book{
				Title:           b.Title,
				Author:          b.Author,
				PublicationYear: int32(b.PublicationYear),
				Genre:           b.Genre,
			})
			if len(books) == batchSize {
				if err := stream.Send(&pb.ListBooksResponse{
					Books: books,
				}); err != nil {
					return err
				}
				books = books[:0]
			}
		}
	}

	// Flush the last, possibly incomplete, batch.
	if len(books) > 0 {
		if err := stream.Send(&pb.ListBooksResponse{
			Books: books,
		}); err != nil {
			return err
		}
	}
	return nil
}
func TestBookStoreBatch_ListBooks(t *testing.T) {
	conn := newServer(t, func(s grpc.ServiceRegistrar) {
		pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{})
	})

	client := pb.NewBookStoreBatchClient(conn)

	stream, err := client.ListBooks(
		context.Background(),
		&pb.ListBooksRequest{
			Author: charlesDickens,
		},
	)
	if err != nil {
		t.Fatalf("failed to list books: %v", err)
	}

	books := []*pb.Book{}
	for {
		response, err := stream.Recv()
		if err != nil {
			break
		}
		books = append(books, response.Books...)
	}

	if len(books) != charlesDickensBooks {
		t.Errorf("unexpected number of books: %d", len(books))
	}
}
From the code alone it is not obvious which one is better.
Let's run a benchmark to see the difference.
BookStore benchmark:
func BenchmarkBookStore_ListBooks(b *testing.B) {
	conn := newServer(b, func(s grpc.ServiceRegistrar) {
		pb.RegisterBookStoreServer(s, &bookStore{})
	})

	client := pb.NewBookStoreClient(conn)

	var benchInnerBooks []*pb.Book
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		stream, err := client.ListBooks(
			context.Background(),
			&pb.ListBooksRequest{
				Author: charlesDickens,
			},
		)
		if err != nil {
			b.Fatalf("failed to list books: %v", err)
		}

		books := []*pb.Book{}
		for {
			book, err := stream.Recv()
			if err != nil {
				break
			}
			books = append(books, book)
		}
		benchInnerBooks = books
	}
	benchBooks = benchInnerBooks
}
BookStoreBatch benchmark:
func BenchmarkBookStoreBatch_ListBooks(b *testing.B) {
	conn := newServer(b, func(s grpc.ServiceRegistrar) {
		pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{})
	})

	client := pb.NewBookStoreBatchClient(conn)

	var benchInnerBooks []*pb.Book
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		stream, err := client.ListBooks(
			context.Background(),
			&pb.ListBooksRequest{
				Author: charlesDickens,
			},
		)
		if err != nil {
			b.Fatalf("failed to list books: %v", err)
		}

		books := []*pb.Book{}
		for {
			response, err := stream.Recv()
			if err != nil {
				break
			}
			books = append(books, response.Books...)
		}
		benchInnerBooks = books
	}
	benchBooks = benchInnerBooks
}
Benchmark results:
BenchmarkBookStore_ListBooks-12          732    1647454 ns/op    85974 B/op    1989 allocs/op
BenchmarkBookStoreBatch_ListBooks-12    1202     937491 ns/op    61098 B/op     853 allocs/op
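For reference, numbers like these come from Go's standard benchmark tooling; a typical invocation (the exact flags are an assumption, not taken from the original post) is:

go test -bench='ListBooks' -benchmem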
What an improvement! BookStoreBatch is 1.75x faster than BookStore.
But why is BookStoreBatch faster?
Every time the server sends a message with stream.Send(), it has to encode the message and send it over the network. By sending multiple books at once, we reduce the number of messages the server has to encode and send, which improves performance not only for the server but also for the client, which has fewer messages to decode.
In the example above the batch size is hardcoded to 10, but the client could tune it depending on network conditions and document size.
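As a sketch of that tuning idea, and purely as an assumption (the ListBooksRequest shown above has no such field), the request could carry an optional batch size that the server clamps to a sensible default:

// listBatchSize picks the batch size for a ListBooks call. It assumes a
// hypothetical batch_size field on ListBooksRequest; the definition shown
// earlier does not include one.
func listBatchSize(req *pb.ListBooksRequest) int {
	const defaultBatchSize = 10
	size := int(req.GetBatchSize())
	if size <= 0 || size > 100 {
		// Guard against missing or unreasonable values from the client.
		return defaultBatchSize
	}
	return size
}

The server would then call listBatchSize(req) instead of using the hardcoded constant inside ListBooks.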
The BookStore example returns all the books and then completes the stream, but if the client needs to watch events in real time (sensors, for example), a bidirectional stream is the right choice.
Bidirectional streams are a bit trickier because both the client and the server can send and receive messages at the same time. Fortunately, Go makes it easy to handle this kind of concurrency.
As mentioned before, sensors are a great example for bidirectional streaming.
The Watch function lets the client decide which sensors to watch and request their current values when needed.
Let's look at the following protobuf definition:
service Sensor {
  rpc Watch(stream WatchRequest) returns (stream WatchResponse) {}
}

message WatchRequest {
  oneof request {
    WatchCreateRequest create_request = 1;
    WatchCancelRequest cancel_request = 2;
    WatchNowRequest now_request = 3;
  }
}

message WatchCreateRequest {
  // sensor_id contains the sensor id to watch.
  string sensor_id = 1;
}

message WatchCancelRequest {
  // sensor_id contains the sensor id to cancel.
  string sensor_id = 1;
}

message WatchNowRequest {
  // sensor_id contains the sensor id to get the current value.
  string sensor_id = 1;
}

message WatchResponse {
  // sensor_id contains the sensor id for the current response.
  string sensor_id = 1;
  // created is true if the watch was created successfully.
  bool created = 2;
  // canceleted is true if the watch was canceled successfully or if the creation failed.
  bool canceleted = 3;
  // error contains the error message if something went wrong.
  string error = 4;
  // timestamp contains the timestamp of the value.
  google.protobuf.Timestamp timestamp = 5;
  // value contains the value of the sensor.
  int32 value = 6;
}
The request is not just a stream of a single message type; it is a message that can carry different kinds of requests. The oneof directive lets us define a field that can hold only one of the listed types at a time.
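On the Go side, each oneof case is generated as its own wrapper type, so a client populates exactly one of them per message. A minimal sketch, using the naming protoc-gen-go produces for the definition above:

// buildCreateRequest builds a WatchRequest asking the server to start
// watching the given sensor.
func buildCreateRequest(sensorID string) *pb.WatchRequest {
	return &pb.WatchRequest{
		Request: &pb.WatchRequest_CreateRequest{
			CreateRequest: &pb.WatchCreateRequest{SensorId: sensorID},
		},
	}
}

// buildNowRequest builds a WatchRequest asking for a single current reading.
func buildNowRequest(sensorID string) *pb.WatchRequest {
	return &pb.WatchRequest{
		Request: &pb.WatchRequest_NowRequest{
			NowRequest: &pb.WatchNowRequest{SensorId: sensorID},
		},
	}
}

Both kinds of request travel over the same client stream via stream.Send.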
Most of the Go code for the sensor service is omitted here, but you can find the full code here.
The serverStream type wraps the stream and the sensor data to make them easier to work with.
type serverStream struct {
	s           *sensorService         // service
	stream      pb.Sensor_WatchServer  // stream
	sendCh      chan *pb.WatchResponse // control channel
	sensorCh    chan sensorData        // data channel
	sensorWatch map[string]int         // map of sensor id to watch id
}
As mentioned before, the server can send and receive messages at the same time, so one function will handle the incoming messages and another will handle the outgoing messages.
Receiving messages:
func (ss *serverStream) recvLoop() error {
	defer ss.close()
	for {
		req, err := ss.stream.Recv()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return err
		}

		switch req := req.Request.(type) {
		case *pb.WatchRequest_CreateRequest:
			// ignore validation (check the full code)
			// create a channel to send data to the client
			id := sensor.Watch(ss.sensorCh)
			ss.sensorWatch[sensorID] = id
			// send created message
			ss.sendCh <- &pb.WatchResponse{
				SensorId: sensorID,
				Created:  true,
			}
		case *pb.WatchRequest_CancelRequest:
			// ignore validation (check the full code)
			// cancel the watch
			ss.s.sensors[sensorID].Cancel(id)
			delete(ss.sensorWatch, sensorID)
			ss.sendCh <- &pb.WatchResponse{
				SensorId:   sensorID,
				Canceleted: true,
			}
		case *pb.WatchRequest_NowRequest:
			// ignore validation (check the full code)
			// send current value
			ss.sendCh <- &pb.WatchResponse{
				SensorId:  sensorID,
				Timestamp: timestamppb.Now(),
				Value:     int32(sensor.Read()),
			}
		}
	}
}
The switch statement handles the different request types and decides what to do with each one. It is important that recvLoop only reads messages and never sends anything to the client; that is the job of sendLoop, which reads messages from the control channel and sends them to the client.
Sending messages:
func (ss *serverStream) sendLoop() {
	for {
		select {
		case m, ok := <-ss.sendCh:
			if !ok {
				return
			}
			// send message
			if err := ss.stream.Send(m); err != nil {
				return
			}
		case data, ok := <-ss.sensorCh:
			if !ok {
				return
			}
			// send data
			if err := ss.stream.Send(&pb.WatchResponse{
				SensorId:  data.ID,
				Timestamp: timestamppb.New(data.Time),
				Value:     int32(data.Val),
			}); err != nil {
				return
			}
		case <-ss.stream.Context().Done():
			return
		}
	}
}
The sendLoop function reads from both the control channel and the data channel and sends the messages to the client. If the stream is closed, the function returns.
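For completeness, here is a sketch of how the Watch handler could wire the two loops together; the buffer sizes and exact setup are assumptions, and the real implementation lives in the full code:

// Watch runs sendLoop in its own goroutine and blocks on recvLoop until the
// client closes the stream, the context is canceled, or an error occurs.
func (s *sensorService) Watch(stream pb.Sensor_WatchServer) error {
	ss := &serverStream{
		s:           s,
		stream:      stream,
		sendCh:      make(chan *pb.WatchResponse, 16), // small buffer so recvLoop rarely blocks
		sensorCh:    make(chan sensorData, 16),
		sensorWatch: make(map[string]int),
	}

	done := make(chan struct{})
	go func() {
		defer close(done)
		ss.sendLoop()
	}()

	err := ss.recvLoop() // ss.close() is deferred inside, which is assumed to end sendLoop
	<-done               // wait for sendLoop to finish before returning
	return err
}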
Finally, a happy-path test for the sensor service:
func TestSensor(t *testing.T) {
conn := newServer(t, func(s grpc.ServiceRegistrar) {
pb.RegisterSensorServer(s, &sensorService{
sensors: newSensors(),
})
})
client := pb.NewSensorClient(conn)
stream, err := client.Watch(context.Background())
if err != nil {
t.Fatalf("failed to watch: %v", err)
}
response := make(chan *pb.WatchResponse)
// Go routine to read from the stream
go func() {
defer close(response)
for {
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
return
}
if err != nil {
return
}
response <- resp
}
}()
createRequest(t, stream, "temp")
waitUntilCreated(t, response, "temp")
waitForSensorData(t, response, "temp")
createRequest(t, stream, "pres")
waitUntilCreated(t, response, "pres")
waitForSensorData(t, response, "pres")
waitForSensorData(t, response, "temp")
waitForSensorData(t, response, "pres")
// invalid sensor
createRequest(t, stream, "invalid")
waitUntilCanceled(t, response, "invalid")
nowRequest(t, stream, "light")
waitForSensorData(t, response, "light")
// Wait for 2 seconds to make sure we don't receive any data for light
waitForNoSensorData(t, response, "light", 2*time.Second)
cancelRequest(t, stream, "temp")
waitUntilCanceled(t, response, "temp")
waitForSensorData(t, response, "pres")
// Wait for 2 seconds to make sure we don't receive any data for temp
waitForNoSensorData(t, response, "temp", 2*time.Second)
err = stream.CloseSend()
if err != nil {
t.Fatalf("failed to close send: %v", err)
}
}
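The test helpers are not shown in the article; purely as an illustration, createRequest and waitUntilCreated might look roughly like this (reusing the buildCreateRequest sketch from earlier, so the same naming assumptions apply):

// createRequest sends a WatchCreateRequest for the given sensor on the stream.
func createRequest(t *testing.T, stream pb.Sensor_WatchClient, sensorID string) {
	t.Helper()
	if err := stream.Send(buildCreateRequest(sensorID)); err != nil {
		t.Fatalf("failed to send create request for %q: %v", sensorID, err)
	}
}

// waitUntilCreated blocks until a response confirms the watch was created.
func waitUntilCreated(t *testing.T, response <-chan *pb.WatchResponse, sensorID string) {
	t.Helper()
	for resp := range response {
		if resp.SensorId == sensorID && resp.Created {
			return
		}
	}
	t.Fatalf("stream closed before the watch for %q was created", sensorID)
}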
From the test above we can see that the client can create watches, cancel them, and request the current value of a sensor. The client can also watch multiple sensors at the same time.
gRPC streams are a versatile and powerful tool for building real-time applications.
By following best practices, such as using streaming only when it is really needed, batching data effectively, and using bidirectional streaming judiciously, developers can get the best performance while keeping the code simple.
gRPC streaming adds complexity, but when applied thoughtfully its benefits far outweigh the challenges.
If you have any questions or feedback, feel free to reach out to me on LinkedIn.