zoukankan      html  css  js  c++  java
  • 多线程与并发(三)——JUC概述、一些API

    iwehdio的博客园:https://www.cnblogs.com/iwehdio/

    1、JUC概述

    • JUC相关的JDK下的包:

      • java.util.concurrent包。
      • java.util.concurrent.automic包。
      • java.util.concurrent.locks包。
      • java.util.function包。
    • 普通业务的线程代码:Thread。

    • Runnable接口实现:没有返回值,效率没有Callable高。

    • 进程:一个程序或程序的集合,一个进程往往包含多个线程。

    • Java可以开启线程吗?

      • 其实是不能的,Java没有权限去开启线程,只能通过native本地方法,调用底层的C++开启线程。
      • Java运行在虚拟机上,无法直接操作硬件。
    • 并发与并行:

      • 并发:多线程操作同一个资源。
        • CPU单核模拟多条线程,快速交替实现多线程。
      • 并行:多个线程同时执行。
        • CPU多核,真正的同时执行。可以使用线程池操作。
        • 获取CPU的核数Runtime.getRuntime().availableProcessors()
      • 并发编程的本质:充分利用CPU的资源。
    • 线程的状态:新生/运行/阻塞/等待/超时等待/终止。

    • wait()方法和sleep()方法的区别:

      • 来自不同的类,wait来自Object类,sleep来自Thread类。
      • (一般sleep都用java.util.concurrent下的TimeUnit下的sleep方法操作。)
      • wait会释放锁,sleep不会释放锁。
      • wait必须在同步代码块中使用,sleep可以在任何地方使用。
    • 传统的Synchronized锁解决卖票问题:

      public class Test1 {
          public static void main(String[] args) {
              Ticket ticket = new Ticket();
              //在实战中使用Ticket实现Runnable会导致高耦合,一般使用函数式编程
              new Thread(()->{
                  for (int i = 0; i < 40; i++) {
                      ticket.sale();
                  }
              }, "A").start();
              new Thread(()->{
                  for (int i = 0; i < 40; i++) {
                      ticket.sale();
                  }
              }, "B").start();
              new Thread(()->{
                  for (int i = 0; i < 40; i++) {
                      ticket.sale();
                  }
              }, "C").start();
          }
      }
      
      class Ticket{
         ReentrantLock , ReentrantReadWriteLock.ReadLock , ReentrantReadWriteLock.WriteLock  private int number = 30;
          public synchronized void sale(){
              if(number>0){
                  System.out.println(Thread.currentThread().getName() + " " + number--);
              }
          }
      }
      
    • Lock接口:

      • 实现类:可重入锁ReentrantLock ,读可重入锁锁ReentrantReadWriteLock.ReadLock,写可重入锁锁ReentrantReadWriteLock.WriteLock 。

      • 可重入锁ReentrantLock:不传入参数默认非公平锁,可以传入参数设置为公平锁还是非公平锁。

        • 公平锁:在队列中有先来后到。
        • 非公平锁:在队列中可以插队,由CPU调度。
      • 使用:

        lock.lock();	//加锁
        lock.tryLock();	//尝试获取锁,返回是否获取到了锁
        try {
            if(number>0){
                System.out.println(Thread.currentThread().getName() + " " + number--);
            }
        } finally {
            lock.unlock();	//解锁
        }
        
      • Lock锁和Synchronized锁的区别:

        • Synchronized是内置的关键字,Lock是一个接口。
        • Synchronized无法判断获取锁的状态,Lock可以判断是否获取到了锁。
        • Synchronized会自动释放锁,Lock锁必须手动释放(否则会死锁)。
        • Synchronized在线程1阻塞后线程2会一直等待,Lock锁可以判断能否获取锁,不会一直等待。
        • Synchronized是可重入锁、不可以中断的、非公平的,Lock是可重入锁、可以判断锁、可以设置是否公平锁(Lock锁的灵活度更高)。
        • Synchronized适合锁少量的代码同步问题,Lock适合锁大量的同步代码。
    • 生产者的消费者问题(线程通信):

      • 流程:业务 -> 判断 -> 执行 -> 唤醒。

      • Synchronized版:

        public class Test3 {
            public static void main(String[] args) {
                Source source = new Source();
                new Thread(()->{
                    for (int i = 0; i < 10; i++) {
                        try {
                            source.increment();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                },"A").start();
                new Thread(()->{
                    for (int i = 0; i < 10; i++) {
                        try {
                            source.decrement();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                },"B").start();
            }
        }
        
        class Source{
            private int number = 0;
        
            public synchronized void increment() throws InterruptedException {
                if(number!=0){
                    this.wait();
                }
                number++;
                System.out.println(Thread.currentThread().getName() + "->" + number);
                this.notifyAll();
            }
        
            public synchronized void decrement() throws InterruptedException {
                if(number==0){
                    this.wait();
                }
                number--;
                System.out.println(Thread.currentThread().getName() + "->" + number);
                this.notifyAll();
            }
        }
        
      • 存在的问题:

        • 如果存在多个生产者和消费者,需要将判断if改为循环while,否则会虚假唤醒。
        • 比如线程A和C都是加1,number为1时,A和C都在判断中阻塞,被唤醒后,A先加1,此时虽然number已经是1了,但是C还是会加1。
      • Lock版:

        • java.util.concurrent.locks下的Condination接口,可以用await方法代替wait方法,signalAll方法代替notifyAll。
        public class Test4 {
            public static void main(String[] args) {
                Source1 source = new Source1();
                new Thread(()->{
                    for (int i = 0; i < 10; i++) {
                        try {
                            source.increment();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                },"A").start();
                new Thread(()->{
                    for (int i = 0; i < 10; i++) {
                        try {
                            source.decrement();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                },"B").start();
            }
        }
        
        class Source1{
            private int number = 0;
            Lock lock = new ReentrantLock();
            Condition condition = lock.newCondition();
            public void increment() throws InterruptedException {
                lock.lock();
                try {
                    if(number!=0){
                        condition.await();
                    }
                    number++;
                    System.out.println(Thread.currentThread().getName() + "->" + number);
                    condition.signalAll();
                } finally {
                    lock.unlock();
                }
            }
        
            public void decrement() throws InterruptedException {
                lock.lock();
                try {
                    if(number==0){
                        condition.await();
                    }
                    number--;
                    System.out.println(Thread.currentThread().getName() + "->" + number);
                    condition.signalAll();
                } finally {
                    lock.unlock();
                }
            }
        }
        
        • Condination接口的优势:可以精准的通知和唤醒线程。
        • 通过设置多个同步监视器,Condition可以实现精准的通知唤醒某个线程。
        //线程A通知线程B运行,线程B通知线程C运行,线程C通知线程A运行
        public class Test5 {
            public static void main(String[] args) {
                Source2 source = new Source2();
                new Thread(()->{
                    for (int i = 0; i < 10; i++) {
                        source.printA();
                    }
                },"A").start();
                new Thread(()->{
                    for (int i = 0; i < 10; i++) {
                        source.printB();
                    }
                },"B").start();
                new Thread(()->{
                    for (int i = 0; i < 10; i++) {
                        source.printC();
                    }
                },"C").start();
            }
        }
        
        class Source2{
            private int num = 0;
            private Lock lock = new ReentrantLock();
            private Condition condition1 = lock.newCondition();
            private Condition condition2 = lock.newCondition();
            private Condition condition3 = lock.newCondition();
        
            public void printA(){
                lock.lock();
                try {
                    if(num!=0){
                        condition1.await();
                    }
                    System.out.println(Thread.currentThread().getName());
                    num = 1;
                    condition2.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
            public void printB(){
                lock.lock();
                try {
                    if(num!=1){
                        condition2.await();
                    }
                    System.out.println(Thread.currentThread().getName());
                    num = 2;
                    condition3.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
            public void printC(){
                lock.lock();
                try {
                    if(num!=2){
                        condition3.await();
                    }
                    System.out.println(Thread.currentThread().getName());
                    num = 0;
                    condition1.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
        
    • 锁是什么,如何判断锁的是谁?

      • 锁只会锁两个东西:new出来的实例对象,对象的class字节码。
      • 普通方法synchronized,锁的是方法的调用者(实例化的对象)。
      • 如果是静态方法synchronized,锁的是Class类模板。
      • 锁实例对象和锁Class类模板不冲突,是两个锁。
      • 非同步方法不会受锁的影响。
    • Callable接口:

      • 相比Runnable接口,它可以有返回值,而且可以抛出异常。run方法变为了call方法。
      • 带一个泛型参数,是call方法的返回值。
      • 使用thread.start启动线程:new Thread(new Futuretask<V>(Callable<V> c))。
      • 获取Callable的返回值:futureTask.get()。线程有缓存,而且get方法可能会产生阻塞。
    • 读写锁:

      • JUC包下的ReadWriteLock接口,实现类ReentrantReadWriteLock。
      • 读可以被多线程同时读,但写的时候只能有一个线程去写。也不能读写同时操作。
      • 更加细粒度的控制,可以提高效率。
      • 可以用writeLock()方法和readLock()方法创建读锁和写锁,创建出的是Lock锁对象。
      • 读锁就是一种共享锁,写锁就是一种独占锁。
      public class Test9 {
          public static void main(String[] args) {
              Cache cache = new Cache();
              for (int i = 0; i < 5; i++) {
                  final int temp = i;
                  new Thread(()->{
                      cache.write(temp+"",temp+"");
                  },String.valueOf(i)).start();
              }
              for (int i = 0; i < 5; i++) {
                  final int temp = i;
                  new Thread(()->{
                      cache.read(temp+"");
                  },String.valueOf(i)).start();
              }
          }
      }
      
      class Cache {
          private volatile Map<String,Object> map = new HashMap<>();
          private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
      
          public void read(String key){
              Lock lock = readWriteLock.readLock();
              lock.lock();
              try {
                  System.out.println(Thread.currentThread().getName()+"读取"+key);
                  Object o = map.get(key);
              } finally {
                  lock.unlock();
                  System.out.println(Thread.currentThread().getName()+"读取完成");
              }
          }
      
          public void write(String key, Object o){
              Lock lock = readWriteLock.writeLock();
              lock.lock();
              try {
                  System.out.println(Thread.currentThread().getName()+"写入"+key);
                  map.put(key,o);
              } finally {
                  lock.unlock();
                  System.out.println(Thread.currentThread().getName()+"写入完成");
              }
          }
      }
      

    2、集合类不安全

    • 多条线程对ArrayList进行add操作:

      • 使用ArrayList,在并发下,资源是不安全的。会产生并发修改异常ConcurrentModificationexception。
      • 解决1:ArrayList是线程不安全的,Vector是线程安全的(其实就是add方法加了synchronized)。
      • 解决2:使用Collection工具类下的synchronizedList(new ArrayList<>())
      • 解决3:也可以使用CopyOnWriteArrayList,是写入时复制(COW思想,是计算机程序设计领域的一种优化策略)。
      • 写入时复制是指,在写入之前先将原有的数据复制出来一份,在复制出来的这份数据上进行修改,修改完后再返回新复制的这份数据。避免数据覆盖。
      • CopyOnWriteArrayList比Vector好在哪:没有使用synchronized,使用Lock锁,效率更高。
    • 多条线程对HashSet进行add操作:

      • 与List出现的问题类似。
      • 解决1:使用Collection工具类下的synchronizedSet(new HashSet<>())
      • 解决2:使用CopyOnWriteArraySet
      • HashSet的底层就是HashMap,set.add方法本质就是map.put了一个key。
    • 多条线程对HashMap进行add操作:

      • HashMap的构造有多个重载,除了空参以外还有加载因子和初始化容量的构造。在实际情况下都是用带参构造。
      • 在并发下也会出现并发修改异常。
      • 解决1:使用Collection工具类下的synchronizedMap(new HashMap<>())
      • 解决2:使用ConCurrentHashMap

    3、常用的辅助类

    • CountDownLatch类:

      • 构造方法可以传入计数器的初始值,使用countDown()方法每次减1。
      • 等待计数器归零,然后再向下执行await()方法。
      • 其实就是一个减法计数器。
      public class Test7 {
          public static void main(String[] args) {
              CountDownLatch countDownLatch = new CountDownLatch(5);
              for (int i = 0; i < 7; i++) {
                  new Thread(()->{
                      System.out.println(Thread.currentThread().getName());
                      countDownLatch.countDown();
                  },String.valueOf(i)).start();
              }
              try {
                  countDownLatch.await();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              System.out.println("减到0");
      
          }
      }
      
    • CyclicBarrier类:

      • 其实就是一个加法计数器。
      • 有两个构造方法,其中一个只需要传入计数器的目标值;另一个不但需要初始值,还需要传入一个达到目标值后执行的线程。
      • await()方法在计数器达到目标值之前等待。
      public class Test7 {
          public static void main(String[] args) {
              CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
                  System.out.println("达到5");
              });
              for (int i = 0; i < 7; i++) {
                  new Thread(()->{
                      System.out.println(Thread.currentThread().getName());
                      try {
                          cyclicBarrier.await();
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      } catch (BrokenBarrierException e) {
                          e.printStackTrace();
                      }
                  },String.valueOf(i)).start();
              }
          }
      }
      
    • Semaphore类:

      • 构造方法传入的是线程数量,可以理解为停车位,限流控制数量。
      • acquire()方法获得车位,假设已经满了,线程就需要等待,等待其他线程被释放。
      • release()方法离开车位,会将当前信号量的释放,然后唤醒等待的线程。
      • 可以用于多个资源互斥的作用,并发限流,控制最大的线程数。
      public class Test8 {
          public static void main(String[] args) {
              Semaphore semaphore = new Semaphore(3);
              for (int i = 0; i < 6; i++) {
                  final int temp = i;
                  new Thread(()->{
                      try {
                          semaphore.acquire();
                          System.out.println(temp+"抢到车位");
                          TimeUnit.SECONDS.sleep(1);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      } finally {
                          semaphore.release();
                          System.out.println(temp+"离开车位");
                      }
                  }).start();
              }
          }
      }
      

    4、阻塞队列

    • 阻塞队列:

      • 是一个队列结构,有写入和读取操作,即入队和出队。
      • 对于写入操作,如果队列满了,就必须阻塞等待。
      • 对于读取操作,如果队列是空的,必须阻塞生产。
      • JUC包下的BlockingQueue接口,继承于Collection的子接口Queue,实现类包括ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue等。
      • 此外,Queue还有实现类双端队列Deque和非阻塞队列AbstractQueue。
    • 什么情况下使用阻塞队列:多线程并发处理,线程池。

    • 阻塞队列的四组API:

      方式 抛出异常 不抛出异常,有返回值 阻塞等待 超时等待
      添加 add offer put offer
      移除 remove poll take poll
      检测队首元素 element peek - -
      • 不抛出异常,有返回值的API,如果添加/移除失败会返回false/null,不会抛出异常。
      • 阻塞等待是指一直阻塞,等待超时是指超过指定时间后不再阻塞而是退出。
      • 超时等待的方法,可以传入参数超时时间和时间单位。
    • SynchronizedQueue同步队列:

      • 容量只有1,进去一个元素,必须等待取出来之后才能再往里面放一个元素。
      • 和其他的阻塞队列不同,同步队列相当于不存储元素,put一个元素必须先take取出来,否则不能再put进去元素。

    iwehdio的博客园:https://www.cnblogs.com/iwehdio/
    来源与结束于否定之否定。
  • 相关阅读:
    如何用UE(UltraEdit)删除重复行?--转
    spring源码分析之<context:component-scan/>vs<annotation-config/>
    annotation-config vs component-scan – Spring Core--转
    spring源码分析之spring注解@Aspect是如何工作的?
    Spring之LoadTimeWeaver——一个需求引发的思考---转
    Linux上的free命令详解
    MVC/MVP/MVVM区别——MVVM就是angular,视图和数据双向绑定
    elasticsearch如何安全重启节点
    ES等待任务——是master节点上的task任务
    1. 批量梯度下降法BGD 2. 随机梯度下降法SGD 3. 小批量梯度下降法MBGD
  • 原文地址:https://www.cnblogs.com/iwehdio/p/13616492.html
Copyright © 2011-2022 走看看