zoukankan      html  css  js  c++  java
  • 第3章 JDK并发包(二)

    3.1.2 重入锁的好搭档:Condition条件

    • 它和wait()和notify()方法的作用是大致相同的。但是wait()和notify()方法是和synchronized关键字合作使用的,而Condition是与重入锁相关联的。通过Lock接口的Condition newCondition()方法可以生成一个与当前重入锁绑定的Condition实例。利用Condition对象,我们就可以让线程在合适的时间等待,或者在某一种特定的时刻得到通知,继续执行。
    • Condition接口提供的基本方法如下:
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
    
    • 以上方法的含义如下:
      • await()方法使当前线程等待,同时释放当前锁,当其他线程中使用signal()或者signalAll()方法时,线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很相似。
      • awaitUninterruptibly()方法与await()方法基本相同,但是它并不会在等待过程中响应中断。
      • signal()方法用于唤醒一个在等待中的线程。相对的signalAll()方法会唤醒所有在等待中的线程。这和Object.notify()方法很类似。
    • 下面的代码简单演示了Condition的功能:
    public class ReenterLockCondition implements Runnable {
        public static ReentrantLock lock = new ReentrantLock();
        //通过lock生成一个与之绑定的Condition对象。
        public static Condition condition = lock.newCondition();
        @Override
        public void run() {
            try {
                lock.lock();
                //要求线程在Condition对象上进行等待。
                condition.await();
                System.out.println("Thread is going on");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public static void main(String[] args) {
            ReenterLockCondition tl = new ReenterLockCondition();
            Thread t1 = new Thread(tl);
            t1.start();
            Thread.sleep(2000);
            //通知线程t1继续执行
            lock.lock();
            condition.signal();
            //释放重入锁,否则唤醒的t1无法重新获得锁
            lock.unlock();
        }
    }
    
    • 当线程使用Condition.await()时,要求线程持有相关的重入锁,在Condition.await()调用后,这个线程会释放这把锁。同理,在Condition.signal()方法调用时,也要求线程先获得相关的锁。在signal()方法调用后,系统会从当前Condition对象的等待队列中,唤醒一个线程。一旦线程被唤醒,它会重新尝试获得与之绑定的重入锁,一旦成功获取,就可以继续执行了。因此,在signal()方法调用之后,一般需要释放相关的锁,谦让给被唤醒的线程,让它可以继续执行。

    3.1.3 允许多个线程同时访问:信号量(Semaphore)

    • 信号量为多线程协作提供了更为强大的控制方法。无论是内部锁synchronized还是重入锁ReentrantLock,一次都只允许一个线程访问一个资源,而信号量却可以指定多个线程,同时访问某一个资源。信 号量主要提供了以下构造函数:
    public Semaphore(int permits)
    public Semaphore(int permits, boolean fair) //第二个参数可以指定是否公平
    
    • 在构造信号量对象时,必须要指定信号量的准入数,即同时能申请多少个许可。当每个线程每次只申请一个许可时,这就相当于指定了同时有多少个线程可以访问某一个资源。信号量的主要逻辑方法有 :
    public void acquire()
    public void acquireUninterruptibly()
    public boolean tryAcquire()
    public boolean tryAcquire(long timeout, TimeUnit unit)
    public void release()
    
    • acquire()方法尝试获得一个准入的许可。若无法获得,则线程会等待,直到有线程释放一个许可或者当前线程被中断。acquireUninterruptibly()方法和acquire()方法类似,但是不响应中断。tryAcquire()尝试获得一个许可,如果成功返回true,失败返回false,它不会进行等待,立即返回。release()用于在线程访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。
    public class SemapDemo implements Runnable {
        //申明了一个包含5个许可的信号量
        final Semaphore semp = new Semaphore(5);
        @Override
        public void run() {
            try {
                semp.acquire();
                //模拟耗时操作
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getId() + ":done!");
                semp.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        public static void main(String[] args) {
            ExecutorService exec = Executors.newFixedThreadPool(20);
            final SemapDemo demo = new SemapDemo();
            for (int i = 0; i < 20; i++) {
                exec.submit(demo);
            }
        }
    }
    
    • 申请信号量使用require()操作,在离开时,务必使用release()释放信号量。如果不幸发生了信号量的泄露(申请了但没有释放),那么可以进入临界区的线程数量就会越来越少,直到所有的线程均不可访问。

    3.1.4 ReadWriteLock 读写锁

    • ReadWriteLock是JDK5中提供的读写分离锁。读写分离锁可以有效地帮助减少锁竞争,以提升系统性能。用锁分离的机制来提升性能非常容易理解,比如线程A1、A2、A3进行写操作,B1、B2、B3进行读操作,如果使用重入锁或者内部锁,则理论上说所有读之间、读与 写之间、写与写之间都是串行操作。当B1进行读取时,B2、B3则需要等待锁。由于读操作并不对数据的完整性造成破坏,这种等待显然是不合理。因此,读写锁就有了发挥功能的余地。

    • 在这种情况下,读写锁允许多个线程同时读,使得B1、B2、B3之间真正并行。但是,考虑到数据完整性,写写操作和读写操作间依然是需要相互等待和持有锁的。总的来说,读写锁的访问约束如表3.1所示。

    • 如果在系统中,读操作次数远远大于写操作,则读写锁就可以发挥最大的功效,提升系统的性能。这里我给出一个稍微夸张点的案例,来说明读写锁对性能的帮助。

    public class ReadWriteLockDemo {
        private static Lock lock = new ReentrantLock();
        private static ReentrantReadWriteLock  readWriteLock = new ReentrantReadWriteLock();
        private static Lock readLock = readWriteLock.readLock();
        private static Lock writeLock = readWriteLock.writeLock();
        private int value;
        
        public Object handleRead(Lock lock) throws InterruptedException {
            try {
                lock.lock();       //模拟读操作
                Thread.sleep(1000);   //读操作的耗时越多,读写锁的优势就越明显
                return value;
            } finally {
                lock.unlock();
            }
        }
        
        public void handleWrite(Lock lock, int index) throws InterruptedException {
            try {
                lock.lock();       //模拟写操作
                Thread.sleep(1000);   
                value = index;
            } finally {
                lock.unlock();
            }
        }
        
        public static void main(String[] args) {
            final ReadWriteLockDemo demo = new ReadWriteLockDemo();
            Runnable readRunnable = new Runnable() {
                @Ovrride
                public void run() {
                    try {
                        demo.handleRead(readLock); //读线程
                        //demo.handleRead(lock);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            
            Runnable readRunnable = new Runnable() {
                @Ovrride
                public void run() {
                    try {
                        demo.handleWrite(writeLock, new Random().nextInt()); //写线程
                        //demo.handleWrite(lock, new Random().nextInt());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            
            for (int i = 0; i < 18; i++) {
                new Thread(readRunnable).start();
            }
            
            for (int i = 18; i < 20; i++) {
                new Thread(writeRunnable).start();
            }
        }
    }
    
    • 如果上述读线程和写线程,使用普通的重入锁代替读写锁。那么所有的读和写线程之间都必须相互等待,因此整个程序的执行时间将长达20余秒。

    3.1.5 倒计时器:CountDownLatch

    • 这个工具通常用来控制线程等待,它可以让某一个线程等到直到倒计时结束,再开始执行。
    • CountDownLatch的构造函数接收一个整数作为参数,即当前这个计数器的计数个数。
    public CountDownLatch(int count);
    
    • 下面这个简单的示例,演示了CountDownLatch的使用。
    public class CountDownLatchDemo implements Runnable {
        static final CountDownLatch end = new CountDownLatch(10);
        static final CountDownLatchDemo demo = new CountDownLatchDemo();
        @Override
        public void run() {
            try {
                //模拟检查任务
                Thread.sleep(new Random().nextInt(10) * 1000);
                System.out.println("check complete");
                //通知CountDownLatch,一个线程已经完成了任务,倒计时器可以减1啦。
                end.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        public static void main(String[] args) throws InterruptedException {
            ExecutorService exec = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 10; i++) {
                exec.submit(demo);
            }
            //等待检查
            end.await(); //要求主线程所有10个检查任务全部完成。主线程才能继续执行。
            //发射火箭
            System.out.println("Fire!");
            exec.shutdown();
        }
    }
    
    • 上述代码,生成一个CountDownLatch示例。计数数量为10。这表示需要有10个线程完成任务,等待在CountDownLatch上的线程才能继续执行。
    • 上述案例的执行逻辑可以用图3.1简单所示。

    3.1.6 循环栅栏:CyclicBarrier

    • 它也可以实现线程间的计数等待,但它的功能比CountDownLatch更加复杂且强大。
    • CyclicBarrier可以接收一个参数作为barrierAction。所谓barrierAction就是当计数器一次计数完成后,系统会执行的动作。如下构造函数,其中,parties表示计数总数,也就是参与的线程总数,也就是参与的线程总数。
    public CyclicBarrier(int parties, Runnable barrierAction)
    
    • 下面的实例使用CyclicBarrier演示了司令命令士兵完成任务的场景。
    public class CyclicBarrierDemo {
        public static class Soldier implements Runnable {
            private String soldier;
            private final CyclicBarrier cyclic;
            
            Soldier(CyclicBarrier cyclic, String soldierName) {
                this.cyclic = cyclic;
                this.soldier = soldierName;
            }
            
            public void run() {
                try {
                    //等待所有士兵到齐
                    cyclic.await();
                    doWork();
                    //等待所有士兵完成工作
                    cyclic.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
            
            void doWork() {
                try {
                    Thread.sleep(Math.abs(new Random().nextInt() % 10000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(soldier + ":任务完成");
            }
        }
        
        public static class BarrierRun implements Runnable {
            boolean flag;
            int N;
            public BarrierRun(boolean flag, int N) {
                this.flag = flag;
                this.N = N;
            }
            
            public void run() {
                if (flag) {
                    System.out.println("司令:[士兵" + N + "个,任务完成!]");
                } else {
                    System.out.println("司令:[士兵" + N + "个,集合完毕!]");
                    flag = true;
                }
            }
        }
        
        public static void main(String[] args) throws InterruptedException {
            final int N = 10;
            Thread[] allSoldier = new Thread[N];
            boolean flag = false;
            CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
            
            //设置屏障点,主要是为了执行这个方法
            System.out.println("集合队伍!");
            for (int i = 0; i < N; ++i) {
                System.out.println("士兵 " + i + " 报道!");
                allSoldier[i] = new Thread(new Soldier(cyclic, "士兵 " + i));
                allSoldier[i].start();
            }
        }
    }
    


    • CyclicBarrier.await()方法可能会抛出两个异常。一个是InterruptedException,也就是等待过程中,线程被中断,应该说这是一个非常通用的异常。大部分迫使线程等待的方法都可能抛出这个异常,使得线程在等待时依然可以响应外部紧急事件。另外一个异常则是CyclicBarrier特有的BrokenBarrierException。一旦遇到这个异常,则表示当前的CyclicBarrier已经破损了,可能系统已经没有 办法等待所有线程到齐了。

    3.1.7 线程阻塞工具类:LookSupport

    • 它可以在线程内任意位置让线程阻塞。和Thread.suspend()相比,它弥补了由于resume()在前发生,导致线程无法继续执行的情况。
    • LockSupport的静态方法park()可以阻塞当前线程,类似的还有parkNanos()、parkUntil()等方法。它们实现了一个限时的等待。
    public class LockSupportDemo {
        public static Object u = new Object();
        static ChangeObjectThread t1 = new ChangeObjectThread("t1");
        static ChangeObjectThread t2 = new ChangeObjectThread("t2");
        
        public static class ChangeObjectThread extends Thread {
            public ChangeObjectThread(String name) {
                super.setName(name);
            }   
            @Override
            public void run() {
                synchronized (u) {
                    System.out.println("in " + getName());                    LockSupport.park();
                }
            }
        }
        
        public static void main(String[] args) throws InterruptedException {
            t1.start();
            Thread.sleep(100);
            t2.start();
            LockSupport.unpark(t1);
            LockSupport.unpark(t2);
            t1.join();
            t2.join();
        }
    }
    
    • 它自始至终都可以正常的结束,不会因为park()方法而导致线程永久性的挂起。

    • 这是因为LockSupport类使用类似信号量的机制。它为每一个线程准备了一个许可,如果许可可用,那么park()函数会立即返回,并且消费这个许可,如果许可不可用,就会阻塞。而unpark()则使得一个许可变为可用(永远只有一个)。

    • 这个特点使得:即使unpark()操作发生在park()之前,它也可以使下一次的park()操作立即返回。

    • 除了有定时阻塞的功能外,LockSupport.park()还能支持中断影响。但是和其他接收中断的函数不一样,LockSupport.park()不会抛出InterruptedException异常。它只是会默默的返回,但是我们可以从Thread.interrupted()等方法获得中断标记。

     public class LockSupportIntDemo {
         public static Object u = new Object();
         static ChangeObjectThread t1 = new ChangeObjectThread("t1");
         static ChangeObjectThread t2 = new ChangeObjectThread("t2");
        
        public static class ChangeObjectThread extends Thread {
            public ChangeObjectThread(String name) {
                super.setName(name);
            }   
            @Override
            public void run() {
                synchronized (u) {
                    System.out.println("in " + getName());                    LockSupport.park();
                    if (Thread.interrupted()) {
                        System.out.println(getName + " 被中断了");
                    }
                }
                System.out.println(getName + "执行结束")l
            }
        }
        
        public static void main(String[] args) throws InterruptedException {
            t1.start();
            Thread.sleep(100);
            t2.start();
            t1.interrupt();
            LockSupport.unpark(t2);
        }
         
     }
    
    • t1.interrupt()中断了处于park()状态的t1。之后,t1可以马上响应这个中断,并且返回。之后在外面等待的t2才可以进入临界区,并最终由LockSupport.unpark(t2)操作使其运行结束。
    in t1
    t1 被中断了
    t1 执行结束
    in t2
    t2 执行结束。
    
  • 相关阅读:
    Powershell-查询当前文件目录层级结构
    Microsoft Edge浏览器下载文件乱码修复方法(二)
    Windows Server 2016-PS筛选导出用户邮箱属性包含某字段列表
    Visual Studio Code-批量添加或删除注释行
    Java利用gson,将字符串转化为list
    Java8新特性-日期相关类操作
    redis设置密码
    linux执行时间段内日志关键字搜索
    idea中以maven工程的方式运行tomcat源码
    微信小程序
  • 原文地址:https://www.cnblogs.com/sanjun/p/8320645.html
Copyright © 2011-2022 走看看