zoukankan      html  css  js  c++  java
  • Thrift笔记(六)--单端口 多服务

    多个服务可以共用(监听)同一个端口。先上一个demo

    Test.thrift

    namespace java com.gxf.thrift
    
    // Kind of action a Request asks the service to perform.
    enum RequestType {
       SAY_HELLO,   // say hello
       QUERY_TIME,  // ask for the current time
    }
    
    // Argument passed to HelloWordService.doAction.
    struct Request {
       1: required RequestType type;  // type of the request, required
       2: required string name;       // name of the person sending the request, required
       3: optional i32 age;           // age of the person sending the request, optional
    }
    
    // Exception declared in the throws clause of HelloWordService.doAction.
    exception RequestException {
       1: required i32 code;       // error code, required
       2: optional string reason;  // reason text, optional
    }
    
    // Service name
    // Greeting service: takes a Request and returns a string.
    service HelloWordService {
       string doAction(1: Request request) throws (1:RequestException qe); // may throw an exception
    }
    
    // Multiplication
    // multi takes two i32 operands and returns an i32 result.
    service MultiSerivce{
        i32 multi(1:i32 n1, 2:i32 n2);
    }

    定义了两个服务,一个是hello,一个是multi。第一个输出hello信息,第二个做乘法运算。使用thrift命令,生成java代码

    thrift -gen java Test.thrift

    Server端,两个服务实现类

    import org.apache.commons.lang3.StringUtils;
    
    import java.util.Date;
    
    public class HelloWordServiceImpl implements HelloWordService.Iface {
    
        // 实现这个方法完成具体的逻辑。
        public String doAction(Request request)
                throws RequestException, org.apache.thrift.TException {
            System.out.println("Get request: " + request);
            if (StringUtils.isBlank(request.getName()) || request.getType() == null) {
                throw new com.gxf.thrift.RequestException();
            }
            String result = "Hello, " + request.getName();
            if (request.getType() == com.gxf.thrift.RequestType.SAY_HELLO) {
                result += ", Welcome!";
            } else {
                result += ", Now is " + new Date().toLocaleString();
            }
            return result;
    
        }
    }
    import org.apache.thrift.TException;
    
    /**
     * Server-side handler for {@code MultiSerivce}, the multiplication service.
     */
    public class MultiServiceImpl implements MultiSerivce.Iface {
        /**
         * Multiplies the two operands.
         *
         * @param n1 first factor
         * @param n2 second factor
         * @return {@code n1 * n2} using ordinary Java {@code int} arithmetic
         * @throws TException declared by the generated interface
         */
        @Override
        public int multi(int n1, int n2) throws TException {
            // The service is defined as multiplication; the previous body added the operands.
            return n1 * n2;
        }
    }

    Server端服务启动类

    import org.apache.thrift.TMultiplexedProcessor;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TThreadPoolServer;
    import org.apache.thrift.transport.TServerSocket;
    
    import java.net.ServerSocket;
    
    /**
     * Demo server: exposes the hello service and the multi service on one port by
     * registering both processors on a single {@code TMultiplexedProcessor}.
     */
    public class HelloWordServer {

        /** TCP port the server listens on. */
        private static final int PORT = 7912;

        /**
         * Binds the port, registers both service processors and serves requests.
         *
         * @param args unused
         * @throws Exception if the socket cannot be opened or the server fails to start
         */
        public static void main(String[] args) throws Exception {
            // Bind the listening socket first and hand it to the Thrift transport.
            TServerSocket listener = new TServerSocket(new ServerSocket(PORT));

            // One multiplexed processor dispatches to the per-service processors by name.
            TMultiplexedProcessor dispatcher = new TMultiplexedProcessor();
            dispatcher.registerProcessor(
                    "helloService",
                    new HelloWordService.Processor<>(new HelloWordServiceImpl()));
            dispatcher.registerProcessor(
                    "multiService",
                    new MultiSerivce.Processor<>(new MultiServiceImpl()));

            TThreadPoolServer.Args config = new TThreadPoolServer.Args(listener)
                    .processor(dispatcher)
                    .protocolFactory(new TBinaryProtocol.Factory());
            TServer threadPoolServer = new TThreadPoolServer(config);

            System.out.println("Start server on port " + PORT + "...");
            threadPoolServer.serve();
        }

    }

    Client端测试类

    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TMultiplexedProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    
    /**
     * Demo client: calls the multi service and the hello service over a single
     * socket connection, selecting the target service by name.
     */
    public class HelloWordClient {
        /**
         * Opens one connection to 127.0.0.1:7912, invokes both services and prints the results.
         *
         * @param args unused
         * @throws Exception if the connection cannot be opened or a remote call fails
         */
        public static void main(String[] args) throws Exception {
            TTransport transport = new TSocket("127.0.0.1", 7912);
            TProtocol protocol = new TBinaryProtocol(transport);
            transport.open();
            try {
                // Each service gets its own TMultiplexedProtocol wrapper around the shared
                // protocol; the second argument is the service name sent to the server.
                TMultiplexedProtocol tMultiplexedProtocol = new TMultiplexedProtocol(protocol, "multiService");
                MultiSerivce.Client multiClient = new MultiSerivce.Client(tMultiplexedProtocol);
                int mulRes = multiClient.multi(1, 3);
                System.out.println("mulRes = " + mulRes);

                TMultiplexedProtocol helloProtocol = new TMultiplexedProtocol(protocol, "helloService");
                HelloWordService.Client helloClient = new HelloWordService.Client(helloProtocol);
                Request helloRequest = new Request();
                helloRequest.setAge(28);
                helloRequest.setName("guanxiangfei");
                helloRequest.setType(RequestType.QUERY_TIME);
                String helloRes = helloClient.doAction(helloRequest);
                System.out.println("helloRes: " + helloRes);
            } finally {
                // Requests finished (or failed): always close the connection so the
                // socket is released even when a call above throws.
                transport.close();
            }
        }

    }

    下面主要分析Server端源码

    跟进TMultiplexedProcessor类

    public class TMultiplexedProcessor implements TProcessor {
    
        private final Map<String,TProcessor> SERVICE_PROCESSOR_MAP
                = new HashMap<String,TProcessor>();
        private TProcessor defaultProcessor;

    这里有个map存放 servicename --> processor,接着看注册源码

    /**
     * Registers a processor under a service name by putting it into
     * SERVICE_PROCESSOR_MAP; a previous entry with the same name is replaced.
     *
     * @param serviceName key under which the processor is stored
     * @param processor   processor stored for that service name
     */
    public void registerProcessor(String serviceName, TProcessor processor) {
            SERVICE_PROCESSOR_MAP.put(serviceName, processor);
        }

    直接在map中放了servicename --> processor。我们跟进serve方法

    /**
     * Accept loop of the server: starts listening, then repeatedly accepts a client
     * and processes its requests inline until stopped_ is set.
     *
     * NOTE(review): each connection is handled directly inside this loop with no
     * hand-off to a worker; this looks like TSimpleServer.serve() rather than the
     * TThreadPoolServer used in the demo above. Confirm which class this excerpt is from.
     */
    public void serve() {
        try {
          serverTransport_.listen();
        } catch (TTransportException ttx) {
          // Listening failed: log and give up without entering the accept loop.
          LOGGER.error("Error occurred during listening.", ttx);
          return;
        }
    
        // Run the preServe event
        if (eventHandler_ != null) {
          eventHandler_.preServe();
        }
    
        setServing(true);
    
        // One iteration per accepted client connection.
        while (!stopped_) {
          TTransport client = null;
          TProcessor processor = null;
          TTransport inputTransport = null;
          TTransport outputTransport = null;
          TProtocol inputProtocol = null;
          TProtocol outputProtocol = null;
          ServerContext connectionContext = null;
          try {
            client = serverTransport_.accept();
            if (client != null) {
              // Build the processor, transports and protocols for this connection from the factories.
              processor = processorFactory_.getProcessor(client);
              inputTransport = inputTransportFactory_.getTransport(client);
              outputTransport = outputTransportFactory_.getTransport(client);
              inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
              outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
              if (eventHandler_ != null) {
                connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
              }
              // Keep handling requests on this connection until process() returns false
              // or throws (handled by the catch clauses below).
              while (true) {
                if (eventHandler_ != null) {
                  eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
                }
                if(!processor.process(inputProtocol, outputProtocol)) {
                  break;
                }
              }
            }
          } catch (TTransportException ttx) {
            // Client died, just move on
          } catch (TException tx) {
            // Only log when the server has not been asked to stop.
            if (!stopped_) {
              LOGGER.error("Thrift error occurred during processing of message.", tx);
            }
          } catch (Exception x) {
            if (!stopped_) {
              LOGGER.error("Error occurred during processing of message.", x);
            }
          }
    
          // Per-connection cleanup: notify the event handler, then close both transports.
          if (eventHandler_ != null) {
            eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
          }
    
          if (inputTransport != null) {
            inputTransport.close();
          }
    
          if (outputTransport != null) {
            outputTransport.close();
          }
    
        }
        setServing(false);
      }

    有个accept接收到客户端连接,重点看

    if(!processor.process(inputProtocol, outputProtocol)) {
                  break;
                }

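    这里是处理客户端传来的请求。由于注册的是TMultiplexedProcessor,请求会先进入TMultiplexedProcessor.process():它从消息名“serviceName:methodName”中解析出serviceName,到SERVICE_PROCESSOR_MAP中找到对应的processor,再把去掉服务名前缀的消息交给该processor处理。继续跟进具体processor的process方法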

    /**
     * Reads one message header from the input protocol and dispatches it to the
     * ProcessFunction registered in processMap under the message name.
     *
     * @param in  protocol the request is read from
     * @param out protocol the reply (or error) is written to
     * @return always true, both for a dispatched call and for an unknown method
     * @throws TException propagated from the protocol calls or from the dispatched function
     */
    @Override
      public boolean process(TProtocol in, TProtocol out) throws TException {
        TMessage msg = in.readMessageBegin();
        // Look up the handler by the method name carried in the message header.
        ProcessFunction fn = processMap.get(msg.name);
        if (fn == null) {
          // Unknown method: skip over the argument struct, finish reading the message,
          // then answer with an UNKNOWN_METHOD application exception using the same seqid.
          TProtocolUtil.skip(in, TType.STRUCT);
          in.readMessageEnd();
          TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
          out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
          x.write(out);
          out.writeMessageEnd();
          out.getTransport().flush();
          return true;
        }
        fn.process(msg.seqid, in, out, iface);
        return true;
      }

    先读取消息头(方法名、消息类型、序列号),再按方法名从processMap中找到对应的处理函数;找不到则向客户端返回UNKNOWN_METHOD异常。跟进fn.process()看服务端处理过程

    /**
     * Handles a single call: reads the arguments, invokes getResult on the handler,
     * and writes the reply unless the method is oneway.
     *
     * @param seqid sequence id of the request, echoed in the reply message
     * @param iprot protocol the arguments are read from
     * @param oprot protocol the reply is written to
     * @param iface handler object passed to getResult
     * @throws TException rethrown transport errors, or errors from the protocol calls
     */
    public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
        T args = getEmptyArgsInstance();
        try {
          args.read(iprot);
        } catch (TProtocolException e) {
          // Arguments could not be decoded: reply with a PROTOCOL_ERROR exception and stop.
          iprot.readMessageEnd();
          TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
          oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.EXCEPTION, seqid));
          x.write(oprot);
          oprot.writeMessageEnd();
          oprot.getTransport().flush();
          return;
        }
        iprot.readMessageEnd();
        TSerializable result = null;
        byte msgType = TMessageType.REPLY;
    
        try {
          result = getResult(iface, args);
        } catch (TTransportException ex) {
          // Transport failures are logged and rethrown to the caller; no reply is written.
          LOGGER.error("Transport error while processing " + getMethodName(), ex);
          throw ex;
        } catch (TApplicationException ex) {
          // An application exception is sent back to the client as-is.
          LOGGER.error("Internal application error processing " + getMethodName(), ex);
          result = ex;
          msgType = TMessageType.EXCEPTION;
        } catch (Exception ex) {
          // Any other exception is logged; for non-oneway calls the client receives a
          // generic INTERNAL_ERROR exception instead of the original one.
          LOGGER.error("Internal error processing " + getMethodName(), ex);
          if(!isOneway()) {
            result = new TApplicationException(TApplicationException.INTERNAL_ERROR,
                "Internal error processing " + getMethodName());
            msgType = TMessageType.EXCEPTION;
          }
        }
    
        // Oneway methods never write a reply.
        if(!isOneway()) {
          oprot.writeMessageBegin(new TMessage(getMethodName(), msgType, seqid));
          result.write(oprot);
          oprot.writeMessageEnd();
          oprot.getTransport().flush();
        }
      }

    接着跟进getResult方法,看下服务端如何计算结果

    /**
     * Calls iface.multi with the two decoded arguments and wraps the return value
     * in a multi_result whose success field is marked as set.
     *
     * @param iface handler whose multi method is invoked
     * @param args  decoded call arguments (n1, n2)
     * @return the populated multi_result
     * @throws org.apache.thrift.TException propagated from the handler call
     */
    public multi_result getResult(I iface, multi_args args) throws org.apache.thrift.TException {
            multi_result result = new multi_result();
            result.success = iface.multi(args.n1, args.n2);
            result.setSuccessIsSet(true);
            return result;
          }

    这里可以看出,调用了服务端实现类对象对应的方法。计算结果保存在multi_result对象中,再序列化后发给客户端。

    总结:

    单端口,多服务,主要就是用一个map放service-->processor映射。客户端传servicename给服务端

  • 相关阅读:
    小菜读书---《程序员修炼之道–从小工到专家》
    小菜读书--《大话设计模式》
    darknet-mini:带注释的darknet简化版,助你深入理解YOLO
    如何使用VLC进行视频录像
    海康相机打开的方法
    YOLOv5训练自己的数据集(超详细完整版)
    用GANs来做数据增强
    torch ----------->>>>rknn
    Linux下的tar压缩解压缩命令详解
    小缺陷目标检测网络--PCB缺陷检测—TDD-net
  • 原文地址:https://www.cnblogs.com/luckygxf/p/9393618.html
Copyright © 2011-2022 走看看