zoukankan      html  css  js  c++  java
  • Java笔记(十四) 并发基础知识

    并发基础知识

    一、线程的基本概念

    线程表示一条单独的执行流,它有自己的程序计数器,有自己的栈。

    1.创建线程

    1)继承Thread

    Java中java.lang.Thread这个类表示线程,一个类可以继承Thread并重写run方法来实现一个线程:

    public class MyThread extends Thread{
        @Override
        public void run() {
            System.out.println("thread name: " + Thread.currentThread().getName() +
                    " thread id: " + Thread.currentThread().getId());
            System.out.println("Running my thread!");
        }
    
        public static void main(String[] args) {
            MyThread thread = new MyThread();
            //启动线程
            thread.start();
            /*thread name: Thread-0 thread id: 11
            Running my thread!*/
        }
    }

    2)实现Runnable接口

    public class MyRunnable implements Runnable{
        @Override
        public void run() {
            System.out.println("thread name:" + Thread.currentThread().getName());
        }
    
        public static void main(String[] args) {
            System.out.println("Main thread name : "
                    + Thread.currentThread().getName()); //Main thread name : main
            Thread thread = new Thread(new MyRunnable()); //thread name:Thread-0
            thread.start();
        }
    }

    2.线程的基本属性和方法

    1)id和name

    id: 一个递增整数,每创建一个线程就加一

    name:默认值是Thread-后跟一个编号,name可以在Thread的构造方法中指定,可以通过setName方法进行设置。

    2)优先级

    优先级从1到10,默认为5,相关方法:

    public final void setPriority(int newPriority)
    public final int getPriority()

    在编程中不要过分依赖优先级。

    3)状态

    线程有一个状态概念,Thread获取状态方法:

    public State getState()

    返回值类型为Thread.State,它是一个枚举值:

    public enum State {
    NEW, //线程还没调用start
    //调用start后线程在执行run方法且没有阻塞时状态为RUNNABLE,
    //不过,这并不代表CPU一定在执行该线程的代码,可能正在执行也可能在
    //等待操作系统分配时间片,只是它在等待其他条件。
    RUNNABLE,
    BLOCKED,
    WAITING,
    TIMED_WAITING,
    TERMINATED; //线程已经结束运行
    }

    Thread还有一个方法,返回线程是否alive:

    //线程被启动后,run方法运行结束前,返回值都是true
    public final native boolean isAlive()

    4.是否是daemon线程 

    一般情况下整个程序只有在所有的线程都结束后,线程才会退出。

    而daemon线程不一样,它是守护线程,当它辅助的主线程结束时,它也会结束。

    public final void setDaemon(boolean on)
    public final boolean isDaemon()

    5.sleep方法

    Thread有一个静态方法,调用该方法会让当前线程睡眠指定时间,单位是毫秒。

    public static native void sleep(long millis) throws InterruptedException;

    在睡眠期间,该线程会让出CPU,但睡眠时间不是确切的给定毫秒数,可能有一定偏差。

    睡眠期间,线程可以被中断,如果被中断,sleep会抛异常。

    6.yield方法 

    public static native void yield();

    调用该方法意思是:我现在不急着占用CPU,你可以先让其他线程运行。

    不过这对系统调度器也仅仅是建议,调度器如何处理不一定,它可能忽略该调用。

    7.join方法 

    在前面的MyThread例子中,MyThread可能没执行完,main线程就可能执行完了。

    Thread有一个join方法,可以让调用join的线程等待该线程的结束。

    public final void join() throws InterruptedException
    //限定等待的最长时间
    public final synchronized void join(long millis) throws InterruptedException
    MyThread thread = new MyThread();
    thread.start();
    //让main线程在子线程调用结束后再退出,相当于阻塞了main线程
    thread.join();

    3.共享内存及可能出现的问题

    虽然每个线程表示一条单独的执行流,有自己的程序计数器和栈,

    但线程之间可以共享内存,它们可以访问和操作相同的对象。

    public class ShareMemoryDemo {
        private static int shared = 0;
        private static void incrShared() {
            shared ++;
        }
        static class ChildThread extends Thread {
            List<String> list;
            public ChildThread(List<String> list) {
                this.list = list;
            }
            @Override
            public void run() {
                incrShared();
                list.add(Thread.currentThread().getName());
            }
        }
        public static void main(String[] args) throws InterruptedException {
            ArrayList<String> list = new ArrayList<>();
            ChildThread t1 = new ChildThread(list);
            ChildThread t2 = new ChildThread(list);
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println(shared);
            System.out.println(list);
            /*2
            [Thread-0, Thread-1]*/
        }
    }

    ChildThread的run方法访问了共享变量shared和list。

    执行流、内存和程序代码之间的关系:

    1)不同的执行流可以访问和操作相同的变量

    2)不同的执行流可以执行相同的程序代码

    所以,在分析代码执行过程时,理解代码在被哪个线程执行是很重要的

    3)当多条执行流执行相同的程序代码时,每条执行流都有自己的栈,方法中

    的参数和局部变量都有自己的一份。

    当多条执行流可以操作相同的变量时,可能会出现意料之外的结果,包括竞态条件和内存可见性问题。

    1.竞态条件 

    所谓竞态条件(race condition)是指,当多个线程访问和操作同一个对象时,最终结果和执行时序

    有关,可能正确也可能不正确。

    public class CounterThread extends Thread{
        private static int counter = 0;
        @Override
        public void run() {
            for (int i = 0; i < 1000; i++) {
                counter++;
            }
        }
        public static void main(String[] args) throws InterruptedException {
            int num = 1000;
            Thread[] threads = new Thread[num];
            for (int i = 0; i< num; i++) {
                threads[i] = new CounterThread();
                threads[i].start();
            }
            for (int i = 0; i < 1000; i++) {
                threads[i].join();
            }
            System.out.println(counter); //998400
        }
    }

    期望结果应该是10000000,实际结果为998400,为什么呢?

    因为counter++这个操作不是原子操作,它分为了3个步骤:

    1)去counter的当前值

    2)在当前值上加1

    3)将新值重新赋值给counter

    两个线程可能同时执行第一步,取到了相同的counter值。

    2.内存的可见性 

    多个线程可以共享和访问和操作相同的变量,但一个线程对一个共享变量的修改,

    另一个线程不一定能马上见到,甚至永远见不到。

    public class VisibilityDemo {
        private static boolean shutdown = false;
        static class MyThread extends Thread {
            @Override
            public void run() {
                while (!shutdown) {
                    //do nothing
                }
                System.out.println("exit myThread");
            }
        }
        public static void main(String[] args) throws InterruptedException {
            new MyThread().start();
            Thread.sleep(1000);
            shutdown = true;
            System.out.println("exit main!");
        }
    }

    期望的结果是两个线程都退出,但实际执行时,很可能会发现myThread永远不退出,

    也就是说在myThread执行流看来,shutdown永远为false,即使main线程已经更改为了ture。

    这就是内存可见性问题,在计算机系统中,除了内存,数据还会被缓存在CPU的寄存器以及

    各级缓存中,当访问一个变量时,可能从CPU寄存器和各级缓存取,而不是从内存,当

    修改一个变量时,也可能是先修改到缓存中,稍后再同步更新到内存中。在单线程程序中,这

    一般不是问题,但在多线程程序中,尤其是有多个CPU的情况下,这就是严重问题。一个线程对

    内存的修改,另一个线程看不到,一是修改没有及时同步到到内存,二是另一个线程根本就没有从内存中读取。

    二、理解synchronized(同步)

    1.用法和基本原理

    synchronized可以用于:

    1)实例方法:

    public class Counter {
        private int count;
        public synchronized void incre() {
            count ++;
        }
        public synchronized int getCount() {
            return count;
        }
    }
    public class CounterThread extends Thread {
        private Counter counter;
        public CounterThread(Counter counter) {
            this.counter = counter;
        }
        @Override
        public void run() {
            for (int i = 0; i < 1000; i++) {
                counter.incre();
            }
        }
        public static void main(String[] args) throws InterruptedException {
            int num = 1000;
            Counter counter = new Counter();
            Thread[] threads = new Thread[num];
            for (int i = 0; i < num; i++) {
                threads[i] = new CounterThread(counter);
                threads[i].start();
                threads[i].join();
            }
            System.out.println(counter.getCount());
        }
    }

    看上去,synchronized使得同时只能有一个线程执行实例方法,

    但这个理解是不确切的。多个线程是可以同时执行一个synchronized方法的,

    只要它们访问的对象不同即可。所以,synchronized实例方法保护的是同一个

    对象方法的调用,确保同时只能有一个线程执行。大致过程如下:

    1)尝试获得锁,如果能够获得锁,继续下一步,否则加入等待队列,阻塞并等待唤醒;

    2)执行实例方法代码;

    3)释放锁,如果等待队列上有等待线程,从中抽取一个并唤醒,如果有多个等待线程,唤醒

    哪一个是不一定的,不保证公平性。

    当线程不能获得锁的时候,他会加入等待队列,状态会变为BLOCKED.

    注意:一般在保护变量时,需要在所有访问该变量的方法上加上synchronized。

    再次强调:synchronized保护的是对象而非代码,只要访问的是同一个对象的synchronized方法,

    即使是不同的代码,也会被同步顺序访问。比如,Counter中的两个实例方法,incre和getCount,

    对于同一个Counter对象,一个线程想去执行incre,另一个线程想执行getCount,它们是不能同时执行的,

    会被synchronized同步顺序执行。

    2)静态方法 

    synchronized保护的是类对象,而非实例对象(实际上,每个对象都有一个锁和等待队列,类对象也不例外)。

    public class StaticCounter {
        private static int count = 0;
        public static void incr() {
            synchronized (StaticCounter.class) {
                count ++;
            }
        }
        public static int getCount() {
            synchronized (StaticCounter.class) {
                return count;
            }
        }
    }

    3)代码块 

    public class Counter {
        private int count;
        public void incre() {
            //synchronized括号里面的就是保护对象{}里就是同步执行代码
            synchronized (this) {
                count ++;
            }
        }
        public synchronized int getCount() {
            synchronized (this) {
                return count;
            }
        }
    }
    public class Counter {
        private int count;
        private Object obj = new Object();
        public void incre() {
            //synchronized同步的对象可以是任意对象,任意对象都有一个锁和等待队列
            synchronized (obj) {
                count ++;
            }
        }
        public synchronized int getCount() {
            synchronized (obj) {
                return count;
            }
        }
    }

    2.进一步理解synchronized

    1)可重入性 

    对同一个执行线程,在它获得了锁之后,在调用其他需要同样锁的代码时,可以直接调用。

    比如,在一个synchronized实例方法内,可以调用同一个实例中的synchronized实例方法。

    2)内存可见性 

    synchronized除了保证原子操作外,它还有一个重要作用,就是保证内存可见性,

    在释放锁时,所有写入都会写入内存,而获得锁后,都会从内存中读取最新数据。

    不过,如果只是为了保证内存可见性,synchronized成本有点高,可以给变量加修饰符volatile代替。

    加了volatile后,Java会在操作对应变量时加入特殊指令,保证读写到内存最新值,而非缓存值。

    3)死锁 

    死锁举例:有a、b两个线程,a线程持有锁A,在等待锁B,b线程持有锁B,在等待锁A,

    a、b陷入了互相等待,最后谁都执行不下去。

    public class DeadLockDemo {
        private static Object lockA = new Object();
        private static Object lockB = new Object();
        public static void startThreadA() {
            Thread t = new Thread(() -> {
                synchronized (lockA) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    synchronized (lockB) {}
                }
            });
            t.start();
        }
        public static void startThreadB() {
            Thread t = new Thread(() -> {
                synchronized (lockB) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    synchronized (lockA) {}
                }
            });
            t.start();
        }
        public static void main(String[] args) {
            startThreadA();
            startThreadA();
        }
    }

    如何解决死锁问题:首先,应该尽量避免在持有一个锁的同时去申请另一个锁,

    如果确实需要多个锁,所有代码都应该按照相同的顺序去申请锁。

    3.同步容器

    Collections类中有一些方法,可以返回线程安全的同步容器:

    public static <T> Collection<T> synchronizedCollection(Collection<T> c)
    public static <T> List<T> synchronizedList(List<T> list)
    public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m)

    它们是给所有容器方法加上synchronized来实现安全的,例如:

    static class SynchronizedCollection<E> implements Collection<E> {
        final Collection<E> c; //Backing Collection
        final Object mutex; //Object on which to synchronize
        SynchronizedCollection(Collection<E> c) {
            if(c==null)
                throw new NullPointerException();
            this.c = c;
            mutex = this;
        }

    public int size() { synchronized (mutex) {return c.size();} } public boolean add(E e) { synchronized (mutex) {return c.add(e);} } public boolean remove(Object o) { synchronized (mutex) {return c.remove(o);} } // }

    这里的线程安全针对的是容器对象,指的是当多个线程并发访问同一个容器对象时,  

    不需要额外的同步操作,也不会出现错误结果。这样是不是就绝对安全了呢?不是

    的我们还需要注意以下情况:

    1)复合操作 

    public class EnhancedMap <K, V> {
        Map<K, V> map;
        public EnhancedMap(Map<K,V> map){
            this.map = Collections.synchronizedMap(map);
        }
        public V putIfAbsent(K key, V value){
            V old = map.get(key);
            if(old!=null){
                return old;
            }
            return map.put(key, value);
        }
        public V put(K key, V value){
            return map.put(key, value);
        }
    //
    }

    其中的putAbsent方法语义是只有在原方法没有对应键的情况下才添加,

    这是一个检查然后在更新的操作,在多线程的情况下,可能有多个线程

    执行完了检查这一步,都发现Map中没有对应的键,然后就会都调用put,这就破坏了该方法的语义。

    2.伪同步 

    那么给该方法加上synchronized就线程安全了吗?

    public synchronized V putIfAbsent(K key, V value){
        V old = map.get(key);
        if(old!=null){
            return old;
        }
        return map.put(key, value);
    }

    答案是否定的,原因是同步对象错了。putAbsent同步使用的是EnhancedMap对象,

    而其他方法使用的是Collections当作的map对象。要解决这个问题,所有方法必须

    使用相同的锁。所以可以改为:

    public V putIfAbsent(K key, V value){
        synchronized(map){
            V old = map.get(key);
            if(old!=null){
                return old;
            }
            return map.put(key, value);
        }
    }

    3.迭代 

    对于同步容器对象,虽然单个操作是安全的,但迭代并不是:

    public class DeadLockDemo {
    
        private static void startModifyThread(final List<String> list) {
            Thread modifyThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    for(int i = 0; i < 100; i++) {
                        list.add("item " + i);
                        try {
                            Thread.sleep((int) (Math.random() * 10));
                        } catch (InterruptedException e) {
                        }
                    }
                }
            });
            modifyThread.start();
        }
    
        private static void startIteratorThread(final List<String> list) {
            Thread iteratorThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        for(String str : list) {
                        }
                    }
                }
            });
            iteratorThread.start();
        }
    
        public static void main(String[] args) {
            final List<String> list = Collections
                    .synchronizedList(new ArrayList<String>());
            startIteratorThread(list);
            startModifyThread(list);
        }
        /*Exception in thread "Thread-0" java.util.ConcurrentModificationException
        at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
        at java.util.ArrayList$Itr.next(ArrayList.java:851)
        at com.cdert.roadpms.test.utils.DeadLockDemo$2.run(DeadLockDemo.java:34)
        at java.lang.Thread.run(Thread.java:748)*/
    }

    如果在遍历的同时容器发生了结构性变化就会抛出该异常。

    可以改为(在遍历的时候给整个容器对象加锁):

    private static void startIteratorThread2(final List<String> list) {
        Thread iteratorThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while(true) {
                    synchronized(list){
                        for(String str : list) {
                        }
                    }
                }
            }
        });
        iteratorThread.start();
    }

    二、线程的基本协作机制

    一)协作的场景

    1)生产者/消费者模式:生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到

    队列上,而消费者从队列取数据或者任务,如果队列长度有限,在队列满的时候,生产者需要等待,

    在队列为空的时候,消费者需要等待。

    2)同时开始:在一些程序,尤其是仿真程序中,要求多个线程同时开始。

    3)等待结束:主线程将任务分解成若干子任务,为每个子任务创建一个线程,主线程在继续执行其他

    任务之前需要等待每个子任务执行完毕。

    4)异步结果:在主从协作模式中,主线程手工创建子线程往往笔记麻烦,一种常见的模式是把子线程封装

    为异步调用,异步调用马上返回,但返回的不是最终结果,而是一个称为Future的对象,通过它可以在随后获得最终结果。

    5)集合点:在一些程序中,比如并行迭代计算中,每个线程负责一部分计算,然后在集合点等待其他线程完成,所有线程

    到齐后,交还数据和计算结果,再进行下一次迭代。

    二、wait/notify

    在Java根父类中定义了一些线程协作的基本方法,这些方法

    分为两类:wait和notify。wait主要有两个方法:

    public final void wait() throws InterruptedException
    //单位为毫秒表示最长等待时间,wait(0)表示无限期等待,无参wait等于调用wait(0)
    public final native void wait(long timeout) throws InterruptedException;

    每个对象都有一把锁和等待队列,一个线程在进入synchronized代码块时,

    会尝试获取锁,如果获取不到就会把当前线程加入等待队列,其实,除了

    用于锁的等待队列,每个对象还有另一个等待队列,叫做条件队列,该队列

    用于线程间的协作。调用wait就会把当前线程(调用该方法使用的线程)放到条件

    队列上并阻塞,表示当前线程无法执行,它需要等待一个条件,这个条件不能由

    它自己发起,需要其他线程发起。当其他线程改变条件后,应该调用notify方法。

    //notify将会从条件队列中选取一个线程,将其从队列中移除并唤醒
    public final native void notify();
    //notifyAll与notify的区别是:它会移除条件队列中的所有线程,并全部唤醒
    public final native void notifyAll();

    简单的协作示例:

    public class WaitThread extends Thread{
        private volatile boolean fire = false;
        @Override
        public void run() {
            try {
                synchronized(this) {
                    while (!fire) {
                        wait();
                    }
                }
                System.out.println("fired!");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        public synchronized void fire() {
            this.fire = true;
            notify();
        }
        public static void main(String[] args) throws InterruptedException {
            WaitThread waitThread = new WaitThread();
            waitThread.start();
            Thread.sleep(1000);
            System.out.println("fire");
            waitThread.fire();
        }
    }

    两个线程都要访问变量fire,容易出现竞态条件,所有相关代码都要被

    synchronized保护。实际上,wait/notify方法只能在synchronized代码

    块内被调用,如果调用wait/notify方法时,当前线程没有持有对象锁,会抛出异常。

    看看另一种情况:

    public class WaitThread extends Thread{
        private volatile boolean fire = false;
        private volatile int count = 0;
        @Override
        public void run() {
            try {
                synchronized(this) {
                    while (!fire) {
                        sleep(1000);
                        System.out.println(++count);
                    }
                }
                System.out.println("fired");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //实例对象的锁不会被释放,该方法永远不会被执行
        public synchronized void fire() {
            System.out.println("fire it"); 
            this.fire = true;
        }
        public static void main(String[] args) throws InterruptedException {
            WaitThread waitThread = new WaitThread();
            waitThread.start();
            Thread.sleep(1000);
            System.out.println("fire");
            waitThread.fire();
        }
    }

    思考:如果wait必须被synchronized保护,那一个线程在wait时,另一个线程

    怎么可能调用到同样被synchronized保护的notify方法(注意两个被保护的synchronized方法是一个实例的)?

    它不需要等待锁么?我们需要进一步理解wait的内部过程才能解开谜团!!虽然是在synchronized内,但调用

    wait时,线程会释放对象锁。wait的具体过程是:

    1)把当前线程放入条件等待队列,释放对象锁,阻塞等待,线程状态变为WAITING或TIMED_WAITING。

    2)等待时间到或者被其他线程调用notify/notifyAll从条件队列中移除,这时,要重新竞争对象锁:

    如果能够重新获得锁,线程状态变为RUNNABLE,并从wait调用中返回。

    否则,该线程加入对象锁等待队列,线程变为BLOCKED,只有在获得锁后才会从wait调用中返回。

    线程从wait调用中返回后,不代表其等待条件就一定成立了,它需要重新检查其等待条件,一般

    调用模式是:

    synchronized (obj) {
    while(条件不成立)
    obj.wait();
    …//执行条件满足后的操作
    }

    比如上例中的代码:

    synchronized (this) {
        while(!fire) {
        wait();
        }
    }

    调用notify会把条件队列中的线程唤醒并从队列中移除,但它并不会

    释放调用它的代码块获得的对象锁,也就是说,只有在包含notify的

    synchronized代码块执行完毕后,等待线程才会从wait调用中返回。

    三、生产者消费者模式

    public class MyQueue<E> {
        private Queue<E> queue = null;
        private int limit;
        public MyQueue(int limit) {
            this.limit = limit;
            queue = new ArrayDeque<E>(limit);
        }
        //给生产者用的方法,往队列上放数据,满了就wait
        public synchronized void put(E e) throws InterruptedException {
            while (queue.size() == limit) wait();
            queue.add(e);
            //由于条件不同但又使用相同的的等待队列,所以
            //要用notifyAll而不能调用notify
            notifyAll();
        }
        //take给消费者用的方法空了就wait
        public synchronized E take() throws InterruptedException {
            while (queue.isEmpty()) wait();
            E e = queue.poll();
            notifyAll();
            return e;
        }
    }

    只能有一个条件等待队列,这是 wait/notify机制的缺陷,这使得对于条件的分析变得复杂。

    public class Producer extends Thread{
        MyQueue<String> queue;
        public Producer(MyQueue<String> queue) {
            this.queue = queue;
        }
        @Override
        public void run() {
            int num = 0;
            try {
                while (true) {
                    String task = String.valueOf(num);
                    queue.put(task);
                    System.out.println("produce: " + task);
                    num ++;
                    Thread.sleep(3000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public class Consumer extends Thread{
        MyQueue<String> queue;
        public Consumer(MyQueue<String> queue) {
            this.queue = queue;
        }
        @Override
        public void run() {
            try {
                while (true) {
                    String task = queue.take();
                    System.out.println("consumer: " + task);
                    Thread.sleep(3000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    三、同时开始

    public class FireFlag {
        private volatile boolean fired = false;
        public synchronized void waitForFire() throws InterruptedException {
            while (!fired) {
                wait();
            }
        }
        public synchronized void fire() {
            this.fired = true;
            notifyAll();
        }
    }
    public class Racer extends Thread{
        private FireFlag flag;
        public Racer(FireFlag fireFlag) {
            this.flag = fireFlag;
        }
        @Override
        public void run() {
            try {
                flag.waitForFire();
                System.out.println("start run " + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            int num = 10;
            FireFlag flag = new FireFlag();
            Thread[] threads = new Thread[num];
            for (int i = 0; i < num; i++) {
                threads[i] = new Racer(flag);
                threads[i].start();
            }
            Thread.sleep(1000);
            flag.fire();
        }
    }

    四、等待结束

    join方法实际上就是调用了wait方法:

    while (isAlive()) {
        wait(0);
    }

    只要线程是活的,isAlive()返回true,join就一直等待。谁来通知它呢?

    当线程运行结束时,Java系统调用notifyAll来通知。使用join比较麻烦,需要

    主线程逐一等待每个子线程。这里我们展示一种新写法:

    public class MyLatch {
        //未完成的线程个数,初始值为子线程总个数
        private int threadsCount;
        public MyLatch(int threadsCount) {
            this.threadsCount = threadsCount;
        }
        public synchronized void await() throws InterruptedException {
            while (threadsCount > 0) wait();
        }
        public synchronized void countDown() {
            threadsCount --;
            if (threadsCount <= 0) this.notifyAll();
        }
    }
    public class Worker extends Thread{
        MyLatch myLatch;
        public Worker(MyLatch myLatch) {
            this.myLatch = myLatch;
        }
        @Override
        public void run() {
            try {
                Thread.sleep(5000);
                myLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            int num = 10;
            MyLatch myLatch = new MyLatch(num);
            Thread[] threads = new Thread[num];
            for (int i = 0; i < num; i++) {
                threads[i] = new Worker(myLatch);
                threads[i].start();
            }
            myLatch.await();
            System.out.println("ready to end !");
        }
    }

    五、异步结果

    在Java中表示子任务的接口是Callable,声明为:

    public interface Callable<V> {
        V call() throws Exception;
    }

    为表示异步调用的结果,定义一个MyFuture接口:

    public interface MyFuture <V> {
        //get方法返回真正的结果,如果结果没有计算完成,
        //get方法会阻塞直到计算完成
        V get() throws Exception ;
    }

    为方便主线程调用子任务,定义一个MyExcecutor类,其中定义一个execute

    方法表示执行子任务并返回调用结果。

    //利用该方法,对于主线程,就不需要创建并管理子线程了,
    //并且可以方便地获取异步调用的结果
    public <V> MyFuture<V> execute(final Callable<V> task)

    实例:

    //表示子任务得接口
    interface MyCallable<V> {
        V call() throws Exception;
    }
    //表示异步调用的结果
    interface MyFuture<V> {
        //返回真正的结果
        V get() throws Exception;
    }
    interface MyExecutor {
        <V> MyFuture<V> execute(final MyCallable<V> task);
    }
    //执行任务的子线程
    public class ExecuteThread<V> extends Thread{
    
        private V result = null;
        private Exception exception = null;
        private volatile boolean done = false;
        private MyCallable<V> task;
        private Object lock;
    
        public ExecuteThread(MyCallable<V> task, Object lock) {
            this.task = task;
            this.lock = lock;
        }
    
        @Override
        public void run() {
            try {
                result = task.call();
            } catch (Exception e) {
                exception = e;
            } finally {
                synchronized (lock) {
                    done = true;
                    lock.notifyAll();
                }
            }
        }
    
        public V getResult() {
            return result;
        }
        public boolean isDone() {
            return done;
        }
    
        public Exception getException() {
            return exception;
        }
    }
    public class MyExecutorImpl implements MyExecutor{
        @Override
        public <V> MyFuture<V> execute(MyCallable<V> task) {
            final Object lock = new Object();
            final ExecuteThread<V> thread = new ExecuteThread<>(task, lock);
            MyFuture<V> myFuture = new MyFuture<V>(){
                @Override
                public V get() throws Exception{
                    synchronized (lock) {
                        System.out.println("Try to get result");
                        while (!thread.isDone()) {
                            try {
                                //阻塞直到任务执行完毕
                                lock.wait();
                            } catch (Exception e) {}
                        }
                        if (thread.getException() != null) {
                            throw thread.getException();
                        }
                    }
                    System.out.println("Got it!!!");
                    return thread.getResult();
                }
            };
            thread.start();
            return myFuture;
        }
    }
    public class MainClass {
        public static void main(String[] args) {
            MyExecutor executor = new MyExecutorImpl();
            MyCallable<Integer> task = new MyCallable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("Start do the call.");
                    int result = (int) (Math.random() * 1000);
                    Thread.sleep(5000);
                    System.out.println("Do the call done.");
                    return result;
                }
            };
            //异步调用,返回一个MyFuture对象
            MyFuture<Integer> future = executor.execute(task);
            //执行其他操作
            try {
                System.out.println("Start do other things.");
                Thread.sleep(3000);
                System.out.println("Other things done. ");
                System.out.println("Ready to get.");
                //获取异步调用结果
                Integer result = future.get();
                System.out.println("The result is " + result);
            } catch (Exception e) {
                e.printStackTrace();
            }
            /*Start do other things.
            Do the call done.
            Other things done.
            The result is 802*/
        }
    }

    六、集合点

    public class AssemblePoint {
        //未到达集合点的线程数量,初始为所有线程数
        private int threadsCount;
        public AssemblePoint(int threadsCount) {
            this.threadsCount = threadsCount;
        }
        public synchronized void await() throws InterruptedException {
            System.out.println("Now the count is " + threadsCount);
            if (threadsCount > 0) {
                threadsCount --;
                if (threadsCount == 0 ) {
                    notifyAll();
                    System.out.println("All the threads done.");
                }
                else {
                    while (threadsCount !=0) {
                        this.wait();
                    }
                }
            }
        }
    }
    public class AssemblePointDemo {
        static class Tourist extends Thread {
            AssemblePoint point;
            public Tourist(AssemblePoint point) {
                this.point = point;
            }
            @Override
            public void run() {
                try {
                    //模拟各自独自运行
                    Thread.sleep((long) (Math.random() * 1000));
                    //该线程运行完毕,集合
                    point.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) {
            int num = 10;
            Tourist[] threads = new Tourist[num];
            AssemblePoint point = new AssemblePoint(num);
            for (int i = 0; i < num; i++) {
                threads[i] = new Tourist(point);
                threads[i].start();
            }
        }
    }

    三、线程的中断

     一)取消/关闭机制

    在Java中停止一个线程的主要机制是中断,中断并不是强迫终止一个线程,它是一种协作机制,

    是给线程传递一个取消信号,但是由线程线程决定如何以及何时退出。每个线程都有一个标志位,表示该线程是否被中断了。

    //返回线程的中断标志位
    public boolean isInterrupted()
    //中断对应的线程
    public void interrupt()
    //该方法是静态方法,实际会调用Thread.currentThread()操作当前线程,返回标志位,并清空
    public static boolean interrupted()

     二)线程中断的反应

    interrupt()对线程的影响与线程的状态和正在进行的IO操作有关,

    我们主要讨论线程状态:

    1)RUNNABLE:线程正在运行或者具备运行的条件只是在等待操作系统调度;

    2)WAITING/TIME_WAITING:线程在等待某个条件或者超时;

    3)BLOCKED:线程在等待锁,试图进入同步块

    4)NEW/TERMINATED:线程还未启动或者已经结束。

    1.RUNNABLE 

    如果线程在运行中,且没有执行IO操作,interrupt()只是会设置线程的中断标致位,

    没有任何其他作用。线程应该在运行过程中合适的位置检查中断标志位,例如:

    public class InterruptRunnableDemo extends Thread {
        @Override
        public void run() {
            while(!Thread.currentThread().isInterrupted()) {
            }
            System.out.println("done ");
        }
    }

    2.WAITING/TIME_WAITING 

    线程调用join/wait/sleep方法会进入WAITING/TIME_WAITING状态,

    在这些状态时,对线程对象调用interrupt会使线程抛出InterruptedException。

    抛出异常后,中断标志位会被清空,而不是设置。比如:

    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println(Thread.currentThread().isInterrupted()); //false
            }
        });
        thread.start();
        try {
            Thread.sleep(100);
        } catch (Exception e) {}
        thread.interrupt();
    }

    捕获InterruptedException,通常表示希望线程结束,线程大致有两种处理方式:

    1)向上传递该异常,这使得该方法也变成一个可中断方法,需要调用者进行处理;

    2)有些情况不能向上传递异常,比如Thread的run方法,它的声明是固定的,不能

    抛出受检异常,这时,应该捕获异常,进行合理的清理操作,清理后,一般应该调

    用Thread的interrupt方法设置中断标志位,使得其他代码有方法知道它发生中断。

    3.BLOCKED 

    如果一个线程在等待锁,对线程调用interrupt只是会设置中断标志位,线程

    仍然会处于BLOCKED状态。interrupt并不能使一个正在等待锁的线程正真“中断”。

    public class InterruptSynchronizedDemo {
        private static Object lock = new Object();
        private static class MyThread extends Thread {
            @Override
            public void run() {
                synchronized (lock) {
                    System.out.println("Coming in "); //never
                    while (!Thread.currentThread().isInterrupted()) {
                    }
                }
                System.out.println("exit");//never
            }
        }
        public static void test() throws InterruptedException {
            synchronized (lock) {
                MyThread thread = new MyThread();
                thread.start();
                Thread.sleep(1000);
                thread.interrupt();
                thread.join();
            }
        }
        public static void main(String[] args) {
            //test方法在持有锁的情况下启动线程thread,而线程thread
            //也尝试获得锁,所以会进入等待队列。
            try {
                test();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    在使用synchronized关键字获取锁的过程中不响应中断请求,这是synchronized的局限性。

    4.NEW/TERMINATE

    调用interrupt无任何效果。

    三)如何正确地取消/关闭线程

    interrupt方法不一定会真正中断线程,它只是一种协作机制。

    对于以线程提供服务的程序模块而言,它应该封装取消/关闭操作,提供

    单独的取消/关闭方法给调用者。

  • 相关阅读:
    gain 基尼系数
    luogu P5826 【模板】子序列自动机 主席树 vector 二分
    牛客挑战赛39 树与异或 离线 树上莫队 树状数组 约数
    4.22 省选模拟赛 三元组 manacher 回文自动机
    4.22 省选模拟赛 最优价值 网络流 最大权闭合子图
    4.18 省选模拟赛 消息传递 树剖 倍增 线段树维护等比数列
    luogu P4008 [NOI2003]文本编辑器 splay 块状链表
    牛客挑战赛39 密码系统 后缀数组
    luogu P1526 [NOI2003]智破连环阵 搜索+最大匹配+剪枝
    luogu P4095 [HEOI2013]Eden 的新背包问题 多重背包 背包的合并
  • 原文地址:https://www.cnblogs.com/Shadowplay/p/10035987.html
Copyright © 2011-2022 走看看