zoukankan      html  css  js  c++  java
  • Dubbo发布过程中,服务端调用过程

    接收请求

    服务端接收请求以后,我们从一个Handler看起。因为Netty的处理核心是Handler,所以我们先找到InternalDecoder:消息经过解码处理后,调用了Channels.fireMessageReceived()方法

    /**
     * Excerpt of the adapter that plugs a Dubbo {@code Codec2} into a Netty 3 pipeline.
     *
     * <p>Only the inbound decoder is shown. The members {@code buffer}, {@code bufferSize},
     * {@code url} and {@code handler} used below are declared outside this excerpt.
     */
    final class NettyCodecAdapter {
        private final Codec2 codec;

        /**
         * Upstream handler that accumulates received bytes, decodes as many complete
         * messages as are available and fires each one further up the pipeline.
         */
        private class InternalDecoder extends SimpleChannelUpstreamHandler {
            /**
             * Decodes the bytes carried by {@code event} and fires one message-received
             * event per decoded object.
             *
             * @param ctx   handler context used to forward events upstream
             * @param event the received message event
             * @throws Exception if the codec fails, or returns a result without consuming input
             */
            public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
                // Take the payload from the event; at runtime it is a BigEndianHeapChannelBuffer.
                Object o = event.getMessage();
                if (!(o instanceof ChannelBuffer)) {
                    // Not raw bytes: nothing to decode here, pass it on unchanged.
                    ctx.sendUpstream(event);
                    return;
                }

                ChannelBuffer input = (ChannelBuffer) o;
                int readable = input.readableBytes();
                if (readable <= 0) {
                    return;
                }

                com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
                if (buffer.readable()) {
                    // Bytes are left over from a previous read: append the new input to them.
                    if (buffer instanceof DynamicChannelBuffer) {
                        buffer.writeBytes(input.toByteBuffer());
                        message = buffer;
                    } else {
                        int size = buffer.readableBytes() + input.readableBytes();
                        message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
                                size > bufferSize ? size : bufferSize);
                        message.writeBytes(buffer, buffer.readableBytes());
                        message.writeBytes(input.toByteBuffer());
                    }
                } else {
                    // No leftovers: wrap the incoming bytes directly.
                    message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
                            input.toByteBuffer());
                }
                // Look up (or create) the Dubbo channel for this Netty channel.
                NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
                Object msg;
                int saveReaderIndex;

                try {
                    // decode object.
                    do {
                        saveReaderIndex = message.readerIndex();
                        try {
                            // At runtime codec is a DubboCountCodec.
                            msg = codec.decode(channel, message);
                        } catch (IOException e) {
                            buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                            throw e;
                        }
                        // Example of a decoded value:
                        // msg = Request [id=5, version=2.0.0, twoway=true, event=false, broken=false,
                        //   data=RpcInvocation [methodName=queryList, parameterTypes=[], arguments=[],
                        //   attachments={path=com.bail.user.service.IUserService, input=191, dubbo=2.6.2,
                        //   interface=com.bail.user.service.IUserService, version=1.0.0, timeout=8000}]]
                        if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                            // Incomplete frame: rewind and wait for more bytes.
                            message.readerIndex(saveReaderIndex);
                            break;
                        } else {
                            if (saveReaderIndex == message.readerIndex()) {
                                // The codec returned a result without consuming anything;
                                // bail out instead of looping forever.
                                buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                                throw new IOException("Decode without read data.");
                            }
                            if (msg != null) {
                                // Fire the message-received event for the decoded object.
                                Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
                            }
                        }
                    } while (message.readable());
                } finally {
                    if (message.readable()) {
                        // Keep the undecoded remainder for the next read.
                        message.discardReadBytes();
                        buffer = message;
                    } else {
                        buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                    }
                    NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
                }
            }
        }
    }
    

    消息接收

    Channels.fireMessageReceived()方法内部调用了ctx.sendUpstream()方法,方法如下

    /** Excerpt of Netty 3's {@code Channels} helper showing only the message-received shortcut. */
    public class Channels {
        /**
         * Wraps {@code message} in an upstream message event and sends it through {@code ctx}.
         *
         * @param ctx           context the event is sent upstream from
         * @param message       the received (already decoded) message
         * @param remoteAddress address the message came from
         */
        public static void fireMessageReceived(
                ChannelHandlerContext ctx, Object message, SocketAddress remoteAddress) {
            // At runtime ctx is a DefaultChannelHandlerContext.
            UpstreamMessageEvent upstreamEvent =
                    new UpstreamMessageEvent(ctx.getChannel(), message, remoteAddress);
            ctx.sendUpstream(upstreamEvent);
        }
    }
    

    接着调用DefaultChannelPipeline的sendUpstream方法,找到下一个上行处理器后继续往下执行,最终调用的是SimpleChannelHandler的handleUpstream方法

    // Excerpt of Netty 3's DefaultChannelPipeline showing how an upstream event is
    // handed to the next handler context.
    public class DefaultChannelPipeline implements ChannelPipeline {
            // Forwards the event e to the next upstream-capable context, if there is one.
            // NOTE(review): this method reads this.next and calls DefaultChannelPipeline.this,
            // which suggests it belongs to a nested handler-context class whose declaration
            // was dropped from the excerpt; confirm against the Netty source.
            public void sendUpstream(ChannelEvent e) {
                // Resolve the context that should actually receive the event, starting from this.next.
                DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
                if (next != null) {
                    DefaultChannelPipeline.this.sendUpstream(next, e);
                }
            }
    }
    

    SimpleChannelHandler的handleUpstream方法根据不同的事件类型调用不同的处理方法。当接收请求的时候,事件类型为MessageEvent,此处调用messageReceived方法,messageReceived再调用具体的实现方法

    /** Excerpt of Netty 3's {@code SimpleChannelHandler}: upstream event dispatch only. */
    public class SimpleChannelHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler {
        /**
         * Routes an upstream event to the callback matching its concrete type; events
         * that match no callback are forwarded with {@code ctx.sendUpstream(e)}.
         *
         * @param ctx the handler context
         * @param e   the upstream event to dispatch
         * @throws Exception whatever the invoked callback throws
         */
        public void handleUpstream(
                ChannelHandlerContext ctx, ChannelEvent e) throws Exception {

            if (e instanceof MessageEvent) {
                messageReceived(ctx, (MessageEvent) e);
                return;
            }

            if (e instanceof WriteCompletionEvent) {
                writeComplete(ctx, (WriteCompletionEvent) e);
                return;
            }

            if (e instanceof ChildChannelStateEvent) {
                ChildChannelStateEvent childEvent = (ChildChannelStateEvent) e;
                if (!childEvent.getChildChannel().isOpen()) {
                    childChannelClosed(ctx, childEvent);
                } else {
                    childChannelOpen(ctx, childEvent);
                }
                return;
            }

            if (e instanceof ChannelStateEvent) {
                ChannelStateEvent stateEvent = (ChannelStateEvent) e;
                switch (stateEvent.getState()) {
                case OPEN:
                    // OPEN carries Boolean.TRUE when the channel opened; anything else means closed.
                    if (!Boolean.TRUE.equals(stateEvent.getValue())) {
                        channelClosed(ctx, stateEvent);
                    } else {
                        channelOpen(ctx, stateEvent);
                    }
                    break;
                case BOUND:
                    // A null value means the channel was unbound.
                    if (stateEvent.getValue() == null) {
                        channelUnbound(ctx, stateEvent);
                    } else {
                        channelBound(ctx, stateEvent);
                    }
                    break;
                case CONNECTED:
                    // A null value means the channel was disconnected.
                    if (stateEvent.getValue() == null) {
                        channelDisconnected(ctx, stateEvent);
                    } else {
                        channelConnected(ctx, stateEvent);
                    }
                    break;
                case INTEREST_OPS:
                    channelInterestChanged(ctx, stateEvent);
                    break;
                default:
                    ctx.sendUpstream(e);
                }
                return;
            }

            if (e instanceof ExceptionEvent) {
                exceptionCaught(ctx, (ExceptionEvent) e);
                return;
            }

            ctx.sendUpstream(e);
        }
    }
    

    messageReceived

    根据消息获取channel信道,调用AbstractPeer的received,实际上调用的是MultiMessageHandler的received方法

    /** Excerpt of Dubbo's Netty 3 handler: hands received messages to the Dubbo handler chain. */
    public class NettyHandler extends SimpleChannelHandler {

        /**
         * Delivers the message carried by {@code e} to {@code handler} on the Dubbo
         * channel that corresponds to the Netty channel of {@code ctx}.
         *
         * @param ctx the Netty handler context
         * @param e   the received message event
         * @throws Exception whatever the Dubbo handler throws
         */
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            // Resolve the Dubbo-side channel wrapper.
            NettyChannel dubboChannel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
            try {
                Object payload = e.getMessage();
                handler.received(dubboChannel, payload);
            } finally {
                // Always drop the cache entry once the underlying channel is disconnected.
                NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
            }
        }
    }
    

    获取channel

    获取到NettyChannel ,返回到messageReceived

    /** Excerpt of Dubbo's {@code NettyChannel}: lookup/creation of the per-connection wrapper. */
    final class NettyChannel extends AbstractChannel {
        /**
         * Returns the {@code NettyChannel} cached for {@code ch}, creating one when none
         * is cached. A newly created wrapper is only stored in {@code channelMap} while
         * {@code ch} is connected.
         *
         * @param ch      the Netty channel; {@code null} yields {@code null}
         * @param url     url passed to a newly created wrapper
         * @param handler handler passed to a newly created wrapper
         * @return the cached or newly created wrapper, or {@code null} if {@code ch} is {@code null}
         */
        static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
            if (ch == null) {
                return null;
            }
            NettyChannel cached = channelMap.get(ch);
            if (cached != null) {
                return cached;
            }
            // At runtime handler is the NettyServer.
            NettyChannel created = new NettyChannel(ch, url, handler);
            if (!ch.isConnected()) {
                return created;
            }
            // If another caller stored a wrapper first, use theirs.
            NettyChannel existing = channelMap.putIfAbsent(ch, created);
            return existing != null ? existing : created;
        }
    }
    

    received

    /** Delegating handler that unpacks a {@code MultiMessage} into individual messages. */
    public class MultiMessageHandler extends AbstractChannelHandlerDelegate {

        public MultiMessageHandler(ChannelHandler handler) {
            super(handler);
        }

        /**
         * Passes {@code message} to the wrapped handler, one element at a time when it
         * is a {@code MultiMessage}.
         *
         * @param channel the channel the message arrived on
         * @param message a single message or a {@code MultiMessage}
         * @throws RemotingException if the wrapped handler fails
         */
        @SuppressWarnings("unchecked")
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (!(message instanceof MultiMessage)) {
                // Single message; at runtime the wrapped handler is a HeartbeatHandler.
                handler.received(channel, message);
                return;
            }
            MultiMessage batch = (MultiMessage) message;
            for (Object element : batch) {
                handler.received(channel, element);
            }
        }
    }
    
    
    • HeartbeatHandler
    /** Delegating handler that answers heartbeats itself and passes everything else on. */
    public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
        /**
         * Records the read timestamp, handles heartbeat requests and responses locally,
         * and forwards every other message to the wrapped handler.
         *
         * @param channel the channel the message arrived on
         * @param message the received message
         * @throws RemotingException if sending the heartbeat reply or the wrapped handler fails
         */
        public void received(Channel channel, Object message) throws RemotingException {
            setReadTimestamp(channel);

            if (isHeartbeatRequest(message)) {
                Request heartbeatRequest = (Request) message;
                if (!heartbeatRequest.isTwoWay()) {
                    // One-way heartbeat: nothing to answer.
                    return;
                }
                Response heartbeatReply = new Response(heartbeatRequest.getId(), heartbeatRequest.getVersion());
                heartbeatReply.setEvent(Response.HEARTBEAT_EVENT);
                channel.send(heartbeatReply);
                if (logger.isInfoEnabled()) {
                    int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                    if (logger.isDebugEnabled()) {
                        String remote = "Received heartbeat from remote channel " + channel.getRemoteAddress();
                        String period = heartbeat > 0 ? ": " + heartbeat + "ms" : "";
                        logger.debug(remote
                                + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                + period);
                    }
                }
                return;
            }

            if (isHeartbeatResponse(message)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
                }
                return;
            }

            // Not a heartbeat: continue down the handler chain.
            handler.received(channel, message);
        }

    }
    

    AllChannelHandler
    此处利用线程池,根据信道、处理器、信道状态和消息包装出一个线程任务(ChannelEventRunnable)并提交执行,接下来我们分析这个消息任务

    /** Handler that hands every received message to an executor instead of processing it inline. */
    public class AllChannelHandler extends WrappedChannelHandler {
        /**
         * Submits a {@code ChannelEventRunnable} for the received message. When the
         * submission is rejected for a two-way request, an error response is sent back
         * so the caller does not wait for a timeout.
         *
         * @param channel the channel the message arrived on
         * @param message the received message
         * @throws RemotingException if the task cannot be submitted and no error response applies
         */
        public void received(Channel channel, Object message) throws RemotingException {
            ExecutorService pool = getExecutorService();
            try {
                // Wrap channel, handler, state and message into a task and run it on the pool.
                pool.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
                //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
                boolean rejectedRequest = message instanceof Request && t instanceof RejectedExecutionException;
                if (rejectedRequest && ((Request) message).isTwoWay()) {
                    Request request = (Request) message;
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        }

        /** Returns {@code executor}, or {@code SHARED_EXECUTOR} when it is null or shut down. */
        private ExecutorService getExecutorService() {
            ExecutorService current = executor;
            boolean usable = current != null && !current.isShutdown();
            return usable ? current : SHARED_EXECUTOR;
        }
    }
    

    消息任务处理

    首先我们看一下ChannelEventRunnable,构造函数创建任务以后,执行任务内容run()方法

    // Task that delivers one channel event (connected, disconnected, sent, received or
    // caught) to a ChannelHandler; run() selects the callback from the stored state.
    public class ChannelEventRunnable implements Runnable {
        private final ChannelHandler handler;
        private final Channel channel;
        private final ChannelState state;
        private final Throwable exception;
        private final Object message;
        // Creates a task without an exception (delegates with exception = null).
        public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message) {
            this(channel, handler, state, message, null);
        }
        // Creates a task carrying every piece of event data; exception is only read for CAUGHT.
        public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {
            this.channel = channel;
            this.handler = handler;
            this.state = state;
            this.message = message;
            this.exception = exception;
        }
    
    // Invokes the handler callback matching state. Any Exception thrown by the callback
    // is logged as a warning and not rethrown.
    public void run() {
            // In the request flow described here state is RECEIVED, so handler.received(...)
            // is called; at that point handler is a DecodeHandler.
            switch (state) {
                case CONNECTED:
                    try {
                        handler.connected(channel);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                case DISCONNECTED:
                    try {
                        handler.disconnected(channel);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                case SENT:
                    try {
                        handler.sent(channel, message);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is " + message, e);
                    }
                    break;
                case RECEIVED:
                    try {
                        handler.received(channel, message);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is " + message, e);
                    }
                    break;
                case CAUGHT:
                    try {
                        handler.caught(channel, exception);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is: " + message + ", exception is " + exception, e);
                    }
                    break;
                default:
                    // A state with no matching callback is only logged.
                    logger.warn("unknown state: " + state + ", message is " + message);
            }
        }
    }
    

    DecodeHandler

    调用DecodeHandler的received方法,此时message = Request,从message中获取RpcInvocation,处理之后继续执行handler.received(channel, message)方法,此时handler = HeaderExchangeHandler

    • DecodeHandler
    /** Delegating handler that makes sure {@code Decodeable} payloads are decoded before use. */
    public class DecodeHandler extends AbstractChannelHandlerDelegate {
        /**
         * Decodes the message itself and, for requests and responses, their payload,
         * then forwards the message to the wrapped handler.
         *
         * @param channel the channel the message arrived on
         * @param message the received message
         * @throws RemotingException if the wrapped handler fails
         */
        public void received(Channel channel, Object message) throws RemotingException {
            // decode(...) ignores anything that is not Decodeable, so no pre-check is needed.
            decode(message);

            if (message instanceof Request) {
                Request request = (Request) message;
                decode(request.getData());
            }

            if (message instanceof Response) {
                Response response = (Response) message;
                decode(response.getResult());
            }

            handler.received(channel, message);
        }

        /**
         * Calls {@code decode()} on {@code message} when it is {@code Decodeable}.
         * Failures are logged as warnings and not propagated.
         */
        private void decode(Object message) {
            if (!(message instanceof Decodeable)) {
                return;
            }
            try {
                // For an RPC request this ends up in DecodeableRpcInvocation.decode(),
                // which only does work while hasDecoded is still false.
                ((Decodeable) message).decode();
                if (log.isDebugEnabled()) {
                    log.debug("Decode decodeable message " + message.getClass().getName());
                }
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
                }
            }
        }
    }
    
    • DecodeableRpcInvocation
    /** Excerpt of the RPC invocation that can decode itself from its input stream on demand. */
    public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {
        /**
         * Decodes this invocation from {@code inputStream} once. Does nothing when it
         * has already been decoded or when {@code channel} or {@code inputStream} is null.
         * A decoding failure is logged and recorded on {@code request} instead of being thrown.
         */
        public void decode() throws Exception {
            boolean nothingToDo = hasDecoded || channel == null || inputStream == null;
            if (nothingToDo) {
                return;
            }
            try {
                decode(channel, inputStream);
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
                }
                // Mark the request as broken and carry the failure as its data.
                request.setBroken(true);
                request.setData(e);
            } finally {
                hasDecoded = true;
            }
        }
    }
    

    HeaderExchangeHandler

    接着调用HeaderExchangeHandler的received方法,强转成request,然后调用handleRequest方法,构造一个Response对象,调用reply方法返回结果,并赋值Response对象。然后通过channel将结果返回。

    // Exchange-layer handler: dispatches received messages by type (Request, Response,
    // String, other) and builds the Response for two-way requests.
    public class HeaderExchangeHandler implements ChannelHandlerDelegate {
        // Entry point for a received message. Stamps the read time on the channel,
        // dispatches by message type and finally drops the exchange channel if the
        // underlying channel is disconnected.
        public void received(Channel channel, Object message) throws RemotingException {
            channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
            ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
            try {
                if (message instanceof Request) {
                    // handle request.
                    // Cast to Request.
                    Request request = (Request) message;
                    if (request.isEvent()) {
                        handlerEvent(channel, request);
                    } else {
                        if (request.isTwoWay()) {
                            // Two-way request: compute the response and send it back on the channel.
                            Response response = handleRequest(exchangeChannel, request);
                            channel.send(response);
                        } else {
                            // One-way request: pass the payload on; nothing is sent back.
                            handler.received(exchangeChannel, request.getData());
                        }
                    }
                } else if (message instanceof Response) {
                    handleResponse(channel, (Response) message);
                } else if (message instanceof String) {
                    if (isClientSide(channel)) {
                        // String messages are not supported on the client side; only logged.
                        Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                        logger.error(e.getMessage(), e);
                    } else {
                        // On the server side the string is handed to the telnet handler;
                        // a non-empty result is sent back.
                        String echo = handler.telnet(channel, (String) message);
                        if (echo != null && echo.length() > 0) {
                            channel.send(echo);
                        }
                    }
                } else {
                    handler.received(exchangeChannel, message);
                }
            } finally {
                HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        }
    
        // Builds the Response for a two-way request. A broken request yields BAD_REQUEST;
        // otherwise handler.reply(...) is called and its result (status OK) or its failure
        // (status SERVICE_ERROR) is stored in the response.
        Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
            // Create the Response with the request's id and version.
            Response res = new Response(req.getId(), req.getVersion());
            if (req.isBroken()) {
                Object data = req.getData();
    
                String msg;
                if (data == null) msg = null;
                else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
                else msg = data.toString();
                res.setErrorMessage("Fail to decode request due to: " + msg);
                res.setStatus(Response.BAD_REQUEST);
    
                return res;
            }
            // find handler by message class.
            Object msg = req.getData();
            // At runtime msg is an RpcInvocation.
            try {
                // handle data.
                // At runtime handler is DubboProtocol's requestHandler field.
                // Call the handler's reply method to produce the result.
                Object result = handler.reply(channel, msg);
                res.setStatus(Response.OK);
                res.setResult(result);
            } catch (Throwable e) {
                res.setStatus(Response.SERVICE_ERROR);
                res.setErrorMessage(StringUtils.toString(e));
            }
            return res;
        }
    }
    
    • DubboProtocol
      调用reply方法,消息类型为Invocation类型,根据调用信息获取一个调用者,调用的是DubboProtocol类的getInvoker()方法。
      根据serviceKey从暴露缓存容器中获取暴露服务,然后调用暴露服务获取调用者。然后调用调用者的invoke方法。
    public class DubboProtocol extends AbstractProtocol {
        private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
            public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
                //消息类型为Invocation类型
                if (message instanceof Invocation) {
                    Invocation inv = (Invocation) message;
                    //根据调用信息获取一个调用者
                    Invoker<?> invoker = getInvoker(channel, inv);
                    // need to consider backward-compatibility if it's a callback
                    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                        String methodsStr = invoker.getUrl().getParameters().get("methods");
                        boolean hasMethod = false;
                        if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                            hasMethod = inv.getMethodName().equals(methodsStr);
                        } else {
                            String[] methods = methodsStr.split(",");
                            for (String method : methods) {
                                if (inv.getMethodName().equals(method)) {
                                    hasMethod = true;
                                    break;
                                }
                            }
                        }
                        if (!hasMethod) {
                            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                                    + " not found in callback service interface ,invoke will be ignored."
                                    + " please update the api interface. url is:"
                                    + invoker.getUrl()) + " ,invocation is :" + inv);
                            return null;
                        }
                    }
                    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                    return invoker.invoke(inv);
                }
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }
      }
        Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
            boolean isCallBackServiceInvoke = false;
            boolean isStubServiceInvoke = false;
            //获取本地服务端口为20880,path=com.bail.user.service.IUserService
            int port = channel.getLocalAddress().getPort();
            String path = inv.getAttachments().get(Constants.PATH_KEY);
            // if it's callback service on client side
            //存根服务调用为false
            isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
            if (isStubServiceInvoke) {
                port = channel.getRemoteAddress().getPort();
            }
            //callback
            isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
            if (isCallBackServiceInvoke) {
                path = inv.getAttachments().get(Constants.PATH_KEY) + "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
                inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
            }
            //serviceKey = com.bail.user.service.IUserService:1.0.0:20880
            String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
            //然后根据serviceKey从暴露缓存容器中获取暴露服务
            DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    
            if (exporter == null)
                throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
            //exporter = DubboExporter
            return exporter.getInvoker();
        }
    
    }
    

    调用暴露服务Exporter

    根据Exporter获取invoker,此时的invoker是由ProtocolFilterWrapper的buildInvokerChain方法构建出来的Invoker对象,它在原始invoker的外层包装了一条Filter链。返回该invoker

    /** Excerpt of the base exporter: holds the invoker of an exported service. */
    public abstract class AbstractExporter<T> implements Exporter<T> {
        private final Invoker<T> invoker;

        /** Returns the invoker held by this exporter. */
        public Invoker<T> getInvoker() {
            return this.invoker;
        }
    }
    

    调用者执行

    从暴露者中获取到调用者之后,开始调用者的执行。首先调用者是ProtocolFilterWrapper类型对象,其中是各个Filter的执行,执行完Filter以后,执行InvokerWrapper的invoke()方法、DelegateProviderMetaDataInvoker的invoke方法,最后是实际Invoker的调用

    /** Excerpt of the protocol wrapper that surrounds an invoker with its activated filters. */
    public class ProtocolFilterWrapper implements Protocol {
        private final Protocol protocol;

        /**
         * Wraps {@code invoker} in one anonymous {@code Invoker} per activated filter.
         * The chain is built from the last filter to the first, so the returned invoker
         * runs the first filter, and each filter receives the next link as its target.
         *
         * @param invoker the invoker to wrap; metadata calls are delegated to it
         * @param key     key passed to {@code getActivateExtension}
         * @param group   group passed to {@code getActivateExtension}
         * @return the head of the chain, or {@code invoker} itself when no filter is active
         */
        private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
            Invoker<T> head = invoker;
            // An empty list simply skips the loop and leaves head == invoker.
            for (int idx = filters.size() - 1; idx >= 0; idx--) {
                final Filter current = filters.get(idx);
                final Invoker<T> downstream = head;
                head = new Invoker<T>() {

                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    // The only call that goes through the filter.
                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        return current.invoke(downstream, invocation);
                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
            return head;
        }
    }
    

    Invoker的invoke

    AbstractProxyInvoker调用invoke,接着调用doInvoke。doInvoke由JavassistProxyFactory创建的匿名子类实现,内部通过Wrapper的invokeMethod方法调用真实的服务对象,最后把结果包装成RpcResult返回。

    public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
        private final T proxy;//proxy = UserServiveImpl@NNNN
    
        private final Class<T> type; //type = com.bail.user.service.IUserService
    
        private final URL url;//url = registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=user-provider&dubbo=2.6.2&export=dubbo://192.168.137.210:20881/com.bail.user.service.IUserService?
    //anyhost=true&application=user-provider&bind.ip=192.168.137.210&bind.port=20881&dubbo=2.6.2&generic=false&getUserById.retries=3&getUserBy
    Id.timeout=3000
    //&interface=com.bail.user.service.IUserService&methods=getUserById,queryList&pid=3120&retries=2&revision=1.0.0
    //&side=provider&timeout=8000&timestamp=1638518404364&version=1.0.0&pid=3120&registry=zookeeper&timeout=7200&timestamp=1638518404360
        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            try {
                return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
            } catch (InvocationTargetException e) {
                return new RpcResult(e.getTargetException());
            } catch (Throwable e) {
                throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    }
    
    • JavassistProxyFactory
      调用的是在生成代理类调用者的doInvoke方法
    /** Excerpt of the proxy factory whose invokers call the service through a {@code Wrapper}. */
    public class JavassistProxyFactory extends AbstractProxyFactory {
        /**
         * Creates an invoker whose {@code doInvoke} delegates to {@code Wrapper.invokeMethod}.
         *
         * @param proxy the service object to call
         * @param type  the service interface; used for the wrapper when the class name of
         *              {@code proxy} contains '$'
         * @param url   url passed to the created invoker
         * @return the invoker backed by the wrapper
         */
        @Override
        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
            Class<?> proxyClass = proxy.getClass();
            boolean plainClassName = proxyClass.getName().indexOf('$') < 0;
            final Wrapper wrapper = Wrapper.getWrapper(plainClassName ? proxyClass : type);
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName,
                                          Class<?>[] parameterTypes,
                                          Object[] arguments) throws Throwable {
                    return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                }
            };
        }
    }
    

    结果返回

    通过channel返回

    • AbstractPeer
    public abstract class AbstractPeer implements Endpoint, ChannelHandler {
        @Override
        public void send(Object message) throws RemotingException {
            send(message, url.getParameter(Constants.SENT_KEY, false));
        }
    }
    
    • NettyChannel
      调用父类的send,继续执行channel.write()方法,然后调用的是底层netty的方法,将结果返回
    /** Excerpt of Dubbo's {@code NettyChannel}: writing a message to the Netty channel. */
    final class NettyChannel extends AbstractChannel {
        /**
         * Writes {@code message} to the underlying Netty channel.
         *
         * @param message the message to write
         * @param sent    when true, waits for the write to complete, up to the url's
         *                timeout parameter ({@code Constants.DEFAULT_TIMEOUT} if unset)
         * @throws RemotingException if the parent check fails, the write fails, or the
         *                           wait does not complete within the timeout
         */
        public void send(Object message, boolean sent) throws RemotingException {
            // The parent implementation rejects the call when the channel is closed.
            super.send(message, sent);

            boolean success = true;
            int timeout = 0;
            try {
                ChannelFuture future = channel.write(message);
                if (sent) {
                    timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                    success = future.await(timeout);
                }
                Throwable cause = future.getCause();
                if (cause != null) {
                    throw cause;
                }
            } catch (Throwable e) {
                throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
            }

            if (!success) {
                // Leading space added so the address and "in timeout" no longer run together.
                throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                        + " in timeout(" + timeout + "ms) limit");
            }
        }
    }
    
    • AbstractChannel
      对channel状态进行验证,然后返回NettyChannel
    /** Excerpt of the channel base class: the closed-channel check performed before sending. */
    public abstract class AbstractChannel extends AbstractPeer implements Channel {
        /**
         * Rejects the send when this channel is closed; otherwise does nothing.
         *
         * @param message the message about to be sent (only used in the error text)
         * @param sent    not used by this check
         * @throws RemotingException if the channel is closed
         */
        @Override
        public void send(Object message, boolean sent) throws RemotingException {
            if (!isClosed()) {
                return;
            }
            String messageType = message == null ? "" : message.getClass().getName();
            throw new RemotingException(this, "Failed to send message "
                    + messageType + ":" + message
                    + ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress());
        }
    }
    
  • 相关阅读:
    Selenium操作之滚动条
    IntelliJ IDEA代码编码区提示库源不匹配字节码解决办法
    自动化测试浅谈
    json-lib解析json之二维JSONArray
    Java 多态
    Java0基础教程——java的安装
    LAYUI弹出层详解
    ajax实现动态URL
    js serialize()
    TP5.1接入支付宝实现网页/APP支付完整请求回调流程(沙箱环境)
  • 原文地址:https://www.cnblogs.com/nangonghui/p/15637490.html
Copyright © 2011-2022 走看看