放出消息
第二个经典模式是单向数据分发,服务器推送更新到一组客户端。让我们看一个推送天气情况变化的例子,包含地区编码、温度、和相对湿度。我们会生成随机值来模拟真实气象站。
这是服务器代码,这个程序我们使用5556端口。
wuserver: Weather update server in C
// // Weather update server // Binds PUB socket to tcp://*:5556 // Publishes random weather updates // #include "zhelpers.h" int main (void) { // Prepare our context and publisher void *context = zmq_ctx_new (); void *publisher = zmq_socket (context, ZMQ_PUB); int rc = zmq_bind (publisher, "tcp://*:5556"); assert (rc == 0); rc = zmq_bind (publisher, "ipc://weather.ipc"); assert (rc == 0); // Initialize random number generator srandom ((unsigned) time (NULL)); while (1) { // Get values that will fool the boss int zipcode, temperature, relhumidity; zipcode = randof (100000); temperature = randof (215) - 80; relhumidity = randof (50) + 10; // Send message to all subscribers char update [20]; sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity); s_send (publisher, update); } zmq_close (publisher); zmq_ctx_destroy (context); return 0; }
更新流既无开始也无结束,像一个永不结束的天气预报。
图 4 – 发布-订阅
这是客户端程序,监听更新流并捕获符合特定地区编码的所有消息,默认为纽约市因为那是个冒险的好地方:
wuclient: Weather update client in C
// // Weather update client // Connects SUB socket to tcp://localhost:5556 // Collects weather updates and finds avg temp in zipcode // #include "zhelpers.h" int main (int argc, char *argv []) { void *context = zmq_ctx_new (); // Socket to talk to server printf ("Collecting updates from weather server…\n"); void *subscriber = zmq_socket (context, ZMQ_SUB); int rc = zmq_connect (subscriber, "tcp://localhost:5556"); assert (rc == 0); // Subscribe to zipcode, default is NYC, 10001 char *filter = (argc > 1) ? argv [1] : "10001 "; rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter)); assert (rc == 0); // Process 100 updates int update_nbr; long total_temp = 0; for (update_nbr = 0; update_nbr < 100; update_nbr++) { char *string = s_recv (subscriber); int zipcode, temperature, relhumidity; sscanf (string, "%d %d %d", &zipcode, &temperature, &relhumidity); total_temp += temperature; free (string); } printf ("Average temperature for zipcode '%s' was %dF\n", filter, (int) (total_temp / update_nbr)); zmq_close (subscriber); zmq_ctx_destroy (context); return 0; }
注意当你使用一个订阅套接字时你必须使用zmq_setsockopt()和SUBSCRIBE设置一个订阅,就像这段代码中那样。如果你不设置任何订阅,就得不到任何消息。这是初学者的常见错误。订阅者可以设置很多订阅,会合并到一起。就是说,如果一个更新匹配任意一个订阅,订阅者都会接收。订阅者也可以取消特定的订阅。一个订阅通常是一个可打印字符串,但也不是必须的。参考zmq_setsockopt()看这是怎么工作的。
发布订阅套接字对是异步的。客户端在循环中(或单次如果有必要)做zmq_msg_recv()。尝试发送消息到订阅套接字将导致错误。同样的,服务按所需频率做zmq_msg_send(),但绝不能对发布套接字做zmq_msg_recv()。
理论上,ØMQ套接字不在乎哪一端来连接哪一端来绑定。但是在实践中会有未公开的差异,待会我会提及。现在,绑定发布并连接订阅,除非你的网络设计导致这无法实现。
还有一个关于发布订阅套接字的重要事项:你无法精确知道订阅者什么时候开始获取消息。即使你先启动一个订阅者,过一会再启动发布者,订阅者总是会错过发布者发送的第一条消息。这是因为订阅者连接到发布者时(占用了短暂但非零的时间),发布者可能已经将消息发送出去了。
这种“迟钝加入者”症状击中了很多人、很多次,我们需要详细解释一下。记住ØMQ是异步I/O的,也就是在后台。比如说你有两个节点按这个顺序这么做:
- 订阅者连接到一个端点并接收和计数消息
- 发布者绑定到一个端点并立刻发送1000条消息
那么订阅者很可能不会接收到任何东西。你可以眨眨眼,检查一下是否设置了正确的过滤器,再试试,然而订阅者还是不会接收任何东西。
建立TCP连接牵涉到大概几毫秒的来回握手,这取决于网络状况和节点之间跳跃的次数。这个时间里,ØMQ已经可以发送好多消息了。为了证明,假设花费5毫秒来建立连接,而相同的链路可以搞定1M每秒的消息。在订阅者连接到发布者的5毫秒里,发布者仅耗费1毫秒就将这1K的消息发送出去了。
在第2章 - 套接字与模式中我们会解释如何让发布者和订阅者同步,以便无需等待订阅者(们)都已连接并就绪时才开始发布数据。有一个简单的笨办法来延迟发布者,通过睡眠sleep。但是在真实程序中可不要这么做,因为这极为脆弱、不雅而缓慢。先用sleep向自己证明到底发生了什么,然后等到第2章再看正确做法。
同步的另一种替代方案是简单的假设发布的数据流是无限的,没有开始也没有结束。还假设订阅者不在乎它启动之前发生过什么。这就是我们建造的例子天气客户端的方式。
客户端订阅到选定的地区编码并收集1000个更新。也就是大概1000万服务器更新,如果地区编码是随机分发的。你可以先启动客户端,再启动服务器,而客户端会持续工作。你可以随时停止并重启服务器,而客户端会持续工作。当客户端收集到了1000个更新,它计算出平均值,打印输出,然后退出。
关于发布订阅模式的一些要点:
- 订阅者可以每次调用“connect”来连接到一个以上的发布者。数据会交错到达(“公平队列”)以避免发布者相互淹没。
- 如果一个发布者没有已连接的订阅者,那么它直接丢弃所有消息。
- 如果你使用TCP时一个订阅者很慢,消息会在发布者那里排上队。我们待会看看这种情况下如何用“高水位线”来保护发布者。
- 从ØMQ 3.x开始,使用已连接协议(tcp: 或 ipc:)将在发布者方面进行过滤,使用epgm://协议时,会在订阅者一方进行过滤。在ØMQ2.x中所有过滤都在订阅者方面进行。
如下是在我的笔记本电脑中接收过滤10M消息花费的时间,电脑配置是2011-era Intel i5,体面但也没啥特殊的。
$ time wuclient Collecting updates from weather server... Average temperature for zipcode '10001 ' was 28F real 0m4.470s user 0m0.000s sys 0m0.008s