zoukankan      html  css  js  c++  java
  • Redis实战——Redis的pub/Sub(订阅与发布)在java中的实现

    借鉴:https://blog.csdn.net/canot/article/details/51938955

    1.什么是pub/sub

    Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。熟悉设计模式的朋友应该了解这与23种设计模式中的观察者模式极为相似。 
    同样,Redis的pub/sub是一种消息通信模式,主要的目的是解除消息发布者和消息订阅者之间的耦合,Redis作为一个pub/sub的server,在订阅者和发布者之间起到了消息路由的功能。

    2.Redis pub/sub的实现

    Redis通过publish和subscribe命令实现订阅和发布的功能。订阅者可以通过subscribe向redis server订阅自己感兴趣的消息类型。redis将信息类型称为通道(channel)。当发布者通过publish命令向redis server发送特定类型的信息时,订阅该消息类型的全部订阅者都会收到此消息。

    客户端1订阅CCTV1:

    127.0.0.1:6379> subscribe CCTV1
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "CCTV1"
    3) (integer) 1
     

    客户端2订阅CCTV1和CCTV2:

    127.0.0.1:6379> subscribe CCTV1 CCTV2
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "CCTV1"
    3) (integer) 1
    1) "subscribe"
    2) "CCTV2"
    3) (integer) 2

    此时这两个客户端分别监听这指定的频道。现在另一个客户端向服务器推送了关于这两个频道的信息。

    127.0.0.1:6379> publish CCTV1 "cctv1 is good"
    (integer) 2
    //返回2表示两个客户端接收了次消息。被接收到消息的客户端如下所示。
    1) "message"
    2) "CCTV1"
    3) "cctv1 is good"
    ----
    1) "message"
    2) "CCTV1"
    3) "cctv1 is good"

    如上的订阅/发布也称订阅发布到频道(使用publish与subscribe命令),此外还有订阅发布到模式(使用psubscribe来订阅一个模式)

    订阅CCTV的全部频道

    127.0.0.1:6379> psubscribe CCTV*
    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe"
    2) "CCTV*"
    3) (integer) 1

    当依然先如上推送一个CCTV1的消息时,该客户端正常接收。

    3.Pub/Sub在java中的实现

    导入Redis驱动:

    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>

    Redis驱动包提供了一个抽象类:JedisPubSub…继承这个类就完成了对客户端对订阅的监听。示例代码:

    /**
     * redis发布订阅消息监听器
     * @ClassName: RedisMsgPubSubListener 
     * @Description: TODO
     * @author OnlyMate
     * @Date 2018年8月22日 上午10:05:35  
     *
     */
    public class RedisMsgPubSubListener extends JedisPubSub {
        private Logger logger = LoggerFactory.getLogger(RedisMsgPubSubListener.class);
        
        @Override
        public void unsubscribe() {
            super.unsubscribe();
        }
     
        @Override
        public void unsubscribe(String... channels) {
            super.unsubscribe(channels);
        }
     
        @Override
        public void subscribe(String... channels) {
            super.subscribe(channels);
        }
     
        @Override
        public void psubscribe(String... patterns) {
            super.psubscribe(patterns);
        }
     
        @Override
        public void punsubscribe() {
            super.punsubscribe();
        }
     
        @Override
        public void punsubscribe(String... patterns) {
            super.punsubscribe(patterns);
        }
     
        @Override
        public void onMessage(String channel, String message) {
            logger.info("onMessage: channel[{}], message[{}]",channel, message);
        }
     
        @Override
        public void onPMessage(String pattern, String channel, String message) {
            logger.info("onPMessage: pattern[{}], channel[{}], message[{}]", pattern, channel, message);
        }
     
        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
            logger.info("onSubscribe: channel[{}], subscribedChannels[{}]", channel, subscribedChannels);
        }
     
        @Override
        public void onPUnsubscribe(String pattern, int subscribedChannels) {
            logger.info("onPUnsubscribe: pattern[{}], subscribedChannels[{}]", pattern, subscribedChannels);
        }
     
        @Override
        public void onPSubscribe(String pattern, int subscribedChannels) {
            logger.info("onPSubscribe: pattern[{}], subscribedChannels[{}]", pattern, subscribedChannels);
        }
     
        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {
            logger.info("channel:{} is been subscribed:{}", channel, subscribedChannels);
        }
    }

    如上所示,抽象类中存在的方法。分别表示

    • 监听到订阅模式接受到消息时的回调 (onPMessage)
    • 监听到订阅频道接受到消息时的回调 (onMessage )
    • 订阅频道时的回调( onSubscribe )
    • 取消订阅频道时的回调( onUnsubscribe )
    • 订阅频道模式时的回调 ( onPSubscribe )
    • 取消订阅模式时的回调( onPUnsubscribe )

    运行我们刚刚编写的类:

    订阅者

    /**
     * 订阅者
     * @ClassName: RedisSubTest 
     * @Description: TODO
     * @author OnlyMate
     * @Date 2018年8月23日 下午2:59:42  
     *
     */
    public class RedisSubTest {
        @Test
        public void subjava() {
            System.out.println("订阅者 ");
            Jedis jr = null;
            try {
                jr = new Jedis("127.0.0.1", 6379, 0);// redis服务地址和端口号
                RedisMsgPubSubListener sp = new RedisMsgPubSubListener();
                // jr客户端配置监听两个channel
                jr.subscribe(sp, "news.share", "news.blog");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (jr != null) {
                    jr.disconnect();
                }
            }
        }
    }

    发布者

    /**
     * 发布者
     * @ClassName: RedisPubTest 
     * @Description: TODO
     * @author OnlyMate
     * @Date 2018年8月23日 下午2:59:25  
     *
     */
    public class RedisPubTest {
        @Test
        public void pubjava() {
            System.out.println("发布者 ");
            Jedis jr = null;
            try {
                jr = new Jedis("127.0.0.1", 6379, 0);// redis服务地址和端口号
                // jr客户端配置监听两个channel
                jr.publish( "news.share", "新闻分享");
                jr.publish( "news.blog", "新闻博客");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (jr != null) {
                    jr.disconnect();
                }
            }
        }
    }

    从代码中我们不难看出,我们声明的一个redis链接在设置监听后就可以执行一些操作,例如发布消息,订阅消息等。。。 
    当运行上述代码后会在控制台输出:

    此时当在有客户端向new.share或者new.blog通道publish消息时,onMessage方法即可被相应。(jedis.publish(channel, message))。

    4.Pub/Sub在Spring中的实践 
    导入依赖jar

    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-redis</artifactId>
        <version>2.0.8.RELEASE</version>
    </dependency>

     核心消息监听器

    /**
     * redis发布订阅消息监听器
     * @ClassName: RedisMsgPubSubListener 
     * @Description: TODO
     * @author OnlyMate
     * @Date 2018年8月22日 上午10:05:35  
     *
     */
    public class RedisMsgPubSubListener implements MessageListener {
        private Logger logger = LoggerFactory.getLogger(RedisMsgPubSubListener.class);
        
        @Override
        public void onMessage( final Message message, final byte[] pattern ) {
            RedisSerializer<?> serializer = redisTemplate.getValueSerializer();
            // message.getBody()是Redis的值,需要用redis的valueSerializer反序列化
            logger.info("Message receive-->pattern:{},message: {},{}", new String(pattern),
                    serializer.deserialize(message.getBody()),
                    redisTemplate.getStringSerializer().deserialize(message.getChannel()));
            logger.info(message.toString());
            JSONObject json = JSONObject.parseObject(serializer.deserialize(message.getBody()).toString());
            String cutomerId = json.getString("cutomerId");
            
            //可以与WebSocket结合使用,解决分布式服务中,共享Session
            if(StringUtils.isNotEmpty(cutomerId)) {
                logger.info("cutomerId: {},消息:{}", cutomerId, message.toString());
            }else {
                logger.info("cutomerId 为空,无法推送给对应的客户端,消息:{}", message.toString());
            }
        }
    }

    现在我们在获取RedisTemplate,并给WEB_SOCKET:LOTTERY这个channel publish数据。

    /**
     * 发布者
     * @ClassName: RedisMsgPubClient 
     * @Description: TODO
     * @author OnlyMate
     * @Date 2018年8月23日 下午3:59:33  
     *
     */
    @Controller
    @RequestMapping(value="/redisMsgPubClientBySpring")
    public class RedisMsgPubClient {
        private Logger logger = LoggerFactory.getLogger(RedisMsgPubClient.class);
        
        @Autowired
        private RedisTemplate<Object,Object> redisTemplate;
        
        @RequestMapping
        @ResponseBody
        public String pubMsg(HttpServletRequest request, HttpServletResponse response) {
            String cutomerId = request.getParameter("cutomerId").toString();
            String msg = request.getParameter("msg").toString();
            logger.info("发布消息:{}", request.getParameter("msg").toString());
            JSONObject json = new JSONObject();
            json.put("cutomerId", cutomerId);
            json.put("msg", msg);
            redisTemplate.convertAndSend("WEB_SOCKET:LOTTERY", json);
            return "成功";
        }
    }

    最后一步reids的配置

    <?xml version="1.0" encoding="UTF-8" ?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:cache="http://www.springframework.org/schema/cache"
        xmlns:p="http://www.springframework.org/schema/p"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
                            http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/context
                            http://www.springframework.org/schema/context/spring-context.xsd
                            http://www.springframework.org/schema/aop
                            http://www.springframework.org/schema/aop/spring-aop.xsd
                            http://www.springframework.org/schema/tx
                            http://www.springframework.org/schema/tx/spring-tx.xsd
                            http://www.springframework.org/schema/mvc 
                            http://www.springframework.org/schema/mvc/spring-mvc.xsd
                            http://www.springframework.org/schema/cache   
                            http://www.springframework.org/schema/cache/spring-cache.xsd"
        default-autowire="byName">
        
        <description>redis 相关类 Spring 托管</description>
        
        <!-- 开启缓存 -->
        <cache:annotation-driven />
        <bean name="springCacheAnnotationParser" class="org.springframework.cache.annotation.SpringCacheAnnotationParser"></bean>
        <bean name="annotationCacheOperationSource" class="org.springframework.cache.annotation.AnnotationCacheOperationSource">
            <constructor-arg>
                <array>
                    <ref bean="springCacheAnnotationParser"/>
                </array>
            </constructor-arg>
        </bean>
        <bean name="cacheInterceptor" class="org.springframework.cache.interceptor.CacheInterceptor">
            <property name="cacheOperationSources" ref="annotationCacheOperationSource" />
        </bean>
        <bean class="org.springframework.cache.interceptor.BeanFactoryCacheOperationSourceAdvisor">
            <property name="cacheOperationSource" ref="annotationCacheOperationSource" />
            <property name="advice" ref="cacheInterceptor" />
            <property name="order" value="2147483647" />
        </bean>
        
        <!--载入 redis 配置文件-->
        <context:property-placeholder location="classpath:redis.properties" ignore-unresolvable="true"/>
    
        
        <!-- 配置JedisConnectionFactory -->
        <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
            <property name="hostName" value="${redis.host}"/>
            <property name="port" value="${redis.port}"/>
            <property name="password" value="${redis.pass}"/>
            <property name="database" value="${redis.dbIndex}"/>
            <property name="poolConfig" ref="jedisPoolConfig"/>
            <!-- <constructor-arg name="sentinelConfig" ref="redisSentinelConfiguration" /> -->
            <constructor-arg name="poolConfig" ref="jedisPoolConfig" />
        </bean>
        <!-- 配置 JedisPoolConfig 实例 -->
        <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
            <!-- 最大连接数 -->
            <property name="maxTotal" value="${redis.pool.maxActive}"/>
            <!-- 最大空闲时间 -->
            <property name="maxIdle" value="${redis.pool.maxIdle}"/>
            <!-- 最小空闲时间 -->
            <property name="minIdle" value="${redis.pool.minIdle}"/>
            <!-- 获得链接时的最大等待毫秒数,小于0:阻塞不确定时间,默认-1 -->
            <property name="maxWaitMillis" value="${redis.pool.maxWaitMillis}"/>
            <!-- 在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的 -->
            <property name="testOnBorrow" value="${redis.pool.testOnBorrow}"/>
            <!-- 在空闲时检查有效性,默认false -->
            <property name="testWhileIdle" value="${redis.pool.testOnBorrow}"/>
            <!-- 表示idle object evitor两次扫描之间要sleep的毫秒数 -->
            <property name="timeBetweenEvictionRunsMillis" value="${redis.pool.timeBetweenEvictionRunsMillis}" />
            <!-- 表示一个对象至少停留在idle状态的最短时间,然后才能被idle object evitor扫描并驱逐;这一项只有在timeBetweenEvictionRunsMillis大于0时才有意义 -->
            <property name="minEvictableIdleTimeMillis" value="${redis.pool.minEvictableIdleTimeMillis}" />
            <!-- 表示idle object evitor每次扫描的最多的对象数 -->
            <property name="numTestsPerEvictionRun" value="${redis.pool.numTestsPerEvictionRun}" />
        </bean>
        <!-- 配置哨兵 -->
        <!-- <bean id="redisSentinelConfiguration" class="org.springframework.data.redis.connection.RedisSentinelConfiguration">
            <property name="master">
                <bean class="org.springframework.data.redis.connection.RedisNode">
                    <property name="name" value="mymaster" />
                </bean>
            </property>
            <property name="sentinels">
                <set>
                    <bean class="org.springframework.data.redis.connection.RedisNode">
                        <constructor-arg name="host" value="10.252.2.137" />
                        <constructor-arg name="port" value="26391" />
                    </bean>
                    <bean class="org.springframework.data.redis.connection.RedisNode">
                        <constructor-arg name="host" value="10.252.2.137" />
                        <constructor-arg name="port" value="26392" />
                    </bean>
                    <bean class="org.springframework.data.redis.connection.RedisNode">
                        <constructor-arg name="host" value="10.252.2.137" />
                        <constructor-arg name="port" value="26393" />
                    </bean>
                </set>
            </property>
        </bean> -->
        
        <!-- SDR默认采用的序列化策略有两种,一种是String的序列化策略,一种是JDK的序列化策略。
            StringRedisTemplate默认采用的是String的序列化策略,保存的key和value都是采用此策略序列化保存的。 
            RedisTemplate默认采用的是JDK的序列化策略,保存的key和value都是采用此策略序列化保存的。 
            就是因为序列化策略的不同,即使是同一个key用不同的Template去序列化,结果是不同的。所以根据key去删除数据的时候就出现了删除失败的问题。
        -->
        <!-- redis 序列化策略 ,通常情况下key值采用String序列化策略, -->
        <!-- 如果不指定序列化策略,StringRedisTemplate的key和value都将采用String序列化策略; -->
        <!-- 但是RedisTemplate的key和value都将采用JDK序列化 这样就会出现采用不同template保存的数据不能用同一个template删除的问题 -->
        <!-- 配置RedisTemplate -->
        <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
            <property name="connectionFactory" ref="jedisConnectionFactory" />
            <property name="keySerializer">
                <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
            </property>
            <!-- <property name="valueSerializer" ref="stringRedisSerializer" /> value值如果是对象,这不能用stringRedisSerializer,报类型转换错误-->
            <!-- <property name="valueSerializer">
                hex(十六进制)的格式
                <bean class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
            </property> -->
            <property name="valueSerializer" >
                <!-- json的格式,要注意实体属性名有没有‘_’,如user_name,有的话要加注解 ,@JsonNaming会将userName处理为user_name
                       @JsonSerialize
                    @JsonNaming(PropertyNamingStrategy.LowerCaseWithUnderscoresStrategy.class) 
                   -->
                <bean class="org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer" />
            </property>
        </bean>
    
        <!-- spring自己的缓存管理器,这里定义了缓存位置名称 ,即注解中的value -->
        <bean id="cacheManager" class="org.springframework.cache.support.SimpleCacheManager">
            <property name="caches">
                <set>
                    <!-- 这里可以配置多个redis -->
                    <bean
                        class="org.springframework.cache.concurrent.ConcurrentMapCacheFactoryBean">
                        <property name="name" value="localDefault" /><!-- 缺省本地缓存 -->
                    </bean>
                    <bean
                        class="org.springframework.cache.concurrent.ConcurrentMapCacheFactoryBean">
                        <property name="name" value="WSLocalTableCache" /><!-- 单表配置 -->
                    </bean>
                    <!-- 本地缓存2:管理缓存失效 -->
                    <bean class="com.only.mate.utils.RedisCache">
                        <property name="name" value="localTest" /><!-- 本地缓存名 -->
                        <property name="timeout" value="10" />  <!-- seconds -->
                        <property name="removeTimeout" value="true" /> <!-- 超时移除 -->
                    </bean>
                </set>
            </property>
        </bean>
        
        <!-- 配置redis发布订阅模式 -->
        <bean id="redisMessageListenerContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
            <property name="connectionFactory" ref="jedisConnectionFactory" />
            <property name="messageListeners">
                <map>
                    <entry key-ref="messageListenerAdapter">
                        <bean class="org.springframework.data.redis.listener.ChannelTopic">
                            <constructor-arg value="WEB_SOCKET:LOTTERY"></constructor-arg>
                        </bean>
                    </entry>
                </map>
            </property>
        </bean>
    
        <bean id="messageListenerAdapter" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
            <constructor-arg ref="redisMsgPubSubListener"></constructor-arg>
        </bean>
    
        <bean id="redisMsgPubSubListener" class="com.redis.pubsub.spring.RedisMsgPubSubListener"></bean>
    </beans>

    如上的配置即配置了对Redis的链接。在配置类中的将ChannelTopic加入IOC容器。则在Spring启动时会在一个RedisTemplate(一个对Redis的链接)中设置的一个channel,即WEB_SOCKET:LOTTERY。 
    在上述配置中,RedisMsgPubSubListener是我们生成的,这个类即为核心监听类,RedisTemplate接受到数据如何处理就是在该类中处理的。

    附加上Java配置

    import java.lang.reflect.Method;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.springframework.cache.CacheManager;
    import org.springframework.cache.annotation.CachingConfigurerSupport;
    import org.springframework.cache.annotation.EnableCaching;
    import org.springframework.cache.interceptor.KeyGenerator;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.cache.RedisCacheManager;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.listener.ChannelTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.Topic;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.gfss.common.listener.CustomRedisMsgPubSubListener;
    
    @Configuration
    @EnableCaching
    public class RedisConfiguration extends CachingConfigurerSupport {
    
        @Override
        @Bean
        public KeyGenerator keyGenerator() {
            return new KeyGenerator() {
                @Override
                public Object generate(Object target, Method method, Object... params) {
                    StringBuilder sb = new StringBuilder();
                    sb.append(target.getClass().getName());
                    sb.append(method.getName());
                    for (Object obj : params) {
                        sb.append(obj.toString());
                    }
                    return sb.toString();
                }
            };
    
        }
    
        @Bean
        public CacheManager cacheManager(RedisTemplate<?, ?> redisTemplate) {
            return new RedisCacheManager(redisTemplate);
        }
    
        @Bean
        public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
            StringRedisTemplate template = new StringRedisTemplate(factory);
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(
                    Object.class);
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(om);
            template.setValueSerializer(jackson2JsonRedisSerializer);
            template.afterPropertiesSet();
            return template;
        }
    
        @Bean
        public RedisTemplate<String, Object> objectRedisTemplate(RedisConnectionFactory factory) {
            RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
            template.setConnectionFactory(factory);
            template.setKeySerializer(new StringRedisSerializer());
            return template;
        }
    
        /************** 配置redis发布订阅模式 *******************************/
        @Bean
        public CustomRedisMsgPubSubListener customRedisMsgPubSubListener() {
            return new CustomRedisMsgPubSubListener();
        }
    
        @Bean
        public MessageListenerAdapter messageListenerAdapter(MessageListener messageListener) {
            return new MessageListenerAdapter(messageListener);
        }
    
        @Bean
        public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,
                MessageListenerAdapter messageListenerAdapter) {
    
            List<Topic> collection = new ArrayList<Topic>();
            // 普通订阅,订阅具体的频道
            ChannelTopic channelTopic = new ChannelTopic("WEB_SOCKET:LOTTERY");
            collection.add(channelTopic);
    
            /*// 模式订阅,支持模式匹配订阅,*为模糊匹配符
            PatternTopic PatternTopic = new PatternTopic("WEB_SOCKET:*");
            collection.add(PatternTopic);
            // 匹配所有频道
            PatternTopic PatternTopicAll = new PatternTopic("*");
            collection.add(PatternTopicAll);*/
    
            RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
            redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
            redisMessageListenerContainer.addMessageListener(messageListenerAdapter, collection);
            return redisMessageListenerContainer;
        }
    }

    访问页面去调用发布者

    http://localhost:8088/redis/redisMsgPubClientBySpring?cutomerId=all&msg=你们好

    订阅收到的消息

    5.拓展开发

      在分布式服务中,可以结合WebSocket与Redis的发布订阅模式相结合,解决session不能共享的问题。

      当业务处理完成之后,通过Redis的发布订阅模式,发布消息到每个订阅该频道的服务节点,然后由每个服务节点通过key寻找自己内存缓存中的session,然后找到了就向客户端推消息,否则不处理。

    Dubbo只能传输可序列化的对象,Redis只能缓存可序列化的对象,Dubbo基于网络流(TCP),Redis缓存的数据要存储在硬盘上,而WebSocketSession是没有实现序列化的,所以不能跨服务传递WebSocketSession,也不能使用Redis存储WebSocketSession,只能自定义一块缓存区。

    6.动态订阅频道

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.listener.ChannelTopic;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.data.redis.serializer.RedisSerializer;
    
    import com.alibaba.fastjson.JSONObject;
    import com.gfss.common.websocket.CustomWebSocketHandler;
    
    /**
     * redis发布订阅消息监听器
     * @ClassName: RedisMsgPubSubListener
     * @Description: TODO
     * @author OnlyMate
     * @Date 2018年8月22日 上午10:05:35
     *
     */
    public class CustomRedisMsgPubSubListener implements MessageListener {
        private Logger logger = LoggerFactory.getLogger(CustomRedisMsgPubSubListener.class);
    
        @Autowired
        private CustomWebSocketHandler customWebSocketHandler;
        @Autowired
        private ApplicationContext applicationContext;
        @Autowired
        private RedisTemplate<String, String> redisTemplate;
        /**
         * 实例:
         *    JSONObject json = new JSONObject();
         *    json.put("cutomerId", notifyResult.getResult());
         *    json.put("resultCode", map.get("resultCode"));
         *    //向redis发布消息
         *    redisTemplate.convertAndSend(channelName, json);
         * @param message
         * @param pattern
         * @Throws
         * @Author: chetao
         * @Date: 2019年1月8日 下午10:40:21
         * @see org.springframework.data.redis.connection.MessageListener#onMessage(org.springframework.data.redis.connection.Message, byte[])
         */
        @Override
        public void onMessage( final Message message, final byte[] pattern ) {
            RedisSerializer<?> serializer = redisTemplate.getKeySerializer();
            logger.info("Message receive-->pattern:{},message: {},{}", serializer.deserialize(pattern),
                    serializer.deserialize(message.getBody()), serializer.deserialize(message.getChannel()));
            if ("WEB_SOCKET:PAY_NOTIFY".equals(serializer.deserialize(message.getChannel()))) {
                RedisMessageListenerContainer redisMessageListenerContainer = applicationContext
                        .getBean("redisMessageListenerContainer", RedisMessageListenerContainer.class);
                MessageListenerAdapter messageListenerAdapter = applicationContext.getBean("messageListenerAdapter",
                        MessageListenerAdapter.class);
                /*List<Topic> collection = new ArrayList<Topic>();
                // 动态添加订阅主题
                ChannelTopic channelTopic = new ChannelTopic("WEB_SOCKET1:PAY_NOTIFY");
                collection.add(channelTopic);
                PatternTopic PatternTopic = new PatternTopic("WEB_SOCKET:*");
                collection.add(PatternTopic);
                redisMessageListenerContainer.addMessageListener(messageListenerAdapter, collection);*/
                // 动态添加订阅主题
                ChannelTopic channelTopic = new ChannelTopic("WEB_SOCKET1:PAY_NOTIFY");
                redisMessageListenerContainer.addMessageListener(messageListenerAdapter, channelTopic);
                PatternTopic PatternTopic = new PatternTopic("WEB_SOCKET:*");
                redisMessageListenerContainer.addMessageListener(messageListenerAdapter, PatternTopic);
            }
    
            JSONObject json = JSONObject.parseObject(message.toString());
            customWebSocketHandler.sendMessage(json.toJSONString());
        }
    }

    上面两种动态订阅频道的方式都可以,本人已测试是可行的,可以结合自己的业务去拓展,如:临时订阅频道后退订频道

  • 相关阅读:
    事件-(DOM标准事件模型)
    BOM-01 (BOM的对象)
    DOM-04 (DOM常用对象)
    DOM-03 (修改2,添加删除)
    DOM-02 (查找2,修改1)
    DOM-01 (DOM基础,DOM树,查找元素1)
    boot-02 (组件<(水平/胶囊/选项卡)导航,折叠,卡片,手风琴,折叠导航栏,媒体对象,焦点轮播图,巨幕,徽章>)
    boot-01 (栅格布局/表单样式/组件1)
    AI deeplab
    AI 强化学习
  • 原文地址:https://www.cnblogs.com/onlymate/p/9524960.html
Copyright © 2011-2022 走看看