前段时间发过此类demo,后经大神改版,学到了一点,遂记录一下
1 /** 2 * 利用线程池起两个任务 3 */ 4 public String thredSubmit(String uuidStr, List<VariationMapIn> mapIns, List<ApproveHis> hisList, String[] ids, 5 Long guToJuPartyId) { 6 SubmitToPer submitToPerTask = new SubmitToPer(uuidStr, hisList, mapIns, 7 JtCommonUtil.getUserView().getPartyId()); 8 9 XianSubmit xianianSubmitTask = new XianSubmit(JtCommonUtil.getUserView().getOrganization().getGroupCode(), 10 mapIns, JtCommonUtil.getUserView().getPartyId()); 11 12 String result = SynTaskUtils.doWork(Arrays.asList(submitToPerTask, xianianSubmitTask), transactionManager); 13 return result; 14 }
1 /** 2 * 第一个任务 3 */ 4 public class SubmitToPer extends BaseCallBack { 5 6 private String uuidStr; 7 private List<ApproveHis> hisList; 8 private List<VariationMapIn> mapIns; 9 private Long partyId; 10 11 public SubmitToPer(String uuidStr, List<ApproveHis> hisList, List<VariationMapIn> mapIns, Long partyId) { 12 super(); 13 this.uuidStr = uuidStr; 14 this.hisList = hisList; 15 this.mapIns = mapIns; 16 this.partyId = partyId; 17 } 18 19 @Override 20 protected void doWork() { 21 // 业务处理开始 22 MapInDealService mapInDealService = (MapInDealService) ApplicationUtil.getBean("mapInDealService"); 23 mapInDealService.submitToPer(uuidStr, mapIns, hisList, partyId); 24 } 25 26 }
1 /** 2 * 第二个任务 3 */ 4 public class XianSubmit extends BaseCallBack { 5 6 private String groupCode; 7 private List<VariationMapIn> mapIns; 8 private Long partyId; 9 10 public XianSubmit(String groupCode, List<VariationMapIn> mapIns, Long partyId) { 11 super(); 12 this.groupCode = groupCode; 13 this.mapIns = mapIns; 14 this.partyId = partyId; 15 } 16 17 @Override 18 protected void doWork() { 19 SpanCheckService spanCheckService = (SpanCheckService) ApplicationUtil.getBean("spanCheckService"); 20 String str = spanCheckService.xianCheckIn(mapIns, partyId, groupCode); 21 if (!Constant.SUCCESS_STR.equals(str)) 22 throw new RuntimeException(str); 23 } 24 25 }
1 /** 2 * 带回滚的异步任务回调 3 * 基类 4 * @author Administrator 5 * 6 */ 7 public abstract class BaseCallBack implements Callable<String> { 8 9 private static Logger logger = LoggerFactory.getLogger(BaseCallBack.class); 10 /** 11 * 需要回滚计数器 12 */ 13 protected CountDownLatch rollBackLatch; 14 /** 15 * 主线程等待计数器 16 */ 17 protected CountDownLatch mainThreadLatch; 18 /** 19 * 是否需要回滚 20 */ 21 protected AtomicBoolean rollbackFlag; 22 /** 23 * 事务 24 */ 25 protected PlatformTransactionManager transactionManager; 26 27 protected abstract void doWork(); 28 29 @Override 30 public String call() throws Exception { 31 if (rollbackFlag.get()) { 32 logger.info("需要回滚,直接不用执行了"); 33 mainThreadLatch.countDown(); 34 return Constant.ERROR_STR; // 如果其他线程已经报错 就停止线程 35 } 36 // 设置一个事务 37 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); 38 def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务,这样会比较安全些。 39 TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态 40 try { 41 logger.info("业务开始处理:{}", this.getClass().getName()); 42 this.doWork(); 43 logger.info("业务处理结束:{}", this.getClass().getName()); 44 // 业务处理结束 45 mainThreadLatch.countDown(); 46 logger.info("线程内正常 mainThreadLatch.countDown();"); 47 rollBackLatch.await();// 线程等待 48 if (rollbackFlag.get()) { 49 logger.info("回滚事务:{}", this.getClass().getName()); 50 transactionManager.rollback(status); 51 } else { 52 logger.info("提交事务:{}", this.getClass().getName()); 53 transactionManager.commit(status); 54 } 55 return Constant.SAVE_SUCCESS; 56 } catch (Exception e) { 57 e.printStackTrace(); 58 // 如果出错了 就放开锁 让别的线程进入提交/回滚 本线程进行回滚 59 rollbackFlag.set(true); 60 transactionManager.rollback(status); 61 rollBackLatch.countDown(); 62 mainThreadLatch.countDown(); 63 logger.info("线程内异常 mainThreadLatch.countDown();"); 64 return "操作失败:" + e.getMessage(); 65 } 66 } 67 68 public CountDownLatch getRollBackLatch() { 69 return rollBackLatch; 70 } 71 72 public void setRollBackLatch(CountDownLatch 
rollBackLatch) { 73 this.rollBackLatch = rollBackLatch; 74 } 75 76 public CountDownLatch getMainThreadLatch() { 77 return mainThreadLatch; 78 } 79 80 public void setMainThreadLatch(CountDownLatch mainThreadLatch) { 81 this.mainThreadLatch = mainThreadLatch; 82 } 83 84 public AtomicBoolean getRollbackFlag() { 85 return rollbackFlag; 86 } 87 88 public void setRollbackFlag(AtomicBoolean rollbackFlag) { 89 this.rollbackFlag = rollbackFlag; 90 } 91 92 public PlatformTransactionManager getTransactionManager() { 93 return transactionManager; 94 } 95 96 public void setTransactionManager(PlatformTransactionManager transactionManager) { 97 this.transactionManager = transactionManager; 98 } 99 100 }
1 /** 2 * 异步线程执行器 携带回滚 3 * 4 * @author Administrator 5 * 6 */ 7 public class SynTaskUtils { 8 9 /** 10 * 日志 11 */ 12 private static Logger logger = LoggerFactory.getLogger(SynTaskUtils.class); 13 14 public static String doWork(List<? extends BaseCallBack> tasks, PlatformTransactionManager transactionManager) { 15 if (tasks == null || tasks.size() <= 0) { 16 return Constant.SUCCESS_STR; 17 } 18 logger.info("开始执行一组线程.........................................................."); 19 CountDownLatch rollBackLatch = new CountDownLatch(1); 20 CountDownLatch mainThreadLatch = new CountDownLatch(tasks.size()); 21 AtomicBoolean rollbackFlag = new AtomicBoolean(false); 22 List<Future<String>> list = new ArrayList<Future<String>>(); 23 for (BaseCallBack task : tasks) { 24 task.setMainThreadLatch(mainThreadLatch); 25 task.setRollbackFlag(rollbackFlag); 26 task.setRollBackLatch(rollBackLatch); 27 task.setTransactionManager(transactionManager); 28 logger.info("添加任务:{}", task.getClass().getName()); 29 Future<String> future = TestExecutorUtil.getInstance().getExecutor().submit(task); 30 list.add(future); 31 } 32 // 主线程业务执行完毕 如果其他线程也执行完毕 且没有报异常 正在阻塞状态中 唤醒其他线程 提交所有的事务 33 // 如果其他线程或者主线程报错 则不会进入if 会触发回滚 34 try { 35 logger.info("主线程开始等待。"); 36 mainThreadLatch.await(); 37 logger.info("主线程等待结束。"); 38 if (!rollbackFlag.get()) { 39 logger.info("不需要回滚。"); 40 rollBackLatch.countDown(); 41 return Constant.SUCCESS_STR; 42 } else { 43 logger.info("需要回滚。"); 44 for (Future<String> f : list) { 45 String result = f.get(); 46 if (!Constant.SAVE_SUCCESS.equals(result)) { 47 logger.info("返回值:{}", result); 48 return result; 49 } 50 } 51 return Constant.SUCCESS_STR; 52 } 53 } catch (Exception e) { 54 return "操作出现异常。"; 55 }finally { 56 logger.info("结束执行一组线程.........................................................."); 57 } 58 } 59 60 }
/**
 * Process-wide holder of a shared thread pool.
 *
 * @author Administrator
 */
public class TestExecutorUtil {

    /**
     * The shared pool: a fixed pool of 10 threads.
     */
    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    /**
     * Holds the single instance; it is created when this nested class is first used,
     * i.e. on the first call to {@link TestExecutorUtil#getInstance()}.
     */
    private static class Instance {
        private static final TestExecutorUtil instance = new TestExecutorUtil();
    }

    private TestExecutorUtil() {
    }

    /**
     * @return the single shared instance
     */
    public static TestExecutorUtil getInstance() {
        return Instance.instance;
    }

    /**
     * @return the shared thread pool; the same object on every call
     */
    public ExecutorService getExecutor() {
        return executor;
    }

}