zoukankan      html  css  js  c++  java
  • grpc(五)

    5 grpc支持的功能

    1 拦截器

    客户端拦截器

    客户端普通的拦截器

    func aInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        # before do something
    	err := invoker(ctx, method, req, reply, cc, opts...)
    	# after do something 
    }
    

    客户端流拦截器

    
    func clientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
        # before do something
        s, err := streamer(ctx, desc, cc, method, opts...)
        # after do something 
    }
    

    调用时使用

    // Setting up a connection to the server.
    // Set up a connection to the server with the client-side unary and stream
    // interceptors installed.
    conn, err := grpc.Dial(address, grpc.WithInsecure(),
    	grpc.WithUnaryInterceptor(aInterceptor),
    	grpc.WithStreamInterceptor(clientStreamInterceptor))
    if err != nil {
    	log.Fatalf("did not connect: %v", err)
    }
    

    服务端拦截器

    // aUnaryServerInterceptor is a server-side unary interceptor. It logs the
    // full method name and the request, invokes the handler, logs the response,
    // and returns the handler's response and error unchanged.
    func aUnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    	log.Println("======= [Server Interceptor] ", info.FullMethod)
    	// %v is used because req is an interface{} of unknown concrete type.
    	log.Printf(" Pre Proc Message : %v", req)

    	// Invoking the handler to complete the normal execution of a unary RPC.
    	m, err := handler(ctx, req)

    	// Post processing logic. m may be nil when the handler returned an error;
    	// %v prints that as <nil> rather than a format-verb error.
    	log.Printf(" Post Proc Message : %v", m)
    	return m, err
    }
    
    

    服务端流拦截器

    
    // bServerStreamInterceptor is a server-side stream interceptor. It logs the
    // full method name, runs the stream handler against a wrapped copy of the
    // server stream, and logs and returns any error the handler reports.
    func bServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    	// Before the RPC: record which method is being served.
    	log.Println("====== [Server Stream Interceptor] ", info.FullMethod)

    	// Hand the handler the wrapped stream instead of the raw one.
    	wrapped := newWrappedStream(ss)
    	if rpcErr := handler(srv, wrapped); rpcErr != nil {
    		log.Printf("RPC failed with error %v", rpcErr)
    		return rpcErr
    	}
    	return nil
    }
    

    服务端使用拦截器

    // Open the TCP listener on port.
    lis, err := net.Listen("tcp", port)
    if err != nil {
    	log.Fatalf("failed to listen: %v", err)
    }
    // Install one unary and one stream interceptor on the server.
    opts := []grpc.ServerOption{
    	grpc.UnaryInterceptor(aUnaryServerInterceptor),
    	grpc.StreamInterceptor(bServerStreamInterceptor),
    }
    s := grpc.NewServer(opts...)
    pb.RegisterXXServer(s, &server{})
    // Register reflection service on gRPC server.
    reflection.Register(s)
    if serveErr := s.Serve(lis); serveErr != nil {
    	log.Fatalf("failed to serve: %v", serveErr)
    }
    

    2 截止时间

    客户端截止时间

    // The call must finish within two seconds from now.
    clientDeadline := time.Now().Add(2 * time.Second)
    ctx, cancel := context.WithDeadline(context.Background(), clientDeadline)
    // Always call cancel so the context's resources are released.
    defer cancel()
    

    3 取消

    主动调用cancel

    4 错误处理

    常用的状态码

    OK 成功
    
    CANCELLED 取消
    
    DEADLINE_EXCEEDED 超时
    
    INVALID_ARGUMENT 客户端非法参数
    
    
    

    5 多路复用

    一个server上注册多个服务

    // Open the listener shared by every service registered below.
    lis, err := net.Listen("tcp", port)
    if err != nil {
    	log.Fatalf("failed to listen: %v", err)
    }
    grpcServer := grpc.NewServer()
    // Both services are registered on the same grpcServer.
    a_pb.RegisterOrderManagementServer(grpcServer, &aServer{})
    b_pb.RegisterGreeterServer(grpcServer, &bServer{})
    reflection.Register(grpcServer)
    serveErr := grpcServer.Serve(lis)
    if serveErr != nil {
    	log.Fatalf("failed to serve: %v", serveErr)
    }
    

    6 元数据

    服务端读元数据

    	// Read the metadata carried by the incoming request context; fail the
    	// call when none is present.
    	md, ok := metadata.FromIncomingContext(ctx)
    	if !ok {
    		return nil, status.Errorf(codes.DataLoss, "UnaryEcho: failed to get metadata")
    	}
    

    服务端写元数据

    	// Create the header metadata and send it on the stream.
    	header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(time.StampNano)})
    	// NOTE(review): assumes the enclosing streaming handler returns a single
    	// error value — confirm against the surrounding function.
    	if err := stream.SendHeader(header); err != nil {
    		return err
    	}
    

    客户端添加元数据

    
    	// Build the initial metadata from alternating key/value arguments.
    	md := metadata.Pairs(
    		"timestamp", time.Now().Format(time.StampNano),
    		"kn", "vn",
    	)
    	mdCtx := metadata.NewOutgoingContext(context.Background(), md)

    	// Append further pairs to the outgoing context; "k1" is supplied twice,
    	// once with "v1" and once with "v2".
    	ctxA := metadata.AppendToOutgoingContext(mdCtx, "k1", "v1", "k1", "v2", "k2", "v3")

    	// header and trailer are handed to the call by pointer through
    	// grpc.Header and grpc.Trailer.
    	var header, trailer metadata.MD

    	// RPC using the context with new metadata.
    	res, err := client.XXX(ctxA, &xx{}, grpc.Header(&header), grpc.Trailer(&trailer))
    	if err != nil {
    		log.Fatalf("XXX RPC failed: %v", err)
    	}
    

    7 负载均衡

    增加负载均衡

    // Dial with round_robin selected as the load-balancing policy.
    roundrobinConn, err := grpc.Dial(
    	fmt.Sprintf("%s:///%s", exampleScheme, exampleServiceName), // "example:///lb.example.grpc.io"
    	// grpc.WithBalancerName is deprecated in grpc-go; the policy is selected
    	// through the default service config instead.
    	grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
    	grpc.WithInsecure(),
    )
    if err != nil {
    	log.Fatalf("did not connect: %v", err)
    }
    

    压缩,增加特性

     // NOTE(review): presumably supplied as a per-call option on an RPC to
     // request gzip compression — confirm against the call site.
     grpc.UseCompressor(gzip.Name)
    
  • 相关阅读:
    [Swift]LeetCode380. 常数时间插入、删除和获取随机元素 | Insert Delete GetRandom O(1)
    [Swift]LeetCode378. 有序矩阵中第K小的元素 | Kth Smallest Element in a Sorted Matrix
    说说心声------ 一些经历
    安装eclipse maven插件m2eclipse No repository found containing
    苹果浏览器实战(三)
    CSDN挑战编程——《绝对值最小》
    高可用技术工具包 High Availability Toolkit
    jstl 标签 循环 序号
    坚向的ViewPager,上下滑动的组件,android上下滑动 VerticalPager
    Php socket数据编码
  • 原文地址:https://www.cnblogs.com/beckbi/p/14852210.html
Copyright © 2011-2022 走看看