zoukankan      html  css  js  c++  java
  • ZeroMQ_10 节点协调

    当你想要对节点进行协调时,PAIR套接字就不怎么合适了,这也是线程和节点之间的不同之处。一般来说,节点是来去自由的,而线程则较为稳定。使用PAIR套接字时,若远程节点断开连接后又进行重连,PAIR不会予以理会。

    第二个区别在于,线程的数量一般是固定的,而节点数量则会经常变化。让我们以气象信息模型为基础,看看要怎样进行节点的协调,以保证客户端不会丢失最开始的那些消息。

    下面是程序运行逻辑:

    • 发布者知道预期的订阅者数量,这个数字可以任意指定;
    • 发布者启动后会先等待所有订阅者进行连接,也就是节点协调。每个订阅者会使用另一个套接字来告知发布者自己已就绪;
    • 当所有订阅者准备就绪后,发布者才开始发送消息。

    这里我们会使用REQ-REP套接字来同步发布者和订阅者。发布者的代码如下:

    syncpub: Synchronized publisher in C

    #include "../zhelpers.h"
    #define SUBSCRIBERS_EXPECTED  10  //  We wait for 10 subscribers 
    
    int main (void)
    {
        void *context = zmq_ctx_new ();
    
        //  Socket to talk to clients
        void *publisher = zmq_socket (context, ZMQ_PUB);
    
        int sndhwm = 11000000;
        // ZMQ_SNDHWM:对向外发送的消息设置高水位(最大缓存量)
        zmq_setsockopt (publisher, ZMQ_SNDHWM, &sndhwm, sizeof (int));
    
        zmq_bind (publisher, "tcp://*:5561");
    
        //  Socket to receive signals
        void *syncservice = zmq_socket (context, ZMQ_REP);
        zmq_bind (syncservice, "tcp://*:5562");
    
        //  Get synchronization from subscribers
        printf ("Waiting for subscribers
    ");
        int subscribers = 0;
        while (subscribers < SUBSCRIBERS_EXPECTED) {
            //  - wait for synchronization request
            char *string = s_recv (syncservice);
            free (string);
            //  - send synchronization reply
            s_send (syncservice, "");
            subscribers++;
        }
        //  Now broadcast exactly 1M updates followed by END
        printf ("Broadcasting messages
    ");
        int update_nbr;
        for (update_nbr = 0; update_nbr < 1000000; update_nbr++)
            s_send (publisher, "Rhubarb");
    
        s_send (publisher, "END");
    
        zmq_close (publisher);
        zmq_close (syncservice);
        zmq_ctx_destroy (context);
        return 0;
    }

    以下是订阅者的代码:

    syncsub: Synchronized subscriber in C

    #include "../zhelpers.h"
    #define SUBSCRIBERS_EXPECTED  10  //  We wait for 10 subscribers 
    
    int main (void)
    {
        void *context = zmq_ctx_new ();
    
        //  First, connect our subscriber socket
        void *subscriber = zmq_socket (context, ZMQ_SUB);
        zmq_connect (subscriber, "tcp://localhost:5561");
        zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);
    
        //  0MQ is so fast, we need to wait a while...
        sleep (1);
    
        //  Second, synchronize with publisher
        void *syncclient = zmq_socket (context, ZMQ_REQ);
        zmq_connect (syncclient, "tcp://localhost:5562");
    
        //  - send a synchronization request
        s_send (syncclient, "");
    
        //  - wait for synchronization reply
        char *string = s_recv (syncclient);
        free (string);
    
        //  Third, get our updates and report how many we got
        int update_nbr = 0;
        while (1) {
            char *string = s_recv (subscriber);
            if (strcmp (string, "END") == 0) {
                free (string);
                break;
            }
            free (string);
            update_nbr++;
        }
        printf ("Received %d updates
    ", update_nbr);
    
        zmq_close (subscriber);
        zmq_close (syncclient);
        zmq_ctx_destroy (context);
        return 0;
    }

    shell脚本

    #!/bin/bash
    echo "正在启动订阅者..."
    for a in 1 2 3 4 5 6 7 8 9 10; do
        ./syncsub &
    done
    echo "正在启动发布者..."
    ./syncpub

    out:

    zf@eappsvr-0:~/ds/zmq/test/syncsub> ./run.sh 
    正在启动订阅者...
    正在启动发布者...
    Waiting for subscribers
    Broadcasting messages
    Received 1000000 updates
    Received 1000000 updates
    Received 1000000 updates
    Received 1000000 updates
    Received 1000000 updates
    Received 1000000 updates
    Received 1000000 updates
    Received 1000000 updates
    Received 1000000 updates
    Received 1000000 updates
  • 相关阅读:
    数据库存储的数据为 unicode格式,在.NET 读取出来并转换为繁体字?
    ASP.net 打包并附加数据库
    xml特殊符號
    母板頁面 弹出提示错误的解决方案
    SQL一句實現上一條 下一條信息
    Serializing Faults using XmlSerializer
    VS2005 編譯WebService 為 CS文件
    Java开发的一些文件结构及位置
    kalilinux 其他配置
    完美解决mysql保存中文出现1366错误
  • 原文地址:https://www.cnblogs.com/vczf/p/12886588.html
Copyright © 2011-2022 走看看