zoukankan      html  css  js  c++  java
  • Java笔记(十四) 并发基础知识

    并发基础知识

    一、线程的基本概念

    线程表示一条单独的执行流,它有自己的程序计数器,有自己的栈。

    1.创建线程

    1)继承Thread

    Java中java.lang.Thread这个类表示线程,一个类可以继承Thread并重写run方法来实现一个线程:

    public class MyThread extends Thread{
        @Override
        public void run() {
            System.out.println("thread name: " + Thread.currentThread().getName() +
                    " thread id: " + Thread.currentThread().getId());
            System.out.println("Running my thread!");
        }
    
        public static void main(String[] args) {
            MyThread thread = new MyThread();
            //启动线程
            thread.start();
            /*thread name: Thread-0 thread id: 11
            Running my thread!*/
        }
    }

    2)实现Runnable接口

    public class MyRunnable implements Runnable{
        @Override
        public void run() {
            System.out.println("thread name:" + Thread.currentThread().getName());
        }
    
        public static void main(String[] args) {
            System.out.println("Main thread name : "
                    + Thread.currentThread().getName()); //Main thread name : main
            Thread thread = new Thread(new MyRunnable()); //thread name:Thread-0
            thread.start();
        }
    }

    2.线程的基本属性和方法

    1)id和name

    id: 一个递增整数,每创建一个线程就加一

    name:默认值是Thread-后跟一个编号,name可以在Thread的构造方法中指定,可以通过setName方法进行设置。

    2)优先级

    优先级从1到10,默认为5,相关方法:

    public final void setPriority(int newPriority)
    public final int getPriority()

    在编程中不要过分依赖优先级。

    3)状态

    线程有一个状态概念,Thread获取状态方法:

    public State getState()

    返回值类型为Thread.State,它是一个枚举值:

    public enum State {
    NEW, //线程还没调用start
    //调用start后线程在执行run方法且没有阻塞时状态为RUNNABLE,
    //不过,这并不代表CPU一定在执行该线程的代码,可能正在执行也可能在
    //等待操作系统分配时间片,只是它在等待其他条件。
    RUNNABLE,
    BLOCKED,
    WAITING,
    TIMED_WAITING,
    TERMINATED; //线程已经结束运行
    }

    Thread还有一个方法,返回线程是否alive:

    //线程被启动后,run方法运行结束前,返回值都是true
    public final native boolean isAlive()

    4.是否是daemon线程 

    一般情况下整个程序只有在所有的线程都结束后,线程才会退出。

    而daemon线程不一样,它是守护线程,当它辅助的主线程结束时,它也会结束。

    public final void setDaemon(boolean on)
    public final boolean isDaemon()

    5.sleep方法

    Thread有一个静态方法,调用该方法会让当前线程睡眠指定时间,单位是毫秒。

    public static native void sleep(long millis) throws InterruptedException;

    在睡眠期间,该线程会让出CPU,但睡眠时间不是确切的给定毫秒数,可能有一定偏差。

    睡眠期间,线程可以被中断,如果被中断,sleep会抛异常。

    6.yield方法 

    public static native void yield();

    调用该方法意思是:我现在不急着占用CPU,你可以先让其他线程运行。

    不过这对系统调度器也仅仅是建议,调度器如何处理不一定,它可能忽略该调用。

    7.join方法 

    在前面的MyThread例子中,MyThread可能没执行完,main线程就可能执行完了。

    Thread有一个join方法,可以让调用join的线程等待该线程的结束。

    public final void join() throws InterruptedException
    //限定等待的最长时间
    public final synchronized void join(long millis) throws InterruptedException
    MyThread thread = new MyThread();
    thread.start();
    //让main线程在子线程调用结束后再退出,相当于阻塞了main线程
    thread.join();

    3.共享内存及可能出现的问题

    虽然每个线程表示一条单独的执行流,有自己的程序计数器和栈,

    但线程之间可以共享内存,它们可以访问和操作相同的对象。

    public class ShareMemoryDemo {
        private static int shared = 0;
        private static void incrShared() {
            shared ++;
        }
        static class ChildThread extends Thread {
            List<String> list;
            public ChildThread(List<String> list) {
                this.list = list;
            }
            @Override
            public void run() {
                incrShared();
                list.add(Thread.currentThread().getName());
            }
        }
        public static void main(String[] args) throws InterruptedException {
            ArrayList<String> list = new ArrayList<>();
            ChildThread t1 = new ChildThread(list);
            ChildThread t2 = new ChildThread(list);
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println(shared);
            System.out.println(list);
            /*2
            [Thread-0, Thread-1]*/
        }
    }

    ChildThread的run方法访问了共享变量shared和list。

    执行流、内存和程序代码之间的关系:

    1)不同的执行流可以访问和操作相同的变量

    2)不同的执行流可以执行相同的程序代码

    所以,在分析代码执行过程时,理解代码在被哪个线程执行是很重要的

    3)当多条执行流执行相同的程序代码时,每条执行流都有自己的栈,方法中

    的参数和局部变量都有自己的一份。

    当多条执行流可以操作相同的变量时,可能会出现意料之外的结果,包括竞态条件和内存可见性问题。

    1.竞态条件 

    所谓竞态条件(race condition)是指,当多个线程访问和操作同一个对象时,最终结果和执行时序

    有关,可能正确也可能不正确。

    public class CounterThread extends Thread{
        private static int counter = 0;
        @Override
        public void run() {
            for (int i = 0; i < 1000; i++) {
                counter++;
            }
        }
        public static void main(String[] args) throws InterruptedException {
            int num = 1000;
            Thread[] threads = new Thread[num];
            for (int i = 0; i< num; i++) {
                threads[i] = new CounterThread();
                threads[i].start();
            }
            for (int i = 0; i < 1000; i++) {
                threads[i].join();
            }
            System.out.println(counter); //998400
        }
    }

    期望结果应该是10000000,实际结果为998400,为什么呢?

    因为counter++这个操作不是原子操作,它分为了3个步骤:

    1)去counter的当前值

    2)在当前值上加1

    3)将新值重新赋值给counter

    两个线程可能同时执行第一步,取到了相同的counter值。

    2.内存的可见性 

    多个线程可以共享和访问和操作相同的变量,但一个线程对一个共享变量的修改,

    另一个线程不一定能马上见到,甚至永远见不到。

    public class VisibilityDemo {
        private static boolean shutdown = false;
        static class MyThread extends Thread {
            @Override
            public void run() {
                while (!shutdown) {
                    //do nothing
                }
                System.out.println("exit myThread");
            }
        }
        public static void main(String[] args) throws InterruptedException {
            new MyThread().start();
            Thread.sleep(1000);
            shutdown = true;
            System.out.println("exit main!");
        }
    }

    期望的结果是两个线程都退出,但实际执行时,很可能会发现myThread永远不退出,

    也就是说在myThread执行流看来,shutdown永远为false,即使main线程已经更改为了ture。

    这就是内存可见性问题,在计算机系统中,除了内存,数据还会被缓存在CPU的寄存器以及

    各级缓存中,当访问一个变量时,可能从CPU寄存器和各级缓存取,而不是从内存,当

    修改一个变量时,也可能是先修改到缓存中,稍后再同步更新到内存中。在单线程程序中,这

    一般不是问题,但在多线程程序中,尤其是有多个CPU的情况下,这就是严重问题。一个线程对

    内存的修改,另一个线程看不到,一是修改没有及时同步到到内存,二是另一个线程根本就没有从内存中读取。

    二、理解synchronized(同步)

    1.用法和基本原理

    synchronized可以用于:

    1)实例方法:

    public class Counter {
        private int count;
        public synchronized void incre() {
            count ++;
        }
        public synchronized int getCount() {
            return count;
        }
    }
    public class CounterThread extends Thread {
        private Counter counter;
        public CounterThread(Counter counter) {
            this.counter = counter;
        }
        @Override
        public void run() {
            for (int i = 0; i < 1000; i++) {
                counter.incre();
            }
        }
        public static void main(String[] args) throws InterruptedException {
            int num = 1000;
            Counter counter = new Counter();
            Thread[] threads = new Thread[num];
            for (int i = 0; i < num; i++) {
                threads[i] = new CounterThread(counter);
                threads[i].start();
                threads[i].join();
            }
            System.out.println(counter.getCount());
        }
    }

    看上去,synchronized使得同时只能有一个线程执行实例方法,

    但这个理解是不确切的。多个线程是可以同时执行一个synchronized方法的,

    只要它们访问的对象不同即可。所以,synchronized实例方法保护的是同一个

    对象方法的调用,确保同时只能有一个线程执行。大致过程如下:

    1)尝试获得锁,如果能够获得锁,继续下一步,否则加入等待队列,阻塞并等待唤醒;

    2)执行实例方法代码;

    3)释放锁,如果等待队列上有等待线程,从中抽取一个并唤醒,如果有多个等待线程,唤醒

    哪一个是不一定的,不保证公平性。

    当线程不能获得锁的时候,他会加入等待队列,状态会变为BLOCKED.

    注意:一般在保护变量时,需要在所有访问该变量的方法上加上synchronized。

    再次强调:synchronized保护的是对象而非代码,只要访问的是同一个对象的synchronized方法,

    即使是不同的代码,也会被同步顺序访问。比如,Counter中的两个实例方法,incre和getCount,

    对于同一个Counter对象,一个线程想去执行incre,另一个线程想执行getCount,它们是不能同时执行的,

    会被synchronized同步顺序执行。

    2)静态方法 

    synchronized保护的是类对象,而非实例对象(实际上,每个对象都有一个锁和等待队列,类对象也不例外)。

    public class StaticCounter {
        private static int count = 0;
        public static void incr() {
            synchronized (StaticCounter.class) {
                count ++;
            }
        }
        public static int getCount() {
            synchronized (StaticCounter.class) {
                return count;
            }
        }
    }

    3)代码块 

    public class Counter {
        private int count;
        public void incre() {
            //synchronized括号里面的就是保护对象{}里就是同步执行代码
            synchronized (this) {
                count ++;
            }
        }
        public synchronized int getCount() {
            synchronized (this) {
                return count;
            }
        }
    }
    public class Counter {
        private int count;
        private Object obj = new Object();
        public void incre() {
            //synchronized同步的对象可以是任意对象,任意对象都有一个锁和等待队列
            synchronized (obj) {
                count ++;
            }
        }
        public synchronized int getCount() {
            synchronized (obj) {
                return count;
            }
        }
    }

    2.进一步理解synchronized

    1)可重入性 

    对同一个执行线程,在它获得了锁之后,在调用其他需要同样锁的代码时,可以直接调用。

    比如,在一个synchronized实例方法内,可以调用同一个实例中的synchronized实例方法。

    2)内存可见性 

    synchronized除了保证原子操作外,它还有一个重要作用,就是保证内存可见性,

    在释放锁时,所有写入都会写入内存,而获得锁后,都会从内存中读取最新数据。

    不过,如果只是为了保证内存可见性,synchronized成本有点高,可以给变量加修饰符volatile代替。

    加了volatile后,Java会在操作对应变量时加入特殊指令,保证读写到内存最新值,而非缓存值。

    3)死锁 

    死锁举例:有a、b两个线程,a线程持有锁A,在等待锁B,b线程持有锁B,在等待锁A,

    a、b陷入了互相等待,最后谁都执行不下去。

    public class DeadLockDemo {
        private static Object lockA = new Object();
        private static Object lockB = new Object();
        public static void startThreadA() {
            Thread t = new Thread(() -> {
                synchronized (lockA) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    synchronized (lockB) {}
                }
            });
            t.start();
        }
        public static void startThreadB() {
            Thread t = new Thread(() -> {
                synchronized (lockB) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    synchronized (lockA) {}
                }
            });
            t.start();
        }
        public static void main(String[] args) {
            startThreadA();
            startThreadA();
        }
    }

    如何解决死锁问题:首先,应该尽量避免在持有一个锁的同时去申请另一个锁,

    如果确实需要多个锁,所有代码都应该按照相同的顺序去申请锁。

    3.同步容器

    Collections类中有一些方法,可以返回线程安全的同步容器:

    public static <T> Collection<T> synchronizedCollection(Collection<T> c)
    public static <T> List<T> synchronizedList(List<T> list)
    public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m)

    它们是给所有容器方法加上synchronized来实现安全的,例如:

    static class SynchronizedCollection<E> implements Collection<E> {
        final Collection<E> c; //Backing Collection
        final Object mutex; //Object on which to synchronize
        SynchronizedCollection(Collection<E> c) {
            if(c==null)
                throw new NullPointerException();
            this.c = c;
            mutex = this;
        }

    public int size() { synchronized (mutex) {return c.size();} } public boolean add(E e) { synchronized (mutex) {return c.add(e);} } public boolean remove(Object o) { synchronized (mutex) {return c.remove(o);} } // }

    这里的线程安全针对的是容器对象,指的是当多个线程并发访问同一个容器对象时,  

    不需要额外的同步操作,也不会出现错误结果。这样是不是就绝对安全了呢?不是

    的我们还需要注意以下情况:

    1)复合操作 

    public class EnhancedMap <K, V> {
        Map<K, V> map;
        public EnhancedMap(Map<K,V> map){
            this.map = Collections.synchronizedMap(map);
        }
        public V putIfAbsent(K key, V value){
            V old = map.get(key);
            if(old!=null){
                return old;
            }
            return map.put(key, value);
        }
        public V put(K key, V value){
            return map.put(key, value);
        }
    //
    }

    其中的putAbsent方法语义是只有在原方法没有对应键的情况下才添加,

    这是一个检查然后在更新的操作,在多线程的情况下,可能有多个线程

    执行完了检查这一步,都发现Map中没有对应的键,然后就会都调用put,这就破坏了该方法的语义。

    2.伪同步 

    那么给该方法加上synchronized就线程安全了吗?

    public synchronized V putIfAbsent(K key, V value){
        V old = map.get(key);
        if(old!=null){
            return old;
        }
        return map.put(key, value);
    }

    答案是否定的,原因是同步对象错了。putAbsent同步使用的是EnhancedMap对象,

    而其他方法使用的是Collections当作的map对象。要解决这个问题,所有方法必须

    使用相同的锁。所以可以改为:

    public V putIfAbsent(K key, V value){
        synchronized(map){
            V old = map.get(key);
            if(old!=null){
                return old;
            }
            return map.put(key, value);
        }
    }

    3.迭代 

    对于同步容器对象,虽然单个操作是安全的,但迭代并不是:

    public class DeadLockDemo {
    
        private static void startModifyThread(final List<String> list) {
            Thread modifyThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    for(int i = 0; i < 100; i++) {
                        list.add("item " + i);
                        try {
                            Thread.sleep((int) (Math.random() * 10));
                        } catch (InterruptedException e) {
                        }
                    }
                }
            });
            modifyThread.start();
        }
    
        private static void startIteratorThread(final List<String> list) {
            Thread iteratorThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        for(String str : list) {
                        }
                    }
                }
            });
            iteratorThread.start();
        }
    
        public static void main(String[] args) {
            final List<String> list = Collections
                    .synchronizedList(new ArrayList<String>());
            startIteratorThread(list);
            startModifyThread(list);
        }
        /*Exception in thread "Thread-0" java.util.ConcurrentModificationException
        at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
        at java.util.ArrayList$Itr.next(ArrayList.java:851)
        at com.cdert.roadpms.test.utils.DeadLockDemo$2.run(DeadLockDemo.java:34)
        at java.lang.Thread.run(Thread.java:748)*/
    }

    如果在遍历的同时容器发生了结构性变化就会抛出该异常。

    可以改为(在遍历的时候给整个容器对象加锁):

    private static void startIteratorThread2(final List<String> list) {
        Thread iteratorThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while(true) {
                    synchronized(list){
                        for(String str : list) {
                        }
                    }
                }
            }
        });
        iteratorThread.start();
    }

    二、线程的基本协作机制

    一)协作的场景

    1)生产者/消费者模式:生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到

    队列上,而消费者从队列取数据或者任务,如果队列长度有限,在队列满的时候,生产者需要等待,

    在队列为空的时候,消费者需要等待。

    2)同时开始:在一些程序,尤其是仿真程序中,要求多个线程同时开始。

    3)等待结束:主线程将任务分解成若干子任务,为每个子任务创建一个线程,主线程在继续执行其他

    任务之前需要等待每个子任务执行完毕。

    4)异步结果:在主从协作模式中,主线程手工创建子线程往往笔记麻烦,一种常见的模式是把子线程封装

    为异步调用,异步调用马上返回,但返回的不是最终结果,而是一个称为Future的对象,通过它可以在随后获得最终结果。

    5)集合点:在一些程序中,比如并行迭代计算中,每个线程负责一部分计算,然后在集合点等待其他线程完成,所有线程

    到齐后,交还数据和计算结果,再进行下一次迭代。

    二、wait/notify

    在Java根父类中定义了一些线程协作的基本方法,这些方法

    分为两类:wait和notify。wait主要有两个方法:

    public final void wait() throws InterruptedException
    //单位为毫秒表示最长等待时间,wait(0)表示无限期等待,无参wait等于调用wait(0)
    public final native void wait(long timeout) throws InterruptedException;

    每个对象都有一把锁和等待队列,一个线程在进入synchronized代码块时,

    会尝试获取锁,如果获取不到就会把当前线程加入等待队列,其实,除了

    用于锁的等待队列,每个对象还有另一个等待队列,叫做条件队列,该队列

    用于线程间的协作。调用wait就会把当前线程(调用该方法使用的线程)放到条件

    队列上并阻塞,表示当前线程无法执行,它需要等待一个条件,这个条件不能由

    它自己发起,需要其他线程发起。当其他线程改变条件后,应该调用notify方法。

    //notify将会从条件队列中选取一个线程,将其从队列中移除并唤醒
    public final native void notify();
    //notifyAll与notify的区别是:它会移除条件队列中的所有线程,并全部唤醒
    public final native void notifyAll();

    简单的协作示例:

    public class WaitThread extends Thread{
        private volatile boolean fire = false;
        @Override
        public void run() {
            try {
                synchronized(this) {
                    while (!fire) {
                        wait();
                    }
                }
                System.out.println("fired!");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        public synchronized void fire() {
            this.fire = true;
            notify();
        }
        public static void main(String[] args) throws InterruptedException {
            WaitThread waitThread = new WaitThread();
            waitThread.start();
            Thread.sleep(1000);
            System.out.println("fire");
            waitThread.fire();
        }
    }

    两个线程都要访问变量fire,容易出现竞态条件,所有相关代码都要被

    synchronized保护。实际上,wait/notify方法只能在synchronized代码

    块内被调用,如果调用wait/notify方法时,当前线程没有持有对象锁,会抛出异常。

    看看另一种情况:

    public class WaitThread extends Thread{
        private volatile boolean fire = false;
        private volatile int count = 0;
        @Override
        public void run() {
            try {
                synchronized(this) {
                    while (!fire) {
                        sleep(1000);
                        System.out.println(++count);
                    }
                }
                System.out.println("fired");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //实例对象的锁不会被释放,该方法永远不会被执行
        public synchronized void fire() {
            System.out.println("fire it"); 
            this.fire = true;
        }
        public static void main(String[] args) throws InterruptedException {
            WaitThread waitThread = new WaitThread();
            waitThread.start();
            Thread.sleep(1000);
            System.out.println("fire");
            waitThread.fire();
        }
    }

    思考:如果wait必须被synchronized保护,那一个线程在wait时,另一个线程

    怎么可能调用到同样被synchronized保护的notify方法(注意两个被保护的synchronized方法是一个实例的)?

    它不需要等待锁么?我们需要进一步理解wait的内部过程才能解开谜团!!虽然是在synchronized内,但调用

    wait时,线程会释放对象锁。wait的具体过程是:

    1)把当前线程放入条件等待队列,释放对象锁,阻塞等待,线程状态变为WAITING或TIMED_WAITING。

    2)等待时间到或者被其他线程调用notify/notifyAll从条件队列中移除,这时,要重新竞争对象锁:

    如果能够重新获得锁,线程状态变为RUNNABLE,并从wait调用中返回。

    否则,该线程加入对象锁等待队列,线程变为BLOCKED,只有在获得锁后才会从wait调用中返回。

    线程从wait调用中返回后,不代表其等待条件就一定成立了,它需要重新检查其等待条件,一般

    调用模式是:

    synchronized (obj) {
    while(条件不成立)
    obj.wait();
    …//执行条件满足后的操作
    }

    比如上例中的代码:

    synchronized (this) {
        while(!fire) {
        wait();
        }
    }

    调用notify会把条件队列中的线程唤醒并从队列中移除,但它并不会

    释放调用它的代码块获得的对象锁,也就是说,只有在包含notify的

    synchronized代码块执行完毕后,等待线程才会从wait调用中返回。

    三、生产者消费者模式

    public class MyQueue<E> {
        private Queue<E> queue = null;
        private int limit;
        public MyQueue(int limit) {
            this.limit = limit;
            queue = new ArrayDeque<E>(limit);
        }
        //给生产者用的方法,往队列上放数据,满了就wait
        public synchronized void put(E e) throws InterruptedException {
            while (queue.size() == limit) wait();
            queue.add(e);
            //由于条件不同但又使用相同的的等待队列,所以
            //要用notifyAll而不能调用notify
            notifyAll();
        }
        //take给消费者用的方法空了就wait
        public synchronized E take() throws InterruptedException {
            while (queue.isEmpty()) wait();
            E e = queue.poll();
            notifyAll();
            return e;
        }
    }

    只能有一个条件等待队列,这是 wait/notify机制的缺陷,这使得对于条件的分析变得复杂。

    public class Producer extends Thread{
        MyQueue<String> queue;
        public Producer(MyQueue<String> queue) {
            this.queue = queue;
        }
        @Override
        public void run() {
            int num = 0;
            try {
                while (true) {
                    String task = String.valueOf(num);
                    queue.put(task);
                    System.out.println("produce: " + task);
                    num ++;
                    Thread.sleep(3000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public class Consumer extends Thread{
        MyQueue<String> queue;
        public Consumer(MyQueue<String> queue) {
            this.queue = queue;
        }
        @Override
        public void run() {
            try {
                while (true) {
                    String task = queue.take();
                    System.out.println("consumer: " + task);
                    Thread.sleep(3000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    三、同时开始

    public class FireFlag {
        private volatile boolean fired = false;
        public synchronized void waitForFire() throws InterruptedException {
            while (!fired) {
                wait();
            }
        }
        public synchronized void fire() {
            this.fired = true;
            notifyAll();
        }
    }
    public class Racer extends Thread{
        private FireFlag flag;
        public Racer(FireFlag fireFlag) {
            this.flag = fireFlag;
        }
        @Override
        public void run() {
            try {
                flag.waitForFire();
                System.out.println("start run " + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            int num = 10;
            FireFlag flag = new FireFlag();
            Thread[] threads = new Thread[num];
            for (int i = 0; i < num; i++) {
                threads[i] = new Racer(flag);
                threads[i].start();
            }
            Thread.sleep(1000);
            flag.fire();
        }
    }

    四、等待结束

    join方法实际上就是调用了wait方法:

    while (isAlive()) {
        wait(0);
    }

    只要线程是活的,isAlive()返回true,join就一直等待。谁来通知它呢?

    当线程运行结束时,Java系统调用notifyAll来通知。使用join比较麻烦,需要

    主线程逐一等待每个子线程。这里我们展示一种新写法:

    public class MyLatch {
        //未完成的线程个数,初始值为子线程总个数
        private int threadsCount;
        public MyLatch(int threadsCount) {
            this.threadsCount = threadsCount;
        }
        public synchronized void await() throws InterruptedException {
            while (threadsCount > 0) wait();
        }
        public synchronized void countDown() {
            threadsCount --;
            if (threadsCount <= 0) this.notifyAll();
        }
    }
    public class Worker extends Thread{
        MyLatch myLatch;
        public Worker(MyLatch myLatch) {
            this.myLatch = myLatch;
        }
        @Override
        public void run() {
            try {
                Thread.sleep(5000);
                myLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            int num = 10;
            MyLatch myLatch = new MyLatch(num);
            Thread[] threads = new Thread[num];
            for (int i = 0; i < num; i++) {
                threads[i] = new Worker(myLatch);
                threads[i].start();
            }
            myLatch.await();
            System.out.println("ready to end !");
        }
    }

    五、异步结果

    在Java中表示子任务的接口是Callable,声明为:

    public interface Callable<V> {
        V call() throws Exception;
    }

    为表示异步调用的结果,定义一个MyFuture接口:

    public interface MyFuture <V> {
        //get方法返回真正的结果,如果结果没有计算完成,
        //get方法会阻塞直到计算完成
        V get() throws Exception ;
    }

    为方便主线程调用子任务,定义一个MyExcecutor类,其中定义一个execute

    方法表示执行子任务并返回调用结果。

    //利用该方法,对于主线程,就不需要创建并管理子线程了,
    //并且可以方便地获取异步调用的结果
    public <V> MyFuture<V> execute(final Callable<V> task)

    实例:

    //表示子任务得接口
    interface MyCallable<V> {
        V call() throws Exception;
    }
    //表示异步调用的结果
    interface MyFuture<V> {
        //返回真正的结果
        V get() throws Exception;
    }
    interface MyExecutor {
        <V> MyFuture<V> execute(final MyCallable<V> task);
    }
    //执行任务的子线程
    public class ExecuteThread<V> extends Thread{
    
        private V result = null;
        private Exception exception = null;
        private volatile boolean done = false;
        private MyCallable<V> task;
        private Object lock;
    
        public ExecuteThread(MyCallable<V> task, Object lock) {
            this.task = task;
            this.lock = lock;
        }
    
        @Override
        public void run() {
            try {
                result = task.call();
            } catch (Exception e) {
                exception = e;
            } finally {
                synchronized (lock) {
                    done = true;
                    lock.notifyAll();
                }
            }
        }
    
        public V getResult() {
            return result;
        }
        public boolean isDone() {
            return done;
        }
    
        public Exception getException() {
            return exception;
        }
    }
    public class MyExecutorImpl implements MyExecutor{
        @Override
        public <V> MyFuture<V> execute(MyCallable<V> task) {
            final Object lock = new Object();
            final ExecuteThread<V> thread = new ExecuteThread<>(task, lock);
            MyFuture<V> myFuture = new MyFuture<V>(){
                @Override
                public V get() throws Exception{
                    synchronized (lock) {
                        System.out.println("Try to get result");
                        while (!thread.isDone()) {
                            try {
                                //阻塞直到任务执行完毕
                                lock.wait();
                            } catch (Exception e) {}
                        }
                        if (thread.getException() != null) {
                            throw thread.getException();
                        }
                    }
                    System.out.println("Got it!!!");
                    return thread.getResult();
                }
            };
            thread.start();
            return myFuture;
        }
    }
    public class MainClass {
        public static void main(String[] args) {
            MyExecutor executor = new MyExecutorImpl();
            MyCallable<Integer> task = new MyCallable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("Start do the call.");
                    int result = (int) (Math.random() * 1000);
                    Thread.sleep(5000);
                    System.out.println("Do the call done.");
                    return result;
                }
            };
            //异步调用,返回一个MyFuture对象
            MyFuture<Integer> future = executor.execute(task);
            //执行其他操作
            try {
                System.out.println("Start do other things.");
                Thread.sleep(3000);
                System.out.println("Other things done. ");
                System.out.println("Ready to get.");
                //获取异步调用结果
                Integer result = future.get();
                System.out.println("The result is " + result);
            } catch (Exception e) {
                e.printStackTrace();
            }
            /*Start do other things.
            Do the call done.
            Other things done.
            The result is 802*/
        }
    }

    六、集合点

    public class AssemblePoint {
        //未到达集合点的线程数量,初始为所有线程数
        private int threadsCount;
        public AssemblePoint(int threadsCount) {
            this.threadsCount = threadsCount;
        }
        public synchronized void await() throws InterruptedException {
            System.out.println("Now the count is " + threadsCount);
            if (threadsCount > 0) {
                threadsCount --;
                if (threadsCount == 0 ) {
                    notifyAll();
                    System.out.println("All the threads done.");
                }
                else {
                    while (threadsCount !=0) {
                        this.wait();
                    }
                }
            }
        }
    }
    public class AssemblePointDemo {
        static class Tourist extends Thread {
            AssemblePoint point;
            public Tourist(AssemblePoint point) {
                this.point = point;
            }
            @Override
            public void run() {
                try {
                    //模拟各自独自运行
                    Thread.sleep((long) (Math.random() * 1000));
                    //该线程运行完毕,集合
                    point.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) {
            int num = 10;
            Tourist[] threads = new Tourist[num];
            AssemblePoint point = new AssemblePoint(num);
            for (int i = 0; i < num; i++) {
                threads[i] = new Tourist(point);
                threads[i].start();
            }
        }
    }

    三、线程的中断

     一)取消/关闭机制

    在Java中停止一个线程的主要机制是中断,中断并不是强迫终止一个线程,它是一种协作机制,

    是给线程传递一个取消信号,但是由线程线程决定如何以及何时退出。每个线程都有一个标志位,表示该线程是否被中断了。

    //返回线程的中断标志位
    public boolean isInterrupted()
    //中断对应的线程
    public void interrupt()
    //该方法是静态方法,实际会调用Thread.currentThread()操作当前线程,返回标志位,并清空
    public static boolean interrupted()

     二)线程中断的反应

    interrupt()对线程的影响与线程的状态和正在进行的IO操作有关,

    我们主要讨论线程状态:

    1)RUNNABLE:线程正在运行或者具备运行的条件只是在等待操作系统调度;

    2)WAITING/TIME_WAITING:线程在等待某个条件或者超时;

    3)BLOCKED:线程在等待锁,试图进入同步块

    4)NEW/TERMINATED:线程还未启动或者已经结束。

    1.RUNNABLE 

    如果线程在运行中,且没有执行IO操作,interrupt()只是会设置线程的中断标致位,

    没有任何其他作用。线程应该在运行过程中合适的位置检查中断标志位,例如:

    public class InterruptRunnableDemo extends Thread {
        @Override
        public void run() {
            while(!Thread.currentThread().isInterrupted()) {
            }
            System.out.println("done ");
        }
    }

    2.WAITING/TIME_WAITING 

    线程调用join/wait/sleep方法会进入WAITING/TIME_WAITING状态,

    在这些状态时,对线程对象调用interrupt会使线程抛出InterruptedException。

    抛出异常后,中断标志位会被清空,而不是设置。比如:

    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println(Thread.currentThread().isInterrupted()); //false
            }
        });
        thread.start();
        try {
            Thread.sleep(100);
        } catch (Exception e) {}
        thread.interrupt();
    }

    捕获InterruptedException,通常表示希望线程结束,线程大致有两种处理方式:

    1)向上传递该异常,这使得该方法也变成一个可中断方法,需要调用者进行处理;

    2)有些情况不能向上传递异常,比如Thread的run方法,它的声明是固定的,不能

    抛出受检异常,这时,应该捕获异常,进行合理的清理操作,清理后,一般应该调

    用Thread的interrupt方法设置中断标志位,使得其他代码有方法知道它发生中断。

    3.BLOCKED 

    如果一个线程在等待锁,对线程调用interrupt只是会设置中断标志位,线程

    仍然会处于BLOCKED状态。interrupt并不能使一个正在等待锁的线程正真“中断”。

    public class InterruptSynchronizedDemo {
        private static Object lock = new Object();
        private static class MyThread extends Thread {
            @Override
            public void run() {
                synchronized (lock) {
                    System.out.println("Coming in "); //never
                    while (!Thread.currentThread().isInterrupted()) {
                    }
                }
                System.out.println("exit");//never
            }
        }
        public static void test() throws InterruptedException {
            synchronized (lock) {
                MyThread thread = new MyThread();
                thread.start();
                Thread.sleep(1000);
                thread.interrupt();
                thread.join();
            }
        }
        public static void main(String[] args) {
            //test方法在持有锁的情况下启动线程thread,而线程thread
            //也尝试获得锁,所以会进入等待队列。
            try {
                test();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    在使用synchronized关键字获取锁的过程中不响应中断请求,这是synchronized的局限性。

    4.NEW/TERMINATE

    调用interrupt无任何效果。

    三)如何正确地取消/关闭线程

    interrupt方法不一定会真正中断线程,它只是一种协作机制。

    对于以线程提供服务的程序模块而言,它应该封装取消/关闭操作,提供

    单独的取消/关闭方法给调用者。

  • 相关阅读:
    【心情】一天又一天
    【转】深度学习目标检测的整体架构描述(one-stage/two-stage/multi-stage)
    如何转载CSDN以及博客园的文章
    【转】Faster RCNN原理分析(二):Region Proposal Networks详解
    Lintcode 627
    遇黑中介打官司拿到房租的成功案例
    记录音视频不同步的问题及解决过程
    ffmpeg 使用笔记
    centos min安装后使用gclient
    剑指 Offer 26. 树的子结构
  • 原文地址:https://www.cnblogs.com/Shadowplay/p/10035987.html
Copyright © 2011-2022 走看看