zoukankan      html  css  js  c++  java
  • redis-缓存设计-队列(普通队列、优先级队列、延迟队列)

    普通队列

    说明

    利用list 的push 和pop命令

    代码

    public class Charpter06_3 {
        public static void main(String[] args)
                throws Exception {
            Jedis conn = new Jedis("127.0.0.1", 6379);
            conn.flushDB();
            //消费消息
            new Thread(new Runnable() {
                @Override
                public void run() {
                    Jedis conn = new Jedis("127.0.0.1", 6379);
                    watch_work_queue(conn);
                }
            }).start();
            //模拟发送10个消息
            for(int i=0;i<10;i++){
                addWork(conn,"{"id":"+i+"}");
            }
        }
    
        /**
         * 入队
         * @param conn
         * @param json
         */
        public static void  addWork(Jedis conn,String json){
           conn.rpush("work:queue",json);
        }
    
        /**
         * 消费
         * @param conn
         */
        public static void  watch_work_queue(Jedis conn){
            while (true){
                //当没有任务阻塞3秒
                List<String> jsons= conn.blpop(3,"work:queue");
                if(jsons==null||jsons.size()<=0){
                    //让出cpu
                    Thread.yield();
                    continue;
                }
                System.out.println(jsons.get(1));
            }
        }
    }

    优先级队列

    说明

    不同优先级 加入到不同的list,消费的时候优先消费优先级高的

    代码

    public class Charpter06_4 {
        public static void main(String[] args)
                throws Exception {
            Jedis conn = new Jedis("127.0.0.1", 6379);
            conn.flushDB();
           //假定有10个优先级
            for (int i = 0; i < 10; i++) {
                prioritizations.add(i);
            }
            //倒序
            prioritizations = (TreeSet) prioritizations.descendingSet();
            for (int i = 0; i < 10; i++) {
                addWork(conn, "{"id":" + i + "}", i);
            }
    
            /**
             * 消费消息
             */
            new Thread(new Runnable() {
                @Override
                public void run() {
                    Jedis conn = new Jedis("127.0.0.1", 6379);
                    watch_work_queue(conn);
                }
            }).start();
    
    
        }
    
        private static TreeSet<Integer> prioritizations = new TreeSet();
    
    
        /**
         *
         * @param conn
         * @param json
         * @param prioritization 优先级
         */
        public static void addWork(Jedis conn, String json, Integer prioritization) {
            //不同的优先级放到不同队列
            conn.rpush("work:queue" + prioritization, json);
        }
    
        public static void watch_work_queue(Jedis conn) {
            while (true) {
                //格局优先级 优先消费优先级高的
                for (Integer p :
                        prioritizations) {
                    //当没有任务阻塞3秒
                    String json = conn.rpop("work:queue" + p);
                    if (json == null) {
                        continue;
                    }
                    System.out.println(json);
                    break;
                }
    
            }
        }
    }

    延迟队列

    说明

    如果入队设置了延迟时间,则计算消费时间并放入zset,

    1.开启一个线程根据zset的 zrangeByScore方法获得到了消费时间的消息,并放入 push到list

    2.开启一个线程消费list

    代码

    public class Charpter06_5 {
    
        public static void main(String[] args)
                throws Exception {
            Jedis conn = new Jedis("127.0.0.1", 6379);
            conn.flushDB();
            //模拟数据
            for (int i = 0; i < 10; i++) {
                addWork(conn, "{"id":" + i + "}", i);
            }
            //开启线程pop数据
            new Thread(new Runnable() {
                @Override
                public void run() {
                    Jedis conn = new Jedis("127.0.0.1", 6379);
                    watch_work_queue(conn);
                }
            }).start();
            //开启线程 将zset到了时间的放入list
            Thread th = new Thread(new Runnable() {
                @Override
                public void run() {
                    Jedis conn2 = new Jedis("127.0.0.1", 6379);
                    try {
                        watch_delayed_work_push_queue(conn2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            th.start();
            th.join();
            System.out.println("exit");
    
    
        }
    
        /**
         * @param conn
         * @param json
         * @param time 延迟时间  秒
         */
        public static void addWork(Jedis conn, String json, Integer time) {
            if (time > 0) {
                conn.zadd("work:queue:delayed", System.currentTimeMillis() + time * 1000, json);
            } else {
                conn.rpush("work:queue", json);
            }
        }
    
        public static void watch_work_queue(Jedis conn) {
            while (true) {
    
                //当没有任务阻塞3秒
                List<String> jsons = conn.brpop(3,"work:queue");
                if (jsons == null||jsons.size()<=0) {
                    continue;
                }
                System.out.println(jsons.get(1));
            }
    
        }
    
        public static void watch_delayed_work_push_queue(Jedis conn) throws InterruptedException {
            while (true) {
                //当没有任务阻塞3秒 每次取一个
                Set<String> jsons = conn.zrangeByScore("work:queue:delayed", 0, System.currentTimeMillis(), 0, 1);
                if (jsons == null || jsons.size() <= 0) {
                    Thread.sleep(1000);
                    continue;
                }
                String item = jsons.iterator().next();
                conn.rpush("work:queue", item);
                conn.zrem("work:queue:delayed", item);
            }
    
        }
    }
  • 相关阅读:
    STL堆算法性能分析与优化方法(GCC4.4.2 stl_heap.h源代码分析与改进方案)
    POJ 1631 Bridging Singnals
    一个用于读unicode文本的迭代器(iterator)
    常用文本压缩算法及实现(To be finshed!)
    volatile语义及线程安全singleton模式探讨
    C++实现的huffman与canonical huffman的压缩解压缩系统,支持基于单词的压缩解压缩
    linux环境下 C++性能测试工具 gprof + kprof + gprof2dot
    多线程统计多个文件的单词数目C++0x多线程使用示例
    python嵌入C++ boost.python如何在C++中调用含有不定长参数tuple变量和关键字参数dict变量的函数
    boost.python入门教程 python 嵌入c++
  • 原文地址:https://www.cnblogs.com/LQBlog/p/13397685.html
Copyright © 2011-2022 走看看