zoukankan      html  css  js  c++  java
  • 服务接口API限流 Rate Limit

    一、场景描述                                                                                                

         很多做服务接口的人或多或少的遇到这样的场景,由于业务应用系统的负载能力有限,为了防止非预期的请求对系统压力过大而拖垮业务应用系统。

        也就是面对大流量时,如何进行流量控制?

        服务接口的流量控制策略:分流、降级、限流等。本文讨论下限流策略,虽然降低了服务接口的访问频率和并发量,却换取服务接口和业务应用系统的高可用。

         实际场景中常用的限流策略:

    • Nginx前端限流

             按照一定的规则如帐号、IP、系统调用逻辑等在Nginx层面做限流

    • 业务应用系统限流

            1、客户端限流

            2、服务端限流

    • 数据库限流

            红线区,力保数据库

    二、常用的限流算法                                                                                       

         常用的限流算法由:楼桶算法和令牌桶算法。本文不具体的详细说明两种算法的原理,原理会在接下来的文章中做说明。

         1、漏桶算法

             漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率.示意图如下:

       

             可见这里有两个变量,一个是桶的大小,支持流量突发增多时可以存多少的水(burst),另一个是水桶漏洞的大小(rate)。

             因为漏桶的漏出速率是固定的参数,所以,即使网络中不存在资源冲突(没有发生拥塞),漏桶算法也不能使流突发(burst)到端口速率.因此,漏桶算法对于存在突发特性的流量来说缺乏效率.

         2、令牌桶算法

             令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法,更加容易理解.随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了.新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务.

      令牌桶的另外一个好处是可以方便的改变速度. 一旦需要提高速率,则按需提高放入桶中的令牌的速率. 一般会定时(比如100毫秒)往桶中增加一定数量的令牌, 有些变种算法则实时的计算应该增加的令牌的数量.

    三、基于Redis功能的实现                                                                                

           简陋的设计思路:假设一个用户(用IP判断)每分钟访问某一个服务接口的次数不能超过10次,那么我们可以在Redis中创建一个键,并此时我们就设置键的过期时间为60秒,每一个用户对此服务接口的访问就把键值加1,在60秒内当键值增加到10的时候,就禁止访问服务接口。在某种场景中添加访问时间间隔还是很有必要的。

          1)使用Redis的incr命令,将计数器作为Lua脚本         

    1 local current
    2 current = redis.call("incr",KEYS[1])
    3 if tonumber(current) == 1 then
    4     redis.call("expire",KEYS[1],1)
    5 end

            Lua脚本在Redis中运行,保证了incr和expire两个操作的原子性。

           2)使用Reids的列表结构代替incr命令

     1 FUNCTION LIMIT_API_CALL(ip)
     2 current = LLEN(ip)
     3 IF current > 10 THEN
     4     ERROR "too many requests per second"
     5 ELSE
     6     IF EXISTS(ip) == FALSE
     7         MULTI
     8             RPUSH(ip,ip)
     9             EXPIRE(ip,1)
    10         EXEC
    11     ELSE
    12         RPUSHX(ip,ip)
    13     END
    14     PERFORM_API_CALL()
    15 END

             Rate Limit使用Redis的列表作为容器,LLEN用于对访问次数的检查,一个事物中包含了RPUSH和EXPIRE两个命令,用于在第一次执行计数是创建列表并设置过期时间,

        RPUSHX在后续的计数操作中进行增加操作。

    四、基于令牌桶算法的实现                                                                                

           令牌桶算法可以很好的支撑突然额流量的变化即满令牌桶数的峰值。

          

      1 import java.io.BufferedWriter;
      2 import java.io.FileOutputStream;
      3 import java.io.IOException;
      4 import java.io.OutputStreamWriter;
      5 import java.util.Random;
      6 import java.util.concurrent.ArrayBlockingQueue;
      7 import java.util.concurrent.Executors;
      8 import java.util.concurrent.ScheduledExecutorService;
      9 import java.util.concurrent.TimeUnit;
     10 import java.util.concurrent.locks.ReentrantLock;
     11  
     12 import com.google.common.base.Preconditions;
     13 import com.netease.datastream.util.framework.LifeCycle;
     14  
     15
     20 public class TokenBucket implements LifeCycle {
     21  
     22 // 默认桶大小个数 即最大瞬间流量是64M
     23  private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;
     24  
     25 // 一个桶的单位是1字节
     26  private int everyTokenSize = 1;
     27  
     28 // 瞬间最大流量
     29  private int maxFlowRate;
     30  
     31 // 平均流量
     32  private int avgFlowRate;
     33  
     34 // 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64
     35  private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);
     36  
     37 private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
     38  
     39 private volatile boolean isStart = false;
     40  
     41 private ReentrantLock lock = new ReentrantLock(true);
     42  
     43 private static final byte A_CHAR = 'a';
     44  
     45 public TokenBucket() {
     46  }
     47  
     48 public TokenBucket(int maxFlowRate, int avgFlowRate) {
     49  this.maxFlowRate = maxFlowRate;
     50  this.avgFlowRate = avgFlowRate;
     51  }
     52  
     53 public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
     54  this.everyTokenSize = everyTokenSize;
     55  this.maxFlowRate = maxFlowRate;
     56  this.avgFlowRate = avgFlowRate;
     57  }
     58  
     59 public void addTokens(Integer tokenNum) {
     60  
     61 // 若是桶已经满了,就不再家如新的令牌
     62  for (int i = 0; i < tokenNum; i++) {
     63  tokenQueue.offer(Byte.valueOf(A_CHAR));
     64  }
     65  }
     66  
     67 public TokenBucket build() {
     68  
     69 start();
     70  return this;
     71  }
     72  
     73 /**
     74  * 获取足够的令牌个数
     75  *
     76  * @return
     77  */
     78  public boolean getTokens(byte[] dataSize) {
     79  
     80 Preconditions.checkNotNull(dataSize);
     81  Preconditions.checkArgument(isStart, "please invoke start method first !");
     82  
     83 int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数
     84  
     85 final ReentrantLock lock = this.lock;
     86  lock.lock();
     87  try {
     88  boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量
     89  if (!result) {
     90  return false;
     91  }
     92  
     93 int tokenCount = 0;
     94  for (int i = 0; i < needTokenNum; i++) {
     95  Byte poll = tokenQueue.poll();
     96  if (poll != null) {
     97  tokenCount++;
     98  }
     99  }
    100  
    101 return tokenCount == needTokenNum;
    102  } finally {
    103  lock.unlock();
    104  }
    105  }
    106  
    107 @Override
    108  public void start() {
    109  
    110 // 初始化桶队列大小
    111  if (maxFlowRate != 0) {
    112  tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
    113  }
    114  
    115 // 初始化令牌生产者
    116  TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
    117  scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
    118  isStart = true;
    119  
    120 }
    121  
    122 @Override
    123  public void stop() {
    124  isStart = false;
    125  scheduledExecutorService.shutdown();
    126  }
    127  
    128 @Override
    129  public boolean isStarted() {
    130  return isStart;
    131  }
    132  
    133 class TokenProducer implements Runnable {
    134  
    135 private int avgFlowRate;
    136  private TokenBucket tokenBucket;
    137  
    138 public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
    139  this.avgFlowRate = avgFlowRate;
    140  this.tokenBucket = tokenBucket;
    141  }
    142  
    143 @Override
    144  public void run() {
    145  tokenBucket.addTokens(avgFlowRate);
    146  }
    147  }
    148  
    149 public static TokenBucket newBuilder() {
    150  return new TokenBucket();
    151  }
    152  
    153 public TokenBucket everyTokenSize(int everyTokenSize) {
    154  this.everyTokenSize = everyTokenSize;
    155  return this;
    156  }
    157  
    158 public TokenBucket maxFlowRate(int maxFlowRate) {
    159  this.maxFlowRate = maxFlowRate;
    160  return this;
    161  }
    162  
    163 public TokenBucket avgFlowRate(int avgFlowRate) {
    164  this.avgFlowRate = avgFlowRate;
    165  return this;
    166  }
    167  
    168 private String stringCopy(String data, int copyNum) {
    169  
    170 StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);
    171  
    172 for (int i = 0; i < copyNum; i++) {
    173  sbuilder.append(data);
    174  }
    175  
    176 return sbuilder.toString();
    177  
    178 }
    179  
    180 public static void main(String[] args) throws IOException, InterruptedException {
    181  
    182 tokenTest();
    183  }
    184  
    185 private static void arrayTest() {
    186  ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);
    187  tokenQueue.offer(1);
    188  tokenQueue.offer(1);
    189  tokenQueue.offer(1);
    190  System.out.println(tokenQueue.size());
    191  System.out.println(tokenQueue.remainingCapacity());
    192  }
    193  
    194 private static void tokenTest() throws InterruptedException, IOException {
    195  TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();
    196  
    197 BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test")));
    198  String data = "xxxx";// 四个字节
    199  for (int i = 1; i <= 1000; i++) {
    200  
    201 Random random = new Random();
    202  int i1 = random.nextInt(100);
    203  boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());
    204  TimeUnit.MILLISECONDS.sleep(100);
    205  if (tokens) {
    206  bufferedWriter.write("token pass --- index:" + i1);
    207  System.out.println("token pass --- index:" + i1);
    208  } else {
    209  bufferedWriter.write("token rejuect --- index" + i1);
    210  System.out.println("token rejuect --- index" + i1);
    211  }
    212  
    213 bufferedWriter.newLine();
    214  bufferedWriter.flush();
    215  }
    216  
    217 bufferedWriter.close();
    218  }
    219  
    220 }

    参考:

    http://xiaobaoqiu.github.io/blog/2015/07/02/ratelimiter/

    http://redisdoc.com/string/incr.html

    http://www.cnblogs.com/zhengyun_ustc/archive/2012/11/17/topic1.html

    由于本人经验有限,文章中难免会有错误,请浏览文章的您指正或有不同的观点共同探讨!

  • 相关阅读:
    用 Flask 来写个轻博客 (31) — 使用 Flask-Admin 实现 FileSystem 管理
    jenkins持续集成:jenkins+SVN
    Linux基础一:Linux的安装及相关配置
    jenkins持续集成:构建多个job同时执行
    jenkins持续集成:定时构建语法
    Nginx详解二十九:基于Nginx的中间件架构设计
    Nginx详解二十八:Nginx架构篇Nginx+Lua的安全waf防火墙
    Nginx详解二十七:Nginx架构篇之安全篇
    Nginx详解二十六:Nginx架构篇之性能优化
    Nginx详解二十五:Nginx架构篇之Nginx常见的问题
  • 原文地址:https://www.cnblogs.com/exceptioneye/p/4783904.html
Copyright © 2011-2022 走看看