zoukankan      html  css  js  c++  java
  • dubbo源码-客户端发送请求与服务端接收请求源码

      之前研究了dubbo 服务端启动源码,启动之后会建立一个NettyServer,监听指定的dubbo 协议端口。 客户端在启动过程中,会与dubboserver 建立一个链接,并保持一直链接,也就是使用的是一个长连接。客户端多个请求共用的一个连接。下面研究其交互过程。

    1. 客户端发送请求与接收结果过程

     1.  客户端发送请求过程

      客户端调用 /listUser, 我们查看方法调用链如下:满足上面分析的请求调用链。

     1. org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker#invoke 方法如下:

        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result result = null;
    
            String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
            if (value.length() == 0 || value.equalsIgnoreCase("false")) {
                //no mock
                result = this.invoker.invoke(invocation);
            } else if (value.startsWith("force")) {
                if (logger.isWarnEnabled()) {
                    logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
                }
                //force:direct mock
                result = doMockInvoke(invocation, null);
            } else {
                //fail-mock
                try {
                    result = this.invoker.invoke(invocation);
                } catch (RpcException e) {
                    if (e.isBiz()) {
                        throw e;
                    }
    
                    if (logger.isWarnEnabled()) {
                        logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                    }
                    result = doMockInvoke(invocation, e);
                }
            }
            return result;
        }

    几个重要的对象:

    (1) Invoker 可以理解为调用者

    View Code

    (2) Invocation 包装请求参数、方法名称等信息

    View Code

    (3) Result 继承自 CompletionStage<Result>、Future<Result>、Serializable, 可用于异步获取任务等

    View Code

    2. 上面方法会调用到result = this.invoker.invoke(invocation); 然后交给:org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke

        @Override
        public Result invoke(final Invocation invocation) throws RpcException {
            checkWhetherDestroyed();
    
            // binding attachments into invocation.
            Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
            if (contextAttachments != null && contextAttachments.size() != 0) {
                ((RpcInvocation) invocation).addAttachments(contextAttachments);
            }
    
            List<Invoker<T>> invokers = list(invocation);
            LoadBalance loadbalance = initLoadBalance(invokers, invocation);
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
            return doInvoke(invocation, invokers, loadbalance);
        }

    1》 List<Invoker<T>> invokers = list(invocation); 获取到invokers 对象, 也就是从注册中心获取到provider 。

    invocation 对象如下:

     invokers  如下:

    2》initLoadBalance(invokers, invocation); 获取负载均衡策略,获取到的对象如下:

     3. 然后调用org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker#doInvoke 进行调用

        public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyInvokers = invokers;
            checkInvokers(copyInvokers, invocation);
            String methodName = RpcUtils.getMethodName(invocation);
            int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            // retry loop.
            RpcException le = null; // last exception.
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
            Set<String> providers = new HashSet<String>(len);
            for (int i = 0; i < len; i++) {
                //Reselect before retry to avoid a change of candidate `invokers`.
                //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
                if (i > 0) {
                    checkWhetherDestroyed();
                    copyInvokers = list(invocation);
                    // check again
                    checkInvokers(copyInvokers, invocation);
                }
                Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
                invoked.add(invoker);
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    Result result = invoker.invoke(invocation);
                    if (le != null && logger.isWarnEnabled()) {
                        logger.warn("Although retry the method " + methodName
                                + " in the service " + getInterface().getName()
                                + " was successful by the provider " + invoker.getUrl().getAddress()
                                + ", but there have been failed providers " + providers
                                + " (" + providers.size() + "/" + copyInvokers.size()
                                + ") from the registry " + directory.getUrl().getAddress()
                                + " on the consumer " + NetUtils.getLocalHost()
                                + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                                + le.getMessage(), le);
                    }
                    return result;
                } catch (RpcException e) {
                    if (e.isBiz()) { // biz exception.
                        throw e;
                    }
                    le = e;
                } catch (Throwable e) {
                    le = new RpcException(e.getMessage(), e);
                } finally {
                    providers.add(invoker.getUrl().getAddress());
                }
            }
            throw new RpcException(le.getCode(), "Failed to invoke the method "
                    + methodName + " in the service " + getInterface().getName()
                    + ". Tried " + len + " times of the providers " + providers
                    + " (" + providers.size() + "/" + copyInvokers.size()
                    + ") from the registry " + directory.getUrl().getAddress()
                    + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                    + Version.getVersion() + ". Last error is: "
                    + le.getMessage(), le.getCause() != null ? le.getCause() : le);
        }

    1》 getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1; 获取到重试次数, 默认的DEFAULT_RETRIES 重试次数是2,所以这里就是3。然后循环3次进行尝试远程调用。

     2》 select(loadbalance, invocation, copyInvokers, invoked); 选择一个selector, 调用到:org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#select

    View Code

      最终交给org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance#doSelect 实现基于权重进行负载均衡。

    3》接着调用到org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper.CallbackRegistrationInvoker#invoke

            @Override
            public Result invoke(Invocation invocation) throws RpcException {
                Result asyncResult = filterInvoker.invoke(invocation);
    
                asyncResult = asyncResult.whenCompleteWithContext((r, t) -> {
                    for (int i = filters.size() - 1; i >= 0; i--) {
                        Filter filter = filters.get(i);
                        // onResponse callback
                        if (filter instanceof ListenableFilter) {
                            Filter.Listener listener = ((ListenableFilter) filter).listener();
                            if (listener != null) {
                                if (t == null) {
                                    listener.onResponse(r, filterInvoker, invocation);
                                } else {
                                    listener.onError(t, filterInvoker, invocation);
                                }
                            }
                        } else {
                            filter.onResponse(r, filterInvoker, invocation);
                        }
                    }
                });
                return asyncResult;
            }

    4》 接着调用到:org.apache.dubbo.rpc.protocol.AbstractInvoker#invoke

        public Result invoke(Invocation inv) throws RpcException {
            // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
            if (destroyed.get()) {
                logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                        + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
            }
            RpcInvocation invocation = (RpcInvocation) inv;
            invocation.setInvoker(this);
            if (CollectionUtils.isNotEmptyMap(attachment)) {
                invocation.addAttachmentsIfAbsent(attachment);
            }
            Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
            if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
                /**
                 * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
                 * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
                 * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
                 * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
                 */
                invocation.addAttachments(contextAttachments);
            }
    
            invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    
            try {
                return doInvoke(invocation);
            } catch (InvocationTargetException e) { // biz exception
                Throwable te = e.getTargetException();
                if (te == null) {
                    return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
                } else {
                    if (te instanceof RpcException) {
                        ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                    }
                    return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
                }
            } catch (RpcException e) {
                if (e.isBiz()) {
                    return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
                } else {
                    throw e;
                }
            } catch (Throwable e) {
                return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            }
        }

    5》继续调用org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke

        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            inv.setAttachment(PATH_KEY, getUrl().getPath());
            inv.setAttachment(VERSION_KEY, version);
    
            ExchangeClient currentClient;
            if (clients.length == 1) {
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
                if (isOneway) {
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    currentClient.send(inv, isSent);
                    return AsyncRpcResult.newDefaultAsyncResult(invocation);
                } else {
                    AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                    CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                    asyncRpcResult.subscribeTo(responseFuture);
                    // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                    FutureContext.getContext().setCompatibleFuture(responseFuture);
                    return asyncRpcResult;
                }
            } catch (TimeoutException e) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } catch (RemotingException e) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }

    6》然后走org.apache.dubbo.rpc.protocol.dubbo.ReferenceCountExchangeClient#request(java.lang.Object, int)

    -》 org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeClient#request(java.lang.Object, int)

    -》org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int) 

        @Override
        public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
            }
            // create request.
            Request req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setData(request);
            DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
            try {
                channel.send(req);
            } catch (RemotingException e) {
                future.cancel();
                throw e;
            }
            return future;
        }

    传递的参数request 是:

    timeout是超时参数。

    channel 是:

    注意:这里有个重要的过程。

    第一步:创建一个Request 对象,调用: org.apache.dubbo.remoting.exchange.Request#Request()

        public Request() {
            mId = newId();
        }
    
        private static long newId() {
            // getAndIncrement() When it grows to MAX_VALUE, it will grow to MIN_VALUE, and the negative can be used as ID
            return INVOKE_ID.getAndIncrement();
        }

      可以看到生成了一个全局的消息ID,然后把需要发宋的数据封装到该Request 对象,这个消息ID是客户端和服务器端消息通信的标识,可以确保发送消息的客户端正确的得到自己的响应数据。

    第二步:DefaultFuture.newFuture(channel, req, timeout); 创建一个DefaultFuture,然后返回去这个DefaultFuture, 用于异步接收结果

        private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();
    
        private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
    
        public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
            final DefaultFuture future = new DefaultFuture(channel, request, timeout);
            // timeout check
            timeoutCheck(future);
            return future;
        }
    
        private DefaultFuture(Channel channel, Request request, int timeout) {
            this.channel = channel;
            this.request = request;
            this.id = request.getId();
            this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            // put into waiting map.
            FUTURES.put(id, this);
            CHANNELS.put(id, channel);
        }

    7》继续调用调用到:org.apache.dubbo.remoting.transport.netty4.NettyChannel#send

        public void send(Object message, boolean sent) throws RemotingException {
            // whether the channel is closed
            super.send(message, sent);
    
            boolean success = true;
            int timeout = 0;
            try {
                ChannelFuture future = channel.writeAndFlush(message);
                if (sent) {
                    // wait timeout ms
                    timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                    success = future.await(timeout);
                }
                Throwable cause = future.cause();
                if (cause != null) {
                    throw cause;
                }
            } catch (Throwable e) {
                throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
            }
            if (!success) {
                throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                        + "in timeout(" + timeout + "ms) limit");
            }
        }

      这里的channel 就是netty的channel,如下:

       包装的msg 如下:

     8》 然后就是走netty的机制,从pipeline 中选择handler 进行链条调用,最后一个链条是: org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalEncoder#encode, 从org.apache.dubbo.remoting.transport.netty4.NettyClient#doOpen 添加的handler 可以看出, 最终会调用到:org.apache.dubbo.remoting.exchange.codec.ExchangeCodec#encode

        public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
            if (msg instanceof Request) {
                encodeRequest(channel, buffer, (Request) msg);
            } else if (msg instanceof Response) {
                encodeResponse(channel, buffer, (Response) msg);
            } else {
                super.encode(channel, buffer, msg);
            }
        }

    继续调用: org.apache.dubbo.remoting.exchange.codec.ExchangeCodec#encodeRequest  将数据写出去

        protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
            Serialization serialization = getSerialization(channel);
            // header.
            byte[] header = new byte[HEADER_LENGTH];
            // set magic number.
            Bytes.short2bytes(MAGIC, header);
    
            // set request and serialization flag.
            header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
    
            if (req.isTwoWay()) {
                header[2] |= FLAG_TWOWAY;
            }
            if (req.isEvent()) {
                header[2] |= FLAG_EVENT;
            }
    
            // set request id.
            Bytes.long2bytes(req.getId(), header, 4);
    
            // encode request data.
            int savedWriteIndex = buffer.writerIndex();
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
            ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
            if (req.isEvent()) {
                encodeEventData(channel, out, req.getData());
            } else {
                encodeRequestData(channel, out, req.getData(), req.getVersion());
            }
            out.flushBuffer();
            if (out instanceof Cleanable) {
                ((Cleanable) out).cleanup();
            }
            bos.flush();
            bos.close();
            int len = bos.writtenBytes();
            checkPayload(channel, len);
            Bytes.int2bytes(len, header, 12);
    
            // write
            buffer.writerIndex(savedWriteIndex);
            buffer.writeBytes(header); // write header.
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
        }

       到这里消息就从Netty的NioSockerChannel  发送出去,走Netty的一套机制。

    补充:阻塞等待返回结果机制

    1》org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker#invoke 调用asyncResult.get 阻塞等待结果:

                if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                    asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
                }

    2》 org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke 

                   AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                    CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                    asyncRpcResult.subscribeTo(responseFuture);

    org.apache.dubbo.rpc.AsyncRpcResult#subscribeTo

        public void subscribeTo(CompletableFuture<?> future) {
            future.whenComplete((obj, t) -> {
                if (t != null) {
                    this.completeExceptionally(t);
                } else {
                    this.complete((Result) obj);
                }
            });
        }

      这里做的操作是:responseFuture 结束之后将responseFuture 的结果收集到 asyncRpcResult 结果内部。

    3》 继续调用到 org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)

        public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
            }
            // create request.
            Request req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setData(request);
            DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
            try {
                channel.send(req);
            } catch (RemotingException e) {
                future.cancel();
                throw e;
            }
            return future;
        }

    org.apache.dubbo.remoting.exchange.support.DefaultFuture#DefaultFuture

        private DefaultFuture(Channel channel, Request request, int timeout) {
            this.channel = channel;
            this.request = request;
            this.id = request.getId();
            this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            // put into waiting map.
            FUTURES.put(id, this);
            CHANNELS.put(id, channel);
        }

      这样就保证DefaultFuture 完成之后上面的AsyncRpcResult 也完成了, DefaultFuture 如何完成就看 org.apache.dubbo.remoting.exchange.support.DefaultFuture#FUTURES 缓存什么时候操作这个 future。

      这里用到JDK1.8 的CompletableFuture 编程。

    补充:客户端负载均衡机制

    org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#initLoadBalance    这里查找负载均衡策略,如下:可以看到默认的负载均衡策略为随机

        String DEFAULT_LOADBALANCE = "random";
    
        protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
            if (CollectionUtils.isNotEmpty(invokers)) {
                return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                        .getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
            } else {
                return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
            }
        }

    dubbo 支持的负载均衡策略如下:

    补充: dubbo 集群容错机制

      dubbo 负载均衡是选择实例,集群容错,是指在集群发生错误的情况下的策略。

    org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker 是一个抽象模板类,里面的 invoke 方法调用doInvoke 方法,后续的策略只需实现doinvoke, invoke如下:

        @Override
        public Result invoke(final Invocation invocation) throws RpcException {
            checkWhetherDestroyed();
    
            // binding attachments into invocation.
            Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
            if (contextAttachments != null && contextAttachments.size() != 0) {
                ((RpcInvocation) invocation).addAttachments(contextAttachments);
            }
    
            List<Invoker<T>> invokers = list(invocation);
            LoadBalance loadbalance = initLoadBalance(invokers, invocation);
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
            return doInvoke(invocation, invokers, loadbalance);
        }

    支持的有如下几种实现者:

    具体的Cluster 如下:

     负载均衡策略和容错策略都可以在yml 配置:

    dubbo:
      application:
        name: dubbp-service-consumer
      registry:
        protocol: zookeeper
        address: zookeeper://192.168.99.100:2181
      consumer:
        timeout: 8000
        loadbalance: roundrobin
        cluster: failfast

    2. 客户端接收结果过程

    1. 接收消息的应该从 Decoder 开始排查 InternalDecoder

    1》org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalDecoder#decode, netty 数据写回来之后会从head 开始找handler 开始处理,所以会先经过Decoder。

            @Override
            protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
    
                ChannelBuffer message = new NettyBackedChannelBuffer(input);
    
                NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    
                try {
                    // decode object.
                    do {
                        int saveReaderIndex = message.readerIndex();
                        Object msg = codec.decode(channel, message);
                        if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                            message.readerIndex(saveReaderIndex);
                            break;
                        } else {
                            //is it possible to go here ?
                            if (saveReaderIndex == message.readerIndex()) {
                                throw new IOException("Decode without read data.");
                            }
                            if (msg != null) {
                                out.add(msg);
                            }
                        }
                    } while (message.readable());
                } finally {
                    NettyChannel.removeChannelIfDisconnected(ctx.channel());
                }
            }

      codec.decode(channel, message); 拿到的结果如下:(可以看到也有消息ID,这也就可以确保发出去和接收到的消息可以对应)

     2》 继续调用org.apache.dubbo.rpc.protocol.dubbo.DubboCountCodec#decode

        public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
            int save = buffer.readerIndex();
            MultiMessage result = MultiMessage.create();
            do {
                Object obj = codec.decode(channel, buffer);
                if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
                    buffer.readerIndex(save);
                    break;
                } else {
                    result.addMessage(obj);
                    logMessageLength(obj, buffer.readerIndex() - save);
                    save = buffer.readerIndex();
                }
            } while (true);
            if (result.isEmpty()) {
                return Codec2.DecodeResult.NEED_MORE_INPUT;
            }
            if (result.size() == 1) {
                return result.get(0);
            }
            return result;
        }

    3》 org.apache.dubbo.remoting.exchange.codec.ExchangeCodec#decode(org.apache.dubbo.remoting.Channel, org.apache.dubbo.remoting.buffer.ChannelBuffer)

        @Override
        public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
            int readable = buffer.readableBytes();
            byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
            buffer.readBytes(header);
            return decode(channel, buffer, readable, header);
        }

    4》 继续调用 org.apache.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody

        protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
            byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
            // get request id.
            long id = Bytes.bytes2long(header, 4);
            if ((flag & FLAG_REQUEST) == 0) {
                // decode response.
                Response res = new Response(id);
                if ((flag & FLAG_EVENT) != 0) {
                    res.setEvent(true);
                }
                // get status.
                byte status = header[3];
                res.setStatus(status);
                try {
                    if (status == Response.OK) {
                        Object data;
                        if (res.isHeartbeat()) {
                            ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                            data = decodeHeartbeatData(channel, in);
                        } else if (res.isEvent()) {
                            ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                            data = decodeEventData(channel, in);
                        } else {
                            DecodeableRpcResult result;
                            if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
                                result = new DecodeableRpcResult(channel, res, is,
                                        (Invocation) getRequestData(id), proto);
                                result.decode();
                            } else {
                                result = new DecodeableRpcResult(channel, res,
                                        new UnsafeByteArrayInputStream(readMessageData(is)),
                                        (Invocation) getRequestData(id), proto);
                            }
                            data = result;
                        }
                        res.setResult(data);
                    } else {
                        ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                        res.setErrorMessage(in.readUTF());
                    }
                } catch (Throwable t) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode response failed: " + t.getMessage(), t);
                    }
                    res.setStatus(Response.CLIENT_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
                return res;
            } else {
                // decode request.
                Request req = new Request(id);
                req.setVersion(Version.getProtocolVersion());
                req.setTwoWay((flag & FLAG_TWOWAY) != 0);
                if ((flag & FLAG_EVENT) != 0) {
                    req.setEvent(true);
                }
                try {
                    Object data;
                    ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                    if (req.isHeartbeat()) {
                        data = decodeHeartbeatData(channel, in);
                    } else if (req.isEvent()) {
                        data = decodeEventData(channel, in);
                    } else {
                        DecodeableRpcInvocation inv;
                        if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
                            inv = new DecodeableRpcInvocation(channel, req, is, proto);
                            inv.decode();
                        } else {
                            inv = new DecodeableRpcInvocation(channel, req,
                                    new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                        }
                        data = inv;
                    }
                    req.setData(data);
                } catch (Throwable t) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode request failed: " + t.getMessage(), t);
                    }
                    // bad request
                    req.setBroken(true);
                    req.setData(t);
                }
    
                return req;
            }

      这里构造 Response 对象然后返回去。

    2. 接下来走第二个handler, 也就是org.apache.dubbo.remoting.transport.netty4.NettyClientHandler#channelRead

     1》org.apache.dubbo.remoting.transport.netty4.NettyClientHandler#channelRead

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
            try {
                handler.received(channel, msg);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }
        }

    2》 然后调用到org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received

        public void received(Channel channel, Object message) throws RemotingException {
            ExecutorService executor = getExecutorService();
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
                //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
                if(message instanceof Request && t instanceof RejectedExecutionException){
                    Request request = (Request)message;
                    if(request.isTwoWay()){
                        String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                        Response response = new Response(request.getId(), request.getVersion());
                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                        response.setErrorMessage(msg);
                        channel.send(response);
                        return;
                    }
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        }

     org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable 如下:

    package org.apache.dubbo.remoting.transport.dispatcher;
    
    import org.apache.dubbo.common.logger.Logger;
    import org.apache.dubbo.common.logger.LoggerFactory;
    import org.apache.dubbo.remoting.Channel;
    import org.apache.dubbo.remoting.ChannelHandler;
    
    public class ChannelEventRunnable implements Runnable {
        private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);
    
        private final ChannelHandler handler;
        private final Channel channel;
        private final ChannelState state;
        private final Throwable exception;
        private final Object message;
    
        public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state) {
            this(channel, handler, state, null);
        }
    
        public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message) {
            this(channel, handler, state, message, null);
        }
    
        public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Throwable t) {
            this(channel, handler, state, null, t);
        }
    
        public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {
            this.channel = channel;
            this.handler = handler;
            this.state = state;
            this.message = message;
            this.exception = exception;
        }
    
        @Override
        public void run() {
            if (state == ChannelState.RECEIVED) {
                try {
                    handler.received(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
            } else {
                switch (state) {
                case CONNECTED:
                    try {
                        handler.connected(channel);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                case DISCONNECTED:
                    try {
                        handler.disconnected(channel);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                case SENT:
                    try {
                        handler.sent(channel, message);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is " + message, e);
                    }
                    break;
                case CAUGHT:
                    try {
                        handler.caught(channel, exception);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is: " + message + ", exception is " + exception, e);
                    }
                    break;
                default:
                    logger.warn("unknown state: " + state + ", message is " + message);
                }
            }
    
        }
    
        /**
         * ChannelState
         *
         *
         */
        public enum ChannelState {
    
            /**
             * CONNECTED
             */
            CONNECTED,
    
            /**
             * DISCONNECTED
             */
            DISCONNECTED,
    
            /**
             * SENT
             */
            SENT,
    
            /**
             * RECEIVED
             */
            RECEIVED,
    
            /**
             * CAUGHT
             */
            CAUGHT
        }
    
    }

    3》 org.apache.dubbo.remoting.exchange.support.DefaultFuture#received(org.apache.dubbo.remoting.Channel, org.apache.dubbo.remoting.exchange.Response, boolean)

        public static void received(Channel channel, Response response, boolean timeout) {
            try {
                DefaultFuture future = FUTURES.remove(response.getId());
                if (future != null) {
                    Timeout t = future.timeoutCheckTask;
                    if (!timeout) {
                        // decrease Time
                        t.cancel();
                    }
                    future.doReceived(response);
                } else {
                    logger.warn("The timeout response finally returned at "
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                            + ", response " + response
                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                            + " -> " + channel.getRemoteAddress()));
                }
            } finally {
                CHANNELS.remove(response.getId());
            }
        }
    
       private void doReceived(Response res) {
            if (res == null) {
                throw new IllegalStateException("response cannot be null");
            }
            if (res.getStatus() == Response.OK) {
                this.complete(res.getResult());
            } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
                this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
            } else {
                this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
            }
        }

      根据消息ID 先移除FUTURES中的DefaultFuture; 然后调用其complete 完成该任务(这一步会导致org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke 创建的AsyncRpcResult 也完成任务,所以解除org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker#invoke 方法的阻塞 )。 然后走AsyncToSyncInvoker#invoke 的搜集结果,进行响应的流程。

    所以客户端共享一个netty 客户端通道的原理是:

    1》 发消息的时候封装成Request 对象,Request 包含一个全局的ID标识

    2》然后将request 封装成一个CompletableFuture 对象future,这个 future 对象是JDK1.8 线程相关对象。 并且这个对象future 缓存到一个map, key 是 request 的 id, value 是 future

    3》然后阻塞等待future 完成任务,也就是complete() 方法被调用。 至此发送方进入阻塞,然后会等待响应结果

    4》收到消息调用decoder 解析成 Response 对象,这个对象包含消息的ID、响应结果

    5》然后在线程池中根据消息ID获取到上面的 future 对象,然后调用其 complete 方法收集结果;最终会解除上面3》阻塞,这样会进行回传结果的响应,也达到客户端共享一个netty channel 可以正确通信的目的。

    2. 服务端接收请求与发送结果过程

      接收请求应该先从netty的handler 进行查看,消息从channel 接收到之后会先到handler。会先到decoderHandler,我们跳过这一步,直接查看 处理handler。

    1. org.apache.dubbo.remoting.transport.netty4.NettyServerHandler#channelRead

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
            try {
                handler.received(channel, msg);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }
        }

    收到的msg 信息如下: 可以看到由请求的方法名称,参数,接口等信息

     2. 然后经过一系列调用走到:org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received

        public void received(Channel channel, Object message) throws RemotingException {
            ExecutorService executor = getExecutorService();
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
                //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
                if(message instanceof Request && t instanceof RejectedExecutionException){
                    Request request = (Request)message;
                    if(request.isTwoWay()){
                        String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                        Response response = new Response(request.getId(), request.getVersion());
                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                        response.setErrorMessage(msg);
                        channel.send(response);
                        return;
                    }
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        }
    View Code

      可以看到这里创建了一个异步任务然后交给异步任务。(也就是相当于任务从IO线程池转移到业务线程池)

    3. org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run

        public void run() {
            if (state == ChannelState.RECEIVED) {
                try {
                    handler.received(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
            } else {
                switch (state) {
                case CONNECTED:
                    try {
                        handler.connected(channel);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                case DISCONNECTED:
                    try {
                        handler.disconnected(channel);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                case SENT:
                    try {
                        handler.sent(channel, message);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is " + message, e);
                    }
                    break;
                case CAUGHT:
                    try {
                        handler.caught(channel, exception);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is: " + message + ", exception is " + exception, e);
                    }
                    break;
                default:
                    logger.warn("unknown state: " + state + ", message is " + message);
                }
            }
    
        }
    View Code

    走代码块 state == ChannelState.RECEIVED

    4. org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest

        void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
            Response res = new Response(req.getId(), req.getVersion());
            if (req.isBroken()) {
                Object data = req.getData();
    
                String msg;
                if (data == null) {
                    msg = null;
                } else if (data instanceof Throwable) {
                    msg = StringUtils.toString((Throwable) data);
                } else {
                    msg = data.toString();
                }
                res.setErrorMessage("Fail to decode request due to: " + msg);
                res.setStatus(Response.BAD_REQUEST);
    
                channel.send(res);
                return;
            }
            // find handler by message class.
            Object msg = req.getData();
            try {
                CompletionStage<Object> future = handler.reply(channel, msg);
                future.whenComplete((appResult, t) -> {
                    try {
                        if (t == null) {
                            res.setStatus(Response.OK);
                            res.setResult(appResult);
                        } else {
                            res.setStatus(Response.SERVICE_ERROR);
                            res.setErrorMessage(StringUtils.toString(t));
                        }
                        channel.send(res);
                    } catch (RemotingException e) {
                        logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
                    } finally {
                        // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
                    }
                });
            } catch (Throwable e) {
                res.setStatus(Response.SERVICE_ERROR);
                res.setErrorMessage(StringUtils.toString(e));
                channel.send(res);
            }
        }

      可以看看到创建了一个Response, 并且指定了ID和版本信息。然后执行 CompletionStage<Object>, whenComplete 任务完成之后通过 channel.send(res); 将结果写出去。

    5. org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter#reply

            public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
    
                if (!(message instanceof Invocation)) {
                    throw new RemotingException(channel, "Unsupported request: "
                            + (message == null ? null : (message.getClass().getName() + ": " + message))
                            + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
                }
    
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // need to consider backward-compatibility if it's a callback
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || !methodsStr.contains(",")) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                                + " not found in callback service interface ,invoke will be ignored."
                                + " please update the api interface. url is:"
                                + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                Result result = invoker.invoke(inv);
                return result.completionFuture().thenApply(Function.identity());
            }
    View Code

    1》 调用org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getInvoker 获取Invoker 调用者

        Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
            boolean isCallBackServiceInvoke = false;
            boolean isStubServiceInvoke = false;
            int port = channel.getLocalAddress().getPort();
            String path = inv.getAttachments().get(PATH_KEY);
    
            // if it's callback service on client side
            isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(STUB_EVENT_KEY));
            if (isStubServiceInvoke) {
                port = channel.getRemoteAddress().getPort();
            }
    
            //callback
            isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
            if (isCallBackServiceInvoke) {
                path += "." + inv.getAttachments().get(CALLBACK_SERVICE_KEY);
                inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
            }
    
            String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
            DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    
            if (exporter == null) {
                throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
                        ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
            }
    
            return exporter.getInvoker();
        }
    View Code

    serviceKey 参数如下:

    cn.qz.dubbo.service.UserService:1.0.0:20990

    exporterMap 如下:(缓存了两个Invoker)

     2》 调用方法 invoker.invoke(inv);   

    6. 调用到org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke:

        public Result invoke(Invocation invocation) throws RpcException {
            try {
                Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
                CompletableFuture<Object> future = wrapWithFuture(value, invocation);
                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
                future.whenComplete((obj, t) -> {
                    AppResponse result = new AppResponse();
                    if (t != null) {
                        if (t instanceof CompletionException) {
                            result.setException(t.getCause());
                        } else {
                            result.setException(t);
                        }
                    } else {
                        result.setValue(obj);
                    }
                    asyncRpcResult.complete(result);
                });
                return asyncRpcResult;
            } catch (InvocationTargetException e) {
                if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
                    logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
                }
                return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
            } catch (Throwable e) {
                throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }

    1》 doInvoke 调用到: org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#doInvoke

        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
            final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName,
                                          Class<?>[] parameterTypes,
                                          Object[] arguments) throws Throwable {
                    return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                }
            };
        }
    View Code

      可以看到是反射调用方法, proxy 就是Spring 容器里面的对象

    2》创建一个wrapWithFuture 包装了一个 future 对象。创建了一个 AsyncRpcResult 对象 , 然后收集到 future 的结果之后创建一个AppResponse 对象,然后返回 AsyncRpcResult。

    7. 然后到上面4 将结果写回去,写回去的数据如下:

    然后走: org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#send(java.lang.Object, boolean) 这里的channel 就是NttyChannel

        @Override
        public void send(Object message, boolean sent) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
            }
            if (message instanceof Request
                    || message instanceof Response
                    || message instanceof String) {
                channel.send(message, sent);
            } else {
                Request request = new Request();
                request.setVersion(Version.getProtocolVersion());
                request.setTwoWay(false);
                request.setData(message);
                channel.send(request, sent);
            }
        }

    继续调用到org.apache.dubbo.remoting.transport.netty4.NettyChannel#send:

        public void send(Object message, boolean sent) throws RemotingException {
            // whether the channel is closed
            super.send(message, sent);
    
            boolean success = true;
            int timeout = 0;
            try {
                ChannelFuture future = channel.writeAndFlush(message);
                if (sent) {
                    // wait timeout ms
                    timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                    success = future.await(timeout);
                }
                Throwable cause = future.cause();
                if (cause != null) {
                    throw cause;
                }
            } catch (Throwable e) {
                throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
            }
            if (!success) {
                throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                        + "in timeout(" + timeout + "ms) limit");
            }
        }

      这里的channel 就是NioSocketChannel, 就走 netty 的一套机制了。

     补充: 消费者线程池。上面可以看到NettyServer 端线程池的模型是,一个bossGroup 接收请求(线程数为1),一个workerGroup(线程池大小默认是32,也可以理解为IO线程),还有一个业务线程池。是在下面代码获取的:

      org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler#getExecutorService 代码如下:

        public ExecutorService getExecutorService() {
            ExecutorService cexecutor = executor;
            if (cexecutor == null || cexecutor.isShutdown()) {
                cexecutor = SHARED_EXECUTOR;
            }
            return cexecutor;
        }

    这里业务线程和IO线程数可以指定,默认的业务线程池大小是200。比如修改线程池大小是10,修改可以从yml 修改

    dubbo:
      provider:
        iothreads: 1
        threads: 10

     测试如下:

    1. 查看org.apache.dubbo.remoting.transport.netty4.NettyServerHandler#channelRead, 线程名如下:

    org.apache.dubbo.remoting.transport.netty4.NettyServer#doOpen 线程工厂如下:(IO线程开启)

        @Override
        protected void doOpen() throws Throwable {
            bootstrap = new ServerBootstrap();
    
            bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
            workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                    new DefaultThreadFactory("NettyServerWorker", true));
    
            final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
            channels = nettyServerHandler.getChannels();
    
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                    .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            // FIXME: should we use getTimeout()?
                            int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                    .addLast("decoder", adapter.getDecoder())
                                    .addLast("encoder", adapter.getEncoder())
                                    .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                    .addLast("handler", nettyServerHandler);
                        }
                    });
            // bind
            ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
            channelFuture.syncUninterruptibly();
            channel = channelFuture.channel();
    
        }

     2. org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke 线程名如下:

    org.apache.dubbo.remoting.transport.netty4.NettyServer#NettyServer    设置业务线程工厂名前缀是上面的格式。

        public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
            // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
            // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
            super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
        }
    【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
  • 相关阅读:
    深度系统安装wine
    Android应用真正的入口在哪里?
    推荐系统算法概览
    Android面试题:Scrollview内嵌一个Button他的事件消费是怎样的?Move,Down,Up分别被哪个组件消费?
    Java面试题:多线程交替打印字符串
    EventBus使用初体验
    Andoird面试题:A活动启动B活动,他们的生命周期各是怎么样的?
    Android在开发过程中如何选择compileSdkVersion,minSdkVersion和targetSdkVersion
    华为2020暑期实习面经(已拿Offer)
    工商银行软件开发中心2020暑期实习面经(已拿Offer)
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/15176233.html
Copyright © 2011-2022 走看看