zoukankan      html  css  js  c++  java
  • 切分大任务成多个子任务(事务),汇总后统一提交或回滚

    示例代码可以从github上获取 https://github.com/git-simm/simm-framework.git
    一、业务场景:
      系统中存在一个盘库的功能,用户一次盘库形成一两万条的盘库明细单,一次性提交给服务器进行处理。服务器性能比较优越,平均也得运行30秒左右。性能上需要进行优化。
     
    二、处理方案:
      做过代码分析后,发现单线程逻辑没有什么优化空间。开始考虑引入多线程处理模型,用10个子线程进行任务切分处理。切分子线程问题需要考虑事务的一致性。10个子线程对应10个事务,需要保证所有事务一起提交或一起回滚。这里使用synchronized(wait,notifyall)机制做线程协作。
     
    三、代码实现:
       3.1、添加一个多线程协作标志类,用于做子线程运行状态统计,通知子线程做事务提交还是回滚的操作;
    package simm.framework.threadutils.multi;
    
    import java.util.UUID;
    
    /**
     * 多线程结束标志
     * 2018.09.22 by simm
     */
    public class MultiEndFlag {
        private volatile boolean fired = false;
        //是否执行成功
        private volatile boolean isAllSuccess = false;
        private volatile int threadCount = 0;
        private volatile int failCount = 0;
    
        /**
         * 初始化子线程的总数
         * @param count
         */
        public MultiEndFlag(int count){
            threadCount = count;
        }
    
        public boolean isAllSuccess() {
            return isAllSuccess;
        }
    
        /**
         * 等待全部结束
         * @param threadId
         * @param result
         */
        public synchronized void waitForEnd(UUID threadId,int result){
            //统计失败的线程个数
            if(result==0){
                failCount++;
            }
            threadCount--;
            while (!fired){
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 执行结束通知
         */
        public synchronized void go(){
            fired = true;
            //结果都显示成功
            isAllSuccess = (failCount == 0);
            notifyAll();
        }
        /**
         * 等待结束
         */
        public void end(){
            while (threadCount > 0){
                waitFunc(50);
            }
            System.out.println("线程全部执行完毕通知");
            go();
        }
    
        /**
         * 等待
         */
        private void waitFunc(long millis){
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

       3.2、提供一个数据保存服务的接口定义,一个默认的子线程任务执行类(需要接收数据保存服务实现,业务数据,协作标志变量);

    package simm.framework.threadutils.multi;
    
    import java.util.List;
    import java.util.UUID;
    
    /**
     * 保存服务接口
     * 2018.09.22 by simm
     * @param <T>
     */
    public interface ISaveService<T> {
        /**
         * 子线程批量保存方法
         * @param list
         * @param endFlag
         * @param threadId
         * @return
         * @throws Exception
         */
        Integer batchSave(List<T> list, MultiEndFlag endFlag, UUID threadId) throws Exception;
    }
    package simm.framework.threadutils.multi;
    
    import java.util.List;
    import java.util.UUID;
    import java.util.concurrent.Callable;
    
    /**
     * 默认的执行任务
     * 2018.09.22 by simm
     */
    public class DefaultExecTask<T> implements Callable<Integer> {
        private List<T> list;
        private ISaveService saveService;
        private MultiEndFlag endFlag;
        private UUID threadId;
        /**
         * 盘库子任务
         * @param saveService
         * @param notes
         * @param flag
         */
        public DefaultExecTask(ISaveService saveService, List<T> notes, MultiEndFlag flag){
            this.saveService = saveService;
            this.list = notes;
            this.endFlag = flag;
            this.threadId = UUID.randomUUID();
        }
        @Override
        public Integer call() throws Exception {
            return saveService.batchSave(this.list,this.endFlag,this.threadId);
        }
    }
        3.3、实现最核心的线程池分发子线程,并汇总结果通知子线程事务做最终的提交或回滚。线程池使用定长池 newFixedThreadPool,子线程使用futureTask,可接收返回值和异常信息。
    package simm.framework.threadutils.multi;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    /**
     * 多线程切分执行器
     * 2018.09.22 by simm
     */
    public class MultiExecutor {
        private static int maxThreadCount = 10;
        /**
         * 执行方法(分批创建子线程)
         * @param saveService
         * @param notes
         * @param groupLen
         * @return
         * @throws ExecutionException
         * @throws InterruptedException
         */
        public static <T> Boolean exec(ISaveService saveService,List<T> notes,int groupLen) throws ExecutionException, InterruptedException {
            if(notes==null || notes.size()==0) return true;
            //创建一个线程池,最大10个线程
            ExecutorService executorService = Executors.newFixedThreadPool(maxThreadCount);
            List<Future<Integer>> futures = new ArrayList<>();
            int noteSize = notes.size();
            int batches = (int) Math.ceil(noteSize * 1.0 /groupLen);
            //分组超长最大线程限制,则设置分组数为10,计算分组集合尺寸
            if(batches>maxThreadCount){
                batches = maxThreadCount;
                groupLen = (int) Math.ceil(noteSize * 1.0 /batches);
            }
            System.out.println("总长度:"+noteSize+"  批次信息:"+batches+"  分组长度:"+groupLen);
            MultiEndFlag flag = new MultiEndFlag(batches);
            int startIndex, toIndex, maxIndex = notes.size();
            for(int i=0;i<batches;i++){
                startIndex = i * groupLen;
                toIndex = startIndex + groupLen;
                if(toIndex> maxIndex) {
                    toIndex = maxIndex;
                }
                List<T> temp = notes.subList(startIndex,toIndex);
                if(temp == null || temp.size()==0) continue;
                futures.add(executorService.submit(new DefaultExecTask(saveService,temp,flag)));
            }
            flag.end();
            //子线程全部等待返回(存在异常,则直接抛向主线程)
            for(Future<Integer> future:futures){
                future.get();
            }
            //所有线程返回后,关闭线程池
            executorService.shutdown();
            return true;
        }
    }
     
    四、给出一个调用伪代码。需要注意的一点,子线程开启事务,这里使用@Transactional声明式事务,这要求服务的实体类需要通过spring的bean工厂创建,得到一个动态代理类,以达到支持事务拦截器的目的,保证注解的有效性。
    package multi;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    import simm.framework.threadutils.multi.DefaultExecTask;
    import simm.framework.threadutils.multi.ISaveService;
    import simm.framework.threadutils.multi.MultiEndFlag;
    import simm.framework.threadutils.multi.MultiExecutor;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;
    import java.util.concurrent.ExecutionException;
    
    /**
     * 损益单保存服务
     */
    @Service
    public class DemoService implements ISaveService<NoteCheckBalance> {
        private static final Logger logger = LoggerFactory.getLogger(DefaultExecTask.class);
        @Autowired
        private NoteCheckBalanceMapper noteCheckBalanceMapper;
    
        /**
         * 业务保存
         * @param list
         */
        public void save(List<NoteCheckBalance> list){
            for(NoteCheckBalance item :list){
                noteCheckBalanceMapper.insert(item);
            }
        }
        /**
         * 批量保存事件
         */
        @Transactional(rollbackFor = Exception.class)
        @Override
        public Integer batchSave(List<NoteCheckBalance> list, MultiEndFlag endFlag, UUID threadId) throws Exception {
            int result = 0;
            try{
                //业务操作
                save(list);
                result = 1;
                //进行waitForEnd 操作,是为了确保所有的线程都最终通知同步协作标志
                endFlag.waitForEnd(threadId ,result);
                //其他线程异常手工回滚
                if(result==1 && !endFlag.isAllSuccess()){
                    String message = "子线程未全部执行成功,对线程["+threadId+"]进行回滚";
                    throw new Exception(message);
                }
                return result;
            }catch (Exception ex){
                logger.error(ex.toString());
                if(result ==0){
                    //本身线程异常抛出异常,通知已经做完(判断是为了防止 与 try块中的通知重复)
                    endFlag.waitForEnd(threadId ,result);
                }
                throw ex;
            }
        }
    
        /**
         * 调用示例
         * @param args
         * @throws ExecutionException
         * @throws InterruptedException
         */
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //调用示例
            MultiExecutor.exec(new DemoService(), new ArrayList<NoteCheckBalance>(),500);
        }
    }
     
    参考文章
  • 相关阅读:
    Maven中的依赖相关总结
    Redis(序)应用场景
    Java并发专栏(一)—— Process vs Thread
    Java中转换为二进制的几种实现
    Java中转换为十六进制的几种实现
    oracle日期格式和java日期格式区别 HH24:mm:ss和HH24:mi:ss的区别
    远程教学的一些准备
    将latex的beamer做的幻灯片转换为ppt格式后的一些问题
    最近看过的雨课堂和智慧树的笔记
    关于在线授课的探索
  • 原文地址:https://www.cnblogs.com/MrSi/p/9690937.html
Copyright © 2011-2022 走看看