内容概况:
异步执行配置相关:
asyncExecutorActivate:这个属性是激活作业执行器,它的默认参数是false,只有设为true,activiti启动的时候才会开启线程池去扫描定时操作的任务
asyncExecutorXXX:这些属性的操作都是基于asyncExecutor这样一个前缀,后面有各种类型的属性配置,(其实里面的属性配置大多都是与线程池、队列相关的配置)
(很重要的配置)asyncExecutor:asyncExecutor的bean的配置,它本身是一个接口,用它也可以配置自定义的线程池。
如何自定义一个线程池?
在设置了核心线程数的情况下,比如设为了5,那么在开启了五个线程之后,有新的任务来了之后,回去检测核心线程数是否都在执行任务中,如果是,那么新的请求就会放在队列里去等待,
等到核心线程中有一个线程执行完了自己的任务,那么排在队列最前面的这个请求,回去获取这个线程,然后去执行。那么当队列一直有新的请求加入时,负载过大,超过了队列大小的时候,最大线程数就会起作用了,新建线程至最大线程数,来一起执行超过容量的新请求。 当三个都满了以后,就会拒绝服务、请求。
作业执行器的配置:
这里要说明的是R5/P1DT1H, / 后面的指时间间隔,1D表示一天,是24个小时,而1H表示一小时加起来就是25个小时。R5是指执行次数为5,这个属性的含义是:在流程启动时开始计时,25小时后第一次执行这个指定事件, 并且每隔25小时执行一次,总共执行次数为5次(ps:包含第一次执行在内)。
流程定义文件my-process_job.bpmn20.xml的改变:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="processEngineConfiguration" class="org.activiti.engine.impl.cfg.StandaloneInMemProcessEngineConfiguration"> <!-- 给引擎设置自定义的commandInvoker --> <!--<property name="commandInvoker" ref="commandInvoker" />--> <!-- 若为true则开启记录事件、节点的状态,完成后将完成状态插入数据库,若为false则关闭,不记录 --> <property name="enableDatabaseEventLogging" value="true"/> <!--打开异步激活器激活异步,如果不配置线程池就会使用它的默认线程池--> <property name="asyncExecutorActivate" value="true"/> <!--如果使用我们自己定义的线程池,需要先定义一个执行器--> <property name="asyncExecutor" ref="asyncExecutor" /> <!-- 配置事件监听器 --> <property name="eventListeners"> <list> <bean class="com.yy.avtiviti.helloworld.event.JobEventListener"/> </list> </property> </bean> <!-- 执行器默认使用DefaultAsyncJobExecutor --> <bean id="asyncExecutor" class="org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor"> <!-- 需要配置一个服务 基于spring去配置它 --> <property name="executorService" ref="executorService"/> </bean> <bean id="executorService" class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean"> <property name="threadNamePrefix" value="activiti-job-"/> <property name="corePoolSize" value="10"/> <property name="maxPoolSize" value="20"/> <property name="queueCapacity" value="100"/> <!-- 设置当线程池满了时候的拒绝策略,这里是使用的默认策略,抛出异常 --> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy"/> </property> </bean> <bean id="commandInvoker" class="com.yy.avtiviti.helloworld.intercept.MDCCommandInvoker"/> </beans>
设置一个监听器JobEventListener,以便测试观察:
public class JobEventListener implements ActivitiEventListener { private static final Logger LOGGER = LoggerFactory.getLogger(JobEventListener.class); //简单的完成一下监听器的效果 @Override public void onEvent(ActivitiEvent event) { ActivitiEventType eventType = event.getType(); String name = eventType.name(); if (name.startsWith("TIMER") || name.startsWith("JOB")){ LOGGER.info("监听到job事件 {} {}",eventType,event.getProcessInstanceId()); } } @Override public boolean isFailOnException() { return false; } }
编写activiti配置文件activiti_job.cfg.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="processEngineConfiguration" class="org.activiti.engine.impl.cfg.StandaloneInMemProcessEngineConfiguration"> <!-- 给引擎设置自定义的commandInvoker --> <!--<property name="commandInvoker" ref="commandInvoker" />--> <!-- 若为true则开启记录事件、节点的状态,完成后将完成状态插入数据库,若为false则关闭,不记录 --> <property name="enableDatabaseEventLogging" value="true"/> <!--打开异步激活器激活异步,如果不配置线程池就会使用它的默认线程池--> <property name="asyncExecutorActivate" value="true"/> <!--如果使用我们自己定义的线程池,需要先定义一个执行器--> <property name="asyncExecutor" ref="asyncExecutor" /> <!-- 配置事件监听器 --> <property name="eventListeners"> <list> <bean class="com.yy.avtiviti.helloworld.event.JobEventListener"/> </list> </property> </bean> <!-- 执行器默认使用DefaultAsyncJobExecutor --> <bean id="asyncExecutor" class="org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor"> <!-- 需要配置一个服务 基于spring去配置它 --> <property name="executorService" ref="executorService"/> </bean> <bean id="executorService" class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean"> <property name="threadNamePrefix" value="activiti-job-"/> <property name="corePoolSize" value="10"/> <property name="maxPoolSize" value="20"/> <property name="queueCapacity" value="100"/> <!-- 设置当线程池满了时候的拒绝策略,这里是使用的默认策略,抛出异常 --> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy"/> </property> </bean> <bean id="commandInvoker" class="com.yy.avtiviti.helloworld.intercept.MDCCommandInvoker"/> </beans>
编写测试类configJobTest :
public class configJobTest { private static final Logger LOGGER = LoggerFactory.getLogger(configTest.class); @Rule public ActivitiRule activitiRule = new ActivitiRule("activiti_job.cfg.xml");//传入自定义的mdc配置文件 @Test @Deployment(resources = {"my-process_job.bpmn20.xml"})//流程定义文件 public void test() throws InterruptedException { //这里流程定义文件里设置了定时任务在一定事件内启动五次,所以不需要自行启动了。这里启动的代码就可以不要了。 //这里记录一下时间,看下流程每次启动时间与结束时间 LOGGER.info("start"); //在流程定义文件初始化以后,就开始定时启动了。 //那么我应该要查询一下在这段时间内有多少定时任务去执行 List<Job> jobList = activitiRule .getManagementService() .createTimerJobQuery() .listPage(0, 100); for (Job job : jobList) { LOGGER.info("定时任务 {} ,默认重复次数 {}",job,job.getRetries()); } LOGGER.info("jobList.size = {}",jobList.size()); //因为主线程很快就能执行完,而定时任务还没有执行,所以让线程等待一下 Thread.sleep(1000*10); LOGGER.info("end"); } }
测试结果如下: