发布订阅(pub/sub)是一种消息通信模式,主要的目的是解耦消息发布者和消息订阅者之间的耦合,这点和设计模式中的观察者模式比较相似。pub /sub不仅仅解决发布者和订阅者直接代码级别耦合也解决两者在物理部署上的耦合。redis作为一个pub/sub server,在订阅者和发布者之间起到了消息路由的功能。订阅者可以通过subscribe和psubscribe命令向redis server订阅自己感兴趣的消息类型,redis将消息类型称为通道(channel)。当发布者通过publish命令向redis server发送特定类型的消息时。订阅该消息类型的全部client都会收到此消息。这里消息的传递是多对多的。一个client可以订阅多个 channel,也可以向多个channel发送消息。
下面做个实验。这里使用两个不同的client一个是redis自带的redis-cli另一个是用java版客户端jedis写的。java代码如下:
package com.jd.redis.client;
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub;
publicclass TestPubSubextends JedisPubSub{
@Override publicvoid onMessage(String channel, String message) { System.out.println("onMessage: channel["+channel+"], message["+message+"]"); }
@Override publicvoid onPMessage(String pattern, String channel, String message) { System.out.println("onPMessage: channel["+channel+"], message["+message+"]"); }
@Override publicvoid onSubscribe(String channel,int subscribedChannels) { System.out.println("onSubscribe: channel["+channel+"],"+ "subscribedChannels["+subscribedChannels+"]"); }
@Override publicvoid onUnsubscribe(String channel,int subscribedChannels) { System.out.println("onUnsubscribe: channel["+channel+"], "+ "subscribedChannels["+subscribedChannels+"]"); }
@Override publicvoid onPUnsubscribe(String pattern,int subscribedChannels) { System.out.println("onPUnsubscribe: pattern["+pattern+"],"+ "subscribedChannels["+subscribedChannels+"]"); }
@Override publicvoid onPSubscribe(String pattern,int subscribedChannels) { System.out.println("onPSubscribe: pattern["+pattern+"], "+ "subscribedChannels["+subscribedChannels+"]"); }
publicstaticvoid main(String[] args) { Jedis jr = null; try { jr = new Jedis("192.168.157.128", 6379, 0);//redis服务地址和端口号 TestPubSub sp = new TestPubSub(); sp.proceed(jr.getClient(),"news.share", "news.blog"); //sp.proceedWithPatterns(jr.getClient(), "news.*"); } catch (Exception e) { e.printStackTrace(); } finally{ if(jr!=null){ jr.disconnect(); } } } } |
代码就是用TestPubSub对象来订阅,对象中的那此onXXX方法监听到相应事件
1 首先运行此java程序;
onSubscribe: channel[news.share], subscribedChannels[1] onSubscribe: channel[news.blog], subscribedChannels[2] |
2 启动redis-cli
redis 127.0.0.1:6379> psubscribe news.* Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "news.*" 3) (integer) 1 |
3 再启动一个redis-cli用来发布两条消息:
redis 127.0.0.1:6379> publish news.share "share a link http://www.google.com" (integer) 2 redis 127.0.0.1:6379> publish news.blog "I post a blog" (integer) 2 |
4.查看两个订阅client的输出 此时java client打印如下内容:
onMessage: channel[news.share], message[share a link http://www.google.com] onMessage: channel[news.blog], message[I post a blog] |
另一个redis-cli输出如下:
1) "pmessage" 2) "news.*" 3) "news.share" 4) "share a link http://www.google.com" 1) "pmessage" 2) "news.*" 3) "news.blog" 4) "I post a blog" |
redis client使用psubscribe订阅了一个使用通配符的通道(*表示任意字符串),此订阅会收到所有与news.*匹配的通道消息。redis- cli打印到控制台的订阅成功消息表示使用psubscribe命令订阅news.*成功后,连接订阅通道总数为1。
当我们在一个client使用publish向news.share和news.blog通道发出两个消息后。redis返回的(integer) 2表示有两个连接收到了此消息。
看完一个小例子后应该对pub/sub功能有了一个感性的认识。需要注意的是当一个连接通过subscribe或者psubscribe订阅通道后就进入订阅模式。在这种模式除了再订阅额外的通道或者用unsubscribe或者punsubscribe命令退出订阅模式,就不能再发送其他命令。另外使用 psubscribe命令订阅多个通配符通道,如果一个消息匹配上了多个通道模式的话,会多次收到同一个消息。 redis的pub/sub还是有点太单薄(实现才用150行代码)。在安全,认证,可靠性这方便都没有太多支持