zoukankan      html  css  js  c++  java
  • gRPC流模式的实现和TLS加密通信[Go和asp.net core 5.0]

    gRPC主要有4种请求和响应模式,分别是简单模式(Simple RPC)服务端流式(Server-side streaming RPC)客户端流式(Client-side streaming RPC)、和双向流式(Bidirectional streaming RPC)

    1.简单模式(Simple RPC):客户端发起请求并等待服务端响应。

    2.服务端流式(Server-side streaming RPC):客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。 场景:.客户端要获取某原油股的实时走势,客户端发送一个请求, 服务端实时返回该股票的走势

    3.客户端流式(Client-side streaming RPC):与服务端数据流模式相反,这次是客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应。情景模拟:客户端大量数据上传到服务端

    4.双向流式(Bidirectional streaming RPC):双方使用读写流去发送一个消息序列,两个流独立操作,双方可以同时发送和同时接收。 情景模拟:双方对话(可以一问一答、一问多答、多问一答,形式灵活)

    从上面的定义不难看出,用stream可以定义一个流式消息。下面我们就通过实例来演示一下流式通信的使用方法。

    GO

    1.首先/api/hello.proto [如下], 并且生成新的api/hello.pb.go代码。

    syntax = "proto3";
     
    package api;
    // Any消息类型允许您将消息作为嵌入类型,而不需要它们 .proto定义。Any包含任意序列化的消息(字节),以及一个URL,该URL充当该消息的全局唯一标识符并解析为该消息的类型。要使用Any类型,你需要导入google/protobuf/any.proto.
    import "google/protobuf/any.proto";
     
    message HelloRequest {
      string greeting = 1;
      map<string, string> infos  = 2;
    }
     
    message HelloResponse {
      string reply = 1;
      repeated google.protobuf.Any details = 2;
    }
     
    service HelloService {
      rpc SayHello(HelloRequest) returns (HelloResponse){}
      rpc ListHello(HelloRequest) returns (stream HelloResponse) {}
      rpc SayMoreHello(stream HelloRequest) returns (HelloResponse) {}
      rpc SayHelloChat(stream HelloRequest) returns (stream HelloRequest) {}
    }
     
    message Hello {
        string msg = 1;
    }
     
    message Error {
        repeated string msg = 1;
    }

    2.编译指令:

    protoc -ID:Goinclude -I. --go_out=plugins=grpc:. ./api/api.proto

    3.在生成的代码api.hello.go中,我们可以看到客户端接口如下:

    type HelloServiceServer interface {
        SayHello(context.Context, *HelloRequest) (*HelloResponse, error)
        ListHello(*HelloRequest, HelloService_ListHelloServer) error
        SayMoreHello(HelloService_SayMoreHelloServer) error
        SayHelloChat(HelloService_SayHelloChatServer) error
    }

    4.接口的实现 server/service/service.go如下:

    package service
     
    import (
        "context"
        "fmt"
        "io"
        "log"
        "time"
     
        "github.com/golang/protobuf/ptypes"
        "github.com/golang/protobuf/ptypes/any"
     
        api "gogrpcstream/api"
    )
     
    type SayHelloServer struct{}
     
    func (s *SayHelloServer) SayHello(ctx context.Context, in *api.HelloRequest) (res *api.HelloResponse, err error) {
        log.Printf("Client Greeting:%s", in.Greeting)
        log.Printf("Client Info:%v", in.Infos)
     
        var an *any.Any
        if in.Infos["hello"] == "world" {
            an, err = ptypes.MarshalAny(&api.Hello{Msg: "Good Request"})
        } else {
            an, err = ptypes.MarshalAny(&api.Error{Msg: []string{"Bad Request", "Wrong Info Msg"}})
        }
     
        if err != nil {
            return
        }
        return &api.HelloResponse{
            Reply:   "Hello World !!",
            Details: []*any.Any{an},
        }, nil
    }
     
    // 服务器端流式 RPC, 接收一次客户端请求,返回一个流
    func (s *SayHelloServer) ListHello(in *api.HelloRequest, stream api.HelloService_ListHelloServer) error {
        log.Printf("Client Say: %v", in.Greeting)
     
        stream.Send(&api.HelloResponse{Reply: "ListHello Reply " + in.Greeting + " 1"})
        time.Sleep(1 * time.Second)
        stream.Send(&api.HelloResponse{Reply: "ListHello Reply " + in.Greeting + " 2"})
        time.Sleep(1 * time.Second)
        stream.Send(&api.HelloResponse{Reply: "ListHello Reply " + in.Greeting + " 3"})
        time.Sleep(1 * time.Second)
        return nil
    }
     
    // 客户端流式 RPC, 客户端流式请求,服务器可返回一次
    func (s *SayHelloServer) SayMoreHello(stream api.HelloService_SayMoreHelloServer) error {
        // 接受客户端请求
        for {
            req, err := stream.Recv()
            if err == io.EOF {
                break
            }
     
            if err != nil {
                return err
            }
     
            log.Printf("SayMoreHello Client Say: %v", req.Greeting)
        }
     
        // 流读取完成后,返回
        return stream.SendAndClose(&api.HelloResponse{Reply: "SayMoreHello Recv Muti Greeting"})
    }
     
    //双向
    func (s *SayHelloServer) SayHelloChat(stream api.HelloService_SayHelloChatServer) error {
        n := 1
        for {
            req, err := stream.Recv()
            if err == io.EOF {
                break
            }
     
            if err != nil {
                return err
            }
            err = stream.Send(&api.HelloRequest{Greeting: fmt.Sprintf("SayHelloChat Server Say Hello %d", n)})
            if err != nil {
                return err
            }
            n++
            log.Printf("SayHelloChat Client Say: %v", req.Greeting)
        }
        return nil
    }

    5. server/main.go 服务端实现:

    package main
     
    import (
        "crypto/tls"
        "crypto/x509"
        "io/ioutil"
        "log"
        "net"
     
        "google.golang.org/grpc/credentials"
     
        "google.golang.org/grpc"
     
        api "gogrpcstream/api"
        sv "gogrpcstream/server/service"
    )
     
    func main() {
        lis, err := net.Listen("tcp", ":8080")
        if err != nil {
            panic(err)
        }
     
        // 加载证书和密钥 (同时能验证证书与私钥是否匹配)
        cert, err := tls.LoadX509KeyPair("../certs/server.pem", "../certs/server.key")
        if err != nil {
            panic(err)
        }
     
        // 将根证书加入证书词
        // 测试证书的根如果不加入可信池,那么测试证书将视为不可惜,无法通过验证。
        certPool := x509.NewCertPool()
        rootBuf, err := ioutil.ReadFile("../certs/ca.pem")
        if err != nil {
            panic(err)
        }
     
        if !certPool.AppendCertsFromPEM(rootBuf) {
            panic("fail to append test ca")
        }
     
        tlsConf := &tls.Config{
            ClientAuth:   tls.RequireAndVerifyClientCert,
            Certificates: []tls.Certificate{cert},
            ClientCAs:    certPool,
        }
     
        serverOpt := grpc.Creds(credentials.NewTLS(tlsConf))
        grpcServer := grpc.NewServer(serverOpt)
     
        api.RegisterHelloServiceServer(grpcServer, &sv.SayHelloServer{})
     
        log.Println("Server Start...")
        grpcServer.Serve(lis)
    }

    6.客服端实现:client/main.go

    package main
     
    import (
        "context"
        "crypto/tls"
        "crypto/x509"
        "fmt"
        "io"
        "io/ioutil"
        "log"
     
        api "gogrpcstream/api"
     
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials"
    )
     
    func main() {
        cert, err := tls.LoadX509KeyPair("../certs/client.pem", "../certs/client.key")
        if err != nil {
            panic(err)
        }
     
        // 将根证书加入证书池
        certPool := x509.NewCertPool()
        bs, err := ioutil.ReadFile("../certs/ca.pem")
        if err != nil {
            panic(err)
        }
     
        if !certPool.AppendCertsFromPEM(bs) {
            panic("cc")
        }
     
        // 新建凭证
        transportCreds := credentials.NewTLS(&tls.Config{
            ServerName:   "localhost",
            Certificates: []tls.Certificate{cert},
            RootCAs:      certPool,
        })
     
        dialOpt := grpc.WithTransportCredentials(transportCreds)
     
        conn, err := grpc.Dial("localhost:8080", dialOpt)
        if err != nil {
            log.Fatalf("Dial failed:%v", err)
        }
        defer conn.Close()
     
        client := api.NewHelloServiceClient(conn)
        resp1, err := client.SayHello(context.Background(), &api.HelloRequest{
            Greeting: "Hello Server 1 !!",
            Infos:    map[string]string{"hello": "world"},
        })
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("SayHello Resp1:%+v", resp1)
     
        resp2, err := client.SayHello(context.Background(), &api.HelloRequest{
            Greeting: "Hello Server 2 !!",
        })
        if err != nil {
            log.Fatalf("%v", err)
        }
        log.Printf("SayHello Resp2:%+v", resp2)
     
        // 服务器端流式 RPC;
        recvListHello, err := client.ListHello(context.Background(), &api.HelloRequest{Greeting: "Hello Server List Hello"})
        if err != nil {
            log.Fatalf("ListHello err: %v", err)
        }
     
        for {
            //Recv() 方法接收服务端消息,默认每次Recv()最大消息长度为`1024*1024*4`bytes(4M)
            resp, err := recvListHello.Recv()
            if err == io.EOF {
                break
            }
            if err != nil {
                log.Fatal(err)
            }
     
            log.Printf("ListHello Server Resp: %v", resp.Reply)
        }
        //可以使用CloseSend()关闭stream,这样服务端就不会继续产生流消息
        //调用CloseSend()后,若继续调用Recv(),会重新激活stream,接着之前结果获取消息
     
        // 客户端流式 RPC;
        sayMoreClient, err := client.SayMoreHello(context.Background())
        if err != nil {
            log.Fatal(err)
        }
        for i := 0; i < 3; i++ {
            sayMoreClient.Send(&api.HelloRequest{Greeting: fmt.Sprintf("SayMoreHello Hello Server %d", i)})
        }
        //关闭流并获取返回的消息
        sayMoreResp, err := sayMoreClient.CloseAndRecv()
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("SayMoreHello Server Resp: %v", sayMoreResp.Reply)
     
        // 双向流式 RPC;
        sayHelloChat, err := client.SayHelloChat(context.Background())
        if err != nil {
            log.Fatal(err)
        }
     
        for i := 0; i < 3; i++ {
            err = sayHelloChat.Send(&api.HelloRequest{Greeting: fmt.Sprintf("SayHelloChat Hello Server %d", i)})
            if err != nil {
                log.Fatalf("stream request err: %v", err)
            }
            res, err := sayHelloChat.Recv()
            if err == io.EOF {
                break
            }
            if err != nil {
                log.Fatalf("SayHelloChat get stream err: %v", err)
            }
            // 打印返回值
            log.Printf("SayHelloChat Server Say: %v", res.Greeting)
     
        }
     
    }

    8.运行结果如下:

    D:GoProjectsrcgogrpcstreamserver>go run main.go
    2021/01/05 17:19:12 Server Start...
    2021/01/05 17:20:00 Client Greeting:Hello Server 1 !!
    2021/01/05 17:20:00 Client Info:map[hello:world]
    2021/01/05 17:20:00 Client Greeting:Hello Server 2 !!
    2021/01/05 17:20:00 Client Info:map[]
    2021/01/05 17:20:00 Client Say: Hello Server List Hello
    2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 0
    2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 1
    2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 2
    2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 0
    2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 1
    2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 2
    D:GoProjectsrcgogrpcstreamclient>go run main.go
    2021/01/05 17:20:00 SayHello Resp1:reply:"Hello World !!"  details:{[type.googleapis.com/api.Hello]:{msg:"Good Request"}}
    2021/01/05 17:20:00 SayHello Resp2:reply:"Hello World !!"  details:{[type.googleapis.com/api.Error]:{msg:"Bad Request"  msg:"Wrong Info Msg"}}
    2021/01/05 17:20:00 ListHello Server Resp: ListHello Reply Hello Server List Hello 1
    2021/01/05 17:20:01 ListHello Server Resp: ListHello Reply Hello Server List Hello 2
    2021/01/05 17:20:02 ListHello Server Resp: ListHello Reply Hello Server List Hello 3
    2021/01/05 17:20:03 SayMoreHello Server Resp: SayMoreHello Recv Muti Greeting
    2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 1
    2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 2
    2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 3

    asp.net core 5.0

    我这里还是用上面proto文件的内容, 那服务端的GreeterService.cs 如下:

    using Google.Protobuf.WellKnownTypes;
    using Greet;
    using Grpc.Core;
    using Microsoft.Extensions.Logging;
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    namespace grpcserver
    {
        public class GreeterService : HelloService.HelloServiceBase
        {
            private readonly ILogger<GreeterService> _logger;
            public GreeterService(ILogger<GreeterService> logger)
            {
                _logger = logger;
            }
    
            public override  Task<HelloResponse> SayHello(HelloRequest request, ServerCallContext context)
            {
                var response = new HelloResponse() {  Reply= "Hello World!!"};
                Any any = new Any();
                string val;
                if (request.Infos.TryGetValue("hello",out val) && val==  "world")
                {
                    any = Any.Pack(new Hello { Msg = "Good Request" });
                }
                else {
                    any = Any.Pack(new Hello { Msg = "Bad Request, Wrong Info Msg" });
                }
                response.Details.Add(any);
                return  Task.FromResult(response);
            }
            public override async Task ListHello(HelloRequest request, IServerStreamWriter<HelloResponse> response, ServerCallContext context)
            {
               await response.WriteAsync(new HelloResponse {  Reply=$"ListHello Reply {request.Greeting} 1" });
                Thread.Sleep(1000);
                await response.WriteAsync(new HelloResponse { Reply= $"ListHello Reply {request.Greeting} 2" });
                Thread.Sleep(1000);
                await response.WriteAsync(new HelloResponse { Reply= $"ListHello Reply {request.Greeting} 3" });
            }
            public override async Task<HelloResponse> SayMoreHello(IAsyncStreamReader<HelloRequest> request, ServerCallContext context)
            {
                while (await request.MoveNext()) {
                    Console.WriteLine($"SayMoreHello Client Say: {request.Current.Greeting}");
                }
                return new HelloResponse { Reply="SayMoreHello Recv Muti Greeting" };
            }
           
            public override async Task SayHelloChat(IAsyncStreamReader<HelloRequest> request, IServerStreamWriter<HelloRequest>response, ServerCallContext context)
            {
                var i = 1;
                while (await request.MoveNext()) {
                  await  response.WriteAsync(new HelloRequest { Greeting=$"SayHelloChat Server Say Hello {i}" });
                    Console.WriteLine($"SayHelloChat Client Say:"+request.Current.Greeting);
                }
            }
        }
    }

    服务端的tsl 实现如下:

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    
    using System.Threading.Tasks;
    using Microsoft.AspNetCore.Hosting;
    using Microsoft.Extensions.Hosting;
    using System.Security.Cryptography.X509Certificates;
    using Microsoft.AspNetCore.Server.Kestrel.Https;
    using System.Security.Authentication;
    using Microsoft.AspNetCore.Server.Kestrel.Core;
    using System.Net;
    namespace grpcserver
    {
        public class Program
        {
            public static void Main(string[] args)
            {
                CreateHostBuilder(args).Build().Run();
            }
    
            // Additional configuration is required to successfully run gRPC on macOS.
            // For instructions on how to configure Kestrel and gRPC clients on macOS, visit https://go.microsoft.com/fwlink/?linkid=2099682
            public static IHostBuilder CreateHostBuilder(string[] args) =>
                Host.CreateDefaultBuilder(args)
                    .ConfigureWebHostDefaults(webBuilder =>
                    {
                        webBuilder.UseStartup<Startup>();
                        webBuilder.ConfigureKestrel(kerstrel =>
                        {
                            kerstrel.Listen(IPAddress.Any, 5000, o => o.Protocols = HttpProtocols.Http1AndHttp2);
                            kerstrel.Listen(IPAddress.Any, 5001, listenOptions =>
                            {
                                var serverPath = AppDomain.CurrentDomain.BaseDirectory + "cert\server.pfx";
                                var serverCertificate = new X509Certificate2(serverPath, "123456789");
                                var httpsConnectionAdapterOptions = new HttpsConnectionAdapterOptions()
                                {
                                    ClientCertificateMode = ClientCertificateMode.AllowCertificate,
                                    SslProtocols = System.Security.Authentication.SslProtocols.Tls12,
                                    //用chain.Build验证客户端证书
                                    ClientCertificateValidation = (cer, chain, error) =>
                                    {
                                        return chain.Build(cer);
                                    },
                                    ServerCertificate = serverCertificate
                                };
                                listenOptions.UseHttps(httpsConnectionAdapterOptions);
                                listenOptions.Protocols = HttpProtocols.Http1AndHttp2;
                            });
    
    
                        });
    
                    });
        }
    }

    客户端的代码 如下【上面那个any 和map 的处理大家注意一下】:

    using Google.Protobuf.Collections;
    using Greet;
    using Grpc.Core;
    using Grpc.Net.Client;
    using Newtonsoft.Json;
    using System;
    using System.Net.Http;
    using System.Security.Authentication;
    using System.Security.Cryptography.X509Certificates;
    using System.Threading.Tasks;
    
    namespace grpcclient
    {
        class Program
        {
            static  async Task Main(string[] args)
            {
                var channel = GrpcChannel.ForAddress("https://localhost:5001", new GrpcChannelOptions { HttpHandler = GetHttpHandler() });
                var client = new HelloService.HelloServiceClient(channel);
                ////
                var hellorequest=new HelloRequest { Greeting = "Hello Server 1 !!"};
                hellorequest.Infos.Add("hello", "world");
      
                Console.WriteLine($"SayHello Resp1: {GetHelloResponsString( client.SayHello(hellorequest))}");
                Console.WriteLine($"SayHello Resp2:{GetHelloResponsString(client.SayHello(new HelloRequest { Greeting= "Hello Server 2 !!" }))}");
               ///
                var list = client.ListHello(new HelloRequest { Greeting = "Hello Server List Hello" });
                while (await list.ResponseStream.MoveNext()) {
                    Console.WriteLine("ListHello Server Resp:" + list.ResponseStream.Current.Reply);
                }
                ///
                using (var clientcall = client.SayMoreHello()) {
                    for (int i = 0; i < 3; i++)
                    {
                       await clientcall.RequestStream.WriteAsync(new HelloRequest { Greeting = $"sayMoreHello Hello Server {i + 1}" });
                    }
                    await clientcall.RequestStream.CompleteAsync();
                    var response = await clientcall.ResponseAsync;
                    Console.WriteLine($"SayMoreHello Server Resp {response.Reply}");
                }
    
                /////
                ///
                using (var clientcall2 = client.SayHelloChat()) {
                    var response2 = Task.Run(async () =>
                    {
                        while (await clientcall2.ResponseStream.MoveNext())
                        {
                            Console.WriteLine($"SayHelloChat Server Say {clientcall2.ResponseStream.Current.Greeting}");
                        }
                    });
                    for (int i = 0; i < 3; i++)
                    {
                      await  clientcall2.RequestStream.WriteAsync(new HelloRequest { Greeting = $"SayHelloChat Hello Server {i + 1}" });
                        await Task.Delay(1000);
                    }
                  
                    await clientcall2.RequestStream.CompleteAsync();
                }
    
                Console.ReadKey();
               
            }
            static HttpClientHandler GetHttpHandler()
            {
                var handler = new HttpClientHandler()
                {
                    SslProtocols = SslProtocols.Tls12,
                    ClientCertificateOptions = ClientCertificateOption.Manual,
                    ServerCertificateCustomValidationCallback = (message, cer, chain, errors) =>
                    {
                        return chain.Build(cer);
                    }
                };
                var path = AppDomain.CurrentDomain.BaseDirectory + "cert\client.pfx";
                var crt = new X509Certificate2(path, "123456789");
                handler.ClientCertificates.Add(crt);
                return handler;
            }
            static string GetHelloResponsString(HelloResponse response) {
                string msg = "Reply:" + response.Reply + "Details:" + response.Details[0].Value.ToString(System.Text.Encoding.UTF8);
                return msg;
            }
        }
    }

    运行结果:

    证书 可以利用MySSL测试证书生成工具生成两张证书 也可以用openssl来实现。

    下载地址 https://github.com/dz45693/gogrpcstrem.git 和 https://github.com/dz45693/aspdotnetcoregrpcstream.git

    参考:

    参考:

    https://razeencheng.com/post/how-to-use-grpc-in-golang-03

  • 相关阅读:
    JAVA并发-CountDownLatch
    【转载】Makedown数学公式语法
    算法的时间复杂度
    JVM-卡表(Card Table)
    sync.WaitGroup的使用以及坑
    go 多协程爬取图片
    go ioutial 读取写入文件
    go 下载图片
    go 正则 爬取邮箱代码
    go 解析path
  • 原文地址:https://www.cnblogs.com/majiang/p/14237097.html
Copyright © 2011-2022 走看看