zoukankan      html  css  js  c++  java
  • Redis 任务队列(生产者消费者)

    安装reids参考下面网址:

     http://www.runoob.com/redis/redis-install.html

    pom.xml中的jar包

     1 <dependency>
     2         <groupId>org.springframework.data</groupId>
     3         <artifactId>spring-data-redis</artifactId>
     4         <version>1.6.0.RELEASE</version>
     5     </dependency>
     6 
     7     <!-- jedis -->
     8     <dependency>
     9           <groupId>redis.clients</groupId>
    10           <artifactId>jedis</artifactId>
    11            <version>2.5.1</version>
    12     </dependency>   

    Jedis的工具类JedisUtil

      1 package demo3;
      2 
      3 import java.util.List;
      4 
      5 import redis.clients.jedis.Jedis;
      6 public class JedisUtil {
      7     private static Jedis jedis = null;
      8     /**
      9      * 存储REDIS队列 顺序存储
     10      * @param key 字节类型
     11      * @param value 字节类型
     12      */
     13     public static void lpush(byte[] key,byte[] value){
     14         try {
     15             jedis = RedisPool.getJedis();
     16             jedis.lpush(key,value);
     17         } catch (Exception e) {
     18             e.printStackTrace();
     19         }finally {
     20             RedisPool.returnResource(jedis);
     21         }
     22     }
     23     /**
     24      * 存储REDIS队列 反序存储
     25      * @param key 字节类型
     26      * @param value 字节类型
     27      */
     28     public static void rpush(byte[] key,byte[] value){
     29         try {
     30             jedis = RedisPool.getJedis();
     31             jedis.rpush(key,value);
     32         } catch (Exception e) {
     33             e.printStackTrace();
     34         }finally {
     35             RedisPool.returnResource(jedis);
     36         }
     37     }
     38     /**
     39      * 移除列表的最后一个元素,并将该元素添加到另一个列表并返回,就可以实现任务队列
     40      * @param srckey 原队列的key
     41      * @param dstkey 目标队列的key
     42      */
     43     public static byte[] rpoplpush(byte[] srckey,byte[] dstkey){
     44         byte[] value = null;
     45         try {
     46             jedis = RedisPool.getJedis();
     47             value= jedis.rpoplpush(srckey,dstkey);
     48         } catch (Exception e) {
     49             e.printStackTrace();
     50         }finally {
     51             RedisPool.returnResource(jedis);
     52         }
     53         return value;
     54     }
     55     /**
     56      * 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
     57      * @param srckey
     58      * @param dstkey
     59      * @param timout
     60      * @return
     61      */
     62     public static byte[] brpoplpush(byte[] srckey,byte[] dstkey,int timout){
     63         byte[] value = null;
     64         try {
     65             jedis = RedisPool.getJedis();
     66             value = jedis.brpoplpush(srckey,dstkey,timout);
     67         } catch (Exception e) {
     68             e.printStackTrace();
     69         } finally {
     70             RedisPool.returnResource(jedis);
     71         }
     72         return value;
     73     }
     74     /**
     75      * 设置实现任务队列的键和过期时间
     76      * @param key
     77      * @param timeout
     78      */
     79     public static List<byte[]> brpop(byte[] key, int timeout){
     80         List<byte[]> result = null;
     81         try {
     82             jedis = RedisPool.getJedis();
     83             result=jedis.brpop(0,key);
     84         } catch (Exception e) {
     85             e.printStackTrace();
     86         } finally {
     87             RedisPool.returnResource(jedis);
     88         }
     89         return result;
     90     }
     91     /**
     92      * 移除队列中的最后一个元素并显示最后一个元素
     93      * @param key
     94      * @return
     95      */
     96     public static byte[] rpop(byte[] key) {
     97         byte[] bytes = null;
     98         try {
     99             jedis = RedisPool.getJedis();
    100             bytes = jedis.rpop(key);
    101         } catch (Exception e) {
    102             e.printStackTrace();
    103         } finally {
    104             RedisPool.returnResource(jedis);
    105         }
    106         return bytes;
    107     }
    108 }

    用到的工具类RedisPool

     1 package demo3;
     2 
     3 import redis.clients.jedis.Jedis;
     4 import redis.clients.jedis.JedisPool;
     5 import redis.clients.jedis.JedisPoolConfig;
     6 
     7 public class RedisPool {
     8      private static final JedisPool jedisPool;
     9      private static final String REDIS_IP="127.0.0.1";
    10      private static final int REIDSPORT=6379;
    11      
    12     static {
    13         jedisPool=new JedisPool(new JedisPoolConfig(), REDIS_IP, REIDSPORT);
    14     }
    15     
    16     public static Jedis getJedis(){
    17         return jedisPool.getResource();
    18     }
    19     
    20     public static void returnResource(Jedis jedis){
    21         jedisPool.returnResource(jedis);
    22     }
    23 }

    用到的工具类SerialoizebleUtil

    package demo3;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    
    public class SerialoizebleUtil {
    public static Object ByteToObject(byte[] bytes) {  
     Object obj = null;  
     try {  
         // bytearray to object  
         ByteArrayInputStream bi = new ByteArrayInputStream(bytes);  
         ObjectInputStream oi = new ObjectInputStream(bi);  
       
         obj = oi.readObject();  
         bi.close();  
         oi.close();  
     } catch (Exception e) {  
         System.out.println("translation" + e.getMessage());  
         e.printStackTrace();  
     }  
            return obj;  
     }
    
    public static byte[] ObjectToByte(java.lang.Object obj) {  
        byte[] bytes = null;  
        try {  
            // object to bytearray  
            ByteArrayOutputStream bo = new ByteArrayOutputStream();  
            ObjectOutputStream oo = new ObjectOutputStream(bo);  
            oo.writeObject(obj);  
      
            bytes = bo.toByteArray();  
      
            bo.close();  
            oo.close();  
        } catch (Exception e) {  
            System.out.println("translation" + e.getMessage());  
            e.printStackTrace();  
        }  
        return bytes;  
    }
    }

    实体类MessageUtil,实现存入Redis中的是对象,不是单单的基本类型,存入Redis中的对象需要实现序列化接口

     1 package model;
     2 
     3 import java.io.Serializable;
     4 
     5 public class Message implements Serializable{
     6 
     7     /**
     8      * 
     9      */
    10     private static final long serialVersionUID = 1L;
    11 
    12     private String titile;
    13     private String info;
    14      
    15     public Message(String titile,String info){
    16     this.titile=titile;
    17     this.info=info;
    18     }
    19 
    20     public String getTitile() {
    21         return titile;
    22     }
    23 
    24     public void setTitile(String titile) {
    25         this.titile = titile;
    26     }
    27 
    28     public String getInfo() {
    29         return info;
    30     }
    31 
    32     public void setInfo(String info) {
    33         this.info = info;
    34     }
    35     
    36     
    37 }

    使用两个Redis列表,一个队列作为生成者,一个队列作为消费者,加上线程实现两个列表,一个列表产生任务,通过任务队列,另一个列表处理任务

     1 package demo3;
     2 
     3 import redis.clients.jedis.Jedis;
     4 
     5 public class InitList {
     6     public static byte[] rediskey = "key".getBytes();
     7     public static byte[] dstkey = "dstkey".getBytes();
     8     public static long time = 0;
     9     public static int i = 0;
    10 
    11     public static void main(String args[]) {
    12         Jedis jedis = RedisPool.getJedis();
    13         while (true) {
    14             try {
    15                 MessageUtil msg1 = new MessageUtil();
    16                 msg1.setId(i);
    17                 msg1.setContent("wq" + i);
    18                 // JedisUtil.lpush(rediskey,SerialoizebleUtil.serialize(msg1));
    19                 JedisUtil.lpush(rediskey, SerialoizebleUtil.ObjectToByte(msg1));
    20                 time = 2000;
    21                 System.out.println("success" + i);
    22                 System.out.println(jedis.lrange(rediskey, 0, 100));
    23                 i++;
    24                 Thread.sleep(time);
    25             } catch (InterruptedException e) {
    26                 e.printStackTrace();
    27             }
    28         }
    29     }
    30 }

    控制台输出如下,说明一直在向列表中插入新产生的对象

    再开启一个线程,使用Redis中的brpoplpush方法,实现任务队列原理

     1 package demo3;
     2 
     3 import redis.clients.jedis.Jedis;
     4 
     5 public class JedisUtilTest {
     6     public static byte[] rediskey = "key".getBytes();
     7     public static byte[] dstkey = "dstkey".getBytes();
     8     public static long time = 0;
     9 
    10     public static void main(String args[]) {
    11         Jedis jedis = RedisPool.getJedis();
    12         while (true) {
    13             try {
    14                 byte[] bytes = JedisUtil.brpoplpush(rediskey, dstkey, 0);
    15                 MessageUtil msg = (MessageUtil) SerialoizebleUtil.ByteToObject(bytes);
    16                 if (msg != null) {
    17                     System.out.println(msg.getId() + " " + msg.getContent());
    18                 }
    19                 time = 3000;
    20                 System.out.println(jedis.lrange(rediskey, 0, 100));
    21                 System.out.println(jedis.lrange(dstkey, 0, 100));
    22                 Thread.sleep(time);
    23             } catch (InterruptedException e) {
    24                 e.printStackTrace();
    25             }
    26         }
    27     }
    28 }

    控制台输出如下,上面的列表存的是刚才产生的6个对象,下面圈出来的是新的列表,可以看出新的列表的对象在递增,说明成功实现了任务队列原理

  • 相关阅读:
    SAP S/4HANA extensibility扩展原理介绍
    SAP CRM系统订单模型的设计与实现
    使用nodejs代码在SAP C4C里创建Individual customer
    SAP Cloud for Customer Account和individual customer的区别
    Let the Balloon Rise map一个数组
    How Many Tables 简单并查集
    Heap Operations 优先队列
    Arpa’s obvious problem and Mehrdad’s terrible solution 思维
    Passing the Message 单调栈两次
    The Suspects 并查集
  • 原文地址:https://www.cnblogs.com/SunAutumn/p/7645265.html
Copyright © 2011-2022 走看看