zoukankan      html  css  js  c++  java
  • Redis的消息订阅/发布 Utils工具类

    package cn.cicoding.utils;
    
    import org.json.JSONException;
    import org.json.JSONObject;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    import redis.clients.jedis.JedisPubSub;
    import redis.clients.jedis.Protocol;
    import redis.clients.jedis.exceptions.JedisConnectionException;
    
    class MQClient {
        public static final int MSG_REALTIME = 1;
        public static final int MSG_CACHED = 2;
        public static final int MSG_SERIALIZABLE = 3;
        public static final String NOTIFY_CHANNEL = "ClientNotify";
    
        private JedisPool pool;
        private boolean exit;
        private JedisPubSub pubsub;
    
        public MQClient(String ip, int port, JedisPubSub pubsub) {
            JedisPoolConfig config = new JedisPoolConfig();   
            config.setMaxWaitMillis(10 * 1000);  
            config.setMaxIdle(1000);  
            config.setTestOnBorrow(true);  
            pool = new JedisPool(config, ip, port, Protocol.DEFAULT_TIMEOUT, null);
            exit = false;
            this.pubsub = pubsub;
        }
    
        public boolean publish(String channels, String message, String content) {
            JSONObject obj = new JSONObject();
            boolean ret = false;
            Jedis jedis = null;
            try {
                jedis = pool.getResource();
                if (message != null) {
                    obj.put("message", message);
                }
                try {
                    JSONObject objCon = new JSONObject(content);
                    obj.put("content", objCon);
                } catch (JSONException e) {
                    obj.put("content", content);
                }
                String[] tmp = channels.split(";");
                for (String channel : tmp) {
                    try {
                        if (jedis.publish(channel, obj.toString()) > 0) {
                            ret = true;
                        }
                    } catch (Exception e) {
                        break;
                    }
                }
            } catch (JSONException e) {
            } finally {
                if (jedis != null){
                    jedis.close();
                }
            }
    
            return ret;
        }
    
        public boolean clientNotify(String clients, String message, String content, int type) {
            if (type == MSG_REALTIME) {
                return publish(clients, message, content);
            }
    
            boolean ret = false;
    
            try {
                JSONObject obj = new JSONObject();
                obj.put("clients", clients);
                obj.put("type", type);
                if (message != null) {
                    obj.put("message", message);
                }
                try {
                    JSONObject objCon = new JSONObject(content);
                    obj.put("content", objCon);
                } catch (JSONException e) {
                    obj.put("content", content);
                }
    
                if (pool.getResource().publish(NOTIFY_CHANNEL, obj.toString()) > 0) {
                    ret = true;
                }
            } catch (JSONException e) {
            }
    
            return ret;
        }
    
        public boolean setValue(String key, String value) {
            try {
                String response = pool.getResource().set(key, value);
                if (response != null && response.equals("OK")) {
                    return true;
                }
            } catch (JedisConnectionException e) {
                e.printStackTrace();
            }
    
            return false;
        }
    
        public String getValue(String key) {
            return pool.getResource().get(key);
        }
    
        public void subscribe(String... channels) {
            while (!exit) {
                try {
                    pool.getResource().subscribe(pubsub, channels);
                } catch (JedisConnectionException e) {
                    e.printStackTrace();
                    System.out.println("try reconnect");
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                }
            }
        }
    }

    Test测试函数:

    package cn.cicoding.utils;
    
    import redis.clients.jedis.JedisPubSub;
    
    public class Test extends JedisPubSub {
        @Override
        public void onMessage(String channel, String message) {
            System.out.println(channel + ":" + message);
        }
    
        public static void main(String[] args) {
            MQClient client = new MQClient("127.0.0.1", 6379, new Test());
            client.setValue("abc", "java setted");
            System.out.println(client.getValue("abc"));
            System.out.println(client.clientNotify("nodeSubscriber", "message from java", "{\"debug\":0}", MQClient.MSG_REALTIME));
            client.subscribe("testInitiativePerception");
        }
    }
  • 相关阅读:
    HBase 高性能加入数据
    Please do not register multiple Pages in undefined.js 小程序报错的几种解决方案
    小程序跳转时传多个参数及获取
    vue项目 调用百度地图 BMap is not defined
    vue生命周期小笔记
    解决小程序背景图片在真机上不能查看的问题
    vue项目 菜单侧边栏随着右侧内容盒子的高度实时变化
    vue项目 一行js代码搞定点击图片放大缩小
    微信小程序进行地图导航使用地图功能
    小程序报错Do not have xx handler in current page的解决方法
  • 原文地址:https://www.cnblogs.com/zhaokejin/p/9630069.html
Copyright © 2011-2022 走看看