zoukankan      html  css  js  c++  java
  • ListenableFuture的状态同步和原子更新

    前言


    在Java8里的Future类实现中,引入了一种新的Future类:CompletableFuture。此类相比较于原来的Future类来说,最大的一点不同在于它可以完全异步执行结果回调。而在老Future模式下,用户等待结果是需要阻塞等待的,然后利用此结果做后续的操作。无疑这在效率上并不是很好。但其实在CompletableFuture出现之前,已经有类似特点的Future工具类的实现,它就是guava包里的ListenableFuture的实现。按照时间顺序,ListenableFuture实现在前,CompletableFuture在后。个人觉得,Java8的CompletableFuture在一定程度上应该还是借鉴了ListenableFuture的一些思想。二者的一个共通思想:一个可监听式的Future对象。今天笔者要聊的主题关乎里面的状态同步,关于ListenableFuture的子类内部实现,许多人可能未必清楚。

    ListenableFuture的监听添加


    ListenableFuture,ListenableFuture,关键词在里面的Listenable。而且它只有以下一个接口定义:

    public interface ListenableFuture<V> extends Future<V> {
      void addListener(Runnable listener, Executor executor);
    }
    

    所以我们先来看看里面的监听是如何增加的。说是监听,其实就是一个回调执行操作,实现定义如下:

      @Override
      public void addListener(Runnable listener, Executor exec) {
        executionList.add(listener, exec);
      }
    

    这里的executionList可不是线程池,而是一个执行列表,这里的监听runnable执行在给定的执行器内。executionList是被定义在ListenableFuture的实现子类中:

    public abstract class AbstractFuture<V> implements ListenableFuture<V> {
    
      /** Synchronization control for AbstractFutures. */
      private final Sync<V> sync = new Sync<V>();
    
      // The execution list to hold our executors.
      private final ExecutionList executionList = new ExecutionList();
      ...
    

    在这里我们看到了一个Sync对象,它是用来做什么的呢?我们继续往下看。

    ListenableFuture内的状态同步控制


    先不看Sync的具体实现,在ListenableFuture的子类实现的主要方法里,间接调用的都是Sync的同名方法,

      @Override
      public V get() throws InterruptedException, ExecutionException {
        return sync.get();
      }
    
      @Override
      public boolean isDone() {
        return sync.isDone();
      }
    
      @Override
      public boolean isCancelled() {
        return sync.isCancelled();
      }
    
      @Override
      public boolean cancel(boolean mayInterruptIfRunning) {
        if (!sync.cancel()) {
          return false;
        }
        // Future执行cancal操作时,也执行一把监听回调操作
        executionList.execute();
        if (mayInterruptIfRunning) {
          interruptTask();
        }
        return true;
      }
    

    换句话说,我们对一个ListenableFuture的操作调用其实是对Sync的一个操作调用。在这里我们基本可以有一个大概猜测:Sync是一个包装了Future实现的一个同步类,至于里面具体是怎么同步呢,为什么同步呢?我们继续往下看。

    ListenableFuture内的Sync同步


    我们直接来看里面的Sync定义说明:

      /**
       * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
       * private subclass to hold the synchronizer.  This synchronizer is used to
       * implement the blocking and waiting calls as well as to handle state changes
       * in a thread-safe manner.  The current state of the future is held in the
       * Sync state, and the lock is released whenever the state changes to either
       * {@link #COMPLETED} or {@link #CANCELLED}.
       *
       * <p>To avoid races between threads doing release and acquire, we transition
       * to the final state in two steps.  One thread will successfully CAS from
       * RUNNING to COMPLETING, that thread will then set the result of the
       * computation, and only then transition to COMPLETED or CANCELLED.
       *
       * <p>We don't use the integer argument passed between acquire methods so we
       * pass around a -1 everywhere.
       */
      static final class Sync<V> extends AbstractQueuedSynchronizer {
    

    一句话简单概括,它是一个继承了AbstractQueuedSynchronizer(AQS)类的用于做future对象状态同步控制的操作类。这里其实假设了一个情况,会存在多线程同时操作某future对象的情况,通过AQS进行这些调用的阻塞同步控制,从而保证状态的原子更新操作,获取不到锁的线程会被置入一个FIFO的队列中进行等待。在AQS里,有分为互斥和共享两种模式,前后者的核心区别在于是否支持多个线程同时能够获取锁的情况。

    下面是Sync里面的complete实现,通过CAS操作来更新状态:

        private boolean complete(@Nullable V v, @Nullable Throwable t,
            int finalState) {
          boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
          if (doCompletion) {
            // If this thread successfully transitioned to COMPLETING, set the value
            // and exception and then release to the final state.
            this.value = v;
            this.exception = t;
            releaseShared(finalState);
          } else if (getState() == COMPLETING) {
            // If some other thread is currently completing the future, block until
            // they are done so we can guarantee completion.
            acquireShared(-1);
          }
          return doCompletion;
        }
    

    其实在JDK lock包下的许多锁的实现类中,都有用到类似Sync(AQS子类)做多线程的同步控制,比如ReentrantLock。

    其它状态原子更新方法


    在Hadoop中,同样有对于ListenableFuture的一个抽象类实现,不过它的内部不是AQS类做状态同步控制,而是实现3种更为高效的原子更新方法。按照不同情况,做逐一fall back的选择。以下代码供大家学习参考使用、

    以下代码的目的是更新AbstractFuture类中的waiters,listener这样的volatile类型变量。

      /**
       * This field encodes the current state of the future.
       * <p>
       * <p>The valid values are:
       * <ul>
       * <li>{@code null} initial state, nothing has happened.
       * <li>{@link Cancellation} terminal state, {@code cancel} was called.
       * <li>{@link Failure} terminal state, {@code setException} was called.
       * <li>{@link SetFuture} intermediate state, {@code setFuture} was called.
       * <li>{@link #NULL} terminal state, {@code set(null)} was called.
       * <li>Any other non-null value, terminal state, {@code set} was called with
       * a non-null argument.
       * </ul>
       */
      private volatile Object value;
    
      /**
       * All listeners.
       */
      private volatile Listener listeners;
    
      /**
       * All waiting threads.
       */
      private volatile Waiter waiters;
    
      /**
       * Constructor for use by subclasses.
       */
      protected AbstractFuture() {
      }
    

    首先是,调用比较底层的Unsafe包,更新对象按照地址偏移量进行对象更新。

      /**
       * {@link AtomicHelper} based on {@link sun.misc.Unsafe}.
       * <p>
       * <p>Static initialization of this class will fail if the
       * {@link sun.misc.Unsafe} object cannot be accessed.
       */
      private static final class UnsafeAtomicHelper extends AtomicHelper {
        static final sun.misc.Unsafe UNSAFE;
        static final long LISTENERS_OFFSET;
        static final long WAITERS_OFFSET;
        static final long VALUE_OFFSET;
        static final long WAITER_THREAD_OFFSET;
        static final long WAITER_NEXT_OFFSET;
    
        static {
          sun.misc.Unsafe unsafe = null;
          try {
            unsafe = sun.misc.Unsafe.getUnsafe();
          } catch (SecurityException tryReflectionInstead) {
            ...
          }
          try {
            Class<?> abstractFuture = AbstractFuture.class;
            WAITERS_OFFSET = unsafe
                .objectFieldOffset(abstractFuture.getDeclaredField("waiters"));
            LISTENERS_OFFSET = unsafe
                .objectFieldOffset(abstractFuture.getDeclaredField("listeners"));
            VALUE_OFFSET = unsafe
                .objectFieldOffset(abstractFuture.getDeclaredField("value"));
            WAITER_THREAD_OFFSET = unsafe
                .objectFieldOffset(Waiter.class.getDeclaredField("thread"));
            WAITER_NEXT_OFFSET = unsafe
                .objectFieldOffset(Waiter.class.getDeclaredField("next"));
            UNSAFE = unsafe;
          } catch (Exception e) {
            throwIfUnchecked(e);
            throw new RuntimeException(e);
          }
        }
    
        ...
    
        @Override
        void putThread(Waiter waiter, Thread newValue) {
          UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue);
        }
    
        @Override
        void putNext(Waiter waiter, Waiter newValue) {
          UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue);
        }
    
    
        /**
         * Performs a CAS operation on the {@link #listeners} field.
         */
        @Override
        boolean casListeners(
            AbstractFuture<?> future, Listener expect, Listener update) {
          return UNSAFE
              .compareAndSwapObject(future, LISTENERS_OFFSET, expect, update);
        }
    
        /**
         * Performs a CAS operation on the {@link #value} field.
         */
        @Override
        boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
          return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update);
        }
      }
    

    方法二,通过基于反射原理的原子引用类型AtomicReferenceFieldUpdater类,来做变量的原子更新。

      /**
       * {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}.
       */
     private static final class SafeAtomicHelper extends AtomicHelper {
        final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater;
        final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater;
        final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater;
        final AtomicReferenceFieldUpdater<AbstractFuture, Listener>
            listenersUpdater;
        final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater;
    
        SafeAtomicHelper(
            AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater,
            AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater,
            AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater,
            AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater,
            AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) {
          this.waiterThreadUpdater = waiterThreadUpdater;
          this.waiterNextUpdater = waiterNextUpdater;
          this.waitersUpdater = waitersUpdater;
          this.listenersUpdater = listenersUpdater;
          this.valueUpdater = valueUpdater;
        }
    
        @Override
        void putThread(Waiter waiter, Thread newValue) {
          waiterThreadUpdater.lazySet(waiter, newValue);
        }
    
        @Override
        void putNext(Waiter waiter, Waiter newValue) {
          waiterNextUpdater.lazySet(waiter, newValue);
        }
    
        @Override
        boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter
            update) {
          return waitersUpdater.compareAndSet(future, expect, update);
        }
    
        @Override
        boolean casListeners(
            AbstractFuture<?> future, Listener expect, Listener update) {
          return listenersUpdater.compareAndSet(future, expect, update);
        }
    
        @Override
        boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
          return valueUpdater.compareAndSet(future, expect, update);
        }
      }
    

    方法三,基于synchronized关键字做字段更新,在效果上不及前二者:

      /**
       * {@link AtomicHelper} based on {@code synchronized} and volatile writes.
       * <p>
       * <p>This is an implementation of last resort for when certain basic VM
       * features are broken (like AtomicReferenceFieldUpdater).
       */
      private static final class SynchronizedHelper extends AtomicHelper {
        @Override
        void putThread(Waiter waiter, Thread newValue) {
          waiter.thread = newValue;
        }
    
        @Override
        void putNext(Waiter waiter, Waiter newValue) {
          waiter.next = newValue;
        }
    
        @Override
        boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter
            update) {
          synchronized (future) {
            if (future.waiters == expect) {
              future.waiters = update;
              return true;
            }
            return false;
          }
        }
    
        @Override
        boolean casListeners(
            AbstractFuture<?> future, Listener expect, Listener update) {
          synchronized (future) {
            if (future.listeners == expect) {
              future.listeners = update;
              return true;
            }
            return false;
          }
        }
    
        @Override
        boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
          synchronized (future) {
            if (future.value == expect) {
              future.value = update;
              return true;
            }
            return false;
          }
        }
      }
    

    上面的CAS操作已经基本是标准的CAS算法步骤了。
    以上就是3个简单的原子更新器的简单实现类。

    引用


    [1].https://github.com/apache/hadoop/blob/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AbstractFuture.java

  • 相关阅读:
    C++ 多线程编程
    协程简介(coroutine)
    Yanhua Digimaster 3如何使用免焊适配器重置仪表板?
    Autel OTOFIX IM1 远程/在线技术支持指南
    Xhorse奥迪免焊适配器套装功能列表+常见问题
    如何通过 DDD 构建一辆汽车
    周末复习一波Linux,Linux常用命令总结,还有语法+案例
    Dubbo 基础知识
    GIT版本控制学习博客
    C++检测和定位内存泄漏
  • 原文地址:https://www.cnblogs.com/bianqi/p/12183558.html
Copyright © 2011-2022 走看看