zoukankan      html  css  js  c++  java
  • Redis 任务队列(生产者消费者)

    安装reids参考下面网址:

     http://www.runoob.com/redis/redis-install.html

    pom.xml中的jar包

     1 <dependency>
     2         <groupId>org.springframework.data</groupId>
     3         <artifactId>spring-data-redis</artifactId>
     4         <version>1.6.0.RELEASE</version>
     5     </dependency>
     6 
     7     <!-- jedis -->
     8     <dependency>
     9           <groupId>redis.clients</groupId>
    10           <artifactId>jedis</artifactId>
    11            <version>2.5.1</version>
    12     </dependency>   

    Jedis的工具类JedisUtil

      1 package demo3;
      2 
      3 import java.util.List;
      4 
      5 import redis.clients.jedis.Jedis;
      6 public class JedisUtil {
      7     private static Jedis jedis = null;
      8     /**
      9      * 存储REDIS队列 顺序存储
     10      * @param key 字节类型
     11      * @param value 字节类型
     12      */
     13     public static void lpush(byte[] key,byte[] value){
     14         try {
     15             jedis = RedisPool.getJedis();
     16             jedis.lpush(key,value);
     17         } catch (Exception e) {
     18             e.printStackTrace();
     19         }finally {
     20             RedisPool.returnResource(jedis);
     21         }
     22     }
     23     /**
     24      * 存储REDIS队列 反序存储
     25      * @param key 字节类型
     26      * @param value 字节类型
     27      */
     28     public static void rpush(byte[] key,byte[] value){
     29         try {
     30             jedis = RedisPool.getJedis();
     31             jedis.rpush(key,value);
     32         } catch (Exception e) {
     33             e.printStackTrace();
     34         }finally {
     35             RedisPool.returnResource(jedis);
     36         }
     37     }
     38     /**
     39      * 移除列表的最后一个元素,并将该元素添加到另一个列表并返回,就可以实现任务队列
     40      * @param srckey 原队列的key
     41      * @param dstkey 目标队列的key
     42      */
     43     public static byte[] rpoplpush(byte[] srckey,byte[] dstkey){
     44         byte[] value = null;
     45         try {
     46             jedis = RedisPool.getJedis();
     47             value= jedis.rpoplpush(srckey,dstkey);
     48         } catch (Exception e) {
     49             e.printStackTrace();
     50         }finally {
     51             RedisPool.returnResource(jedis);
     52         }
     53         return value;
     54     }
     55     /**
     56      * 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
     57      * @param srckey
     58      * @param dstkey
     59      * @param timout
     60      * @return
     61      */
     62     public static byte[] brpoplpush(byte[] srckey,byte[] dstkey,int timout){
     63         byte[] value = null;
     64         try {
     65             jedis = RedisPool.getJedis();
     66             value = jedis.brpoplpush(srckey,dstkey,timout);
     67         } catch (Exception e) {
     68             e.printStackTrace();
     69         } finally {
     70             RedisPool.returnResource(jedis);
     71         }
     72         return value;
     73     }
     74     /**
     75      * 设置实现任务队列的键和过期时间
     76      * @param key
     77      * @param timeout
     78      */
     79     public static List<byte[]> brpop(byte[] key, int timeout){
     80         List<byte[]> result = null;
     81         try {
     82             jedis = RedisPool.getJedis();
     83             result=jedis.brpop(0,key);
     84         } catch (Exception e) {
     85             e.printStackTrace();
     86         } finally {
     87             RedisPool.returnResource(jedis);
     88         }
     89         return result;
     90     }
     91     /**
     92      * 移除队列中的最后一个元素并显示最后一个元素
     93      * @param key
     94      * @return
     95      */
     96     public static byte[] rpop(byte[] key) {
     97         byte[] bytes = null;
     98         try {
     99             jedis = RedisPool.getJedis();
    100             bytes = jedis.rpop(key);
    101         } catch (Exception e) {
    102             e.printStackTrace();
    103         } finally {
    104             RedisPool.returnResource(jedis);
    105         }
    106         return bytes;
    107     }
    108 }

    用到的工具类RedisPool

     1 package demo3;
     2 
     3 import redis.clients.jedis.Jedis;
     4 import redis.clients.jedis.JedisPool;
     5 import redis.clients.jedis.JedisPoolConfig;
     6 
     7 public class RedisPool {
     8      private static final JedisPool jedisPool;
     9      private static final String REDIS_IP="127.0.0.1";
    10      private static final int REIDSPORT=6379;
    11      
    12     static {
    13         jedisPool=new JedisPool(new JedisPoolConfig(), REDIS_IP, REIDSPORT);
    14     }
    15     
    16     public static Jedis getJedis(){
    17         return jedisPool.getResource();
    18     }
    19     
    20     public static void returnResource(Jedis jedis){
    21         jedisPool.returnResource(jedis);
    22     }
    23 }

    用到的工具类SerialoizebleUtil

    package demo3;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    
    public class SerialoizebleUtil {
    public static Object ByteToObject(byte[] bytes) {  
     Object obj = null;  
     try {  
         // bytearray to object  
         ByteArrayInputStream bi = new ByteArrayInputStream(bytes);  
         ObjectInputStream oi = new ObjectInputStream(bi);  
       
         obj = oi.readObject();  
         bi.close();  
         oi.close();  
     } catch (Exception e) {  
         System.out.println("translation" + e.getMessage());  
         e.printStackTrace();  
     }  
            return obj;  
     }
    
    public static byte[] ObjectToByte(java.lang.Object obj) {  
        byte[] bytes = null;  
        try {  
            // object to bytearray  
            ByteArrayOutputStream bo = new ByteArrayOutputStream();  
            ObjectOutputStream oo = new ObjectOutputStream(bo);  
            oo.writeObject(obj);  
      
            bytes = bo.toByteArray();  
      
            bo.close();  
            oo.close();  
        } catch (Exception e) {  
            System.out.println("translation" + e.getMessage());  
            e.printStackTrace();  
        }  
        return bytes;  
    }
    }

    实体类MessageUtil,实现存入Redis中的是对象,不是单单的基本类型,存入Redis中的对象需要实现序列化接口

     1 package model;
     2 
     3 import java.io.Serializable;
     4 
     5 public class Message implements Serializable{
     6 
     7     /**
     8      * 
     9      */
    10     private static final long serialVersionUID = 1L;
    11 
    12     private String titile;
    13     private String info;
    14      
    15     public Message(String titile,String info){
    16     this.titile=titile;
    17     this.info=info;
    18     }
    19 
    20     public String getTitile() {
    21         return titile;
    22     }
    23 
    24     public void setTitile(String titile) {
    25         this.titile = titile;
    26     }
    27 
    28     public String getInfo() {
    29         return info;
    30     }
    31 
    32     public void setInfo(String info) {
    33         this.info = info;
    34     }
    35     
    36     
    37 }

    使用两个Redis列表,一个队列作为生成者,一个队列作为消费者,加上线程实现两个列表,一个列表产生任务,通过任务队列,另一个列表处理任务

     1 package demo3;
     2 
     3 import redis.clients.jedis.Jedis;
     4 
     5 public class InitList {
     6     public static byte[] rediskey = "key".getBytes();
     7     public static byte[] dstkey = "dstkey".getBytes();
     8     public static long time = 0;
     9     public static int i = 0;
    10 
    11     public static void main(String args[]) {
    12         Jedis jedis = RedisPool.getJedis();
    13         while (true) {
    14             try {
    15                 MessageUtil msg1 = new MessageUtil();
    16                 msg1.setId(i);
    17                 msg1.setContent("wq" + i);
    18                 // JedisUtil.lpush(rediskey,SerialoizebleUtil.serialize(msg1));
    19                 JedisUtil.lpush(rediskey, SerialoizebleUtil.ObjectToByte(msg1));
    20                 time = 2000;
    21                 System.out.println("success" + i);
    22                 System.out.println(jedis.lrange(rediskey, 0, 100));
    23                 i++;
    24                 Thread.sleep(time);
    25             } catch (InterruptedException e) {
    26                 e.printStackTrace();
    27             }
    28         }
    29     }
    30 }

    控制台输出如下,说明一直在向列表中插入新产生的对象

    再开启一个线程,使用Redis中的brpoplpush方法,实现任务队列原理

     1 package demo3;
     2 
     3 import redis.clients.jedis.Jedis;
     4 
     5 public class JedisUtilTest {
     6     public static byte[] rediskey = "key".getBytes();
     7     public static byte[] dstkey = "dstkey".getBytes();
     8     public static long time = 0;
     9 
    10     public static void main(String args[]) {
    11         Jedis jedis = RedisPool.getJedis();
    12         while (true) {
    13             try {
    14                 byte[] bytes = JedisUtil.brpoplpush(rediskey, dstkey, 0);
    15                 MessageUtil msg = (MessageUtil) SerialoizebleUtil.ByteToObject(bytes);
    16                 if (msg != null) {
    17                     System.out.println(msg.getId() + " " + msg.getContent());
    18                 }
    19                 time = 3000;
    20                 System.out.println(jedis.lrange(rediskey, 0, 100));
    21                 System.out.println(jedis.lrange(dstkey, 0, 100));
    22                 Thread.sleep(time);
    23             } catch (InterruptedException e) {
    24                 e.printStackTrace();
    25             }
    26         }
    27     }
    28 }

    控制台输出如下,上面的列表存的是刚才产生的6个对象,下面圈出来的是新的列表,可以看出新的列表的对象在递增,说明成功实现了任务队列原理

  • 相关阅读:
    ecshop 整合 kindedotor
    css 一些小笔记
    linux 使用 随记录
    GIPZ 压缩
    js 代码 随记
    map和list循环遍历
    向数据库批量处理事件
    链表和数组的优劣比较
    内存对齐 和 sizeof小结
    C++的默认构造函数与构造函数
  • 原文地址:https://www.cnblogs.com/SunAutumn/p/7645265.html
Copyright © 2011-2022 走看看