zoukankan      html  css  js  c++  java
  • redis发布订阅Java代码实现

    Redis除了可以用作缓存数据外,另一个重要用途是它实现了发布订阅(pub/sub)消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

    为了实现redis的发布订阅机制,首先要打开redis服务;其次,引入redis需要的jar包,在pom.xml配置文件加入以下代码:

    <dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.1.0</version>
    </dependency>

    由于订阅消息通道需要再tomcat启动时触发,因此,需要创建一个listener监听器,在监听器里实现redis订阅,在web.xml里配置监听器如下:

    <listener>
    <listener-class>com.test.listener.InitListener</listener-class>
    </listener>

    一、订阅消息(InitListener实现)

    redis支持多通道订阅,一个客户端可以同时订阅多个消息通道,如下代码所示,订阅了13个通道。由于订阅机制是线程阻塞的,需要额外开启一个线程专门用于处理订阅消息及接收消息处理。

    public class InitListener implements ServletContextListener{
        private Logger logger = Logger.getLogger(InitListener.class);
        
        @Override
        public void contextInitialized(ServletContextEvent sce) {
            logger.info("启动tomcat");// 连接redis
            Map<String, String> proMap = PropertyReader.getProperties();
            final String url = proMap.get("redis.host");
            final Integer port = Integer.parseInt(proMap.get("redis.port"));
            final ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
            final RedisSubListener redisSubListener = (RedisSubListener) classPathXmlApplicationContext.getBean("redisSubListener");
            // 为防止阻塞tomcat启动,开启线程执行
            new Thread(new Runnable(){  
                public void run(){  
                    // 连接redis,建立监听
                    Jedis jedis = null;
                    while(true){
                        //解码资源更新通知,画面选看回复,画面选看停止回复,预案启动,预案停止,轮切启动,轮切停止,预案启动回复,预案停止回复,轮切启动回复,轮切停止回复,监视屏分屏状态通知,画面状态通知
                        String[] channels = new String[] { "decodeResourceUpdateNtf", "tvSplitPlayRsp","tvSplitPlayStopRsp",
                                "planStartStatusNtf", "planStopStatusNtf", "pollStartStatusNtf", "pollStopStatusNtf",
                                "planStartRsp","planStopRsp","pollStartRsp","pollStopRsp","tvSplitTypeNtf","tvSplitStatusNtf"};
                        try{
                            jedis = new Jedis(url,port);
                            logger.info("redis请求订阅通道");
                            jedis.subscribe(redisSubListener,channels);
                            logger.info("redis订阅结束");
                        }catch(JedisConnectionException e){
                            logger.error("Jedis连接异常,异常信息 :" + e);
                        }catch(IllegalStateException e){
                             logger.error("Jedis异常,异常信息 :" + e);
                        }
                        
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        if(jedis != null){
                            jedis = null;
                        }
                    }
                }})
            .start();
        }

    最后在spring配置文件里接入以下配置:

    <!-- redis -->
         <bean id="redisMessageService" class="com.test.service.impl.RedisMessageServiceImpl" scope="singleton">
             <property name="webSocketService"><ref local="webSocketService" /></property>
             <property name="tvSplitStatusDao" ref="tvSplitStatusDao"></property>
         </bean>
         <bean id="redisSubListener" class="com.test.common.RedisSubListener" scope="singleton">
             <property name="redisMessageService"><ref local="redisMessageService" /></property>
         </bean>
    RedisMessageServiceImpl用于处理接收的redis消息。

    二、发布消息

    public class RedisPublishUtil {
        private Logger logger = Logger.getLogger(RedisPublishUtil.class);
        public static Jedis pubJedis;
        private static Map<String, String> proMap = PropertyReader.getProperties();
        private static final String redisPort = proMap.get("redis.port");
        private static String url = proMap.get("redis.host");
        private static final int port = Integer.parseInt(redisPort);
        
        public void setPubJedis(Jedis jedis) {
            RedisPublishUtil.pubJedis = jedis;
        }
        
        public Jedis getPubJedis() {
            if (pubJedis == null) {
                createJedisConnect();
            }
            // 返回对象
            return pubJedis;
        }
        
        public Jedis createJedisConnect(){
            // 连接redis
            logger.info("===创建连接jedis=====");
            try {
                pubJedis = new Jedis(url, port);
            } catch (JedisConnectionException e) {
                logger.error("Jedis连接异常,异常信息 :" + e.getMessage());
                try {
                    Thread.sleep(1000);
                    logger.info("发起重新连接jedis");
                    createJedisConnect();
                } catch (InterruptedException except) {
                    except.printStackTrace();
                }
            }
            // 返回对象
            return pubJedis;
        }
        //公共发布接口
        public void pubRedisMsg(String msgType,String msg){
            logger.info("redis准备发布消息内容:" + msg);
            try {
                this.getPubJedis().publish(msgType, msg);
    
            } catch (JedisConnectionException e) {
                logger.error("redis发布消息失败!", e);
                this.setPubJedis(null);
                logger.info("重新发布消息,channel="+msgType);
                pubRedisMsg(msgType, msg);
            }
        }
    
    }
    public class PropertyReader {
    
         private static Logger logger = Logger.getLogger(PropertyReader.class);
        
        /*
         * 获得数据库链接的配置文件
         */
        public static Map<String,String> getProperties(){
            logger.info("读取redis配置文件开始。。。");
            
             Properties prop = new Properties();     
            
             Map<String,String> proMap  = new HashMap<String,String>();
             
            try {
                 //读取属性文件redis.properties
                InputStream in= PropertyReader.class.getClassLoader().getResourceAsStream("redis.properties");  
                
                prop.load(in);     ///加载属性列表
                Iterator<String> it=prop.stringPropertyNames().iterator();
                while(it.hasNext()){
                    String key=it.next();
                    proMap.put(key, prop.getProperty(key));
                }
                in.close();
                logger.info("读取redis配置文件成功。。。");
            } catch (Exception e) {
                logger.error("读取redis配置文件异常!", e);
                e.printStackTrace();
            }
            return proMap;
        }
    }
  • 相关阅读:
    CruiseControl.NET 三言两语
    在MFC程序中增加控制台
    VS2005 编译Release版本出现清单文件的错误
    内存泄露问题
    软件设计原则
    boost::shared_ptr 分析与实现
    corelDraw 的CDR格式解析
    13. 量词操作符—【LINQ标准查询操作符】
    SQL SERVER执行查询的顺序
    CSLA命令对象的简单封装
  • 原文地址:https://www.cnblogs.com/bingyimeiling/p/10309970.html
Copyright © 2011-2022 走看看