redis内置了发布/订阅功能,可以作为消息机制使用。所以这里主要使用Jedis的Publish/Subscribe功能。
1、使用Spring来配置Jedis连接池
<!-- pool配置 --> <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> <property name="maxActive" value="20" /> <property name="maxIdle" value="10" /> <property name="maxWait" value="1000" /> <property name="testOnBorrow" value="true" /> </bean> <!-- jedis pool配置 --> <bean id="jedisPool" class="redis.clients.jedis.JedisPool"> <constructor-arg index="0" ref="jedisPoolConfig" /> <constructor-arg index="1" value="127.0.0.1" /> <constructor-arg index="2" value="6389"/> <constructor-arg index="3" value="30000"/> <constructor-arg index="4" value="123456789"/> </bean>
2、编写Lister
要使用Jedis的Publish/Subscribe功能,必须编写对JedisPubSub的自己的实现,其中的函数的功能如下:
package demo; import redis.clients.jedis.JedisPubSub; public class RedisSubPubListener extends JedisPubSub { // 取得订阅的消息后的处理 public void onMessage(String channel, String message) { //TODO:接收订阅频道消息后,业务处理逻辑 System.out.println(channel + "=" + message); } // 初始化订阅时候的处理 public void onSubscribe(String channel, int subscribedChannels) { // System.out.println(channel + "=" + subscribedChannels); } // 取消订阅时候的处理 public void onUnsubscribe(String channel, int subscribedChannels) { // System.out.println(channel + "=" + subscribedChannels); } // 初始化按表达式的方式订阅时候的处理 public void onPSubscribe(String pattern, int subscribedChannels) { // System.out.println(pattern + "=" + subscribedChannels); } // 取消按表达式的方式订阅时候的处理 public void onPUnsubscribe(String pattern, int subscribedChannels) { // System.out.println(pattern + "=" + subscribedChannels); } // 取得按表达式的方式订阅的消息后的处理 public void onPMessage(String pattern, String channel, String message) { System.out.println(pattern + "=" + channel + "=" + message); }
}
3、实现订阅动能
Jedis有两种订阅模式:subsribe(一般模式设置频道)和psubsribe(使用模式匹配来设置频道)。不管是那种模式都可以设置个数不定的频道。订阅得到信息在将会lister的onMessage(…)方法或者onPMessage(…)中进行进行处理,这里我们只是做了简单的输出。
这里启动了订阅监听,线程将在这里被阻塞,订阅得到信息在lister的onMessage(…)方法或者onPMessage(…)方法中进行处理
@Autowired private JedisPool jedisPool; @Test public void testSub() { final RedisSubPubListener listener = new RedisSubPubListener(); Jedis jedis = jedisPool.getResource(); jedis.subscribe(listener, "channel"); }
//可以订阅多个频道
jedis.subscribe(listener, "foo", "bar");
//也用数组的方式设置多个频道
jedis.subscribe(listener, new String[]{"hello_foo","hello_test"});
//使用模式匹配的方式设置频道
jedis.psubscribe(listener, new String[]{"hello_*"});
4、实现发布端代码
发布消息只用调用Jedis的publish(…)方法即可。
@Test public void testPub(){ Jedis jedis = jedisPool.getResource(); jedis.publish("channel", "bar123"); System.out.println("==========已经发布消息==================="); }
注:ShardedJedis没有发布和订阅功能,需要还原为Jedis,获取方式如下(完全可以重新配置一份JedisPool):
ShardedJedis shardedJedis =shardedJedisPool.getResource(); Jedis[] jedisArray = new Jedis[]{}; jedisArray = shardedJedis.getAllShards().toArray(jedisArray); Jedis jedis = jedisArray[0];