zoukankan      html  css  js  c++  java
  • Netty客户端发送消息并同步获取结果

    客户端发送消息并同步获取结果,其实是违背Netty的设计原则的,但是有时候不得不这么做的话,那么建议进行如下的设计:

    比如我们的具体用法如下:

      NettyRequest request = new NettyRequest();
                    request.setRequestId(UUID.randomUUID().toString());
                    request.setClassName(method.getDeclaringClass().getName());
                    request.setMethodName(method.getName());
                    request.setParameterTypes(method.getParameterTypes());
                    request.setParameterValues(args);
    
                    NettyMessage nettyMessage = new NettyMessage();
                    nettyMessage.setType(MessageType.SERVICE_REQ.value());
                    nettyMessage.setBody(request);
    
                    if (serviceDiscovery != null) {
                        serverAddress = serviceDiscovery.discover();
                    }
                    String[] array = serverAddress.split(":");
                    String host = array[0];
                    int port = Integer.parseInt(array[1]);
    
                    NettyClient client = new NettyClient(host, port);
                    NettyMessage nettyResponse = client.send(nettyMessage);
                    if (nettyResponse != null) {
                        return JSON.toJSONString(nettyResponse.getBody());
                    } else {
                        return null;
                    }

    先来看看NettyClient的写法 和 send方法的写法:

    public class NettyClient {
    
        /**
         * 日志记录
         */
        private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
    
        /**
         * 客户端业务处理handler
         */
        private ClientHandler clientHandler = new ClientHandler();
    
        /**
         * 事件池
         */
        private EventLoopGroup group = new NioEventLoopGroup();
    
        /**
         * 启动器
         */
        private Bootstrap bootstrap = new Bootstrap();
    
        /**
         * 客户端通道
         */
        private Channel clientChannel;
    
        /**
         * 客户端连接
         * @param host
         * @param port
         * @throws InterruptedException
         */
        public NettyClient(String host, int port) throws InterruptedException {
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(5, 5, 12));
                            channel.pipeline().addLast("nettyMessageDecoder", new NettyMessageDecoder(1024 * 1024, 4, 4));
                            channel.pipeline().addLast("nettyMessageEncoder", new NettyMessageEncoder());
                            channel.pipeline().addLast("heartBeatHandler", new HeartBeatRequestHandler());
                            channel.pipeline().addLast("clientHandler", clientHandler);
                            channel.pipeline().addLast("loginAuthHandler", new LoginAuthRequestHandler());
                        }
                    });
    
            //发起同步连接操作
            ChannelFuture channelFuture = bootstrap.connect(host, port);
    
            //注册连接事件
            channelFuture.addListener((ChannelFutureListener)future -> {
                //如果连接成功
                if (future.isSuccess()) {
                    logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已连接...");
                    clientChannel = channelFuture.channel();
                }
                //如果连接失败,尝试重新连接
                else{
                    logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]连接失败,重新连接中...");
                    future.channel().close();
                    bootstrap.connect(host, port);
                }
            });
    
            //注册关闭事件
            channelFuture.channel().closeFuture().addListener(cfl -> {
                close();
                logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已断开...");
            });
        }
    
        /**
         * 客户端关闭
         */
        private void close() {
            //关闭客户端套接字
            if(clientChannel!=null){
                clientChannel.close();
            }
            //关闭客户端线程组
            if (group != null) {
                group.shutdownGracefully();
            }
        }
    
        /**
         * 客户端发送消息
         * @param message
         * @return
         * @throws InterruptedException
         * @throws ExecutionException
         */
        public NettyMessage send(NettyMessage message) throws InterruptedException, ExecutionException {
            ChannelPromise promise = clientHandler.sendMessage(message);
            promise.await(3, TimeUnit.SECONDS);
            return clientHandler.getResponse();
        }
    }

    可以看出,我们使用了clientHandler来进行消息发送行为,通过promise阻塞来同步获取返回结果,接下来看看sendMessage的写法:

    public class ClientHandler extends ChannelInboundHandlerAdapter {
    
        private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class);
    
        private ChannelHandlerContext ctx;
        private ChannelPromise promise;
        private NettyMessage response;
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            this.ctx = ctx;
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyMessage message = (NettyMessage) msg;
            if (message != null && message.getType() == MessageType.SERVICE_RESP.value()) {
                response = message;
                promise.setSuccess();
            } else {
                ctx.fireChannelRead(msg);
            }
        }
    
        public synchronized ChannelPromise sendMessage(Object message) {
            while (ctx == null) {
                try {
                    TimeUnit.MILLISECONDS.sleep(1);
                    //logger.error("等待ChannelHandlerContext实例化");
                } catch (InterruptedException e) {
                    logger.error("等待ChannelHandlerContext实例化过程中出错",e);
                }
            }
            promise = ctx.newPromise();
            ctx.writeAndFlush(message);
            return promise;
        }
    
        public NettyMessage getResponse(){
            return response;
        }
    
    }

    可以看到,在利用ChannelHanderContext进行发送消息前,我们先创建了一个promise并返回给send方法,那么send方法此时就会阻塞等待;当我们收到服务端消息后,promise.setSuccess就会解除send方法的等待行为,这样我们就能获取结果了。

    此法针对真正需要同步等待获取结果的场景,如非必要,还是建议利用future来改造。

    benchmark测试表明,此种同步获取结果的行为,表现挺稳定的,但是ops 在 150 左右, 真是性能太差了。高性能场合禁用此法。

  • 相关阅读:
    mysql关联查询
    文本框,下拉框,单选框只读状态属性
    sql索引实例
    sql视图实例
    SQL触发器实例
    存储过程实例
    sql 、linq、lambda 查询语句的区别
    LINQ中的一些查询语句格式
    面试宝典
    SQL常用语句
  • 原文地址:https://www.cnblogs.com/scy251147/p/10721736.html
Copyright © 2011-2022 走看看