zoukankan      html  css  js  c++  java
  • ScheduleServerRunnable2

    package com.xx.schedule.thrift.server;
    
    import com.xx.schedule.thrift.service.ScheduleService;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.thrift.TMultiplexedProcessor;
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TProtocolFactory;
    import org.apache.thrift.server.THsHaServer;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TNonblockingServerSocket;
    import org.apache.thrift.transport.TTransportException;
    
    /**
     * Runnable that builds a non-blocking Thrift {@link THsHaServer} and serves a
     * {@link ScheduleService.Iface} implementation through a {@link TMultiplexedProcessor}
     * under the service name {@code "scheduleService"}.
     *
     * <p>{@link #run()} builds the server and then calls {@code serve()} on it;
     * {@link #stop()} is intended to be called from a different thread, which is why the
     * {@code server} field is {@code volatile}.
     *
     * @version 1.0.0
     * @author xx.cn
     * @date 2020-09-27 14:21
     */
    @Slf4j
    public class ScheduleServerRunnable2 implements Runnable{
        /** TCP port the server socket binds to. */
        private final int port;
        /** Handler that the generated Thrift processor delegates calls to. */
        private final ScheduleService.Iface scheduleService;
        /** Server instance; {@code null} until {@link #buildServer()} has succeeded. */
        private volatile TServer server;
    
        private static final int THRIFT_TIMEOUT = 5000;
        private static final int THRIFT_TCP_BACKLOG = 5000;
        // NOTE(review): the three thread-count constants below are declared but never
        // applied to the server args in this class - confirm whether they should be.
        private static final int THRIFT_CORE_THREADS = Runtime.getRuntime().availableProcessors();
        private static final int THRIFT_MAX_THREADS = 2 * Runtime.getRuntime().availableProcessors();
        private static final int THRIFT_SELECTOR_THREADS = 16;
        private static final TProtocolFactory THRIFT_PROTOCOL_FACTORY = new TBinaryProtocol.Factory();
        // 16MB
        private static final int THRIFT_MAX_FRAME_SIZE = 16 * 1024 * 1024;
        // 4MB
        private static final int THRIFT_MAX_READ_BUF_SIZE = 4 * 1024 * 1024;
    
        /**
         * @param port            TCP port to listen on
         * @param scheduleService service implementation to expose
         */
        public ScheduleServerRunnable2(int port, ScheduleService.Iface scheduleService) {
            this.port = port;
            this.scheduleService = scheduleService;
        }
    
        /**
         * Builds the server and serves requests on the calling thread.
         * Any failure while building or serving is logged and not rethrown.
         */
        @Override
        public void run() {
            try {
                log.info("start server");
                buildServer().serve();
            } catch (Exception e) {
                log.error("start server error", e);
            }
        }
    
        /**
         * Stops the server if one has been built. Calling this before {@link #run()} has
         * created the server (or after a failed build) is a no-op instead of a
         * {@link NullPointerException}.
         */
        public void stop(){
            log.info("stop server");
            // Read the volatile field once so the null check and the call see the same instance.
            TServer current = server;
            if (current == null) {
                log.warn("stop server ignored: server has not been started");
                return;
            }
            current.stop();
        }
    
        /**
         * Creates the listening socket and the {@link THsHaServer} bound to it, and stores the
         * server in {@link #server}.
         *
         * <p>The server transport is deliberately NOT wrapped in try-with-resources: the
         * returned server needs the socket to stay open while it serves. The socket is closed
         * here only when constructing the server fails.
         *
         * @return the newly built server, ready for {@code serve()}
         * @throws TTransportException if the server socket cannot be created
         */
        private TServer buildServer() throws TTransportException {
            TNonblockingServerSocket.NonblockingAbstractServerSocketArgs socketArgs = new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs()
                    .port(port)
                    .clientTimeout(THRIFT_TIMEOUT)
                    .backlog(THRIFT_TCP_BACKLOG);
    
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(socketArgs);
            try {
                TProcessor processor = new ScheduleService.Processor<ScheduleService.Iface>(scheduleService);
    
                // Clients must address this service by the same name through a multiplexed protocol.
                TMultiplexedProcessor multiplexedProcessor = new TMultiplexedProcessor();
                multiplexedProcessor.registerProcessor("scheduleService", processor);
    
                // With non-blocking IO, both server and client must use TFramedTransport
                // as the data transport.
                THsHaServer.Args args = new THsHaServer.Args(serverTransport)
                        .transportFactory(new TFramedTransport.Factory(THRIFT_MAX_FRAME_SIZE))
                        .protocolFactory(THRIFT_PROTOCOL_FACTORY)
                        .processor(multiplexedProcessor);
    
                // NOTE(review): this limit (4MB) is smaller than THRIFT_MAX_FRAME_SIZE (16MB);
                // confirm which of the two is meant to bound incoming frames.
                args.maxReadBufferBytes = THRIFT_MAX_READ_BUF_SIZE;
    
                TServer built = new THsHaServer(args);
                server = built;
                return built;
            } catch (RuntimeException e) {
                // Do not leak the bound port when server construction fails.
                serverTransport.close();
                throw e;
            }
        }
    }
    /*
            TProcessor tprocessor = new ScheduleService.Processor<ScheduleService.Iface>(scheduleService);
            TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(port);
            THsHaServer.Args tArgs = new THsHaServer.Args(tnbSocketTransport);
            tArgs.processor(tprocessor);
            tArgs.transportFactory(new TFramedTransport.Factory());
            tArgs.protocolFactory(new TBinaryProtocol.Factory());
            //半同步半异步的服务模型
            TServer server = new THsHaServer(tArgs);
            */
    package com.xx.schedule.client;
    
    import com.google.common.collect.Lists;
    import com.xx.schedule.thrift.model.PpcResponse;
    import com.xx.schedule.thrift.service.ScheduleService;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TMultiplexedProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    
    /**
     * Minimal demo client that opens a framed Thrift connection to the schedule server and
     * sends a single {@code receive} call.
     *
     * <p>The server side registers its processor on a multiplexed processor under the name
     * {@code "scheduleService"}, so the client wraps its binary protocol in a
     * {@link TMultiplexedProtocol} with the same name; without it the server cannot route
     * the call to the registered processor.
     *
     * @version 1.0.0
     * @author xx.cn
     * @date 2020-09-25 14:02
     */
    @Slf4j
    public class ScheduleClientDemo2 {
        public static final String SERVER_IP = "192.168.1.18";
        public static final int SERVER_PORT = 7777;
        public static final int TIMEOUT = 30000;
        /** Must match the name the server passes to {@code registerProcessor}. */
        private static final String SERVICE_NAME = "scheduleService";
    
        /**
         * Connects to {@link #SERVER_IP}:{@link #SERVER_PORT}, sends one sample
         * {@link PpcResponse}, and closes the transport. Any failure is logged and not rethrown.
         */
        public void startClient() {
            // The transport is closed automatically when the try block exits.
            try(TTransport transport = new TFramedTransport(new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT))){
                // Prefix every call with the service name expected by the multiplexed server.
                TProtocol protocol = new TMultiplexedProtocol(new TBinaryProtocol(transport), SERVICE_NAME);
                ScheduleService.Client client = new ScheduleService.Client(protocol);
                transport.open();
                client.receive(new PpcResponse().setUuid("uuid").setResult(Lists.newArrayList("world")));
            } catch(Exception ex){
                log.error("startClient error", ex);
            }
        }
    
        /** Runs the demo once. */
        public static void main(String[] args) {
            ScheduleClientDemo2 client = new ScheduleClientDemo2();
            client.startClient();
        }
    }
  • 相关阅读:
    python之函数对象、函数嵌套、名称空间与作用域、装饰器
    python之函数
    python基础-小练习
    python基础之文件操作
    python基础之字符编码
    web开发-Django博客系统
    HotSpot的算法实现
    垃圾回收机制(GC)
    Java注意点...
    JVM内存区域及对象
  • 原文地址:https://www.cnblogs.com/exmyth/p/13755177.html
Copyright © 2011-2022 走看看