zoukankan      html  css  js  c++  java
  • Java 多线程学习扩展

    一、Java内存模型与线程

    由于计算机的存储设配与处理器的运算速度有几个数量级的差距,所以加入高速缓存来作为内存与处理器之间的缓冲。

    Java虚拟机规范定义一种Java内存模型来屏蔽各种硬件和操作系统的内存访问差异。

     1.1 Java Memery Model即JMM

    ① JMM规定所有变量规定了所有的变量都存在主内存中。

    ② 每条线程还有自己的工作内存,线程的工作内存中保存了被该线程使用到的变量的主内存副本拷贝,线程对变量的所有操作都必须在工作内存中进行,不能直接读写主内存的变量。

    ③ 不同线程无法访问对方的工作内存,线程间变量值的传递需要通过主内存来实现。

    1.2 对volatile 型变量的特殊规则

      关键字volatile是Java虚拟机提供的最轻量级的同步机制。

      ①  可见性

        当一条线程修改了这个变量的值,新值对于其他线程来说可以立刻得知。

        使用场景:主线程中,某个用volatile修饰的变量-如是否配置信息加载完成,定义成boolean,初始值是false,线程A负责加载配置信息,加载完后,它会把false改成true,而线程B采用while循环,一但加载完配置信息的标记从false,改成true,线程B就立马可以感知,线程B就可以进行相应的初始化工作。

      ② 非原子性

        在多个线程同时拷贝主内存的变量到自己的工作内存中,对副本进行修改,之后各自向主内存写入操作的时候,出现写覆盖,写丢失。

      ③ 禁止指令重排

        (Double Check Lock)双端检锁-单例模式

    public class Singleton {
        /**
         *  volatile:禁止指令重排
         *  
         *  new Singleton():
         *  指令①:分配内存地址
         *  指令②:初始化
         *  指令③ :指针指向初始化后的变量
         */
        private volatile static Singleton instance;
        
        private Singleton() {
            System.out.println("no args construtor...");
        }
        
        /**
         * double check lock
         */
        public static Singleton getInstance() {
            if (instance == null) {
                synchronized (Singleton.class) {
                    if (instance == null) {
                        instance = new Singleton();
                    }
                }
            }
            return instance;
        }
    }

    上述代码中,如果instance没有用volatile修饰,那么在高并发的情况下,有可能出现指令③和指令②重排,对于其他线程来说,由于被synchronized锁住代码块,但是当前拿到锁的线程已经有了指针指向变量(实际上还没有初始化),其他线程直接用这个引用操作的时候,就会发生异常。因为这个指针的引用指向的对象还没有完成初始化。

    1.3 Java Memery Model 规范

      ① 可见性:当一个线程修改了共享变量的值,其他线程能够立刻得知这个变量的修改。

      volatile的特殊规则保证了新值能立即同步到主内存,以及每次使用前立即从主内存刷新。

      ② 原子性:Java给用户开发了synchronized关键字(底层实现monitorenter 与monitorexit)

      ③ 有序性

        volatile与synchroniezed(一个变量在同一时刻只允许一条线程来对其进行lock操作)

        

    1.4 状态转化

      一个线程在任意时间点,只能有且只有其中一种

      ① 先建New:创建后尚未启动的线程处于这种状态

      ② 运行Runable:可能正在执行,或等待CPU为他分配执行时间。

      ③ 无限期等待Waiting:处于这种状态的线程不会被分配CPU执行时间,它需要被其他线程唤醒。

      ④ 限期等待Timed Waiting:一段时间后,自动唤醒。

      ⑤ 阻塞Blocked:线程等待进入同步代码块时,进入这种状态。

      ⑥ 结束Terminated:已终止的线程状态。

    1.5 CAS(比较并交换)-常用的原子基本类型如AtomicInteger

    CAS(Compare-And-Swap),它是一条CPU并发原语,原语的执行必须是连续的,在执行过程中不允许被中断,不会造成所谓的数据不一致问题。

    源码:

        /**
         * Atomically sets the value to the given updated value
         * if the current value {@code ==} the expected value.
         */
        public final boolean compareAndSet(int expect, int update) {
            return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
        }
        /**
         * Atomically increments by one the current value.
         */
        public final int getAndIncrement() {
            return unsafe.getAndAddInt(this, valueOffset, 1);
        }

     unsafe类:

      是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过native方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。Unsafe类存在于sun.misc包中,其内部可以像C的指针一样直接操作内存,Java中CAS操作的执行依赖于Unsafe类的方法。

        /**
         *  Unsafe.class:getAndAddInt
         *  如果工作内存中预期的值与主内存中一致,那么就把更新后的值(var5 +var4)写入主内存中,否则继续循环(线程自旋)
         *  compareAndSwapInt 返回 boolean    
         */
        
        public final int getAndAddInt(Object var1,long var2,int var4) {
            int var5;
            do {
                var5 = this.getIntVolatile(var1,var2);
            }while (!this.compareAndSwapInt(var1,var2,var5,var5 + var4))
            return var5;
        }

    CAS缺点:

      ① ABA问题--解决(时间戳原则引用)--AtomicStampReference<>()。

      ② 循环开销时间很大

      ③ 只能保证一个共享变量的原子操作 

    1.6  锁

      ① 公平锁和非公平锁

        公平锁:指多个线程按照申请锁的顺序来获取锁,并发环境,每个线程在获取锁时先查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个,就占有,否则就会加入到等待队列中,以后按照FIFO从规则中获取自己。

        非公平锁:获取锁的顺序不是按照申请锁的顺序,可能后申请的线程比先申请的线程优先获得锁,在高并发的情况下,可能造成优先级反转或者饥饿现象。

      ② 可重入锁(又名递归锁):线程可以进入任何一个它已经拥有的锁所同步的代码块。 

      ③ 独占锁/共享锁

      ④ 自旋锁 

    1.7 阻塞队列 BlockQueue

     

      传统1:

    // 资源类
    class ShareData {
        private int number = 0;

      public void increment() throws InterruptedException { synchronized (this){ // 判断 while (number !=0){ this.wait(); } // 干活 number++; System.out.println(Thread.currentThread().getName() + " " +number); // 唤醒 this.notifyAll(); } } public void decrement() throws InterruptedException { synchronized (this){ // 判断 while (number ==0){ this.wait(); } // 干活 number--; System.out.println(Thread.currentThread().getName() + " " +number); // 唤醒 this.notifyAll(); } } } /** * 场景:一个初始值为零的变量,两个线程对其交替操作,一个加1,一个减1,连续5轮. * * 1 线程 操作 资源类 * 2 判断 干活 通知 * 3 防止虚假唤醒通知 */ public class Prod_Consumer_BlockQueueDemo { public static void main(String[] args) { } }

    传统2:

    // 资源类
    class ShareData {
        private int number = 0;
        private Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
    
        public void increment() throws InterruptedException {
            lock.lock();
            try {
                // 判断
                while (number != 0) {
                    //等待,不能生产
                    condition.await();
                }
                // 干活
                number++;
                System.out.println(Thread.currentThread().getName() + "	 " + number);
                // 通知
                condition.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void decrement() throws InterruptedException {
            lock.lock();
            try {
                // 判断
                while (number == 0) {
                    //等待,不能消费
                    condition.await();
                }
                // 干活
                number--;
                System.out.println(Thread.currentThread().getName() + "	 " + number);
                // 通知
                condition.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    /**
     * 场景:一个初始值为零的变量,两个线程对其交替操作,一个加1,一个减1,连续5轮.
     *
     * 1 线程  操作  资源类
     * 2 判断  干活  通知
     * 3 防止虚假唤醒通知
     */
    public class Prod_Consumer_BlockQueueDemo {
        public static void main(String[] args) {
            ShareData shareData = new ShareData();
    
            new Thread(() -> {
                for (int i = 0; i < 5; i++) {
                    try {
                        shareData.increment();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "AA").start();
    
    
            new Thread(() -> {
                for (int i = 0; i < 5; i++) {
                    try {
                        shareData.decrement();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "BB").start();
        }
    }

     1.8 synchronized 与Lock区别

      ①. 原始构成

      synchronized是关键字属于JVM层面:monitorenter、monitorexit(底层是通过monitor对象来完成,其实wait/notify等方法也是依赖于monitor对象只有在同步块或者方法中才能调用)

       Lock是具体类(java.util.concurrent.locks.Lock)是api层面的锁

      ② 使用方法

      synchronized不需要用户去手动释放锁,当synchronized代码执行完后系统会自动让线程释放对锁的占用。(即使执行发生异常也会释放锁,毕竟有两个monitorexit)

      ReetrantLock则需要用户手动释放锁,如果没有主动释放,就可能导致死锁的存在。lock.lock  try{...}....finally {lock.unlock}

      ③ 等待是否中断

      synchronized不可中断,除非抛出异常或者正常执行完毕。

      ReentrancLock可中断。a: 设置超时方法tryLock(long timeout,TimeUnit unit)。b: lockInterruptibly()放入代码块中,调用interrupt方法可以中断。

      ④ 加锁是否公平

      synchronized非公平锁。

      ReentrancLock两者都可以,默认非公平锁,构造方法传入boolean值,true为公平锁,false为非公平锁。

      ⑤ 锁绑定多个条件Condiion

      synchronized没有

      ReentrantLock用来实现分组唤醒需要的线程,可以精确唤醒,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程。

    class shareResource{
        private int number =1;
        private Lock lock = new ReentrantLock();
        private Condition c1 = lock.newCondition();
        private Condition c2 = lock.newCondition();
        private Condition c3 = lock.newCondition();
    
        public void print1(){
            // 同步-判断-执行-通知
            lock.lock();
            try {
                while (number !=1){
                    c1.await();
                }
                for (int i = 1; i <=1 ; i++) {
                    System.out.println(Thread.currentThread().getName() + "	 " + i) ;
                }
                number =2;
                c2.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        public void print2(){
            // 同步-判断-执行-通知
            lock.lock();
            try {
                while (number !=2){
                    c2.await();
                }
                for (int i = 1; i <=2 ; i++) {
                    System.out.println(Thread.currentThread().getName() + "	 " + i) ;
                }
                number =3;
                c3.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        public void print3(){
            // 同步-判断-执行-通知
            lock.lock();
            try {
                while (number !=3){
                    c3.await();
                }
                for (int i = 1; i <=3 ; i++) {
                    System.out.println(Thread.currentThread().getName() + "	 " + i) ;
                }
                number =1;
                c1.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    public class LockConditions {
        public static void main(String[] args) {
            shareResource shareResource = new shareResource();
    
            new Thread(()->{
                // 来三轮
                for (int i = 0; i < 3; i++) {
                    shareResource.print1();
                }
            },"A").start();
            new Thread(()->{
                // 来三轮
                for (int i = 0; i < 3; i++) {
                    shareResource.print2();
                }
            },"B").start();
            new Thread(()->{
                // 来三轮
                for (int i = 0; i < 3; i++) {
                    shareResource.print3();
                }
            },"C").start();
        }
    }

    1.9 Callable接口

    /**
     *
     * <p>The {@code Callable} interface is similar to {@link
     * java.lang.Runnable}, in that both are designed for classes whose
     * instances are potentially executed by another thread.  A
     * {@code Runnable}, however, does not return a result and cannot
     * throw a checked exception.
     */
    @FunctionalInterface
    public interface Callable<V> {
        /**
         * Computes a result, or throws an exception if unable to do so.
         */
        V call() throws Exception;
    }

    class MyThread implements Callable<Integer>{
    
        @Override
        public Integer call() throws Exception {
            System.out.println("come in Callbale...");
            try {
                System.out.println(Thread.currentThread().getName()+"*****");
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1024;
        }
    }
    public class CallableDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // public FutureTask(Callable<V> callable)
            FutureTask<Integer> futureTask = new FutureTask(new MyThread());
            new Thread(futureTask,"AA").start();
            // 多个线程抢futureTask只抢一次
            new Thread(futureTask,"BB").start();
    
            System.out.println(Thread.currentThread().getName() + "****");
            int result2 = 100;
            // 任务执行完需要时间
            while (!futureTask.isDone()){}
    
            //  futureTask.get() 尽量放在最后
            int result = result2 + futureTask.get();
            System.out.println("******result: " + futureTask.get());
        }
    }

    2.0 CountDownLatch/CyclicBarrier/Semaphore

    CountDownLatch:让一些线程阻塞到另一些线程完成一系列操作后才被唤醒。

    CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,调用线程会被阻塞。其他线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),当计数器的值变为零时,因调用await方法被阻塞的线程会被唤醒,开始继续执行。

    public class CountDownLatchDemo {
        private static final Integer COUNT = 6;
        public static void main(String[] args) throws InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(COUNT);
            for (int i = 1; i <= COUNT; i++) {
                new Thread(()-> {
                    System.out.println(Thread.currentThread().getName() + " worker getOn" );
                    countDownLatch.countDown();
                },WorksEnum.getWorkName(Integer.valueOf(i))).start();;
            }
            countDownLatch.await();
            System.out.println("Driver drive");
        }
    }
    import lombok.Getter;
    
    public enum WorksEnum {
        ONE(1, "张三"), TWO(2, "李四"), THREE(3, "王五"), FOUR(4, "赵六"), FIVE(5, "孙七"), SIX(6, "刘八");
        
        @Getter
        private Integer workId;
        
        @Getter
        private String workName;
        
        private WorksEnum(Integer workId, String workName) {
            this.workId = workId;
            this.workName = workName;
        }
        
        public static WorksEnum getWorksEnum(Integer workId){
            WorksEnum[] works = WorksEnum.values();
            if(workId ==null){
                return null;
            }
            for (WorksEnum worksEnum : works) {
                if(workId == worksEnum.workId){
                    return worksEnum;
                }
            }
            return null;
        }
        
        public static String getWorkName(Integer workId){
            WorksEnum[] works = WorksEnum.values();
            if(workId ==null){
                return null;
            }
            for (WorksEnum worksEnum : works) {
                if(workId == worksEnum.workId){
                    return worksEnum.workName;
                }
            }
            return null;
        }
    }

     CycliBarrier:可以循环使用的屏障。让一组线程到达一个屏障(或同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier的await()方法。

    public class CyclicBarrierDemo {
        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(6,()->{System.out.println("Boss 来开会");});
            for (int i = 1; i <= 6; i++) {
                final int workNo = i;
                new Thread(() -> {
                    System.out.println("员工" + workNo + "来会议间...等着");
                    try {
                        cyclicBarrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                },String.valueOf(i)).start();;
            }
        }
    }

    Semaphore:“信号量”主要用于两个目的,一是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

    public class SemaphoreDemo {
        public static void main(String[] args)  {
            Semaphore semaphore = new Semaphore(3);
            for (int i = 1; i <= 6; i++) {
                new Thread(() -> {
                    try {
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName() + " :抢到车位" );
                        TimeUnit.SECONDS.sleep(3);
                        System.out.println(Thread.currentThread().getName() + " :停了3秒后释放");
                        semaphore.release();
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                }, String.valueOf(i)).start();;
            }
        }
    }

    2.1 线程池

      线程池主要工作:控制运行的线程数量,处理过程中将任务放入队列,然后再线程创建后启动这些任务,如果线程数量超过了最大核心的数量,进入线程队列等候,等其他线程执行完毕,再从队列中取出任务来执行。

      特点:线程复用,控制最大并发数,管理线程。

      第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

      第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立刻被线程池中已有的线程执行。

      第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配,调优和监控。

      

    public class ExecutorsDemo {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(5); // 一池5个处理线程
            ExecutorService executorService1 = Executors.newSingleThreadExecutor(); // 一池1个处理线程
            ExecutorService executorService2 = Executors.newCachedThreadPool(); // 一池N个处理线程
    
            try {
                for (int i = 0; i < 30; i++) {
                    executorService2.execute(() ->{
                        System.out.println(Thread.currentThread().getName() +"	 办理业务");
                    });
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                executorService.shutdown();
            }
        }
    }
        /**
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @param handler the handler to use when execution is blocked
         *        because the thread bounds and queue capacities are reached
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {

      ① 再创建了线程池后,等待提交过来的任务请求。

      ② 当调用execute()方法添加一个请求任务时,线程池会做如下判断:

        * 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;

        * 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;

        * 如果这时候队列满了且正在运行的线程数量还小于maxinumPoolSize,那么还是要创建非核心线程立刻运行这个任务;

        * 如果队列满了且正在运行的线程数量大于或等于maxinumPoolSize,那么线程池会启动饱和拒绝策略来执行。

      ③ 当一个线程完成任务时,它会从队列中取下一个任务来执行。

      ④ 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程池会判断;

        * 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。

        * 所有线程池的所有任务完成后它最终会收缩到corePoolSize的大小。

      

       

    http://www.imooc.com/video/5176

    一、如何扩展Java并发知识

    Java Memory Mode

      JMM描述了Java线程如何通过内存进行交互

      happens-before原则

      synchronized,volatile&final

    Locks &Conditon(Java1.5引入,加锁-同步通信)

      Java锁机制和等待条件的高层实现

      java.util.concurrent.locks

    线程安全性

      原子性与可见性

      java.util.concurrent.atomic

      synchronized&volatile

      DeadLocks

    多线程编程常用的交互模型

      Producer-Consumer模型

      Read-Write Lock模型

      Future模型

      Worker Thread模型

    Java 1.5 中并发编程工具

      java.util.concurrent

      如:线程池ExecutorService;Callable & Future;BlockingQueue

    推荐书籍:

  • 相关阅读:
    spark 查看 job history 日志
    Kafka集群安装
    spark总体概况
    hadoop distcp使用
    基于spark1.3.1的spark-sql实战-02
    HiveServer2 入门使用
    基于spark1.3.1的spark-sql实战-01
    Hive基础学习文档和入门教程
    HDFS HA与QJM(Quorum Journal Manager)介绍及官网内容整理
    Akka DEMO
  • 原文地址:https://www.cnblogs.com/gzhcsu/p/7674894.html
Copyright © 2011-2022 走看看