zoukankan      html  css  js  c++  java
  • 信号量、栅栏和倒计数器

    信号量、栅栏和倒计数器

    学习材料来源于网络
    如有侵权,联系删除

    Semaphore

    简称信号量,可以控制多个线程的资源争抢许可。

    • acquire:获取一个许可,如果没有就等待,
    • release:释放一个许可。
    • availablePermits:方法得到可用的许可数目

    适用场景:

    1、代码并发处理限流

    如hystrix框架

    示例1:

    // 信号量机制
    public class SemaphoreDemo {
        public static void main(String[] args) {
            int guestCount = 9; // 客人数量
            Semaphore semaphore = new Semaphore(5); // 手牌数量,限制请求数量
            for (int i = 0; i < guestCount; i++) {
                String vipNo = "vip-00" + i;
                new Thread(
                        () -> {
                            try {
                                semaphore.acquire(); // 获取令牌
    
                                SemaphoreDemo.service(vipNo);
    
                                semaphore.release(); // 释放令牌
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        })
                        .start();
            }
        }
    
        // 限流 控制5个线程 同时访问
        public static void service(String vipNo) throws InterruptedException {
            System.out.println("楼上出来迎接贵宾一位,贵宾编号" + vipNo + ",...");
            Thread.sleep(new Random().nextInt(3000));
            System.out.println("欢送贵宾出门,贵宾编号" + vipNo);
        }
    }
    

    自定义AQS实现信号量

    示例2

    SemaphoreAqs

    package icu.shaoyayu.multithreading.chapter6;
    
    import java.util.Iterator;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.LockSupport;
    
    /**
     * @author shaoyayu
     * @E_Mail
     * @Version 1.0.0
     * @readme :
     */
    public class SemaphoreAqs {
        // acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。
        // tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。
        // release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
        // tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。
    
        // 1、 如何判断一个资源的拥有者
        public volatile AtomicReference<Thread> owner = new AtomicReference<>();
        // 保存 正在等待的线程
        public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
        // 记录资源状态
        public volatile AtomicInteger state = new AtomicInteger(0);
    
        // 共享资源占用的逻辑,返回资源的占用情况
        public int tryAcquireShared(){
            throw new UnsupportedOperationException();
        }
    
        public void acquireShared(){
            boolean addQ = true;
            while(tryAcquireShared() < 0) {
                if (addQ) {
                    // 没拿到锁,加入到等待集合
                    waiters.offer(Thread.currentThread());
                    addQ = false;
                } else {
                    // 阻塞 挂起当前的线程,不要继续往下跑了
                    LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
                }
            }
            waiters.remove(Thread.currentThread()); // 把线程移除
        }
    
        public boolean tryReleaseShared(){
            throw new UnsupportedOperationException();
        }
    
        public void releaseShared(){
            if (tryReleaseShared()) {
                // 通知等待者
                Iterator<Thread> iterator = waiters.iterator();
                while (iterator.hasNext()) {
                    Thread next = iterator.next();
                    LockSupport.unpark(next); // 唤醒
                }
            }
        }
    
        // 独占资源相关的代码
    
        public boolean tryAcquire() { // 交给使用者去实现。 模板方法设计模式
            throw new UnsupportedOperationException();
        }
    
        public void acquire() {
            boolean addQ = true;
            while (!tryAcquire()) {
                if (addQ) {
                    // 没拿到锁,加入到等待集合
                    waiters.offer(Thread.currentThread());
                    addQ = false;
                } else {
                    // 阻塞 挂起当前的线程,不要继续往下跑了
                    LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
                }
            }
            waiters.remove(Thread.currentThread()); // 把线程移除
        }
    
        public boolean tryRelease() {
            throw new UnsupportedOperationException();
        }
    
        public void release() { // 定义了 释放资源之后要做的操作
            if (tryRelease()) {
                // 通知等待者
                Iterator<Thread> iterator = waiters.iterator();
                while (iterator.hasNext()) {
                    Thread next = iterator.next();
                    LockSupport.unpark(next); // 唤醒
                }
            }
        }
    
        public AtomicInteger getState() {
            return state;
        }
    
        public void setState(AtomicInteger state) {
            this.state = state;
        }
    }
    

    SimpleSemaphore

    package icu.shaoyayu.multithreading.chapter6;
    
    /**
     * @author shaoyayu
     * @E_Mail
     * @Version 1.0.0
     * @readme :
     */
    public class SimpleSemaphore {
        SemaphoreAqs aqs = new SemaphoreAqs() {
            @Override
            public int tryAcquireShared() { // 信号量获取, 数量 - 1
                for(;;) {
                    int count =  getState().get();
                    int n = count - 1;
                    if(count <= 0 || n < 0) {
                        return -1;
                    }
                    if(getState().compareAndSet(count, n)) {
                        return 1;
                    }
                }
            }
    
            @Override
            public boolean tryReleaseShared() { // state + 1
                return getState().incrementAndGet() >= 0;
            }
        };
    
        /** 许可数量 */
        public SimpleSemaphore(int count) {
            aqs.getState().set(count); // 设置资源的状态
        }
    
        public void acquire() {
            aqs.acquireShared();
        } // 获取令牌
    
        public void release() {
            aqs.releaseShared();
        } // 释放令牌
    }
    

    SemaphoreDemo

    package icu.shaoyayu.multithreading.chapter6;
    
    import java.util.Random;
    
    /**
     * @author shaoyayu
     * @E_Mail
     * @Version 1.0.0
     * @readme :
     */
    // 信号量机制
    public class SemaphoreDemo {
        public static void main(String[] args) {
            int guestCount = 9; // 客人数量
            SimpleSemaphore semaphore = new SimpleSemaphore(5); // 手牌数量,限制请求数量
            for (int i = 0; i < guestCount; i++) {
                String vipNo = "vip-00" + i;
                new Thread(
                        () -> {
                            try {
                                semaphore.acquire(); // 获取令牌
    
                                SemaphoreDemo.service(vipNo);
    
                                semaphore.release(); // 释放令牌
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        })
                        .start();
            }
        }
    
        // 限流 控制5个线程 同时访问
        public static void service(String vipNo) throws InterruptedException {
            System.out.println("楼上出来迎接贵宾一位,贵宾编号" + vipNo + ",...");
            Thread.sleep(new Random().nextInt(3000));
            System.out.println("欢送贵宾出门,贵宾编号" + vipNo);
        }
    }
    

    CountDownLatch

    倒计数器

    创建对象时,传入指定数值作为线程参与的数量;

    • await:方法等待计数器值变为0,在这之前,线程进入等待状态;
    • countdown:计数器数值减一,直到为0;

    经常用于等待其他线程执行到某一节点,再继续执行当前线程代码

    使用场景示例:

    1、统计线程执行的情况;

    2、压力测试中,使用countDownLatch实现最大程度的并发处理;

    3、多个线程之间,相互通信,比如线程异步调用完接口,结果通知;

    示例3

    public static void main(String[] args) throws InterruptedException {
        // 一个请求,后台需要调用多个接口 查询数据
        CountDownLatch cdLdemo = new CountDownLatch(10); // 创建,计数数值
        for (int i = 0; i < 10; i++) { // 启动九个线程,最后一个两秒后启动
            int finalI = i;
            new Thread(() -> {
                // 参与计数
                try {
                    // 等待计数器为0
                    Thread.sleep(2000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("我是" + Thread.currentThread() + ".我执行接口-" + finalI +"调用了");
                // 不影响后续操作
                cdLdemo.countDown();
            }).start();
        }
        cdLdemo.await(); //等待其他线程的执行结束以后
    
        System.out.println("全部执行完毕.我来召唤神龙");
    
    }
    

    自定义AQS实现信号量

    示例4

    package icu.shaoyayu.multithreading.chapter6;
    
    import java.util.Iterator;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.LockSupport;
    
    /**
     * @author shaoyayu
     * @E_Mail
     * @Version 1.0.0
     * @readme :
     */
    public class SimpleCDLAqs {
        // 同步资源状态
        volatile AtomicInteger state = new AtomicInteger(0);
        // 当前锁的拥有者
        protected volatile AtomicReference<Thread> owner = new AtomicReference<>();
        // java q 线程安全
        public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
    
        public void acquire() {
            boolean addQ = true;
            while (!tryAcquire()) {
                if (addQ) {
                    // 塞到等待锁的集合中
                    waiters.offer(Thread.currentThread());
                    addQ = false;
                } else {
                    // 挂起这个线程
                    LockSupport.park();
                    // 后续,等待其他线程释放锁,收到通知之后继续循环
                }
            }
            waiters.remove(Thread.currentThread());
        }
    
        public void release() {
            // cas 修改 owner 拥有者
            if (tryRelease()) {
                Iterator<Thread> iterator = waiters.iterator();
                while (iterator.hasNext()) {
                    Thread waiter = iterator.next();
                    LockSupport.unpark(waiter); // 唤醒线程继续 抢锁
                }
            }
        }
    
        // 判断量够不够
        public void acquireShared() {
            boolean addQ = true;
            while (tryAcquireShared() < 0) {
                if (addQ) {
                    // 塞到等待锁的集合中
                    waiters.offer(Thread.currentThread());
                    addQ = false;
                } else {
                    // 挂起这个线程
                    LockSupport.park();
                    // 后续,等待其他线程释放锁,收到通知之后继续循环
                }
            }
            waiters.remove(Thread.currentThread());
        }
    
        public void releaseShared() {
            // cas 修改 owner 拥有者
            if (tryReleaseShared()) {
                Iterator<Thread> iterator = waiters.iterator();
                while (iterator.hasNext()) {
                    Thread waiter = iterator.next();
                    LockSupport.unpark(waiter); // 唤醒线程继续 抢锁
                }
            }
        }
    
        public boolean tryAcquire() {
            throw new UnsupportedOperationException();
        }
    
        public boolean tryRelease() {
            throw new UnsupportedOperationException();
        }
    
        public int tryAcquireShared() {
            throw new UnsupportedOperationException();
        }
    
        public boolean tryReleaseShared() {
            throw new UnsupportedOperationException();
        }
    
        public AtomicInteger getState() {
            return state;
        }
    
        public void setState(AtomicInteger state) {
            this.state = state;
        }
    }
    
    package icu.shaoyayu.multithreading.chapter6;
    
    import java.util.Iterator;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.LockSupport;
    
    /**
     * @author shaoyayu
     * @E_Mail
     * @Version 1.0.0
     * @readme :
     */
    public class SimpleCDLAqs {
        // 同步资源状态
        volatile AtomicInteger state = new AtomicInteger(0);
        // 当前锁的拥有者
        protected volatile AtomicReference<Thread> owner = new AtomicReference<>();
        // java q 线程安全
        public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
    
        public void acquire() {
            boolean addQ = true;
            while (!tryAcquire()) {
                if (addQ) {
                    // 塞到等待锁的集合中
                    waiters.offer(Thread.currentThread());
                    addQ = false;
                } else {
                    // 挂起这个线程
                    LockSupport.park();
                    // 后续,等待其他线程释放锁,收到通知之后继续循环
                }
            }
            waiters.remove(Thread.currentThread());
        }
    
        public void release() {
            // cas 修改 owner 拥有者
            if (tryRelease()) {
                Iterator<Thread> iterator = waiters.iterator();
                while (iterator.hasNext()) {
                    Thread waiter = iterator.next();
                    LockSupport.unpark(waiter); // 唤醒线程继续 抢锁
                }
            }
        }
    
        // 判断量够不够
        public void acquireShared() {
            boolean addQ = true;
            while (tryAcquireShared() < 0) {
                if (addQ) {
                    // 塞到等待锁的集合中
                    waiters.offer(Thread.currentThread());
                    addQ = false;
                } else {
                    // 挂起这个线程
                    LockSupport.park();
                    // 后续,等待其他线程释放锁,收到通知之后继续循环
                }
            }
            waiters.remove(Thread.currentThread());
        }
    
        public void releaseShared() {
            // cas 修改 owner 拥有者
            if (tryReleaseShared()) {
                Iterator<Thread> iterator = waiters.iterator();
                while (iterator.hasNext()) {
                    Thread waiter = iterator.next();
                    LockSupport.unpark(waiter); // 唤醒线程继续 抢锁
                }
            }
        }
    
        public boolean tryAcquire() {
            throw new UnsupportedOperationException();
        }
    
        public boolean tryRelease() {
            throw new UnsupportedOperationException();
        }
    
        public int tryAcquireShared() {
            throw new UnsupportedOperationException();
        }
    
        public boolean tryReleaseShared() {
            throw new UnsupportedOperationException();
        }
    
        public AtomicInteger getState() {
            return state;
        }
    
        public void setState(AtomicInteger state) {
            this.state = state;
        }
    }
    

    CyclicBarrier

    线程栅栏

    创建对象时,指定栅栏线程数量。

    await:等指定数量的线程都处于等待状态时,继续执行后续代码。

    barrierAction:线程数量到了指定量之后,自动触发执行指定任务。

    CounDownLatch重要区别在于,CyclicBarrier对象可多次触发执行;

    典型场景:

    1、数据量比较大时,实现批量插入数据到数据库;

    2、数据统计,30个线程统计30天数据,全部统计完毕后,执行汇总;

    示例5

    package icu.shaoyayu.multithreading.chapter6;
    
    import java.util.ArrayList;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author shaoyayu
     * @E_Mail
     * @Version 1.0.0
     * @readme :
     */
    public class CyclicBarrierTest {
    
        public static void main(String[] args) throws InterruptedException {
            LinkedBlockingQueue<String> sqls = new LinkedBlockingQueue<>();
            // 任务1+2+3...1000  拆分为100个任务(1+..10,  11+20) -> 100线程去处理。
    
            // 每当有4个线程处于await状态的时候,则会触发barrierAction执行
            CyclicBarrier barrier = new CyclicBarrier(4, new Runnable() {
                @Override
                public void run() {
                    // 这是每满足4次数据库操作,就触发一次批量执行
                    System.out.println("有4个线程执行了,开始批量插入: " + Thread.currentThread());
                    for (int i = 0; i < 4; i++) {
                        System.out.println(sqls.poll());
                    }
                }
            });
    
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    try {
                        sqls.add("data - " + Thread.currentThread()); // 缓存起来
                        Thread.sleep(1000L); // 模拟数据库操作耗时
                        barrier.await(); // 等待栅栏打开,有4个线程都执行到这段代码的时候,才会继续往下执行
                        System.out.println(Thread.currentThread() + "插入完毕");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();
            }
    
            Thread.sleep(2000);
        }
    }
    

    运行结果

    分析。这种任务的4个线程组合跑一趟,但是在这个第的问题是线程8和线程9会一直阻塞等待,

    记得加油学习哦^_^
  • 相关阅读:
    重构的信号
    枚举类返回Map键值对,绑定到下拉框
    js onclick函数中传字符串参数的问题
    python opencv3 矩形 圆形边框
    python opencv3 轮廓检测
    python opencv3 滤波器 卷积核
    python opencv3 窗口显示摄像头的帧
    python opencv3 显示一张图片
    python opencv3 获取摄像头视频
    python opencv3 视频文件的读写
  • 原文地址:https://www.cnblogs.com/shaoyayu/p/14073962.html
Copyright © 2011-2022 走看看