zoukankan      html  css  js  c++  java
  • 精通并发与 Netty (二)常用的 rpc 框架

    Google Protobuf 使用方式分析

    对于 RPC 协议来说,最重要的就是对象的发送与接收,这就要用到序列化与反序列化,也称为编码和解码,序列化与反序列化和网络传输一般都在对应的 RPC 框架中完成。

    序列化与反序列化的流程如下:

    JavaBean-> stub(client) <->skeleton(server)->JavaBean,简单点说就是编码和解码。

    相比于 RMI 远程方法调用,很多 RPC 远程过程调用的跨语言的,这就需要序列化于反序列化协议也支持跨语言。Google Procobuf 就是这样一种跨语言的序列化于反序列化协议,效率非常高(怎么做到比其他协议效率高那?比其他协议压缩生成的对象小)。

    Netty 对于 ProtoBuf 提供了很好的支持。

    先看如何单独使用 Google ProtoBuf

    1. 新建 .proto 结构描述文件

      syntax = "proto2";
      
      package com.paul.protobuf;
      
      //加快解析速度
      option optimize_for = SPEED;
      option java_package = "com.paul.protobuf";
      option java_outer_classname = "DataInfo";
      
      message Student{
      	 reuqired string name = 1;
      	 option int32 = 2;
      	 option string address = 3;
      }
      
    2. 使用对应的编译文件生成对应的 Java 类

      Proton —java_out src/main/java src/protobuf/Student.proto

    3. 这时在我们代码的 src/main/java 文件夹下生成了一个新的 pkg com.paul.protobuf,里面生成了 DataInfo 类。对象会有对应的 builder 方法让我们来构建。

    4. 测试序列化方法

      // 构建对象->字节->对象
      public class ProtoBufTest{
        public static void main(String[] args) throws Exception{
          DataInfo.Student student = DataInfo.Student.newBuilder().setName("张三").setAge(20).setAddress("abc").build();
          byte[] student2ByteArray = student.toByteArray();
          DataInfo.Student student2 = DataInfo.Student.parseFrom(student2ByteArray);
          System.out.println(studdent2);
        }
        
      }
      

    在来看 Netty 对 Google ProtoBuf 的支持

    还是只给出不一样的部分(服务单和客户端的这部分是一样的):

    @Override
    protected void initChannel(SocketChannel ch) throws Exception{
      ChannelPipeline pipeline = ch.pipeline();
      pipeline.addLast(new ProtobufVarint32FrameDecoder());
      //解码器
      pipeline.addLast(new ProtobufDecoder(DataInfo.Student.getDefaultInstance()));
      pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
      //编码器
      pipeline.addLast(new ProtobufEncoder());
      pipeline.addLast(new MyServerHandler());
    }
    

    测试方法就是在客户端组装一个 DataInfo.Student 然后发送给服务端,这里就不演示了。

    大家可能会发现上面的代码存在一个问题,就是上面的程序只能对 DataInfo.Student 进行编解码,如果传递消息的类型有多种怎么办那?

    解决方案一:定义义协议,需要自己实现解码器,通过前两位来标识具体的 JavaBean 类型。

    解决方案二:定义一个最外层的类,通过枚举的方式来确定传递的 JavaBean 类型。

    比如我们有两个 JavaBean

    message MyMessage{
    	enum DataType{
         PersonType = 1;
         DogType = 2;
         CatType = 3;
    	}
      required Datatype data_type = 1;
      //oneof 在同一时刻只有一个字段会被设置,字段之间会共享内存,后面设置会自动清空前面的。
      oneof dataBody{
    		Person person = 2;
        Dog dog = 3;
        Cat cat = 4;
      }
    }
    
    message Person{
    	option string name = 1;
      option int32 age = 2;
      option string address = 3;
    }
    
    message Dog{
    	option string name = 1;
      option int32 age = 2;
    }
    
    message Cat{
    	option string name = 1;
      option int32 city = 2;
    }
    

    Pipeline 的改动(客户端和服务端):

    pipeline.addLast(new ProtobufDecoder(DataInfo.MyMessage.getDefaultInstance()));
    

    我们自己的 handler 的改动:

    @Overrode
    public void channelActive(ChannelHandlerContext ctx) throws Exception{
      MyDataInfo.MyMessage myMessage = MyDataInfo.MyMessage.newBuilder().
           setDataType(DataType.PersonType.PersonType).
           setPerson(MyDataInfo.Person.newBuilder().
                    setName("张三").setAge(20).
                    setAddress("111").build()).
           build();
           ctx.channel().writeAndFlush(myMessage);
            
    }
    

    服务端 handler 根据 enum 的类型分别进行解析。

    在实际的应用环境中,我们客户端和服务端大概率是两个分开的应用程序,此时我们使用 Google ProtoBuf 时 .proto 文件和对应的 class 文件是不是需要在两边都保存一份,如果有修改会非常麻烦。下面我们介绍一种最佳实践。

    最佳实践是使用 git 作为版本控制系统为前提的:

    不那么好的方案:git submodule,就相当于 maven 的子模块,客户端和服务端都依赖这个模块。

    比较好的方案:git subtree,将公共的代码合并到 server 和 client 端,相当于向 server 和 client 提交代码。

    Apache Thrift 使用方式与文件编写方式分析

    Apache Thrift 和 Google ProtoBuf 整体非常相似,适用于可伸缩的跨语言的服务开发。Thrift 相当于 Netty + Google ProtoBuf,是一个高性能 RPC 框架。Thrift 底层是 socket + RPC 的模式。

    Thrift 是一个典型的 CS 结构,客户端和服务端可以使用不同的语言开发,既然客户端和服务端能使用不同的语言开发,那么一定有一种中间语言来关联服务端和客户端,这就是 IDL(Interface Description Language)。

    Thrift 如何实现多语言之间的通信?

    数据传输使用 socket (多种语言均支持),数据再以特定的格式(String 等)发送,接收方语言进行解析。

    如何使用?

    定义 thrift 的文件,由 thrift 文件(IDL) 生成双方语言的接口,model,在生成的 model 以及接口中会有解码编码的代码。

    Thrift 中的服务

    Thrift 定义服务相当于 Java 中创建 Interface 一样,创建的 service 经过代码生成命令之后就会生成客户端和服务端的框架代码,定义形式如下:

    service HelloWorldService{
      //service 中定义的函数,相当于 java interface 中定义的方法
      string doAction(1:string name, 2:i32 age);
    }
    

    .thrift 文件的定义

    // java 中的包名
    namespace jave thrift.generate
    // 定义别名
    typedef i16 short
    typedef i32 int
    typedef i64 long
    typedef bool boolean
    typedef string String
    
    struct Person{
      1: optional String username,
      2: optional int age,
      3: optional boolean married
    }
    
    exception DataException{
      1: optional String message,
      2: optional String callStack,
      3: optional String date
    }
    
    service PersonService{
      Person getPersonByName(1: required String username) throws (1: DataException dataException),
      void savePerson(1:requried Person person) throws (1:DataException dataException)
    }
    

    编译 thrift 文件

    thrift --gen java src/thrift/data.thrift
    

    生成的文件

    Person.java 里面包含了编解码的方法,PersonService 里面包含了 getPersonByName 和 savePerson 的方法。

    测试方法:

    服务端服务的具体实现方法

    public class PersonServiceImpl implements PersonService.Iface{
      @Override
      public Person getPersonByName(String username) throws DataException,TException{
        Person p = new Person();
        p.setUserName("paul");
        p.setAge(25);
        p.setMarried(true);
        return p;
      }
      
      @Override
      public void savePerson(Person person) throws DataException,TException{
        System.out.println(person.getUserName());
      }
      
    }
    

    Thrift 的服务端:

    public class ThriftServer{
      public static void main(String[] args){
        //非阻塞的 socket server
        TNonblockingServerSocket socket = new TNonblockingServerSocket(8899);
        // 高可用的 server
        THsHaServer.Args arg = new THsHaServer.Args(socket).minWorkerThreads(2).maxWorkerThreads(4);
        PersonService.Processor<PersonServiceImpl> processor = new PersonService.Processor<>(new PersonServiceImpl());
        
        arg.protocolFactory(new TCompactPrococol.Factory());
        arg.transportFactory(new TFramedTransport.Facotry());
        arg.processorFactory(new TProcessorFactory(processor));
        
        TServer server = new THsHaServer(arg);
        System.out.println("Thrift Server Started");
        //死循环
        server.serve();
      }
    }
    

    Thrift 的客户端:

    public class ThriftClient{
      publiuc static void main(String[] args){
        TTransport transport = new TFramedTransport(new TSocket
    ("localhost",8899),600);
        TProcotol procotol = new TComapctProcotol(transport);
        PersonService.Client client = new PersonService.Client(procotol);
        
        try{
          //打开 socket
          transport.open();
          //好像调用本地方法一样
          Person person = client.getPersonByName("paul");
          System.out.println(person.getAge());
        }catch(Exception ex){
          throw ex;
        }finally{
          transport.close();
        }
      }
    }
    

    Thrift 的架构:

    Thrift 的传输格式,协议:

    TBinaryProtocol-二进制格式

    TCompactProtocol-压缩格式

    TJSONProtocol-JSON 格式

    TSimpleJSONProtocol-提供 JSON 只写协议,生成的文件很容易通过脚本语言解析。很少使用,缺少元数据信息,接收方不能读取出来。

    TDebugProtocol-使用易懂的可读的文本格式,以便于 debug。

    Thrift 数据传输方式,transport:

    TSocket-阻塞式 socket。

    TFramedTransport-以 frame 为单位进行传输,非阻塞式服务中使用。

    TFileTransport-以文件形式进行传输。

    TMemoryTransport-将内存用于 I/O,Java 实现时内部实际使用了简单的 ByteArrayOutputStream。

    支持的服务模型,server:

    TSimpleServer-简单的单线程服务模型,常用于测试。

    TThreadPoolServer-多线程服务模型,标准的阻塞式 IO。

    TNonboockingServer-多线程服务模型,使用非阻塞式 IO(需要使用 TFramedTransport 数据传输方式)。

    THsHaServer-THsHa 引入了线程池去处理,其模型把读写任务放到线程池处理。Half-sync/Half-async 的处理模式,Half-sync 是在处理 IO 事件上,Half-async 用于 handler 对 rpc 的同步处理。

  • 相关阅读:
    PAT查找题---1032 挖掘机技术哪家强 (20分)
    PAT查找题---1028 人口普查 (20分)
    PAT查找题---1004 成绩排名 (20分)
    01_1JAVA简介
    01考试简介
    shell时间变量拼接问题
    如何将oracle查询的结果传输给变量
    生产环境邮件问题总结
    mutt+msmtp做linux邮件客户端
    linux配置邮件客户端
  • 原文地址:https://www.cnblogs.com/paulwang92115/p/11160842.html
Copyright © 2011-2022 走看看