zoukankan      html  css  js  c++  java
  • Dubbo异步转同步

      Dubbo是一款开源的RPC中间件框架,底层数据传输默认使用的Netty,那么请求的处理理论上是异步的,为什么我们在使用的时候是同步的呢?肯定是Dubbo框架,做了异步转同步的处理。

      首先我们来梳理下,异步转同步,我们的需求是怎样的?

      1、调用方请求远程服务之后,需要等待结果,此刻,请求线程应该阻塞

      2、远程服务返回结果后,唤醒请求线程,调用方得到结果

      Dubbo异步转同步,核心类是DefaultFuture,核心方法是get(),received(Channel channel, Response response)。

      DefaultFuture构造函数:

     1     private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
     2 
     3    // 每次请求都会生成一个DefaultFuture对象,然后保存到FUTURES中,请求返回结果时,根据id从FUTURES中找到对应的DefaultFuture对象,并删除
     4     private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
     5 
     6     // AtomicLong从0开始递增,创建Request对象时生成的id
     7     private final long id;
     8     private final Channel channel;
     9     // 请求对象
    10     private final Request request;
    11     // 超时的设置
    12     private final int timeout;
    13     // 这里使用Lock和Condition实现等待通知机制
    14     private final Lock lock = new ReentrantLock();
    15     private final Condition done = lock.newCondition();
    16     private final long start = System.currentTimeMillis();
    17     private volatile long sent;
    18     // 请求的返回结果
    19     private volatile Response response;
    20     private volatile ResponseCallback callback;
    21 
    22     public DefaultFuture(Channel channel, Request request, int timeout) {
    23         this.channel = channel;
    24         this.request = request;
    25         this.id = request.getId();
    26         this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    27         // put into waiting map.
    28         FUTURES.put(id, this);
    29         CHANNELS.put(id, channel);
    30     }

      get():

     1 public Object get(int timeout) throws RemotingException {
     2         if (timeout <= 0) {
     3             timeout = Constants.DEFAULT_TIMEOUT;
     4         }
     5         // isDone()方法就是判断Response是否有值(即是否有返回结果)
     6         if (!isDone()) {
     7             long start = System.currentTimeMillis();
     8             lock.lock();
     9             try {
    10                 while (!isDone()) {
    11                     // 超时等待
    12                     done.await(timeout, TimeUnit.MILLISECONDS);
    13                     // 如果有返回结果了,或者,超时了,就退出循环
    14                     if (isDone() || System.currentTimeMillis() - start > timeout) {
    15                         break;
    16                     }
    17                 }
    18             } catch (InterruptedException e) {
    19                 throw new RuntimeException(e);
    20             } finally {
    21                 lock.unlock();
    22             }
    23             // 如果是超时了,就抛出异常
    24             if (!isDone()) {
    25                 throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
    26             }
    27         }
    28         // 远程服务正常返回结果,则返回给调用方
    29         return returnFromResponse();
    30     }

      received(Channel channel, Response response):

     1 public static void received(Channel channel, Response response) {
     2         try {
     3             // 根据请求id从FUTURES中获取DefaultFuture,并删除
     4             DefaultFuture future = FUTURES.remove(response.getId());
     5             if (future != null) {
     6                 future.doReceived(response);
     7             } else {
     8                 logger.warn("The timeout response finally returned at "
     9                         + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
    10                         + ", response " + response
    11                         + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
    12                         + " -> " + channel.getRemoteAddress()));
    13             }
    14         } finally {
    15             // CHANNELS也删除
    16             CHANNELS.remove(response.getId());
    17         }
    18     }
     1 private void doReceived(Response res) {
     2         lock.lock();
     3         try {
     4             response = res;
     5             if (done != null) {
     6                 // 唤醒阻塞的线程
     7                 done.signal();
     8             }
     9         } finally {
    10             lock.unlock();
    11         }
    12         if (callback != null) {
    13             invokeCallback(callback);
    14         }
    15     }

      总结:Dubbo异步转同步的原理,其实就是利用Lock和Condition实现了等待通知机制。请求与返回结果进行匹配,则是通过传递以及接收请求id实现的。

  • 相关阅读:
    grad-cam 、cam 和热力图,基于keras的实现
    高斯过程(转)
    Keras中使用LSTM层时设置的units参数是什么
    Real Time Credit Card Fraud Detection with Apache Spark and Event Streaming
    NodeJs+http+fs+request+cheerio 采集,保存数据,并在网页上展示(构建web服务器)
    NodeJs+Request+Cheerio 采集数据
    数组与对象的深浅复制
    Git(进击学习:远程仓库操作)-V3.0
    牛逼的css3:动态过渡与图形变换
    Git(远程仓库:git@oschina)-V2.0
  • 原文地址:https://www.cnblogs.com/dushenzi/p/12369955.html
Copyright © 2011-2022 走看看