zoukankan      html  css  js  c++  java
  • spring 集成 redis -- pub/sub

    redis除了常用的当做缓存外,还可以当做简单的消息中间件,实现消息发布订阅

    spring集成redis,可以使用spring-data-redis

    首先引入相关maven依赖(此处我spring相关版本是4.3.13.RELEASE,具体版本对应大家可自行查看官网文档),如下:

            <dependency>
                <groupId>org.springframework.data</groupId>
                <artifactId>spring-data-redis</artifactId>
                <version>1.4.2.RELEASE</version>
            </dependency>
            
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.6.2</version>
            </dependency>        

    下面开始进行spring相关配置,此处采用xml配置。配置如下:

    1、pub端配置,消息发送主要通过RedisTemplate的convertAndSend方法,官网样例如下:

    发送demo:

    // send message through connection RedisConnection con = ...
    byte[] msg = ...
    byte[] channel = ...
    con.publish(msg, channel); // send message through RedisTemplate
    RedisTemplate template = ...
    template.convertAndSend("hello!", "world");  //第一个参数是chanel名称, 第二个参数是发送的消息
    
    <bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">  
            <property name="maxTotal" value="1000"/> <!-- 控制一个pool可分配多少个jedis实例 -->  
        	<property name="maxIdle" value="200" />   <!-- 控制一个pool最多有多少个状态为idle(空闲)的jedis实例 -->  
        	<property name="maxWaitMillis" value="2000" />  <!-- 表示当borrow一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException -->  
        	<property name="testOnBorrow" value="true" /> <!-- 在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的 -->  
        </bean>  
    	
    	<bean id="jedisConnFactory" 
        class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" 
        p:use-pool="true" p:hostName="192.168.1.116" p:port="6379" p:password="" p:pool-config-ref="poolConfig"/>
    
    	<!-- redis template definition -->
    	<bean id = "redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
          <property name="connectionFactory" ref="jedisConnFactory"/>
          <property name="keySerializer">
             <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"></bean>
          </property>
       </bean>
    

    2、sub端配置

        <bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
    	  <constructor-arg ref="redisMessageListener"/>
    	</bean>
    	
    	<bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
    	  <property name="connectionFactory" ref="jedisConnFactory"/>
    	  <property name="messageListeners">
    	    <map>
    	      <entry key-ref="messageListener">
    	      	<list>
                  <!--监听的chanel列表--> <bean class="org.springframework.data.redis.listener.ChannelTopic"> <constructor-arg value="product_bank"/> </bean> <bean class="org.springframework.data.redis.listener.ChannelTopic"> <constructor-arg value="product_trust"/> </bean> </list> </entry> </map> </property> </bean>
    com.tianwen.dcdp.redis.RedisMessageListener.java 代码如下:
    package com.tianwen.dcdp.redis;
    
    import java.io.Serializable;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.mongodb.core.MongoTemplate;
    import org.springframework.data.mongodb.core.query.Criteria;
    import org.springframework.data.mongodb.core.query.Query;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    
    import com.tianwen.dcdp.common.CollectionEnum;
    import com.tianwen.dcdp.finance.task.BankProfTask;
    import com.tianwen.dcdp.finance.task.TrustProfTask;
    import com.tianwen.dcdp.pojo.FinanceBankWithBLOBs;
    import com.tianwen.dcdp.pojo.FinanceTrustWithBLOBs;
    import com.tianwen.dcdp.service.impl.TaskServiceImpl;
    
    @Component("redisMessageListener")
    //实际处理消息的类 public class RedisMessageListener implements MessageListener { @Autowired private RedisTemplate<Serializable, Serializable> redisTemplate; @Autowired private MongoTemplate mongoTemplate; private Logger logger = LoggerFactory.getLogger(RedisMessageListener.class); @Override public void onMessage(Message message, byte[] pattern) { byte[] bodyByte = message.getBody(); //消息内容 byte[] channelByte = message.getChannel(); //监听的chanel String channel = redisTemplate.getStringSerializer().deserialize(channelByte); //消息反序列变化时,需要与消息发送时的redisSerializer一致 String body = (String) redisTemplate.getValueSerializer().deserialize(bodyByte);
    logger.debug("received message : {} , from channel : {}", new Object[] {body, channel}); if(channel.equalsIgnoreCase("product_bank")) { FinanceBankWithBLOBs bank = mongoTemplate.findOne(Query.query(Criteria.where("_id").is(body)), FinanceBankWithBLOBs.class, CollectionEnum.BANK.name()); //计算产品特性 if(bank != null) { TaskServiceImpl.ext.execute(new BankProfTask(mongoTemplate, bank)); } } else if(channel.equalsIgnoreCase("product_trust")) { FinanceTrustWithBLOBs trust = mongoTemplate.findOne(Query.query(Criteria.where("_id").is(body)), FinanceTrustWithBLOBs.class, CollectionEnum.TRUST.name()); //计算产品特性 if(trust != null) { TaskServiceImpl.ext.execute(new TrustProfTask(mongoTemplate, trust)); } } } }

     ,到此,配置完毕,当监听的chanel有消息进入时,就会调用上述的onMessage方法。

    若实际消息处理类没有实现MessageListener接口,查看MessageListenerAdapter源码发现,可通过为其配置defaultListenerMethod参数,指定具体的消息处理方法名称。

  • 相关阅读:
    Linkerd 2.10(Step by Step)—将 GitOps 与 Linkerd 和 Argo CD 结合使用
    Linkerd 2.10(Step by Step)—多集群通信
    Linkerd 2.10(Step by Step)—使用 Kustomize 自定义 Linkerd 的配置
    Linkerd 2.10(Step by Step)—控制平面调试端点
    Linkerd 2.10(Step by Step)—配置超时
    Linkerd 2.10(Step by Step)—配置重试
    Linkerd 2.10(Step by Step)—配置代理并发
    本地正常运行,线上环境诡异异常原因集合
    Need to invoke method 'xxx' declared on target class 'yyy', but not found in any interface(s) of the exposed proxy type
    alpine 安装常用命令
  • 原文地址:https://www.cnblogs.com/yinz/p/9316167.html
Copyright © 2011-2022 走看看