gRPC主要有4种请求和响应模式,分别是简单模式(Simple RPC)
、服务端流式(Server-side streaming RPC)
、客户端流式(Client-side streaming RPC)
和双向流式(Bidirectional streaming RPC)
。
1.简单模式(Simple RPC)
:客户端发起请求并等待服务端响应。
2.服务端流式(Server-side streaming RPC)
:客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。 情景模拟:客户端要获取某原油股的实时走势,客户端发送一个请求, 服务端实时返回该股票的走势
3.客户端流式(Client-side streaming RPC)
:与服务端数据流模式相反,这次是客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应。情景模拟:客户端大量数据上传到服务端
4.双向流式(Bidirectional streaming RPC)
:双方使用读写流去发送一个消息序列,两个流独立操作,双方可以同时发送和同时接收。 情景模拟:双方对话(可以一问一答、一问多答、多问一答,形式灵活)
从上面的定义不难看出,在proto文件中用stream关键字修饰请求或响应类型,
就可以定义一个流式消息。下面我们就通过实例来演示一下流式通信的使用方法。
GO
1.首先编写api/hello.proto(内容如下),并据此生成新的api/hello.pb.go代码。
syntax = "proto3";

package api;

// The Any message type lets you embed a message without having its .proto
// definition. An Any holds an arbitrary serialized message (as bytes) together
// with a URL that acts as a globally unique identifier for that message and
// resolves to its type. Using Any requires importing google/protobuf/any.proto.
import "google/protobuf/any.proto";

// HelloRequest is the request message used by every RPC of HelloService.
message HelloRequest {
  string greeting = 1;
  map<string, string> infos = 2;
}

// HelloResponse carries a reply plus a list of arbitrarily-typed details.
message HelloResponse {
  string reply = 1;
  repeated google.protobuf.Any details = 2;
}

// Hello is one of the payloads packed into HelloResponse.details.
message Hello {
  string msg = 1;
}

// Error is one of the payloads packed into HelloResponse.details.
message Error {
  repeated string msg = 1;
}

// HelloService declares one RPC for each of the four gRPC call modes.
service HelloService {
  // Simple RPC: one request, one response.
  rpc SayHello(HelloRequest) returns (HelloResponse) {}

  // Server-side streaming RPC: one request, a stream of responses.
  rpc ListHello(HelloRequest) returns (stream HelloResponse) {}

  // Client-side streaming RPC: a stream of requests, one response.
  rpc SayMoreHello(stream HelloRequest) returns (HelloResponse) {}

  // Bidirectional streaming RPC. Note that the response stream also
  // carries HelloRequest messages.
  rpc SayHelloChat(stream HelloRequest) returns (stream HelloRequest) {}
}
2.编译指令:
protoc -ID:\Go\include -I. --go_out=plugins=grpc:. ./api/hello.proto
3.在生成的代码api/hello.pb.go中,我们可以看到服务端接口如下:
// HelloServiceServer is the server-side interface for HelloService; a type
// implementing these four methods can be registered as the service handler.
type HelloServiceServer interface {
	// SayHello handles the simple RPC: one request in, one response out.
	SayHello(context.Context, *HelloRequest) (*HelloResponse, error)
	// ListHello handles the server-side streaming RPC: it receives a single
	// request and a stream on which to send responses.
	ListHello(*HelloRequest, HelloService_ListHelloServer) error
	// SayMoreHello handles the client-side streaming RPC: requests are read
	// from the stream.
	SayMoreHello(HelloService_SayMoreHelloServer) error
	// SayHelloChat handles the bidirectional streaming RPC: the stream is
	// used for both reading and writing.
	SayHelloChat(HelloService_SayHelloChatServer) error
}
4.接口的实现 server/service/service.go如下:
// Package service implements the HelloService gRPC server.
package service

import (
	"context"
	"fmt"
	"io"
	"log"
	"time"

	"github.com/golang/protobuf/ptypes"
	"github.com/golang/protobuf/ptypes/any"

	api "gogrpcstream/api"
)

