zoukankan      html  css  js  c++  java
  • Redis---分布式锁

    1. 加锁

      set key value EX/PX seconds NX

      key : 数据库键

      value: 值

      EX: 秒

      PX: 毫秒

      seconds : 时间数

      NX: if key not exist

    2. 解锁

      加锁时保存 value 值, 解锁时将保存的值与从数据库获取的值进行比较, 判断加锁和解锁的是否为同一个线程

    3. 锁冲突处理

    3.1 重试机制

    1.  sleep 一段时间, 然后重试

    3.2 延时队列

      延时队列可以通过 Redis 的 zset(有序列表) 来实现。

      我们将消息序列化成一个字符串作为 zset 的value,这个消息的到期处理时间作为score,然后用多个线程轮询 zset 获取到期的任务进行处理,多个线程是为了保障可用性,万一挂了一个线程还有其它线程可以继续处理。

      因为有多个线程,所以需要考虑并发争抢任务,确保任务不能被多次执行。

      每次只取一个任务, 取到任务后在队列中删除任务,然后 执行任务

    public class RedisDelayingQueue<T> {
    
      static class TaskItem<T> {
        public String id;
        public T msg;
      }
    
      // fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
      private Type TaskType = new TypeReference<TaskItem<T>>() {}.getType();
    
      private Jedis jedis;
      private String queueKey;
    
      public RedisDelayingQueue(Jedis jedis, String queueKey) {
        this.jedis = jedis;
        this.queueKey = queueKey;
      }
    
      public void delay(T msg) {
        TaskItem<T> task = new TaskItem<T>();
        task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
        task.msg = msg;
        String s = JSON.toJSONString(task); // fastjson 序列化
        jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
      }
    
      public void loop() {
        while (!Thread.interrupted()) {
          // 只取一条
          Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
          if (values.isEmpty()) {
            try {
              Thread.sleep(500); // 歇会继续
            } catch (InterruptedException e) {
              break;
            }
    }
            continue;
          }
          String s = values.iterator().next();
          if (jedis.zrem(queueKey, s) > 0) { // 抢到了
            TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
            this.handleMsg(task.msg);
          }
        }
      }
    
      public void handleMsg(T msg) {
        System.out.println(msg);
      }
    
      public static void main(String[] args) {
        Jedis jedis = new Jedis();
        RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
        Thread producer = new Thread() {
    
          public void run() {
            for (int i = 0; i < 10; i++) {
              queue.delay("codehole" + i);
            }
          }
    
        };
        Thread consumer = new Thread() {
    
          public void run() {
            queue.loop();
          }
    
        };
        producer.start();
        consumer.start();
        try {
          producer.join();
          Thread.sleep(6000);
          consumer.interrupt();
          consumer.join();
        } catch (InterruptedException e) {
        }
      }
    }

     

  • 相关阅读:
    JavaScript 核心学习——继承
    吴裕雄--天生自然 PHP开发学习:PhpStorm的配置与安装
    吴裕雄--天生自然 HADOOP大数据分布式处理:安装WinSCP
    吴裕雄--天生自然 HADOOP大数据分布式处理:安装XShell
    云栖专辑 | 阿里开发者们的第14个感悟:技术拓宽价值边界
    玩转大数据系列之四:搜索服务
    RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚)
    RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查
    RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)
    RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想
  • 原文地址:https://www.cnblogs.com/virgosnail/p/9547055.html
Copyright © 2011-2022 走看看