前一篇项目总结里,说了电影票项目的worker的界面控制。这里来说一下多线程的运行,并且这里当时开发的还出现过一个问题。所以说,多线程,也最容易出问题。
场景:取一百部电影近3天的排期信息,创建一个线程池,多线程去请求数据
代码:2个类,一个配置文件,采用模拟方式,构造数据,模拟执行库的update等
错误的方式其实就在这一段代码里:
for (CinemaDetail cinemaDetail : cinemaDetails) { for (int i = 1; i <= 3; i++) {// 一次循环取一家影院一天的排期 System.out.println(Thread.currentThread().getName() + "取影院ID:" + cinemaDetail.getCinemaId() + "第" + i + "天的排期"); cinemaDetail.setIssueDate(i); executorService.execute(new getDataTask(cinemaDetail)); while (executorService.getQueue().size() >= (queueSize - 3)) { // 等待队列有空位置,任务先创建corePoolSize大小的线程,再往队列中压,超队列再创建线程直到maxPoolSize // executorService.getActiveCount(),由于通过work.isLock判,不在一个线程中,线程对象被创建,不表示马上会调度,使isLock返回TRUE,会有并发问题 // executorService.getPoolSize(),等待超过空闲,线程回收后才会变化 System.out.println("取排期有几积压,在排队"); try { Thread.sleep(500); } catch (InterruptedException e) { // 不处理 } } } }
眼尖的同学可能已经看到问题所在了。
先来剖析一下这个主要的业务处理类吧
在Spring初始bean时,先初始一下线程池:
public void init() { // 创建一个固定大小的线程池 executorService = new ThreadPoolExecutor(threadSize, maxThreadSize, idletime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize)); }
这里有文档介绍ThreadPoolExecutor:
在这个bean创建的时候,静态初始100条影院数据
static { System.out.println("开始创建:---------"); long start = System.currentTimeMillis(); cds = new ArrayList<CinemaDetail>(); for (Integer i = 1; i <= 100; i++) { CinemaDetail cd = new CinemaDetail(); cd.setCinemaId(i); cd.setCinemaName(i.toString()); cds.add(cd); } System.out.println("结束创建,总耗时:" + (System.currentTimeMillis() - start)); }
我下面展示的所以输出都是把log替代成直接输出到控制台,具体要执行的方法:
public void execute() { if (executorService == null) { throw new NullPointerException("executorService is null,please call init()!"); } boolean flag = true; while (flag) { List<CinemaDetail> cinemaDetails = getAllCinemas(iCinemaId, pageSize); if (CollectionUtils.isEmpty(cinemaDetails)) { flag = false; System.out.println(Thread.currentThread().getName() + "无影院,本轮同步结束"); break; } // 记录下次循环需要执行的位置 iCinemaId = iCinemaId + pageSize; System.out.println(Thread.currentThread().getName() + "取所有影院排期信息===task===>开始,iCinemaId" + iCinemaId); for (CinemaDetail cinemaDetail : cinemaDetails) { for (int i = 1; i <= 3; i++) {// 一次循环取影院每一天的排期 System.out.println(Thread.currentThread().getName() + "取影院ID:" + cinemaDetail.getCinemaId() + "第" + i + "天的排期"); cinemaDetail.setIssueDate(i); executorService.execute(new getDataTask(cinemaDetail)); while (executorService.getQueue().size() >= (queueSize - 3)) { System.out.println("取排期有几积压,在排队"); try { Thread.sleep(500); } catch (InterruptedException e) { // 不处理 } } } } while (executorService.getActiveCount() > 0) { // 本次时间任务跑完后,才能处理下次时间任务的调度 log.debug("取排期等待本次投注任务调度完成"); try { Thread.sleep(500); } catch (InterruptedException e) { // 不处理 } } } System.out.println("取所有影院排期信息===task===>结束"); }
然而executorService.execute(new getDataTask(cinemaDetail));所执行的就是一个模拟处理,这里只计数和打印信息,看一下代码
public class getDataTask implements Runnable { private CinemaDetail cinemaDetail; public getDataTask(CinemaDetail cinemaDetail) { this.cinemaDetail = cinemaDetail; } public void run() { executeData(cinemaDetail); } }
具体执行如下:
private void executeData(CinemaDetail cinemaDetail) { count.addAndGet(1); System.out.println(Thread.currentThread().getName() + "当前执行数:" + count.get() + "影院ID:" + cinemaDetail.getCinemaId() + "排期信息:第" + cinemaDetail.getIssueDate() + "天"); try { Thread.sleep(500);//模拟处理业务逻辑 } catch (InterruptedException e) { // 不处理 } }
那再来看一下配置文件,具体配了些啥,为了简单点,我就没有用上次说的Scheduler了,直接用随应用启动而启动,启动时间是每59秒一次。
<!-- JOB start --> <bean id="taskMultiThread" class="com.project.task.multithreading.TaskMultiThread" init-method="init" destroy-method="destroy"/> <bean id="taskMultiThreadTask" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean"> <!-- false表示,当前Worker未完成时,即便到了启动时间点,也不启动新Worker --> <property name="concurrent" value="false"></property> <property name="targetObject"> <ref bean="taskMultiThread" /> </property> <property name="targetMethod"> <value>execute</value> </property> </bean> <bean id="taskMultiThreadJob" class="org.springframework.scheduling.quartz.CronTriggerBean"> <property name="jobDetail"> <ref bean="taskMultiThreadTask" /> </property> <property name="cronExpression"> <value>0/59 * * * * ?</value> </property> </bean> <!-- JOB end --> <bean id="startQuertz" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> <property name="triggers"> <list> <ref bean="taskMultiThreadJob" /> </list> </property> </bean>
打包,启动程序,执行结果,一看,咋就不对呢?
全取到的是第三天的结果,就是对第三天的结果进行了三次处理
这里也模拟出来了,需要排队等待的请况:
执行结束时的结果
这里会有quartz worker 2的原因是我把配置文件里的concurrent属性配成了true,请看配置处对此属性的注释
从运行结果来看,基本不会执行取第一天,第二天的数据,全执行的是第三天的数据(这时的脑袋是短路了,根本不会先问题),多线程造成的?
那我们用同步来试试?
synchronized (cinemaDetail) { cinemaDetail.setIssueDate(i); executorService.execute(new getDataTask(cinemaDetail)); }
如果没有发现问题的本质,你同步也是不行的。请看结果:
冷静下来,再看,你会发现,其实不是线程公用变量之类的事情造成的
for (CinemaDetail cinemaDetail : cinemaDetails) { //一次循环,取1家影院近3天排期数据 for (int i = 1; i <= 3; i++) { // 一次循环取一家影院一天的排期 cinemaDetail.setIssueDate(i); executorService.execute(new getDataTask(cinemaDetail)); } }
再来仔细看一下这个for循环,这里我去掉了一些不必要的信息,这是两个循环引起的,再认真看一下注释。在第二个循环来,我们还是用的同一个对象(引用)。那我们每次做的set操作,都是针对同一个引用来的。而当多线程去执行的时候,当然只认现在这个引用的状态,在这里也就是当前的属性(最后一次修改为3)。想明白这里之后,改起来应该就方便了。有几种方式可以选择,可以clone一个对象,或者新建一个对象。
这里有文档对clone的介绍:
如果采用clone的话,CinemaDetail这个类需要实现Cloneable接口,如下:
public class CinemaDetail implements Cloneable{ public CinemaDetail clone() { CinemaDetail cd; try { cd = (CinemaDetail) super.clone(); return cd; } catch (CloneNotSupportedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } }
然后for循环里进行相应的修改:
for (CinemaDetail cinemaDetail : cinemaDetails) { //一次循环,取1家影院近3天排期数据 for (int i = 1; i <= 3; i++) { CinemaDetail cinemaDetailClone = cinemaDetail.clone(); // 一次循环取一家影院一天的排期 cinemaDetailClone.setIssueDate(i); executorService.execute(new getDataTask(cinemaDetailClone)); } }
如果采用new对象的话,相应修改如下:
for (CinemaDetail cinemaDetail : cinemaDetails) { //一次循环,取1家影院近3天排期数据 for (int i = 1; i <= 3; i++) { // 一次循环取一家影院一天的排期 CinemaDetail cinemaDetailNew = new CinemaDetail(); //有多少属性,就复制多少属性。。。如果属性多就麻烦了 cinemaDetailNew.setCinemaId(cinemaDetail.getCinemaId()); cinemaDetailNew.setCinemaName("cinemaName"); cinemaDetailNew.setIssueDate(i); executorService.execute(new getDataTask(cinemaDetailNew)); } }
这两种方式,都可以正常的运作了。
看一下运行的结果吧
解决了之后,心里还是美滋滋的,项目也正常往下进行着。
总结:
有多线程的使用时,全局变量的使用需要谨慎
Synchronized也并不是万能的,首先应明白原因
一个对象在两重以上的循环里处理时,需要注意
多线程ThreadPoolExecutor创建的使用
对象的clone,需要实现cloneable
SpringQuartz的配置熟练使用
总结中,有些是针对这次的问题的,有些是针对这次的内容的。