// SayHelloServer implements api.HelloServiceServer.
type SayHelloServer struct{}

// SayHello handles the simple RPC. It logs the request and replies with a
// fixed greeting whose Details carry an api.Hello when infos["hello"] is
// "world", and an api.Error otherwise. A marshalling failure is returned as
// the RPC error.
func (s *SayHelloServer) SayHello(ctx context.Context, in *api.HelloRequest) (*api.HelloResponse, error) {
	log.Printf("Client Greeting:%s", in.Greeting)
	log.Printf("Client Info:%v", in.Infos)

	var (
		an  *any.Any
		err error
	)
	if in.Infos["hello"] == "world" {
		an, err = ptypes.MarshalAny(&api.Hello{Msg: "Good Request"})
	} else {
		an, err = ptypes.MarshalAny(&api.Error{Msg: []string{"Bad Request", "Wrong Info Msg"}})
	}
	if err != nil {
		return nil, err
	}

	return &api.HelloResponse{
		Reply:   "Hello World !!",
		Details: []*any.Any{an},
	}, nil
}

// ListHello handles the server-side streaming RPC: it receives one request
// and sends three replies, pausing one second after each.
//
// A failed Send is returned immediately instead of being ignored, and the
// pause is abandoned as soon as the stream's context is done, so the handler
// does not keep running for a client that has already gone away.
func (s *SayHelloServer) ListHello(in *api.HelloRequest, stream api.HelloService_ListHelloServer) error {
	log.Printf("Client Say: %v", in.Greeting)

	ctx := stream.Context()
	for i := 1; i <= 3; i++ {
		reply := fmt.Sprintf("ListHello Reply %s %d", in.Greeting, i)
		if err := stream.Send(&api.HelloResponse{Reply: reply}); err != nil {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(1 * time.Second):
		}
	}
	return nil
}

// SayMoreHello handles the client-side streaming RPC: it logs every request
// until the client half-closes the stream (io.EOF), then sends a single
// response and closes the stream.
func (s *SayHelloServer) SayMoreHello(stream api.HelloService_SayMoreHelloServer) error {
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			// The client has finished sending; reply once and finish.
			return stream.SendAndClose(&api.HelloResponse{Reply: "SayMoreHello Recv Muti Greeting"})
		}
		if err != nil {
			return err
		}
		log.Printf("SayMoreHello Client Say: %v", req.Greeting)
	}
}

// SayHelloChat handles the bidirectional streaming RPC: each received request
// is answered with one numbered message, until the client half-closes the
// stream (io.EOF).
func (s *SayHelloServer) SayHelloChat(stream api.HelloService_SayHelloChatServer) error {
	for n := 1; ; n++ {
		req, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		reply := &api.HelloRequest{Greeting: fmt.Sprintf("SayHelloChat Server Say Hello %d", n)}
		if err := stream.Send(reply); err != nil {
			return err
		}
		log.Printf("SayHelloChat Client Say: %v", req.Greeting)
	}
}
5. server/main.go 服务端实现:
// Command server starts the HelloService gRPC server on :8080 with mutual TLS.
package main

import (
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"
	"log"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"

	api "gogrpcstream/api"
	sv "gogrpcstream/server/service"
)

