zoukankan      html  css  js  c++  java
  • Redis(四)——消息队列

    Redis不仅可作为缓存服务器,还可用作消息队列。它的列表类型天生支持用作消息队列。

    性质:由于Redis的列表是使用双向链表实现的,保存了头尾节点,所以在列表头尾两边插取元素都是非常快的。

    所以可以直接使用Redis的List实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。

    (列表常用命令)

    RPUSH : RPUSH key-name value [value1 value2,...] ------------将一个或多个值推入列表右端

    LPUSH : LPUSH key-name value [value1 value2,...] ------------将一个或多个值推入列表左端

    RPOP : RPOP key-name----------移除并返回列表最右端元素

    LPOP :LPOP key-name----------移除并返回列表最左端元素

    LINDEX : LINDEX key-name offset --------------返回列表中偏移量为offset的元素

    LRANGE : LRANGE key-name start end -------------返回列表中偏移量从start到end范围内的元素

    LTRIM : LTRIM key-name start end ----------------对列表进行修剪,只保留偏移量从start到end范围内的元素

    其中简单示例如下:
    首先连接redis服务器,其中我应用了Jedispool,代码如下:

    package redis;
    
    import java.io.IOException;
    import java.util.Properties;
    
    import org.springframework.core.io.support.PropertiesLoaderUtils;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    /**
     * redis单例连接池
     * @author admin
     *
     */
    public class RedisPool {
    
        private static  int TIMEOUT = 1000*30;
        private static  int MAXTOTAL = 1024;
        private static  int MAXIDLE = 100;
        private static  String REDISIP = "bei1";
        private static  int PORT = 6379;
        private static  String PASSWORD ="default";
    
        static {
            try {
                Properties prop = PropertiesLoaderUtils.loadAllProperties("redis.properties");
                TIMEOUT = Integer.parseInt(prop.getProperty("TIMEOUT","300000"));
                MAXTOTAL = Integer.parseInt(prop.getProperty("MAXTOTAL","1024"));
                MAXIDLE = Integer.parseInt(prop.getProperty("MAXIDLE","100"));
                REDISIP = prop.getProperty("REDISIP","127.0.0.1");
                PORT = Integer.parseInt(prop.getProperty("PORT","6379"));
                PASSWORD = prop.getProperty("PASSWORD","default");
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        private static JedisPool[] pool = new JedisPool[10];
    
        private  RedisPool() {}
    
        private static JedisPool getPool(int database) {
            if(database>10) {
                return null;
            }
            if(pool[database] == null) {
                JedisPoolConfig config = new JedisPoolConfig();
                config.setMaxTotal(MAXTOTAL);
                config.setMaxIdle(MAXIDLE);
                config.setMaxWaitMillis(TIMEOUT);
                config.setTestOnBorrow(true);
                pool[database] = new JedisPool(config,REDISIP,PORT,TIMEOUT,PASSWORD,database);
            }
            return pool[database];
        }
        //单例获取redis连接资源
        public static Jedis getResource(int database) {
            if(database>10) {
                return null;
            }
            Jedis jedis = null;
            if(pool[database] == null) {
                synchronized(RedisPool.class) {
                    try {
                        if(pool[database] == null) {
                            pool[database] = getPool(database);
                            try {
                                if (pool[database] != null) {
                                    jedis = pool[database].getResource();
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }else {
                jedis = pool[database].getResource();
            }
            return jedis;
        }
    
    }
    
    

    定义一个生产者,代码:

    package RedisMq;
    
    import com.sun.deploy.util.StringUtils;
    import redis.RedisPool;
    import redis.clients.jedis.Jedis;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * <p>  </p>
     *
     * @author ly
     * @since 2019/1/5
     */
    public class Producer extends Thread{
    
        public static final String MESSAGE_KEY = "queue";
        private Jedis jedis;
        private String produceName;
        private volatile int count;
    
        public Producer(String name){
            this.produceName = name;
            init();
        }
        private void init(){
            jedis = RedisPool.getResource(1);
    
        }
        public void putMessage(String message) {
            Long size = jedis.lpush(MESSAGE_KEY, message);
            System.out.println(produceName + ": 当前未被处理消息条数为:" + size);
            count++;
        }
    
        public int getCount() {
            return count;
        }
        @Override
        public void run() {
            try {
                while (true) {
                    putMessage("hello world");
                    TimeUnit.SECONDS.sleep(1);
                }
            } catch (InterruptedException e) {
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            Producer producer = new Producer("myProducer");
            producer.start();
    
            for (; ; ) {
                System.out.println("main : 已存储消息条数:" + producer.getCount());
                TimeUnit.SECONDS.sleep(10);
            }
        }
    }
    

    再定义一个消费者

    package RedisMq;
    
    import redis.RedisPool;
    import redis.clients.jedis.Jedis;
    /**
     * <p>  </p>
     *
     * @author ly
     * @since 2019/1/7
     */
    
    
        /**
         * 消息消费者
         * @author yamikaze
         */
        public class Customer extends Thread{
    
            private String customerName;
            private volatile int count;
            private Jedis jedis;
    
            public Customer(String name) {
                this.customerName = name;
                init();
            }
    
            private void init() {
                jedis = RedisPool.getResource(1);
            }
    
            public void processMessage() {
                String message = jedis.rpop(Producer.MESSAGE_KEY);
                if(message != null) {
                    count++;
                    handle(message);
                }
            }
    
            public void handle(String message) {
                System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条");
            }
    
            @Override
            public void run() {
                while (true) {
                    processMessage();
                }
            }
    
            public static void main(String[] args) {
                Customer customer = new Customer("小花");
                customer.start();
            }
        }
    
    

    运行后 生产者和消费者控制台信息分别如下:

    Redis 发布与订阅

    redis 支持消息队列。发布订阅即是一种消息通信模式:发送者发送消息,订阅者订阅消息。

    redis 客户端可以订阅任意数量的频道

    (一)发布订阅
    使用 publish 指令,格式为 publish channel message

    127.0.0.1:6379> publish fruit "apple"
    (integer) 0
     
    

    该返回值为0,说明没有人订阅

    (二)订阅消息
    使用subscribe指令接受消息,格式为 subscribe channel

    127.0.0.1:6379> subscribe fruit
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "fruit"
    3) (integer) 1
    
    

    可以看到使用SUBSCRIBE指令后进入了订阅模式,但没有接收到publish发送的消息,这是因为只有在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。
    回复信息分为3类:
    1 如果为subscribe,第二个值表示订阅的频道,如上述代码

    2 如果为message(消息),第二个值为产生该消息的频道,第三个值为消息,如图:

    3 如果退订消息 ,第二个值表示取消订阅的频道,第三个值表示当前客户端的订阅数量。则接受信息如下

    (三)取消订阅
    使用Unsubscribe 指令,格式为 UNSUBSCRIBE channel [channel ...]

    127.0.0.1:6379>  unsubscribe fruit
    1) "unsubscribe"
    2) "fruit"
    3) (integer) 0
    

    参考文章https://blog.csdn.net/qq_34212276/article/details/78455004

  • 相关阅读:
    Shell printf 命令
    Shell echo命令
    Shell 基本运算符
    Shell 数组
    Shell 传递参数
    Shell 变量
    Spark基础知识汇总
    DataScientist————汇总篇
    Java的Unsafe类
    java 中文字符和unicode编码值相互转化
  • 原文地址:https://www.cnblogs.com/gloria-liu/p/10232455.html
Copyright © 2011-2022 走看看