zoukankan      html  css  js  c++  java
  • 【并发】9、借助redis 实现生产消费,消息订阅发布模式队列

    这个就是一个消息可以被多次消费的范例了

    其实这个实现的方式可以参考我之前的设计模式,观察者模式

    https://www.cnblogs.com/cutter-point/p/5249780.html

    不过有一点需要注意一下啊,这个消息发布的时候,好像是不支持字节数据的,里面好像会对字节进行转换,这样的结果就是导致我最后无法吧相应的字节转换成我之前序列化的对象

    不知道是不是ObjectInputStream和ObjectOutputStream实现不是很好的原因,还是什么,反正反序列化的时候,有些不可见的字符应该是被截掉了

    消息发布者

    package queue.redisQueue;
    
    import queue.fqueue.vo.TempVo;
    import redis.clients.jedis.Jedis;
    
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;
    import java.util.UUID;
    
    /**
     * @ProjectName: cutter-point
     * @Package: queue.redisQueue
     * @ClassName: RedisQueueProducter3
     * @Author: xiaof
     * @Description: 订阅,发布模式 发布消息
     * @Date: 2019/6/12 16:47
     * @Version: 1.0
     */
    public class RedisQueueProducter3 implements Runnable {
    
        private Jedis jedis;
        private String queueKey;
    
        public RedisQueueProducter3(Jedis jedis, String queueKey) {
            this.jedis = jedis;
            this.queueKey = queueKey;
        }
    
        public void putMessage() {
            try {
                Thread.sleep((long) (Math.random() * 1000));
    
                //不存在则创建,存在则直接插入
                //向redis队列中存放数据
                //生成数据
                TempVo tempVo = new TempVo();
                tempVo.setName(Thread.currentThread().getName() + ",time is:" + UUID.randomUUID());
    
                try {
                    int i = 0;
                    while(i < 10) {
                        //反馈订阅的数量
                        long num = jedis.publish(queueKey.getBytes(), tempVo.toString().getBytes());
                        if(num > 0) {
                            System.out.println("成功!num:" + num);
                            break;
                        }
                        ++i;
                    }
                } catch (Exception e) {
                    System.out.println("失败!");
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
    
            while(true) {
                putMessage();
    
            }
    
        }
    }

    消息消费者

    package queue.redisQueue;
    
    import queue.fqueue.vo.EventVo;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPubSub;
    import redis.clients.util.SafeEncoder;
    
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.UnsupportedEncodingException;
    
    /**
     * @ProjectName: cutter-point
     * @Package: queue.redisQueue
     * @ClassName: RedisQueueConsume3
     * @Author: xiaof
     * @Description: 发布订阅消息,订阅线程
     * @Date: 2019/6/12 16:53
     * @Version: 1.0
     */
    public class RedisQueueConsume3 implements Runnable {
    
        private Jedis jedis;
        private String queueKey;
    
        class myJedisPubSub extends JedisPubSub {
            /** JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现
             * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage
             * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法
             * 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]
             **/
            @Override
            public void onMessage(String channel, String message) {
                System.out.println(Thread.currentThread().getName()+"-接收到消息:channel=" + channel + ",message=" + message);
                //接收到exit消息后退出
                System.out.println(message);
            }
        }
    
        public RedisQueueConsume3(Jedis jedis, String queueKey) {
            this.jedis = jedis;
            this.queueKey = queueKey;
        }
    
        public void consumerMessage() {
            jedis.subscribe(new myJedisPubSub(), queueKey);
        }
    
        @Override
        public void run() {
            while (true) {
                consumerMessage();
            }
        }
    }

    测试代码:

    @Test
        public void test4() throws InterruptedException {
    
            //读写取数据
            for(int i = 0; i < 2; ++i) {
                System.out.println("输出测试" + i);
                RedisQueueProducter3 producter = new RedisQueueProducter3(jedisPool.getResource(), "xiaof");
                Thread t = new Thread(producter);
                t.start();
            }
    
            while(true) {
                Thread.sleep(1000);
            }
        }
    
        @Test
        public void test5() throws InterruptedException {
    
            //读写取数据
            for(int i = 0; i < 5; ++i) {
                System.out.println("输出测试" + i);
                //切记一定要重新获取Resource,不然无法并发操作
                RedisQueueConsume3 fqueueConsume = new RedisQueueConsume3(jedisPool.getResource(), "xiaof");
                Thread t = new Thread(fqueueConsume);
                t.setDaemon(true);
                t.start();
            }
    
            while(true) {
                Thread.sleep(1000);
            }
        }

    效果展示

    同一消息被多个订阅者同步消费

  • 相关阅读:
    IEEE754二进制浮点数算术标准
    符号三角形代码勘误
    最近点对问题
    【Unsolved】线性时间选择算法的复杂度证明
    解决mosh: Nothing received from server on UDP port 60001 环境: centos7.1
    半导体测试基础
    python进程------multiprocessing包
    python线程------queue、生产者和消费者模式
    pyhon——线程同步条件(event)
    os 模块
  • 原文地址:https://www.cnblogs.com/cutter-point/p/11011122.html
Copyright © 2011-2022 走看看