func main() {
	lis, err := net.Listen("tcp", ":8080")
	if err != nil {
		panic(err)
	}

	// Load the certificate and private key (this also verifies that the
	// two match).
	cert, err := tls.LoadX509KeyPair("../certs/server.pem", "../certs/server.key")
	if err != nil {
		panic(err)
	}

	// Add the root certificate to the certificate pool. If the root of the
	// test certificates is not in the trusted pool, those certificates are
	// treated as untrusted and fail verification.
	certPool := x509.NewCertPool()
	rootBuf, err := ioutil.ReadFile("../certs/ca.pem")
	if err != nil {
		panic(err)
	}
	if !certPool.AppendCertsFromPEM(rootBuf) {
		panic("fail to append test ca")
	}

	// Require a client certificate and verify it against the pool above.
	tlsConf := &tls.Config{
		ClientAuth:   tls.RequireAndVerifyClientCert,
		Certificates: []tls.Certificate{cert},
		ClientCAs:    certPool,
	}

	grpcServer := grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConf)))
	api.RegisterHelloServiceServer(grpcServer, &sv.SayHelloServer{})

	log.Println("Server Start...")
	// Serve blocks until the server stops; report why it stopped instead of
	// exiting silently.
	if err := grpcServer.Serve(lis); err != nil {
		log.Fatalf("Serve failed:%v", err)
	}
}
6.客户端实现:client/main.go
// Command client calls each of the four HelloService RPC modes over mutual TLS.
package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io"
	"io/ioutil"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"

	api "gogrpcstream/api"
)

