zoukankan      html  css  js  c++  java
  • RPC框架motan: 通信框架netty之Netty4Client

     上文已经初步探讨了如何实现一个具体的transport,本文就来讨论一个具体的transport,本文讨论netty4的的相关实现。老规矩,看看motan-transport的目录结构。

     其中最重要的类是啥,大声说出来,对,就是Netty4Client和Netty4Server。

                       图1-1motan-transport-netty4的源码结构


      Netty4Client

               首先看看NettyClinet的代码,最重要的就是open和request方法,可以看到open主要完成初始化工作。request主要请求和响应功能,heart表示心跳请求。

    public class Netty4Client extends AbstractPoolClient implements StatisticCallback {
        // 回收过期任务
        private static ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(4);
    
        // 异步的request,需要注册callback future
        // 触发remove的操作有: 1) service的返回结果处理。 2) timeout thread cancel
        protected ConcurrentMap<Long, NettyResponseFuture> callbackMap = new ConcurrentHashMap<Long, NettyResponseFuture>();
    
        private ScheduledFuture<?> timeMonitorFuture = null;
    
    
        // 连续失败次数
        private AtomicLong errorCount = new AtomicLong(0);
        // 最大连接数
        private int maxClientConnection = 0;
    
        private Bootstrap bootstrap;
    
        public Netty4Client(URL url) {
            super(url);
    
            maxClientConnection = url.getIntParameter(URLParamType.maxClientConnection.getName(),
                    URLParamType.maxClientConnection.getIntValue());
    
            timeMonitorFuture = scheduledExecutor.scheduleWithFixedDelay(
                    new TimeoutMonitor("timeout_monitor_" + url.getHost() + "_" + url.getPort()),
                    MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD, MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD,
                    TimeUnit.MILLISECONDS);
        }
    
        @Override
        public Response request(Request request) throws TransportException {
            if (!isAvailable()) {
                throw new MotanServiceException("NettyChannel is unavaliable: url=" + url.getUri()
                        + MotanFrameworkUtil.toString(request));
            }
            boolean async = url.getMethodParameter(request.getMethodName(), request.getParamtersDesc()
                    , URLParamType.async.getName(), URLParamType.async.getBooleanValue());
            return request(request, async);
        }
    
        @Override
        public void heartbeat(Request request) {
            // 如果节点还没有初始化或者节点已经被close掉了,那么heartbeat也不需要进行了
            if (state.isUnInitState() || state.isCloseState()) {
                LoggerUtil.warn("NettyClient heartbeat Error: state={} url={}", state.name(), url.getUri());
                return;
            }
    
            LoggerUtil.info("NettyClient heartbeat request: url={}", url.getUri());
    
            try {
                // async request后,如果service is
                // available,那么将会自动把该client设置成可用
                request(request, true);
            } catch (Exception e) {
                LoggerUtil.error("NettyClient heartbeat Error: url=" + url.getUri(), e);
            }
        }
    
        /**
         * 请求remote service
         * <p>
         * <pre>
         *         1)  get connection from pool
         *         2)  async requset
         *         3)  return connection to pool
         *         4)  check if async return response, true: return ResponseFuture;  false: return result
         * </pre>
         *
         * @param request
         * @param async
         * @return
         * @throws TransportException
         */
        private Response request(Request request, boolean async) throws TransportException {
            Channel channel = null;
    
            Response response = null;
    
            try {
                // return channel or throw exception(timeout or connection_fail)
                channel = borrowObject();
    
                if (channel == null) {
                    LoggerUtil.error("NettyClient borrowObject null: url=" + url.getUri() + " "
                            + MotanFrameworkUtil.toString(request));
                    return null;
                }
    
                // async request
                response = channel.request(request);
                // return channel to pool
                returnObject(channel);
            } catch (Exception e) {
                LoggerUtil.error(
                        "NettyClient request Error: url=" + url.getUri() + " " + MotanFrameworkUtil.toString(request), e);
                //TODO 对特定的异常回收channel
                invalidateObject(channel);
    
                if (e instanceof MotanAbstractException) {
                    throw (MotanAbstractException) e;
                } else {
                    throw new MotanServiceException("NettyClient request Error: url=" + url.getUri() + " "
                            + MotanFrameworkUtil.toString(request), e);
                }
            }
    
            // aysnc or sync result
            response = asyncResponse(response, async);
    
            return response;
        }
    
        /**
         * 如果async是false,那么同步获取response的数据
         *
         * @param response
         * @param async
         * @return
         */
        private Response asyncResponse(Response response, boolean async) {
            if (async || !(response instanceof NettyResponseFuture)) {
                return response;
            }
    
            return new DefaultResponse(response);
        }
    
        @Override
        public synchronized boolean open() {
            if (isAvailable()) {
                LoggerUtil.warn("NettyServer ServerChannel already Opened: url=" + url);
                return true;
            }
    
            // 初始化netty client bootstrap
            initClientBootstrap();                  
    
            // 初始化连接池
            initPool();
    
            LoggerUtil.info("NettyClient finish Open: url={}", url);
    
            // 注册统计回调
            StatsUtil.registryStatisticCallback(this);
    
            // 设置可用状态
            state = ChannelState.ALIVE;
            return state.isAliveState();
        }
    
        /**
         * 初始化 netty clientBootstrap
         */
        private void initClientBootstrap() {
            bootstrap = new Bootstrap();
    
            NioEventLoopGroup group = new NioEventLoopGroup();
            bootstrap.group(group).channel(NioSocketChannel.class).handler(new Netty4ClientInitializer(url, codec, this));
    
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    
            /* 实际上,极端情况下,connectTimeout会达到500ms,因为netty nio的实现中,是依赖BossThread来控制超时,
             如果为了严格意义的timeout,那么需要应用端进行控制。
             */
            int timeout = getUrl().getIntParameter(URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
            if (timeout <= 0) {
                throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.",
                        MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
            }
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
        }
    
        @Override
        public synchronized void close() {
            close(0);
        }
    
        /**
         * 目前close不支持timeout的概念
         */
        @Override
        public synchronized void close(int timeout) {
            if (state.isCloseState()) {
                LoggerUtil.info("NettyClient close fail: already close, url={}", url.getUri());
                return;
            }
    
            // 如果当前nettyClient还没有初始化,那么就没有close的理由。
            if (state.isUnInitState()) {
                LoggerUtil.info("NettyClient close Fail: don't need to close because node is unInit state: url={}",
                        url.getUri());
                return;
            }
    
            try {
                // 取消定期的回收任务
                timeMonitorFuture.cancel(true);
                // 关闭连接池
                pool.close();
                // 清空callback
                callbackMap.clear();
    
                // 设置close状态
                state = ChannelState.CLOSE;
                // 解除统计回调的注册
                StatsUtil.unRegistryStatisticCallback(this);
                LoggerUtil.info("NettyClient close Success: url={}", url.getUri());
            } catch (Exception e) {
                LoggerUtil.error("NettyClient close Error: url=" + url.getUri(), e);
            }
    
        }
    
        @Override
        public boolean isClosed() {
            return state.isCloseState();
        }
    
        @Override
        public boolean isAvailable() {
            return state.isAliveState();
        }
    
        @Override
        public URL getUrl() {
            return url;
        }
    
        /**
         * connection factory
         */
        @Override
        protected BasePoolableObjectFactory createChannelFactory() {
            return new NettyChannelFactory(this);
        }
    
        /**
         * 增加调用失败的次数:
         * <p>
         * <pre>
         *          如果连续失败的次数 >= maxClientConnection, 那么把client设置成不可用状态
         * </pre>
         */
        void incrErrorCount() {
            long count = errorCount.incrementAndGet();
    
            // 如果节点是可用状态,同时当前连续失败的次数超过限制maxClientConnection次,那么把该节点标示为不可用
            if (count >= maxClientConnection && state.isAliveState()) {
                synchronized (this) {
                    count = errorCount.longValue();
    
                    if (count >= maxClientConnection && state.isAliveState()) {
                        LoggerUtil.error("NettyClient unavailable Error: url=" + url.getIdentity() + " "
                                + url.getServerPortStr());
                        state = ChannelState.UNALIVE;
                    }
                }
            }
        }
    
        /**
         * 重置调用失败的计数 :
         * <p>
         * <pre>
         * 把节点设置成可用
         * </pre>
         */
        void resetErrorCount() {
            errorCount.set(0);
    
            if (state.isAliveState()) {
                return;
            }
    
            synchronized (this) {
                if (state.isAliveState()) {
                    return;
                }
    
                // 如果节点是unalive才进行设置,而如果是 close 或者 uninit,那么直接忽略
                if (state.isUnAliveState()) {
                    long count = errorCount.longValue();
    
                    // 过程中有其他并发更新errorCount的,因此这里需要进行一次判断
                    if (count < maxClientConnection) {
                        state = ChannelState.ALIVE;
                        LoggerUtil.info("NettyClient recover available: url=" + url.getIdentity() + " "
                                + url.getServerPortStr());
                    }
                }
            }
        }
    
        /**
         * 注册回调的resposne
         * <p>
         * <pre>
         *
         *         进行最大的请求并发数的控制,如果超过NETTY_CLIENT_MAX_REQUEST的话,那么throw reject exception
         *
         * </pre>
         *
         * @param requestId
         * @param nettyResponseFuture
         * @throws MotanServiceException
         */
        public void registerCallback(long requestId, NettyResponseFuture nettyResponseFuture) {
            if (this.callbackMap.size() >= MotanConstants.NETTY_CLIENT_MAX_REQUEST) {
                // reject request, prevent from OutOfMemoryError
                throw new MotanServiceException("NettyClient over of max concurrent request, drop request, url: "
                        + url.getUri() + " requestId=" + requestId, MotanErrorMsgConstant.SERVICE_REJECT);
            }
    
            this.callbackMap.put(requestId, nettyResponseFuture);
        }
    
        /**
         * 统计回调接口
         */
        @Override
        public String statisticCallback() {
            //避免消息泛滥,如果节点是可用状态,并且堆积的请求不超过100的话,那么就不记录log了
            if (isAvailable() && callbackMap.size() < 100) {
                return null;
            }
    
            return String.format("identity: %s available: %s concurrent_count: %s", url.getIdentity(), isAvailable(),
                    callbackMap.size());
        }
    
        /**
         * 移除回调的response
         *
         * @param requestId
         * @return
         */
        public NettyResponseFuture removeCallback(long requestId) {
            return callbackMap.remove(requestId);
        }
    
        public Bootstrap getBootstrap() {
            return bootstrap;
        }
    
        /**
         * 回收超时任务
         *
         * @author maijunsheng
         */
        class TimeoutMonitor implements Runnable {
            private String name;
    
            public TimeoutMonitor(String name) {
                this.name = name;
            }
    
            public void run() {
    
                long currentTime = System.currentTimeMillis();
    
                for (Map.Entry<Long, NettyResponseFuture> entry : callbackMap.entrySet()) {
                    try {
                        NettyResponseFuture future = entry.getValue();
                        if (future.getCreateTime() + future.getTimeout() <currentTime) {
                            // timeout: remove from callback list, and then cancel
                            removeCallback(entry.getKey());
                            future.cancel();
                        }
                    } catch (Exception e) {
                        LoggerUtil.error(
                                name + " clear timeout future Error: uri=" + url.getUri() + " requestId=" + entry.getKey(),
                                e);
                    }
                }
            }
        }
    }

      

    open的最核心功就是各种设置。

    initClientBootstrap流程中
    bootstrap.group(group).channel(NioSocketChannel.class).handler(new Netty4ClientInitializer(url, codec, this));
    这句中的Netty4ClientInitializer泪中,指定如下3个handler.

    Netty4Decoder:协议的解码器

    Netty4Encoder:协议的编码器
    Netty4ClientHandler:对服务器的返回结果的处理。
     1 public class Netty4ClientInitializer extends ChannelInitializer<SocketChannel> {
     2 
     3     private URL url;
     4 
     5     private Codec codec;
     6 
     7     private Netty4Client client;
     8 
     9     public Netty4ClientInitializer(URL url, Codec codec, Netty4Client client) {
    10         this.url = url;
    11         this.codec = codec;
    12         this.client = client;
    13     }
    14 
    15     @Override
    16     protected void initChannel(SocketChannel ch) throws Exception {
    17         ChannelPipeline p = ch.pipeline();
    18         // 最大响应包限制
    19         int maxContentLength = url.getIntParameter(URLParamType.maxContentLength.getName(),
    20                 URLParamType.maxContentLength.getIntValue());
    21         p.addLast("decoder", new Netty4Decoder(codec, client, maxContentLength));
    22         p.addLast("encoder", new Netty4Encoder(codec, client));
    23         p.addLast("handler", new Netty4ClientHandler(client));
    24 
    25     }
    26 }

     编码和解码是非常重要的过程,有个模块专门负责这个过程,现在不表。

    Netty4ClientHandler主要是request()中对响应的处理,最终执行就是以下FutureListener中的operationComplete相关代码。
     
     1 if (result && writeFuture.isSuccess()) {
     2             response.addListener(new FutureListener() {
     3                 @Override
     4                 public void operationComplete(Future future) throws Exception {
     5                     if (future.isSuccess() || (future.isDone() && ExceptionUtil.isBizException(future.getException()))) {
     6                         // 成功的调用 
     7                         nettyClient.resetErrorCount();
     8                     } else {
     9                         // 失败的调用 
    10                         nettyClient.incrErrorCount();
    11                     }
    12                 }
    13             });
    14             return response;
    15         }
     

       

     


         
  • 相关阅读:
    AndroidStudio小技巧--依赖库
    仿iOS Segmented Control样式"
    Metaweblog在Android上使用
    正则表达式使用技巧
    flask中gunicorn的使用
    Git用法小记
    指定GPU训练模型
    python中grpc的使用示例
    如何用LaTex编辑数学公式
    keras使用多进程
  • 原文地址:https://www.cnblogs.com/hansongjiang/p/5607992.html
Copyright © 2011-2022 走看看