zoukankan      html  css  js  c++  java
  • 你所不知道的库存超限做法

    在互联网企业中,限购的做法,多种多样,有的别出心裁,有的因循守旧,但是种种做法皆想达到的目的,无外乎几种,商品卖的完,系统抗的住,库存不超限。虽然短短数语,却有着说不完,道不尽,轻者如释重负,重者涕泪横流的架构体验。 但是,在实际开发过程中,库存超限,作为其中最核心的一员,到底该怎么做,如何做才会是最合适的呢?

    今天这篇文章,我将会展示给大家库存限购的五种常见的做法,并对其利弊一一探讨,由于这五种做法,有的在设计之初当做提案被否定掉的,有的在线上跑着,但是在没有任何单元测试和压测情况下,这几种超限控制的做法也许是不符合你的业务的,所以不建议直接用于生产环境。我这里权当是做抛砖引玉,期待大家更好的做法。

    工欲善其事必先利其器,在这里,我们将利用一台测试环境的redis服务器当做库存超限控制的主战场,先设置库存量为10进去,然后根据此库存量,一一展开,设置库存代码如下:

       1:  def set_storage():
       2:      conn = redis_conn()
       3:      key = "storage_seckill"
       4:      current_storage = conn.get(key)
       5:      if current_storage == None:
       6:          conn.set(key, 10)

    为了方便性,我这里使用了python语言来书写逻辑,但是今天我们只是讲解思想,语言这类的,大家可以自己尝试转一下。

    上面就是我们的设置库存到redis中的做法,很简单,就是在redis中设置一个storage_seckill的库存key,然后里面放上库存量10.

    超限限制做法一:先获取当前库存值进行比对,然后进行扣减操作

       1:  def storage_scenario_one():
       2:      conn = redis_conn()
       3:      key = "storage_seckill"
       4:      current_storage = conn.get(key)
       5:      current_storage_int = int(current_storage)
       6:      if current_storage_int<=0 :
       7:          return 0
       8:      result = conn.decr(key)
       9:      return result

    首先,我们拿到当前的库存值,然后看看是否已经扣减到了零,如果扣减到了零,则不继续扣减,直接返回;如果库存还有,则利用decr原子操作进行扣减,同时返回扣减后的库存值。

    此种做法在小并发量下访问,问题不大;在对库存量控制不严格的业务中,问题也不大。但是如果并发量比较大一些,同时业务要求严格控制库存,那么此种做法是非常不合适的,原因在于,在高并发情况下,get命令,decr命令,都是分开发给redis的,这样会导致比对的时候,很容易出现限制不住的情况,也就是会造成第六行的比对失效。

    设想如下一个场景,AB两个请求进来,A获取的库存值为1,B获取的库存值为1,然后两个请求都被发到redis中进行扣减操作,然后这种场景下,A最后得到的库存值为0;但是B最后得到的库存值为-1,超限。

    所以此种场景,由于在高并发下,get和decr操作不是一组原子性操作,会引发超限问题,被直接pass。

    超限限制做法二:先扣减库存,然后比对,最后根据情况回滚

       1:  def storage_scenario_two():
       2:      conn = redis_conn()
       3:      key = "storage_seckill"
       4:      current = conn.decr(key)
       5:      if current>=0:
       6:          return current
       7:      else:
       8:          #回滚库存
       9:          conn.incr(key)
      10:          return 0

    首先,请求进来,直接对库存值进行扣减,然后得到当前的库存值;然后,对此库存值进行校验,如果库存还有,则返回库存值,如果库存没有了,则回滚库存,以便于防止负库存量的存在。

    此做法,相比做法一,要稍微可靠一些,由于redis的decr操作直接返回真实的库存值,所以每个请求进来,只要执行了decr操作,拿到的肯定是当前最准确的库存值。然后进行比对,如果库存值大于等于零,返回当前库存值,如果小于零,则将库存进行回滚。

    此种做法,最大的一个问题就是,如果大批量的并发请求过来,redis承受的写操作的量,相对于方法一来说,是加倍的,因为回滚库存的存在导致的。所以这种情况下,高并发量进来,极有可能将redis的写操作打出极限值,然后会出现很多redis写失败的错误警告。 另一个问题和做法一是一样的,就是第五行的比对在高并发下,也是限不住的,具体的压测结果请看我的这篇stackoverflow的提问:Will redis incr command can be limitation to specific number?

    所以此种场景,虽然在高并发情况下避免了redis命令的分开操作,但是却大大增加了redis的写并发量,被pass。

    超限限制做法三:先递减库存,然后通过整数溢出控制,最后根据情况回滚

       1:  def storage_scenario_three():
       2:      conn = redis_conn()
       3:      key = "storage_seckill"
       4:      current = conn.decr(key)
       5:      #通过整数控制溢出的做法
       6:      if storage_overflow_checker(current):
       7:          return current
       8:      else:
       9:          #回滚库存
      10:          conn.incr(key)
      11:          return 0
      12:   
      13:  def storage_overflow_checker(current_storage):
      14:      #如果当前库存未被递减到0,则check_number为int类型,isinstance方法检测结果为true
      15:      #如果当前库存已被递减到负数,则check_number为long类型,isinstance方法检测结果为false
      16:      check_number = sys.maxint - current_storage
      17:      check_result = isinstance(check_number,int)
      18:      return check_result

    说明一下,当前库存,如果为负数,则利用python的isinstance(check_number,int)检测的时候,check_result返回是false;如果为非负数,则检测的时候,check_result返回的是true,上面的storage_overflow_checker的做法,和下面的C#语言的做法是一样的,利用C#语言描述,大家可能对上面的代码更清晰一些:

       1:      /**
       2:       * 通过让Integer溢出的方式来控制数量超卖(递减导致溢出)
       3:       * @param current
       4:       * @return
       5:       */
       6:      public boolean StorageOverFillChecker(long current) {
       7:          try {
       8:              //当前数值的结果计算
       9:              Long value = Integer.MAX_VALUE - current;
      10:              //尝试转变为Inter类型,如果超卖,则转换会出错;如果未超卖,则转换不会出错
      11:              Integer.parseInt(value.toString());
      12:          } catch (Exception ex) {
      13:              //值溢出
      14:              return true;
      15:          }
      16:   
      17:          return false;
      18:      }

    可以看出,此种做法和方法二很相似,只是比对部分由,直接和零比对,变成了通过检测integer是否溢出的方式来进行。这样就彻底解决了高并发情况下,直接和零比对,限制不住的问题了。

    虽然此种做法,相对于做法二说来,要靠谱很多,但是仍然解决不了在高并发情况下,redis写并发量加倍的问题,极有可能某个促销活动,在开始的那一刻,直接将redis的写操作打出问题来。

    超限限制做法四:共享锁

       1:  def storage_scenario_four():
       2:      conn = redis_conn()
       3:      key = "storage_seckill"
       4:      key_lock = key + "_lock"
       5:      if conn.setnx(key_lock, "1"):
       6:          #客户端挂掉,设置过期时间,防止其不释放锁
       7:          conn.pexpire(key_lock, 5)
       8:          current_storage = conn.get(key)
       9:          if int(current_storage)<=0 :
      10:              return 0
      11:          result = conn.decr(key)
      12:          #客户端正常,删除共享锁,提高性能
      13:          conn.delete(key_lock)
      14:          return result
      15:      else :
      16:          return "someone in it"

    前面三种,由于在高并发下都有问题,所以本做法,主要是通过setnx设置共享锁,然后请求到锁的用户请求,正常进行库存扣减操作;请求不到锁的用户请求,则直接提示有其他人在操作库存。

    由于setnx的特殊性,当客户端挂掉的时候,是不会释放这个锁的,所以当请求进来的时候,首先通过pexpire命令,为锁设置过期时间,防止死锁不释放。然后执行正常的库存扣减操作,当操作完毕,删掉共享锁,可以极大的提高程序性能,否则只能等待锁慢慢过期了。

    此种做法相对于上面的三种操作,通过采用共享锁,牺牲了部分性能,来规避了高并发的问题,比较推荐,但是由于redis操作命令还是很多,并且每条都要发送到redis端执行,所以在网络传输上,耗费的时间开销是不小的。这是后面需要着力优化的方向。

    看了上面四种做法,都不是很完美,其中最大的问题在于,高并发情况下,多条redis命令分开操作库存,极容易发生库存限不住的问题;同时,由于加了rollback库存操作,极容易由于redis写命令的操作数加倍导致压垮redis的风险。加了锁,虽然牺牲了部分性能,规避了高并发问题,但是redis命令操作量过多。

    其实我上面一直在强调高并发,高并发。上面的四个场景,只有在高并发的情况下,才会出现问题,如果你的用户请求量没有那么多,那么采用上面四种方式之一,也不是不可以。但是如何才能知道采用起来没问题呢?其实最简单的一个方式,就是在你们自己的集群机器上,模拟活动的真实用户量,进行压测,看看会不会超限就行了,不超限的话,上面四种做法完全满足需求。

    那么,就没有比较好一些的解决方案了吗?

    也不是,虽然解决这个问题,没有绝对好用的银弹,但是有相对好用的大蒜和圣水。下面的讲解,会涉及到Redisson的Redlock的源码实现,当然也会涉及一点lua方面的知识,还请提前预备一下。

    偶然在研究分布式锁的时候,尝试翻阅过Redisson的Redlock的实现,并对其底层的实现方式有所记录,我们先来看看其加锁过程的源码实现:

    redlocklockinner

    从上面的方法中,我们可以看到,分布式锁的上锁过程,是首先判断一个key存不存在,如果不存在,则设置这个key,然后pexpire设置一个过期时间,来防止客户端访问的时候,挂掉了后,不释放锁的问题。为什么这段lua代码就能实现分布式锁的核心呢? 原因就是,这段代码放到一个lua脚本中,那么这段lua脚本就是一个原子性的操作。redis在执行这段lua脚本的过程中,不会掺杂任何其他的命令。所以从根本上避免了并发操作命令的问题。

    我们都知道,一个key如果设置了过期时间,key过期后,redis是不会删掉这个key的,只有用户访问才会删除掉这个key,所以,当使用分布式锁的时候,如果设置的pexpire过期时间为5ms,那么一秒钟只能处理200个并发,性能非常低。如何解决这种性能问题呢?来看来解锁的操作:

    redlockunlockinner

    从上面解锁的方法中,我们可以看到,如果这个锁用完了之后,Redisson的做法是是直接删除掉的。这样可以提高不少的性能。(源码参阅,属于我自己的理解,如有谬误,还请指教)

    那么按照上面这种设计思路,新的超限做法就出来了。

    超限做法五:基于lua的共享锁

       1:  def storage_scenario_five():
       2:      conn = redis_conn()
       3:      key = "storage_seckill"
       4:      key_lock = key + "_lock"
       5:      key_val = "get_the_key"
       6:      lua = """
       7:              local key    = KEYS[1]
       8:              local expire = KEYS[2]
       9:              local value  = KEYS[3]
      10:   
      11:              local result = redis.call('setnx',key,value)
      12:              if result == 1 then
      13:                 redis.call('pexpire', key, expire)
      14:              end
      15:              return result
      16:            """
      17:      locked = conn.eval(lua, 3, key_lock, 5, key_val)
      18:      print (locked == 1)
      19:      if locked == 1:
      20:          val = storage_scenario_one()
      21:          print("val:"+str(val))
      22:          #删掉共享key,用以提高性能, 否则只能默默的等其过期
      23:          conn.delete(key_lock)
      24:          return val
      25:      else:
      26:          return "someone in it"

    这种做法,其实是做法四的衍生优化版本,优化的地方在于,将多条redis操作命令多次发送,改成了将多条redis操作命令放在了一个原子性操作事务中一次性执行完毕,省去了很多的网络请求。如果可以,其实你也可以将业务逻辑糅合到上面的lua代码中,这样一来,性能当然会更好。

    上面这种做法,如果 storage_scenario_one()这种操作是直接操作的mysql库存,则非常推荐这种做法,但是如果storage_scenario_one()这种操作直接操作的redis中的虚拟库存,则不是很推荐这种做法,不如直接用限流操作。

    超限做法六: All In Lua

       1:  def storage_scenario_six():
       2:      conn = redis_conn()
       3:      lua = """
       4:              local storage = redis.call('get','storage_seckill')
       5:              if  storage ~= false then
       6:                  if tonumber(storage) > 0 then
       7:                      return redis.call('decr','storage_seckill')
       8:                  else
       9:                      return 'storage is zero now, can't perform decr action'
      10:                  end
      11:              else
      12:                  return redis.call('set','storage_seckill',10)
      13:              end
      14:            """
      15:      result = conn.eval(lua,0)
      16:      print(result)

    此种做法是当前最好的做法,所有的库存扣减操作都放在lua脚本中进行,形成一个原子性操作,redis在执行上面的lua脚本的时候,是不会掺杂任何其他的执行命令的。所以这样从根本上避免了高并发下,多条命令执行带来的问题。而且上面的redis命令执行,都直接在redis服务器上,省去了网络传输时间,也没有共享锁的限制,从性能上而言,是最好的。但是,业务逻辑的lua化,相对而言是比较麻烦的,所以对于追求极限库存控制的业务,可以考虑这种做法。

    好了,这就是我今天为大家带来的六种库存超限的做法,每种做法都有自己的优缺点,好使的限不住,限的住的性能不行,性能好的又需要引入lua,真心不知道如何选择了。

    声明:上面六种库存超限做法,有些属于本人的推理,线上并未实际用过,如果你贸然使用而未经过压测,由此造成的损失,找老板去讨论吧。

    参考资源:

    redis lua can't work properly due to wrong type comparison

    《深入理解redis》

    Edit:2018年2月9日11:19:35

    修改了第五种方法中的说明部分。

  • 相关阅读:
    .Net中获取打印机的相关信息
    如何在windows server 2008上配置NLB群集
    jvm分析内存泄露
    JVM调优
    线程池工作队列饱和策略
    线程池的处理流程:
    Java的Executor框架和线程池实现原理(转)
    线程池实现原理详解:
    futer.get()(如果任务没执行完将等待)
    sql注入
  • 原文地址:https://www.cnblogs.com/scy251147/p/8371636.html
Copyright © 2011-2022 走看看