java从诞生开始就明智的选择了内置对多线程的支持,这使得java语言相比同一时期的其他语言具有明显的优势。线程作为操作系统调度的最小单元,多个线程能够同时执行,这将显著提升程序的性能,在多核环境中表现的更加明显。但是,过多的创建线程和对线程的不当管理也容易造成问题。本章将着重介绍java并发编程的基础知识,从启动一个线程到线程间不同的通信方式,最后通过简单的线程池示例以及应用(简单的Web服务器)来串联本章所介绍的内容。
1.线程简介
1.1 什么是线程
现代操作系统中在运行一个程序时,会为其创建一个进程。例如,启动一个java程序,操作系统就会创建一个java进程。现在操作系统调度的最小单元就是线程,也叫轻量级进程(Light Weight Process),在一个进程中可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,并且能够访问共享的内存变量。处理器在这些线程上高速切换,让使用者感觉到这些线程在同时进行。
一个java程序从main()方法开始执行,然后按照既定的代码逻辑执行,看似没有其他线程参与,但实际上java程序天生就是多线程程序,因为执行main()方法的是一个名称为main的线程,下面使用JMX来查看一个普通的java程序包含哪些线程
public static void main(String[] args) { //使用java线程管理MXbean ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); //不需要获取同步的monitor和synchronizer信息,进获取线程和线程堆栈信息 ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false); for (ThreadInfo threadInfo : threadInfos) { System.out.println("["+threadInfo.getThreadId()+"]"+threadInfo.getThreadName()); } } // [7]JDWP Command Reader // [6]JDWP Event Helper Thread // [5]JDWP Transport Listener: dt_socket // [4]Signal Dispatcher 分发处理发送给jvm信号的线程 // [3]Finalizer 调用对象finalize方法的线程 // [2]Reference Handler 清除reference 的线程 // [1]main main线程,用户程序入口
可以看到,一个java程序的运行不仅仅是面()方法的运行,而是main线程和多个其他线程的同时执行。
1.2 为什么要使用多线程
执行一个简单的“Hello word!”,却启动了那么多“无关”线程,是不是把简单的问题复杂化了?当然不是,因为真确的使用多线程,总能够给开发人员带来显著的好处,而使用多线程的原因主要有以下几点:
1.更多的处理器核心
随着处理器上的核心数量越来越多,以及超线程技术的广泛运用,现在大多数计算机都比以往更加擅长并行计算,而处理器性能的提升方式,也从更高的主频向更多的核心发展。如何利用好处理器上的核心也成了现在的主要问题
线程是大多数操作系统调度的基本单元,一个程序作为一个进程来运行,程序运行过程中能够创建多个线程,而一个线程在一个时刻只能运行在一个处理器核心上。试想一下,一个单线程程序在运行是只能使用一个处理器核心,那么再多的处理器核心加入也无法显著提升该程序的执行效率。相反,如果改程序使用多线程技术,将计算逻辑分配到多个处理器核心上,就会显著减少程序的处理时间,并且随着更多处理器核心的加入而变得更有效率。
2.更快的响应时间
有事我们会编写一些较为复杂的代码(这的复杂代码不是说复杂的算法,而是复杂的业务逻辑)。例如,一笔订单的创建,他包括插入订单数据、生成订单快照、发送邮件通知买家和记录货品销售数量等。用户从单击“订购”按钮开始,就要等待这些操作全部完成才能看到定购成功的结果。但是这么多业务操作,如何能够让其跟快的完成呢?
在上面的场景中,可以使用多线程技术,即将数据一致性不强的操作派发个其他线程处理(也可以使用消息队列),如生成订单快照、发送邮件等这样做的好处就是想用用户请求的线程能够尽可能的处理完成,缩短了响应时间,提升了用户体验。
3.更好的编程模型
java为多线程模型提供了良好、考究并且一致的编程模型,是开发人员能够更加专注于问题的解决,即为所遇到的问题建立合适的模型,而不是绞尽脑汁的考虑如何将其多线程化。一旦开发人员建立好模型,稍做修改总是能够方便的映射到java提供的多线程编程模型上。
1.3 线程优先级
现在操作系统基本采用时分的形式调度运行的线程,操作系统会分出一个时间片,线程会分配到若干时间片,当线程的时间片用完了就发生线程的调度,并等待下次分配。线程分配到的时间片多少也就决定了线程使用处理器资源的多少,而线程优先级就是决定线程需要多或者少分配一些处理资源的线程属性。
在java线程中,通过一个整型成员变量priority来控制优先级,优先级的范围1~10,默认是5,在线程构建的时候可以通过setPriority(int)方法来修改优先级,优先级高的线程分配时间片的数量要多于优先级低的线程。设置线程优先级是,针对频繁阻塞(休眠或者I/O操作)的现场需要设置较高的优先级,而偏重计算(需要较多cpu时间或者片预算)的线程则设置较低的优先级,确保处理器不会被独占。在不同jvm以及操作系统上,线程规划存在差异,有些操作系统甚至会忽略线程优先级的设定。
public class Priority { private static volatile boolean notStart = true; private static volatile boolean notEnd = true; public static void main(String[] args) throws InterruptedException { ArrayList<Job> jobs = new ArrayList<>(); for (int i = 0; i < 10; i++) { int priority = i < 5 ? Thread.MIN_PRIORITY : Thread.MAX_PRIORITY; Job job = new Job(priority); jobs.add(job); Thread thread = new Thread(job,"thread:"+i); thread.setPriority(priority); thread.start(); } notStart = false; TimeUnit.SECONDS.sleep(10); notEnd = false; for (Job job : jobs) { System.out.println("job priority :"+job.priority+",count : "+job.jobCount); } } static class Job implements Runnable{ private int priority; private long jobCount; public Job(int priority) { this.priority = priority; } @Override public void run() { while (notStart){ Thread.yield(); } while (notEnd){ Thread.yield(); jobCount++; } } } }
job priority :1,count : 9225351
job priority :1,count : 9103654
job priority :1,count : 9276681
job priority :1,count : 9197398
job priority :1,count : 9292870
job priority :10,count : 9330619
job priority :10,count : 9238517
job priority :10,count : 9389986
job priority :10,count : 9340567
job priority :10,count : 8984817
从输出可以看到线程的优先级没有生效,优先级1和优先级10的Job计数器的结果非常先进,没有明显差距,这表示程序正确性不能依赖优先级高低。
1.4 线程的状态
java线程在运行的生命周期可能处于的6中状态,在给定的一个时刻,线程只能处于其中的一个状态
public class ThreadState { public static void main(String[] args) { new Thread(new TIMEWaiting(),"timeWatingThread").start(); new Thread(new Waiting(),"watingThread").start(); //使用两个block线程,一个获取锁成功,另一个阻塞 new Thread(new Blocked(),"blockedThread-1").start(); new Thread(new Blocked(),"blockedThread-2").start(); } //该线程不断地进行睡 static class TIMEWaiting implements Runnable{ @Override public void run() { while (true){ SleepUtils.second(100); } } } // 该线程在 waiting。class实例上等代 static class Waiting implements Runnable{ @Override public void run() { while (true){ synchronized (Waiting.class){ try { Waiting.class.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } // 改线成在Blocked。class 实例上加锁后,不会释放锁 static class Blocked implements Runnable{ @Override public void run() { synchronized (Blocked.class){ while (true){ SleepUtils.second(100); } } } } static class SleepUtils{ public static final void second(long seconds){ try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { e.printStackTrace(); } } } }
运行改实例代开中断或者命令提示符 输入 jps
4067 Launcher
4068 ThreadState
4103 Jps
3983
jstack PID
线程创建之后,调用start()方法开始执行。当线程执行wait()方法之后,线程进入等待状态。进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态,而超时等待状态相当于在等待方法的基础上增加了超时限制,也就是超时时间到达将会返回运行状态。当线程调用同步方法时,在没有获取到锁的情况下,线程将会进入到阻塞状态,线程在执行Runnable的run() 方法之后将进入到终止状态,
1.5 Daemon线程
Daemon线程是一种支持线程,因为它主要被用作程序中后台调度以及支持性工作,这意味着,当一个java虚拟机中不存在非Daemon线程的时候,java虚拟机将会推出,可以通过Thread.setDeamon(true)将线程设置为Daemon线程,当java虚拟机退出时,Daemon线程中的finally块并不一定执行
public class Daemon { public static void main(String[] args) { Thread thread = new Thread(new DaemonRunner(), "daemonRunner"); thread.setDaemon(true); thread.start(); } static class DaemonRunner implements Runnable{ @Override public void run() { try { ThreadState.SleepUtils.second(10); }catch (Exception e){ System.out.println("daemonThread finally run ."); } } } }
运行Daemon程序,可以看到在终端或者命令符上没有任何的输出。main线程(非Daemon线程)在启动了DaemonRunner之后随着main方法执行完毕而终止,而此时java虚拟机中已经没有非Daemon线程,虚拟机需要退出。java虚拟机中的所有Daemon线程都需要立即终止,因此DaemonRunner立即终止,但是DaemonRunner中的finally块并没有执行。
在创建Daemon线程时,不能依靠Finally快中的内容来确保执行关闭或清理资源的逻辑
2.启动和终止线程
2.1 构造线程
在运行线程之前手下要构造一个线程对象,线程对象在构造的时候需要提供线程所需要的属性,如线程所属的线程组,线程优先级,是否是Daemon线程等信息
private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, boolean inheritThreadLocals) { if (name == null) { throw new NullPointerException("name cannot be null"); } this.name = name; //当前线程就是该线程的父线程 Thread parent = currentThread(); this.group = g; //将Daemon。priority属性设置为父线程的对应属性 this.daemon = parent.isDaemon(); this.priority = parent.getPriority(); if (security == null || isCCLOverridden(parent.getClass())) this.contextClassLoader = parent.getContextClassLoader(); else this.contextClassLoader = parent.contextClassLoader; this.inheritedAccessControlContext = acc != null ? acc : AccessController.getContext(); this.target = target; setPriority(priority); // 将父线程的inheritThreadLocal复制过来 if (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); /* Stash the specified stack size in case the VM cares */ this.stackSize = stackSize; /* 分配一个线程id */ tid = nextThreadID(); }
一个新构造的线程对象是由其parent线程来进行空间分配的,而child线程继承了parent是否为Daemon、优先级和加载资源的contenxtClassLoader以及可继承的ThreadLocal,同时还会分配一个唯一id来标识这个Child线程。至此一个能够运行的线程对象就初始化好了,在堆内存中等待运行。
2.2 启动线程
线程对象在初始化完成之后,调用start()方法就可以启动这个线程。线程Start()方法的含义是:当前线程(即parent线程同步告知java虚拟机,只要线程划器空闲,应立即启动调用start()方法的线程)
2.3 理解中断
中断可以理解为线程的一个标识位属性,他表示一个运行中的线程是否被其他线程进行了中断,中断好比其他线程对该线程打了一个招呼,其他线程通过调用该线程的interupt()方法对其进行了中断操作。
线程通过检查自身时候被中断来进行响应,线程通过方法isInterrupted()来进行判断是否被中断,也可以调用静态方法Thread.interrupted()对当前线程的中断标识位进行复位,如果该线程已经处于终结状态,即使线程被中断过,即使该线程被中断过,在调用该线程对象的isInterrupted()时依旧会返回false。
从java的API中可以看到,许多声明抛出InterruptedException的方法(例如Thread。sleep(Long millis))这些方法在抛出InterruptedException之前,java虚拟机会想讲线程的中断标识位清除,然后抛出InterruptedException,此时调用IsIterrupted()方法将会返回false。
public class Interrupted { public static void main(String[] args) throws InterruptedException { //SleepThread不停的尝试失眠 Thread sleepThread = new Thread(new SleepRunner(), "SleepThread"); sleepThread.setDaemon(true); //busyThread 不停的运行 Thread busyThread = new Thread(new BusyRunner(), "busyThread"); busyThread.setDaemon(true); sleepThread.start(); busyThread.start(); //休眠5秒,让sleepThread和busyThread充分运行 TimeUnit.SECONDS.sleep(5); sleepThread.interrupt(); busyThread.interrupt(); System.out.println("sleepThread interrupted is:"+sleepThread.isInterrupted()); System.out.println("busyThread interrupted is:"+busyThread.isInterrupted()); SleepUtils.second(2); } static class SleepRunner implements Runnable{ @Override public void run() { while (true){ SleepUtils.second(10); } } } static class BusyRunner implements Runnable{ @Override public void run() { while (true){} } } } //输出结果 sleepThread interrupted is:false busyThread interrupted is:true java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:340) at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386) at 多线程并发的艺术.并发编程的挑战1.SleepUtils.second(SleepUtils.java:25) at 多线程并发的艺术.并发编程的挑战1.Interrupted$SleepRunner.run(Interrupted.java:45) at java.lang.Thread.run(Thread.java:748)
从结果可以看出,抛出InterruptedException 的线程SleepThread,其中断标识位被清除了,而一直忙碌运作的线程BusyThread,中断标识位没有被清除
2.4 过期的suspend()、resume()、stop()
大家对于CD机肯定不陌生,如果把它播放音乐比作一个线程运作,那么对音乐的播放做出暂停、恢复和停止操作对应的线程Thread的API的supend()/resume()/和stop()。
public class Deprecated { public static void main(String[] args) throws InterruptedException { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss"); Thread thread = new Thread(new Runner(), "printThread"); thread.setDaemon(true); thread.start(); TimeUnit.SECONDS.sleep(3); //将thread 进行暂停,输出内容工作停止 thread.suspend(); System.out.println("main supend thread at "+simpleDateFormat.format(new Date())); TimeUnit.SECONDS.sleep(3); thread.resume(); System.out.println("main resume thread at "+simpleDateFormat.format(new Date())); TimeUnit.SECONDS.sleep(3); thread.stop(); System.out.println("main stop thread at "+simpleDateFormat.format(new Date())); TimeUnit.SECONDS.sleep(3); } static class Runner implements Runnable{ @Override public void run() { DateFormat format = new SimpleDateFormat("HH:mm:ss"); while (true){ System.out.println(Thread.currentThread().getName()+"run at"+format.format(new Date())); SleepUtils.second(1); } } } } printThreadrun at22:17:37 printThreadrun at22:17:38 printThreadrun at22:17:39 main supend thread at 22:17:40 main resume thread at 22:17:43 printThreadrun at22:17:43 printThreadrun at22:17:44 printThreadrun at22:17:45 main stop thread at 22:17:46
不建议使用的原因有:以suspend()方法为例,在调用后,线程不会释放已经占有的资源(比如锁),而是占有着资源进入睡眠状态,这样容易已发死锁问题。同样,stop()方法在终结一个线程时不会保证线程的资源正常释放,通常是没有给予线程完成资源释放工作的机会,因此会导致程序可能在不确定状态下。
2.5 安全的终止线程
2.3中提到的中断状态是线程的一个标识位,而中断操作是一种简便的线程间交互方式,而这种交互方式最适合用来取消或停止任务。除了中断以外,还可以利用一个Boolean变量来控制是否需要停止任务并终止任务
public class Shutdown {
public static void main(String[] args) throws InterruptedException { Runner one = new Runner(); Thread countThread = new Thread(one, "countThread"); countThread.start(); //睡眠一秒 mian 线程对CountThread 进行中断,使countThread能够感知中断而结束 TimeUnit.SECONDS.sleep(1); countThread.interrupt(); Runner two = new Runner(); countThread = new Thread(two, "countThread"); countThread.start(); TimeUnit.SECONDS.sleep(1); two.cancel(); } private static class Runner implements Runnable{ private long i; private volatile boolean on = true; @Override public void run() { while(on && !Thread.currentThread().isInterrupted()){ i++; } System.out.println("Count i ="+i); } public void cancel(){ on = false; } } } Count i =753158205
Count i =769424811
示例在执行过程中,main线程通过中断操作和cancel()方法均可使CountThread线程终止,这种通过标识或者中断操作的方式能够使线程在终止时有机会去清理资源,而不是武断色将线程停止,因此这种终止线程的做法显得更加安全优雅。
3.线程间通信
线程开始运行,拥有自己的栈空间,就如同一个脚本一样,按照既定的代码一步一步的执行,直至终止,但是每一个运行的线程,如果多个线程能够相互配合完成工作,这将会带来巨大价值。
3.1 volatile和synchronized关键字
java支持多个线程同时访问一个对象或者对象的成员变量,由于每个线程可以拥有这个变量的拷贝(虽然对象以及成员变量分配的内存是在共享内存中的,但是每个执行的线程还是可以拥有一份拷贝,这样做的目的是加速程序的执行,这是现代多核处理器的一个显著特性),所以程序在执行过程中,一个线程看到的变量并不一定是最新的。
关键字volatile可以用来修饰字段(成员变量),就是告知程序任何对该变量的访问均需哟啊从共享内存中获取,而对他的改变必须同步刷新回共享内存,他能保证所有线程对变量访问的可见性。
举个列子,定义一个表示程序是否运行的成员变量Boolean on= true,那么另一线程能对他执行关闭动作(on = false),这里涉及多个线程对变量的访问,因此需要将其定义成为 volatile Boolean on= true,这样其他线程对他进行改变时,可以让所有线程感知到变化,因为所有对on变量的访问和修改都需要以共享内存为准,但是,过多的使用volatile是不必要的,因为他会降低程序的执行效率。
关键字synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。
public class Synchronized { public static void main(String[] args) { //对Synchronized class对象进行加锁 synchronized (Synchronized.class){} m(); } private static synchronized void m() { } }
public static void main(java.lang.String[]); descriptor: ([Ljava/lang/String;)V flags: ACC_PUBLIC, ACC_STATIC Code: stack=2, locals=3, args_size=1 0: ldc #2 // class 多线程并发的艺术/并发编程的挑战1/Synchronized 2: dup 3: astore_1 4: monitorenter //监视器进入,获取锁 5: aload_1 6: monitorexit //监视器退出,释放锁 7: goto 15 10: astore_2 11: aload_1 12: monitorexit 13: aload_2 14: athrow 15: invokestatic #3 // Method m:()V 18: return public static synchronized void m(); descriptor: ()V
//方法修饰符 表示 public static synchronized void m()
flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED Code: stack=0, locals=0, args_size=0 0: return
上面class信息中,对于同步代码块的实现使用了monitorenter和monitorexit指令,而同步方法则是依靠方法修饰符上的ACC_SYNCHRONIZED来完成的。无论采用哪种方式,其本质是对一个对象的监视器(moniter)进行获取,而这个获取过程实排他的,也就是同一时刻只能有一个线程获取到由synchronized锁保护对象的监视器。
任意一个对象都拥有自己的监视器,当这个对象由同步块或者这个对象的同步方法调用时,执行方法的线程必须先获取到该对象的监视器才能进入同步块或者同步方法,而没有获取到监视器(执行该方法)的线程将会被阻塞再同步代码块和同步方法的入口处,进入BLOCKED状态。
该图可以看出,任意线程对Object的访问,首先要获得Object的监视器,如果获取失败,该线程就进入同步状态,线程状态变为BLOCKED,当Object的监视器占有者释放后,在同步队列中得线程就会有机会重新获取该监视器。
3.2 等待/通知机制
一个线程修改了一个对象的值,而另一线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模式隔离了“做什么(what)”和“怎么做(How)”,在功能层面上实现了解耦,体系结构上具备了良好的伸缩性,但是在java语言中如何实现类似的功能呢?
简单的方法是让消费者线程不断地循环检查变量是否符合预期,如下面代码所示,在while循环中设置不满足的条件,如果条件满足则退出while循环,从而完成消费者的工作.
while (value != desire){ Thread.sleep(1000); } doSomething();
上面这段伪代码在条件不满足时就睡眠一段时间,这样做的目的是为了防止过快的"无效"尝试,这种方式看似能够解实现所需的功能,但是存在的如下问题。
1)难以确保及时性。在睡眠时,基本不消耗处理器资源,但是如果睡的过久,就不能及时发现条件已经变化,也就是及时性难以保证。
2)难以降低开销。如果降低睡眠时间,比如休眠1毫秒,这样消费者能更加迅速地发现条件的变化,但是却可能消耗更多的处理器资源,造成了无端的浪费。
以上两个问题,看似矛盾难以调和,但是java通过内置的等待/通知机制能够很好地解决这个矛盾并实现了所需的工能。
等待/通知的相关方法是任意java对象都具备的,因为这些方法被定义在所有对象的超类java.lang.object上。
等待/通知机制,是指一个线程A调用对象O的wait()方法进入了等待状态,而另一个线程B调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。上述两个线程通过对象O来完成交互,而对象的wait()和notify()/notifyAll()的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作
穿建了两个线程——WaiThread和NotifyThread,前者检查flag值是否为false,如果符合要求,进行后续操作,否则在lock上等待,后者在睡眠了一段时间后对lock进行通知
public class WaitNotify { static boolean flag = true; static Object lock = new Object(); public static void main(String[] args) { Thread waitThread = new Thread(new Wait(), "waitThread"); waitThread.start(); SleepUtils.second(1); Thread notifyThread = new Thread(new Notify(), "notifyThread"); notifyThread.start(); } static class Wait implements Runnable{ @Override public void run() { //加锁 拥有lock的 Moniter synchronized (lock){ //当条件不满足时,继续wait同时释放lock锁 while (flag){ try { System.out.println(Thread.currentThread()+ "flag is true. wait@"+new SimpleDateFormat("HH:mm:ss").format(new Date())); lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //条件满足时,完成工作 System.out.println(Thread.currentThread()+"flag is false. running @"+ new SimpleDateFormat("HH:mm:ss").format(new Date())); } } } static class Notify implements Runnable{ @Override public void run() { //加锁 拥有lock的 Moniter synchronized (lock){ //获取lock的锁,然后进行通知,通知时不会释放lock的锁 //直到当前线程释放了lock后,waitThread才能从wait方法中返回 System.out.println(Thread.currentThread()+"hold lock .notify @"+ new SimpleDateFormat("HH:mm:ss").format(new Date())); lock.notifyAll(); flag = false; SleepUtils.second(5); } //再次加锁 synchronized (lock){ System.out.println(Thread.currentThread()+"hold lock again. sleep@"+ new SimpleDateFormat("HH:mm:ss").format(new Date())); SleepUtils.second(5); } } } }
Thread[waitThread,5,main]flag is true. wait@20:38:13
Thread[notifyThread,5,main]hold lock .notify @20:38:14
Thread[notifyThread,5,main]hold lock again. sleep@20:38:19
Thread[waitThread,5,main]flag is false. running @20:38:24
上述第3行和第4行输出的顺序可能会互换,而上述例子主要说明了调用wait(),notify()以及notifyAll()是需要注意的细节,如下:
1)使用wait(),notify()和notifyAll()时需要先对调用对象加锁。
2)使用wait()方法后,线程的状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。
3)notify()和notifuAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或notifyAll()的线程释放锁之后,等待线程才有机会wait()返回。
4)notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而totifyAll()方法则是将等待队列中所有线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED。
5)从wait()方法返回的前提是获得了调用对象的锁。
从上述细节中可以看到,等待/通知机制依托于同步机制,其目的就是确保等待线程从wait()方法返回时能够感知到通知线程对变量做出的修改、
WaitThread首先获取了对象锁,然后调用对象的wait()方法,从而放弃了锁并进入了对象的等待队列WaitQueue中,进入等待状态。由于WaitThread释放了对象的锁,NotifyThread随后获取了对象的锁,并调用了对象的notify()方法,将WaitThread从WaitQueue移到SynchronizedQueue中,此时WaitThread的状态变为阻塞状态。NotifyThread释放了锁之后,WaitThread再次获取到锁并从wait()方法返回继续执行。
3.3 等待/通知经典案例
从3.2中的WaitNotify实例中可以提炼出等待/通知的经典范式,该范式分为两部分,分别针对等待方(消费者)和通知这(生产者)
等待者遵循如下原则
1)获取对象锁
2)如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件
3)条件满足则执行对应的逻辑
对应的伪代码
synchronized(对象){
while(条件不满足){
对象.wait();
}
对应的处理逻辑
}
通知方遵循如下原则
1)获取对象锁
2)改变条件
3)通知所有等待的对象的线程
synchronized(对象){
改变条件
对象.notifyAll();
}
3.4 管道输入/输出流
管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。
管道输入/输出流主要包括了如下4种具体的实现:PipedOutputStream、PipedInputStream、PipedReader和PipedWriter,前两种面相字节,而后两中面相字符
创建了printThread,它用来接受main线程的输入,任何main下城的输入均可通过pipedWriter写入,而printThread在另一端通过oioedReader将内容读出并打印。
public class Piped { public static void main(String[] args) throws IOException { PipedWriter out = new PipedWriter(); PipedReader in = new PipedReader(); //将输出流和输入流进行连接,否则在使用时会抛出 IOException out.connect(in); Thread printThread = new Thread(new Print(in), "printThread"); printThread.start(); int receive = 0; try { while ((receive = System.in.read())!= -1){ out.write(receive); } }finally { out.close(); } } static class Print implements Runnable{ private PipedReader in; public Print(PipedReader in) { this.in = in; } @Override public void run() { int receive = 0; try { while ((receive = in.read() )!= -1){ System.out.println((char) receive); } } catch (IOException e) { e.printStackTrace(); } } } }
运行该示例,输入一组字符串,可以看到被printThread进行了原样输出
Repeat my words
Repeat my words
对于piped类型的流,必须先要进行绑定,也就是调用connect()方法,如果没有将输入/输出流绑定起来,对于该流的访问将会抛出异常。
3.5 Thread.jion()
如果线程A 执行了thread.jion()语句,其含义是:当前线程A等待thread线程终止之后才从thread.jion() 返回。线程Thread除了提供jion()方法之外,还提供了jion(long millis)和jion(long millis,int nanos)两个具备超时时间里没有终止,那么将会从该超时方法中返回。
在代码示例中,创建了10个线程,编号0~9,每个线程调用前一个线程的join()方法,也就是线程0结束了,线程1才能从jion()方法中返回,而线程0需要等待main线程结束
public class Jion { public static void main(String[] args) throws InterruptedException { Thread previous = Thread.currentThread(); for (int i = 0; i < 10; i++) { //每个线程拥有前一个线程的引用,需要等待前一个线程终止,才能从等待中返回 Thread thread = new Thread(new Domino(previous), String.valueOf(i)); thread.start(); previous = thread; } TimeUnit.SECONDS.sleep(5); System.out.println(Thread.currentThread().getName()+"terminate"); } static class Domino implements Runnable{ private Thread thread; public Domino(Thread thread) { this.thread = thread; } @Override public void run() { try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"terminate."); } } }
mainterminate
0terminate.
1terminate.
2terminate.
3terminate.
4terminate.
5terminate.
6terminate.
7terminate.
8terminate.
9terminate.
从上述输出可以看到,每一个线程终止的前提是前驱线程的终止,每个线程等待前驱线程终止后,才从join()方法返回,这里涉及到了等待/通知机制(等待前驱线程结束,接收前驱线程结束通知)
public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
当线程终止时,会调用线程自身的notifyAll()方法,会通知所有等待在该线程对象上的线程。可以看到join()方法的逻辑结构与3.3节中描述的等待/通知经典范式一致,即加锁、循环和处理逻辑3个步骤。
3.6 ThreadLocal的使用
ThreadLocal,即线程变量,是一个以threadLocal对象为键、任意对象为值的存储结构。这个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值。
可以通过set(T)方法来设置一个值,在当前线程下再通过get()方法获取到原先设置的值。
在代码清单4-15所示的例子中,构建一个常用的profiler类。它具有begin()和end()两个方法,而end()方法返回begin()方法调用开始到end()方法调用湿的时间差,单位是毫秒。
public class Profiler { private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>(){ @Override protected Long initialValue(){ return System.currentTimeMillis(); } }; public static final void begin(){ TIME_THREADLOCAL.set(System.currentTimeMillis()); } public static final Long end(){ return System.currentTimeMillis()-TIME_THREADLOCAL.get(); } public static void main(String[] args) throws Exception { begin(); TimeUnit.SECONDS.sleep(1); System.out.println("cost: "+end()+"mills"); } }
cost: 1004mills
Profiler可以被复用在方法调用耗时统计的功能上,再方法的入口前执行begin()方法,在方法调用后end() 方法,好处就是两个方法的调用不用在一个方法或者类中,比如在AOP(面向方面编程)中,可以在方法调用前的切入点执行begin()方法,而在方法调用后的切入点执行end()方法,这样依旧可以获得方法的执行耗时。
4.线程应用实例
4.1 等待超时模式
开发人员经常会遇到这样的方法调用常用场景:调用一个方法时等待一段时间(一般来说是给定一个时间段),如果该方法能够在给定的时间段之内得到结果,那么将结果立刻返回,反之,超时返回默认结果。
前章介绍了等待、通知的经典范式,即加锁、条件循环和处理逻辑3个步骤,而这种范式无法做到超时等待。而超时等待的加入,只需要对经典范式做出非常小的改动,改动内容如下:
假设超时时间段是T,那么可以推断出在当前时间now+T之后就会超时。
定义变量
1.等待持续时间 REMAINING=T
2.超时时间:FUTURE=now +T
这是仅需要wait(REMAINING)即可,在wait(REMAINING)返回之后将会执行:REMAINING = FUTURE -now,如果REMAINING小于等于0,表示已经超时,直接退出,否则将继续执行wait(REMAINING)。
上述描述等待超时模式的伪代码如下:
public synchronized Object get(long mills) throws InterruptedException { long future = System.currentTimeMillis() + mills; long remaining = mills; //当超时大于0并且result 返回值不满足要求 while((result == null) && remaining > 0){ wait(remaining); remaining = future - System.currentTimeMillis(); } return result; }
可以看出,等待超时模式就是在等待/通知范式的基础上增加了超时控制。这使得该模式相比原有范式更具有灵活性,因为即使方法执行时间过长,也不会"永久阻塞调用者",而时会按照调用者的要求“按时”返回
4.2 一个简单的数据库连接池示例
我们使用等待超时模式来构造一个简答的数据库连接池,在示例模拟从连接池中获取、使用和释放连接的过程,而客户端获取连接的过程被设定为超时等待的模式,也就是在1000毫秒内如果无法获取到可用的连接,将返回给客户端一个null。设定连接池的大小为10个,然后通过调节客户端的线程数来模拟无法获取连接大场景。
首先看一下连接池的定义。他通过构造函数初始化连接的最大上限,通过一个双向队列来维护连接,调用fetchConnection(long)方法来指定在多少毫秒内超时获取连接,当连接使用完成后,需要调用releaseConnection(Connection)方法将连接放回线程池。
public class ConnectionPool { private LinkedList<Connection> pool = new LinkedList<>(); public ConnectionPool(int initialSize) { if (initialSize > 0){ for (int i = 0; i < initialSize; i++) { pool.addLast(ConnectionDriver.createConnection()); } } } public void releaseConnection(Connection connection){ if(connection != null){ synchronized (pool){ // 连接释放后需要进行通知,这样其他消费者能够感知到连接池中已经归还了一个连接 pool.addLast(connection); pool.notifyAll(); } } } public Connection fetchConnection(long mills) throws InterruptedException { synchronized (pool){ if(mills <= 0){ while (pool.isEmpty()){ pool.wait(); } return pool.removeFirst(); }else { long future = System.currentTimeMillis() + mills; long remaining = mills; while (pool.isEmpty() && remaining >0){ pool.wait(remaining); remaining = future - System.currentTimeMillis(); } Connection result = null; if(!pool.isEmpty()){ result = pool.removeFirst(); } return result; } } } }
由于java.sql.connection是一个接口,最终的实现是由数据库驱动提供方来实现的,考虑到只是个示例,我们通过动态代理构造了一个Connection,该connection,该connection的代理实现仅仅实在commit()方法调用时休眠100毫秒
public class ConnectionDriver { static class ConnectionHandler implements InvocationHandler{ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (method.getName().equals("commit")){ TimeUnit.SECONDS.sleep(100); } return null; } } public static final Connection createConnection(){ return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(),new Class<?>[]{Connection.class},new ConnectionHandler()); } }
下面通过一个示例来测试建议数据库连接池的工作情况,模拟客户端ConnextionRunner获取、使用、最后释放连接的过程,当它使用连接将会增加后去到连接的数量,反之,将会增加未获取到连接的数量
public class ConnectionPoolTest { static ConnectionPool connectionPool = new ConnectionPool(10); //保证所有connection 能够同时开始 static CountDownLatch start = new CountDownLatch(1); //main 线程将会等待所有connectionRunner 结束后才能继续执行 static CountDownLatch end; public static void main(String[] args) throws InterruptedException { int threadCount = 10; end = new CountDownLatch(threadCount); int count = 20; AtomicInteger got = new AtomicInteger(); AtomicInteger notGot = new AtomicInteger(); for (int i = 0; i < threadCount; i++) { Thread thread = new Thread(new ConnectionRunner(count, got, notGot), "connectionRunnerThread"); thread.start(); start.countDown(); end.await(); System.out.println("total invoke :"+(threadCount * count)); System.out.println("got connection:"+got); System.out.println("notgot connection :"+notGot); } } static class ConnectionRunner implements Runnable{ int count; AtomicInteger got; AtomicInteger notgot; public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notgot) { this.count = count; this.got = got; this.notgot = notgot; } @Override public void run() { try { start.await(); } catch (InterruptedException e) { e.printStackTrace(); } while (count > 0){ //从线程池中获取连接,如果1000ms 内无法获取到,将会返回null //分别统计连接获取的数量got和或获取到的数量 notgot try { Connection connection = connectionPool.fetchConnection(100); if(connection != null){ try { connection.createStatement(); connection.commit(); }finally { connectionPool.releaseConnection(connection); got.incrementAndGet(); } }else { notgot.incrementAndGet(); } } catch (Exception e) { e.printStackTrace(); }finally { count --; } } end.countDown(); } } }
上述示例中使用了CountDownLatch来确保ConnectionRunnerThread能够同时开始执行,并且在全部结束之后,才使main线程从等待状态中返回。当前设定的场景是10个线程同时运行获取连接池(10个连接)中连接,通过调整线程数量来观察未获取到连接的情况。线程数、总获取次数、获取到的数量、未获取到的数量以及未获取到的比率。
从表中的数据统计可以看出,在资源一定的情况下(连接池中的10个连接),随着客户端线程的逐步增加,客户端出现超时无法获取连接的比率不断升高。虽然客户端线程在这种超时获取的模式下回出现连接无法获取的情况,但是他能够保证客户端线程不会一直关在连接获取的操作上,而是“按时”返回,并告知客户端连接后去出现问题,是系统的一种自我保护机制。数据库连接池的设计也可以复用到其他的资源获取的场景,针对昂贵的资源(比如数据库连接池)的获取都应该加以超时限制。
4.3 线程技术及其示例
对于一个服务端的程序,经常面对的是客户传入的短小(执行时间短、工作内容较为单一)任务,需要服务端快速处理并返回结果。如果服务端每次接受到一个任务,穿建一个线程。然后进行执行,这在原型阶段是一个不错的选择,但是面对成千上万的任务递交进服务器时,如果还是采用一个任务一个线程的方式,那么将会创建数以万记得线程,这不是一个好的选择。因为这回使操作系统频繁的进行上下文切换,无故增加系统的负载,而线程的创建和消亡都是需要消费系统资源的,也无疑浪费了系统资源。
线程池技术能够很好的解决这个问题,他预先创建了若干数量的线程,并且不能由用户直接对线程的创建进行控制,在这个前提下重复使用固定或较为固定数目的线程来完成任务的执行。这样做的好处是,一方面消除了频繁创建和消亡线程的系统资源的开销,另一方面,面对过量任务的提交能够平缓的劣化。
下面先看一个简单的线程池接口定义
public interface ThreadPool<Job extends Runnable>{ //执行一个Job,这个需要实现Runner void execute(Job job); //关闭线程chi void shutdown(); //增加工作者线程 void addWorkers(int num); //减少工作者线程 void removeWorker(int num); //得到正在等待执行的任务数量 int getJobSize(); }
客户端可以通过execute(Job)方法将Job提交入线程池执行,而客户端自身不用等待Job的执行完成。除了execute(Job)方法以外,线程池接口提供了增大/减少工作者线程以及关闭线程池的方法。这里工作者线程代表者一个重复执行Job的线程,而每个有客户段提交的Job都将进入一个工作队列中等待工作者线程的处理。
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> { //线程池最大限制数 private static final int MAX_WORKER_NUMBERS = 10; //线程池默认的数量 private static final int DEFAULT_WORKER_NUMBERS = 5; //线程池最小的数量 private static final int MIN_WORKER_NUMBERS = 1; //这里是一个工作列表,将会向里面插入工作 private final LinkedList<Job> jobs = new LinkedList<>(); //工作者列表 private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>()); //工作者线程的数量 private int workerNum = DEFAULT_WORKER_NUMBERS; //线程编号生成 private AtomicInteger threadNum = new AtomicInteger(); public DefaultThreadPool() { initializWokers(DEFAULT_WORKER_NUMBERS); } public DefaultThreadPool(int num) { workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS :num; initializWokers(workerNum); } //初始化线程工作者 private void initializWokers(int num) { for (int i = 0; i < num; i++) { Worker worker = new Worker(); workers.add(worker); Thread thread = new Thread(worker,"ThreadPool-Worker-"+threadNum.incrementAndGet()); thread.start(); } } @Override public void execute(Job job) { if(job != null){ //添加一个工作,然后进行通知 synchronized (jobs){ jobs.addLast(job); jobs.notify(); } } } @Override public void shutdown() { for (Worker worker : workers) { worker.shutdown(); } } @Override public void addWorkers(int num) { synchronized (jobs){ //限制新增的worker 数量不能超过最大值 if(num + this.workerNum > MAX_WORKER_NUMBERS){ num = MAX_WORKER_NUMBERS - this.workerNum; } initializWokers(num); this.workerNum += num; } } @Override public void removeWorker(int num) { synchronized (jobs){ if(num >= this.workerNum){ throw new IllegalArgumentException("beyond worknum"); } //按照给定数量停止worker int count = 0; while (count < num){ Worker worker = workers.get(count); if(workers.remove(worker)){ worker.shutdown(); count++; } } this.workerNum -= count; } } @Override public int getJobSize() { return jobs.size(); } //工作者,负责消费任务 class Worker implements Runnable{ //是否工作 private volatile boolean running = true; @Override public void run() { while (running){ Job job = null; synchronized (jobs){ while (jobs.isEmpty()){ try { jobs.wait(); } catch (InterruptedException e) { //感知到外部对workerThread的中断操作,返回 Thread.currentThread().interrupt(); return; } } job = jobs.removeFirst(); } if(job != null){ try { job.run(); }catch (Exception e){ //忽略执行中的exception } } } } public void shutdown(){ running = false; } } }
从线程池的实现可以看到,当客户端调用execute(Job)方法时,会不断地向任务列表jobs中添加job,而每个工作者线程会不断从Jobs上取出一个Job进行执行,当jobs为空时,工作He线程进入等待状态。
添加一个Job后,对工作队列jobs调用了其notify() 方法,而不是notifyAll()方法,因为能够确定有工作者线程被唤醒,这时使用notify()方法将会比notifyAll()方法获得更小的开销(避免将等待对列中的线程全部移动到阻塞对列).
可以看到,线程池的本质就是使用了一个线程安全的工作队列连接工作者线程和客户端线程,客户端线程将任务放入工作队列后便返回,而工作者线程则不段的从工作队列上取出工作并执行。当工作队列为空时,所有的工作者线程均等待在工作队列上,当有客户段提交了一个任务之后会通知任意一个工作者线程,随着大量的任务被提交,更多的工作者线程会被唤醒。
4.4 一个基于线程池技术的简单web服务器
目前的浏览器都支持多线程访问,比如说在请求一个HTML页面的时候,页面中包含的图片资源、样式资源会被浏览器发起并发的获取,这样用户就不会遇到一直等到一个图片完全下载完成才能继续查看文字内容的尴尬情况
如果web服务器是单线程的,多线程的浏览器也没有用武之地,因为服务端还是一个请求一个请求的顺序处理。因此,大部分web服务器都是支持并发访问的。常用的java Web服务器,如tomcat、jetty,在其处理请求的过程中都使用到了线程池技术。
下面通过使用前一节中的线程池来构造一个简单的web服务器,这个web服务器用来处理HTTP请求,目前只能处理简单的文本和JPG图片内容。这个WEB服务器使用main线程不断地接受客户端Socket的连接,将连接以及请求提交给线程池处理,这样使得web服务器能够同时处理多个客户端请求,
public class SimpleHttpServer { //处理HttpRequest的线程池 static ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<HttpRequestHandler>(1); //simpleHttpServer的根路径 static String basePath; static ServerSocket serverSocket; static int port = 8080; public static void setPort(int port){ if (port > 0){ SimpleHttpServer.port = port; } } public static void setBasePath(String basePath){ if(basePath != null && new File(basePath).exists() && new File(basePath).isDirectory()){ SimpleHttpServer.basePath = basePath; } } //启动SimpleHttpserver public static void start() throws Exception { serverSocket = new ServerSocket(port); Socket socket = null; while ((socket = serverSocket.accept())!= null){ threadPool.execute(new HttpRequestHandler(socket)); } serverSocket.close(); } static class HttpRequestHandler implements Runnable{ private Socket socket; public HttpRequestHandler(Socket socket) { socket = socket; } @Override public void run() { String line = null; BufferedReader br = null; BufferedReader reader = null; PrintWriter out = null; InputStream in = null; try { reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); String header = reader.readLine(); //由相对路径计算出绝对路径 String filePath = basePath + header.split(" ")[1]; out = new PrintWriter(socket.getOutputStream()); //如果请求资源的后缀为jpg或者ico,则读取资源并输出 if(filePath.endsWith("jpg") || filePath.endsWith("ico")){ in = new FileInputStream(filePath); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int i = 0; while ((i = in.read()) != -1){ baos.write(i); } byte[] bytes = baos.toByteArray(); out.println("HTTP/1.1 200 OK"); out.println("Server : Molly"); out.println("content-Type: image/jpeg"); out.println("Content-Length: "+bytes.length); out.println(""); socket.getOutputStream().write(bytes,0,bytes.length); }else { br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath))); out = new PrintWriter(socket.getOutputStream()); out.println("HTTP/1.1 200 OK"); out.println("Server: Molly"); out.println("Content-Type: text/html; charset=UTF-8"); out.println(""); while ((line = br.readLine()) != null){ out.println(line); } } out.flush(); }catch (Exception e){ out.println("HTTP/1.1 500"); out.println(""); out.flush(); }finally { close(br,in,reader,out,socket); } } private void close(Closeable... closeables) { if (closeables != null){ for (Closeable closeable : closeables) { try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } } } } }
在图中,SimpleHttpeServer在建立了与客户端的连接之后,并不会处理客户端的请求,而是将其包装成HttpRequestHandler并交由线程池处理。在线程池中的Worker处理客户端请求的同时,SimpleHttpServer能够继续完成后续客户端连接的建立,不会阻塞后续客户端的请求。
5.本章小结
本章从介绍多线程技术带来的好处开始,讲述了如何启动和终止线程以及线程的状态,详细阐述了多线程之间进行通信的基本方式和等待/通知经典范式。在线程因果给你示例中,使用了等待超时、数据库连接池以及简单的线程池3个不同案例。