zoukankan      html  css  js  c++  java
  • java-Redis集合

    引用包:jedis-3.0.1.jar、commons-pool2-2.6.0.jar

    一、从Redis集合中实时获取数据:

    连接Redis

    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    
    public static Lock lock = new ReentrantLock();
    
    /**
         * 连接Redis
         * @param conferenceId
         * @return
         */
        public String startRedis(String topicId) {
            textMessage = "";
            String result = "";
            try {
                JedisPoolConfig config = new JedisPoolConfig();
                config.setTimeBetweenEvictionRunsMillis(30000);
                config.setMaxWaitMillis(10 * 1000);
                config.setMaxIdle(1000);
                config.setTestOnBorrow(true);
                JedisPool jedisPool = new JedisPool(config, redisIpYJ, Integer.parseInt(redisPortYJ),10000);// 连接redis服务端
                result = "连接Redis成功";
                lock.lock();
                try {
                    Thread thread = new Thread(new Runnable() {
                        @Override
                        public void run() {
                            getRecordTextNew_Redis(jedisPool,topicId);
                        }
                    });
                    thread.start();
                } finally {
                    lock.unlock();
                }        
            } catch (Exception e) {
                result = "连接Redis失败:" + e.getMessage();
            }
            return result;
        }

    实时获取数据

    public static Boolean isSelectRedis = false;//是否继续查询Redis
    
        /**
         * 从Redis实时获取语音记录文本(党组会)
         * @param topicId
         */
        public void getRecordTextNew_Redis(JedisPool jedisPool,String topicId) {
            Jedis jedis = null;
            while (isSelectRedis) {
            try {
                    jedis = jedisPool.getResource();   //取出一个连接
                    Set<String> results = jedis.zrange("asr:text:"+topicId,0,-1);
                    for (String result: results) {
                        //TODO消费result
                        if (StringUtils.isNotEmpty(result)) {
                            JSONObject resultMsg = JSONObject.parseObject(result);
                            String text = resultMsg.getString("result");
                            System.out.println("消息text:"+text);
                            String pgs = "1";
                            String micName = resultMsg.getString("roleName");
                            String micId = resultMsg.getString("role");
                            String uId = resultMsg.getString("uid");//段落ID
                            if (StringUtils.isNotEmpty(text)) {
                                String dataText = "<b>" + micName + ":</b>" + text;
                                String dataText2 = "<div id=""+ uId +""><b>" + micName + ":</b>" + text+"</div>";
                                textMap.put(uId, dataText2);
                                System.out.println("消息dataText:"+dataText);
                                JSONObject textObj = new JSONObject();
                                textObj.put("dataText", dataText);
                                textObj.put("dataPgs", pgs);
                                textObj.put("dataUId", uId);
                                try {
                                    Thread.sleep(400);
                                } catch (InterruptedException e) {
                                    // TODO 自动生成的 catch 块
                                    e.printStackTrace();
                                }
                                ConfWebSocketService.sendMessage(textObj.toJSONString(), "2");//向页面发送消息
                            }
                        }
                    }
                    String[] strResults = (String[])(results.toArray(new String[results.size()]));
                    if (strResults.length > 0) {
                        //TODO 移除消费掉的数据
                        jedis.zrem("asr:text:"+topicId, strResults);
                    }
                    Thread.sleep(300);            
            } catch (Exception e) {
                if (jedis != null) {
                    jedis.close();
                }
                e.printStackTrace();
            }finally {
                if (jedis != null) {
                    jedis.close();
                }
            }
            }
        }

    二、通过Redis订阅消息:

    package net.nblh.utils.common;
    
    import java.util.Set;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    
    /**
     * 建立订阅者,订阅者去订阅频道(mychannel)
     * @author lijd
     *
     */
    public class GetSpeechRecognition_YJ_Sub extends Thread{
        private final JedisPool jedisPool;
        private final GetSpeechRecognition_YJ_Msg msgListener = new GetSpeechRecognition_YJ_Msg();
        private final String channel = "db0";//"mychannel";
        
        public GetSpeechRecognition_YJ_Sub(JedisPool jedisPool) {
            super("GetSpeechRecognition_YJ_Sub");
            this.jedisPool = jedisPool;
        }
        
        @Override
        public void run() {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();   //取出一个连接
                Set<String> result = jedis.zrange("asr:text:1112",0,-1);
                //jedis.subscribe(msgListener, channel); //通过subscribe的api去订阅,参数是订阅者和频道名
                //注意:subscribe是一个阻塞的方法,在取消订阅该频道前,会一直阻塞在这,无法执行后续的代码
                //这里在msgListener的onMessage方法里面收到消息后,调用了this.unsubscribe();来取消订阅,才会继续执行
                System.out.println("继续执行后续代码。。。");
                
            } catch (Exception e) {
                if (jedis != null) {
                    jedis.close();
                }
                e.printStackTrace();
            }finally {
                if (jedis != null) {
                    jedis.close();
                }
            }
        }
    }
    package net.nblh.utils.common;
    
    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    
    /**
     * //建立发布者,通过频道(mychannel)发布消息
     * @author lijd
     *
     */
    public class GetSpeechRecognition_YJ_Pub extends Thread{
        private final JedisPool jedisPool;
        
        public GetSpeechRecognition_YJ_Pub(JedisPool jedisPool) {
            this.jedisPool = jedisPool;
        }
        
        @Override
        public void run() {
            while (true) {
                Jedis jedis = null;
                try {
                    Thread.sleep(1000);
                    jedis = jedisPool.getResource();//连接池中取出一个连接
                    String line = "fabuxiaoxi:";
                    if (!"quit".equals(line)) {
                        jedis.publish("mychannel", line);//从通过mychannel 频道发布消息
                        System.out.println(String.format("发布消息成功!channel: %s, message: %s", "mychannel", line));
                    }else {
                        break;
                    }
                    if (jedis != null) {
                        jedis.close();
                    }
                }catch (Exception e) {
                    e.printStackTrace();
                }            
            }
        }
    }
    package net.nblh.utils.common;
    
    import redis.clients.jedis.JedisPubSub;
    
    /**
     * 建立消息监听类,并重写了JedisPubSub的一些相关方法
     * @author lijd
     *
     */
    public class GetSpeechRecognition_YJ_Msg extends JedisPubSub{
        public GetSpeechRecognition_YJ_Msg(){}
        
        @Override
        public void onMessage(String channel, String message) {       
            //收到消息会调用
            System.out.println(String.format("收到消息成功! channel: %s, message: %s", channel, message));
            //this.unsubscribe();
        }
     
        @Override
        public void onSubscribe(String channel, int subscribedChannels) {    
            //订阅频道会调用
            System.out.println(String.format("订阅频道成功! channel: %s, subscribedChannels %d",
            channel, subscribedChannels));
        }
     
        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {   
            //取消订阅会调用
            System.out.println(String.format("取消订阅频道! channel: %s, subscribedChannels: %d",
                    channel, subscribedChannels));
        }
    }
  • 相关阅读:
    Java 项目运用个人看法(简写)
    windows 搭建Solr连接数据库
    总结2016年,计划2017
    如何解决,自己认为特别难的问题?(文摘)
    spring -quartz 定时任务多任务配置
    (转) java Timer 定时每天凌晨1点执行任务
    spring多数据源切换,写入报错的问题
    如何合理和有效的进行数据库设计
    Main方法定点执行线程任务
    莫辜负当下,莫悔恨过去,莫打扰错过的人
  • 原文地址:https://www.cnblogs.com/lijianda/p/10371377.html
Copyright © 2011-2022 走看看