gRPC - Streaming

gRPC 的 Streaming 可用來做大量資料的傳輸,不論是 Client 傳到 Service,或是 Service 回給 Client。


使用上就是在 proto 檔用 stream 去定義要使用 Streaming 的地方,看是用在傳入還是回傳。

1
2
3
4
5
6
...
service Greeter {
...
rpc SayHellos (stream HelloRequest) returns (stream HelloReply) {}
}
...


然後 Service 與 Client 透過 IServerStreamWriter 寫資料、透過 IAsyncStreamReader 讀取資即可。


以寫入來說,可透過迴圈搭配 WriteAsync() 將資料逐筆寫入,最後透過 CompleteAsync() 告知寫入完成。

1
2
3
4
5
foreach (var item in data)                 
{
await stream.WriteAsync(item);
}
await stream.CompleteAsync();


或是引用 Grpc.Core.Utils,直接透過 WriteAllAsync() 將所有資料寫入。

1
2
3
...
await stream.WriteAllAsync(data);
...


以讀取來說,可用迴圈搭配 MoveNext() 移至下筆可用的資料,再用 Current 取出資料做處理。

1
2
3
4
5
6
7
8
using Grpc.Core;
...
while (await stream.MoveNext())
{
var data = stream.Current;
...
}
...


或是引用 Grpc.Core.Utils,然後透過 ForEachAsync 直接用 Lambda 指定每筆資料所要做的處理。

1
2
3
4
5
using Grpc.Core;                            
using Grpc.Core.Utils;
...
await stream.ForEachAsync(item => ...);
...


抑或是直接透過 ToListAsync() 取得全部資料。

1
2
3
4
5
using Grpc.Core;                           
using Grpc.Core.Utils;
...
var data = await stream.ToListAsync<...>();
...


所以假設我們有個 proto 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
syntax = "proto3";

package Greet;

// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
rpc SayHellos (stream HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings.
message HelloReply {
string message = 1;
}


Service 這邊我們就可以像這樣實作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
using System.Threading.Tasks;
using Greet;
using Grpc.Core;

namespace GrpcService_CSharp1
{
public class GreeterService : Greeter.GreeterBase
{
public override async Task SayHellos(IAsyncStreamReader<HelloRequest> requestStream, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
{
while (await requestStream.MoveNext())
{
var request = requestStream.Current;
await responseStream.WriteAsync(new HelloReply
{
Message = "Hello " + request.Name
};
}
}

public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
{
return Task.FromResult(new HelloReply
{
Message = "Hello " + request.Name
});
}
}
}


Client 這邊會像這樣:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
using System;
using System.Threading.Tasks;
using Greet;
using Grpc.Core;
using Grpc.Core.Utils;

namespace GrpcService_CSharp1
{
public class Program
{
static async Task Main(string[] args)
{
// Include port of the gRPC server as an application argument
var port = args.Length > 0 ? args[0] : "50051";

var channel = new Channel("localhost:" + port, ChannelCredentials.Insecure);
var client = new Greeter.GreeterClient(channel);

var reply = await client.SayHelloAsync(new HelloRequest {Name = "GreeterClient"});
Console.WriteLine("Greeting: " + reply.Message);

var requests = new[]
{
new HelloRequest {Name = "Larry1"},
new HelloRequest {Name = "Larry2"}
};

using (var call = client.SayHellos())
{
await call.RequestStream.WriteAllAsync(requests);

await call.ResponseStream.ForEachAsync(item =>
Task.Run(() => Console.WriteLine("Greeting: " + item.Message)));
}

await channel.ShutdownAsync();

Console.WriteLine("Press any key to exit...");
Console.ReadKey();
}
}
}


Client 與 Service 就可以透過 Streaming 進行通訊。