zoukankan      html  css  js  c++  java
  • coding++:Semaphore—RateLimiter-漏桶算法-令牌桶算法

    java中对于生产者消费者模型,或者小米手机营销 1分钟卖多少台手机等都存在限流的思想在里面。

    关于限流 目前存在两大类,从线程个数(jdk1.5 Semaphore)和RateLimiter速率(guava)

    Semaphore:从线程个数限流

    RateLimiter:从速率限流  目前常见的算法是漏桶算法和令牌算法

    令牌桶算法。相比漏桶算法而言区别在于,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理,类似于匀速往桶里放令牌

    漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照定义的速度去消费数据

    应用场景:

    漏桶算法:必须读写分流的情况下,限制读取的速度

    令牌桶算法:必须读写分离的情况下,限制写的速率或者小米手机饥饿营销的场景  只卖1分种抢购1000实现的方法都是一样。

    RateLimiter来实现对于多线程问题查找时,很多时候可能使用的类都是原子性的,但是由于代码逻辑的问题,也可能发生线程安全问题

    1、关于RateLimter和Semphore简单用法

    package concurrent;
     
    import com.google.common.util.concurrent.RateLimiter;
     
    import java.util.concurrent.*;
    import java.util.stream.IntStream;
     
    import static java.lang.Thread.currentThread;
     
    /**
     * ${DESCRIPTION}
     * 关于限流 目前存在两大类,从线程个数(jdk1.5 Semaphore)和RateLimiter速率(guava)
     * Semaphore:从线程个数限流
     * RateLimiter:从速率限流  目前常见的算法是漏桶算法和令牌算法,下面会具体介绍
     *
     * @author mengxp
     * @version 1.0
     * @create 2018-01-15 22:44
     **/
    public class RateLimiterExample {
     
       //Guava  0.5的意思是 1秒中0.5次的操作,2秒1次的操作  从速度来限流,从每秒中能够执行的次数来
        private final static RateLimiter limiter=RateLimiter.create(0.5d);
     
     
        //同时只能有三个线程工作 Java1.5  从同时处理的线程个数来限流
        private final static Semaphore sem=new Semaphore(3);
        private static void testSemaphore(){
            try {
                sem.acquire();
                System.out.println(currentThread().getName()+" is doing work...");
                TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                sem.release();
                System.out.println(currentThread().getName()+" release the semephore..other thread can get and do job");
            }
        }
     
        public static void runTestSemaphore(){
            ExecutorService service = Executors.newFixedThreadPool(10);
            IntStream.range(0,10).forEach((i)->{
                //RateLimiterExample::testLimiter 这种写法是创建一个线程
                service.submit(RateLimiterExample::testSemaphore);
            });
        }
     
        /**
         * Guava的RateLimiter
         */
        private static void testLimiter(){
            System.out.println(currentThread().getName()+" waiting  " +limiter.acquire());
        }
     
        //Guava的RateLimiter
        public static void runTestLimiter(){
            ExecutorService service = Executors.newFixedThreadPool(10);
            IntStream.range(0,10).forEach((i)->{
                //RateLimiterExample::testLimiter 这种写法是创建一个线程
                service.submit(RateLimiterExample::testLimiter);
            });
        }
     
     
     
        public static void main(String[] args) {
            IntStream.range(0,10).forEach((a)-> System.out.println(a));//从0-9
            //runTestLimiter();
            runTestSemaphore();
        }
    }

    2、实现漏桶算法

    package concurrent.BucketAl;
     
    import com.google.common.util.concurrent.Monitor;
    import com.google.common.util.concurrent.RateLimiter;
     
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Consumer;
     
    import static java.lang.Thread.currentThread;
     
    /**
     * ${DESCRIPTION}
     *
     * @author mengxp
     * @version 1.0
     * @create 2018-01-20 22:42
     * 实现漏桶算法 实现多线程生产者消费者模型 限流
     **/
    public class Bucket {
        //定义桶的大小
        private final ConcurrentLinkedQueue<Integer> container=new ConcurrentLinkedQueue<>();
     
        private final static int  BUCKET_LIMIT=1000;
     
        //消费者 不论多少个线程,每秒最大的处理能力是1秒中执行10次
        private final RateLimiter consumerRate=RateLimiter.create(10d);
     
        //往桶里面放数据时,确认没有超过桶的最大的容量
        private Monitor offerMonitor=new Monitor();
     
        //从桶里消费数据时,桶里必须存在数据
        private Monitor consumerMonitor=new Monitor();
     
     
        /**
         * 往桶里面写数据
         * @param data
         */
        public void submit(Integer data){
            if (offerMonitor.enterIf(offerMonitor.newGuard(()->container.size()<BUCKET_LIMIT))){
                try {
                    container.offer(data);
                    System.out.println(currentThread()+" submit.."+data+" container size is :["+container.size()+"]");
                } finally {
                    offerMonitor.leave();
                }
            }else {
                //这里时候采用降级策略了。消费速度跟不上产生速度时,而且桶满了,抛出异常
                //或者存入MQ DB等后续处理
                throw new IllegalStateException(currentThread().getName()+"The bucket is ful..Pls latter can try...");
            }
        }
     
     
        /**
         * 从桶里面消费数据
         * @param consumer
         */
        public void takeThenConsumer(Consumer<Integer> consumer){
            if (consumerMonitor.enterIf(consumerMonitor.newGuard(()->!container.isEmpty()))){
                try {
                    //不打印时 写 consumerRate.acquire();
                    System.out.println(currentThread()+"  waiting"+consumerRate.acquire());
                    Integer data = container.poll();
                    //container.peek() 只是去取出来不会删掉
                    consumer.accept(data);
                }finally {
                    consumerMonitor.leave();
                }
            }else {
                //当木桶的消费完后,可以消费那些降级存入MQ或者DB里面的数据
                System.out.println("will consumer Data from MQ...");
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
     
    }

    2.1 漏桶算法测试类

    package concurrent.BucketAl;
     
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.IntStream;
     
    import static java.lang.Thread.currentThread;
     
    /**
     * ${DESCRIPTION}
     *
     * @author mengxp
     * @version 1.0
     * @create 2018-01-20 23:11
     * 漏桶算法测试
     * 实现漏桶算法 实现多线程生产者消费者模型 限流
     **/
    public class BuckerTest {
     
        public static void main(String[] args) {
            final Bucket bucket = new Bucket();
            final AtomicInteger DATA_CREATOR = new AtomicInteger(0);
     
            //生产线程 10个线程 每秒提交 50个数据  1/0.2s*10=50个
            IntStream.range(0, 10).forEach(i -> {
                new Thread(() -> {
                    for (; ; ) {
                        int data = DATA_CREATOR.incrementAndGet();
                        try {
                            bucket.submit(data);
                            TimeUnit.MILLISECONDS.sleep(200);
                        } catch (Exception e) {
                            //对submit时,如果桶满了可能会抛出异常
                            if (e instanceof IllegalStateException) {
                                System.out.println(e.getMessage());
                                //当满了后,生产线程就休眠1分钟
                                try {
                                    TimeUnit.SECONDS.sleep(60);
                                } catch (InterruptedException e1) {
                                    e1.printStackTrace();
                                }
                            }
                        }
                    }
                }).start();
            });
     
     
            //消费线程  采用RateLimiter每秒处理10个  综合的比率是5:1
            IntStream.range(0, 10).forEach(i -> {
                new Thread(
                        () -> {
                            for (; ; ) {
                                bucket.takeThenConsumer(x -> {
                                    System.out.println(currentThread()+"C.." + x);
                                });
                            }
                        }
                ).start();
            });
     
        }
    }

    3、令牌桶算法

    package concurrent.TokenBucket;
     
    import com.google.common.util.concurrent.RateLimiter;
     
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
     
    import static java.lang.Thread.currentThread;
    import static java.lang.Thread.interrupted;
     
    /**
     * ${DESCRIPTION}
     *
     * @author mengxp
     * @version 1.0
     * @create 2018-01-21 0:18
     * 令牌桶算法。相比漏桶算法而言区别在于,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理,类似于匀速往桶里放令牌
     * 漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照定义的速度去消费数据
     *
     * 应用场景:
     * 漏桶算法:必须读写分流的情况下,限制读取的速度
     * 令牌桶算法:必须读写分离的情况下,限制写的速率或者小米手机饥饿营销的场景  只卖1分种抢购1000
     *
     * 实现的方法都是一样。RateLimiter来实现
     * 对于多线程问题查找时,很多时候可能使用的类都是原子性的,但是由于代码逻辑的问题,也可能发生线程安全问题
     **/
    public class TokenBuck {
     
        //可以使用 AtomicInteger+容量  可以不用Queue实现
       private AtomicInteger phoneNumbers=new AtomicInteger(0);
       private RateLimiter rateLimiter=RateLimiter.create(20d);//一秒只能执行五次
       //默认销售500台
       private final static int DEFALUT_LIMIT=500;
       private final int saleLimit;
     
        public TokenBuck(int saleLimit) {
            this.saleLimit = saleLimit;
        }
     
        public TokenBuck() {
            this(DEFALUT_LIMIT);
        }
     
        public int buy(){
            //这个check 必须放在success里面做判断,不然会产生线程安全问题(业务引起)
            //原因当phoneNumbers=99 时 同时存在三个线程进来。虽然phoneNumbers原子性,但是也会发生。如果必须写在这里,在success
            //里面也需要加上double check
           /* if (phoneNumbers.get()>=saleLimit){
                throw new IllegalStateException("Phone has been sale "+saleLimit+" can not  buy more...")
            }*/
     
            //目前设置超时时间,10秒内没有抢到就抛出异常
            //这里的TimeOut*Ratelimiter=总数  这里的超时就是让别人抢几秒,所以设置总数也可以由这里的超时和RateLimiter来计算
             boolean success = rateLimiter.tryAcquire(10, TimeUnit.SECONDS);
             if (success){
                 if (phoneNumbers.get()>=saleLimit){
                     throw new IllegalStateException("Phone has been sale "+saleLimit+" can not  buy more...");
                 }
                 int phoneNo = phoneNumbers.getAndIncrement();
                 System.out.println(currentThread()+" user has get :["+phoneNo+"]");
                 return phoneNo;
             }else {
                 //超时后 同一时间,很大的流量来强时,超时快速失败。
                 throw new RuntimeException(currentThread()+"has timeOut can try again...");
             }
     
        }
    }

    3.1、令牌桶算法的测试类

    package concurrent.TokenBucket;
     
    import java.util.stream.IntStream;
     
    /**
     * ${DESCRIPTION}
     *
     * @author mengxp
     * @version 1.0
     * @create 2018-01-21 0:40
     **/
    public class TokenBuckTest {
        public static void main(String[] args) {
            final TokenBuck tokenBuck=new TokenBuck(200);
     
     
            IntStream.range(0,300).forEach(i->{
                //目前测试时,让一个线程抢一次,不用循环抢
                //tokenBuck::buy 这种方式 产生一个Runnable
                new Thread(tokenBuck::buy).start();
            });
        }
    }
  • 相关阅读:
    基于TensorRT的BERT实时自然语言理解(下)
    基于TensorRT的BERT实时自然语言理解(上)
    lsof
    kata-runtime spec
    kata 虚拟机
    json + jq命令
    kata-runtime run mycontainer
    kata-runtime来运行容器
    docker + docker-runc
    kata container在aarch64上成功运行
  • 原文地址:https://www.cnblogs.com/codingmode/p/11872771.html
Copyright © 2011-2022 走看看