zoukankan      html  css  js  c++  java
  • 使用jedis实现Redis消息队列(MQ)的发布(publish)和消息监听(subscribe)

    前言:

    本文基于jedis 2.9.0.jar、commons-pool2-2.4.2.jar以及json-20160810.jar

    其中jedis连接池需要依赖commons-pool2包,json包用于对象实例和json字符串的相互转换

    1、jedis的消息队列方法简述

    1.1、发布消息方法

    (其中,channel是对应消息通道,message是对应消息体)

    jedis.publish(channel, message);

    1.2、监听消息方法

    (其中,jedisPubSub用于处理监听到的消息,channels是对应的通道

    jedis.subscribe(jedisPubSub, channels);

    2、发布消息

    /**
    	 * 从jedis连接池获取jedis操作实例
    	 * @return
    	 */
    	public static Jedis getJedis() {
    		return RedisPoolManager.getJedis();
    	}
    
    	/**
    	 * 推入消息到redis消息通道
    	 * 
    	 * @param String
    	 *            channel
    	 * @param String
    	 *            message
    	 */
    	public static void publish(String channel, String message) {
    		Jedis jedis = null;
    		try {
    			jedis = getJedis();
    			jedis.publish(channel, message);
    		} finally {
    			jedis.close();
    		}
    	}
    
    	/**
    	 * 推入消息到redis消息通道
    	 * 
    	 * @param byte[]
    	 *            channel
    	 * @param byte[]
    	 *            message
    	 */
    	public void publish(byte[] channel, byte[] message) {
    		Jedis jedis = null;
    		try {
    			jedis = getJedis();
    			jedis.publish(channel, message);
    		} finally {
    			jedis.close();
    		}
    
    	}

    3、监听消息

    3.1、监听消息主体方法

    /**
    	 * 监听消息通道
    	 * @param jedisPubSub - 监听任务
    	 * @param channels - 要监听的消息通道
    	 */
    	public static void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {
    		Jedis jedis = null;
    		try {
    			jedis = getJedis();
    			jedis.subscribe(jedisPubSub, channels);
    		} finally {
    			jedis.close();
    		}
    	}
    
    	/**
    	 * 监听消息通道
    	 * @param jedisPubSub - 监听任务
    	 * @param channels - 要监听的消息通道
    	 */
    	public static void subscribe(JedisPubSub jedisPubSub, String... channels) {
    		Jedis jedis = null;
    		try {
    			jedis = getJedis();
    			jedis.subscribe(jedisPubSub, channels);
    		} finally {
    			jedis.close();
    		}
    	}


    3.2、处理监听到的消息任务

    class Tasker implements Runnable {
    	private String[] channel = null;//监听的消息通道
    	private JedisPubSub jedisPubSub = null;//消息处理任务
    
    	public Tasker(JedisPubSub jedisPubSub, String ...channel) {
    		this.jedisPubSub = jedisPubSub;
    		this.channel = channel;
    	}
    
    	@Override
    	public void run() {
    		// 监听channel通道的消息
    		RedisMQ.subscribe(jedisPubSub, channel);
    	}
    
    }
    

    3.3、处理监听到的消息主体类实现

    package cn.eguid.livePushServer.redisManager;
    
    import java.util.Map;
    
    import org.json.JSONObject;
    
    import cc.eguid.livepush.PushManager;
    import redis.clients.jedis.JedisPubSub;
    
    public class RedisMQHandler extends JedisPubSub{
    	PushManager pushManager = null;
    
    	public RedisMQHandler(PushManager pushManager) {
    		super();
    		this.pushManager = pushManager;
    	}
    
    	@Override
    	// 接收到消息后进行分发执行
    	public void onMessage(String channel, String message) {
    		JSONObject jsonObj = new JSONObject(message);
    		System.out.println(channel+","+message);
    		if ("push".equals(channel)) {
    			Map<String,Object> map=jsonObj.toMap();
    			System.out.println("接收到一条推流消息,准备推流:"+map);
    //			String appName=pushManager.push(map);
    			//推流完成后还需要发布一个成功消息到返回队列
    			
    		} else if ("close".equals(channel)) {
    			String appName=jsonObj.getString("appName");
    			System.out.println("接收到一条关闭消息,准备关闭应用:"+appName);
    //			pushManager.closePush(appName);
    		}
    	}
    }
    

    4、测试消息队列发布和监听

    public static void main(String[] args) throws InterruptedException {
    		
    		PushManager pushManager= new PushManagerImpl();
    		Thread t1 = new Thread(new Tasker(new RedisMQHandler (pushManager), "push"));
    		Thread t2 = new Thread(new Tasker(new RedisMQHandler (pushManager), "close"));
    		t1.start();
    		t2.start();
    		
    		LivePushEntity livePushInfo=new LivePushEntity();
    		livePushInfo.setAppName("test1");
    		JSONObject json=new JSONObject(livePushInfo);
    		publish("push",json.toString());
    		publish("close", json.toString());
    		Thread.sleep(2000);
    		publish("push", json.toString());
    		publish("close",json.toString());
    		Thread.sleep(2000);
    		publish("push", json.toString());
    		publish("close",json.toString());
    
    	}


  • 相关阅读:
    使用 SQL Server 2008 数据类型-xml 字段类型参数进行数据的批量选取或删除数据
    启用Windows 7/2008 R2 XPS Viewer
    Office 2010培训资料
    WCF WebHttp Services in .NET 4
    ASP.NET MVC 2示例Tailspin Travel
    .NET 4.0 的Web Form和EF的例子 Employee Info Starter Kit (v4.0.0)
    连任 2010 年度 Microsoft MVP
    MIX 10 Session下载
    Microsoft Silverlight Analytics Framework
    Windows Azure入门教学
  • 原文地址:https://www.cnblogs.com/eguid/p/6821593.html
Copyright © 2011-2022 走看看