zoukankan      html  css  js  c++  java
  • JUC并发学习笔记

    JUC并发

    什么是JUC

    JUC就是java.util .concurrent工具包的简称。这是一个处理线程的工具包,JDK 1.5开始出现的。

    业务:普通的线程代码,Thread。

    Runable 没有返回值,效率比 Callable 低。

    进程和线程

    进程:操作系统中运行的程序就是线程,一个进程可以包含多个线程,是系统资源分配的的单位。Java默认线程:main线程,GC线程。

    线程:系统运算调度的单位,是进程中实际的运作单位。

    Java可以开启线程吗?

    Thread部分源码:

    public synchronized void start() {
       
        if (threadStatus != 0)
            throw new IllegalThreadStateException();
    
        group.add(this);
    
        boolean started = false;
        try {
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }
    
    private native void start0();
    

    Java是不能开启线程的,start调用了本地方法,底层是C++。

    并发,并行

    并发:多个线程操作一个资源,不是同时执行的(单核,模拟多个线程,快速交替)

    并行:多个线程同时执行(多核下)

    package com.zr.demo1;
    
    public class Test01 {
        public static void main(String[] args) {
            //获取cpu核心数
            System.out.println(Runtime.getRuntime().availableProcessors());
        }
    }
    

    并发编程的本质:充分利用cpu的资源。

    线程的状态

    线程的六个状态:可从源码中得到

    Thread部分源码

    public enum State {
      
        //创建
        NEW,
    
       //运行
        RUNNABLE,
    
       //阻塞
        BLOCKED,
    
        //等待
        WAITING,
    
        //超时等待
        TIMED_WAITING,
    
        //终止
        TERMINATED;
    }
    

    wait/sleep的区别

    wait:Object类下的。会释放锁。只能在同步代码块中使用。不需要捕获异常。

    sleep:Thread类下的。不会释放锁。可以在任何地方使用。必须要捕获异常。

    Lock(锁)

    lock

    package com.zr.demo1;
    
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class SaleTicketDemo02 {
        public static void main(String[] args) {
    
            Ticket2 ticket2 = new Ticket2();
    
            new Thread(()->{ for (int i = 0; i < 40; i++) ticket2.sale(); },"A").start();
    
            new Thread(()->{ for (int i = 0; i < 40; i++) ticket2.sale(); },"B").start();
    
            new Thread(()->{ for (int i = 0; i < 40; i++) ticket2.sale(); },"C").start();
        }
    }
    //资源类  OOP
    class Ticket2{
        //属性  方法
        private int number = 30;
    
        Lock lock = new ReentrantLock();
    
        public void sale(){
    
            lock.lock();  //加锁
    
            try {  //业务代码
                if (number>0){
                    System.out.println(Thread.currentThread().getName()+"卖出了第"+(number--)+"张票,剩余"+number+"张");
                }
            } catch (Exception e) {
                lock.unlock();  //解锁
            }
        }
    }
    

    Synchronized / Lock区别

    1. Synchronized 是Java关键字,Lock是Java一个类。
    2. Synchronized 无法获取锁的状态,Lock可以判断释放获取到了锁。
    3. Synchronized 会自动释放锁,Lock必须手动释放锁,如果不释放锁,死锁。
    4. Synchronized 阻塞会死等,Lock就不一定。
    5. Synchronized 可重入锁,不可以中断,非公平锁。Lock 可重入锁,可以判断锁,默认非公平锁(可以自己设置)。
    6. Synchronized 适合少量的代码同步问题。Lock 适合大量的代码同步代码。

    生产者消费者问题

    package com.zr.pc;
    //生产者,消费者
    
    import javax.activation.DataHandler;
    
    /**
     * 线程之间的通信问题,生产者消费者问题,等待唤醒,通知唤醒
     * 线程A,B操作同一个变量
     */
    public class Test1 {
        public static void main(String[] args) {
            Data data = new Data();
            new Thread(()->{
                for (int i = 0; i < 10; i++) {
                    try {
                        data.increment();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"A").start();
            
            new Thread(()->{
                for (int i = 0; i < 10; i++) {
                    try {
                        data.decrement();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"B").start();
        }
    }
    
    class Data{
        private int number = 0;
    
        //+1
        public synchronized  void increment() throws InterruptedException {
            if (number!=0){
                this.wait();  //等待
            }
            number++;
            System.out.println(Thread.currentThread().getName()+"===>"+number);
            this.notifyAll();  //通知
        }
    
        //-1
        public synchronized  void decrement() throws InterruptedException {
            if (number==0){
                this.wait();  //等待
            }
            number--;
            System.out.println(Thread.currentThread().getName()+"===>"+number);
            this.notifyAll();  //通知
        }
    }
    

    如果再增加 两个线程C,D(一个增加,一个减少)。此时测试会发现不安全。 会同时唤醒两个等待的线程,同时加1或者减1。

    虚假唤醒,lang包下的Object类下的wait方法(API)

    应改为: if改为while防止虚假唤醒

    package com.zr.pc;
    //生产者,消费者
    
    import javax.activation.DataHandler;
    
    /**
     * 线程之间的通信问题,生产者消费者问题,等待唤醒,通知唤醒
     * 线程A,B操作同一个变量
     */
    public class Test1 {
        public static void main(String[] args) {
            Data data = new Data();
            new Thread(()->{
                for (int i = 0; i < 10; i++) {
                    try {
                        data.increment();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"A").start();
    
            new Thread(()->{
                for (int i = 0; i < 10; i++) {
                    try {
                        data.decrement();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"B").start();
            
            new Thread(()->{
                for (int i = 0; i < 10; i++) {
                    try {
                        data.increment();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"C").start();
            
            new Thread(()->{
                for (int i = 0; i < 10; i++) {
                    try {
                        data.decrement();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"D").start();
        }
    }
    
    class Data{
        private int number = 0;
    
        //+1
        public synchronized  void increment() throws InterruptedException {
            while (number!=0){
                this.wait();  //等待
            }
            number++;
            System.out.println(Thread.currentThread().getName()+"===>"+number);
            this.notifyAll();  //通知
        }
    
        //-1
        public synchronized  void decrement() throws InterruptedException {
            while (number==0){
                this.wait();  //等待
            }
            number--;
            System.out.println(Thread.currentThread().getName()+"===>"+number);
            this.notifyAll();  //通知
        }
    }
    

    JUC版本的生产者消费者问题

    代码实现:

    package com.zr.pc;
    //生产者,消费者
    
    import javax.activation.DataHandler;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * 线程之间的通信问题,生产者消费者问题,等待唤醒,通知唤醒
     * 线程A,B操作同一个变量
     */
    public class Test2 {
        public static void main(String[] args) {
            Data2 data = new Data2();
            new Thread(()->{
                for (int i = 0; i < 10; i++) {
                    try {
                        data.increment();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"A").start();
    
            new Thread(()->{
                for (int i = 0; i < 10; i++) {
                    try {
                        data.decrement();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"B").start();
    
            new Thread(()->{
                for (int i = 0; i < 10; i++) {
                    try {
                        data.increment();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"C").start();
    
            new Thread(()->{
                for (int i = 0; i < 10; i++) {
                    try {
                        data.decrement();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"D").start();
        }
    }
    
    class Data2{
        private int number = 0;
    
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        //condition.await();  //等待
        //condition.signalAll();  //唤醒全部
    
        //+1
        public void increment() throws InterruptedException {
    
            lock.lock();
            try {
                while (number!=0){
                    condition.await();  //等待
                }
                number++;
                System.out.println(Thread.currentThread().getName()+"===>"+number);
                condition.signalAll();  //通知
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        //-1
        public void decrement() throws InterruptedException {
            lock.lock();
            try {
                while (number==0){
                    condition.await();  //等待
                }
                number--;
                System.out.println(Thread.currentThread().getName()+"===>"+number);
                condition.signalAll();  //通知
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    

    结果是随机的状态:

    此时Condition的作用就体现出来了,可以精准的通知和唤醒线程。

    测试:

    package com.zr.pc;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * @author Zhour  813794474@qq.com
     * A调B,B调C,C调A
     */
    public class Test3 {
        public static void main(String[] args) {
            Data3 data = new Data3();
            new Thread(()->{
                for (int i = 0; i < 10; i++) {
                    data.printA();
                }
            },"A").start();
            new Thread(()->{
                for (int i = 0; i < 10; i++) {
                    data.printB();
                }
            },"B").start();
            new Thread(()->{
                for (int i = 0; i < 10; i++) {
                   data.printC();
                }
            },"C").start();
        }
    }
    
    class Data3{  //资源类 lock
    
        private Lock lock = new ReentrantLock();
        Condition condition1 = lock.newCondition();
        Condition condition2 = lock.newCondition();
        Condition condition3 = lock.newCondition();
        int num = 1;
    
        public void printA(){
            lock.lock();
            try {
                //业务
                while (num!=1){
                    condition1.await();
                }
                System.out.println(Thread.currentThread().getName() + "==>AAAAA");
                num = 2;
                condition2.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        public void printB(){
            lock.lock();
            try {
                //业务
                while (num!=2){
                    condition2.await();
                }
                System.out.println(Thread.currentThread().getName()+"==>BBBBB");
                num = 3;
                condition3.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        public void printC(){
            lock.lock();
            try {
                //业务
                while (num!=3){
                    condition3.await();
                }
                System.out.println(Thread.currentThread().getName()+"==>CCCCC");
                num = 1;
                condition1.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    
    

    测试结果:

    八锁现象

    如何判断锁的是谁。

    package com.zr.lock8;
    
    import java.util.concurrent.TimeUnit;
    
    public class Test1 {
        public static void main(String[] args) throws InterruptedException {
            Phone phone = new Phone();
            new Thread(()->{
                phone.sendMassage();
            },"A").start();
    
            TimeUnit.SECONDS.sleep(1);
    
            new Thread(()->{
                phone.call();
            },"B").start();
        }
    }
    
    class Phone{
        //synchronized 锁的对象是方法的调用者
        //两个方法是一把锁,谁先拿到谁先执行
        public synchronized void sendMassage(){
            System.out.println("发短信");
        }
        public synchronized void call(){
            System.out.println("打电话");
        }
    }
    
    package com.zr.lock8;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     *没有锁的方法不受影响
     */
    public class Test2 {
        public static void main(String[] args) throws InterruptedException {
            //现在有两把锁
            Phone2 phone = new Phone2();
            Phone2 phone1 = new Phone2();
            new Thread(()->{
                phone.sendMassage();
            },"A").start();
    
            TimeUnit.SECONDS.sleep(1);
    
            new Thread(()->{
                phone1.call();
            },"B").start();
    
    
            new Thread(()->{
                phone.hello();
            },"B").start();
        }
    }
    
    class Phone2{
        //synchronized 锁的对象是方法的调用者
        //两个方法是一把锁,谁先拿到谁先执行
        public synchronized void sendMassage(){
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("发短信");
        }
        public synchronized void call(){
            System.out.println("打电话");
        }
        //这里没有锁,不受锁的影响
        public void hello(){
            System.out.println("hello");
        }
    }
    
    package com.zr.lock8;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 静态的同步方法,锁的是phone.class!!!
     */
    
    public class Test3 {
        public static void main(String[] args) throws InterruptedException {
    
            //两个对象的class类模板只有一个,两个锁都是锁的class
            Phone3 phone = new Phone3();
            Phone3 phone1 = new Phone3();
    
            new Thread(()->{
                phone.sendMassage();
            },"A").start();
    
            TimeUnit.SECONDS.sleep(1);
    
            new Thread(()->{
                phone1.call();
            },"B").start();
    
        }
    }
    
    class Phone3{
        //synchronized 锁的对象是方法的调用者
        //两个方法是一把锁,谁先拿到谁先执行
        //static 类一加载就有了 锁的是class
        public static synchronized void sendMassage(){
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("发短信");
        }
        public static synchronized void call(){
            System.out.println("打电话");
        }
    }
    
    package com.zr.lock8;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 一个静态同步方法,一个普通同步方法
     */
    public class Test4 {
        public static void main(String[] args) throws InterruptedException {
    
            Phone4 phone = new Phone4();
    
            new Thread(()->{
                phone.sendMassage();
            },"A").start();
    
            TimeUnit.SECONDS.sleep(1);
    
            new Thread(()->{
                phone.call();
            },"B").start();
    
        }
    }
    
    class Phone4{
        //synchronized 锁的对象是方法的调用者
        //两个方法是一把锁,谁先拿到谁先执行
        //static 类一加载就有了 锁的是class
        //静态同步方法  锁class类模板
        public static synchronized void sendMassage(){
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("发短信");
        }
        //普通同步方法  锁的是调用者
        public synchronized void call(){
            System.out.println("打电话");
        }
    }
    

    小结

    new this 具体的一个实例

    static class 唯一的一个模板

    不安全集合

    ArrayList多线程下 add 方法不安全。

    CopyOnWriteArrayList(安全)

    package com.zr.unsafe;
    
    import java.util.*;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    //java.util.ConcurrentModificationException  使用ArrayList并发修改异常
    public class ListTest {
        public static void main(String[] args) {
            //并发下ArrayList是不安全的
            /**
             * 解决方案
             * 1.List<String> arrayList = new Vector(); 底层synchronized
             * 2.List<String> arrayList = Collections.synchronizedList(new ArrayList<>());
             * 3.List<String> arrayList = new CopyOnWriteArrayList();
             */
            //CopyOnWrite 写入并复制  底层 lock
            
            List<String> arrayList = new CopyOnWriteArrayList();
            
            for (int i = 1; i <= 10; i++) {
                new Thread(()->{
                    arrayList.add(UUID.randomUUID().toString().substring(0,5));
                    System.out.println(arrayList);
                },String.valueOf(i)).start();
            }
        }
    }
    

    HashSet不安全。

    CopyOnWriteArraySet(安全)

    package com.zr.unsafe;
    
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.UUID;
    import java.util.concurrent.CopyOnWriteArraySet;
    
    public class SetTest {
        public static void main(String[] args) {
            // Set<String> set = new HashSet();  不安全
            // 解决
            //1.Set set = Collections.synchronizedSet(new HashSet());
            //2. Set set = new CopyOnWriteArraySet();
    
            Set set = new CopyOnWriteArraySet();
            for (int i = 0; i < 20; i++) {
                new Thread(()->{
                    set.add(UUID.randomUUID().toString().substring(0,5));
                    System.out.println(Thread.currentThread().getName()+set);
                },String.valueOf(i)).start();
            }
        }
    }
    

    HashSet底层,就是HashMap

    public HashSet() {
        map = new HashMap<>();
    }
    //add  key是无法重复的
     public boolean add(E e) {
            return map.put(e, PRESENT)==null;
        }
      private static final Object PRESENT = new Object();  //常量
    

    HashMap不安全。

    回顾map

    实现:

    package com.zr.unsafe;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;
    
    public class MapTest {
        public static void main(String[] args) {
            //Map<String,Object> map = new HashMap();  不安全
            //解决线程不安全
            Map<String,Object> map = new ConcurrentHashMap();
    
            for (int i = 0; i < 20; i++) {
                new Thread(()->{
                    map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
                    System.out.println(map);
                },String.valueOf(i)).start();
            }
        }
    }
    

    Callable

    1. 可以有返回值
    2. 可以抛出异常
    3. 方法不同,call()

    Runable

    FutureTask

    package com.zr.callable;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;
    
    public class CallableTest {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //new Thread().start(); 怎么启动callable
    
            myThread myThread = new myThread();
            FutureTask futureTask = new FutureTask(myThread);
            new Thread(futureTask,"A").start();
            new Thread(futureTask,"B").start();  //结果会被缓存
    
            //获取callable的返回结果
            String o = (String) futureTask.get();  //这个get方法可能会产生阻塞 放到最后一行或者使用异步通信
            System.out.println(o);
        }
    }
    class myThread implements Callable<String> {
    
        @Override
        public String call(){
            System.out.println("call");
            //如果是耗时的操作
            return "123";
        }
    }
    

    注意:

    1. 有缓存
    2. 结果可能需要等待,会阻塞

    常用的辅助类(必会)

    CountDownLatch

    测试代码

    package com.zr.add;
    
    import java.util.concurrent.CountDownLatch;
    
    //计数器
    public class CountDownLatchDemo {
        public static void main(String[] args) throws InterruptedException {
            //总数是 6  有必须要执行完的任务时使用
            CountDownLatch countDownLatch = new CountDownLatch(5);
            for (int i = 1; i <=5; i++) {
                new Thread(()->{
                    System.out.println(Thread.currentThread().getName()+"go out");
                    countDownLatch.countDown();  //计数器减一
                },String.valueOf(i)).start();
            }
            countDownLatch.await();  //等待计数器归0,再向下执行
            System.out.println("clsoe");
        }
    }
    

    原理:

    countDownLatch.countDown():数量减一

    countDownLatch.await():直到计数器归0后,才向下执行

    CyclicBarrier

    测试代码:(可以想象成加法计数器)

    package com.zr.add;
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    public class CyclicBarrierDemo {
        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
                System.out.println("集齐七个了");
            });
            for (int i = 1; i <= 7; i++) {
                final int temp = i;
                new Thread(()->{
                    System.out.println(Thread.currentThread().getName()+"第"+temp+"个");
                    try {
                        cyclicBarrier.await();  //等待
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }
    

    Semaphore

    信号量

    测试代码

    package com.zr.add;
    
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    
    public class SemaphoreDemo {
        public static void main(String[] args) {
            //停车位 限流 
            Semaphore semaphore = new Semaphore(3);
            for (int i = 1; i <= 6; i++) {
                new Thread(()->{
                    try {
                        semaphore.acquire();  //获取
                        System.out.println(Thread.currentThread().getName()+"抢到车位");
                        TimeUnit.SECONDS.sleep(2);
                        System.out.println(Thread.currentThread().getName()+"离开车位");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        semaphore.release();  //释放
                    }
                },String.valueOf(i)).start();
            }
        }
    }
    

    原理:

    semaphore.acquire(); 获取,假如满了,就等待释放后再执行。

    semaphore.release(); 释放,将得到的信号量释放。

    共享资源互斥的时候使用,并发限流,控制最大线程数。

    读写锁

    测试代码

    package com.zr.rw;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    /**
     * ReadWriteLock
     * 独占锁(写锁)一次只能被一个线程占有
     * 共享锁(读锁)多个线程可以同时占有
     */
    public class ReadWriteLockDemo {
        public static void main(String[] args) {
            MyCacheLock myCache = new MyCacheLock();
            //写入
            for (int i = 0; i < 5; i++) {
                final int temp = i;
                new Thread(()->{
                    myCache.put(temp+"",temp+"");
                },String.valueOf(i)).start();
            }
            //读取
            for (int i = 0; i < 5; i++) {
                final int temp = i;
                new Thread(()->{
                    myCache.get(temp+"");
                },String.valueOf(i)).start();
            }
        }
    }
    //加锁
    class MyCacheLock{
        private volatile Map<String,Object> map = new HashMap<>();
        //读写锁 更加细粒度的控制
        private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        //存
        public void put(String key,Object value){
            readWriteLock.writeLock().lock();
            try {
                System.out.println(Thread.currentThread().getName()+"写入"+key);
                map.put(key,value);
                System.out.println(Thread.currentThread().getName()+"写入ok");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readWriteLock.writeLock().unlock();
            }
        }
        //取
        public void get(String key){
            readWriteLock.readLock().lock();
            try {
                System.out.println(Thread.currentThread().getName()+"读取"+key);
                Object o = map.get(key);
                System.out.println(Thread.currentThread().getName()+"读取的key是"+o);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readWriteLock.readLock().unlock();
            }
        }
    }
    /**
     * 自定义缓存
     */
    class MyCache{
        private volatile Map<String,Object> map = new HashMap<>();
        //存
        public void put(String key,Object value){
            System.out.println(Thread.currentThread().getName()+"写入"+key);
            map.put(key,value);
            System.out.println(Thread.currentThread().getName()+"写入ok");
    
        }
        //取
        public void get(String key){
            System.out.println(Thread.currentThread().getName()+"读取"+key);
            Object o = map.get(key);
            System.out.println(Thread.currentThread().getName()+"读取的key是"+o);
        }
    }
    

    阻塞队列

    BlockingQueue

    阻塞队列:多线程并发处理,线程池。

    四组API

    方式 抛出异常 不抛出异常,有返回值 阻塞等待 超时等待
    添加 add offer put offer
    移除 remove poll take poll
    判断队列首 element peek

    测试一:

    package com.zr.queue;
    
    import java.util.Collections;
    import java.util.concurrent.ArrayBlockingQueue;
    
    public class Test {
        public static void main(String[] args) {
            test1();
        }
        /**
         * 抛出异常
         */
        public static void test1(){
            //队列的大小
            ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);
    
            System.out.println(queue.add("a"));
            System.out.println(queue.add("b"));
            System.out.println(queue.add("c"));
             System.out.println(queue.element());  //队首元素
    
            //抛出异常 java.lang.IllegalStateException: Queue full
            //System.out.println(queue.add("c"));
    
            System.out.println(queue.remove());  //FIFO
            System.out.println(queue.remove());
            System.out.println(queue.remove());
        }
    }
    

    测试二:

     /**
         * 不怕抛出异常  有返回值
         */
        public static void test2(){
            ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);
    
            System.out.println(queue.offer("a"));
            System.out.println(queue.offer("b"));
            System.out.println(queue.offer("c"));
            System.out.println(queue.offer("d"));  //返回布尔值  不抛异常
    		System.out.println(queue.peek());  //队首元素
            
            System.out.println("===============");
            System.out.println(queue.poll());
            System.out.println(queue.poll());
            System.out.println(queue.poll());
            System.out.println(queue.poll());  //返回null 不抛出异常
        }
    

    测试三:

    /**
     * 等待,阻塞(一直阻塞)
     */
    public static void test3() throws InterruptedException {
        //队列的大小
        ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);
    
        queue.put("a");
        queue.put("b");
        queue.put("c");
        //queue.put("d");  //队列没有位置了  会一直阻塞
    
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
        // System.out.println(queue.take());  //没有这个元素 也会一直阻塞
    
    }
    

    测试四:

    /**
     *等待超时(过时不候)
     */
    public static void test4() throws InterruptedException {
        //队列的大小
        ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);
    
        System.out.println(queue.offer("a"));
        System.out.println(queue.offer("b"));
        System.out.println(queue.offer("c"));
        System.out.println(queue.offer("d",2, TimeUnit.SECONDS));  //超时退出
        System.out.println("==========");
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll(2,TimeUnit.SECONDS));
    }
    

    SynchronousQueue

    同步队列,没有容量,进去一个元素必须取出来,才能再放一个元素。

    package com.zr.queue;
    
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 同步队列
     * SynchronousQueue不存储元素 put进去一个值 必须toke出来 才能在put
     */
    public class SynchronousQueueDemo {
        public static void main(String[] args) {
            SynchronousQueue<String> blockingQueue = new SynchronousQueue<>();
            new Thread(()->{
    
                try {
                    System.out.println(Thread.currentThread().getName()+"put 1");
                    blockingQueue.put("1");
                    System.out.println(Thread.currentThread().getName()+"put 2");
                    blockingQueue.put("2");
                    System.out.println(Thread.currentThread().getName()+"put 3");
                    blockingQueue.put("3");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },"T1").start();
            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName()+"-->"+blockingQueue.take());
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName()+"-->"+blockingQueue.take());
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName()+"-->"+blockingQueue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },"T2").start();
        }
    }
    

    线程池(重点)

    池化技术!

    程序的运行,本质会占用系统的资源!优化资源的使用,统一管理分配--->池化技术 !

    线程池,连接池,内存池,对象池....

    线程池的好处:

    1. 降低资源的消耗
    2. 提高响应的速度,创建和销毁十分浪费资源
    3. 方便统一管理

    线程可以复用,控制最大并发数,统一管理!

    线程池:三大方法,七大参数,四种拒绝策略!(可参考阿里巴巴开发手册)

    测试代码

    package com.zr.pool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     *Executors 工具类 三大方法、
     */
    public class Demo01 {
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
            // Executors.newFixedThreadPool(5);  //固定的大小
            //Executors.newCachedThreadPool();  //可伸缩的
    
            try {
                for (int i = 0; i < 10; i++) {
                    //使用了线程池之后要使用线程池来创建线程
                    threadPool.execute(()->{
                        System.out.println(Thread.currentThread().getName()+":ok!");
                    });
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //线程池使用完后要关机
                threadPool.shutdown();
            }
        }
    }
    

    七大参数

    源码分析

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
    

    本质:ThreadPoolExecutor

    public ThreadPoolExecutor(int corePoolSize,  //核心线程池大小
                              int maximumPoolSize,  //最大核心线程池的大小
                              long keepAliveTime,  //最大存活时间(超时未调用释放)
                              TimeUnit unit,  //超时单位
                              BlockingQueue<Runnable> workQueue,  //阻塞队列
                              ThreadFactory threadFactory,  //线程工厂,一般不用动
                              RejectedExecutionHandler handler //拒绝策略) { 
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    四种拒绝策略

    手动创建线程池

    package com.zr.pool;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * new ThreadPoolExecutor.AbortPolicy() //不处理,队列满了 抛出异常
     * new ThreadPoolExecutor.CallerRunsPolicy()  //main线程执行
     * new ThreadPoolExecutor.DiscardPolicy() //队列满了 丢掉任务 不会抛出异常
     * new ThreadPoolExecutor.DiscardOldestPolicy());  //将最老的任务丢弃,尝试提交新的任务,不会跑异常
     */
    public class Demo02 {
        //自定义创建线程池
        public static void main(String[] args) {
            ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                    2,
                    5,
                    3,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(3),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.CallerRunsPolicy());  //将最老的任务丢弃,尝试提交新的任务,不会跑异常
            try {
                //最大承载 queue+max
                // 超出最大承载 使用拒绝处理
                for (int i = 0; i < 10; i++) {
                    //使用了线程池之后要使用线程池来创建线程
                    threadPool.execute(()->{
                        System.out.println(Thread.currentThread().getName()+":ok!");
                    });
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //线程池使用完后要关机
                threadPool.shutdown();
            }
        }
    }
    

    最大线程该如何设置

    CPU密集型:计算量大的,线程数量可设置为电脑核心数+1,Runtime.getRuntime().availableProcessors()获取电脑核心数。

    IO密集型:读写操作非常多的,线程数量可设置为电脑核心数*2。

    四大函数式接口

    现在必须掌握的:lambda表达式,链式编程,函数式接口,Stream流式计算。

    函数式接口:只有一个方法的接口。

    @FunctionalInterface
    public interface Runnable {
        public abstract void run();
    }
    //很多@FunctionalInterface
    //简化编程模型,新版本的框架底层大量应用
    //foreach(消费者类型的函数式接口)
    

    测试代码:Function

    package com.zr.function;
    
    import java.util.function.Function;
    
    /**
     * function  函数型接口
     * 有一个输入参数,有一个输出
     * 只要是函数式接口,可以用lambda表达式简化
     */
    public class Demo01 {
        public static void main(String[] args) {
            //工具类  输出出入的值
            // Function function = new Function<String,String>() {
            //     @Override
            //     public String apply(String str) {
            //         return str;
            //     }
            // };
            Function<String,String> function = (str)->{return str;};
            System.out.println(function.apply("123"));
        }
    }
    

    测试代码:断定型接口 Predicate

    package com.zr.function;
    
    import java.util.function.Predicate;
    
    /**
     * 断定型接口,有一个输入值,返回值是 布尔值
     */
    public class Demo02 {
        public static void main(String[] args) {
            //判断字符串是否为空
            // Predicate<String> predicate = new Predicate<String>() {
            //     @Override
            //     public boolean test(String str) {
            //         return str.isEmpty();
            //     }
            // };
            Predicate<String> predicate = (str)->{return str.isEmpty();};  //(str)的()可以省略
            System.out.println(predicate.test("asdf"));
        }
    }
    

    Consumer:消费型接口

    测试代码:

    package com.zr.function;
    
    import java.util.function.Consumer;
    
    /**
     * 消费型接口  只有输入 没有返回值
     */
    public class Demo03 {
        public static void main(String[] args) {
            // Consumer<String> consumer = new Consumer<String>() {
            //     @Override
            //     public void accept(String str) {
            //         System.out.println(str);
            //     }
            // };
            Consumer<String> consumer = (str)->{
                System.out.println(str);
            };
            consumer.accept("abc");
        }
    }
    

    Supplier:供给型接口

    测试代码:

    package com.zr.function;
    
    import java.util.function.Supplier;
    
    /**
     * 供给型接口 只返回 不输入
     */
    public class Demo04 {
        public static void main(String[] args) {
            // Supplier supplier = new Supplier<Integer>() {
            //     @Override
            //     public Integer get() {
            //         return 1024;
            //     }
            // };
            Supplier supplier = ()->{
                return 1024;
            };
            System.out.println(supplier.get());
        }
    }
    

    Stream流式计算

    大数据:存储+计算

    集合,数据库是来存储的。

    计算都应该交给流来计算。

    测试代码:User

    package com.zr.stream;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class User {
        private int id;
        private String name;
        private int age;
    }
    

    Test

    package com.zr.stream;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * 一行代码实现,现有五个用户,筛选
     * 1.id是偶数的
     * 2.年龄大于22
     * 3.用户名转化为大写字母
     * 4.用户名字母倒着排序
     * 5.只输出一个用户
     */
    public class Test {
        public static void main(String[] args) {
            User u1 = new User(1,"a",20);
            User u2 = new User(2,"b",21);
            User u3 = new User(3,"c",22);
            User u4 = new User(4,"d",23);
            User u5 = new User(6,"e",24);
            //转化为list
            List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
            //计算交给流处理
            // lambda表达式,链式编程,函数式接口,Stream流式计算
            list.stream()
                    .filter(u->{return u.getId()%2==0;})
                    .filter(u->{return u.getAge()>22;})
                    .map(u->{return u.getName().toUpperCase();})
                    .sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
                    .limit(1)
                    .forEach(System.out::println);
        }
    }
    

    ForkJoin

    分支合并

    在1.7之后出来的,并行执行任务,提高效率的!大数据量的。

    ForkJoin的特点:工作窃取。

    维护的是双端队列,A线程任务执行完后会把B线程还未执行完的任务拿过来一部分执行。

    ForkJoin操作:ForkJoinPool 通过它来执行

    测试代码:

    package com.zr.forkjoin;
    
    import java.util.concurrent.RecursiveTask;
    
    /**
     * 求和计算的任务
     * 使用 ForkJoinPool 来执行
     * 计算任务 forkJoinPool.execute(forkJoinTask task)
     */
    public class ForkJoinDemo extends RecursiveTask<Long> {
        private long start;
        private long end;
    
        //临界值
        private long temp = 100000L;
    
        public ForkJoinDemo(long start, long end) {
            this.start = start;
            this.end = end;
        }
    
        //计算方法
        @Override
        protected Long compute() {
            long sum = 0;
            if ((end-start)<temp){
                for (long i = start; i < end; i++) {
                    sum += i;
                }
                return sum;
            }else {  //forkjoin
                long mid = (start+end)/2;  //中间值
                ForkJoinDemo task1 = new ForkJoinDemo(start, mid);
                task1.fork();  //拆分任务,把线程压入线程队列
                ForkJoinDemo task2 = new ForkJoinDemo(mid+1, end);
                task2.fork();
                return task1.join() + task2.join();
            }
        }
    }
    

    test

    package com.zr.forkjoin;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.stream.LongStream;
    
    public class Test {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //test1();
            //test2();
            test3();
        }
    
        public static void test1(){
            long sum = 0;
            long start = System.currentTimeMillis();
            for (long i = 0; i < 10_0000_0000; i++) {
                sum+=i;
            }
            long end = System.currentTimeMillis();
            System.out.println("sum="+sum+"时间:"+(end-start));
        }
        //forkjoin
        public static void test2() throws ExecutionException, InterruptedException {
            long start = System.currentTimeMillis();
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            ForkJoinDemo task = new ForkJoinDemo(0, 10_0000_0000);
            ForkJoinTask<Long> submit = forkJoinPool.submit(task);
            Long sum = submit.get();
            long end = System.currentTimeMillis();
            System.out.println("sum="+sum+"时间:"+(end-start));
        }
        //流计算
        public static void test3(){
    
            long start = System.currentTimeMillis();
    
            long sum = LongStream.rangeClosed(0, 10_0000_0000).parallel().reduce(0, Long::sum);
            long end = System.currentTimeMillis();
            System.out.println("sum="+"时间:"+(end-start));
        }
    }
    

    异步回调

    Future设计的初衷,是对将来某个时间的结果建模。

    测试代码:

    package com.zr.future;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 异步调用 CompletableFuture
     * 异步执行
     * 成功回调
     * 失败回调
     */
    public class Demo01 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //没有返回值的异步回调
            // CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
            //     try {
            //         TimeUnit.SECONDS.sleep(2);
            //     } catch (InterruptedException e) {
            //         e.printStackTrace();
            //     }
            //     System.out.println(Thread.currentThread().getName()+"runAsync==>Void");
            // });
            // System.out.println("111");
            // future.get();
    
            //有返回值的异步回调
            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread().getName()+"supplyAsync==>Integer");
                //int i = 10/0;
                return 1024;
            });
            System.out.println(future1.whenComplete((t,u)->{
                System.out.println("t=>"+t+"; u=>"+u);  //t 正常的返回结果 u 错误的信息
            }).exceptionally((e)->{
                System.out.println(e.getMessage());
                return 233;
            }).get());
        }
    }
    

    JMM

    Volatile是Java虚拟机的轻量级同步机制。

    1. 保证可见性
    2. 不保证原子性
    3. 禁止指令重排

    JMM:Java内存模型。不真实存在,概念,约定!

    关于JMM的一些同步约定:

    1. 线程加锁前:必须把主存中变量最新值读取到工作内存中
    2. 线程解锁前:必须把共享变量立即同步到主存
    3. 加锁和解锁是同一把锁

    如果线程A修改了值,线程B还在使用之前读到的值,此时就需要引入Volatile!

    内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外)

      • lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
      • unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
      • read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
      • load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
      • use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
      • assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
      • store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
      • write  (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中

      JMM对这八种指令的使用,制定了如下规则:

      • 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
      • 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
      • 不允许一个线程将没有assign的数据从工作内存同步回主内存
      • 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作
      • 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
      • 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
      • 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
      • 对一个变量进行unlock操作之前,必须把此变量同步回主内存

    Volatile

    保证可见性

    package com.zr.testvolatile;
    
    import java.sql.Time;
    import java.util.concurrent.TimeUnit;
    
    public class JMMDemo {
        //未加volatile,while会死循环
        private static volatile int num = 0;
        public static void main(String[] args) throws InterruptedException {  //main线程
    
            new Thread(()->{  //未加volatile前,线程1对主内存的变化是不知道的
                while (num == 0){
                    
                }
            }).start();
            TimeUnit.SECONDS.sleep(1);
            num = 1;
            System.out.println(num);
        }
    }
    

    不保证原子性

    原子性,不可分割

    线程A在执行任务的时候是不能被打扰的,也不能变为分割,要么同时成功,要么同时失败。

    package com.zr.testvolatile;
    //不保证原子性
    public class Demo02 {
        //volatile不保证原子性
        private volatile static int num = 0;
        public static void main(String[] args) {
            //理论上的值应该为20000
            for (int i = 1; i <= 20; i++) {
                new Thread(()->{
                    for (int j = 1; j <= 1000; j++) {
                        add();
                    }
                }).start();
            }
            while (Thread.activeCount()>2){
                Thread.yield();
            }
            System.out.println(num);
        }
        public static void add(){
            num++;
        }
    }
    

    如果不加 Lock 或者 Synchronized 怎么保证原子性。

    cmd进入编译编译后的文件夹中,java -c Demo02.class可以查看字节码。可以看到num++不是原子性的操作。

    使用原子类,解决原子性问题。

    测试代码:AtomicInteger

    package com.zr.testvolatile;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    //不保证原子性
    public class Demo03 {
        //volatile不保证原子性
        //原子类的
        private volatile static AtomicInteger num = new AtomicInteger();
    
        public static void main(String[] args) {
            //理论上的值应该为20000
            for (int i = 1; i <= 20; i++) {
                new Thread(()->{
                    for (int j = 1; j <= 1000; j++) {
                        add();
                    }
                }).start();
            }
            while (Thread.activeCount()>2){
                Thread.yield();
            }
            System.out.println(num);
        }
        public synchronized static void add(){
            num.getAndIncrement();  //AtomicInteger的加 1 方法  CAS
        }
    }
    

    这些原子类的底层直接和操作系统挂钩!在内存中修改值。unsafe类是一个特殊的存在(CAS)!

    指令重排:自己写的程序,计算机并不会按照程序写的顺序来执行。

    源代码--->编译器优化重排--->指令并行也可能会重排--->内存系统也会重排--->执行

    处理器在指令重排的时候会考虑数据之间的依赖性。

    volatile可以避免指令重排:

    内存屏障!cpu指令,作用:

    1. 保证特定操作的执行顺序
    2. 保证某些变量的内存可见性(volatile利用这个保证了可见性)

    单例模式

    饿汉式

    package classes.com.zr.single;
    //饿汉式单例
    public class Hungry {
        //构造器私有
        private Hungry(){
    
        }
        private final static Hungry hungry = new Hungry();
    
        public static Hungry getHangry(){
            return hungry;
        }
    }
    

    懒汉式 :DCL懒汉式

    package classes.com.zr.single;
    //懒汉式单例
    public class Lazy01 {
        private Lazy01(){
            System.out.println(Thread.currentThread().getName()+"ok");
        }
        //volatile  防止创建对象时指令重排
        private volatile static Lazy01 lazy01;
    
        //不安全的单例
        // public static Lazy01 getInstance(){
        //     if (lazy01==null){
        //         lazy01 = new Lazy01();
        //     }
        //     return lazy01;
        //}
    
        //双重检查锁  DCL 懒汉式单例
        public static Lazy01 getInstance(){
            if (lazy01==null){
                synchronized (Lazy01.class){
                    if (lazy01==null){
                        lazy01 = new Lazy01();  //不是原子性操作
                        /**
                         *  1.分配内存空间
                         *  2.执行构造方法,初始化对象
                         *  3.把对象指向内存空间
                          */
                    }
                }
            }
            return lazy01;
        }
    
        public static void main(String[] args) {
            new Thread(()->{
                for (int i = 0; i < 10; i++) {
                    Lazy01.getInstance();
                }
            }).start();
        }
    }
    

    使用反射破坏:

    package classes.com.zr.single;
    
    import java.lang.reflect.Constructor;
    
    //懒汉式单例  反射破坏单例
    public class Lazy02 {
        private Lazy02(){
            synchronized (Lazy02.class){
                if (lazy02!=null){
                    throw new RuntimeException("不要使用反射来破坏单例");
                }
            }
        }
        //volatile  防止创建对象时指令重排
        private volatile static Lazy02 lazy02;
    
    
        //双重检查锁  检查 DCL 懒汉式单例
        public static Lazy02 getInstance(){
            if (lazy02==null){
                synchronized (Lazy01.class){
                    if (lazy02==null){
                        lazy02 = new Lazy02();  //不是原子性操作
                    }
                }
            }
            return lazy02;
        }
    
        public static void main(String[] args) throws Exception {
            Lazy02 instance1 = Lazy02.getInstance();
            Constructor<Lazy02> constructor = Lazy02.class.getDeclaredConstructor(null);
            constructor.setAccessible(true);  //可以访问
            Lazy02 instance2 = constructor.newInstance();  //创建实例
            System.out.println(instance1);
            System.out.println(instance2);
    
        }
    }
    

    静态内部类

    package com.zr.single;
    //静态内部类实现
    public class Holder {
        private Holder(){
    
        }
    
        public static Holder getInstance(){
            return Test.HOLDER;
        }
    
        public static class Test{
            private static final Holder HOLDER = new Holder();
        }
    }
    

    枚举:反射无法破坏枚举

    package com.zr.single;
    
    
    import java.lang.reflect.Constructor;
    
    //枚举  jdk 1.5 有的  本身也是一个类
    public enum EnumSingle {
    
        INSTANCE;
    
        public EnumSingle getInstance(){
            return INSTANCE;
        }
    }
    
    class Test{
        public static void main(String[] args) throws Exception {
             EnumSingle instance1 = EnumSingle.INSTANCE;
    
             //String.class,int.class 通过jad反编译看到构造方法是有参数的
            Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class,int.class);
            declaredConstructor.setAccessible(true);
            EnumSingle instance2 = declaredConstructor.newInstance();
    
            System.out.println(instance1.hashCode());
            System.out.println(instance2.hashCode());
        }
    }
    

    无法破坏枚举:

    深入理解CAS

    什么是cas:compareAndSet 比较并交换,如果期望的值达到了就更新,否则就不更新,CAS cpu的并发原语。

    package com.zr.cas;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class CASDemo {
    
        //CAS  compareAndSet 比较并交换
        public static void main(String[] args) {
            AtomicInteger atomicInteger = new AtomicInteger(2021);
    
            // public final boolean compareAndSet(int expect, int update)
            //如果期望的值达到了就更新,否则就不更新,CAS  cpu的并发原语
            System.out.println(atomicInteger.compareAndSet(2021, 2022));
            System.out.println(atomicInteger.get());
        }
    }
    

    unsafe:

    getAndIncrement:增加1

    getAndAddInt:自旋锁

    CAS缺点:自旋锁循环会耗时,一次只能保证一个共享变量的原子性,会导致ABA问题。

    CAS:ABA问题,狸猫换太子。

    A,B两个线程访问内存中的变量1,A拿到后将变量变为2,再把2变为1,此时B并不知道这个1是不是原来访问的那个1。

    package com.zr.cas;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class ABADemo {
        //CAS  compareAndSet 比较并交换
        public static void main(String[] args) {
            AtomicInteger atomicInteger = new AtomicInteger(2021);
            
            // public final boolean compareAndSet(int expect, int update)
            //如果期望的值达到了就更新,否则就不更新,CAS  cpu的并发原语
            System.out.println(atomicInteger.compareAndSet(2021, 2022));
            System.out.println(atomicInteger.get());
    
            System.out.println(atomicInteger.compareAndSet(2022, 2021));
            System.out.println(atomicInteger.get());
    
            System.out.println(atomicInteger.compareAndSet(2021, 2022));
            System.out.println(atomicInteger.get());
        }
    }
    

    原子引用

    解决ABA问题:引入原子引用,对应的思想,乐观锁!

    带版本号的原子操作。

    package com.zr.cas;
    
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicStampedReference;
    
    public class RefrenceDemo {
        //注意,如果泛型是一个包装类,注意对象引用问题!!!
        static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1,1);
    
        public static void main(String[] args) {
    
            new Thread(()->{
                int stamp = atomicStampedReference.getStamp();  //获得版本号
                System.out.println("A线程拿到的是"+stamp);
    
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(atomicStampedReference.compareAndSet(1, 2,
                        atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
                System.out.println("A2:"+atomicStampedReference.getStamp());
    
                System.out.println(atomicStampedReference.compareAndSet(2, 1,
                        atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
                System.out.println("A3:"+atomicStampedReference.getStamp());
            },"A").start();
    
            new Thread(()->{
                int stamp = atomicStampedReference.getStamp();  //获得版本号
                System.out.println("B线程拿到的是"+stamp);
    
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(atomicStampedReference.compareAndSet(1, 6,
                        stamp, stamp + 1));
                System.out.println("B2:"+atomicStampedReference.getStamp());
            },"B").start();
        }
    }
    

    AtomicStampedReference

    public AtomicStampedReference(V initialRef, int initialStamp) {
        pair = Pair.of(initialRef, initialStamp);
    }
    

    因为 Integer 有缓存值,这里的 initialRef 测试应在-128到127之间,超过会在堆中重新创建对象,是个大坑!!

    各种锁的理解

    公平锁/非公平锁

    公平锁:非常公平,不能插队,线程讲究先来先到。

    非公平锁:非常不公平,可以插队,线程不讲究先来先到。

    Synchronzied,ReentrantLock默认是非公平的。

     public ReentrantLock() {
            sync = new NonfairSync();
        }
    
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    

    可重入锁

    可重入锁(递归锁)

    Synchronized

    package com.zr.lock;
    
    import java.awt.*;
    
    //Synchronized
    public class Demo01 {
        public static void main(String[] args) {
            Phone1 phone1 = new Phone1();
            new Thread(()->{
                phone1.sms();
            },"A").start();
    
            new Thread(()->{
                phone1.sms();
            },"B").start();
        }
    }
    
    class Phone1{
        public synchronized void sms(){
            System.out.println(Thread.currentThread().getName()+"==>sms");
            call();
        }
    
        public synchronized void call(){
            System.out.println(Thread.currentThread().getName()+"==>call");
        }
    }
    

    Lock

    package com.zr.lock;
    
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class Demo02 {
        public static void main(String[] args) {
            Phone2 phone2 = new Phone2();
            new Thread(()->{
                phone2.sms();
            },"A").start();
    
            new Thread(()->{
                phone2.sms();
            },"B").start();
        }
    }
    
    class Phone2{
        Lock lock = new ReentrantLock();
    
        public synchronized void sms(){
            lock.lock();  //lock 加锁解锁必须成对出现
            try {
                System.out.println(Thread.currentThread().getName()+"==>sms");
                call();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public synchronized void call(){
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName()+"==>call");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    

    自旋锁

    Unsafe.getAndAddInt:这里就应用了自旋锁

    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    
        return var5;
    }
    

    测试代码:利用CAS自己定义锁

    package com.zr.lock;
    
    import java.util.concurrent.atomic.AtomicReference;
    
    //自旋锁
    public class SpinLockDemo {
    
        //Thread null
        AtomicReference<Thread> atomicReference = new AtomicReference<>();
        //加锁
        public void myLock(){
            Thread thread = Thread.currentThread();
            System.out.println(Thread.currentThread().getName()+"==>myLock");
            //自旋锁
            while(!atomicReference.compareAndSet(null,thread)){
                //System.out.println(Thread.currentThread().getName()+"自旋中...");
            }
        }
    
        //解锁
        public void myUnLock(){
            Thread thread = Thread.currentThread();
            System.out.println(Thread.currentThread().getName()+"==>myUnLock");
            atomicReference.compareAndSet(thread,null);
        }
    }
    

    测试:

    package com.zr.lock;
    
    import java.util.concurrent.TimeUnit;
    
    public class TestSpinLock {
        public static void main(String[] args) throws InterruptedException {
            SpinLockDemo lock = new SpinLockDemo();
    
            new Thread(()->{
                lock.myLock();
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.myUnLock();
                }
            },"A").start();
    
            TimeUnit.SECONDS.sleep(1);
    
            new Thread(()->{
                lock.myLock();
                try {
    
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.myUnLock();
                }
            },"B").start();
        }
    }
    

    死锁

    死锁:互斥,不可剥夺,请求保持,循环等待。

    测试:

    package com.zr.lock;
    
    import java.util.concurrent.TimeUnit;
    
    //模拟死锁
    public class DeadLock {
        public static void main(String[] args) {
            String lockA = "lockA";
            String lockB = "lockB";
    
            new Thread(new myThread(lockA,lockB),"T1").start();
            new Thread(new myThread(lockB,lockA),"T2").start();
        }
    }
    
    class myThread implements Runnable{
        private String lockA;
        private String lockB;
    
        public myThread(String lockA, String lockB) {
            this.lockA = lockA;
            this.lockB = lockB;
        }
    
        @Override
        public void run() {
            synchronized (lockA){
                System.out.println(Thread.currentThread().getName()+"=lock:"+lockA+"==>get "+lockB);
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (lockB){
                    System.out.println(Thread.currentThread().getName()+"=lock:"+lockB+"==>get "+lockA);
                }
            }
        }
    }
    

    解决问题

    1. 使用 jps -l 定位进程号

    2. jstack 进程号(24380)

    通过日志和堆栈信息排查问题!

    文字截图来自 API 文档!

  • 相关阅读:
    将某个MySQL库中的UTF8字符列都转成GBK格式
    挺苹果的声音,iPhone 5s的两处进步
    Cookie 路径在本机测试及服务器部署,在浏览器处理方式上的不同
    inner join、left join、right join中where和and的作用
    TIA Portal V12不能添加新的CPU
    Linux下可执行程序调试信息的分离及release程序的调试
    STM32.定时器
    STM32.SPI(25Q16)
    模电之运放篇
    Zigbee学习
  • 原文地址:https://www.cnblogs.com/zhou-zr/p/14698535.html
Copyright © 2011-2022 走看看