func main() {
	cert, err := tls.LoadX509KeyPair("../certs/client.pem", "../certs/client.key")
	if err != nil {
		panic(err)
	}

	// Add the root certificate to the certificate pool.
	certPool := x509.NewCertPool()
	bs, err := ioutil.ReadFile("../certs/ca.pem")
	if err != nil {
		panic(err)
	}
	if !certPool.AppendCertsFromPEM(bs) {
		panic("fail to append test ca")
	}

	// Build the transport credentials.
	transportCreds := credentials.NewTLS(&tls.Config{
		ServerName:   "localhost",
		Certificates: []tls.Certificate{cert},
		RootCAs:      certPool,
	})

	conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(transportCreds))
	if err != nil {
		log.Fatalf("Dial failed:%v", err)
	}
	defer conn.Close()

	client := api.NewHelloServiceClient(conn)

	// Simple RPC.
	resp1, err := client.SayHello(context.Background(), &api.HelloRequest{
		Greeting: "Hello Server 1 !!",
		Infos:    map[string]string{"hello": "world"},
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("SayHello Resp1:%+v", resp1)

	resp2, err := client.SayHello(context.Background(), &api.HelloRequest{
		Greeting: "Hello Server 2 !!",
	})
	if err != nil {
		log.Fatalf("%v", err)
	}
	log.Printf("SayHello Resp2:%+v", resp2)

	// Server-side streaming RPC.
	recvListHello, err := client.ListHello(context.Background(), &api.HelloRequest{Greeting: "Hello Server List Hello"})
	if err != nil {
		log.Fatalf("ListHello err: %v", err)
	}
	for {
		// Recv receives one message from the server; by default the maximum
		// size of a received message is 1024*1024*4 bytes (4 MB).
		resp, err := recvListHello.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("ListHello Server Resp: %v", resp.Reply)
	}
	// CloseSend only closes the sending direction of a stream. Recv keeps
	// working afterwards and returns io.EOF once the server has finished.

	// Client-side streaming RPC.
	sayMoreClient, err := client.SayMoreHello(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	for i := 0; i < 3; i++ {
		req := &api.HelloRequest{Greeting: fmt.Sprintf("SayMoreHello Hello Server %d", i)}
		if err := sayMoreClient.Send(req); err != nil {
			if err == io.EOF {
				// The stream was ended by the server; the real status is
				// reported by CloseAndRecv below.
				break
			}
			log.Fatalf("SayMoreHello send err: %v", err)
		}
	}
	// Close the stream and fetch the single response.
	sayMoreResp, err := sayMoreClient.CloseAndRecv()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("SayMoreHello Server Resp: %v", sayMoreResp.Reply)

	// Bidirectional streaming RPC.
	sayHelloChat, err := client.SayHelloChat(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	for i := 0; i < 3; i++ {
		err = sayHelloChat.Send(&api.HelloRequest{Greeting: fmt.Sprintf("SayHelloChat Hello Server %d", i)})
		if err != nil {
			log.Fatalf("stream request err: %v", err)
		}
		res, err := sayHelloChat.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("SayHelloChat get stream err: %v", err)
		}
		// Print the reply.
		log.Printf("SayHelloChat Server Say: %v", res.Greeting)
	}

	// Half-close the stream so the server handler sees io.EOF and returns,
	// then read until the server ends the stream.
	if err := sayHelloChat.CloseSend(); err != nil {
		log.Fatalf("SayHelloChat close send err: %v", err)
	}
	for {
		res, err := sayHelloChat.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("SayHelloChat get stream err: %v", err)
		}
		log.Printf("SayHelloChat Server Say: %v", res.Greeting)
	}
}
7.运行结果如下:
D:\GoProject\src\gogrpcstream\server>go run main.go
2021/01/05 17:19:12 Server Start...
2021/01/05 17:20:00 Client Greeting:Hello Server 1 !!
2021/01/05 17:20:00 Client Info:map[hello:world]
2021/01/05 17:20:00 Client Greeting:Hello Server 2 !!
2021/01/05 17:20:00 Client Info:map[]
2021/01/05 17:20:00 Client Say: Hello Server List Hello
2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 0
2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 1
2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 2
2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 0
2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 1
2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 2
D:\GoProject\src\gogrpcstream\client>go run main.go
2021/01/05 17:20:00 SayHello Resp1:reply:"Hello World !!" details:{[type.googleapis.com/api.Hello]:{msg:"Good Request"}}
2021/01/05 17:20:00 SayHello Resp2:reply:"Hello World !!" details:{[type.googleapis.com/api.Error]:{msg:"Bad Request" msg:"Wrong Info Msg"}}
2021/01/05 17:20:00 ListHello Server Resp: ListHello Reply Hello Server List Hello 1
2021/01/05 17:20:01 ListHello Server Resp: ListHello Reply Hello Server List Hello 2
2021/01/05 17:20:02 ListHello Server Resp: ListHello Reply Hello Server List Hello 3
2021/01/05 17:20:03 SayMoreHello Server Resp: SayMoreHello Recv Muti Greeting
2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 1
2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 2
2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 3
asp.net core 5.0
我这里还是用上面proto文件的内容, 那服务端的GreeterService.cs 如下:
using Google.Protobuf.WellKnownTypes;
using Greet;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace grpcserver
{
    /// <summary>
    /// Server-side implementation of HelloService, covering the simple,
    /// server-streaming, client-streaming and bidirectional-streaming RPCs.
    /// </summary>
    public class GreeterService : HelloService.HelloServiceBase
    {
        private readonly ILogger<GreeterService> _logger;

        public GreeterService(ILogger<GreeterService> logger)
        {
            _logger = logger;
        }

        /// <summary>
        /// Simple RPC. Replies with a fixed greeting and one packed detail whose
        /// text depends on whether the request's Infos map has "hello" = "world".
        /// </summary>
        public override Task<HelloResponse> SayHello(HelloRequest request, ServerCallContext context)
        {
            var response = new HelloResponse() { Reply = "Hello World!!" };

            var isGoodRequest = request.Infos.TryGetValue("hello", out var val) && val == "world";
            var detail = isGoodRequest
                ? new Hello { Msg = "Good Request" }
                : new Hello { Msg = "Bad Request, Wrong Info Msg" };

            response.Details.Add(Any.Pack(detail));
            return Task.FromResult(response);
        }

        /// <summary>
        /// Server-side streaming RPC. Sends three replies, one second apart.
        /// </summary>
        public override async Task ListHello(HelloRequest request, IServerStreamWriter<HelloResponse> response, ServerCallContext context)
        {
            for (var i = 1; i <= 3; i++)
            {
                await response.WriteAsync(new HelloResponse { Reply = $"ListHello Reply {request.Greeting} {i}" });
                if (i < 3)
                {
                    // Task.Delay instead of Thread.Sleep: an async method must
                    // not block a thread-pool thread while it waits.
                    await Task.Delay(1000, context.CancellationToken);
                }
            }
        }

        /// <summary>
        /// Client-side streaming RPC. Prints every incoming greeting and returns
        /// a single reply once the request stream is exhausted.
        /// </summary>
        public override async Task<HelloResponse> SayMoreHello(IAsyncStreamReader<HelloRequest> request, ServerCallContext context)
        {
            while (await request.MoveNext())
            {
                Console.WriteLine($"SayMoreHello Client Say: {request.Current.Greeting}");
            }
            return new HelloResponse { Reply = "SayMoreHello Recv Muti Greeting" };
        }

        /// <summary>
        /// Bidirectional streaming RPC. Answers each incoming message with one
        /// numbered reply.
        /// </summary>
        public override async Task SayHelloChat(IAsyncStreamReader<HelloRequest> request, IServerStreamWriter<HelloRequest> response, ServerCallContext context)
        {
            var i = 1;
            while (await request.MoveNext())
            {
                await response.WriteAsync(new HelloRequest { Greeting = $"SayHelloChat Server Say Hello {i}" });
                Console.WriteLine($"SayHelloChat Client Say:" + request.Current.Greeting);
                // Advance the counter; without this every reply is numbered 1.
                i++;
            }
        }
    }
}
服务端的TLS实现如下:
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;
using System.Security.Cryptography.X509Certificates;
using Microsoft.AspNetCore.Server.Kestrel.Https;
using System.Security.Authentication;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using System.Net;

namespace grpcserver
{
    /// <summary>
    /// Entry point of the gRPC server host. Configures Kestrel with a plain
    /// endpoint on port 5000 and an HTTPS endpoint on port 5001.
    /// </summary>
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        // Additional configuration is required to successfully run gRPC on macOS.
        // For instructions on how to configure Kestrel and gRPC clients on macOS, visit https://go.microsoft.com/fwlink/?linkid=2099682
        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureWebHostDefaults(webBuilder =>
                {
                    webBuilder.UseStartup<Startup>();
                    webBuilder.ConfigureKestrel(kestrel =>
                    {
                        kestrel.Listen(IPAddress.Any, 5000, o => o.Protocols = HttpProtocols.Http1AndHttp2);
                        kestrel.Listen(IPAddress.Any, 5001, listenOptions =>
                        {
                            // Path.Combine builds the path without a backslash
                            // inside a regular string literal ("cert\server.pfx"
                            // is an invalid escape sequence).
                            var serverPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "cert", "server.pfx");
                            // NOTE(review): the certificate password is hard-coded;
                            // load it from configuration or a secret store.
                            var serverCertificate = new X509Certificate2(serverPath, "123456789");
                            var httpsConnectionAdapterOptions = new HttpsConnectionAdapterOptions()
                            {
                                ClientCertificateMode = ClientCertificateMode.AllowCertificate,
                                SslProtocols = System.Security.Authentication.SslProtocols.Tls12,
                                // Validate the client certificate with chain.Build.
                                ClientCertificateValidation = (cer, chain, error) =>
                                {
                                    return chain.Build(cer);
                                },
                                ServerCertificate = serverCertificate
                            };
                            listenOptions.UseHttps(httpsConnectionAdapterOptions);
                            listenOptions.Protocols = HttpProtocols.Http1AndHttp2;
                        });
                    });
                });
    }
}
客户端的代码 如下【上面那个any 和map 的处理大家注意一下】:
using Google.Protobuf.Collections;
using Greet;
using Grpc.Core;
using Grpc.Net.Client;
using Newtonsoft.Json;
using System;
using System.IO;
using System.Net.Http;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;

