zoukankan      html  css  js  c++  java
  • ZeroMQ指南第1章基础放出消息 标签: zeromqzeroMQZeroMQZMQzmq 20130217 23:37 3006人阅读

    放出消息

    第二个经典模式是单向数据分发,服务器推送更新到一组客户端。让我们看一个推送天气情况变化的例子,包含地区编码、温度、和相对湿度。我们会生成随机值来模拟真实气象站。

    这是服务器代码,这个程序我们使用5556端口。

    wuserver: Weather update server in C

    //
    // Weather update server
    // Binds PUB socket to tcp://*:5556
    // Publishes random weather updates
    //
    #include "zhelpers.h"
    
    int main (void)
    {
        // Prepare our context and publisher
        void *context = zmq_ctx_new ();
        void *publisher = zmq_socket (context, ZMQ_PUB);
        int rc = zmq_bind (publisher, "tcp://*:5556");
        assert (rc == 0);
        rc = zmq_bind (publisher, "ipc://weather.ipc");
        assert (rc == 0);
    
        // Initialize random number generator
        srandom ((unsigned) time (NULL));
        while (1) {
            // Get values that will fool the boss
            int zipcode, temperature, relhumidity;
            zipcode = randof (100000);
            temperature = randof (215) - 80;
            relhumidity = randof (50) + 10;
    
            // Send message to all subscribers
            char update [20];
            sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
            s_send (publisher, update);
        }
        zmq_close (publisher);
        zmq_ctx_destroy (context);
        return 0;
    }
    

    更新流既无开始也无结束,像一个永不结束的天气预报。

    图 4 – 发布-订阅

    这是客户端程序,监听更新流并捕获符合特定地区编码的所有消息,默认为纽约市因为那是个冒险的好地方:

    wuclient: Weather update client in C

    //
    // Weather update client
    // Connects SUB socket to tcp://localhost:5556
    // Collects weather updates and finds avg temp in zipcode
    //
    #include "zhelpers.h"
    
    int main (int argc, char *argv [])
    {
        void *context = zmq_ctx_new ();
    
        // Socket to talk to server
        printf ("Collecting updates from weather server…\n");
        void *subscriber = zmq_socket (context, ZMQ_SUB);
        int rc = zmq_connect (subscriber, "tcp://localhost:5556");
        assert (rc == 0);
    
        // Subscribe to zipcode, default is NYC, 10001
        char *filter = (argc > 1) ? argv [1] : "10001 ";
        rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
        assert (rc == 0);
    
        // Process 100 updates
        int update_nbr;
        long total_temp = 0;
        for (update_nbr = 0; update_nbr < 100; update_nbr++) {
            char *string = s_recv (subscriber);
    
            int zipcode, temperature, relhumidity;
            sscanf (string, "%d %d %d",
                    &zipcode, &temperature, &relhumidity);
            total_temp += temperature;
            free (string);
        }
        printf ("Average temperature for zipcode '%s' was %dF\n",
                filter, (int) (total_temp / update_nbr));
    
        zmq_close (subscriber);
        zmq_ctx_destroy (context);
        return 0;
    }
    

    注意当你使用一个订阅套接字时你必须使用zmq_setsockopt()和SUBSCRIBE设置一个订阅,就像这段代码中那样。如果你不设置任何订阅,就得不到任何消息。这是初学者的常见错误。订阅者可以设置很多订阅,会合并到一起。就是说,如果一个更新匹配任意一个订阅,订阅者都会接收。订阅者也可以取消特定的订阅。一个订阅通常是一个可打印字符串,但也不是必须的。参考zmq_setsockopt()看这是怎么工作的。

    发布订阅套接字对是异步的。客户端在循环中(或单次如果有必要)做zmq_msg_recv()。尝试发送消息到订阅套接字将导致错误。同样的,服务按所需频率做zmq_msg_send(),但绝不能对发布套接字做zmq_msg_recv()。

    理论上,ØMQ套接字不在乎哪一端来连接哪一端来绑定。但是在实践中会有未公开的差异,待会我会提及。现在,绑定发布并连接订阅,除非你的网络设计导致这无法实现。

    还有一个关于发布订阅套接字的重要事项:你无法精确知道订阅者什么时候开始获取消息。即使你先启动一个订阅者,过一会再启动发布者,订阅者总是会错过发布者发送的第一条消息。这是因为订阅者连接到发布者时(占用了短暂但非零的时间),发布者可能已经将消息发送出去了。

    这种“迟钝加入者”症状击中了很多人、很多次,我们需要详细解释一下。记住ØMQ是异步I/O的,也就是在后台。比如说你有两个节点按这个顺序这么做:

    • 订阅者连接到一个端点并接收和计数消息
    • 发布者绑定到一个端点并立刻发送1000条消息

    那么订阅者很可能不会接收到任何东西。你可以眨眨眼,检查一下是否设置了正确的过滤器,再试试,然而订阅者还是不会接收任何东西。

    建立TCP连接牵涉到大概几毫秒的来回握手,这取决于网络状况和节点之间跳跃的次数。这个时间里,ØMQ已经可以发送好多消息了。为了证明,假设花费5毫秒来建立连接,而相同的链路可以搞定1M每秒的消息。在订阅者连接到发布者的5毫秒里,发布者仅耗费1毫秒就将这1K的消息发送出去了。

    在第2章 - 套接字与模式中我们会解释如何让发布者和订阅者同步,以便无需等待订阅者(们)都已连接并就绪时才开始发布数据。有一个简单的笨办法来延迟发布者,通过睡眠sleep。但是在真实程序中可不要这么做,因为这极为脆弱、不雅而缓慢。先用sleep向自己证明到底发生了什么,然后等到第2章再看正确做法。

    同步的另一种替代方案是简单的假设发布的数据流是无限的,没有开始也没有结束。还假设订阅者不在乎它启动之前发生过什么。这就是我们建造的例子天气客户端的方式。

    客户端订阅到选定的地区编码并收集1000个更新。也就是大概1000万服务器更新,如果地区编码是随机分发的。你可以先启动客户端,再启动服务器,而客户端会持续工作。你可以随时停止并重启服务器,而客户端会持续工作。当客户端收集到了1000个更新,它计算出平均值,打印输出,然后退出。

    关于发布订阅模式的一些要点:

    • 订阅者可以每次调用“connect”来连接到一个以上的发布者。数据会交错到达(“公平队列”)以避免发布者相互淹没。
    • 如果一个发布者没有已连接的订阅者,那么它直接丢弃所有消息。
    • 如果你使用TCP时一个订阅者很慢,消息会在发布者那里排上队。我们待会看看这种情况下如何用“高水位线”来保护发布者。
    • 从ØMQ 3.x开始,使用已连接协议(tcp: 或 ipc:)将在发布者方面进行过滤,使用epgm://协议时,会在订阅者一方进行过滤。在ØMQ2.x中所有过滤都在订阅者方面进行。

    如下是在我的笔记本电脑中接收过滤10M消息花费的时间,电脑配置是2011-era Intel i5,体面但也没啥特殊的。

    $ time wuclient
    Collecting updates from weather server...
    Average temperature for zipcode '10001 ' was 28F
    
    real    0m4.470s
    user    0m0.000s
    sys     0m0.008s
    
  • 相关阅读:
    SPOJ 694 (后缀数组) Distinct Substrings
    POJ 2774 (后缀数组 最长公共字串) Long Long Message
    POJ 3693 (后缀数组) Maximum repetition substring
    POJ 3261 (后缀数组 二分) Milk Patterns
    UVa 1149 (贪心) Bin Packing
    UVa 12206 (字符串哈希) Stammering Aliens
    UVa 11210 (DFS) Chinese Mahjong
    UVa (BFS) The Monocycle
    UVa 11624 (BFS) Fire!
    HDU 3032 (Nim博弈变形) Nim or not Nim?
  • 原文地址:https://www.cnblogs.com/zerofire/p/7162162.html
Copyright © 2011-2022 走看看