示例代码可以从github上获取 https://github.com/git-simm/simm-framework.git
接上篇博客《FutureTask子线程取消执行的状态判断》
一、业务场景:
系统中存在多种场景并发操作事务执行时互锁的情况,导致任务积压,系统崩溃。先做了各场景业务的性能调整,但是并发互锁依然无法避免。于是开始考虑选取调用频繁的同步功能作为死锁的牺牲品,取消执行,释放锁。
二、处理方案:
由于FutureTask.cancel方案无法解决数据库死锁问题。因此接下来切换方案,改为Thread.stop实现,强行杀掉子线程。核心思想是 把需要监控的操作 包装成一个子线程,启动以后主线程开启一个运行时间监控,当判断到子线程已经超出时限,则调用stop方法,强行中断(需要说明一点,stop操作可能会带来一些未知的异常状态,jdk已经将其标记为过时,不建议使用,这里需要个人自行评估方案的影响)。
那么只做一个stop子线程的操作,能同时解决掉其对数据库的锁定吗?经过验证这是不可能的,对数据库的锁定需要在事务提交或回滚时 释放。事务是由spring框架自行接管处理的,在发生死锁,子线程未结束的情况下,事务即使到了超时时限,但是由于没有新的sql执行语句触发其超时校验,spring事务也无法自行结束。它的结束点为 数据库事务锁等待超时。因此,现在我们想要让事务提前回滚还需要一个操作,设置 @Transactional的timeout时间,指定为一个较短的时间,子线程被stop后,spring会自动对该事务进行回滚,从而达到数据库解锁的目的。
三、代码实现:
3.1、创建一个FTaskEndFlag的线程同步标志。父线程等待子线程反馈执行结果后,再执行后续的逻辑;
package simm.framework.threadutils.interrupt; import java.util.concurrent.TimeoutException; /** * futuretask运行终止事件通知 * 2018.09.22 by simm */ public class FTaskEndFlag { private volatile boolean isNormaled = false; private volatile boolean fired = false; private Exception exception =null; public boolean isNormaled() { return isNormaled; } /** * 获取子线程异常信息 * @return */ public Exception getException() { return exception; } /** * 通知结束 * @param result * @param result */ public synchronized void notifyEnd(boolean result){ isNormaled = result; fired = true; notifyAll(); } /** * 通知结束 * @param result * @param result */ public synchronized void notifyEnd(boolean result,Exception ex){ isNormaled = result; exception = ex; fired = true; notifyAll(); } /** * 执行结束通知 */ public synchronized void waitForEnd() throws InterruptedException { while (!fired) { //子线程挂起,释放synchronized同步块 wait(); } } /** * 执行结束通知 */ public void waitForEnd(Thread thread,Long timeout) throws TimeoutException { long begin = System.currentTimeMillis(); while(System.currentTimeMillis()-begin <= timeout){ waitFunc(10); //子线程已经执行完毕,则返回 if(fired) return; } //超时未返回,则终止线程 try{ thread.stop(); }catch(Exception e){ e.printStackTrace(); throw e; } throw new TimeoutException("方法执行超出时限:"+timeout); } /** * 等待 */ private void waitFunc(long millis){ try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }
3.2、创建一个BaseThread的抽象类,内置FTaskEndFlag线程同步标志;
package com.zhaogang.ii.biz.threads.future; /** * 基础线程 */ public abstract class BaseThread extends Thread { /** * futuretask 等待标志 */ private FTaskEndFlag flag = new FTaskEndFlag(); public FTaskEndFlag getFlag() { return flag; } }
3.3、创建一个超时重试的工具类,对FutureTask的结果获取设置超时时间;
package com.zhaogang.ii.biz.threads.future; import java.lang.reflect.Constructor; import java.util.List; import java.util.concurrent.*; /** * 方法超时重试工具 * 2018.09.20 by simm */ public class RetryUtil {/** * 默认方法(3秒超时,重试3次) * @param syncThread * @param params * @param <T> * @return * @throws Exception */ public static <T extends BaseThread> Boolean execute(Class<T> syncThread, List<Object> params) throws Exception { return execute(syncThread,params,3000,1000,3); } /** * 方法超时控制 * @param syncThread 线程类 * @param params 线程构造参数 * @param timeout * @param interval * @param retryTimes * @param <T> * @return * @throws Exception */ public static <T extends BaseThread> Boolean execute(Class<T> syncThread, List<Object> params, long timeout, long interval, int retryTimes) throws Exception { Boolean result = false; try{ //参数类型数组 Class[] parameterTypeArrs = new Class[params.size()]; for(int i=0;i<params.size();i++){ Class c = params.get(i).getClass(); if(c.getName().indexOf("$$")>0){ String clsName = c.getName().substring(0,c.getName().indexOf("$$")); parameterTypeArrs[i] = Class.forName(clsName); }else{ parameterTypeArrs[i] = c; } } //根据参数类型获取相应的构造函数 Constructor constructor= syncThread.getConstructor(parameterTypeArrs); //参数数组 Object[] parameters= params.toArray(); //根据获取的构造函数和参数,创建实例 BaseThread processor = (BaseThread) constructor.newInstance(parameters); processor.start(); //等待线程结束 processor.getFlag().waitForEnd(processor,timeout); boolean r = processor.getFlag().isNormaled(); if(!r){ throw processor.getFlag().getException(); } return processor.getFlag().isNormaled(); }catch (TimeoutException e) { //超时重试 retryTimes--; if(retryTimes > 0){ System.out.println("方法开始重试:"+retryTimes); Thread.sleep(interval); execute(syncThread,params,timeout,interval,retryTimes); }else{ throw e; } } return result; } }
3.4、设置子线程事务超时时间;
@Transactional(timeout = 3) public void syncProduct(ProductDetailinfo productInfo) {
四、给出一个调用代码。实现一个继承自BaseFutureTask的 FutureTask任务。依旧需要注意子线程涉及到spring的组件,最好是参数从主线程注入到子线程。保证传递的是动态代理对象。
package interrupt; import simm.framework.threadutils.interrupt.BaseThread; public class SyncProductThread extends BaseThread { private ProductBiz productBiz; private ProductDetailinfo productInfo; /** * 构造函数 * @param productBiz * @param productInfo */ public SyncProductThread(ProductBiz productBiz, ProductDetailinfo productInfo){ this.productBiz = productBiz; this.productInfo = productInfo; } @Override public void run() { boolean isNormaled = false; Exception exception = null; try{ productBiz.syncProduct(productInfo); isNormaled = true; }catch(Exception e){ e.printStackTrace(); exception = e; }finally { //通知子线程运行完毕了 super.getFlag().notifyEnd(isNormaled,exception); } } }
/** * 超时中断的同步方法 * @param productInfo * @throws Exception */ public void syncProductTimeout(final ProductDetailinfo productInfo) throws Exception { Long timeout = 3000L; Long interval = 1000L; RetryUtil.execute(SyncProductThread.class,Arrays.asList(productBiz,productInfo),timeout,interval,3); //RetryUtil.execute(new SyncProductTask(productBiz,productInfo),timeout,interval,3); }
参考文章
https://www.jianshu.com/p/55221d045f39