namespace grpcclient
{
    /// <summary>
    /// Console client that calls each of the four HelloService RPC modes over
    /// HTTPS with a client certificate.
    /// </summary>
    class Program
    {
        static async Task Main(string[] args)
        {
            var channel = GrpcChannel.ForAddress("https://localhost:5001", new GrpcChannelOptions { HttpHandler = GetHttpHandler() });
            var client = new HelloService.HelloServiceClient(channel);

            // Simple RPC.
            var hellorequest = new HelloRequest { Greeting = "Hello Server 1 !!" };
            hellorequest.Infos.Add("hello", "world");
            Console.WriteLine($"SayHello Resp1: {GetHelloResponsString(client.SayHello(hellorequest))}");
            Console.WriteLine($"SayHello Resp2:{GetHelloResponsString(client.SayHello(new HelloRequest { Greeting = "Hello Server 2 !!" }))}");

            // Server-side streaming RPC.
            using (var list = client.ListHello(new HelloRequest { Greeting = "Hello Server List Hello" }))
            {
                while (await list.ResponseStream.MoveNext())
                {
                    Console.WriteLine("ListHello Server Resp:" + list.ResponseStream.Current.Reply);
                }
            }

            // Client-side streaming RPC.
            using (var clientcall = client.SayMoreHello())
            {
                for (int i = 0; i < 3; i++)
                {
                    await clientcall.RequestStream.WriteAsync(new HelloRequest { Greeting = $"sayMoreHello Hello Server {i + 1}" });
                }
                await clientcall.RequestStream.CompleteAsync();

                var response = await clientcall.ResponseAsync;
                Console.WriteLine($"SayMoreHello Server Resp {response.Reply}");
            }

            // Bidirectional streaming RPC.
            using (var clientcall2 = client.SayHelloChat())
            {
                // Read replies concurrently while requests are being written.
                var response2 = Task.Run(async () =>
                {
                    while (await clientcall2.ResponseStream.MoveNext())
                    {
                        Console.WriteLine($"SayHelloChat Server Say {clientcall2.ResponseStream.Current.Greeting}");
                    }
                });

                for (int i = 0; i < 3; i++)
                {
                    await clientcall2.RequestStream.WriteAsync(new HelloRequest { Greeting = $"SayHelloChat Hello Server {i + 1}" });
                    await Task.Delay(1000);
                }
                await clientcall2.RequestStream.CompleteAsync();

                // Wait for the reader to drain the response stream before the
                // call is disposed; otherwise late replies can be lost and reader
                // exceptions go unobserved.
                await response2;
            }

            Console.ReadKey();
        }

