zoukankan      html  css  js  c++  java
  • redis-缓存设计-队列(普通队列、优先级队列、延迟队列)

    普通队列

    说明

    利用list 的push 和pop命令

    代码

    public class Charpter06_3 {
        public static void main(String[] args)
                throws Exception {
            Jedis conn = new Jedis("127.0.0.1", 6379);
            conn.flushDB();
            //消费消息
            new Thread(new Runnable() {
                @Override
                public void run() {
                    Jedis conn = new Jedis("127.0.0.1", 6379);
                    watch_work_queue(conn);
                }
            }).start();
            //模拟发送10个消息
            for(int i=0;i<10;i++){
                addWork(conn,"{"id":"+i+"}");
            }
        }
    
        /**
         * 入队
         * @param conn
         * @param json
         */
        public static void  addWork(Jedis conn,String json){
           conn.rpush("work:queue",json);
        }
    
        /**
         * 消费
         * @param conn
         */
        public static void  watch_work_queue(Jedis conn){
            while (true){
                //当没有任务阻塞3秒
                List<String> jsons= conn.blpop(3,"work:queue");
                if(jsons==null||jsons.size()<=0){
                    //让出cpu
                    Thread.yield();
                    continue;
                }
                System.out.println(jsons.get(1));
            }
        }
    }

    优先级队列

    说明

    不同优先级 加入到不同的list,消费的时候优先消费优先级高的

    代码

    public class Charpter06_4 {
        public static void main(String[] args)
                throws Exception {
            Jedis conn = new Jedis("127.0.0.1", 6379);
            conn.flushDB();
           //假定有10个优先级
            for (int i = 0; i < 10; i++) {
                prioritizations.add(i);
            }
            //倒序
            prioritizations = (TreeSet) prioritizations.descendingSet();
            for (int i = 0; i < 10; i++) {
                addWork(conn, "{"id":" + i + "}", i);
            }
    
            /**
             * 消费消息
             */
            new Thread(new Runnable() {
                @Override
                public void run() {
                    Jedis conn = new Jedis("127.0.0.1", 6379);
                    watch_work_queue(conn);
                }
            }).start();
    
    
        }
    
        private static TreeSet<Integer> prioritizations = new TreeSet();
    
    
        /**
         *
         * @param conn
         * @param json
         * @param prioritization 优先级
         */
        public static void addWork(Jedis conn, String json, Integer prioritization) {
            //不同的优先级放到不同队列
            conn.rpush("work:queue" + prioritization, json);
        }
    
        public static void watch_work_queue(Jedis conn) {
            while (true) {
                //格局优先级 优先消费优先级高的
                for (Integer p :
                        prioritizations) {
                    //当没有任务阻塞3秒
                    String json = conn.rpop("work:queue" + p);
                    if (json == null) {
                        continue;
                    }
                    System.out.println(json);
                    break;
                }
    
            }
        }
    }

    延迟队列

    说明

    如果入队设置了延迟时间,则计算消费时间并放入zset,

    1.开启一个线程根据zset的 zrangeByScore方法获得到了消费时间的消息,并放入 push到list

    2.开启一个线程消费list

    代码

    public class Charpter06_5 {
    
        public static void main(String[] args)
                throws Exception {
            Jedis conn = new Jedis("127.0.0.1", 6379);
            conn.flushDB();
            //模拟数据
            for (int i = 0; i < 10; i++) {
                addWork(conn, "{"id":" + i + "}", i);
            }
            //开启线程pop数据
            new Thread(new Runnable() {
                @Override
                public void run() {
                    Jedis conn = new Jedis("127.0.0.1", 6379);
                    watch_work_queue(conn);
                }
            }).start();
            //开启线程 将zset到了时间的放入list
            Thread th = new Thread(new Runnable() {
                @Override
                public void run() {
                    Jedis conn2 = new Jedis("127.0.0.1", 6379);
                    try {
                        watch_delayed_work_push_queue(conn2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            th.start();
            th.join();
            System.out.println("exit");
    
    
        }
    
        /**
         * @param conn
         * @param json
         * @param time 延迟时间  秒
         */
        public static void addWork(Jedis conn, String json, Integer time) {
            if (time > 0) {
                conn.zadd("work:queue:delayed", System.currentTimeMillis() + time * 1000, json);
            } else {
                conn.rpush("work:queue", json);
            }
        }
    
        public static void watch_work_queue(Jedis conn) {
            while (true) {
    
                //当没有任务阻塞3秒
                List<String> jsons = conn.brpop(3,"work:queue");
                if (jsons == null||jsons.size()<=0) {
                    continue;
                }
                System.out.println(jsons.get(1));
            }
    
        }
    
        public static void watch_delayed_work_push_queue(Jedis conn) throws InterruptedException {
            while (true) {
                //当没有任务阻塞3秒 每次取一个
                Set<String> jsons = conn.zrangeByScore("work:queue:delayed", 0, System.currentTimeMillis(), 0, 1);
                if (jsons == null || jsons.size() <= 0) {
                    Thread.sleep(1000);
                    continue;
                }
                String item = jsons.iterator().next();
                conn.rpush("work:queue", item);
                conn.zrem("work:queue:delayed", item);
            }
    
        }
    }
  • 相关阅读:
    dell r720服务器raid5安装centos6.5系统
    centos6.5报错:checking filesystems failed问题处理
    配置mysql5.5主从复制、半同步复制、主主复制
    vmware下centos克隆功能对网络的设置
    mysql报错问题解决MySQL server PID file could not be found!
    使用第三方工具Xtrabackup进行MySQL备份
    mysql数据库基于LVM快照的备份
    mysql的日志及利用mysqldump备份及还原
    centos6.5下java和tomcat环境部署
    通达OA数据库优化方案之_历史数据清理
  • 原文地址:https://www.cnblogs.com/LQBlog/p/13397685.html
Copyright © 2011-2022 走看看