一个简单的多线程的例子:
package multiThread; public class BasicThread implements Runnable{ private int countDown = 10; private static int taskCount = 0; private final int id = taskCount++; public static void main(String [ ] args) { Thread t = new Thread(new BasicThread()); t.setName("test_thread1"); t.start(); //not t.run(); t.run() will not start a new thread,just exist one thread System.out.println("i am finished!"); } @Override public void run() { while(countDown>=0){ System.out.print("#"+ id + "(" + (countDown>0 ? countDown : "lift off! "+ Thread.currentThread().getName()+ " ") + "),");
countDown --; } } }
运行的结果为:
i am finished!
#0(10),#0(9),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(lift off!),
关于上面的代码有几点说明一下:
1.final int id = taskCount++,final修饰基本变量时,值是不变的,从结果可以看到,id的值始终为0。
2.使用t.start()来开启一个线程,而不是t.run(),t.run()还是运行在main方法的线程中,始终只有1个线程。
3.使用t.setName("test_thread1");是一个好的习惯,方便后续的定位问题。
使用setDaemon(true)可以将线程设置为后台线程,后台线程有如下的特点:
1.JVM中只剩下Daemon线程时就会退出,只要还有一个non-Daemon线程存活,JVM就不会退出。Daemon线程可以在做一些后台的服务性工作,例如JVM的gc线程就是一个低优先级的Daemon线程。
2.线程启动时,默认是non-Daemon的。
3.setDaemon(true)一定要在start方法之前,否则回报异常。
4.无法将一个已经启动的non-Daemon线程变为Daemon线程。
5.Daemon线程中产生的新线程默认将是Daemon的。
使用Executor来完成多线程,例子如下:
package multiThread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExecutorTest { public static void main(String [ ] args) { ExecutorService exec = Executors.newCachedThreadPool(); //will create one thread for each task //ExecutorService exec2 = Executors.newFixedThreadPool(5); only eixsts 5 threads,others will wait //ExecutorService exec3 = Executors.newSingleThreadExecutor(); for(int i=0;i<7;i++){ exec.execute(new BasicThread()); } //exec.shutdown(); programe will not exit
//exec.execute(new BasicThread()); error exec.execute(new BasicThread()); exec.execute(new BasicThread()); } }
所有的线程池在线程可用的情况下,都会去复用已创建的线程,上面的代码运行结果为:
#0(10),#1(10),#0(9),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#3(10),#3(9),#3(8),#3(7),#3(6),#3(5),#3(4),#3(3),#3(2),#3(1),#3(lift off! pool-1-thread-4 ),#2(10),#0(lift off! pool-1-thread-1 ),#1(9),#1(8),#1(7),#1(6),#1(5),#1(4),#1(3),#1(2),#1(1),#1(lift off! pool-1-thread-2 ),#2(9),#2(8),#2(7),#2(6),#2(5),#2(4),#2(3),#2(2),#2(1),#2(lift off! pool-1-thread-3 ),#7(10),#7(9),#7(8),#7(7),#7(6),#7(5),#7(4),#7(3),#7(2),#7(1),#6(10),#6(9),#6(8),#6(7),#6(6),#6(5),#6(4),#6(3),#6(2),#6(1),#4(10),#4(9),#4(8),#4(7),#4(6),#8(10),#6(lift off! pool-1-thread-2 ),#7(lift off! pool-1-thread-4 ),#5(10),#8(9),#8(8),#8(7),#8(6),#8(5),#8(4),#8(3),#4(5),#4(4),#4(3),#4(2),#4(1),#4(lift off! pool-1-thread-5 ),#8(2),#8(1),#8(lift off! pool-1-thread-6 ),#5(9),#5(8),#5(7),#5(6),#5(5),#5(4),#5(3),#5(2),#5(1),#5(lift off! pool-1-thread-1 ),
上面的例子有一点是需要注意的:
注释掉exec.shutdown();之后,程序将在所有线程运行完成后一分钟后才结束。如果想要想线程运行完后,程序自动退出,需要加上exec.shutdown();
exec.shutdown();的作用是为了防止新的任务加入线程队列中。
如果希望线程在结束时可以带上返回值,可以使用Callable来代替Runnable,如下例:
package multiThread; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class CallableTest { @SuppressWarnings({ "rawtypes", "unchecked" }) public static void main(String[] args) throws Exception{ ExecutorService es = Executors.newCachedThreadPool(); List<Future> ls = new ArrayList<Future>(); for(int i =0;i<5;i++){ ls.add(es.submit(new RuturnObj(i))); } es.shutdown(); //only with es.shutdown(),the program will shut down,unless program will not shut down for(Future f:ls){ System.out.println(f.get()); } } } @SuppressWarnings("rawtypes") class RuturnObj implements Callable{ private int id; RuturnObj(int id){ this.id = id; } @Override public Object call() throws Exception { return "RuturnObj return id: " + id ; } }
运行结果为:
RuturnObj return id: 0 RuturnObj return id: 1 RuturnObj return id: 2 RuturnObj return id: 3 RuturnObj return id: 4
在第一个线程上调用第二个线程的join方法,第一个线程将等待第二个线程完成后再接着执行。如果第一个线程不想等待,可以调用interrupt方法,第一个方法将继续执行,第二个方法将等待执行。
package multiThread; public class JoinTest { public static void main(String [ ] args) { Sleeper sheep = new Sleeper("sheep"),pig = new Sleeper("pig"); Joiner doc = new Joiner("doc",sheep),gru = new Joiner("gru",pig); gru.interrupt(); } } class Sleeper extends Thread{ public Sleeper(String name){ super(name); start(); } public void run(){ try { Thread.sleep(1000); } catch (InterruptedException e) { System.out.println("Sleeper " + getName() + " was interrupted!"); } System.out.println("Sleeper " + getName() + " has finished!"); } } class Joiner extends Thread{ private Sleeper sl; public Joiner(String name,Sleeper sl){ super(name); this.sl = sl; start(); } public void run(){ try { sl.join(); } catch (InterruptedException e) { System.out.println("Joiner " + getName() + " was interrupted!"); } System.out.println("Joiner " + getName() + " has finished!"); } }
输出结果为:
Joiner gru was interrupted! Joiner gru has finished! Sleeper sheep has finished! Sleeper pig has finished! Joiner doc has finished!
Java SE5中的concurrent包中的CyclicBarrier使用上比join更为合适。
下面是一个使用CyclicBarrier的例子,开启了6个线程,模拟统计各省的信息,各省的信息都统计完后再统计全国的信息,如下:
package multiThread; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierTest { //CountDownLatch: 一个或者是一部分线程 ,等待另外一部线程都完成了,再继续执行 //CyclicBarrier: 所有线程互相等待完成,再执行主任务 //一个是在主任务里面await,一个是在子任务里面await public static void main(String[] args) { TotalService totalService = new TotalServiceImpl(); CyclicBarrier barrier = new CyclicBarrier(5, new TotalTask(totalService)); List<String> provinceNames = Arrays.asList("北京","上海","广西","四川","黑龙江"); ExecutorService es = Executors.newCachedThreadPool(); for(int i=0;i<provinceNames.size();i++){ es.execute(new BillTask(new BillServiceImpl(), barrier, provinceNames.get(i))); } es.shutdown(); } } /** * 主任务:汇总任务 */ class TotalTask implements Runnable { private TotalService totalService; TotalTask(TotalService totalService) { this.totalService = totalService; } public void run() { // 读取内存中各省的数据汇总,过程略。 System.out.println("======================================="); System.out.println("开始全国汇总"); totalService.count(); System.out.println("全国数据汇总完毕"); } } /** * 子任务:计费任务 */ class BillTask extends Thread { // 计费服务 private BillService billService; private CyclicBarrier barrier; // 代码,按省代码分类,各省数据库独立。 private String code; BillTask(BillService billService, CyclicBarrier barrier, String code) { this.billService = billService; this.barrier = barrier; this.code = code; } public void run() { System.out.println("开始计算--" + code + "省--数据!"); billService.bill(code); // 把bill方法结果存入内存,如ConcurrentHashMap,vector等,代码略 System.out.println(code + "省已经计算完成,并通知汇总Service!"); try { // 通知barrier已经完成 barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } interface BillService { public void bill(String code); } class BillServiceImpl implements BillService { @Override public void bill(String code) { Random rd = new Random(47); int i = rd.nextInt(4000); try { Thread.sleep(i); } catch (InterruptedException e) { e.printStackTrace(); } } } interface TotalService { public void count(); } class TotalServiceImpl implements TotalService { @Override public void count() { Random rd = new Random(47); int i = rd.nextInt(4000); try { Thread.sleep(i); } catch (InterruptedException e) { e.printStackTrace(); } } }
输出结果为:
开始计算--北京省--数据! 开始计算--广西省--数据! 开始计算--上海省--数据! 开始计算--黑龙江省--数据! 开始计算--四川省--数据! 四川省已经计算完成,并通知汇总Service! 北京省已经计算完成,并通知汇总Service! 黑龙江省已经计算完成,并通知汇总Service! 广西省已经计算完成,并通知汇总Service! 上海省已经计算完成,并通知汇总Service! ======================================= 开始全国汇总 全国数据汇总完毕
CountDownLatch可以完成与CyclicBarrier类似的功能,CyclicBarrier是可以复用的,CountDownLatch却不可以。
下面用CountDownLatch模拟项目的开发,只有当每个模块都完成后,项目才完成, 每个模块的用时不同。
package multiThread; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 模拟项目的开发,只有当每个模块都完成后,项目才完成 每个模块的用时不同 * */ public class CountDownLatchTest { private static final int SIZE = 5; public static void main(String[] args) { CountDownLatch latch = new CountDownLatch(SIZE); Random r = new Random(47); ExecutorService exec = Executors.newCachedThreadPool(); Controller controller = new Controller(latch); exec.execute(controller); for (int i = 0; i < SIZE; i++) { exec.execute(new Module(latch, "模块" + (i + 1), r.nextInt(2000))); } exec.shutdown(); } } class Module implements Runnable { private CountDownLatch latch; private String moduleName; private int time;// 用时 public Module(CountDownLatch latch, String moduleName, int time) { super(); this.latch = latch; this.moduleName = moduleName; this.time = time; } @Override public void run() { try { work(); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } private void work() throws InterruptedException { TimeUnit.MILLISECONDS.sleep(time); System.out.println(moduleName + " 完成,耗时:" + time); } } class Controller implements Runnable { private CountDownLatch latch; public Controller(CountDownLatch latch) { super(); this.latch = latch; } @Override public void run() { try { latch.await(); System.out.println("所有模块都完成,任务完成"); } catch (InterruptedException e) { e.printStackTrace(); } } }
运行结果为:
模块2 完成,耗时:555 模块3 完成,耗时:693 模块5 完成,耗时:961 模块1 完成,耗时:1258 模块4 完成,耗时:1861 所有模块都完成,任务完成