zoukankan      html  css  js  c++  java
  • 9.3 客户端接收响应信息(异步转同步的实现)

    一 总体流程

    客户端接收响应消息
    NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    -->MultiMessageHandler.received(Channel channel, Object message)
      -->HeartbeatHandler.received(Channel channel, Object message)
        -->AllChannelHandler.received(Channel channel, Object message)
          -->ExecutorService cexecutor = getExecutorService()
          -->cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message))
            -->ChannelEventRunnable.run()
              -->DecodeHandler.received(Channel channel, Object message)
                -->decode(Object message)
                -->HeaderExchangeHandler.received(Channel channel, Object message)
                  -->handleResponse(Channel channel, Response response)
                    -->DefaultFuture.received(channel, response)
                      -->doReceived(Response res)//异步转同步

    二 源码解析

    在HeaderExchangeHandler.received(Channel channel, Object message)方法之前,与服务端接收请求消息一样,不再赘述。

    HeaderExchangeHandler.received(Channel channel, Object message)

     1     public void received(Channel channel, Object message) throws RemotingException {
     2         ...
     3         try {
     4             if (message instanceof Request) {
     5                 ...
     6             } else if (message instanceof Response) {
     7                 handleResponse(channel, (Response) message);
     8             } else if (message instanceof String) {
     9                 ...
    10             } else {
    11                 ...
    12             }
    13         } finally {
    14             HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    15         }
    16     }
    17 
    18     static void handleResponse(Channel channel, Response response) throws RemotingException {
    19         if (response != null && !response.isHeartbeat()) {
    20             DefaultFuture.received(channel, response);
    21         }
    22     }

    DefaultFuture.received(Channel channel, Response response)

     1     private final long id;
     2     private final Request request;
     3     private final int timeout;
     4     private volatile Response response;
     5     private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
     6     private final Condition done = lock.newCondition();
     7 
     8     public static void received(Channel channel, Response response) {
     9         try {
    10             DefaultFuture future = FUTURES.remove(response.getId());//删除元素并返回key=response.getId()的DefaultFuture
    11             if (future != null) {
    12                 future.doReceived(response);
    13             } else {
    14                 logger.warn("The timeout response finally returned at "
    15                         + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
    16                         + ", response " + response
    17                         + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
    18                         + " -> " + channel.getRemoteAddress()));
    19             }
    20         } finally {
    21             CHANNELS.remove(response.getId());
    22         }
    23     }
    24 
    25     private void doReceived(Response res) {
    26         lock.lock();
    27         try {
    28             //设置response
    29             response = res;
    30             if (done != null) {
    31                 //唤醒阻塞的线程
    32                 done.signal();
    33             }
    34         } finally {
    35             lock.unlock();
    36         }
    37         if (callback != null) {
    38             invokeCallback(callback);
    39         }
    40     }

    这里比较难懂,笔者再给出客户端发出请求时的一段代码:HeaderExchangeChannel.request(Object request, int timeout)

     1     public ResponseFuture request(Object request, int timeout) throws RemotingException {
     2         if (closed) {
     3             throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
     4         }
     5         // create request.
     6         Request req = new Request();
     7         req.setVersion("2.0.0");
     8         req.setTwoWay(true);
     9         req.setData(request);
    10         DefaultFuture future = new DefaultFuture(channel, req, timeout);
    11         try {
    12             channel.send(req);
    13         } catch (RemotingException e) {
    14             future.cancel();
    15             throw e;
    16         }
    17         return future;
    18     }

    netty是一个异步非阻塞的框架,所以当执行channel.send(req);的时候,当其内部执行到netty发送消息时,不会等待结果,直接返回。为了实现“异步转为同步”,使用了DefaultFuture这个辅助类,

    在HeaderExchangeChannel.request(Object request, int timeout),在还没有等到客户端的响应回来的时候,就直接将future返回了。返回给谁?再来看HeaderExchangeChannel.request(Object request, int timeout)的调用者。

    1                   -->DubboInvoker.doInvoke(final Invocation invocation)
    2                     //获取ExchangeClient进行消息的发送
    3                     -->ReferenceCountExchangeClient.request(Object request, int timeout)
    4                       -->HeaderExchangeClient.request(Object request, int timeout)
    5                         -->HeaderExchangeChannel.request(Object request, int timeout)

    DubboInvoker.doInvoke(final Invocation invocation)

     1 protected Result doInvoke(final Invocation invocation) throws Throwable {
     2         RpcInvocation inv = (RpcInvocation) invocation;
     3         final String methodName = RpcUtils.getMethodName(invocation);
     4         inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
     5         inv.setAttachment(Constants.VERSION_KEY, version);
     6 
     7         ExchangeClient currentClient;
     8         if (clients.length == 1) {
     9             currentClient = clients[0];
    10         } else {
    11             currentClient = clients[index.getAndIncrement() % clients.length];
    12         }
    13         try {
    14             boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);//是否异步
    15             boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);//是否没有返回值
    16             int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    17             if (isOneway) {
    18                 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
    19                 currentClient.send(inv, isSent);
    20                 RpcContext.getContext().setFuture(null);
    21                 return new RpcResult();
    22             } else if (isAsync) {
    23                 ResponseFuture future = currentClient.request(inv, timeout);
    24                 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
    25                 return new RpcResult();
    26             } else {
    27                 RpcContext.getContext().setFuture(null);
    28                 return (Result) currentClient.request(inv, timeout).get();
    29             }
    30         } catch (TimeoutException e) {
    31             throw new RpcException(...);
    32         } catch (RemotingException e) {
    33             throw new RpcException(...);
    34         }
    35     }

    其中currentClient.request(inv, timeout)返回值是ResponseFuture,DefaultFuture是ResponseFuture的实现类,实际上这里返回的就是DefaultFuture实例,而该实例就是HeaderExchangeChannel.request(Object request, int timeout)返回的那个future实例。之后调用DefaultFuture.get()。

     1     public Object get() throws RemotingException {
     2         return get(timeout);
     3     }
     4 
     5     public Object get(int timeout) throws RemotingException {
     6         if (timeout <= 0) {
     7             timeout = Constants.DEFAULT_TIMEOUT;
     8         }
     9         if (!isDone()) {
    10             long start = System.currentTimeMillis();
    11             lock.lock();
    12             try {
    13                 while (!isDone()) {
    14                     //Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.
    15                     done.await(timeout, TimeUnit.MILLISECONDS);
    16                     if (isDone() || System.currentTimeMillis() - start > timeout) {
    17                         break;
    18                     }
    19                 }
    20             } catch (InterruptedException e) {
    21                 throw new RuntimeException(e);
    22             } finally {
    23                 lock.unlock();
    24             }
    25             if (!isDone()) {
    26                 throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
    27             }
    28         }
    29         return returnFromResponse();
    30     }
    31 
    32     public boolean isDone() {
    33         return response != null;
    34     }

    此处我们看到当响应response没有回来时,condition会执行await进行阻塞当前线程,直到被唤醒或被中断或阻塞时间到时了。当客户端接收到服务端的响应的时候,DefaultFuture.doReceived:

    会先为response赋上返回值,之后执行condition的signal唤醒被阻塞的线程,get()方法就会释放锁,执行returnFromResponse(),返回值。

     1     private Object returnFromResponse() throws RemotingException {
     2         Response res = response;
     3         if (res == null) {
     4             throw new IllegalStateException("response cannot be null");
     5         }
     6         if (res.getStatus() == Response.OK) {
     7             return res.getResult();
     8         }
     9         if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
    10             throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
    11         }
    12         throw new RemotingException(channel, res.getErrorMessage());
    13     }

    到现在其实还有一个问题?就是netty时异步非阻塞的,那么假设现在我发了1w个Request,后来返回来1w个Response,那么怎么对应Request和Response呢?如果对应不上,最起码的唤醒就会有问题。为了解决这个问题提,Request和Response中都有一个属性id。

    在HeaderExchangeChannel.request(Object request, int timeout)中:

     1         Request req = new Request();
     2         req.setVersion("2.0.0");
     3         req.setTwoWay(true);
     4         req.setData(request);
     5         DefaultFuture future = new DefaultFuture(channel, req, timeout);
     6         try {
     7             channel.send(req);
     8         } catch (RemotingException e) {
     9             future.cancel();
    10             throw e;
    11         }
    12         return future;

    看一下Request的构造器:

     1     private static final AtomicLong INVOKE_ID = new AtomicLong(0);
     2     private final long mId;
     3 
     4     public Request() {
     5         mId = newId();
     6     }
     7 
     8     private static long newId() {
     9         // getAndIncrement()增长到MAX_VALUE时,再增长会变为MIN_VALUE,负数也可以做为ID
    10         return INVOKE_ID.getAndIncrement();
    11     }

    看一下DefaultFuture的构造器:

     1     private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
     2     private final long id;
     3     private final Request request;
     4     private volatile Response response;
     5 
     6     public DefaultFuture(Channel channel, Request request, int timeout) {
     7         ...
     8         this.request = request;
     9         this.id = request.getId();
    10         ...
    11         FUTURES.put(id, this);
    12         ...
    13     }

    再来看一下响应。

    HeaderExchangeHandler.handleRequest(ExchangeChannel channel, Request req)

     1     Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
     2         Response res = new Response(req.getId(), req.getVersion());
     3         ...
     4         Object msg = req.getData();
     5         try {
     6             // handle data.
     7             Object result = handler.reply(channel, msg);
     8             res.setStatus(Response.OK);
     9             res.setResult(result);
    10         } catch (Throwable e) {
    11             res.setStatus(Response.SERVICE_ERROR);
    12             res.setErrorMessage(StringUtils.toString(e));
    13         }
    14         return res;
    15     }

    来看一下Response的构造器:

    1     private long mId = 0;
    2 
    3     public Response(long id, String version) {
    4         mId = id;
    5         mVersion = version;
    6     }

    这里response的id的值时request的id。最后来看一下服务端接收后的处理:

    DefaultFuture.received(Channel channel, Response response)

     1     public static void received(Channel channel, Response response) {
     2         try {
     3             DefaultFuture future = FUTURES.remove(response.getId());//删除元素并返回key=response.getId()的DefaultFuture
     4             if (future != null) {
     5                 future.doReceived(response);
     6             } else {
     7                ...
     8             }
     9         } finally {
    10             CHANNELS.remove(response.getId());
    11         }
    12     }
  • 相关阅读:
    20、职责链模式
    19、命令模式
    18、桥接模式
    17、单例模式
    javascript移动端实现企业图谱总结
    前端用js模拟疫情扩散开发总结
    移动端企业图谱开发兼容性等问题踩坑
    js实现企业图谱(pc端企业图谱项目总结与踩坑分享)
    基于vue脚手架的项目打包上线(发布)方法和误区
    实现一个网页版的聊天室(类似于钉钉群)
  • 原文地址:https://www.cnblogs.com/java-zhao/p/7822521.html
Copyright © 2011-2022 走看看