zoukankan      html  css  js  c++  java
  • 利用Thread.stop完成方法执行超时中断

    示例代码可以从github上获取 https://github.com/git-simm/simm-framework.git
    一、业务场景:
      系统中存在多种场景并发操作事务执行时互锁的情况,导致任务积压,系统崩溃。先做了各场景业务的性能调整,但是并发互锁依然无法避免。于是开始考虑选取调用频繁的同步功能作为死锁的牺牲品,取消执行,释放锁。
     
    二、处理方案:
      由于FutureTask.cancel方案无法解决数据库死锁问题。因此接下来切换方案,改为Thread.stop实现,强行杀掉子线程。核心思想是 把需要监控的操作 包装成一个子线程,启动以后主线程开启一个运行时间监控,当判断到子线程已经超出时限,则调用stop方法,强行中断(需要说明一点,stop操作可能会带来一些未知的异常状态,jdk已经将其标记为过时,不建议使用,这里需要个人自行评估方案的影响)。 
      那么只做一个stop子线程的操作,能同时解决掉其对数据库的锁定吗?经过验证这是不可能的,对数据库的锁定需要在事务提交或回滚时 释放。事务是由spring框架自行接管处理的,在发生死锁,子线程未结束的情况下,事务即使到了超时时限,但是由于没有新的sql执行语句触发其超时校验,spring事务也无法自行结束。它的结束点为 数据库事务锁等待超时。因此,现在我们想要让事务提前回滚还需要一个操作,设置 @Transactional的timeout时间,指定为一个较短的时间,子线程被stop后,spring会自动对该事务进行回滚,从而达到数据库解锁的目的。
     

    三、代码实现:

      3.1、创建一个FTaskEndFlag的线程同步标志。父线程等待子线程反馈执行结果后,再执行后续的逻辑;

    package simm.framework.threadutils.interrupt;
    
    import java.util.concurrent.TimeoutException;
    /**
     * futuretask运行终止事件通知
     * 2018.09.22 by simm
     */
    public class FTaskEndFlag {
        private volatile boolean isNormaled = false;
        private volatile boolean fired = false;
        private Exception exception =null;
    
        public boolean isNormaled() {
            return isNormaled;
        }
    
        /**
         * 获取子线程异常信息
         * @return
         */
        public Exception getException() {
            return exception;
        }
    
        /**
         * 通知结束
         * @param result
         * @param result
         */
        public synchronized void notifyEnd(boolean result){
            isNormaled = result;
            fired = true;
            notifyAll();
        }
    
        /**
         * 通知结束
         * @param result
         * @param result
         */
        public synchronized void notifyEnd(boolean result,Exception ex){
            isNormaled = result;
            exception = ex;
            fired = true;
            notifyAll();
        }
    
        /**
         * 执行结束通知
         */
        public synchronized void waitForEnd() throws InterruptedException {
            while (!fired) {
                //子线程挂起,释放synchronized同步块
                wait();
            }
        }
        /**
         * 执行结束通知
         */
        public void waitForEnd(Thread thread,Long timeout) throws TimeoutException {
            long begin = System.currentTimeMillis();
            while(System.currentTimeMillis()-begin <= timeout){
                waitFunc(10);
                //子线程已经执行完毕,则返回
                if(fired) return;
            }
            //超时未返回,则终止线程
            try{
                thread.stop();
            }catch(Exception e){
                e.printStackTrace();
                throw e;
            }
            throw new TimeoutException("方法执行超出时限:"+timeout);
        }
        /**
         * 等待
         */
        private void waitFunc(long millis){
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

      3.2、创建一个BaseThread的抽象类,内置FTaskEndFlag线程同步标志;

    package com.zhaogang.ii.biz.threads.future;
    
    /**
     * 基础线程
     */
    public abstract class BaseThread extends Thread {
        /**
         * futuretask 等待标志
         */
        private FTaskEndFlag flag = new FTaskEndFlag();
    
        public FTaskEndFlag getFlag() {
            return flag;
        }
    }

      3.3、创建一个超时重试的工具类,对FutureTask的结果获取设置超时时间;

    package com.zhaogang.ii.biz.threads.future;
    
    import java.lang.reflect.Constructor;
    import java.util.List;
    import java.util.concurrent.*;
    /**
     * 方法超时重试工具
     * 2018.09.20  by simm
     */
    public class RetryUtil {/**
         * 默认方法(3秒超时,重试3次)
         * @param syncThread
         * @param params
         * @param <T>
         * @return
         * @throws Exception
         */
        public static <T extends BaseThread> Boolean execute(Class<T> syncThread, List<Object> params) throws Exception {
            return execute(syncThread,params,3000,1000,3);
        }
    
        /**
         * 方法超时控制
         * @param syncThread 线程类
         * @param params 线程构造参数
         * @param timeout
         * @param interval
         * @param retryTimes
         * @param <T>
         * @return
         * @throws Exception
         */
        public static <T extends BaseThread> Boolean execute(Class<T> syncThread, List<Object> params, long timeout, long interval, int retryTimes) throws Exception {
            Boolean result = false;
            try{
                //参数类型数组
                Class[] parameterTypeArrs = new Class[params.size()];
                for(int i=0;i<params.size();i++){
                    Class c =  params.get(i).getClass();
                    if(c.getName().indexOf("$$")>0){
                        String clsName = c.getName().substring(0,c.getName().indexOf("$$"));
                        parameterTypeArrs[i] = Class.forName(clsName);
                    }else{
                        parameterTypeArrs[i] = c;
                    }
                }
                //根据参数类型获取相应的构造函数
                Constructor constructor= syncThread.getConstructor(parameterTypeArrs);
                //参数数组
                Object[] parameters= params.toArray();
                //根据获取的构造函数和参数,创建实例
                BaseThread processor = (BaseThread) constructor.newInstance(parameters);
                processor.start();
                //等待线程结束
                processor.getFlag().waitForEnd(processor,timeout);
                boolean r = processor.getFlag().isNormaled();
                if(!r){
                    throw processor.getFlag().getException();
                }
                return processor.getFlag().isNormaled();
            }catch (TimeoutException e) {
                //超时重试
                retryTimes--;
                if(retryTimes > 0){
                    System.out.println("方法开始重试:"+retryTimes);
                    Thread.sleep(interval);
                    execute(syncThread,params,timeout,interval,retryTimes);
                }else{
                    throw e;
                }
            }
            return result;
        }
    }

      3.4、设置子线程事务超时时间;

        @Transactional(timeout = 3)
        public void syncProduct(ProductDetailinfo productInfo) {

    四、给出一个调用代码。实现一个继承自BaseFutureTask的 FutureTask任务。依旧需要注意子线程涉及到spring的组件,最好是参数从主线程注入到子线程。保证传递的是动态代理对象。

    package interrupt;
    
    import simm.framework.threadutils.interrupt.BaseThread;
    
    public class SyncProductThread extends BaseThread {
        private ProductBiz productBiz;
        private ProductDetailinfo productInfo;
        /**
         * 构造函数
         * @param productBiz
         * @param productInfo
         */
        public SyncProductThread(ProductBiz productBiz, ProductDetailinfo productInfo){
            this.productBiz = productBiz;
            this.productInfo = productInfo;
        }
        @Override
        public void run() {
            boolean isNormaled = false;
            Exception exception = null;
            try{
                productBiz.syncProduct(productInfo);
                isNormaled = true;
            }catch(Exception e){
                e.printStackTrace();
                exception = e;
            }finally {
                //通知子线程运行完毕了
                super.getFlag().notifyEnd(isNormaled,exception);
            }
        }
    }
    /**
         * 超时中断的同步方法
         * @param productInfo
         * @throws Exception
         */
        public void syncProductTimeout(final ProductDetailinfo productInfo) throws Exception {
            Long timeout = 3000L;
            Long interval = 1000L;
            RetryUtil.execute(SyncProductThread.class,Arrays.asList(productBiz,productInfo),timeout,interval,3);
            //RetryUtil.execute(new SyncProductTask(productBiz,productInfo),timeout,interval,3);
        }

    参考文章

    https://www.jianshu.com/p/55221d045f39

  • 相关阅读:
    RMQ(非log2储存方法)
    2016年5月份学习记录
    NOIP200504循环
    膜拜acm大牛 虽然我不会这题,但是AC还是没有问题的~(转自hzwer)
    最长公共子序列的长度
    菜鸟,大牛和教主三者的区别(转自hzwer)
    NOIP201205Vigenère密码
    NOIP200503采药
    公路乘车
    NOIP200902分数线划定
  • 原文地址:https://www.cnblogs.com/MrSi/p/9691337.html
Copyright © 2011-2022 走看看