zoukankan      html  css  js  c++  java
  • ZeroMQ_04 发布订阅模式

    简单来说,就是服务端不断发布消息,客户端订阅了就会收到消息。

    下面我们看个简单的实例:

    Server:

    #include <stdlib.h> 
    #include <zmq.h>
    #include <string.h>
    #include <unistd.h>
    #include <time.h> 
    
    #define buffersize 4096
    #define randof(num)  (int) ((float) (num) * random () / (RAND_MAX + 1.0))
    
    int main(int argc, char* argv[])
    {
        // [0]创建对象
        void* ctx = zmq_ctx_new();
        void* publisher = zmq_socket(ctx, ZMQ_PUB);
        // [1]绑定到5566端口
        zmq_bind(publisher, "tcp://*:5566");
    
         //  初始化随机数生成器
        srandom ((unsigned) time (NULL));
        while (1) {
           int zipcode, temperature, relhumidity;
            zipcode     = randof (100000);
            temperature = randof (215) - 80;
            relhumidity = randof (50) + 10;
    
            //  Send message to all subscribers
            char update [20];
            sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
            printf("server send: %s
    ", update);
            //s_send (publisher, update);
            zmq_send (publisher, update, strlen (update), 0);
            sleep(1);
        }
        zmq_close(publisher);
        zmq_ctx_destroy(ctx);
        return 0;
    }

    Client:

    #include <stdlib.h> 
    #include <zmq.h>
    #include <string.h>
    #include <unistd.h>
    #include <time.h> 
    #include <assert.h>
    
    static char *s_recv (void *socket) {
        char buffer [256];
        int size = zmq_recv (socket, buffer, 255, 0);
        if (size == -1)
            return NULL;
        buffer[size] = '';
    
        return strndup (buffer, sizeof(buffer) - 1);
    }
    
    int main (int argc, char *argv [])
    {
        //  [0]创建对象,连接到5566端口
        printf ("Collecting updates from weather server...
    ");
        void *context = zmq_ctx_new ();
        void *subscriber = zmq_socket (context, ZMQ_SUB);
        int rc = zmq_connect (subscriber, "tcp://localhost:5566");
        assert (rc == 0);
    
        //  [1]设置过滤条件,设置为空,表示全订阅,这里“1”表示匹配开头为“1”的数据
        const char *filter =  "1";
        rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
                             filter, strlen (filter));
        assert (rc == 0);
        //  [2]接受数据
        int update_nbr;
        long total_temp = 0;
        for (update_nbr = 0; update_nbr < 100; update_nbr++) {
            
            char *string = s_recv (subscriber);
            printf ("client: %s
    ", string);
            int zipcode, temperature, relhumidity;
            sscanf (string, "%d %d %d",
                &zipcode, &temperature, &relhumidity);
            total_temp += temperature;
            free (string);
        }
        printf ("Average temperature for zipcode '%s' was %dF
    ",
            filter, (int) (total_temp / update_nbr));
    
        zmq_close (subscriber);
        zmq_ctx_destroy (context);
        return 0;
    }

    out:

    // server
    server send: 43345 -41 19
    server send: 44203 110 59
    server send: 78038 2 25
    server send: 55377 59 18
    server send: 40135 -65 36
    server send: 37950 43 10
    
    // client
    zf@eappsvr-0:~/ds/zmq/test/pub_sub> ./client
    Collecting updates from weather server...
    client....
    client: 10057 67 11
    client: 16839 94 25

     总的来说就是发布者不断发布消息,订阅者可以有选择的订阅消息,订阅规则可以设置多个。这里我们再加个订阅规则看一下。

    //  [1]设置过滤条件,设置为空,表示全订阅,这里“1”表示匹配开头为“1”的数据
        const char *filter =  "1";
        rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
                             filter, strlen (filter));
        assert (rc == 0);
        const char *filter2 =  "2";
        rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
                             filter2, strlen (filter2));
        assert (rc == 0);
    zf@eappsvr-0:~/ds/zmq/test/pub_sub> ./client
    Collecting updates from weather server...
    client: 25113 56 58
    client: 29957 62 21
    client: 17914 -14 55
    client: 15522 -80 57
    client: 13588 34 53
    client: 13291 -22 58
    client: 18876 -62 13
    client: 25827 38 54
    client: 13747 -55 23

    我们可以看到订阅规则可以设置多个,就是订阅多了,就收的多了。

    注意: 

    需要注意的是,在使用SUB套接字时,必须使用zmq_setsockopt()方法来设置订阅的内容。如果你不设置订阅内容,那将什么消息都收不到,新手很容易犯这个错误。订阅信息可以是任何字符串,可以设置多次。只要消息满足其中一条订阅信息,SUB套接字就会收到。订阅者可以选择不接收某类消息,也是通过zmq_setsockopt()方法实现的。

    PUB-SUB套接字组合是异步的。客户端在一个循环体中使用zmq_recv()接收消息,如果向SUB套接字发送消息则会报错;类似地,服务端可以不断地使用zmq_send()发送消息,但不能在PUB套接字上使用zmq_recv()。

    关于PUB-SUB套接字,还有一点需要注意:你无法得知SUB是何时开始接收消息的。就算你先打开了SUB套接字,后打开PUB发送消息,这时SUB还是会丢失一些消息的,因为建立连接是需要一些时间的。很少,但并不是零。

  • 相关阅读:
    Django-Auth组件
    Django-choice用法
    Django-Cookie和session组件
    Django-DRF
    Django-DRF分页器
    Django-DRF全局异常捕获,响应封装,自动生成接口文档
    Java学习路线一张图足够
    Java基础内容总结
    java基础学习之反射反射的基本概念及使用
    Java基础的方法使用详解
  • 原文地址:https://www.cnblogs.com/vczf/p/12751249.html
Copyright © 2011-2022 走看看