zoukankan      html  css  js  c++  java
  • Java利用Redis实现消息队列

    应用场景

    • 为什么要用redis?
      二进制存储、java序列化传输、IO连接数高、连接频繁

    一、序列化

      这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口; 
    其代码如下:

    复制代码
     1 package Utils;
     2 import java.io.*;
     3 /**
     4  * Created by Kinglf on 2016/10/17.
     5  */
     6 public class ObjectUtil {
     7     /**
     8      * 对象转byte[]
     9      * @param obj
    10      * @return
    11      * @throws IOException
    12      */
    13     public static byte[] object2Bytes(Object obj) throws IOException{
    14         ByteArrayOutputStream bo=new ByteArrayOutputStream();
    15         ObjectOutputStream oo=new ObjectOutputStream(bo);
    16         oo.writeObject(obj);
    17         byte[] bytes=bo.toByteArray();
    18         bo.close();
    19         oo.close();
    20         return bytes;
    21     }
    22     /**
    23      * byte[]转对象
    24      * @param bytes
    25      * @return
    26      * @throws Exception
    27      */
    28     public static Object bytes2Object(byte[] bytes) throws Exception{
    29         ByteArrayInputStream in=new ByteArrayInputStream(bytes);
    30         ObjectInputStream sIn=new ObjectInputStream(in);
    31         return sIn.readObject();
    32     }
    33 }
    复制代码

    二、消息类(实现Serializable接口)

    复制代码
    package Model;
    
    import java.io.Serializable;
    
    /**
     * Created by Kinglf on 2016/10/17.
     */
    public class Message implements Serializable {
    
        private static final long serialVersionUID = -389326121047047723L;
        private int id;
        private String content;
        public Message(int id, String content) {
            this.id = id;
            this.content = content;
        }
        public int getId() {
            return id;
        }
        public void setId(int id) {
            this.id = id;
        }
        public String getContent() {
            return content;
        }
        public void setContent(String content) {
            this.content = content;
        }
    }
    复制代码

    三、Redis的操作

      利用redis做队列,我们采用的是redis中list的push和pop操作; 
    结合队列的特点: 
    只允许在一端插入新元素只能在队列的尾部FIFO:先进先出原则 Redis中lpush头入(rpop尾出)或rpush尾入(lpop头出)可以满足要求,而Redis中list药push或 pop的对象仅需要转换成byte[]即可 
      java采用Jedis进行Redis的存储和Redis的连接池设置 
    上代码:

    复制代码
    package Utils;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    /**
     * Created by Kinglf on 2016/10/17.
     */
    public class JedisUtil {
        private static String JEDIS_IP;
        private static int JEDIS_PORT;
        private static String JEDIS_PASSWORD;
        private static JedisPool jedisPool;
        static {
            //Configuration自行写的配置文件解析类,继承自Properties
            Configuration conf=Configuration.getInstance();
            JEDIS_IP=conf.getString("jedis.ip","127.0.0.1");
            JEDIS_PORT=conf.getInt("jedis.port",6379);
            JEDIS_PASSWORD=conf.getString("jedis.password",null);
            JedisPoolConfig config=new JedisPoolConfig();
            config.setMaxActive(5000);
            config.setMaxIdle(256);
            config.setMaxWait(5000L);
            config.setTestOnBorrow(true);
            config.setTestOnReturn(true);
            config.setTestWhileIdle(true);
            config.setMinEvictableIdleTimeMillis(60000L);
            config.setTimeBetweenEvictionRunsMillis(3000L);
            config.setNumTestsPerEvictionRun(-1);
            jedisPool=new JedisPool(config,JEDIS_IP,JEDIS_PORT,60000);
        }
        /**
         * 获取数据
         * @param key
         * @return
         */
        public static String get(String key){
            String value=null;
            Jedis jedis=null;
            try{
                jedis=jedisPool.getResource();
                value=jedis.get(key);
            }catch (Exception e){
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
            }finally {
                close(jedis);
            }
            return value;
        }
    
        private static void close(Jedis jedis) {
            try{
                jedisPool.returnResource(jedis);
            }catch (Exception e){
                if(jedis.isConnected()){
                    jedis.quit();
                    jedis.disconnect();
                }
            }
        }
        public static byte[] get(byte[] key){
            byte[] value = null;
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                value = jedis.get(key);
            } catch (Exception e) {
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
            } finally {
                //返还到连接池
                close(jedis);
            }
    
            return value;
        }
    
        public static void set(byte[] key, byte[] value) {
    
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.set(key, value);
            } catch (Exception e) {
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
            } finally {
                //返还到连接池
                close(jedis);
            }
        }
    
        public static void set(byte[] key, byte[] value, int time) {
    
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.set(key, value);
                jedis.expire(key, time);
            } catch (Exception e) {
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
            } finally {
                //返还到连接池
                close(jedis);
            }
        }
    
        public static void hset(byte[] key, byte[] field, byte[] value) {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.hset(key, field, value);
            } catch (Exception e) {
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
            } finally {
                //返还到连接池
                close(jedis);
            }
        }
    
        public static void hset(String key, String field, String value) {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.hset(key, field, value);
            } catch (Exception e) {
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
            } finally {
                //返还到连接池
                close(jedis);
            }
        }
    
        /**
         * 获取数据
         *
         * @param key
         * @return
         */
        public static String hget(String key, String field) {
    
            String value = null;
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                value = jedis.hget(key, field);
            } catch (Exception e) {
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
            } finally {
                //返还到连接池
                close(jedis);
            }
    
            return value;
        }
        /**
         * 获取数据
         *
         * @param key
         * @return
         */
        public static byte[] hget(byte[] key, byte[] field) {
    
            byte[] value = null;
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                value = jedis.hget(key, field);
            } catch (Exception e) {
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
            } finally {
                //返还到连接池
                close(jedis);
            }
    
            return value;
        }
        public static void hdel(byte[] key, byte[] field) {
    
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.hdel(key, field);
            } catch (Exception e) {
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
            } finally {
                //返还到连接池
                close(jedis);
            }
        }
        /**
         * 存储REDIS队列 顺序存储
         * @param  key reids键名
         * @param  value 键值
         */
        public static void lpush(byte[] key, byte[] value) {
    
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.lpush(key, value);
            } catch (Exception e) {
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
            } finally {
                //返还到连接池
                close(jedis);
            }
        }
    
        /**
         * 存储REDIS队列 反向存储
         * @param  key reids键名
         * @param  value 键值
         */
        public static void rpush(byte[] key, byte[] value) {
    
            Jedis jedis = null;
            try {
    
                jedis = jedisPool.getResource();
                jedis.rpush(key, value);
    
            } catch (Exception e) {
    
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
    
            } finally {
    
                //返还到连接池
                close(jedis);
    
            }
        }
    
        /**
         * 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端
         * @param  key reids键名
         * @param  destination 键值
         */
        public static void rpoplpush(byte[] key, byte[] destination) {
    
            Jedis jedis = null;
            try {
    
                jedis = jedisPool.getResource();
                jedis.rpoplpush(key, destination);
    
            } catch (Exception e) {
    
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
    
            } finally {
    
                //返还到连接池
                close(jedis);
    
            }
        }
    
        /**
         * 获取队列数据
         * @param  key 键名
         * @return
         */
        public static List lpopList(byte[] key) {
    
            List list = null;
            Jedis jedis = null;
            try {
    
                jedis = jedisPool.getResource();
                list = jedis.lrange(key, 0, -1);
    
            } catch (Exception e) {
    
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
    
            } finally {
    
                //返还到连接池
                close(jedis);
    
            }
            return list;
        }
        /**
         * 获取队列数据
         * @param  key 键名
         * @return
         */
        public static byte[] rpop(byte[] key) {
    
            byte[] bytes = null;
            Jedis jedis = null;
            try {
    
                jedis = jedisPool.getResource();
                bytes = jedis.rpop(key);
    
            } catch (Exception e) {
    
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
    
            } finally {
    
                //返还到连接池
                close(jedis);
    
            }
            return bytes;
        }
        public static void hmset(Object key, Map hash) {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.hmset(key.toString(), hash);
            } catch (Exception e) {
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
    
            } finally {
                //返还到连接池
                close(jedis);
    
            }
        }
        public static void hmset(Object key, Map hash, int time) {
            Jedis jedis = null;
            try {
    
                jedis = jedisPool.getResource();
                jedis.hmset(key.toString(), hash);
                jedis.expire(key.toString(), time);
            } catch (Exception e) {
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
    
            } finally {
                //返还到连接池
                close(jedis);
    
            }
        }
        public static List hmget(Object key, String... fields) {
            List result = null;
            Jedis jedis = null;
            try {
    
                jedis = jedisPool.getResource();
                result = jedis.hmget(key.toString(), fields);
    
            } catch (Exception e) {
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
    
            } finally {
                //返还到连接池
                close(jedis);
    
            }
            return result;
        }
    
        public static Set hkeys(String key) {
            Set result = null;
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                result = jedis.hkeys(key);
    
            } catch (Exception e) {
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
    
            } finally {
                //返还到连接池
                close(jedis);
    
            }
            return result;
        }
        public static List lrange(byte[] key, int from, int to) {
            List result = null;
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                result = jedis.lrange(key, from, to);
    
            } catch (Exception e) {
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
    
            } finally {
                //返还到连接池
                close(jedis);
    
            }
            return result;
        }
        public static Map hgetAll(byte[] key) {
            Map result = null;
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                result = jedis.hgetAll(key);
            } catch (Exception e) {
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
    
            } finally {
                //返还到连接池
                close(jedis);
            }
            return result;
        }
    
        public static void del(byte[] key) {
    
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.del(key);
            } catch (Exception e) {
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
            } finally {
                //返还到连接池
                close(jedis);
            }
        }
    
        public static long llen(byte[] key) {
    
            long len = 0;
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.llen(key);
            } catch (Exception e) {
                //释放redis对象
                jedisPool.returnBrokenResource(jedis);
                e.printStackTrace();
            } finally {
                //返还到连接池
                close(jedis);
            }
            return len;
        }
    }
    复制代码

    四、Configuration主要用于读取Redis的配置信息

    复制代码
    package Utils;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;
    
    /**
     * Created by Kinglf on 2016/10/17.
     */
    public class Configuration extends Properties {
    
        private static final long serialVersionUID = -2296275030489943706L;
        private static Configuration instance = null;
    
        public static synchronized Configuration getInstance() {
            if (instance == null) {
                instance = new Configuration();
            }
            return instance;
        }
    
    
        public String getProperty(String key, String defaultValue) {
            String val = getProperty(key);
            return (val == null || val.isEmpty()) ? defaultValue : val;
        }
    
        public String getString(String name, String defaultValue) {
            return this.getProperty(name, defaultValue);
        }
    
        public int getInt(String name, int defaultValue) {
            String val = this.getProperty(name);
            return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
        }
    
        public long getLong(String name, long defaultValue) {
            String val = this.getProperty(name);
            return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
        }
    
        public float getFloat(String name, float defaultValue) {
            String val = this.getProperty(name);
            return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val);
        }
    
        public double getDouble(String name, double defaultValue) {
            String val = this.getProperty(name);
            return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val);
        }
    
        public byte getByte(String name, byte defaultValue) {
            String val = this.getProperty(name);
            return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val);
        }
    
        public Configuration() {
            InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml");
            try {
                this.loadFromXML(in);
                in.close();
            } catch (IOException ioe) {
    
            }
        }
    }
    复制代码

    五、测试

    复制代码
    import Model.Message;
    import Utils.JedisUtil;
    import Utils.ObjectUtil;
    import redis.clients.jedis.Jedis;
    
    import java.io.IOException;
    
    /**
     * Created by Kinglf on 2016/10/17.
     */
    public class TestRedisQueue {
        public static byte[] redisKey = "key".getBytes();
        static {
            try {
                init();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private static void init() throws IOException {
            for (int i = 0; i < 1000000; i++) {
                Message message = new Message(i, "这是第" + i + "个内容");
                JedisUtil.lpush(redisKey, ObjectUtil.object2Bytes(message));
            }
    
        }
    
        public static void main(String[] args) {
            try {
                pop();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        private static void pop() throws Exception {
            byte[] bytes = JedisUtil.rpop(redisKey);
            Message msg = (Message) ObjectUtil.bytes2Object(bytes);
            if (msg != null) {
                System.out.println(msg.getId() + "----" + msg.getContent());
            }
        }
    }
    复制代码
    每执行一次pop()方法,结果如下:
    <br>1----这是第1个内容
    <br>2----这是第2个内容
    <br>3----这是第3个内容
    <br>4----这是第4个内容

    总结

    至此,整个Redis消息队列的生产者和消费者代码已经完成

    1. Message 需要传送的实体类(需实现Serializable接口)
    2. Configuration Redis的配置读取类,继承自Properties
    3. ObjectUtil 将对象和byte数组双向转换的工具类
    4. Jedis 通过消息队列的先进先出(FIFO)的特点结合Redis的list中的push和pop操作进行封装的工具类
  • 相关阅读:
    Django Swagger接口文档生成
    基于docker快速搭建hbase集群
    Cassandra数据操作管理工具tableplus
    基于docker创建Cassandra集群
    基于docker快速搭建hive环境
    [20200623]应用报错:当前事务无法提交,而且无法支持写入日志文件的操作
    zabbix--监控 TCP 连接状态
    kubernetes 使用ceph实现动态持久卷存储
    MySQL备份脚本
    Linux Pam后门总结拓展
  • 原文地址:https://www.cnblogs.com/zzsdream/p/6813624.html
Copyright © 2011-2022 走看看