        /// <summary>
        /// Builds an HTTP handler that uses TLS 1.2, presents the client
        /// certificate and validates the server certificate with chain.Build.
        /// </summary>
        static HttpClientHandler GetHttpHandler()
        {
            var handler = new HttpClientHandler()
            {
                SslProtocols = SslProtocols.Tls12,
                ClientCertificateOptions = ClientCertificateOption.Manual,
                ServerCertificateCustomValidationCallback = (message, cer, chain, errors) =>
                {
                    return chain.Build(cer);
                }
            };

            // Path.Combine builds the path without a backslash inside a regular
            // string literal ("cert\client.pfx" is an invalid escape sequence).
            var path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "cert", "client.pfx");
            // NOTE(review): the certificate password is hard-coded; load it from
            // configuration or a secret store.
            var crt = new X509Certificate2(path, "123456789");
            handler.ClientCertificates.Add(crt);
            return handler;
        }

        /// <summary>
        /// Formats a response as text: the reply followed by the raw bytes of the
        /// first detail decoded as UTF-8 (empty when there are no details).
        /// </summary>
        static string GetHelloResponsString(HelloResponse response)
        {
            // Guard against an empty Details list instead of throwing on [0].
            var details = response.Details.Count > 0
                ? response.Details[0].Value.ToString(System.Text.Encoding.UTF8)
                : string.Empty;
            return "Reply:" + response.Reply + "Details:" + details;
        }
    }
}
运行结果:
证书:可以利用MySSL测试证书生成工具生成两张证书,也可以用openssl来生成。
下载地址 https://github.com/dz45693/gogrpcstrem.git 和 https://github.com/dz45693/aspdotnetcoregrpcstream.git
参考:
参考: