zoukankan      html  css  js  c++  java
  • java基础-多线程(+整理的思维导图)

    本文内容主要整理转载自廖雪峰的官方网站

    12 多线程

    整理的思维导图,github下载地址)

    image-20200426215828269

    多线程是Java最基本的一种并发模型,本章我们将详细介绍Java多线程编程。

    12.1 多线程基础

    操作系统执行多任务实际上就是让CPU对多个任务轮流交替执行。

    例如,让浏览器执行0.001秒,让QQ执行0.001秒,再让音乐播放器执行0.001秒,在人看来,CPU就是在同时执行多个任务。

    进程

    • 一个任务称为一个进程,浏览器就是一个进程,视频播放器是另一个进程,类似的,音乐播放器和Word都是进程。

    • 某些进程内部还需要同时执行多个子任务。
      例如,我们在使用Word时,Word可以让我们一边打字,一边进行拼写检查,同时还可以在后台进行打印,
      我们把子任务称为线程。

    进程和线程的关系就是:一个进程可以包含一个或多个线程,但至少会有一个线程。

    image-20200415232635257

    实现多任务的方法

    因为同一个应用程序,既可以有多个进程,也可以有多个线程,因此,实现多任务的方法,有以下几种:

    • 多进程模式(每个进程只有一个线程):

    image-20200415232902433

    • 多线程模式(一个进程有多个线程):

    image-20200415232923951

    • 多进程+多线程模式(复杂度最高):

    image-20200415232944176

    多任务:进程VS线程

    和多线程相比,多进程的缺点在于:

    • 创建进程比创建线程开销大,尤其是在Windows系统上;
    • 进程间通信比线程间通信要慢,因为线程间通信就是读写同一个变量,速度很快。

    而多进程的优点在于:

    • 多进程稳定性比多线程高,因为在多进程的情况下,一个进程崩溃不会影响其他进程,

    • 在多线程的情况下,任何一个线程崩溃会直接导致整个进程崩溃。

    多线程特点

    Java语言内置了多线程支持:
    一个Java程序实际上是一个JVM进程,
    JVM进程用一个主线程来执行main()方法,
    main()方法内部,我们又可以启动多个线程。
    此外,JVM还有负责垃圾回收的其他工作线程等。

    因此,对于大多数Java程序来说,我们说多任务,实际上是说如何使用多线程实现多任务。

    和单线程相比,多线程编程的特点在于:多线程经常需要读写共享数据,并且需要同步
    例如,播放电影时,就必须由一个线程播放视频,另一个线程播放音频,两个线程需要协调运行,否则画面和声音就不同步。因此,多线程编程的复杂度高,调试更困难。

    Java多线程编程的特点又在于:

    • 多线程模型是Java程序最基本的并发模型;
    • 后续读写网络、数据库、Web开发等都依赖Java多线程模型。

    因此,必须掌握Java多线程编程才能继续深入学习其他内容。

    12.2 创建新线程

    Java语言内置了多线程支持。
    当Java程序启动的时候,实际上是启动了一个JVM进程,
    然后,JVM启动主线程来执行main()方法。
    main()方法中,我们又可以启动其他线程。

    方法一:

    Thread派生一个自定义类,然后覆写run()方法:

    start()方法会在内部自动调用实例的run()方法。

    // 多线程 
    public class Main {
        public static void main(String[] args) {
            Thread t = new MyThread();
            t.start(); // 启动新线程
        }
    }
    
    class MyThread extends Thread {
        @Override
        public void run() {
            System.out.println("start new thread!");
        }
    }
    

    方法二:

    创建Thread实例时,传入一个Runnable实例:

    public class Main {
        public static void main(String[] args) {
            Thread t = new Thread(new MyRunnable());
            t.start(); // 启动新线程
        }
    }
    
    class MyRunnable implements Runnable {
        @Override
        public void run() {
            System.out.println("start new thread!");
        }
    }
    

    或者用Java8引入的lambda语法进一步简写为:

    // 多线程 
    public class Main {
        public static void main(String[] args) {
            Thread t = new Thread(() -> {
                System.out.println("start new thread!");
            });
            t.start(); // 启动新线程
        }
    }
    

    线程打印 VS main()打印

    image-20200415235102645

    蓝色表示主线程,也就是main线程,

    main线程执行的代码有4行,
    首先打印main start
    然后创建Thread对象,
    紧接着调用start()启动新线程。

    start()方法被调用时,JVM就创建了一个新线程,
    我们通过实例变量t来表示这个新线程对象,并开始执行。

    接着,main线程继续执行打印main end语句,
    t线程在main线程执行的同时会并发执行,打印thread runthread end语句。

    run()方法结束时,新线程就结束了。而main()方法结束时,主线程也结束了。

    我们再来看线程的执行顺序:

    1. main线程肯定是先打印main start,再打印main end
    2. t线程肯定是先打印thread run,再打印thread end

    但是,除了可以肯定,main start会先打印外,main end打印在thread run之前、thread end之后或者之间,都无法确定。
    t线程开始运行以后,两个线程就开始同时运行了,并且由操作系统调度,程序本身无法确定线程的调度顺序。

    模拟并发执行的效果

    sleep()传入的参数是毫秒。调整暂停时间的大小,我们可以看到main线程和t线程执行的先后顺序。

    public class Main {
        public static void main(String[] args) {
            
            System.out.println("main start...");
            
            Thread t = new Thread() {
                public void run() {
                    System.out.println("thread run...");
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {}
                    System.out.println("thread end.");
                }
            };
            
            t.start();
            
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {}
            
            System.out.println("main end...");
        }
    }
    

    要特别注意:直接调用Thread实例的run()方法是无效的:

    必须调用Thread实例的start()方法才能启动新线程

    线程的优先级

    可以对线程设定优先级,设定优先级的方法是:

    Thread.setPriority(int n) // 1~10, 默认值5
    

    优先级高的线程被操作系统调度的优先级较高,操作系统对高优先级线程可能调度更频繁,
    但我们决不能通过设置优先级来确保高优先级的线程一定会先执行。

    小结

    • Java用Thread对象表示一个线程,通过调用start()启动一个新线程;
    • 一个线程对象只能调用一次start()方法;
    • 线程的执行代码写在run()方法中;
    • 线程调度由操作系统决定,程序本身无法决定调度顺序;
    • Thread.sleep()可以把当前线程暂停一段时间。

    12.3 线程的状态

    在Java程序中,一个线程对象只能调用一次start()方法启动新线程,并在新线程中执行run()方法。
    一旦run()方法执行完毕,线程就结束了。因此,Java线程的状态有以下几种:

    • New:新创建的线程,尚未执行;
    • Runnable:运行中的线程,正在执行run()方法的Java代码;
    • Blocked:运行中的线程,因为某些操作被阻塞而挂起;
    • Waiting:运行中的线程,因为某些操作在等待中;
    • Timed Waiting:运行中的线程,因为执行sleep()方法正在计时等待;
    • Terminated:线程已终止,因为run()方法执行完毕。

    状态转移图

    用一个状态转移图表示如下:

    image-20200416000603153

    当线程启动后,它可以在RunnableBlockedWaitingTimed Waiting这几个状态之间切换,直到最后变成Terminated状态,线程终止。

    线程终止的原因有:

    • 线程正常终止:run()方法执行到return语句返回;
    • 线程意外终止:run()方法因为未捕获的异常导致线程终止;
    • 线程强制终止:对某个线程的Thread实例调用stop()方法强制终止(强烈不推荐使用)。

    线程等待

    一个线程还可以等待另一个线程直到其运行结束。例如,main线程在启动t线程后,可以通过t.join()等待t线程结束后再继续运行:
    join就是指等待该线程结束,然后才继续往下执行自身线程。)

    // 多线程 
    public class Main {
        public static void main(String[] args) throws InterruptedException {
            Thread t = new Thread(() -> {
                System.out.println("hello");
            });
            System.out.println("start");
            t.start();
            t.join();
            System.out.println("end");
        }
    }
    
    start
    hello
    end
    

    main线程对线程对象t调用join()方法时,主线程将等待变量t表示的线程运行结束。

    如果t线程已经结束,对实例t调用join()会立刻返回。
    此外,join(long)的重载方法也可以指定一个等待时间,超过等待时间后就不再继续等待。

    小结

    • Java线程对象Thread的状态包括:NewRunnableBlockedWaitingTimed WaitingTerminated
    • 通过对另一个线程对象调用join()方法可以等待其执行结束;
    • 可以指定等待时间,超过等待时间线程仍然没有结束就不再等待;
    • 对已经运行结束的线程调用join()方法会立刻返回。

    12.4 中断线程

    常规中断

    如果线程需要执行一个长时间任务,就可能需要能中断线程。中断线程就是其他线程给该线程发一个信号,该线程收到信号后结束执行run()方法,使得自身线程能立刻结束运行。

    我们举个栗子:假设从网络下载一个100M的文件,如果网速很慢,用户等得不耐烦,就可能在下载过程中点“取消”,这时,程序就需要中断下载线程的执行。

    中断一个线程非常简单,只需要在其他线程中对目标线程调用interrupt()方法,目标线程需要反复检测自身状态是否是interrupted状态,如果是,就立刻结束运行。

    我们还是看示例代码:

    // 中断线程
    public class Main {
        public static void main(String[] args) throws InterruptedException {
    
            Thread t = new MyThread();
    
            t.start();
    
            Thread.sleep(1); // 暂停1毫秒
    
            t.interrupt(); // 中断t线程
    
            t.join(); // 等待t线程结束
    
            System.out.println("end");
        }
    }
    
    class MyThread extends Thread {
        public void run() {
            int n = 0;
            while (! isInterrupted()) {
                n ++;
                System.out.println(n + " hello!");
            }
        }
    }
    
    //每次执行的结果是不一样的,不能确定1ms以内会执行上述代码多少次
    1 hello!
    end
    

    main线程通过调用t.interrupt()方法中断t线程,
    但要注意,interrupt()方法仅仅向t线程发出了“中断请求”,至于t线程是否能立刻响应,要看具体代码。
    t线程的while循环会检测isInterrupted(),所以上述代码能正确响应interrupt()请求,使得自身立刻结束运行run()方法。

    等待状态线程

    如果线程处于等待状态,例如,t.join()main线程进入等待状态
    此时,如果对main线程中的t线程调用interrupt(),那么t线程的join()方法会立刻抛出InterruptedException

    因此,目标线程(也就是这里的t线程)只要捕获到join()方法抛出的InterruptedException,就说明有其他线程对其调用了interrupt()方法(在这里是指,在main线程中对t线程调用了interrupt方法),通常情况下该线程应该立刻结束运行。

    我们来看下面的示例代码:

    // 中断线程 
    public class Main {
        public static void main(String[] args) throws InterruptedException {
            Thread t = new MyThread();
            t.start();
            Thread.sleep(1000);//t线程开始后,将主线程暂停1000ms,让t线程执行
            t.interrupt(); // 中断t线程(提出中断请求)
            t.join(); // 等待t线程结束
            System.out.println("end");
        }
    }
    
    class MyThread extends Thread {
        public void run() {
            Thread hello = new HelloThread();
            hello.start(); // 启动hello线程
            try {
                hello.join(); // 等待hello线程结束
                //主线程结束1000ms暂停后,执行t中断的时候,t线程在这里等待中
                //在上一级对t线程执行interrupt,
                //那么这里的 hello.join()方法会立刻抛出InterruptedException异常
            } catch (InterruptedException e) {
                System.out.println("interrupted!");
            }
            hello.interrupt();
        }
    }
    
    class HelloThread extends Thread {
        public void run() {
            int n = 0;
            while (!isInterrupted()) {
                n++;
                System.out.println(n + " hello!");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    }
    
    1 hello!
    2 hello!
    3 hello!
    4 hello!
    5 hello!
    6 hello!
    7 hello!
    8 hello!
    9 hello!
    10 hello!
    interrupted!
    end
    

    main线程通过调用t.interrupt()从而通知t线程中断,而此时t线程正位于hello.join()的等待中,此方法会立刻结束等待并抛出InterruptedException

    由于我们在t线程中捕获了InterruptedException,因此,就可以准备结束该线程。

    t线程结束前,对hello线程也进行了interrupt()调用通知其中断。

    如果去掉这一行代码,可以发现hello线程仍然会继续运行,且JVM不会退出。

    设置中断标志位

    另一个常用的中断线程的方法是设置标志位。我们通常会用一个running标志位来标识线程是否应该继续运行,在外部线程中,通过把HelloThread.running置为false,就可以让线程结束:

    // 中断线程 
    public class Main {
        public static void main(String[] args)  throws InterruptedException {
            HelloThread t = new HelloThread();
            t.start();
            Thread.sleep(1);
            t.running = false; // 标志位置为false
        }
    }
    
    class HelloThread extends Thread {
        public volatile boolean running = true;
        public void run() {
            int n = 0;
            while (running) {
                n ++;
                System.out.println(n + " hello!");
            }
            System.out.println("end!");
        }
    }
    
    1 hello!
    end!
    

    注意到HelloThread的标志位boolean running是一个线程间共享的变量。线程间共享变量需要使用volatile关键字标记,确保每个线程都能读取到更新后的变量值。

    volatile关键字

    为什么要对线程间共享的变量用关键字volatile声明?这涉及到Java的内存模型。在Java虚拟机中,变量的值保存在主内存中,但是,当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存,但是,这个时间是不确定的!

    image-20200416165809735

    这会导致如果一个线程更新了某个变量,另一个线程读取的值可能还是更新前的。
    例如,主内存的变量a = true,线程1执行a = false时,
    它在此刻仅仅是把变量a的副本变成了false,主内存的变量a还是true
    在JVM把修改后的a回写到主内存之前,其他线程读取到的a的值仍然是true
    这就造成了多线程之间共享的变量不一致。

    因此,volatile关键字的目的是告诉虚拟机:

    • 每次访问变量时,总是获取主内存的最新值;
    • 每次修改变量后,立刻回写到主内存。

    volatile关键字解决的是可见性问题:当一个线程修改了某个共享变量的值,其他线程能够立刻看到修改后的值。

    如果我们去掉volatile关键字,运行上述程序,发现效果和带volatile差不多,这是因为在x86的架构下,JVM回写主内存的速度非常快,但是,换成ARM的架构,就会有显著的延迟。

    小结

    • 对目标线程调用interrupt()方法可以请求中断一个线程,目标线程通过检测isInterrupted()标志获取自身是否已中断。如果目标线程处于等待状态,该线程会捕获到InterruptedException
    • 目标线程检测到isInterrupted()true或者捕获了InterruptedException都应该立刻结束自身线程;
    • 通过标志位判断需要正确使用volatile关键字;
    • volatile关键字解决了共享变量在线程间的可见性问题。

    12.5 守护线程

    有一种线程的目的就是无限循环,例如,一个定时触发任务的线程:

    class TimerThread extends Thread {
        @Override
        public void run() {
            while (true) {
                System.out.println(LocalTime.now());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    }
    

    如果这个线程不结束,JVM进程就无法结束。问题是,由谁负责结束这个线程?

    然而这类线程经常没有负责人来负责结束它们。但是,当其他线程结束时,JVM进程又必须要结束,怎么办?

    答案是使用守护线程(Daemon Thread)。

    守护线程是指为其他线程服务的线程。在JVM中,所有非守护线程都执行完毕后,无论有没有守护线程,虚拟机都会自动退出。

    因此,JVM退出时,不必关心守护线程是否已结束。

    如何创建守护线程呢?方法和普通线程一样,只是在调用start()方法前,调用setDaemon(true)把该线程标记为守护线程:

    Thread t = new MyThread();
    t.setDaemon(true);
    t.start();
    

    在守护线程中,编写代码要注意:守护线程不能持有任何需要关闭的资源,例如打开文件等,因为虚拟机退出时,守护线程没有任何机会来关闭文件,这会导致数据丢失。

    小结

    • 守护线程是为其他线程服务的线程;
    • 所有非守护线程都执行完毕后,虚拟机退出;
    • 守护线程不能持有需要关闭的资源(如打开文件等)。

    12.6 线程同步-synchronized

    同步的本质就是给指定对象加锁,加锁后才能继续执行后续代码

    当多个线程同时运行时,线程的调度由操作系统决定,程序本身无法决定。因此,任何一个线程都有可能在任何指令处被操作系统暂停,然后在某个时间段后继续执行。

    这个时候,有个单线程模型下不存在的问题就来了:如果多个线程同时读写共享变量,会出现数据不一致的问题。

    多线程模型下,要保证逻辑正确,对共享变量进行读写时,必须保证一组指令以原子方式执行:即某一个线程执行时,其他线程必须等待:
    image-20200423110745778

    通过加锁和解锁的操作,就能保证3条指令总是在一个线程执行期间,不会有其他线程会进入此指令区间。
    即使在执行期线程被操作系统中断执行,其他线程也会因为无法获得锁导致无法进入此指令区间。
    只有执行线程将锁释放后,其他线程才有机会获得锁并执行。
    这种加锁和解锁之间的代码块我们称之为临界区(Critical Section),任何时候临界区最多只有一个线程能执行。

    实现加锁

    概括一下如何使用synchronized

    1. 找出修改共享变量的线程代码块;
    2. 选择一个共享实例作为锁;
    3. 使用synchronized(lockObject) { ... }
    public class Main {
        public static void main(String[] args) throws Exception {
            var add = new AddThread();
            var dec = new DecThread();
            add.start();
            dec.start();
            add.join();
            dec.join();
            System.out.println(Counter.count);
        }
    }
    
    class Counter {
        public static final Object lock = new Object();
        public static int count = 0;
    }
    
    class AddThread extends Thread {
        public void run() {
            for (int i=0; i<10000; i++) {
                synchronized(Counter.lock) {
                    Counter.count += 1;
                }
            }
        }
    }
    
    class DecThread extends Thread {
        public void run() {
            for (int i=0; i<10000; i++) {
                synchronized(Counter.lock) {
                    Counter.count -= 1;
                }
            }
        }
    }
    

    小结

    • 多线程同时读写共享变量时,会造成逻辑错误,因此需要通过synchronized同步;
    • 同步的本质就是给指定对象加锁,加锁后才能继续执行后续代码;
    • 注意加锁对象必须是同一个实例;
    • 对JVM定义的单个原子操作不需要同步。

    12.7 同步方法-封装synchronized

    使得类里面的"方法"进行同步

    使用synchronized的时候,锁住的是哪个对象非常重要。
    好的方法是把synchronized逻辑封装起来。例如,我们编写一个计数器如下:

    例:封装Counter

    public class Counter {
        private int count = 0;
    
        public void add(int n) {
            synchronized(this) {
                count += n;
            }
        }
    
        public void dec(int n) {
            synchronized(this) {
                count -= n;
            }
        }
    
        public int get() {
            return count;
        }
    }
    

    synchronized锁住的对象是this,即当前实例,这又使得创建多个Counter实例的时候,它们之间互不影响,可以并发执行:

    var c1 = Counter();
    var c2 = Counter();
    
    // 对c1进行操作的线程:
    new Thread(() -> {
        c1.add();
    }).start();
    new Thread(() -> {
        c1.dec();
    }).start();
    
    // 对c2进行操作的线程:
    new Thread(() -> {
        c2.add();
    }).start();
    new Thread(() -> {
        c2.dec();
    }).start();
    

    现在,对于Counter类,多线程可以正确调用。

    线程安全

    没有特殊说明时,一个类默认是非线程安全的

    如果一个类被设计为允许多线程正确访问,我们就说这个类就是“线程安全”的(thread-safe),上面的Counter类就是线程安全的。

    线程安全:

    • Java标准库的java.lang.StringBuffer

    • 一些不变类,例如StringIntegerLocalDate
      它们的所有成员变量都是final,多线程同时访问时只能读不能写,这些不变类也是线程安全的。

    • 最后,类似Math这些只提供静态方法,没有成员变量的类,也是线程安全的。

    非线程安全:

    • 除了上述几种少数情况,大部分类,例如ArrayList,都是非线程安全的类,
      我们不能在多线程中修改它们。
    • 但是,如果所有线程都只读取,不写入,那么ArrayList是可以安全地在线程间共享的。

    锁住this实例的写法

    当我们锁住的是this实例时,实际上可以用synchronized修饰这个方法。下面两种写法是等价的:

    public void add(int n) {
        synchronized(this) { // 锁住this
            count += n;
        } // 解锁
    }
    
    
    public synchronized void add(int n) { // 锁住this
        count += n;
    } // 解锁
    

    因此,用synchronized修饰的方法就是同步方法,它表示整个方法都必须用this实例加锁。

    对一个静态方法添加synchronized

    如果对一个静态方法添加synchronized修饰符,它锁住的是哪个对象?

    public synchronized static void test(int n) {
        ...
    }
    

    对于static方法,是没有this实例的,因为static方法是针对类而不是实例。
    但是我们注意到任何一个类都有一个由JVM自动创建的Class实例,因此,对static方法添加synchronized,锁住的是该类的Class实例。

    上述synchronized static方法实际上相当于:

    public class Counter {
        public static void test(int n) {
            synchronized(Counter.class) {
                ...
            }
        }
    }
    

    小结

    • synchronized修饰方法可以把整个方法变为同步代码块,synchronized方法加锁对象是this
    • 通过合理的设计和数据封装可以让一个类变为“线程安全”;
    • 一个类没有特殊说明,默认不是thread-safe;
    • 多线程能否安全访问某个非线程安全的实例,需要具体问题具体分析。

    12.8 死锁

    两个线程各自持有不同的锁,然后各自试图获取对方手里的锁,造成了双方无限等待下去,这就是死锁。

    Java的synchronized锁是可重入锁

    JVM允许同一个线程重复获取同一个锁,这种能被同一个线程反复获取的锁,就叫做可重入锁。

    public class Counter {
        private int count = 0;
    
        public synchronized void add(int n) {
            if (n < 0) {
                dec(-n);
            } else {
                count += n;
            }
        }
    
        public synchronized void dec(int n) {
            count += n;
        }
    }
    

    观察synchronized修饰的add()方法,一旦线程执行到add()方法内部,说明它已经获取了当前实例的this锁。如果传入的n < 0,将在add()方法内部调用dec()方法。由于dec()方法也需要获取this

    死锁例子

    两个线程各自持有不同的锁,然后各自试图获取对方手里的锁,造成了双方无限等待下去,这就是死锁

    public void add(int m) {
        synchronized(lockA) { // 获得lockA的锁
            this.value += m;
            synchronized(lockB) { // 获得lockB的锁
                this.another += m;
            } // 释放lockB的锁
        } // 释放lockA的锁
    }
    
    public void dec(int m) {
        synchronized(lockB) { // 获得lockB的锁
            this.another -= m;
            synchronized(lockA) { // 获得lockA的锁
                this.value -= m;
            } // 释放lockA的锁
        } // 释放lockB的锁
    }
    

    避免死锁

    线程获取锁的顺序要一致。两个线程要严格按照先获取lockA,再获取lockB的顺序,改写dec()方法如下:

    public void dec(int m) {
        synchronized(lockA) { // 获得lockA的锁
            this.value -= m;
            synchronized(lockB) { // 获得lockB的锁
                this.another -= m;
            } // 释放lockB的锁
        } // 释放lockA的锁
    }
    

    小结

    • Java的synchronized锁是可重入锁;
    • 死锁产生的条件是多线程各自持有不同的锁,并互相试图获取对方已持有的锁,导致无限等待;
    • 避免死锁的方法是多线程获取锁的顺序要一致。

    12.9 多线程协调: wait 和 notify

    在Java程序中,synchronized解决了多线程竞争的问题。例如,对于一个任务管理器,多个线程同时往队列中添加任务,可以用synchronized加锁:

    import java.util.*;
    
    public class Main {
        public static void main(String[] args) throws InterruptedException {
            //新建TaskQueue实例对象
            var q = new TaskQueue();
            //存放线程的数组
            var ts = new ArrayList<Thread>();
    
            //开启并运行5个线程,每个线程都尝试打印q中的task
            for (int i=0; i<5; i++) {
                var t = new Thread() {
                    public void run() {
                        // 执行task:
                        while (true) {
                            try {
                                String s = q.getTask();
    //                            System.out.println("execute task: " + s);
                                System.out.println(this.getName() + " execute task: " + s + "
    ");
                            } catch (InterruptedException e) {
                                return;
                            }
                        }
                    }
                };
                t.start();
                ts.add(t);
            }
    
            //新建add线程,在线程中,每隔100ms,往q中添加一个String
            var add = new Thread(() -> {
                for (int i=0; i<10; i++) {
                    // 放入task:
                    String s = "t-" + Math.random();
                    System.out.println("add task: " + s);
                    q.addTask(s);
                    //这里每次往q中添加一个String后暂停100ms的原因是为了让上述5个线程中的某一个能够及时捕捉到,并打印
                    try { Thread.sleep(100); } catch(InterruptedException e) {}
                }
            });
    
            //开始执行add线程,并等待add执行完毕
            //这里是main线程等待add线程执行完毕,但是for循环创建的5个线程是在一直运行中的
            add.start();
            add.join();
    
            //主线程暂停100ms
            //目的是留出一点时间,让上述5个线程能够将q中的String全部get出来
            //然后再对所有的 提出中断请求
            Thread.sleep(100);
    
            //
            for (var t : ts) {
                t.interrupt();
            }
    
            System.out.println("main Thread ended");
        }
    }
    
    class TaskQueue {
        Queue<String> queue = new LinkedList<>();
    
        public synchronized void addTask(String s) {
            this.queue.add(s);
            this.notifyAll();
        }
    
        public synchronized String getTask() throws InterruptedException {
            while (queue.isEmpty()) {
                this.wait();
            }
            return queue.remove();
        }
    }
    
    add task: t-0.3008528971311387
    Thread-0 execute task: t-0.3008528971311387
    
    add task: t-0.8437246350254718
    Thread-4 execute task: t-0.8437246350254718
    
    add task: t-0.40109144938751884
    Thread-0 execute task: t-0.40109144938751884
    
    add task: t-0.6488781210842749
    Thread-4 execute task: t-0.6488781210842749
    
    add task: t-0.7625266753242343
    Thread-1 execute task: t-0.7625266753242343
    
    add task: t-0.9845878231829128
    Thread-4 execute task: t-0.9845878231829128
    
    add task: t-0.4077696485977367
    Thread-4 execute task: t-0.4077696485977367
    
    add task: t-0.33171418799079655
    Thread-4 execute task: t-0.33171418799079655
    
    add task: t-0.2938665331723114
    Thread-1 execute task: t-0.2938665331723114
    
    add task: t-0.6479359799196959
    Thread-4 execute task: t-0.6479359799196959
    
    main Thread ended
    
    Process finished with exit code 0
    
    

    12.10 使用ReentrantLock

    Java语言直接提供了synchronized关键字用于加锁,但这种锁
    一是很
    二是获取时必须一直等待,没有额外的尝试机制。

    java.util.concurrent.locks包提供的ReentrantLock用于替代synchronized加锁,

    传统的synchronized代码:

    public class Counter {
        private int count;
    
        public void add(int n) {
            synchronized(this) {
                count += n;
            }
        }
    }
    

    如果用ReentrantLock替代,可以把代码改造为:

    public class Counter {
        private final Lock lock = new ReentrantLock();
        private int count;
    
        public void add(int n) {
            lock.lock();
            try {
                count += n;
            } finally {
                lock.unlock();
            }
        }
    }
    

    因为synchronized是Java语言层面提供的语法,所以我们不需要考虑异常,
    ReentrantLock是Java代码实现的锁,我们就必须先获取锁,然后在finally中正确释放锁。

    顾名思义,ReentrantLock是可重入锁,它和synchronized一样,一个线程可以多次获取同一个锁。

    尝试获取锁

    synchronized不同的是,ReentrantLock可以尝试获取锁:

    if (lock.tryLock(1, TimeUnit.SECONDS)) {
        try {
            ...
        } finally {
            lock.unlock();
        }
    }
    

    上述代码在尝试获取锁的时候,最多等待1秒。如果1秒后仍未获取到锁,tryLock()返回false,程序就可以做一些额外处理,而不是无限等待下去。

    所以,使用ReentrantLock比直接使用synchronized更安全,线程在tryLock()失败的时候不会导致死锁。

    小结

    • ReentrantLock可以替代synchronized进行同步;
    • ReentrantLock获取锁更安全;
    • 必须先获取到锁,再进入try {...}代码块,最后使用finally保证释放锁;
    • 可以使用tryLock()尝试获取锁。

    12.11 使用Condition

    使用来实现和 synchronized 类似的 wait 和 notify 功能的

    class TaskQueue {
        //引用的`Condition`对象必须从`Lock`实例的`newCondition()`返回,
        //这样才能获得一个绑定了`Lock`实例的`Condition`实例。
        private final Lock lock = new ReentrantLock();
        private final Condition condition = lock.newCondition();
        private Queue<String> queue = new LinkedList<>();
    
        public void addTask(String s) {
            lock.lock();
            try {
                queue.add(s);
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    
        public String getTask() {
            lock.lock();
            try {
                while (queue.isEmpty()) {
                    condition.await();
                }
                return queue.remove();
            } finally {
                lock.unlock();
            }
        }
    }
    

    可见,使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。

    Condition提供的await()signal()signalAll()原理和synchronized锁对象的wait()notify()notifyAll()是一致的,并且其行为也是一样的:

    • await()会释放当前锁,进入等待状态;
    • signal()会唤醒某个等待线程;
    • signalAll()会唤醒所有等待线程;
    • 唤醒线程从await()返回后需要重新获得锁。

    此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()signalAll()唤醒,可以自己醒来:

    if (condition.await(1, TimeUnit.SECOND)) {
        // 被其他线程唤醒
    } else {
        // 指定时间内没有被其他线程唤醒
    }
    

    可见,使用Condition配合Lock,我们可以实现更灵活的线程同步。

    小结

    • Condition可以替代waitnotify
    • Condition对象必须从Lock对象获取。

    12.12 使用ReadWriteLock

    前面讲到的ReentrantLock保证了只有一个线程可以执行临界区代码:

    但是有些时候,这种保护有点过头。因为我们发现,任何时刻,只允许一个线程修改,也就是调用inc()方法是必须获取锁,但是,get()方法只读取数据,不修改数据,它实际上允许多个线程同时调用。

    实际上我们想要的是:允许多个线程同时读,但只要有一个线程在写,其他线程就必须等待

    使用ReadWriteLock可以解决这个问题,它保证:

    • 只允许一个线程写入(其他线程既不能写入也不能读取);
    • 没有写入时,多个线程允许同时读(提高性能)。

    ReadWriteLock实现这个功能十分容易。我们需要创建一个ReadWriteLock实例,然后分别获取读锁和写锁:

    public class Counter {
        private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
        private final Lock rlock = rwlock.readLock();
        private final Lock wlock = rwlock.writeLock();
        private int[] counts = new int[10];
    
        public void inc(int index) {
            wlock.lock(); // 加写锁
            try {
                counts[index] += 1;
            } finally {
                wlock.unlock(); // 释放写锁
            }
        }
    
        public int[] get() {
            rlock.lock(); // 加读锁
            try {
                return Arrays.copyOf(counts, counts.length);
            } finally {
                rlock.unlock(); // 释放读锁
            }
        }
    }
    

    把读写操作分别用读锁和写锁来加锁,在读取时,多个线程可以同时获得读锁,这样就大大提高了并发读的执行效率。

    使用ReadWriteLock时,适用条件是同一个数据,有大量线程读取,但仅有少数线程修改。

    例如,一个论坛的帖子,回复可以看做写入操作,它是不频繁的,但是,浏览可以看做读取操作,是非常频繁的,这种情况就可以使用ReadWriteLock

    小结

    使用ReadWriteLock可以提高读取效率:

    • ReadWriteLock只允许一个线程写入;
    • ReadWriteLock允许多个线程在没有写入时同时读取;
    • ReadWriteLock适合读多写少的场景。

    12.13 使用StampedLock

    ReadWriteLock可以解决多线程同时读,但只有一个线程能写的问题。

    他的潜在问题是:
    如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。

    乐观锁

    Java 8引入了新的乐观读写锁:StampedLock

    改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。

    public class Point {
        private final StampedLock stampedLock = new StampedLock();
    
        private double x;
        private double y;
    
        public void move(double deltaX, double deltaY) {
            long stamp = stampedLock.writeLock(); // 获取写锁
            try {
                x += deltaX;
                y += deltaY;
            } finally {
                stampedLock.unlockWrite(stamp); // 释放写锁
            }
        }
    
        public double distanceFromOrigin() {
            long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
            // 注意下面两行代码不是原子操作
            // 假设x,y = (100,200)
            double currentX = x;
            // 此处已读取到x=100,但x,y可能被写线程修改为(300,400)
            double currentY = y;
            // 此处已读取到y,如果没有写入,读取是正确的(100,200)
            // 如果有写入,读取是错误的(100,400)
            if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
                stamp = stampedLock.readLock(); // 获取一个悲观读锁
                try {
                    currentX = x;
                    currentY = y;
                } finally {
                    stampedLock.unlockRead(stamp); // 释放悲观读锁
                }
            }
            return Math.sqrt(currentX * currentX + currentY * currentY);
        }
    }
    

    ReadWriteLock相比,写入的加锁是完全一样的,不同的是读取。

    • 注意到首先我们通过tryOptimisticRead()获取一个乐观读锁,并返回版本号。

    • 接着进行读取,读取完成后,我们通过validate()去验证版本号,
      如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。
      如果在读取过程中有写入,版本号会发生变化,验证将失败。

    • 在失败的时候,我们再通过获取悲观读锁再次读取。
      由于写入的概率不高,程序在绝大部分情况下可以通过乐观读锁获取数据,极少数情况下使用悲观读锁获取数据。

    可见,StampedLock把读锁细分为乐观读和悲观读,能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是StampedLock是不可重入锁,不能在一个线程中反复获取同一个锁。

    StampedLock还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在if-then-update的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写。

    tryOptimisticRead()返回的是版本号,不是锁,根本没有锁

    后面validate()就是为了验证在这段时间内版本号变了没,如果没变,那就没有写入

    版本号就是个long

    readLock()才返回真正的读锁,必须finally中unlock

    小结

    • StampedLock提供了乐观读锁,可取代ReadWriteLock以进一步提升并发性能;
    • StampedLock是不可重入锁。

    12.14 使用Concurrent集合

    针对ListMapSetDeque等,java.util.concurrent包也提供了对应的并发集合类。我们归纳一下:

    interface non-thread-safe thread-safe
    List ArrayList CopyOnWriteArrayList
    Map HashMap ConcurrentHashMap
    Set HashSet / TreeSet CopyOnWriteArraySet
    Queue ArrayDeque / LinkedList ArrayBlockingQueue / LinkedBlockingQueue
    Deque ArrayDeque / LinkedList LinkedBlockingDeque

    使用这些并发集合与使用非线程安全的集合类完全相同。我们以ConcurrentHashMap为例:

    Map<String, String> map = new ConcurrentHashMap<>();
    // 在不同的线程读写:
    map.put("A", "1");
    map.put("B", "2");
    map.get("A", "1");
    

    因为所有的同步和加锁的逻辑都在集合内部实现,对外部调用者来说,只需要正常按接口引用,其他代码和原来的非线程安全代码完全一样。即当我们需要多线程访问时,把:

    Map<String, String> map = new HashMap<>();
    

    改为:

    Map<String, String> map = new ConcurrentHashMap<>();
    

    小结

    • 使用java.util.concurrent包提供的线程安全的并发集合可以大大简化多线程编程:
    • 多线程同时读写并发集合是安全的;
    • 尽量使用Java标准库提供的并发集合,避免自己编写同步代码。

    12.15 使用Atomic

    Java的java.util.concurrent包除了提供底层锁、并发集合外,还提供了一组原子操作的封装类,它们位于java.util.concurrent.atomic包。

    我们以AtomicInteger为例,它提供的主要操作有:

    • 增加值并返回新值:int addAndGet(int delta)
    • 加1后返回新值:int incrementAndGet()
    • 获取当前值:int get()
    • 用CAS方式设置:int compareAndSet(int expect, int update)

    Atomic类是通过无锁(lock-free)的方式实现的线程安全(thread-safe)访问。它的主要原理是利用了CAS:Compare and Set。

    利用AtomicLong可以编写一个多线程安全的全局唯一ID生成器:

    class IdGenerator {
        AtomicLong var = new AtomicLong(0);
    
        public long getNextId() {
            return var.incrementAndGet();
        }
    }
    

    通常情况下,我们并不需要直接用do ... while循环调用compareAndSet实现复杂的并发操作,而是用incrementAndGet()这样的封装好的方法,因此,使用起来非常简单。

    在高度竞争的情况下,还可以使用Java 8提供的LongAdderLongAccumulator

    小结

    使用java.util.concurrent.atomic提供的原子操作可以简化多线程编程:

    • 原子操作实现了无锁的线程安全;
    • 适用于计数器,累加器等。

    12.16 使用线程池

    线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。

    Java标准库提供了ExecutorService接口表示线程池,它的典型用法如下:

    // 创建固定大小的线程池:
    ExecutorService executor = Executors.newFixedThreadPool(3);
    // 提交任务:
    executor.submit(task1);
    executor.submit(task2);
    executor.submit(task3);
    executor.submit(task4);
    executor.submit(task5);
    

    因为ExecutorService只是接口,Java标准库提供的几个常用实现类有:

    • FixedThreadPool:线程数固定的线程池;
    • CachedThreadPool:线程数根据任务动态调整的线程池;
    • SingleThreadExecutor:仅单线程执行的线程池。

    创建这些线程池的方法都被封装到Executors这个类中。我们以FixedThreadPool为例,看看线程池的执行逻辑:

    提交任务的时候只需要实现runnable接口

    // thread-pool 
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) {
            // 创建一个固定大小的线程池:
            ExecutorService es = Executors.newFixedThreadPool(4);
            for (int i = 0; i < 6; i++) {
                es.submit(new Task("" + i));
            }
            // 关闭线程池:
            es.shutdown();
        }
    }
    
    class Task implements Runnable {
        private final String name;
    
        public Task(String name) {
            this.name = name;
        }
    
        @Override
        public void run() {
            System.out.println("start task " + name);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println("end task " + name);
        }
    }
    
    start task 0
    start task 2
    start task 3
    start task 1
    end task 0
    end task 2
    end task 1
    end task 3
    start task 4
    start task 5
    end task 4
    end task 5
    

    观察执行结果,一次性放入6个任务,由于线程池只有固定的4个线程,因此,前4个任务会同时执行,等到有线程空闲后,才会执行后面的两个任务。

    线程池在程序结束的时候要关闭。使用shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()会立刻停止正在执行的任务,awaitTermination()则会等待指定的时间让线程池关闭。

    如果我们把线程池改为CachedThreadPool,由于这个线程池的实现会根据任务数量动态调整线程池的大小,所以6个任务可一次性全部同时执行。

    ScheduledThreadPool

    还有一种任务,需要定期反复执行,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用ScheduledThreadPool。放入ScheduledThreadPool的任务可以定期反复执行。

    创建一个ScheduledThreadPool仍然是通过Executors类:

    ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
    

    我们可以提交一次性任务,它会在指定延迟后只执行一次:

    // 1秒后执行一次性任务:
    ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);
    

    如果任务以固定的每3秒执行,我们可以这样写:

    // 2秒后开始执行定时任务,每3秒执行:
    ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);
    

    如果任务以固定的3秒为间隔执行,我们可以这样写:

    // 2秒后开始执行定时任务,以3秒为间隔执行:
    ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);
    

    注意FixedRate和FixedDelay的区别。FixedRate是指任务总是以固定时间间隔触发,不管任务执行多长时间:

    │░░░░   │░░░░░░ │░░░    │░░░░░  │░░░  
    ├───────┼───────┼───────┼───────┼────>
    │<─────>│<─────>│<─────>│<─────>│
    

    而FixedDelay是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务:

    │░░░│       │░░░░░│       │░░│       │░
    └───┼───────┼─────┼───────┼──┼───────┼──>
        │<─────>│     │<─────>│  │<─────>│
    

    因此,使用ScheduledThreadPool时,我们要根据需要选择执行一次、FixedRate执行还是FixedDelay执行。

    小结

    JDK提供了ExecutorService实现了线程池功能:

    • 线程池内部维护一组线程,可以高效执行大量小任务;
    • Executors提供了静态方法创建不同类型的ExecutorService
    • 必须调用shutdown()关闭ExecutorService
    • ScheduledThreadPool可以定期调度多个任务。

    12.17 使用Future

    12.18 使用ForkJoin

  • 相关阅读:
    如何获取SQL Server数据库连接字符串的某些部分
    .NET同步原语Barrier简介
    模版引擎RazorEngine简介
    如何使用SQL Server实现SignalR的横向扩展
    SignalR的客户端.NET Client介绍
    一个简单的SignalR例子
    看视频学SignalR—在微软虚拟学院学习SignalR
    看视频学Bootstrap—在微软虚拟学院学习Bootstrap
    C# 窗口与控件的相关操作
    opencv——常见的操作
  • 原文地址:https://www.cnblogs.com/tangg/p/12782802.html
Copyright © 2011-2022 走看看