zoukankan      html  css  js  c++  java
  • 并发编程

    ReentrantLock - 独占锁

    特性:①独占锁 ②可重入 ③公平/非公平 ④可超时中断

    // ReentrantLock
    public class ReentrantLockTest {
        private static Lock lock = new ReentrantLock();
        private static int count = 0;
    
        public static void main(String[] args) throws InterruptedException {
            for(int i = 0;i < 1000; i++){
                new Thread(ReentrantLockTest::incre).start();
            }
    
            Thread.sleep(1000);
            System.out.println(count);
        }
    
        public static void incre() {
            lock.lock();   // 抢占锁
            try {
                count++;
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();  // 释放锁
            }
        }
    }
    

    ReentrantReadWriteLock - 读写锁

    ReentrantReadWriteLock使用同一个Sync队列,重写了共享资源/互斥资源的获取与释放逻辑,通过state的高16位存储共享状态,state的低16位存储互斥状态。
    API:①readLock() - 获取读锁 ②writeLock() - 获取写锁 =③lock.lock()/unlock()

    // 读写锁,允许读读共享,读写互斥,写写互斥
    public class ReentrantReadWriteLockTest {
        private static ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
        private static Lock readLock = readWriteLock.readLock();
        private static Lock writeLock = readWriteLock.writeLock();
    
        static String message = null;
    
        public static void main(String[] args) {
            new Thread(ReentrantReadWriteLockTest::producer).start();
            new Thread(ReentrantReadWriteLockTest::consumer).start();
        }
    
        public static void producer(){
            Random random = new Random(47);
            while (true) {
                writeLock.lock();
                try {
                    message = "" + random.nextInt();
                    System.out.println("write " + message);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    writeLock.unlock();
                }
            }
        }
    
        public static void consumer(){
            while (true) {
                readLock.lock();
                try {
                    System.out.println("read " + message);
                } finally {
                    readLock.unlock();
                }
            }
        }
    }
    

    Condition - 条件等待

    Condition是JUC提供的,功能等同于wait/notify功能的接口,具体实现为AbstractQueuedSynchronizer的内部类ConditionObject(只能在AQS对象实例内部创建),基于一个单向的Fifo队列实现等待与唤醒
    API:①await() ②signal() ③signalAll()

    // 阻塞队列的put()/take()方法即为通过两个contidion实现等待
    public class ConditionTest{
        public static void main(String[] args) {
            ReentrantLock lock = new ReentrantLock();
            Condition empty = lock.newCondition();
            Condition full = lock.newCondition();
            Queue<Integer> queue = new LinkedList<Integer>();  // 线程不安全的队列
    
            new Thread(new Producer(queue,4,lock,empty,full)).start();
            new Thread(new Consumer(queue,lock,empty,full)).start();
        }
    }
    
    // Producer 
    public class Producer implements Runnable{
        private Queue<Integer> msg;
        private int maxSize;
        private Lock lock;
        private Condition empty;
        private Condition full;
    
        public Producer(Queue<Integer> msg, int maxSize, Lock lock, Condition empty, Condition full) {
            this.msg = msg;
            this.maxSize = maxSize;
            this.lock = lock;
            this.empty = empty;
            this.full = full;
        }
    
        @Override
        public void run() {
            for(int i = 0;;i++){
                lock.lock();
                try {
                    while (msg.size() == maxSize){
                        System.out.println("消息队列已满 - waiting");
                        full.await();                                    // 队列已满,加入full等待队列
                    }
                    System.out.print("生产消息:"+ msg.add(i));
                    empty.signal();                                      // 生成了消息,唤醒empty等待队列
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
                System.out.println();
            }
        }
    }
    
    // Consumer 
    public class Consumer implements Runnable{
        private Queue<Integer> msg;
        private Lock lock;
        private Condition empty;
        private Condition full;
    
        public Consumer(Queue<Integer> msg, Lock lock, Condition empty, Condition full) {
            this.msg = msg;
            this.lock = lock;
            this.empty = empty;
            this.full = full;
        }
    
        @Override
        public void run() {
            for(int i = 0;;i++){
                lock.lock();
                try {
                    while (msg.isEmpty()){
                        System.out.println("消息队列已空 - waiting");
                        empty.await();
                    }
                    System.out.print("消费消息:" + msg.remove());
                    full.signal();
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
                System.out.println();
            }
        }
    }
    

    CountDownLatch 计数器

    线程调用countDownLatch.await()方法时将会被挂起,直到计数器计数值减为0时,唤醒所有await()线程 - 底层为共享模式的实现
    API:①await() ②countDown() - 计数值减一

    // 用法一:初始计数值设为1,所有await线程将在同一时间被唤醒(多个线程await(),一个线程countDown())
        public static void demo1(){
            countDownLatch = new CountDownLatch(1);
            new Thread(CountDownLatchTest::execute1).start();
            new Thread(CountDownLatchTest::execute1).start();
            new Thread(CountDownLatchTest::execute1).start();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();   // 初始计数值计为1,countDown()方法唤醒所有await()线程,使所有线程同步开始
        }
    
        public static void execute1(){
            System.out.println(Thread.currentThread().getName() + " -> begin");
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " -> end");
        }
    
    
    // 用法二:初始计数值设为任意,只有任意个线程调用countDown()方法后,方才唤醒awati线程(一个线程await(),等待多个线程countDown())
        public static void demo2(){
            countDownLatch = new CountDownLatch(3);
            new Thread(CountDownLatchTest::execute2).start();
            new Thread(CountDownLatchTest::execute2).start();
            new Thread(CountDownLatchTest::execute2).start();
    
            System.out.println(Thread.currentThread().getName() + " -> main.await");
            try {
                countDownLatch.await(); // await()线程等待计数值个线程执行countDown()后才能被唤醒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " -> main.signal");
        }
    
        public static void execute2(){
            System.out.println(Thread.currentThread().getName() + " -> 已执行");
            countDownLatch.countDown();
        }
    

    Semaphore 信号灯(令牌桶)

    只有获得令牌的线程可以继续执行,用以控制同时访问的线程个数。
    API:①acquire() - 获取令牌 ②release() - 释放令牌

    // 只有获得令牌的线程可以继续执行,用以控制同时访问的线程个数。
    public class SemaphoreTest {
        private static Semaphore semaphore = new Semaphore(5);
    
        public static void main(String[] args) {
            new Thread(SemaphoreTest::execute).start();
            new Thread(SemaphoreTest::execute).start();
            new Thread(SemaphoreTest::execute).start();
            new Thread(SemaphoreTest::execute).start();
            new Thread(SemaphoreTest::execute).start();
        }
    
        public static void execute(){
            while(true) {
                try {
                    semaphore.acquire();
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                semaphore.release();
                System.out.println(Thread.currentThread().getName() + " -> 获得了令牌");
            }
        }
    }
    

    CyclicBarrier - 循环屏障

    让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,所有被屏障拦截的线程才会继续工作。
    (由最后一个进入线程执行barrierAction,并唤醒对应数量的await状态线程,然后计数值会reset,计入下一次循环)
    API: await() - CyclicBarrier是当最后一个线程到达屏障时,自动唤醒。

    // 屏障,当进入await状态的线程数达到parties时,会由最后一个进入线程执行barrierAction,然后释放对应数量的await状态线程(然后发送reset)
    public class CyclicBarrierTest {
        private static CyclicBarrier cyclicBarrier = new CyclicBarrier(4,new Thread(()-> System.out.println(Thread.currentThread().getName())));
    
        public static void main(String[] args) {
            new Thread(CyclicBarrierTest::execute).start();
            new Thread(CyclicBarrierTest::execute).start();
            new Thread(CyclicBarrierTest::execute).start();
            new Thread(CyclicBarrierTest::execute).start(); // 以上4个线程将会被唤醒
            
            new Thread(CyclicBarrierTest::execute).start(); // 计数值被reset后,由于后来线程数无法达到计数值,因此该线程将被永久阻塞
        }
    
        public static void execute() {
            System.out.println(Thread.currentThread().getName() + " -> begin");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " -> end");
        }
    }
    

    Exchanger - 值交换器

    用以实现两个线程之间消息的交换,先到达exchange()的线程将被阻塞,等待下一个线程exchange()唤醒

    // 交换器,用于两个线程之间交换数据(先到达的线程将会被阻塞)
    public class ExchangerTesst {
        private static Exchanger<String> exchanger = new Exchanger<>();
    
        public static void main(String[] args) {
            new Thread(ExchangerTesst::execute).start();
            new Thread(ExchangerTesst::execute).start();
        }
    
        private static void execute(){
            try {
                String msg = Thread.currentThread().getName();
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " -> " + msg);
                msg = exchanger.exchange(Thread.currentThread().getName());
                System.out.println(Thread.currentThread().getName() + " -> " + msg);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    // Thread-0 -> Thread-0
    // Thread-1 -> Thread-1
    // Thread-1 -> Thread-0 -- 发生了值的交换
    // Thread-0 -> Thread-1
    

    欢迎疑问、期待评论、感谢指点 -- kiqi,愿同您为友

    -- 星河有灿灿,愿与之辉

  • 相关阅读:
    Java虚拟机(第二版) 学习笔记之Class类文件的结构
    JVM之深入浅出之垃圾收集算法
    Java虚拟机(第二版) 学习笔记之OutOfMemoryError
    Java虚拟机(第二版) 学习笔记
    平滑加权轮询负载均衡(轮询)算法
    java AQS(AbstractQueuedSynchronizer)同步器详解
    mybatis Interceptor拦截器代码详解
    aspectj编程简介
    Java并发编程阅读笔记-Java监视器模式示例
    我们该怎么结合日志做优化
  • 原文地址:https://www.cnblogs.com/kiqi/p/14392899.html
Copyright © 2011-2022 走看看