zoukankan      html  css  js  c++  java
  • redis分布式锁&队列应用

    分布式锁

    1. setnx(set if not exists)

    如果设值成功则证明上锁成功,然后再调用del指令释放。

    // 这里的冒号:就是一个普通的字符,没特别含义,它可以是任意其它字符,不要误解
    > setnx lock:codehole true
    OK
    ... do something critical ...
    > del lock:codehole
    (integer) 1
    

    但是有个问题,如果逻辑执行到中间出现异常了,可能会导致 del 指令没有被调用,这样就会陷入死锁,锁永远得不到释放。

    1. setnx(set if not exists) 加上过期时间
    > setnx lock:codehole true
    OK
    > expire lock:codehole 5
    ... do something critical ...
    > del lock:codehole
    (integer) 1
    

    如果在 setnx 和 expire 之间服务器进程突然挂掉了,可能是因为机器掉电或者是被人为杀掉的,就会导致 expire 得不到执行,也会造成死锁。

    1. 使用ex nx命令一起执行
    > set lock:codehole true ex 5 nx
    OK
    ... do something critical ...
    > del lock:codehole
    
    1. 删除锁的线程必须是上锁的线程

    为 set 指令的 value 参数设置为一个随机数,释放锁时先匹配随机数是否一致,然后再删除 key,这是为了确保当前线程占有的锁不会被其它线程释放,除非这个锁是过期了被服务器自动释放的。
    但是匹配 value 和删除 key 不是一个原子操作,Redis 也没有提供类似于delifequals这样的指令,这就需要使用 Lua 脚本来处理了,因为 Lua 脚本可以保证连续多个指令的原子性执行。

    上锁
    tag = random.nextint()  # 随机数
    if redis.set(key, tag, nx=True, ex=5):
        do_something()
        redis.delifequals(key, tag)  # 假想的 delifequals 指令
        
    
    # delifequals 解锁
    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end
    

    延时队列

    消息队列

    注意:
    Redis 的消息队列不是专业的消息队列,它没有非常多的高级特性,没有 ack 保证,如果对消息的可靠性有着极致的追求,那么它就不适合使用。

    Redis 的 list(列表) 数据结构常用来作为异步消息队列使用,使用rpush/lpush操作入队列,使用lpop 和 rpop来出队列。

    > rpush notify-queue apple banana pear
    (integer) 3
    > llen notify-queue
    (integer) 3
    > lpop notify-queue
    "apple"
    > llen notify-queue
    (integer) 2
    > lpop notify-queue
    "banana"
    > llen notify-queue
    (integer) 1
    > lpop notify-queue
    "pear"
    > llen notify-queue
    (integer) 0
    > lpop notify-queue
    (nil)
    

    阻塞队列

    如果队列空了,客户端就会陷入 pop 的死循环,不停地 pop,没有数据,接着再 pop,又没有数据。这就是浪费生命的空轮询。
    通常我们使用 sleep 来解决这个问题,让线程睡一会,睡个 1s 钟就可以了。但是有个小问题,那就是睡眠会导致消息的延迟增大。
    我们可以使用 blpop/brpop,阻塞读。
    阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零。用blpop/brpop替代前面的lpop/rpop,就完美解决了上面的问题。

    锁冲突处理

    上面我们讲了分布式锁的问题,但是加锁失败没有讲。一般我们有3种策略来处理加锁失败:

    1. 直接抛出异常,通知用户稍后重试
      这种方式比较适合由用户直接发起的请求,用户看到错误对话框后,会先阅读对话框的内容,再点击重试,这样就可以起到人工延时的效果。
    2. sleep 一会再重试
      sleep 会阻塞当前的消息处理线程,会导致队列的后续消息处理出现延迟。如果碰撞的比较频繁或者队列里消息比较多,sleep 可能并不合适。
    3. 将请求转移至延时队列,过一会再试
      这种方式比较适合异步消息处理,将当前冲突的请求扔到另一个队列延后处理以避开冲突。

    延时队列

    延时队列可以通过 Redis 的 zset(有序列表) 来实现。我们将消息序列化成一个字符串作为 zset 的value,这个消息的到期处理时间作为score,然后用多个线程轮询 zset 获取到期的任务进行处理,多个线程是为了保障可用性,万一挂了一个线程还有其它线程可以继续处理。

    import java.lang.reflect.Type;
    import java.util.Set;
    import java.util.UUID;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.TypeReference;
    
    import redis.clients.jedis.Jedis;
    
    public class RedisDelayingQueue<T> {
    
      static class TaskItem<T> {
        public String id;
        public T msg;
      }
    
      // fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
      private Type TaskType = new TypeReference<TaskItem<T>>() {
      }.getType();
    
      private Jedis jedis;
      private String queueKey;
    
      public RedisDelayingQueue(Jedis jedis, String queueKey) {
        this.jedis = jedis;
        this.queueKey = queueKey;
      }
    
      public void delay(T msg) {
        TaskItem<T> task = new TaskItem<T>();
        task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
        task.msg = msg;
        String s = JSON.toJSONString(task); // fastjson 序列化
        jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
      }
    
      public void loop() {
        while (!Thread.interrupted()) {
          // 只取一条
          Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
          if (values.isEmpty()) {
            try {
              Thread.sleep(500); // 歇会继续
            } catch (InterruptedException e) {
              break;
            }
            continue;
          }
          String s = values.iterator().next();
          if (jedis.zrem(queueKey, s) > 0) { // 抢到了
            TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
            this.handleMsg(task.msg);
          }
        }
      }
    
      public void handleMsg(T msg) {
        System.out.println(msg);
      }
    
      public static void main(String[] args) {
        Jedis jedis = new Jedis();
        RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
        Thread producer = new Thread() {
    
          public void run() {
            for (int i = 0; i < 10; i++) {
              queue.delay("codehole" + i);
            }
          }
    
        };
        Thread consumer = new Thread() {
    
          public void run() {
            queue.loop();
          }
    
        };
        producer.start();
        consumer.start();
        try {
          producer.join();
          Thread.sleep(6000);
          consumer.interrupt();
          consumer.join();
        } catch (InterruptedException e) {
        }
      }
    }
    
  • 相关阅读:
    不常用的cmd命令
    js获取宽度
    Marshaling Data with Platform Invoke 概览
    Calling a DLL Function 之三 How to: Implement Callback Functions
    Marshaling Data with Platform Invoke 之四 Marshaling Arrays of Types
    Marshaling Data with Platform Invoke 之一 Platform Invoke Data Types
    Marshaling Data with Platform Invoke 之三 Marshaling Classes, Structures, and Unions(用时查阅)
    Calling a DLL Function 之二 Callback Functions
    WCF 引论
    Marshaling Data with Platform Invoke 之二 Marshaling Strings (用时查阅)
  • 原文地址:https://www.cnblogs.com/luozhiyun/p/10880754.html
Copyright © 2011-2022 走看看