zoukankan      html  css  js  c++  java
  • 并发编程与高并发学习笔记六

    J.U.C
    一,AbstractQueuedSynchronizer -AQS
    1.设计
    使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架
    利用一个int类型表示状态
    在AQS类中有一个叫waitStatus的成员变量,基于AQS有一个同步组件叫ReentrantLock,在这个组件中status表示
    获取锁的线程数。比如:status=0,表示还没有线程获取锁。status=0表示已经有线程获取了锁。status>1表示
    重入锁的数量
    使用方法是继承(是基于模板方法设计的,使用者必须继承这个AQS里的方法)
    子类通过继承并通过实现他的方法管理其状态的方法操纵状态(acquire方法和release()方法)
    可以同时实现排他锁和共享锁模式(站在使用者角度有两种功能:独占功能,共享功能)
    2.实现思路
    AQS内部维护了一个clh队列来管理锁,线程会首先尝试获取锁。如果失败,就将当前线程,以及等待状态等信息包成一个Node节点
    加入到同步队列SyncQueue中。之后,会不断的循环尝试获取锁。条件是但前节点为head节点的后继才会尝试。如果失败,就会
    阻塞自己。直到自己被唤醒。当持有锁的线程释放锁的时候,会唤醒队列中的后继线程
    二,同步组件
    CountDownLatch:闭锁,通过一个计数来保证线程是否需要一直阻塞
    Semaphore:控制同一时间并发线程的数目
    CyclicBarrier:可以阻塞线程
    ReentrantLock
    Condition
    FutureTask
    /**
     * Created by yaming .
     * CountDownLatch使用场景1:
     * 等待一个线程全部执行完毕后,才开始执行另外一个线程
     */
    public class CountDownLatchExample1 {
    
        //请求总数
        public static int clientTotal = 200;
    
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            //计数器,
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int threadNum = i;
                executorService.execute(()->{
                    try {
                        test(threadNum);
                    }catch (Exception e){
                        System.out.println("exception");
                        e.printStackTrace();
                    }finally {
                        //闭锁,每执行一次test()操作,请求数就减一
                        countDownLatch.countDown();
                    }
                });
            }
    
            //等待上面的线程都执行完毕,countDown的值减为0,然后才向下执行主线程
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("finish");
            //关闭线程池
            executorService.shutdown();
        }
    
        private static void test(int threadNum) throws Exception{
            Thread.sleep(100);
            System.out.println(""+threadNum);
            Thread.sleep(100);
        }
    }
    
    
    /**
     * Created by yaming .
     * CountDownLatch使用场景2:
     * 等待一个线程指定时间,超时就开始执行另外一个线程
     */
    public class CountDownLatchExample2 {
    
        //请求总数
        public static int clientTotal = 200;
    
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            //计数器,
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int threadNum = i;
                executorService.execute(()->{
                    try {
                        test(threadNum);
                    }catch (Exception e){
                        e.printStackTrace();
                    }finally {
                        //闭锁,每执行一次test()操作,请求数就减一
                        //放到finally代码块里
                        countDownLatch.countDown();
                    }
                });
            }
    
            //只等待上面的线程指定时间,超时后直接向下执行主线程
            try {
                countDownLatch.await(1, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("finish");
            //关闭线程池
            executorService.shutdown();
        }
    
        private static void test(int threadNum) throws Exception{
            Thread.sleep(20);
            System.out.println(""+threadNum);
            Thread.sleep(20);
        }
    }
    /**
     * Created by yaming 
     * 控制并发执行的线程个数
     */
    public class SemaphoreExample1 {
        //请求总数
        public static int clientTotal = 20;
    
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            //计数器,
            final Semaphore semaphore = new Semaphore(3);
            for (int i = 0; i < clientTotal; i++) {
                final int threadNum = i;
                executorService.execute(()->{
    
                    try {
                        if(semaphore.tryAcquire()){ //尝试获取一个许可,如果获取不到,该线程就会被丢弃
                            test(threadNum);
                            semaphore.release();//释放一个许可
                        }
    
    //                    semaphore.acquire();//获取一个许可
    //                    test(threadNum);
    //                    semaphore.release();//释放一个许可
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                });
            }
            //关闭线程池
            executorService.shutdown();
        }
        private static void test(int threadNum) throws Exception{
            System.out.println(""+threadNum);
            Thread.sleep(1000);
        }
    }

    三,区别
    CountDownLatch:描述的是一个或n个线程等待其他线程的关系。CountDownLatch的计数器只能使用一次
    CyclicBarrier:描述的是多个线程相互等待的,只有所有的线程都满足条件后,才继续执行下一个操作。CyclicBarrier计数器可以
    重复使用(使用reset()方法重置)
    四,ReentrantLock与锁
    1.锁的分类:
    synchronized关键字修饰的锁
    J.U.C里提供的锁,(ReentrantLock)
    2.区别:
    都具有可重入性。(同一个线程进入一次,锁的计数器就自增1,当锁的计数器下降为0时,才释放锁)
    锁的实现。synchronized是基于JVM实现的(操作系统来实现),ReentrantLock是jdk实现的(用户自己实现)
    性能的区别。优化的的synchronized的性能和ReentrantLock差不多
    功能的区别。synchronized使用比较方便简洁,是由编译器保证加锁的锁的释放。ReentrantLock需要手动声明加锁和释放锁
    在锁的细粒度和灵活度上,ReentrantLock具有优势
    3.ReentrantLock独有的功能
    可以指定是公平锁还是非公平锁(是可以选择的)。synchronized是非公平锁(不能指定)。(公平锁:先等待的线程先获得锁)
    提供了一个Condition类,可以分组唤醒需要唤醒的线程。而synchronized要么随机唤醒一个线程,要么唤醒所有线程
    提供了能够中断等待锁的线程的机制,lock.lockInterruptibly();
    4.总结:
    ReentrantLock实现的是一种自旋锁,循环调用CAS操作来加锁,他的性能比较好也是因为避免了使线程进入内核态的阻塞状态。
    想尽办法避免进入线程内核的阻塞状态,是我们分析和理解锁设计的关键
    5.什么情况下使用ReentrantLock
    当我们需要使用ReentrantLock独有的功能的时候,就用ReentrantLock
    6.源码:
    private final static Lock lock = new ReentrantLock();
    如何实现:
    默认是不公平锁
    public ReentrantLock() {
    sync = new NonfairSync();
    }
    我们还可以指定使用公平锁还是非公平锁
    public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    }

    tryLock()方法:本质作用是仅在调用时锁定未被另一个线程保持的情况下,才获取锁定

    //synchronized实现锁
    public class LockExample1 {
    
        //请求总数
        public static int clientTotal = 5000;
        //同时并发执行的线程数
        public static int threadTotal = 200;
        //全局变量
        public static int count = 0;
    
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            //信号灯,同时允许执行的线程数
            final Semaphore semaphore = new Semaphore(threadTotal);
            //计数器,
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
    
            for (int i = 0; i < clientTotal; i++) {
                executorService.execute(()->{
                    try {
                        //获取信号灯,当并发达到一定数量后,该方法会阻塞而不能向下执行
                        semaphore.acquire();
                        add();
    
                        //释放信号灯
                        semaphore.release();
                    }catch (InterruptedException e){
                        System.out.println("exception");
                        e.printStackTrace();
                    }
                    //闭锁,每执行一次add()操作,请求数就减一
                    countDownLatch.countDown();
                });
            }
    
            //等待上面的线程都执行完毕,countDown的值减为0,然后才向下执行主线程
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //打印count的值
            System.out.println("count:"+count);
    
            //关闭线程池
            executorService.shutdown();
    
        }
    
        private synchronized static void add(){
            count++;
        }
    
    }
    
    
    //ReentrantLock来加锁
    public class LockExample2 {
    
        //请求总数
        public static int clientTotal = 5000;
        //同时并发执行的线程数
        public static int threadTotal = 200;
        //全局变量
        public static int count = 0;
    
        private final static Lock lock = new ReentrantLock();
    
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            //信号灯,同时允许执行的线程数
            final Semaphore semaphore = new Semaphore(threadTotal);
            //计数器,
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
    
            for (int i = 0; i < clientTotal; i++) {
                executorService.execute(()->{
                    try {
                        //获取信号灯,当并发达到一定数量后,该方法会阻塞而不能向下执行
                        semaphore.acquire();
                        add();
    
                        //释放信号灯
                        semaphore.release();
                    }catch (InterruptedException e){
                        System.out.println("exception");
                        e.printStackTrace();
                    }
                    //闭锁,每执行一次add()操作,请求数就减一
                    countDownLatch.countDown();
                });
            }
    
            //等待上面的线程都执行完毕,countDown的值减为0,然后才向下执行主线程
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //打印count的值
            System.out.println("count:"+count);
    
            //关闭线程池
            executorService.shutdown();
    
        }
    
        private static void add(){
            try {
                lock.lock();
                count++;
            }finally {
                //放在finally里保证锁一定能释放
                lock.unlock();
            }
    
    
        }
    
    }

    五,ReentrantReadWriteLock
    在没有任何读读写锁的时候,才可以取得写入锁.
    该类里有两个锁:读锁和写锁
    private final ReentrantReadWriteLock.ReadLock readerLock;
    private final ReentrantReadWriteLock.WriteLock writerLock;
    可以并发读,只能同步写。当读的操作远大于写的操作数量时,会使写线程遭遇饥饿,写操作无法获取锁
    //ReentrantReadWriteLock来加锁
    public class LockExample3 {
        private final Map<String,Data> map = new TreeMap<>();
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private final Lock readLock = lock.readLock();
        private final Lock writeLock = lock.writeLock();
    
        public Data get(String key){
            readLock.lock();
            try {
                return map.get(key);
            }finally {
                readLock.unlock();
            }
        }
        public Data put(String key,Data data){
            writeLock.lock();
            try {
                return map.put(key,data);
            }finally {
                writeLock.unlock();
            }
    
        }
    
        public Set<String> getAllKeys(){
    
            readLock.lock();
            try {
                return map.keySet();
            }finally {
                readLock.unlock();
            }
        }
    
        class Data{}
    }
    
    
    

    六,StampedLock
    StampedLock控制锁有三种模式。分别是写,读,乐观读。重点是在乐观读上
    一个StampedLock的状态是由版本和模式两个部分组成
    //StampedLock的官方例子
    public class LockExample4 {
        class Point {
            private double x, y;
            private final StampedLock sl = new StampedLock();
    
            void move(double deltaX, double deltaY) { // an exclusively locked method
                long stamp = sl.writeLock();
                try {
                    x += deltaX;
                    y += deltaY;
                } finally {
                    sl.unlockWrite(stamp);
                }
            }
    
            //下面看看乐观读锁案例
            double distanceFromOrigin() { // A read-only method
                long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
                double currentX = x, currentY = y;  //将两个字段读入本地局部变量
                if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
                    stamp = sl.readLock();  //如果没有,我们再次获得一个读悲观锁
                    try {
                        currentX = x; // 将两个字段读入本地局部变量
                        currentY = y; // 将两个字段读入本地局部变量
                    } finally {
                        sl.unlockRead(stamp);
                    }
                }
                return Math.sqrt(currentX * currentX + currentY * currentY);
            }
    
            //下面是悲观读锁案例
            void moveIfAtOrigin(double newX, double newY) { // upgrade
                // Could instead start with optimistic, not read mode
                long stamp = sl.readLock();
                try {
                    while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
                        long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
                        if (ws != 0L) { //这是确认转为写锁是否成功
                            stamp = ws; //如果成功 替换票据
                            x = newX; //进行状态改变
                            y = newY;  //进行状态改变
                            break;
                        } else { //如果不能成功转换为写锁
                            sl.unlockRead(stamp);  //我们显式释放读锁
                            stamp = sl.writeLock();  //显式直接进行写锁 然后再通过循环再试
                        }
                    }
                } finally {
                    sl.unlock(stamp); //释放读锁或写锁
                }
            }
        }
    
    }
    
    
    //StampedLock的例子
    public class LockExample5 {
        //请求总数
        public static int clientTotal = 5000;
        //同时并发执行的线程数
        public static int threadTotal = 200;
        //全局变量
        public static int count = 0;
        private final static StampedLock lock = new StampedLock();
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            //信号灯,同时允许执行的线程数
            final Semaphore semaphore = new Semaphore(threadTotal);
            //计数器,
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
    
            for (int i = 0; i < clientTotal; i++) {
                executorService.execute(()->{
                    try {
                        //获取信号灯,当并发达到一定数量后,该方法会阻塞而不能向下执行
                        semaphore.acquire();
                        add();
                        //释放信号灯
                        semaphore.release();
                    }catch (InterruptedException e){
                        System.out.println("exception");
                        e.printStackTrace();
                    }
                    //闭锁,每执行一次add()操作,请求数就减一
                    countDownLatch.countDown();
                });
            }
    
            //等待上面的线程都执行完毕,countDown的值减为0,然后才向下执行主线程
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //打印count的值
            System.out.println("count:"+count);
    
            //关闭线程池
            executorService.shutdown();
    
        }
        private static void add(){
            long stamp = lock.writeLock();
            try {
                count++;
            }finally {
                //放在finally里保证锁一定能释放
                lock.unlock(stamp);
            }
        }
    }

    七,总结:
    当只有少量的竞争者时,synchronized是一个不错的选择,(synchronized不会引发死锁,jvm会自动解锁)
    竞争者不少,但是线程增长的趋势我们是能够预估的,这时可以选择ReentrantLock

    八,Condition

    //Condition的使用
    public class LockExample6 {
    
        public static void main(String[] args) {
            ReentrantLock reentrantLock = new ReentrantLock();
            Condition condition = reentrantLock.newCondition();
    
            new Thread(() -> {
                try {
                    reentrantLock.lock();
                    System.out.println("wait signal"); // 1
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("get signal"); // 4
                reentrantLock.unlock();
            }).start();
    
            new Thread(() -> {
                reentrantLock.lock();
                System.out.println("get lock"); // 2
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                condition.signalAll();
                System.out.println("send signal ~ "); // 3
                reentrantLock.unlock();
            }).start();
        }
    
    
    }
    
    
    
     
  • 相关阅读:
    12.C语言控制窗口
    11.字符,字符常见开发,_itoa函数
    Clusterware 和 RAC 中的域名解析的配置校验和检查 (文档 ID 1945838.1)
    导致实例逐出的五大问题 (文档 ID 1526186.1)
    如何诊断 11.2 集群节点驱逐问题 (文档 ID 1674872.1)
    11gR2新特性---Gpnp守护进程
    CSS 功能简介
    11gR2新特性---gipc守护进程
    10g集群启动顺序
    11gR2集群件任务角色分离(Job Role Separation)简介
  • 原文地址:https://www.cnblogs.com/inspred/p/9520973.html
Copyright © 2011-2022 走看看