zoukankan      html  css  js  c++  java
  • redis消息队列学习

    0 环境

    • 系统环境: centos7
    • 编辑器: xshell
    • IDE:IDEA

    1 前言

    感觉还是用思维导图总结 更加方便/更直观 无论以后回看 还是内容补充/分享

    reids思维导图图片版

    2 正文

    1 准备

    在这里插入图片描述
    事先准备的CallRedisDemo类以及CallWithJedis接口

    public class CallRedisDemo {
        private JedisPool jedisPool;
    
        // 配置
        public CallRedisDemo() {
            GenericObjectPoolConfig config = new GenericObjectPoolConfig();
            // 连接池最大空闲数
            config.setMaxIdle(300);
            // 最大连接数
            config.setMaxTotal(1000);
            // 连接最大等待时间 若是-1 则无限制
            config.setMaxWaitMillis(200000);
            // 在空闲时检查有效性
            config.setTestOnBorrow(true);
    
            /*
            * GenericObjectPoolConfig poolConfig, String host, int port, int timeout, String password
            * 1 redis地址
            * 2 redis端口
            * 3 连接超时时间
            * 4 密码
            * */
            jedisPool = new JedisPool(config, "127.0.0.1", 6379, 20000, "123456");
    
        }
    
        // 执行
        // 执行失败 --> 请求重试(这样语法糖就不能用了 试太多也没意思 估计有问题)
        public void execute(CallWithJedis callWithJedis){
            try (Jedis jedis = jedisPool.getResource()){
                callWithJedis.call(jedis);
            }
        }
    
    
    }
    
    public interface CallWithJedis {
        void call(Jedis jedis);
    }
    

    2 导入依赖

    <dependencies>
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>3.2.0</version>
                <type>jar</type>
                <scope>compile</scope>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.12</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.10.3</version>
            </dependency>
    
        </dependencies>
    

    3 定义消息类

    @Data
    @ToString
    public class Message {
        private String id;
        private Object data;
    }
    

    4 消息处理类

    @AllArgsConstructor
    public class DelayMsgsQueue {
        private Jedis jedis;
        // key
        private String queue;
    
        /**
        * @Description: 消息入队 Message类 存在id和数据 将对象序列化为string
         *              打印当前时间 为什么用zadd(ZADD key score member)
         *              因为需要延时 score(当前时间戳+延时时间) member(序列化数字)
        * @Param: data --> 发送消息
        * @return:
        * @Author: 
        * @Date: 
        */
        public void queue(Object data){
            // 构造一个Message
            Message message = new Message();
    
            message.setId(UUID.randomUUID().toString());
            message.setData(data);
    
            try {
                // 将对象序列号
                String s = new ObjectMapper().writeValueAsString(message);
                // 打印日期 以便与延迟时间对比
                System.out.println("消息:" + new Date());
    
                // 消息发布延时6s 需要秒(值)相加 --> score member --> s
                // ZADD key score1 member1 [score2 member2]
                jedis.zadd(queue, System.currentTimeMillis() + 6000, s);
    
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
    
        }
    
        /**
        * @Description: 消费消息 当不中断时 利用zrangeByScore 每次只获取一个成员
         *                if 消费者score < 提供者的延时时间: 非空判断+休眠+continue
         *                else:
         *                  队列读取 在判断是否可移除
         *                  若可移除(反序列化 转化为对象) 打印消息
        * @Param:
        * @return:
        * @Author: 
        * @Date: 
        */
        public void loop(){
            while (!Thread.interrupted()){
                // ZRANGE key start stop [WITHSCORES]
                // ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT]
                // 读取0-当前时间戳区间 offset = 0 -> 不跳过任何成员 count = 1 --> 只读取一个成员
                Set<String> strings = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1);
    
                if (strings.isEmpty()) {
                    // 若消息为空 睡眠600ms 然后在开启
                    try {
                        Thread.sleep(600);
                    } catch (InterruptedException e) {
                        break;
                    }
    
                    continue;
                }
    
                // 队列中有消息 读取
                String next = strings.iterator().next();
    
                // ZREM key member [member ...]
                // 移除成功1个 返回1 假如是n个 返回n
                if (jedis.zrem(queue, next) > 0 && jedis.zrem(queue, next) < 2) {
                    // 拿到 处理 反序列化
                    try {
                        Message message = new ObjectMapper().readValue(next, Message.class);
                        System.out.println("接收的消息:" + new Date() + message);
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }
                }
    
            }
    
        }
    
    }
    

    5 调用类

    public class DelayMsgTest {
        public static void main(String[] args) {
    
            CallRedisDemo redisDemo = new CallRedisDemo();
            redisDemo.execute(jedis -> {
                // 构造消息队列
                DelayMsgsQueue queue = new DelayMsgsQueue(jedis, "hello");
    
                // 提供者
                Thread provider = new Thread() {
                    @Override
                    public void run() {
                        for (int i = 0; i < 6; i++) {
                            // 调用DelayMsgsQueue类中提供者的方法
                            queue.queue("你饿不饿 >>>>>>>> " + i);
                        }
                    }
                };
    
                // 消费者
                Thread consumer = new Thread() {
                    @Override
                    public void run() {
                        for (int i = 0; i < 6; i++) {
                            // 调用DelayMsgsQueue类中消费者的方法
                            queue.loop();
                        }
                    }
                };
    
                // 运行线程
                provider.run();
                consumer.run();
    
                // 等待一会 运行结束 中断消费者线程
                try {
                    Thread.sleep(9000);
                    consumer.interrupted();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
    
            });
    
        }
    }
    

    6 对比打印结果

    作者:以罗伊
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须在文章页面给出原文链接,否则保留追究法律责任的权利。
  • 相关阅读:
    bzoj4165 矩阵 堆维护多路归并
    bzoj2802 [Poi2012]Warehouse Store 贪心+堆
    bzoj1367 [Baltic2004]sequence 左偏树+贪心
    bzoj3011 [Usaco2012 Dec]Running Away From the Barn 左偏树
    uoj207 共价大爷游长沙 子树信息 LCT + 随机化 + 路径覆盖
    bzoj4764 弹飞大爷 LCT
    bzoj4817 & loj2001 [Sdoi2017]树点涂色 LCT + 线段树
    bzoj5020 & loj2289 [THUWC 2017]在美妙的数学王国中畅游 LCT + 泰勒展开
    bzoj4998 星球联盟 LCT + 并查集
    bzoj3091 城市旅行 LCT + 区间合并
  • 原文地址:https://www.cnblogs.com/my-ordinary/p/12703116.html
Copyright © 2011-2022 走看看