zoukankan      html  css  js  c++  java
  • spring 集成 redis -- pub/sub

    redis除了常用的当做缓存外,还可以当做简单的消息中间件,实现消息发布订阅

    spring集成redis,可以使用spring-data-redis

    首先引入相关maven依赖(此处我spring相关版本是4.3.13.RELEASE,具体版本对应大家可自行查看官网文档),如下:

            <dependency>
                <groupId>org.springframework.data</groupId>
                <artifactId>spring-data-redis</artifactId>
                <version>1.4.2.RELEASE</version>
            </dependency>
            
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.6.2</version>
            </dependency>        

    下面开始进行spring相关配置,此处采用xml配置。配置如下:

    1、pub端配置,消息发送主要通过RedisTemplate的convertAndSend方法,官网样例如下:

    发送demo:

    // send message through connection RedisConnection con = ...
    byte[] msg = ...
    byte[] channel = ...
    con.publish(msg, channel); // send message through RedisTemplate
    RedisTemplate template = ...
    template.convertAndSend("hello!", "world");  //第一个参数是chanel名称, 第二个参数是发送的消息
    
    <bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">  
            <property name="maxTotal" value="1000"/> <!-- 控制一个pool可分配多少个jedis实例 -->  
        	<property name="maxIdle" value="200" />   <!-- 控制一个pool最多有多少个状态为idle(空闲)的jedis实例 -->  
        	<property name="maxWaitMillis" value="2000" />  <!-- 表示当borrow一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException -->  
        	<property name="testOnBorrow" value="true" /> <!-- 在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的 -->  
        </bean>  
    	
    	<bean id="jedisConnFactory" 
        class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" 
        p:use-pool="true" p:hostName="192.168.1.116" p:port="6379" p:password="" p:pool-config-ref="poolConfig"/>
    
    	<!-- redis template definition -->
    	<bean id = "redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
          <property name="connectionFactory" ref="jedisConnFactory"/>
          <property name="keySerializer">
             <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"></bean>
          </property>
       </bean>
    

    2、sub端配置

        <bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
    	  <constructor-arg ref="redisMessageListener"/>
    	</bean>
    	
    	<bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
    	  <property name="connectionFactory" ref="jedisConnFactory"/>
    	  <property name="messageListeners">
    	    <map>
    	      <entry key-ref="messageListener">
    	      	<list>
                  <!--监听的chanel列表--> <bean class="org.springframework.data.redis.listener.ChannelTopic"> <constructor-arg value="product_bank"/> </bean> <bean class="org.springframework.data.redis.listener.ChannelTopic"> <constructor-arg value="product_trust"/> </bean> </list> </entry> </map> </property> </bean>
    com.tianwen.dcdp.redis.RedisMessageListener.java 代码如下:
    package com.tianwen.dcdp.redis;
    
    import java.io.Serializable;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.mongodb.core.MongoTemplate;
    import org.springframework.data.mongodb.core.query.Criteria;
    import org.springframework.data.mongodb.core.query.Query;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    
    import com.tianwen.dcdp.common.CollectionEnum;
    import com.tianwen.dcdp.finance.task.BankProfTask;
    import com.tianwen.dcdp.finance.task.TrustProfTask;
    import com.tianwen.dcdp.pojo.FinanceBankWithBLOBs;
    import com.tianwen.dcdp.pojo.FinanceTrustWithBLOBs;
    import com.tianwen.dcdp.service.impl.TaskServiceImpl;
    
    @Component("redisMessageListener")
    //实际处理消息的类 public class RedisMessageListener implements MessageListener { @Autowired private RedisTemplate<Serializable, Serializable> redisTemplate; @Autowired private MongoTemplate mongoTemplate; private Logger logger = LoggerFactory.getLogger(RedisMessageListener.class); @Override public void onMessage(Message message, byte[] pattern) { byte[] bodyByte = message.getBody(); //消息内容 byte[] channelByte = message.getChannel(); //监听的chanel String channel = redisTemplate.getStringSerializer().deserialize(channelByte); //消息反序列变化时,需要与消息发送时的redisSerializer一致 String body = (String) redisTemplate.getValueSerializer().deserialize(bodyByte);
    logger.debug("received message : {} , from channel : {}", new Object[] {body, channel}); if(channel.equalsIgnoreCase("product_bank")) { FinanceBankWithBLOBs bank = mongoTemplate.findOne(Query.query(Criteria.where("_id").is(body)), FinanceBankWithBLOBs.class, CollectionEnum.BANK.name()); //计算产品特性 if(bank != null) { TaskServiceImpl.ext.execute(new BankProfTask(mongoTemplate, bank)); } } else if(channel.equalsIgnoreCase("product_trust")) { FinanceTrustWithBLOBs trust = mongoTemplate.findOne(Query.query(Criteria.where("_id").is(body)), FinanceTrustWithBLOBs.class, CollectionEnum.TRUST.name()); //计算产品特性 if(trust != null) { TaskServiceImpl.ext.execute(new TrustProfTask(mongoTemplate, trust)); } } } }

     ,到此,配置完毕,当监听的chanel有消息进入时,就会调用上述的onMessage方法。

    若实际消息处理类没有实现MessageListener接口,查看MessageListenerAdapter源码发现,可通过为其配置defaultListenerMethod参数,指定具体的消息处理方法名称。

  • 相关阅读:
    jquery中$.get()提交和$.post()提交有区别吗?
    数据库连接池的原理。为什么要使用连接池。
    execute,executeQuery,executeUpdate的区别是什么?
    数据库的三级模式与二级映像
    KMP算法(超容易理解的next数组求法)
    软件危机(含通俗理解帮助记忆)
    O(1)复杂度求一个栈的最小值
    操作系统进程状态模型
    判断单链表是否有环,如果有环则找到其环的入口
    两个单链表判断是否相交
  • 原文地址:https://www.cnblogs.com/yinz/p/9316167.html
Copyright © 2011-2022 走看看