zoukankan      html  css  js  c++  java
  • redis分布式锁&队列应用

    分布式锁

    1. setnx(set if not exists)

    如果设值成功则证明上锁成功,然后再调用del指令释放。

    // 这里的冒号:就是一个普通的字符,没特别含义,它可以是任意其它字符,不要误解
    > setnx lock:codehole true
    OK
    ... do something critical ...
    > del lock:codehole
    (integer) 1
    

    但是有个问题,如果逻辑执行到中间出现异常了,可能会导致 del 指令没有被调用,这样就会陷入死锁,锁永远得不到释放。

    1. setnx(set if not exists) 加上过期时间
    > setnx lock:codehole true
    OK
    > expire lock:codehole 5
    ... do something critical ...
    > del lock:codehole
    (integer) 1
    

    如果在 setnx 和 expire 之间服务器进程突然挂掉了,可能是因为机器掉电或者是被人为杀掉的,就会导致 expire 得不到执行,也会造成死锁。

    1. 使用ex nx命令一起执行
    > set lock:codehole true ex 5 nx
    OK
    ... do something critical ...
    > del lock:codehole
    
    1. 删除锁的线程必须是上锁的线程

    为 set 指令的 value 参数设置为一个随机数,释放锁时先匹配随机数是否一致,然后再删除 key,这是为了确保当前线程占有的锁不会被其它线程释放,除非这个锁是过期了被服务器自动释放的。
    但是匹配 value 和删除 key 不是一个原子操作,Redis 也没有提供类似于delifequals这样的指令,这就需要使用 Lua 脚本来处理了,因为 Lua 脚本可以保证连续多个指令的原子性执行。

    上锁
    tag = random.nextint()  # 随机数
    if redis.set(key, tag, nx=True, ex=5):
        do_something()
        redis.delifequals(key, tag)  # 假想的 delifequals 指令
        
    
    # delifequals 解锁
    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end
    

    延时队列

    消息队列

    注意:
    Redis 的消息队列不是专业的消息队列,它没有非常多的高级特性,没有 ack 保证,如果对消息的可靠性有着极致的追求,那么它就不适合使用。

    Redis 的 list(列表) 数据结构常用来作为异步消息队列使用,使用rpush/lpush操作入队列,使用lpop 和 rpop来出队列。

    > rpush notify-queue apple banana pear
    (integer) 3
    > llen notify-queue
    (integer) 3
    > lpop notify-queue
    "apple"
    > llen notify-queue
    (integer) 2
    > lpop notify-queue
    "banana"
    > llen notify-queue
    (integer) 1
    > lpop notify-queue
    "pear"
    > llen notify-queue
    (integer) 0
    > lpop notify-queue
    (nil)
    

    阻塞队列

    如果队列空了,客户端就会陷入 pop 的死循环,不停地 pop,没有数据,接着再 pop,又没有数据。这就是浪费生命的空轮询。
    通常我们使用 sleep 来解决这个问题,让线程睡一会,睡个 1s 钟就可以了。但是有个小问题,那就是睡眠会导致消息的延迟增大。
    我们可以使用 blpop/brpop,阻塞读。
    阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零。用blpop/brpop替代前面的lpop/rpop,就完美解决了上面的问题。

    锁冲突处理

    上面我们讲了分布式锁的问题,但是加锁失败没有讲。一般我们有3种策略来处理加锁失败:

    1. 直接抛出异常,通知用户稍后重试
      这种方式比较适合由用户直接发起的请求,用户看到错误对话框后,会先阅读对话框的内容,再点击重试,这样就可以起到人工延时的效果。
    2. sleep 一会再重试
      sleep 会阻塞当前的消息处理线程,会导致队列的后续消息处理出现延迟。如果碰撞的比较频繁或者队列里消息比较多,sleep 可能并不合适。
    3. 将请求转移至延时队列,过一会再试
      这种方式比较适合异步消息处理,将当前冲突的请求扔到另一个队列延后处理以避开冲突。

    延时队列

    延时队列可以通过 Redis 的 zset(有序列表) 来实现。我们将消息序列化成一个字符串作为 zset 的value,这个消息的到期处理时间作为score,然后用多个线程轮询 zset 获取到期的任务进行处理,多个线程是为了保障可用性,万一挂了一个线程还有其它线程可以继续处理。

    import java.lang.reflect.Type;
    import java.util.Set;
    import java.util.UUID;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.TypeReference;
    
    import redis.clients.jedis.Jedis;
    
    public class RedisDelayingQueue<T> {
    
      static class TaskItem<T> {
        public String id;
        public T msg;
      }
    
      // fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
      private Type TaskType = new TypeReference<TaskItem<T>>() {
      }.getType();
    
      private Jedis jedis;
      private String queueKey;
    
      public RedisDelayingQueue(Jedis jedis, String queueKey) {
        this.jedis = jedis;
        this.queueKey = queueKey;
      }
    
      public void delay(T msg) {
        TaskItem<T> task = new TaskItem<T>();
        task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
        task.msg = msg;
        String s = JSON.toJSONString(task); // fastjson 序列化
        jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
      }
    
      public void loop() {
        while (!Thread.interrupted()) {
          // 只取一条
          Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
          if (values.isEmpty()) {
            try {
              Thread.sleep(500); // 歇会继续
            } catch (InterruptedException e) {
              break;
            }
            continue;
          }
          String s = values.iterator().next();
          if (jedis.zrem(queueKey, s) > 0) { // 抢到了
            TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
            this.handleMsg(task.msg);
          }
        }
      }
    
      public void handleMsg(T msg) {
        System.out.println(msg);
      }
    
      public static void main(String[] args) {
        Jedis jedis = new Jedis();
        RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
        Thread producer = new Thread() {
    
          public void run() {
            for (int i = 0; i < 10; i++) {
              queue.delay("codehole" + i);
            }
          }
    
        };
        Thread consumer = new Thread() {
    
          public void run() {
            queue.loop();
          }
    
        };
        producer.start();
        consumer.start();
        try {
          producer.join();
          Thread.sleep(6000);
          consumer.interrupt();
          consumer.join();
        } catch (InterruptedException e) {
        }
      }
    }
    
  • 相关阅读:
    TCP/IP的基本概念知识
    Mysql查询今天、昨天、7天、近30天、本月、上一月数据
    PHP OOP面向对象部分方法归总(代码实例子)
    PHP 变量
    PHP超级全局变量、魔术变量和魔术函数
    PHP编程效率的20个要点
    MemCache超详细解读
    CodeForces 652E Pursuit For Artifacts 边双连通分量
    HDU 2460 Network 边双连通分量 缩点
    HDU 3594 Cactus 有向仙人掌图判定
  • 原文地址:https://www.cnblogs.com/luozhiyun/p/10880754.html
Copyright © 2011-2022 走看看