前言:
本文基于jedis 2.9.0.jar、commons-pool2-2.4.2.jar以及json-20160810.jar
其中jedis连接池需要依赖commons-pool2包,json包用于对象实例和json字符串的相互转换
1、jedis的消息队列方法简述
1.1、发布消息方法
(其中,channel是对应消息通道,message是对应消息体)
jedis.publish(channel, message);
1.2、监听消息方法
(其中,jedisPubSub用于处理监听到的消息,channels是对应的通道)
jedis.subscribe(jedisPubSub, channels);
2、发布消息
/** * 从jedis连接池获取jedis操作实例 * @return */ public static Jedis getJedis() { return RedisPoolManager.getJedis(); } /** * 推入消息到redis消息通道 * * @param String * channel * @param String * message */ public static void publish(String channel, String message) { Jedis jedis = null; try { jedis = getJedis(); jedis.publish(channel, message); } finally { jedis.close(); } } /** * 推入消息到redis消息通道 * * @param byte[] * channel * @param byte[] * message */ public void publish(byte[] channel, byte[] message) { Jedis jedis = null; try { jedis = getJedis(); jedis.publish(channel, message); } finally { jedis.close(); } }
3、监听消息
3.1、监听消息主体方法
/** * 监听消息通道 * @param jedisPubSub - 监听任务 * @param channels - 要监听的消息通道 */ public static void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) { Jedis jedis = null; try { jedis = getJedis(); jedis.subscribe(jedisPubSub, channels); } finally { jedis.close(); } } /** * 监听消息通道 * @param jedisPubSub - 监听任务 * @param channels - 要监听的消息通道 */ public static void subscribe(JedisPubSub jedisPubSub, String... channels) { Jedis jedis = null; try { jedis = getJedis(); jedis.subscribe(jedisPubSub, channels); } finally { jedis.close(); } }
3.2、处理监听到的消息任务
class Tasker implements Runnable { private String[] channel = null;//监听的消息通道 private JedisPubSub jedisPubSub = null;//消息处理任务 public Tasker(JedisPubSub jedisPubSub, String ...channel) { this.jedisPubSub = jedisPubSub; this.channel = channel; } @Override public void run() { // 监听channel通道的消息 RedisMQ.subscribe(jedisPubSub, channel); } }
3.3、处理监听到的消息主体类实现
package cn.eguid.livePushServer.redisManager; import java.util.Map; import org.json.JSONObject; import cc.eguid.livepush.PushManager; import redis.clients.jedis.JedisPubSub; public class RedisMQHandler extends JedisPubSub{ PushManager pushManager = null; public RedisMQHandler(PushManager pushManager) { super(); this.pushManager = pushManager; } @Override // 接收到消息后进行分发执行 public void onMessage(String channel, String message) { JSONObject jsonObj = new JSONObject(message); System.out.println(channel+","+message); if ("push".equals(channel)) { Map<String,Object> map=jsonObj.toMap(); System.out.println("接收到一条推流消息,准备推流:"+map); // String appName=pushManager.push(map); //推流完成后还需要发布一个成功消息到返回队列 } else if ("close".equals(channel)) { String appName=jsonObj.getString("appName"); System.out.println("接收到一条关闭消息,准备关闭应用:"+appName); // pushManager.closePush(appName); } } }
4、测试消息队列发布和监听
public static void main(String[] args) throws InterruptedException { PushManager pushManager= new PushManagerImpl(); Thread t1 = new Thread(new Tasker(new RedisMQHandler (pushManager), "push")); Thread t2 = new Thread(new Tasker(new RedisMQHandler (pushManager), "close")); t1.start(); t2.start(); LivePushEntity livePushInfo=new LivePushEntity(); livePushInfo.setAppName("test1"); JSONObject json=new JSONObject(livePushInfo); publish("push",json.toString()); publish("close", json.toString()); Thread.sleep(2000); publish("push", json.toString()); publish("close",json.toString()); Thread.sleep(2000); publish("push", json.toString()); publish("close",json.toString()); }