zoukankan      html  css  js  c++  java
  • 多线程操作数据库 异常抛出全部回滚的问题

    package czc.superzig.modular.utils;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.TimerTask;
    import java.util.concurrent.BlockingDeque;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.commons.collections.CollectionUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jdbc.datasource.DataSourceTransactionManager;
    import org.springframework.transaction.TransactionDefinition;
    import org.springframework.transaction.TransactionStatus;
    import org.springframework.transaction.support.DefaultTransactionDefinition;
    
    import com.google.common.collect.Lists;
    
    import czc.superzig.common.operatingtable.base.entity.Result;
    import czc.superzig.common.operatingtable.base.entity.Results;
    import czc.superzig.modular.system.operatingtable.entity.DetectionIndicator;
    
    /**
     *多线程操作数据库  其中一个线程发生异常则所有线程发生回滚 
     * 
     */
    
    public abstract class ThreadUtil<T> {
        
        private DataSourceTransactionManager txManager;
        
        public abstract void run(T entity);
        
        public ThreadUtil(List<T> list,DataSourceTransactionManager txManager){
            this.txManager=txManager;
            createThread(list);
        }
        
        
        
        private Result createThread(List<T> list) {
            Result result = new Result();
           
            //每条线程最小处理任务数
            int perThreadHandleCount = 1;
            //线程池的最大线程数
            int nThreads = 10;
            //任务数
            int taskSize = list.size();
    
            if (taskSize > nThreads * perThreadHandleCount) {
                perThreadHandleCount = taskSize % nThreads == 0 ? taskSize / nThreads : taskSize / nThreads + 1;
                nThreads = taskSize % perThreadHandleCount == 0 ? taskSize / perThreadHandleCount : taskSize / perThreadHandleCount + 1;
            } else {
                nThreads = taskSize;
            }
            //主线程的同步计时器
            CountDownLatch mainLatch = new CountDownLatch(1);
            //监控子线程的同步计时器
            CountDownLatch threadLatch = new CountDownLatch(nThreads);
            //根据子线程执行结果判断是否需要回滚
            BlockingDeque<Boolean> resultList = new LinkedBlockingDeque<>(nThreads);
            //必须要使用对象,如果使用变量会造成线程之间不可共享变量值
            RollBack rollBack = new RollBack(false);
            ExecutorService fixedThreadPool = Executors.newFixedThreadPool(nThreads);
    
            List<Future<List<Object>>> futures = Lists.newArrayList();
            //返回数据的列表
            List<Object> returnDataList = Lists.newArrayList();
            //给每个线程分配任务
            for (int i = 0; i < nThreads; i++) {
                int lastIndex = (i + 1) * perThreadHandleCount;
                List<T> listVos = list.subList(i * perThreadHandleCount, lastIndex >= taskSize ? taskSize : lastIndex);
                FunctionThread functionThread = new FunctionThread(mainLatch, threadLatch, rollBack, resultList, listVos);
                Future<List<Object>> future = fixedThreadPool.submit(functionThread);
                futures.add(future);
            }
    
            /** 存放子线程返回结果. */
            List<Boolean> backUpResult = Lists.newArrayList();
            try {
                //等待所有子线程执行完毕
                boolean await = threadLatch.await(20, TimeUnit.SECONDS);
                //如果超时,直接回滚
                if (!await) {
                    rollBack.setRollBack(true);
                } else {
                    //查看执行情况,如果有存在需要回滚的线程,则全部回滚
                    for (int i = 0; i < nThreads; i++) {
                        Boolean flag = resultList.take();
                        backUpResult.add(flag);
                        if (flag) {
                            /** 有线程执行异常,需要回滚子线程. */
                            rollBack.setRollBack(true);
                        }
                    }
                }
            } catch (InterruptedException e) {
                result.setSuccess(false);
                result.setMsg("等待所有子线程执行完毕时,出现异常,整体回滚");
            } finally {
                //子线程再次开始执行
                mainLatch.countDown();
                fixedThreadPool.shutdown();
            }
    
            /** 检查子线程是否有异常,有异常整体回滚. */
            for (int i = 0; i < nThreads; i++) {
                if (CollectionUtils.isNotEmpty(backUpResult)) {
                    Boolean flag = backUpResult.get(i);
                    if (flag) {
                        result.setSuccess(false);
                        result.setMsg("运行失败");
                    }
                } else {
                    result.setSuccess(false);
                    result.setMsg("运行失败");
                }
            }
    
            //拼接结果
            try {
                for (Future<List<Object>> future : futures) {
                    returnDataList.addAll(future.get());
                }
            } catch (Exception e) {
                e.printStackTrace();
                result.setSuccess(false);
                result.setMsg("运行失败子线程正常创建参保人成功,主线程出现异常,回滚失败");
            }
            if(result.getSuccess()){
                result.setData(returnDataList);
            }
            return result;
        }
    
        public class FunctionThread implements Callable<List<Object>> {
            /**
             * 主线程监控
             */
            private CountDownLatch mainLatch;
            /**
             * 子线程监控
             */
            private CountDownLatch threadLatch;
            /**
             * 是否回滚
             */
            private RollBack rollBack;
            private BlockingDeque<Boolean> resultList;
            private List<T> taskList;
    
            public FunctionThread(CountDownLatch mainLatch, CountDownLatch threadLatch, RollBack rollBack, BlockingDeque<Boolean> resultList, List<T> taskList) {
                this.mainLatch = mainLatch;
                this.threadLatch = threadLatch;
                this.rollBack = rollBack;
                this.resultList = resultList;
                this.taskList = taskList;
            }
    
            @Override
            public List<Object> call() throws Exception {
                //为了保证事务不提交,此处只能调用一个有事务的方法,spring 中事务的颗粒度是方法,只有方法不退出,事务才不会提交
                return FunctionTask(mainLatch, threadLatch, rollBack, resultList, taskList);
            }
    
        }
    
        public class RollBack {
            private Boolean isRollBack;
    
            public Boolean getRollBack() {
                return isRollBack;
            }
    
            public void setRollBack(Boolean rollBack) {
                isRollBack = rollBack;
            }
    
            public RollBack(Boolean isRollBack) {
                this.isRollBack = isRollBack;
            }
        }
    
        public List<Object> FunctionTask(CountDownLatch mainLatch, CountDownLatch threadLatch, RollBack rollBack, BlockingDeque<Boolean> resultList, List<T> taskList) {
            //开启事务
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            def.setName(java.util.UUID.randomUUID().toString());
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); // 事物隔离级别,开启新事务
            TransactionStatus status=txManager.getTransaction(def);
            
            List<Object> returnDataList = Lists.newArrayList();
            Boolean result = false;
            try {
                for (T entity : taskList) {
                    //执行业务逻辑
                    try {
                        run(entity);
                    } catch (Exception e) {
                        result=true;
                    }
                    returnDataList.add(entity);
                }
                //Exception 和 Error 都需要抓
            } catch (Throwable throwable) {
                throwable.printStackTrace();
                result = true;
            }
            //队列中插入回滚的结果 并对计数器减1
            resultList.add(result);
            threadLatch.countDown();
    
            try {
                //等待主程序的计数器
                mainLatch.await();
            } catch (InterruptedException e) {
                System.err.println("批量操作线程InterruptedException异常");
            }
    
            if (rollBack.getRollBack()) {
                 System.err.println("批量操作线程回滚");
                 txManager.rollback(status);
            }else{
                txManager.commit(status);
            }
            return returnDataList;
        }
    
    }

     1.

    CountDownLatch 线程计数器 创建时指定计数的大小 和监控的线程数相同  是同步的
    wait方法 等计数为0的时候才能继续执行下面代码 并可设置等待时间
    countDown 计数减一
    用于主线程和多个子线程之间的相互等待

    2.阻塞队列 生产不足的时候 阻塞消费 消费不足的时候 阻塞生产 放置数据丢失的问题

  • 相关阅读:
    中位数相关
    带权并查集
    组合数相关、多重集组合数
    LIS最长上升子序列
    提高你css技能的css开发技巧
    如何让搜索引擎抓取AJAX内容?
    Javascript异步编程的4种方法
    前端自动化构建工具gulp
    前端自动化构建工具
    git使用
  • 原文地址:https://www.cnblogs.com/xiatc/p/12803212.html
Copyright © 2011-2022 走看看