zoukankan      html  css  js  c++  java
  • java如何实现一个Future

    实现Futrue接口

    public class MsgFuture<V> implements java.util.concurrent.Future<V> {
        ...
        ...
    }    
    

      

    Future的主要特性为Future.get()、

      get()
     get(long timeout, TimeUnit unit)

    主要思路如下:
    构造MsgFuture时,设置开始时间,这里是sendTime;设置timeout,默认get()方法的超时时间,我们的程序不可能会无限等待
    默认的get()对应的值域是result,默认为一个NULL对象,标识没有返回数据

    result的值需要其他线程在做完任务后将值写到Future对象中,这里暴露了一个方法setResult(object)
        /**
         * 设置结果值result,唤醒condition {@link #get(long, TimeUnit)}
         * @param result
         */
        public synchronized void setResult(Object result) {
            reentrantLock.lock();
            try {
                this.result = result;
                condition.signalAll();
            }finally {
                reentrantLock.unlock();
            }
    
        }
    

      使用ReentrantLock来进行数据可见性控制

           condition.signalAll()可以唤醒condition.await的阻塞wait

           至于其他线程如何调用到setResult(object)方法,可以使用ConcurrentHashMap,key为msgId,值为MsgFuture对象,设置成一个全局的,或两个线程都可访问,其他线程根据msgId获取到MsgFuture,然后调用setResult(object)方法

        /**
         * 获取结果,如果到达timeout还未得到结果,则会抛出TimeoutException
         * @param timeout
         * @param unit
         * @return
         * @throws InterruptedException
         * @throws TimeoutException
         */
        @SuppressWarnings("all")
        public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
            long left = getLeftTime(timeout, unit);                      //根据timeout配置获取剩余的世界
            if(left < 0){
                //已经没有剩余时间
                if(isDone()){                                            //如果已经完成,直接放回结果
                    return (V)this.result;
                }else{
                    //timeout
                    throw new TimeoutException("返回超时,后续的响应将会被丢弃abort");
                }
            }else{
    
                reentrantLock.lock();                                    //同步
                try {
                    //获取锁后先判断是否已经完成,防止无意义的await
                    if(isDone()){                                        //先判断是否已经完成 
                        return (V)this.result;                           //直接返回  
                    }
                    logger.debug("await "+left+" ms");
                    condition.await(getLeftTime(timeout, unit), TimeUnit.MILLISECONDS);       //没有返回,阻塞等待,如果condition被唤醒,也会提前退出
                }finally {
                    reentrantLock.unlock();
                }
                if(isDone()){                                            //被唤醒或超时时间已到,尝试判断是否完成
                    return (V)this.result;                               //返回
                }
    
                throw new TimeoutException("未获取到结果");                //超时  
            }
        }
    

      

        public boolean isDone() {
            return this.result != NULL;
        }
    

      



    全部代码
    public class MsgFuture<V> implements java.util.concurrent.Future<V> {
    
        private final static Logger logger = LoggerFactory.getLogger(MsgFuture.class);
    
        /**
         * 全局的空对象,如果Future获取到值了,那么一定不是NULL
         */
        private final static Object NULL = new Object();
        /**
         * 主锁
         */
        private final ReentrantLock reentrantLock = new ReentrantLock();
    
        /**
         * 条件,利用它的condition.await(left, TimeUnit.MILLISECONDS)和notifyAll方法来实现阻塞、唤醒
         */
        private final Condition condition = reentrantLock.newCondition();
    
        private int timeout;
    
        private volatile Object result = NULL;
    
        private long sendTime;
    
        public MsgFuture(int timeout, long sendTime) {
            this.timeout = timeout;
            this.sendTime = sendTime;
        }
    
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }
    
        public boolean isCancelled() {
            return false;
        }
    
        public boolean isDone() {
            return this.result != NULL;
        }
    
        /**
         * 获取future结果
         * @return
         * @throws InterruptedException
         */
        public V get() throws InterruptedException {
            logger.debug("sendTime:{}",sendTime);
            try {
                return get(timeout, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                logger.error("获取future结果异常", e);
            }
            return null;
        }
    
        /**
         * 获取结果,如果到达timeout还未得到结果,则会抛出TimeoutException
         * @param timeout
         * @param unit
         * @return
         * @throws InterruptedException
         * @throws TimeoutException
         */
        @SuppressWarnings("all")
        public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
            long left = getLeftTime(timeout, unit);
            if(left < 0){
                //已经没有剩余时间
                if(isDone()){
                    return (V)this.result;
                }else{
                    //timeout
                    throw new TimeoutException("返回超时,后续的响应将会被丢弃abort");
                }
            }else{
    
                reentrantLock.lock();
                try {
                    //获取锁后先判断是否已经完成,防止无意义的await
                    if(isDone()){
                        return (V)this.result;
                    }
                    logger.debug("await "+left+" ms");
                    condition.await(getLeftTime(timeout, unit), TimeUnit.MILLISECONDS);
                }finally {
                    reentrantLock.unlock();
                }
                if(isDone()){
                    return (V)this.result;
                }
    
                throw new TimeoutException("未获取到结果");
            }
        }
    
        /**
         * 设置结果值result,唤醒condition {@link #get(long, TimeUnit)}
         * @param result
         */
        public synchronized void setResult(Object result) {
            reentrantLock.lock();
            try {
                this.result = result;
                condition.signalAll();
            }finally {
                reentrantLock.unlock();
            }
    
        }
    
        /**
         * 计算剩余时间
         * @param timeout
         * @param unit
         * @return
         */
        private long getLeftTime(long timeout, TimeUnit unit){
            long now = System.currentTimeMillis();
            timeout = unit.toMillis(timeout); // 转为毫秒
            return timeout - (now - sendTime);
        }
    
        /*public static void main(String[] args) {
            MsgFuture msgFuture = new MsgFuture(2000,System.currentTimeMillis());
    
            //测试先唤醒、后get是否正常
            msgFuture.setResult("yoxi");
    
            try {
                System.out.println(msgFuture.get(2000,TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                logger.error("Interrupt异常", e);
            } catch (TimeoutException e) {
                logger.error("测试先唤醒,后get出错", e);
            }
        }*/
    }
    

      





  • 相关阅读:
    减肥 day1
    这两天的学习内容
    小马激活工具激活系统导致系统崩溃
    mac OS X下Java项目环境搭建+IntelliJ IDEA Jrebel插件安装与破解+Office 2016破解版安装
    JDK 动态代理实现原理
    php ajax提交post请求出现数组被截断情况的解决方法
    mac OS X下git代码行统计命令
    mac OS X下安装Redis及Thinkphp3.1使用Redis
    linux服务器git pull/push时提示输入账号密码之免除设置
    linux服务器修改ftp默认21端口方法
  • 原文地址:https://www.cnblogs.com/windliu/p/8087169.html
Copyright © 2011-2022 走看看