zoukankan      html  css  js  c++  java
  • Spring-data-redis: 分布式队列

    Redis中list数据结构,具有“双端队列”的特性,同时redis具有持久数据的能力,因此redis实现分布式队列是非常安全可靠的。它类似于JMS中的“Queue”,只不过功能和可靠性(事务性)并没有JMS严格。

    Redis中的队列阻塞时,整个connection都无法继续进行其他操作,因此在基于连接池设计是需要注意。

    我们通过spring-data-redis,来实现“同步队列”,设计风格类似与JMS。

    一.配置文件:

    [java] view plain copy
    1. <beans xmlns="http://www.springframework.org/schema/beans"   
    2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    3. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" default-autowire="byName">  
    4.     <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">  
    5.         <property name="maxActive" value="32"></property>  
    6.         <property name="maxIdle" value="6"></property>  
    7.         <property name="maxWait" value="15000"></property>  
    8.         <property name="minEvictableIdleTimeMillis" value="300000"></property>  
    9.         <property name="numTestsPerEvictionRun" value="3"></property>  
    10.         <property name="timeBetweenEvictionRunsMillis" value="60000"></property>  
    11.         <property name="whenExhaustedAction" value="1"></property>  
    12.     </bean>  
    13.     <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" destroy-method="destroy">  
    14.         <property name="poolConfig" ref="jedisPoolConfig"></property>  
    15.         <property name="hostName" value="127.0.0.1"></property>  
    16.         <property name="port" value="6379"></property>  
    17.         <property name="password" value="0123456"></property>  
    18.         <property name="timeout" value="15000"></property>  
    19.         <property name="usePool" value="true"></property>  
    20.     </bean>  
    21.     <bean id="jedisTemplate" class="org.springframework.data.redis.core.RedisTemplate">  
    22.         <property name="connectionFactory" ref="jedisConnectionFactory"></property>  
    23.         <property name="defaultSerializer">  
    24.             <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>  
    25.         </property>  
    26.     </bean>  
    27.     <bean id="jedisQueueListener" class="com.sample.redis.sdr.QueueListener"/>  
    28.     <bean id="jedisQueue" class="com.sample.redis.sdr.RedisQueue" destroy-method="destroy">  
    29.         <property name="redisTemplate" ref="jedisTemplate"></property>  
    30.         <property name="key" value="user:queue"></property>  
    31.         <property name="listener" ref="jedisQueueListener"></property>  
    32.     </bean>  
    33. </beans>  

    二.程序实例:

    1) QueueListener:当队列中有数据时,可以执行类似于JMS的回调操作。

    [java] view plain copy
    1. public interface RedisQueueListener<T> {  
    2.   
    3.     public void onMessage(T value);  
    4. }  
    [java] view plain copy
    1. public class QueueListener<String> implements RedisQueueListener<String> {  
    2.   
    3.     @Override  
    4.     public void onMessage(String value) {  
    5.         System.out.println(value);  
    6.           
    7.     }  
    8.   
    9. }  

    2) RedisQueue:队列操作,内部封装redisTemplate实例;如果配置了“listener”,那么queue将采用“消息回调”的方式执行,listenerThread是一个后台线程,用来自动处理“队列信息”。如果不配置“listener”,那么你可以将redisQueue注入到其他spring bean中,手动去“take”数据即可。

    [java] view plain copy
    1. public class RedisQueue<T> implements InitializingBean,DisposableBean{  
    2.     private RedisTemplate redisTemplate;  
    3.     private String key;  
    4.     private int cap = Short.MAX_VALUE;//最大阻塞的容量,超过容量将会导致清空旧数据  
    5.     private byte[] rawKey;  
    6.     private RedisConnectionFactory factory;  
    7.     private RedisConnection connection;//for blocking  
    8.     private BoundListOperations<String, T> listOperations;//noblocking  
    9.       
    10.     private Lock lock = new ReentrantLock();//基于底层IO阻塞考虑  
    11.       
    12.     private RedisQueueListener listener;//异步回调  
    13.     private Thread listenerThread;  
    14.       
    15.     private boolean isClosed;  
    16.       
    17.     public void setRedisTemplate(RedisTemplate redisTemplate) {  
    18.         this.redisTemplate = redisTemplate;  
    19.     }  
    20.   
    21.     public void setListener(RedisQueueListener listener) {  
    22.         this.listener = listener;  
    23.     }  
    24.   
    25.     public void setKey(String key) {  
    26.         this.key = key;  
    27.     }  
    28.       
    29.   
    30.     @Override  
    31.     public void afterPropertiesSet() throws Exception {  
    32.         factory = redisTemplate.getConnectionFactory();  
    33.         connection = RedisConnectionUtils.getConnection(factory);  
    34.         rawKey = redisTemplate.getKeySerializer().serialize(key);  
    35.         listOperations = redisTemplate.boundListOps(key);  
    36.         if(listener != null){  
    37.             listenerThread = new ListenerThread();  
    38.             listenerThread.setDaemon(true);  
    39.             listenerThread.start();  
    40.         }  
    41.     }  
    42.       
    43.       
    44.     /** 
    45.      * blocking 
    46.      * remove and get last item from queue:BRPOP 
    47.      * @return 
    48.      */  
    49.     public T takeFromTail(int timeout) throws InterruptedException{   
    50.         lock.lockInterruptibly();  
    51.         try{  
    52.             List<byte[]> results = connection.bRPop(timeout, rawKey);  
    53.             if(CollectionUtils.isEmpty(results)){  
    54.                 return null;  
    55.             }  
    56.             return (T)redisTemplate.getValueSerializer().deserialize(results.get(1));  
    57.         }finally{  
    58.             lock.unlock();  
    59.         }  
    60.     }  
    61.       
    62.     public T takeFromTail() throws InterruptedException{  
    63.         return takeFromHead(0);  
    64.     }  
    65.       
    66.     /** 
    67.      * 从队列的头,插入 
    68.      */  
    69.     public void pushFromHead(T value){  
    70.         listOperations.leftPush(value);  
    71.     }  
    72.       
    73.     public void pushFromTail(T value){  
    74.         listOperations.rightPush(value);  
    75.     }  
    76.       
    77.     /** 
    78.      * noblocking 
    79.      * @return null if no item in queue 
    80.      */  
    81.     public T removeFromHead(){  
    82.         return listOperations.leftPop();  
    83.     }  
    84.       
    85.     public T removeFromTail(){  
    86.         return listOperations.rightPop();  
    87.     }  
    88.       
    89.     /** 
    90.      * blocking 
    91.      * remove and get first item from queue:BLPOP 
    92.      * @return 
    93.      */  
    94.     public T takeFromHead(int timeout) throws InterruptedException{  
    95.         lock.lockInterruptibly();  
    96.         try{  
    97.             List<byte[]> results = connection.bLPop(timeout, rawKey);  
    98.             if(CollectionUtils.isEmpty(results)){  
    99.                 return null;  
    100.             }  
    101.             return (T)redisTemplate.getValueSerializer().deserialize(results.get(1));  
    102.         }finally{  
    103.             lock.unlock();  
    104.         }  
    105.     }  
    106.       
    107.     public T takeFromHead() throws InterruptedException{  
    108.         return takeFromHead(0);  
    109.     }  
    110.   
    111.     @Override  
    112.     public void destroy() throws Exception {  
    113.         if(isClosed){  
    114.             return;  
    115.         }  
    116.         shutdown();  
    117.         RedisConnectionUtils.releaseConnection(connection, factory);  
    118.     }  
    119.       
    120.     private void shutdown(){  
    121.         try{  
    122.             listenerThread.interrupt();  
    123.         }catch(Exception e){  
    124.             //  
    125.         }  
    126.     }  
    127.       
    128.     class ListenerThread extends Thread {  
    129.           
    130.         @Override  
    131.         public void run(){  
    132.             try{  
    133.                 while(true){  
    134.                     T value = takeFromHead();//cast exceptionyou should check.  
    135.                     //逐个执行  
    136.                     if(value != null){  
    137.                         try{  
    138.                             listener.onMessage(value);  
    139.                         }catch(Exception e){  
    140.                             //  
    141.                         }  
    142.                     }  
    143.                 }  
    144.             }catch(InterruptedException e){  
    145.                 //  
    146.             }  
    147.         }  
    148.     }  
    149.       
    150. }  

    3) 使用与测试:

    [java] view plain copy
    1. public static void main(String[] args) throws Exception{  
    2.     ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-redis-beans.xml");  
    3.     RedisQueue<String> redisQueue = (RedisQueue)context.getBean("jedisQueue");  
    4.     redisQueue.pushFromHead("test:app");  
    5.     Thread.sleep(15000);  
    6.     redisQueue.pushFromHead("test:app");  
    7.     Thread.sleep(15000);  
    8.     redisQueue.destroy();  
    9. }  

    在程序运行期间,你可以通过redis-cli(客户端窗口)执行“lpush”,你会发现程序的控制台仍然能够正常打印队列信息。

  • 相关阅读:
    Notepad++的ftp远程编辑功能
    Scapy脚本执行出现警告WARNING解决办法
    在文本域textarea里添加含换行的字符串
    Python打包成exe工具
    SQL字段数据类型集锦
    解决Python扩展: Unable to find vcvarsall.bat
    解决安装python第三方模块 'mnocygwin'报错的问题
    python安装wmi模块
    ubuntu设置本地软件安装源【DVD版】
    linux下expect安装
  • 原文地址:https://www.cnblogs.com/duyinqiang/p/5696315.html
Copyright © 2011-2022 走看看