zoukankan      html  css  js  c++  java
  • Netty客户端发送消息并同步获取结果

    客户端发送消息并同步获取结果,其实是违背Netty的设计原则的,但是有时候不得不这么做的话,那么建议进行如下的设计:

    比如我们的具体用法如下:

      NettyRequest request = new NettyRequest();
                    request.setRequestId(UUID.randomUUID().toString());
                    request.setClassName(method.getDeclaringClass().getName());
                    request.setMethodName(method.getName());
                    request.setParameterTypes(method.getParameterTypes());
                    request.setParameterValues(args);
    
                    NettyMessage nettyMessage = new NettyMessage();
                    nettyMessage.setType(MessageType.SERVICE_REQ.value());
                    nettyMessage.setBody(request);
    
                    if (serviceDiscovery != null) {
                        serverAddress = serviceDiscovery.discover();
                    }
                    String[] array = serverAddress.split(":");
                    String host = array[0];
                    int port = Integer.parseInt(array[1]);
    
                    NettyClient client = new NettyClient(host, port);
                    NettyMessage nettyResponse = client.send(nettyMessage);
                    if (nettyResponse != null) {
                        return JSON.toJSONString(nettyResponse.getBody());
                    } else {
                        return null;
                    }

    先来看看NettyClient的写法 和 send方法的写法:

    public class NettyClient {
    
        /**
         * 日志记录
         */
        private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
    
        /**
         * 客户端业务处理handler
         */
        private ClientHandler clientHandler = new ClientHandler();
    
        /**
         * 事件池
         */
        private EventLoopGroup group = new NioEventLoopGroup();
    
        /**
         * 启动器
         */
        private Bootstrap bootstrap = new Bootstrap();
    
        /**
         * 客户端通道
         */
        private Channel clientChannel;
    
        /**
         * 客户端连接
         * @param host
         * @param port
         * @throws InterruptedException
         */
        public NettyClient(String host, int port) throws InterruptedException {
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(5, 5, 12));
                            channel.pipeline().addLast("nettyMessageDecoder", new NettyMessageDecoder(1024 * 1024, 4, 4));
                            channel.pipeline().addLast("nettyMessageEncoder", new NettyMessageEncoder());
                            channel.pipeline().addLast("heartBeatHandler", new HeartBeatRequestHandler());
                            channel.pipeline().addLast("clientHandler", clientHandler);
                            channel.pipeline().addLast("loginAuthHandler", new LoginAuthRequestHandler());
                        }
                    });
    
            //发起同步连接操作
            ChannelFuture channelFuture = bootstrap.connect(host, port);
    
            //注册连接事件
            channelFuture.addListener((ChannelFutureListener)future -> {
                //如果连接成功
                if (future.isSuccess()) {
                    logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已连接...");
                    clientChannel = channelFuture.channel();
                }
                //如果连接失败,尝试重新连接
                else{
                    logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]连接失败,重新连接中...");
                    future.channel().close();
                    bootstrap.connect(host, port);
                }
            });
    
            //注册关闭事件
            channelFuture.channel().closeFuture().addListener(cfl -> {
                close();
                logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已断开...");
            });
        }
    
        /**
         * 客户端关闭
         */
        private void close() {
            //关闭客户端套接字
            if(clientChannel!=null){
                clientChannel.close();
            }
            //关闭客户端线程组
            if (group != null) {
                group.shutdownGracefully();
            }
        }
    
        /**
         * 客户端发送消息
         * @param message
         * @return
         * @throws InterruptedException
         * @throws ExecutionException
         */
        public NettyMessage send(NettyMessage message) throws InterruptedException, ExecutionException {
            ChannelPromise promise = clientHandler.sendMessage(message);
            promise.await(3, TimeUnit.SECONDS);
            return clientHandler.getResponse();
        }
    }

    可以看出,我们使用了clientHandler来进行消息发送行为,通过promise阻塞来同步获取返回结果,接下来看看sendMessage的写法:

    public class ClientHandler extends ChannelInboundHandlerAdapter {
    
        private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class);
    
        private ChannelHandlerContext ctx;
        private ChannelPromise promise;
        private NettyMessage response;
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            this.ctx = ctx;
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyMessage message = (NettyMessage) msg;
            if (message != null && message.getType() == MessageType.SERVICE_RESP.value()) {
                response = message;
                promise.setSuccess();
            } else {
                ctx.fireChannelRead(msg);
            }
        }
    
        public synchronized ChannelPromise sendMessage(Object message) {
            while (ctx == null) {
                try {
                    TimeUnit.MILLISECONDS.sleep(1);
                    //logger.error("等待ChannelHandlerContext实例化");
                } catch (InterruptedException e) {
                    logger.error("等待ChannelHandlerContext实例化过程中出错",e);
                }
            }
            promise = ctx.newPromise();
            ctx.writeAndFlush(message);
            return promise;
        }
    
        public NettyMessage getResponse(){
            return response;
        }
    
    }

    可以看到,在利用ChannelHanderContext进行发送消息前,我们先创建了一个promise并返回给send方法,那么send方法此时就会阻塞等待;当我们收到服务端消息后,promise.setSuccess就会解除send方法的等待行为,这样我们就能获取结果了。

    此法针对真正需要同步等待获取结果的场景,如非必要,还是建议利用future来改造。

    benchmark测试表明,此种同步获取结果的行为,表现挺稳定的,但是ops 在 150 左右, 真是性能太差了。高性能场合禁用此法。

  • 相关阅读:
    out/host/linuxx86/obj/EXECUTABLES/aapt_intermediates/aapt 64 32 操作系统
    linux 查看路由器 电脑主机 端口号 占用
    linux proc进程 pid stat statm status id 目录 解析 内存使用
    linux vim 设置大全详解
    ubuntu subclipse svn no libsvnjavahl1 in java.library.path no svnjavahl1 in java.library.path no s
    win7 安装 ubuntu 双系统 详解 easybcd 工具 不能进入 ubuntu 界面
    Atitit.json xml 序列化循环引用解决方案json
    Atitit.编程语言and 自然语言的比较and 编程语言未来的发展
    Atitit.跨语言  文件夹与文件的io操作集合  草案
    Atitit.atijson 类库的新特性设计与实现 v3 q31
  • 原文地址:https://www.cnblogs.com/scy251147/p/10721736.html
Copyright © 2011-2022 走看看