zoukankan      html  css  js  c++  java
  • java并发系列

    这是java高并发系列第29篇。

    环境:jdk1.8。

    本文内容

    1. 介绍常见的限流算法
    2. 通过控制最大并发数来进行限流
    3. 通过漏桶算法来进行限流
    4. 通过令牌桶算法来进行限流
    5. 限流工具类RateLimiter

    常见的限流的场景

    1. 秒杀活动,数量有限,访问量巨大,为了防止系统宕机,需要做限流处理
    2. 国庆期间,一般的旅游景点人口太多,采用排队方式做限流处理
    3. 医院看病通过发放排队号的方式来做限流处理。

    常见的限流算法

    1. 通过控制最大并发数来进行限流
    2. 使用漏桶算法来进行限流
    3. 使用令牌桶算法来进行限流

    通过控制最大并发数来进行限流

    以秒杀业务为例,10个iphone,100万人抢购,100万人同时发起请求,最终能够抢到的人也就是前面几个人,后面的基本上都没有希望了,那么我们可以通过控制并发数来实现,比如并发数控制在10个,其他超过并发数的请求全部拒绝,提示:秒杀失败,请稍后重试。

    并发控制的,通俗解释:一大波人去商场购物,必须经过一个门口,门口有个门卫,兜里面有指定数量的门禁卡,来的人先去门卫那边拿取门禁卡,拿到卡的人才可以刷卡进入商场,拿不到的可以继续等待。进去的人出来之后会把卡归还给门卫,门卫可以把归还来的卡继续发放给其他排队的顾客使用。

    JUC中提供了这样的工具类:Semaphore,示例代码:

    package com.itsoku.chat29;
    
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 跟着阿里p7学并发,微信公众号:javacode2018
     */
    public class Demo1 {
    
        static Semaphore semaphore = new Semaphore(5);
    
        public static void main(String[] args) {
            for (int i = 0; i < 20; i++) {
                new Thread(() -> {
                    boolean flag = false;
                    try {
                        flag = semaphore.tryAcquire(100, TimeUnit.MICROSECONDS);
                        if (flag) {
                            //休眠2秒,模拟下单操作
                            System.out.println(Thread.currentThread() + ",尝试下单中。。。。。");
                            TimeUnit.SECONDS.sleep(2);
                        } else {
                            System.out.println(Thread.currentThread() + ",秒杀失败,请稍微重试!");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        if (flag) {
                            semaphore.release();
                        }
                    }
                }).start();
            }
        }
    
    }
    

    输出:

    Thread[Thread-10,5,main],尝试下单中。。。。。
    Thread[Thread-8,5,main],尝试下单中。。。。。
    Thread[Thread-9,5,main],尝试下单中。。。。。
    Thread[Thread-12,5,main],尝试下单中。。。。。
    Thread[Thread-11,5,main],尝试下单中。。。。。
    Thread[Thread-2,5,main],秒杀失败,请稍微重试!
    Thread[Thread-1,5,main],秒杀失败,请稍微重试!
    Thread[Thread-18,5,main],秒杀失败,请稍微重试!
    Thread[Thread-16,5,main],秒杀失败,请稍微重试!
    Thread[Thread-0,5,main],秒杀失败,请稍微重试!
    Thread[Thread-3,5,main],秒杀失败,请稍微重试!
    Thread[Thread-14,5,main],秒杀失败,请稍微重试!
    Thread[Thread-6,5,main],秒杀失败,请稍微重试!
    Thread[Thread-13,5,main],秒杀失败,请稍微重试!
    Thread[Thread-17,5,main],秒杀失败,请稍微重试!
    Thread[Thread-7,5,main],秒杀失败,请稍微重试!
    Thread[Thread-19,5,main],秒杀失败,请稍微重试!
    Thread[Thread-15,5,main],秒杀失败,请稍微重试!
    Thread[Thread-4,5,main],秒杀失败,请稍微重试!
    Thread[Thread-5,5,main],秒杀失败,请稍微重试!
    

    关于Semaphore的使用,可以移步:JUC中的Semaphore(信号量)

    使用漏桶算法来进行限流

    国庆期间比较火爆的景点,人流量巨大,一般入口处会有限流的弯道,让游客进去进行排队,排在前面的人,每隔一段时间会放一拨进入景区。排队人数超过了指定的限制,后面再来的人会被告知今天已经游客量已经达到峰值,会被拒绝排队,让其明天或者以后再来,这种玩法采用漏桶限流的方式。

    漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。

    漏桶算法示意图:

    简陋版的实现,代码如下:

    package com.itsoku.chat29;
    
    import java.util.Objects;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.LockSupport;
    
    /**
     * 跟着阿里p7学并发,微信公众号:javacode2018
     */
    public class Demo2 {
    
        public static class BucketLimit {
            static AtomicInteger threadNum = new AtomicInteger(1);
            //容量
            private int capcity;
            //流速
            private int flowRate;
            //流速时间单位
            private TimeUnit flowRateUnit;
            private BlockingQueue<Node> queue;
            //漏桶流出的任务时间间隔(纳秒)
            private long flowRateNanosTime;
    
            public BucketLimit(int capcity, int flowRate, TimeUnit flowRateUnit) {
                this.capcity = capcity;
                this.flowRate = flowRate;
                this.flowRateUnit = flowRateUnit;
                this.bucketThreadWork();
            }
    
            //漏桶线程
            public void bucketThreadWork() {
                this.queue = new ArrayBlockingQueue<Node>(capcity);
                //漏桶流出的任务时间间隔(纳秒)
                this.flowRateNanosTime = flowRateUnit.toNanos(1) / flowRate;
                Thread thread = new Thread(this::bucketWork);
                thread.setName("漏桶线程-" + threadNum.getAndIncrement());
                thread.start();
            }
    
            //漏桶线程开始工作
            public void bucketWork() {
                while (true) {
                    Node node = this.queue.poll();
                    if (Objects.nonNull(node)) {
                        //唤醒任务线程
                        LockSupport.unpark(node.thread);
                    }
                    //休眠flowRateNanosTime
                    LockSupport.parkNanos(this.flowRateNanosTime);
                }
            }
    
            //返回一个漏桶
            public static BucketLimit build(int capcity, int flowRate, TimeUnit flowRateUnit) {
                if (capcity < 0 || flowRate < 0) {
                    throw new IllegalArgumentException("capcity、flowRate必须大于0!");
                }
                return new BucketLimit(capcity, flowRate, flowRateUnit);
            }
    
            //当前线程加入漏桶,返回false,表示漏桶已满;true:表示被漏桶限流成功,可以继续处理任务
            public boolean acquire() {
                Thread thread = Thread.currentThread();
                Node node = new Node(thread);
                if (this.queue.offer(node)) {
                    LockSupport.park();
                    return true;
                }
                return false;
            }
    
            //漏桶中存放的元素
            class Node {
                private Thread thread;
    
                public Node(Thread thread) {
                    this.thread = thread;
                }
            }
        }
    
        public static void main(String[] args) {
            BucketLimit bucketLimit = BucketLimit.build(10, 60, TimeUnit.MINUTES);
            for (int i = 0; i < 15; i++) {
                new Thread(() -> {
                    boolean acquire = bucketLimit.acquire();
                    System.out.println(System.currentTimeMillis() + " " + acquire);
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    
    }
    

    代码中BucketLimit.build(10, 60, TimeUnit.MINUTES);创建了一个容量为10,流水为60/分钟的漏桶。

    代码中用到的技术有:

    1. BlockingQueue阻塞队列
    2. JUC中的LockSupport工具类,必备技能

    使用令牌桶算法来进行限流

    令牌桶算法的原理是系统以恒定的速率产生令牌,然后把令牌放到令牌桶中,令牌桶有一个容量,当令牌桶满了的时候,再向其中放令牌,那么多余的令牌会被丢弃;当想要处理一个请求的时候,需要从令牌桶中取出一个令牌,如果此时令牌桶中没有令牌,那么则拒绝该请求。从原理上看,令牌桶算法和漏桶算法是相反的,一个“进水”,一个是“漏水”。这种算法可以应对突发程度的请求,因此比漏桶算法好。

    令牌桶算法示意图:

    有兴趣的可以自己去实现一个。

    限流工具类RateLimiter

    Google开源工具包Guava提供了限流工具类RateLimiter,可以非常方便的控制系统每秒吞吐量,示例代码如下:

    package com.itsoku.chat29;
    
    import com.google.common.util.concurrent.RateLimiter;
    
    import java.util.Calendar;
    import java.util.Date;
    import java.util.Objects;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.LockSupport;
    
    /**
     * 跟着阿里p7学并发,微信公众号:javacode2018
     */
    public class Demo3 {
    
        public static void main(String[] args) throws InterruptedException {
            RateLimiter rateLimiter = RateLimiter.create(5);//设置QPS为5
            for (int i = 0; i < 10; i++) {
                rateLimiter.acquire();
                System.out.println(System.currentTimeMillis());
            }
            System.out.println("----------");
            //可以随时调整速率,我们将qps调整为10
            rateLimiter.setRate(10);
            for (int i = 0; i < 10; i++) {
                rateLimiter.acquire();
                System.out.println(System.currentTimeMillis());
            }
        }
    }
    

    输出:

    1566284028725
    1566284028922
    1566284029121
    1566284029322
    1566284029522
    1566284029721
    1566284029921
    1566284030122
    1566284030322
    1566284030522
    ----------
    1566284030722
    1566284030822
    1566284030921
    1566284031022
    1566284031121
    1566284031221
    1566284031321
    1566284031422
    1566284031522
    1566284031622
    

    代码中RateLimiter.create(5)创建QPS为5的限流对象,后面又调用rateLimiter.setRate(10);将速率设为10,输出中分2段,第一段每次输出相隔200毫秒,第二段每次输出相隔100毫秒,可以非常精准的控制系统的QPS。

    上面介绍的这些,业务中可能会用到,也可以用来应对面试。

    java高并发系列目录

    1. 第1天:必须知道的几个概念
    2. 第2天:并发级别
    3. 第3天:有关并行的两个重要定律
    4. 第4天:JMM相关的一些概念
    5. 第5天:深入理解进程和线程
    6. 第6天:线程的基本操作
    7. 第7天:volatile与Java内存模型
    8. 第8天:线程组
    9. 第9天:用户线程和守护线程
    10. 第10天:线程安全和synchronized关键字
    11. 第11天:线程中断的几种方式
    12. 第12天JUC:ReentrantLock重入锁
    13. 第13天:JUC中的Condition对象
    14. 第14天:JUC中的LockSupport工具类,必备技能
    15. 第15天:JUC中的Semaphore(信号量)
    16. 第16天:JUC中等待多线程完成的工具类CountDownLatch,必备技能
    17. 第17天:JUC中的循环栅栏CyclicBarrier的6种使用场景
    18. 第18天:JAVA线程池,这一篇就够了
    19. 第19天:JUC中的Executor框架详解1
    20. 第20天:JUC中的Executor框架详解2
    21. 第21天:java中的CAS,你需要知道的东西
    22. 第22天:JUC底层工具类Unsafe,高手必须要了解
    23. 第23天:JUC中原子类,一篇就够了
    24. 第24天:ThreadLocal、InheritableThreadLocal(通俗易懂)
    25. 第25天:掌握JUC中的阻塞队列
    26. 第26篇:学会使用JUC中常见的集合,常看看!
    27. 第27天:实战篇,接口性能提升几倍原来这么简单
    28. 第28天:实战篇,微服务日志的伤痛,一并帮你解决掉

    java高并发系列连载中,总计估计会有四五十篇文章。

    阿里p7一起学并发,公众号:路人甲java,每天获取最新文章!

  • 相关阅读:
    C# 温故而知新:Stream篇(—)
    C# 温故而知新:Stream篇(三)
    Unity教程之Unity Attribute的使用总结
    鹅厂分享会丨面向Unity程序员的Android快速上手教程
    【Unity】编辑器小教程
    浅析Unity 坐标系
    C# 温故而知新:Stream篇(五)
    C# 温故而知新:Stream篇(二)
    unity 协程原理与线程的区别
    简单获取2月天数
  • 原文地址:https://www.cnblogs.com/itsoku123/p/11383684.html
Copyright © 2011-2022 走看看