一、Java内存模型与线程
由于计算机的存储设配与处理器的运算速度有几个数量级的差距,所以加入高速缓存来作为内存与处理器之间的缓冲。
Java虚拟机规范定义一种Java内存模型来屏蔽各种硬件和操作系统的内存访问差异。
1.1 Java Memery Model即JMM
① JMM规定所有变量规定了所有的变量都存在主内存中。
② 每条线程还有自己的工作内存,线程的工作内存中保存了被该线程使用到的变量的主内存副本拷贝,线程对变量的所有操作都必须在工作内存中进行,不能直接读写主内存的变量。
③ 不同线程无法访问对方的工作内存,线程间变量值的传递需要通过主内存来实现。
1.2 对volatile 型变量的特殊规则
关键字volatile是Java虚拟机提供的最轻量级的同步机制。
① 可见性
当一条线程修改了这个变量的值,新值对于其他线程来说可以立刻得知。
使用场景:主线程中,某个用volatile修饰的变量-如是否配置信息加载完成,定义成boolean,初始值是false,线程A负责加载配置信息,加载完后,它会把false改成true,而线程B采用while循环,一但加载完配置信息的标记从false,改成true,线程B就立马可以感知,线程B就可以进行相应的初始化工作。
② 非原子性
在多个线程同时拷贝主内存的变量到自己的工作内存中,对副本进行修改,之后各自向主内存写入操作的时候,出现写覆盖,写丢失。
③ 禁止指令重排
(Double Check Lock)双端检锁-单例模式
public class Singleton { /** * volatile:禁止指令重排 * * new Singleton(): * 指令①:分配内存地址 * 指令②:初始化 * 指令③ :指针指向初始化后的变量 */ private volatile static Singleton instance; private Singleton() { System.out.println("no args construtor..."); } /** * double check lock */ public static Singleton getInstance() { if (instance == null) { synchronized (Singleton.class) { if (instance == null) { instance = new Singleton(); } } } return instance; } }
上述代码中,如果instance没有用volatile修饰,那么在高并发的情况下,有可能出现指令③和指令②重排,对于其他线程来说,由于被synchronized锁住代码块,但是当前拿到锁的线程已经有了指针指向变量(实际上还没有初始化),其他线程直接用这个引用操作的时候,就会发生异常。因为这个指针的引用指向的对象还没有完成初始化。
1.3 Java Memery Model 规范
① 可见性:当一个线程修改了共享变量的值,其他线程能够立刻得知这个变量的修改。
volatile的特殊规则保证了新值能立即同步到主内存,以及每次使用前立即从主内存刷新。
② 原子性:Java给用户开发了synchronized关键字(底层实现monitorenter 与monitorexit)
③ 有序性
volatile与synchroniezed(一个变量在同一时刻只允许一条线程来对其进行lock操作)
1.4 状态转化
一个线程在任意时间点,只能有且只有其中一种
① 先建New:创建后尚未启动的线程处于这种状态
② 运行Runable:可能正在执行,或等待CPU为他分配执行时间。
③ 无限期等待Waiting:处于这种状态的线程不会被分配CPU执行时间,它需要被其他线程唤醒。
④ 限期等待Timed Waiting:一段时间后,自动唤醒。
⑤ 阻塞Blocked:线程等待进入同步代码块时,进入这种状态。
⑥ 结束Terminated:已终止的线程状态。
1.5 CAS(比较并交换)-常用的原子基本类型如AtomicInteger
CAS(Compare-And-Swap),它是一条CPU并发原语,原语的执行必须是连续的,在执行过程中不允许被中断,不会造成所谓的数据不一致问题。
源码:
/** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. */ public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }
/** * Atomically increments by one the current value. */ public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); }
unsafe类:
是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过native方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。Unsafe类存在于sun.misc包中,其内部可以像C的指针一样直接操作内存,Java中CAS操作的执行依赖于Unsafe类的方法。
/** * Unsafe.class:getAndAddInt * 如果工作内存中预期的值与主内存中一致,那么就把更新后的值(var5 +var4)写入主内存中,否则继续循环(线程自旋) * compareAndSwapInt 返回 boolean */ public final int getAndAddInt(Object var1,long var2,int var4) { int var5; do { var5 = this.getIntVolatile(var1,var2); }while (!this.compareAndSwapInt(var1,var2,var5,var5 + var4)) return var5; }
CAS缺点:
① ABA问题--解决(时间戳原则引用)--AtomicStampReference<>()。
② 循环开销时间很大
③ 只能保证一个共享变量的原子操作
1.6 锁
① 公平锁和非公平锁
公平锁:指多个线程按照申请锁的顺序来获取锁,并发环境,每个线程在获取锁时先查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个,就占有,否则就会加入到等待队列中,以后按照FIFO从规则中获取自己。
非公平锁:获取锁的顺序不是按照申请锁的顺序,可能后申请的线程比先申请的线程优先获得锁,在高并发的情况下,可能造成优先级反转或者饥饿现象。
② 可重入锁(又名递归锁):线程可以进入任何一个它已经拥有的锁所同步的代码块。
③ 独占锁/共享锁
④ 自旋锁
1.7 阻塞队列 BlockQueue
传统1:
// 资源类 class ShareData { private int number = 0;
public void increment() throws InterruptedException { synchronized (this){ // 判断 while (number !=0){ this.wait(); } // 干活 number++; System.out.println(Thread.currentThread().getName() + " " +number); // 唤醒 this.notifyAll(); } } public void decrement() throws InterruptedException { synchronized (this){ // 判断 while (number ==0){ this.wait(); } // 干活 number--; System.out.println(Thread.currentThread().getName() + " " +number); // 唤醒 this.notifyAll(); } } } /** * 场景:一个初始值为零的变量,两个线程对其交替操作,一个加1,一个减1,连续5轮. * * 1 线程 操作 资源类 * 2 判断 干活 通知 * 3 防止虚假唤醒通知 */ public class Prod_Consumer_BlockQueueDemo { public static void main(String[] args) { } }
传统2:
// 资源类 class ShareData { private int number = 0; private Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void increment() throws InterruptedException { lock.lock(); try { // 判断 while (number != 0) { //等待,不能生产 condition.await(); } // 干活 number++; System.out.println(Thread.currentThread().getName() + " " + number); // 通知 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement() throws InterruptedException { lock.lock(); try { // 判断 while (number == 0) { //等待,不能消费 condition.await(); } // 干活 number--; System.out.println(Thread.currentThread().getName() + " " + number); // 通知 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } /** * 场景:一个初始值为零的变量,两个线程对其交替操作,一个加1,一个减1,连续5轮. * * 1 线程 操作 资源类 * 2 判断 干活 通知 * 3 防止虚假唤醒通知 */ public class Prod_Consumer_BlockQueueDemo { public static void main(String[] args) { ShareData shareData = new ShareData(); new Thread(() -> { for (int i = 0; i < 5; i++) { try { shareData.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "AA").start(); new Thread(() -> { for (int i = 0; i < 5; i++) { try { shareData.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "BB").start(); } }
1.8 synchronized 与Lock区别
①. 原始构成
synchronized是关键字属于JVM层面:monitorenter、monitorexit(底层是通过monitor对象来完成,其实wait/notify等方法也是依赖于monitor对象只有在同步块或者方法中才能调用)
Lock是具体类(java.util.concurrent.locks.Lock)是api层面的锁
② 使用方法
synchronized不需要用户去手动释放锁,当synchronized代码执行完后系统会自动让线程释放对锁的占用。(即使执行发生异常也会释放锁,毕竟有两个monitorexit)
ReetrantLock则需要用户手动释放锁,如果没有主动释放,就可能导致死锁的存在。lock.lock try{...}....finally {lock.unlock}
③ 等待是否中断
synchronized不可中断,除非抛出异常或者正常执行完毕。
ReentrancLock可中断。a: 设置超时方法tryLock(long timeout,TimeUnit unit)。b: lockInterruptibly()放入代码块中,调用interrupt方法可以中断。
④ 加锁是否公平
synchronized非公平锁。
ReentrancLock两者都可以,默认非公平锁,构造方法传入boolean值,true为公平锁,false为非公平锁。
⑤ 锁绑定多个条件Condiion
synchronized没有
ReentrantLock用来实现分组唤醒需要的线程,可以精确唤醒,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程。
class shareResource{ private int number =1; private Lock lock = new ReentrantLock(); private Condition c1 = lock.newCondition(); private Condition c2 = lock.newCondition(); private Condition c3 = lock.newCondition(); public void print1(){ // 同步-判断-执行-通知 lock.lock(); try { while (number !=1){ c1.await(); } for (int i = 1; i <=1 ; i++) { System.out.println(Thread.currentThread().getName() + " " + i) ; } number =2; c2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void print2(){ // 同步-判断-执行-通知 lock.lock(); try { while (number !=2){ c2.await(); } for (int i = 1; i <=2 ; i++) { System.out.println(Thread.currentThread().getName() + " " + i) ; } number =3; c3.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void print3(){ // 同步-判断-执行-通知 lock.lock(); try { while (number !=3){ c3.await(); } for (int i = 1; i <=3 ; i++) { System.out.println(Thread.currentThread().getName() + " " + i) ; } number =1; c1.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } public class LockConditions { public static void main(String[] args) { shareResource shareResource = new shareResource(); new Thread(()->{ // 来三轮 for (int i = 0; i < 3; i++) { shareResource.print1(); } },"A").start(); new Thread(()->{ // 来三轮 for (int i = 0; i < 3; i++) { shareResource.print2(); } },"B").start(); new Thread(()->{ // 来三轮 for (int i = 0; i < 3; i++) { shareResource.print3(); } },"C").start(); } }
1.9 Callable接口
/** * * <p>The {@code Callable} interface is similar to {@link * java.lang.Runnable}, in that both are designed for classes whose * instances are potentially executed by another thread. A * {@code Runnable}, however, does not return a result and cannot * throw a checked exception. */ @FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. */ V call() throws Exception; }
class MyThread implements Callable<Integer>{ @Override public Integer call() throws Exception { System.out.println("come in Callbale..."); try { System.out.println(Thread.currentThread().getName()+"*****"); TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return 1024; } } public class CallableDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { // public FutureTask(Callable<V> callable) FutureTask<Integer> futureTask = new FutureTask(new MyThread()); new Thread(futureTask,"AA").start(); // 多个线程抢futureTask只抢一次 new Thread(futureTask,"BB").start(); System.out.println(Thread.currentThread().getName() + "****"); int result2 = 100; // 任务执行完需要时间 while (!futureTask.isDone()){} // futureTask.get() 尽量放在最后 int result = result2 + futureTask.get(); System.out.println("******result: " + futureTask.get()); } }
2.0 CountDownLatch/CyclicBarrier/Semaphore
CountDownLatch:让一些线程阻塞到另一些线程完成一系列操作后才被唤醒。
CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,调用线程会被阻塞。其他线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),当计数器的值变为零时,因调用await方法被阻塞的线程会被唤醒,开始继续执行。
public class CountDownLatchDemo { private static final Integer COUNT = 6; public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(COUNT); for (int i = 1; i <= COUNT; i++) { new Thread(()-> { System.out.println(Thread.currentThread().getName() + " worker getOn" ); countDownLatch.countDown(); },WorksEnum.getWorkName(Integer.valueOf(i))).start();; } countDownLatch.await(); System.out.println("Driver drive"); } }
import lombok.Getter; public enum WorksEnum { ONE(1, "张三"), TWO(2, "李四"), THREE(3, "王五"), FOUR(4, "赵六"), FIVE(5, "孙七"), SIX(6, "刘八"); @Getter private Integer workId; @Getter private String workName; private WorksEnum(Integer workId, String workName) { this.workId = workId; this.workName = workName; } public static WorksEnum getWorksEnum(Integer workId){ WorksEnum[] works = WorksEnum.values(); if(workId ==null){ return null; } for (WorksEnum worksEnum : works) { if(workId == worksEnum.workId){ return worksEnum; } } return null; } public static String getWorkName(Integer workId){ WorksEnum[] works = WorksEnum.values(); if(workId ==null){ return null; } for (WorksEnum worksEnum : works) { if(workId == worksEnum.workId){ return worksEnum.workName; } } return null; } }
CycliBarrier:可以循环使用的屏障。让一组线程到达一个屏障(或同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier的await()方法。
public class CyclicBarrierDemo { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(6,()->{System.out.println("Boss 来开会");}); for (int i = 1; i <= 6; i++) { final int workNo = i; new Thread(() -> { System.out.println("员工" + workNo + "来会议间...等着"); try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } },String.valueOf(i)).start();; } } }
Semaphore:“信号量”主要用于两个目的,一是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
public class SemaphoreDemo { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); for (int i = 1; i <= 6; i++) { new Thread(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " :抢到车位" ); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + " :停了3秒后释放"); semaphore.release(); } catch (Exception e1) { e1.printStackTrace(); } }, String.valueOf(i)).start();; } } }
2.1 线程池
线程池主要工作:控制运行的线程数量,处理过程中将任务放入队列,然后再线程创建后启动这些任务,如果线程数量超过了最大核心的数量,进入线程队列等候,等其他线程执行完毕,再从队列中取出任务来执行。
特点:线程复用,控制最大并发数,管理线程。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立刻被线程池中已有的线程执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配,调优和监控。
public class ExecutorsDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(5); // 一池5个处理线程 ExecutorService executorService1 = Executors.newSingleThreadExecutor(); // 一池1个处理线程 ExecutorService executorService2 = Executors.newCachedThreadPool(); // 一池N个处理线程 try { for (int i = 0; i < 30; i++) { executorService2.execute(() ->{ System.out.println(Thread.currentThread().getName() +" 办理业务"); }); } } catch (Exception e) { e.printStackTrace(); } finally { executorService.shutdown(); } } }
/** * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
① 再创建了线程池后,等待提交过来的任务请求。
② 当调用execute()方法添加一个请求任务时,线程池会做如下判断:
* 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
* 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
* 如果这时候队列满了且正在运行的线程数量还小于maxinumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
* 如果队列满了且正在运行的线程数量大于或等于maxinumPoolSize,那么线程池会启动饱和拒绝策略来执行。
③ 当一个线程完成任务时,它会从队列中取下一个任务来执行。
④ 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程池会判断;
* 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
* 所有线程池的所有任务完成后它最终会收缩到corePoolSize的大小。
http://www.imooc.com/video/5176
一、如何扩展Java并发知识
Java Memory Mode
JMM描述了Java线程如何通过内存进行交互
happens-before原则
synchronized,volatile&final
Locks &Conditon(Java1.5引入,加锁-同步通信)
Java锁机制和等待条件的高层实现
java.util.concurrent.locks
线程安全性
原子性与可见性
java.util.concurrent.atomic
synchronized&volatile
DeadLocks
多线程编程常用的交互模型
Producer-Consumer模型
Read-Write Lock模型
Future模型
Worker Thread模型
Java 1.5 中并发编程工具
java.util.concurrent
如:线程池ExecutorService;Callable & Future;BlockingQueue
推荐书籍: