zoukankan      html  css  js  c++  java
  • ZeroMQ指南第1章基础放出消息 标签: zeromqzeroMQZeroMQZMQzmq 20130217 23:37 3006人阅读

    放出消息

    第二个经典模式是单向数据分发,服务器推送更新到一组客户端。让我们看一个推送天气情况变化的例子,包含地区编码、温度、和相对湿度。我们会生成随机值来模拟真实气象站。

    这是服务器代码,这个程序我们使用5556端口。

    wuserver: Weather update server in C

    //
    // Weather update server
    // Binds PUB socket to tcp://*:5556
    // Publishes random weather updates
    //
    #include "zhelpers.h"
    
    int main (void)
    {
        // Prepare our context and publisher
        void *context = zmq_ctx_new ();
        void *publisher = zmq_socket (context, ZMQ_PUB);
        int rc = zmq_bind (publisher, "tcp://*:5556");
        assert (rc == 0);
        rc = zmq_bind (publisher, "ipc://weather.ipc");
        assert (rc == 0);
    
        // Initialize random number generator
        srandom ((unsigned) time (NULL));
        while (1) {
            // Get values that will fool the boss
            int zipcode, temperature, relhumidity;
            zipcode = randof (100000);
            temperature = randof (215) - 80;
            relhumidity = randof (50) + 10;
    
            // Send message to all subscribers
            char update [20];
            sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
            s_send (publisher, update);
        }
        zmq_close (publisher);
        zmq_ctx_destroy (context);
        return 0;
    }
    

    更新流既无开始也无结束,像一个永不结束的天气预报。

    图 4 – 发布-订阅

    这是客户端程序,监听更新流并捕获符合特定地区编码的所有消息,默认为纽约市因为那是个冒险的好地方:

    wuclient: Weather update client in C

    //
    // Weather update client
    // Connects SUB socket to tcp://localhost:5556
    // Collects weather updates and finds avg temp in zipcode
    //
    #include "zhelpers.h"
    
    int main (int argc, char *argv [])
    {
        void *context = zmq_ctx_new ();
    
        // Socket to talk to server
        printf ("Collecting updates from weather server…\n");
        void *subscriber = zmq_socket (context, ZMQ_SUB);
        int rc = zmq_connect (subscriber, "tcp://localhost:5556");
        assert (rc == 0);
    
        // Subscribe to zipcode, default is NYC, 10001
        char *filter = (argc > 1) ? argv [1] : "10001 ";
        rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
        assert (rc == 0);
    
        // Process 100 updates
        int update_nbr;
        long total_temp = 0;
        for (update_nbr = 0; update_nbr < 100; update_nbr++) {
            char *string = s_recv (subscriber);
    
            int zipcode, temperature, relhumidity;
            sscanf (string, "%d %d %d",
                    &zipcode, &temperature, &relhumidity);
            total_temp += temperature;
            free (string);
        }
        printf ("Average temperature for zipcode '%s' was %dF\n",
                filter, (int) (total_temp / update_nbr));
    
        zmq_close (subscriber);
        zmq_ctx_destroy (context);
        return 0;
    }
    

    注意当你使用一个订阅套接字时你必须使用zmq_setsockopt()和SUBSCRIBE设置一个订阅,就像这段代码中那样。如果你不设置任何订阅,就得不到任何消息。这是初学者的常见错误。订阅者可以设置很多订阅,会合并到一起。就是说,如果一个更新匹配任意一个订阅,订阅者都会接收。订阅者也可以取消特定的订阅。一个订阅通常是一个可打印字符串,但也不是必须的。参考zmq_setsockopt()看这是怎么工作的。

    发布订阅套接字对是异步的。客户端在循环中(或单次如果有必要)做zmq_msg_recv()。尝试发送消息到订阅套接字将导致错误。同样的,服务按所需频率做zmq_msg_send(),但绝不能对发布套接字做zmq_msg_recv()。

    理论上,ØMQ套接字不在乎哪一端来连接哪一端来绑定。但是在实践中会有未公开的差异,待会我会提及。现在,绑定发布并连接订阅,除非你的网络设计导致这无法实现。

    还有一个关于发布订阅套接字的重要事项:你无法精确知道订阅者什么时候开始获取消息。即使你先启动一个订阅者,过一会再启动发布者,订阅者总是会错过发布者发送的第一条消息。这是因为订阅者连接到发布者时(占用了短暂但非零的时间),发布者可能已经将消息发送出去了。

    这种“迟钝加入者”症状击中了很多人、很多次,我们需要详细解释一下。记住ØMQ是异步I/O的,也就是在后台。比如说你有两个节点按这个顺序这么做:

    • 订阅者连接到一个端点并接收和计数消息
    • 发布者绑定到一个端点并立刻发送1000条消息

    那么订阅者很可能不会接收到任何东西。你可以眨眨眼,检查一下是否设置了正确的过滤器,再试试,然而订阅者还是不会接收任何东西。

    建立TCP连接牵涉到大概几毫秒的来回握手,这取决于网络状况和节点之间跳跃的次数。这个时间里,ØMQ已经可以发送好多消息了。为了证明,假设花费5毫秒来建立连接,而相同的链路可以搞定1M每秒的消息。在订阅者连接到发布者的5毫秒里,发布者仅耗费1毫秒就将这1K的消息发送出去了。

    在第2章 - 套接字与模式中我们会解释如何让发布者和订阅者同步,以便无需等待订阅者(们)都已连接并就绪时才开始发布数据。有一个简单的笨办法来延迟发布者,通过睡眠sleep。但是在真实程序中可不要这么做,因为这极为脆弱、不雅而缓慢。先用sleep向自己证明到底发生了什么,然后等到第2章再看正确做法。

    同步的另一种替代方案是简单的假设发布的数据流是无限的,没有开始也没有结束。还假设订阅者不在乎它启动之前发生过什么。这就是我们建造的例子天气客户端的方式。

    客户端订阅到选定的地区编码并收集1000个更新。也就是大概1000万服务器更新,如果地区编码是随机分发的。你可以先启动客户端,再启动服务器,而客户端会持续工作。你可以随时停止并重启服务器,而客户端会持续工作。当客户端收集到了1000个更新,它计算出平均值,打印输出,然后退出。

    关于发布订阅模式的一些要点:

    • 订阅者可以每次调用“connect”来连接到一个以上的发布者。数据会交错到达(“公平队列”)以避免发布者相互淹没。
    • 如果一个发布者没有已连接的订阅者,那么它直接丢弃所有消息。
    • 如果你使用TCP时一个订阅者很慢,消息会在发布者那里排上队。我们待会看看这种情况下如何用“高水位线”来保护发布者。
    • 从ØMQ 3.x开始,使用已连接协议(tcp: 或 ipc:)将在发布者方面进行过滤,使用epgm://协议时,会在订阅者一方进行过滤。在ØMQ2.x中所有过滤都在订阅者方面进行。

    如下是在我的笔记本电脑中接收过滤10M消息花费的时间,电脑配置是2011-era Intel i5,体面但也没啥特殊的。

    $ time wuclient
    Collecting updates from weather server...
    Average temperature for zipcode '10001 ' was 28F
    
    real    0m4.470s
    user    0m0.000s
    sys     0m0.008s
    
  • 相关阅读:
    海量数据处理
    mysql数据导出
    手机归属地
    如何正确合理的建立MYSQL数据库索引
    Java 复杂excel报表导出
    NullpointerException真的一定要被预防?
    代码传递信息方式的探究
    ThreadLoacl的反思
    Codis分布式锁
    spring mvc:事务引起的try/catch失效
  • 原文地址:https://www.cnblogs.com/zerofire/p/7162162.html
Copyright © 2011-2022 走看看