zoukankan      html  css  js  c++  java
  • FutureTask子任务取消执行的状态判断

    示例代码可以从github上获取 https://github.com/git-simm/simm-framework.git
    一、业务场景:
      系统中存在多种场景并发操作事务执行时互锁的情况,导致任务积压,系统崩溃。先做了各场景业务的性能调整,但是并发互锁依然无法避免。于是开始考虑选取调用频繁的同步功能作为死锁的牺牲品,取消执行,释放锁。
     
    二、处理方案:
      在这里优先选择FutureTask.cancel方案。核心思想是 调用FutureTask的get方法时,设置超时时长。接收到超时异常后,调用cancel方法,中断线程。当然,实际来看这个方案也满足不了我的业务需要。它存在以下两个局限:
    • cancel方法只是向子线程发起中断请求,是否能够中断取决于子线程自身,不能确定子线程会在哪一步操作退出,加入启用的有事务,这个事务可能回滚了,也可能提交成功了。因此,我们需要借用synchronized功能,让父子线程通讯,来明确获得子线程的运行状态;
    • 子线程中执行数据库操作,引起死锁等待,这种情况下cancle操作是不能取消任务了,只能等到事务超时。这个问题由于cancel无法强制关闭线程,因此无法用FutureTask方案。

      以下实现依然围绕FutureTask这个方案来将,只是添加父子线程通讯,明确获取子线程状态的实现。

    三、代码实现:

      3.1、创建一个FTaskEndFlag的线程同步标志。父线程等待子线程反馈执行结果后,再执行后续的逻辑;

    package simm.framework.threadutils.interrupt;
    
    import java.util.concurrent.TimeoutException;
    /**
     * futuretask运行终止事件通知
     * 2018.09.22 by simm
     */
    public class FTaskEndFlag {
        private volatile boolean isNormaled = false;
        private volatile boolean fired = false;
        private Exception exception =null;
    
        public boolean isNormaled() {
            return isNormaled;
        }
    
        /**
         * 获取子线程异常信息
         * @return
         */
        public Exception getException() {
            return exception;
        }
    
        /**
         * 通知结束
         * @param result
         * @param result
         */
        public synchronized void notifyEnd(boolean result){
            isNormaled = result;
            fired = true;
            notifyAll();
        }
    
        /**
         * 通知结束
         * @param result
         * @param result
         */
        public synchronized void notifyEnd(boolean result,Exception ex){
            isNormaled = result;
            exception = ex;
            fired = true;
            notifyAll();
        }
    
        /**
         * 执行结束通知
         */
        public synchronized void waitForEnd() throws InterruptedException {
            while (!fired) {
                //子线程挂起,释放synchronized同步块
                wait();
            }
        }
       /**
         * 等待
         */
        private void waitFunc(long millis){
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

      3.2、创建一个BaseFutureTask的抽象类,内置FTaskEndFlag线程同步标志;

    package simm.framework.threadutils.interrupt;
    
    import java.util.concurrent.Callable;
    
    /**
     * 基础任务
     * 2018.09.22 by simm
     */
    public abstract class BaseFutureTask implements Callable<Boolean> {
        /**
         * futuretask 等待标志
         */
        private FTaskEndFlag flag = new FTaskEndFlag();
    
        public FTaskEndFlag getFlag() {
            return flag;
        }
    }

      3.3、创建一个超时重试的工具类,对FutureTask的结果获取设置超时时间;

    package simm.framework.threadutils.interrupt;
    
    import java.lang.reflect.Constructor;
    import java.util.List;
    import java.util.concurrent.*;
    /**
     * 方法超时重试工具
     * 2018.09.20  by simm
     */
    public class RetryUtil {
        /**
         * 可缓存线程执行器(依jvm情况自行回收创建)
         */
        private static ExecutorService executorService = Executors.newCachedThreadPool();
    
        /**
         * 默认方法(3秒超时,重试3次)
         * @param callable
         * @return
         * @throws InterruptedException
         * @throws ExecutionException
         * @throws TimeoutException
         */
        public static Boolean execute(BaseFutureTask callable) throws InterruptedException, ExecutionException, TimeoutException {
            return execute(callable,3000,1000,3);
        }
    
        /**
         * 方法超时控制
         * @param callable 方法体
         * @param timeout 超时时长
         * @param interval 间隔时长
         * @param retryTimes 重试次数
         * @return
         * @throws ExecutionException
         * @throws InterruptedException
         * @throws TimeoutException
         */
        public static Boolean execute(BaseFutureTask callable, long timeout,long interval, int retryTimes) throws ExecutionException, InterruptedException, TimeoutException {
            Boolean result = false;
            FutureTask<Boolean> futureTask = new FutureTask<>(callable);
            executorService.execute(futureTask);
            try {
                result = futureTask.get(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
                futureTask.cancel(true);
                throw e;
            }catch(TimeoutException e){
                futureTask.cancel(true);
                callable.getFlag().waitForEnd();
                if(callable.getFlag().isNormaled()){
                    return true;
                }
                e.printStackTrace();
                //超时重试
                retryTimes--;
                if(retryTimes > 0){
                    Thread.sleep(interval);
                    execute(callable,timeout,interval,retryTimes);
                }else{
                    throw e;
                }
            }
            return result;
        }
    }

    四、给出一个调用代码。实现一个继承自BaseFutureTask的 FutureTask任务。依旧需要注意子线程涉及到spring的组件,最好是参数从主线程注入到子线程。

    RetryUtil.execute(new SyncProductTask(productBiz,productInfo),timeout,interval,3);

    参考文章

    https://www.jianshu.com/p/55221d045f39

  • 相关阅读:
    数据库连接池使用(一):使用C#数据库连接池
    开包即食的教程带你浅尝最新开源的C# Web引擎Blazor
    HTTP协议之内容协商
    浅谈Cookie、Session与Cache的区别
    应用程序框架(一):DDD分层架构:领域实体(基础篇)
    持续集成是什么?
    Git版本控制 Git、github,gitlab相关操作
    Spring-Cloud-Netflix-Ribbon&Feigen
    Spring-Cloud-Netflix-Eureka集群搭建
    java面试题-javaSE基础
  • 原文地址:https://www.cnblogs.com/MrSi/p/9691092.html
Copyright © 2011-2022 走看看