zoukankan      html  css  js  c++  java
  • Redis(二)延迟队列

    1.目录 

    • 延迟队列
    • 进一步优化

    2.延迟队列

    package com.redis;
    
    import java.lang.reflect.Type;
    import java.util.Set;
    import java.util.UUID;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.TypeReference;
    import redis.clients.jedis.Jedis;
    
    public class RedisDelayingQueue<T> {
        static class TaskItem<T> {
            public String id;
            public T msg;
        }
    
        // fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
        private Type TaskType = new TypeReference<TaskItem<T>>() {
        }.getType();
        private Jedis jedis;
        private String queueKey;
    
        public RedisDelayingQueue(Jedis jedis, String queueKey) {
            this.jedis = jedis;
            this.queueKey = queueKey;
        }
    
        public void delay(T msg) {
            TaskItem<T> task = new TaskItem<T>();
            task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
            task.msg = msg;
            String s = JSON.toJSONString(task); // fastjson 序列化
            jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
        }
    
        public void loop() {
            while (!Thread.interrupted()) {
    
                // 只取一条
                Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis());
                if (values.isEmpty()) {
                    try {
                        Thread.sleep(500); // 歇会继续
                    } catch (InterruptedException e) {
                        break;
                    }
                    continue;
                }
                String s = values.iterator().next();
                if (jedis.zrem(queueKey, s) > 0) { // 抢到了
                    TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
                    this.handleMsg(task.msg);
                }
            }
        }
    
        public void handleMsg(T msg) {
            System.out.println(msg);
        }
    
    
        public static void main(String[] args) {
            Jedis jedis = new Jedis();
            RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
    
            Thread producer = new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    queue.delay("codehole" + i);
                }
            });
    
            Thread consumer = new Thread(() -> queue.loop());
            producer.start();
            consumer.start();
            try {
                producer.join();
                Thread.sleep(6000);
                consumer.interrupt();
                consumer.join();
            } catch (InterruptedException e) {
            }
        }
    }

    3.进一步优化

    上面的算法中同一个任务可能会被多个进程取到之后再使用zrem进行争抢,那 些没抢到的进程都是白取了一次任务,这是浪费。可以考虑使用lua scripting来 优化一下这个逻辑,将zrangebyscore和zrem一同挪到服务器端进行原子化操 作,这样多个进程之间争抢任务时就不会出现这种浪费了
     

    搬砖的。。。。。 

  • 相关阅读:
    2021 6 3
    2021 5月 读书笔记
    2021 6 1
    第十三周 2021.05.30
    spring security permitAll不生效
    springboot配置jpa提示Unable to resolve name [mysql] as strategy
    element el-form-item el-input宽度设置
    vue+element表格中使用render函数(if判断处理)
    elementui移除tab报Avoided redundant navigation to current location: ***
    el-dropdown-item添加@click不生效
  • 原文地址:https://www.cnblogs.com/ql211lin/p/10444312.html
Copyright © 2011-2022 走看看