zoukankan      html  css  js  c++  java
  • ZMQ 模式学习

    发布订阅模式:

    PUB发送,send。SUB接收,recv。和PUSH-PULL模式不同,PUB将消息同时发给和他建立的链接,类似于广播。另外发布订阅模式也可以使用订阅过滤来实现只接收特定的消息。订阅过滤是在服务器上进行过滤的,如果一个订阅者设定了过滤,那么发布者将只发布满足他订阅条件的消息。
    这个就是广播和收听的关系。PUB-SUB模式虽然没有使用网络的广播功能,但是它内部是异步的。也就是一次发送没有结束立刻开始下一次发送。
    广播所有client,没有队列缓存,断开连接数据将永远丢失。client可以进行数据过滤。
    943117-20160705143357999-601098470.pngserver:

     1 #include <zmq.h>
     2 #include <stdio.h>
     3 #include <stdlib.h>
     4 #include "zmq_helper.h"
     5 
     6 int main(void)
     7 {
     8     void * context = zmq_ctx_new();
     9     void * socket = zmq_socket(context, ZMQ_PUB);
    10     zmq_bind(socket, "tcp://*:5556");
    11 
    12     srandom((unsigned)time(NULL));
    13 
    14     while(1)
    15     {
    16         int zipcode = randof(100000);   // 邮编: 0 ~ 99999
    17         int temp = randof(84) - 42;     // 温度: -42 ~ 41
    18         int relhumidity = randof(50) + 10;  // 相对湿度: 10 ~ 59
    19 
    20         char msg[20];
    21         snprintf(msg, sizeof(msg), "%5d %d %d", zipcode, temp, relhumidity);
    22         s_send(socket, msg);
    23     }
    24 
    25     zmq_close(socket);
    26     zmq_ctx_destroy(context);
    27 
    28     return 0;
    29 
    30 }

    client:

    #include <zmq.h>
    #include <stdio.h>
    #include "zmq_helper.h"
    
    int main(void)
    {
        void * context = zmq_ctx_new();
        void * socket = zmq_socket(context, ZMQ_SUB);
        zmq_connect(socket, "tcp://localhost:5556");
    
        char * zipcode = "10001";
        zmq_setsockopt(socket, ZMQ_SUBSCRIBE, zipcode, strlen(zipcode));
    
        for(int i = 0; i < 50; ++i)
        {
            char * string = s_recv(socket);
            printf("[Subscriber] Received weather report msg: %s
    ", string);
            free(string);
        }
    
        zmq_close(socket);
        zmq_ctx_destroy(context);
        
        return 0;
    }
    1. ZMQ_PUB类型的socket, 如果没有任何client与其相连, 其所有消息都将被简单就地抛弃
    2. ZMQ_SUB类型的socket, 即是client, 可以与多个ZMQ_PUB类型的socket相连, 即村民可以同时收听多个msg 但必须为每个msg都设置过滤器. 否则默认情况下, zmq认为client不关心msg里的所有内容.
    3. 当一个cline收听多个时, 接收消息采用公平队列策略
    4. 如果存在至少一个clint在收听, 那么这个消息就不会被随意抛弃: 这句话的意思是, 当消息过多, 而client的消化能力比较低的话, 未发送的消息会缓存在msg里.
    5. 在ZMQ大版本号在3以上的版本里, 当msg与client的速度不匹配时. 若使用的传输层协议是tcpipc这种面向连接的协议, 则堆积的消息缓存在里, 当使用epgm这种协议时, 堆积的消息缓存了client里. 在ZMQ 大版本号为2的版本中, 所有情况下, 消息都将堆积在clinet里

     

    Parallel Pipeline模式:

    由三部分组成,push进行数据推送,work进行数据缓存,pull进行数据竞争获取处理。区别于Publish-Subscribe存在一个数据缓存和处理负载。

    当连接被断开,数据不会丢失,重连后数据继续发送到对端。
    943117-20160705143418983-31884127.png

    分治套路里有三个角色:

    1. Ventilator. 包工头, 向手下各个工程队分派任务. 一个.
    2. Worker. 工程队, 从包工头里接收任务, 干活. 多个.
    3. Sink. 甲方监理, 工程队干完活后, 向甲方监理报告. 所以工程队的活干完之后, 监理统一收集所有工程队的成果. 一个.

    包工头代码:

     1 #include <zmq.h>
     2 #include <stdio.h>
     3 #include <time.h>
     4 #include "zmq_helper.h"
     5 
     6 int main(void)
     7 {
     8     void * context = zmq_ctx_new();
     9     void * socket_to_sink = zmq_socket(context, ZMQ_PUSH);
    10     void * socket_to_worker = zmq_socket(context, ZMQ_PUSH);
    11     zmq_connect(socket_to_sink, "tcp://localhost:5558");
    12     zmq_bind(socket_to_worker, "tcp://*:5557");
    13 
    14     printf("Press Enter when all workers get ready:");
    15     getchar();
    16     printf("Sending tasks to workers...
    ");
    17 
    18     s_send(socket_to_sink, "Get ur ass up");    // 通知监理, 干活了
    19 
    20     srandom((unsigned)time(NULL));
    21 
    22     int total_ms = 0;
    23     for(int i = 0; i < 100; ++i)
    24     {
    25         int workload = randof(100) + 1;     // 工作需要的耗时, 单位ms
    26         total_ms += workload;
    27         char string[10];
    28         snprintf(string, sizeof(string), "%d", workload);
    29         s_send(socket_to_worker, string);   // 将工作分派给工程队
    30     }
    31 
    32     printf("Total expected cost: %d ms
    ", total_ms);
    33 
    34     zmq_close(socket_to_sink);
    35     zmq_close(socket_to_worker);
    36     zmq_ctx_destroy(context);
    37 
    38     return 0;
    39 }

    工程队代码:

     1 #include <zmq.h>
     2 #include <stdio.h>
     3 #include "zmq_helper.h"
     4 
     5 int main(void)
     6 {
     7     void * context = zmq_ctx_new();
     8     void * socket_to_ventilator = zmq_socket(context, ZMQ_PULL);
     9     void * socket_to_sink = zmq_socket(context, ZMQ_PUSH);
    10     zmq_connect(socket_to_ventilator, "tcp://localhost:5557");
    11     zmq_connect(socket_to_sink, "tcp://localhost:5558");
    12 
    13     while(1)
    14     {
    15         char * msg = s_recv(socket_to_ventilator);
    16         printf("Received msg: %s
    ", msg);
    17         fflush(stdout);
    18         s_sleep(atoi(msg));     // 干活, 即睡眠指定毫秒
    19         free(msg);
    20         s_send(socket_to_sink, "DONE"); // 活干完了通知监理
    21     }
    22 
    23     zmq_close(socket_to_ventilator);
    24     zmq_close(socket_to_sink);
    25     zmq_ctx_destroy(context);
    26 
    27     return 0;
    28 }

    监理代码:

     1 #include <zmq.h>
     2 #include <stdio.h>
     3 #include "zmq_helper.h"
     4 
     5 int main(void)
     6 {
     7     void * context = zmq_ctx_new();
     8     void * socket_to_worker_and_ventilator = zmq_socket(context, ZMQ_PULL);
     9     zmq_bind(socket_to_worker_and_ventilator, "tcp://*:5558");
    10 
    11     char * msg = s_recv(socket_to_worker_and_ventilator);
    12     printf("Received msg: %s", msg);    // 接收来自包工头的开始干活的消息
    13     free(msg);
    14 
    15     int64_t start_time = s_clock();
    16 
    17     for(int i = 0; i < 100; ++i)
    18     {
    19         // 接收100个worker干完活的消息
    20         char * msg = s_recv(socket_to_worker_and_ventilator);
    21         free(msg);
    22 
    23         if(i / 10 * 10 == i)
    24             printf(":");
    25         else
    26             printf(".");
    27         fflush(stdout);
    28     }
    29 
    30     printf("Total elapsed time: %d ms]
    ", (int)(s_clock() - start_time));
    31 
    32     zmq_close(socket_to_worker_and_ventilator);
    33     zmq_ctx_destroy(context);
    34 
    35     return 0;
    36 }

    这个示例程序的逻辑流程是这样的:

    1. 包工头向两个角色发送消息: 向工程队发送共计100个任务, 向监理发送消息, 通知监理开始干活
    2. 工程队接收来自包工头的消息, 并按消息里的数值, 睡眠指定毫秒. 每个任务结束后都通知监理.
    3. 监理先是接收来自包工头的消息, 开始计时. 然后统计来自工程队的消息, 当收集到100个任务完成的消息后, 计算实际耗时.

    包工头里输出的预计耗时是100个任务的共计耗时, 在监理那里统计的实际耗时则是由多个工程队并行处理100个任务实际的耗时.

    这里个例子中需要注意的点有:

    1. 这个例子中使用了ZMQ_PULLZMQ_PUSH两种socket. 分别供消息分发方与消息接收方使用. 看起来略微有点类似于发布-订阅套路, 具体之间的区别后续章节会讲到.
    2. 工程队上接包工头, 下接监理. 在任务执行过程中, 你可以随意的增加工程队的数量.
    3. 我们通过让包工头通知监理, 以及手动输入enter来启动任务分发的方式, 手动同步了工程队/包工头/监理. PUSH/PULL模式虽然和PUB/SUB不一样, 不会丢失消息. 但如果不手动同步的话, 最先建立连接的工程队将几乎把所有任务都接收到手, 导致后续完成连接的工程队拿不到任务, 任务分配不平衡.
    4. 包工头分派任务使用的是轮流/平均分配的方式.这是一种简单的负载均衡
    5. 监理接收多个工程队的消息, 使用的是公平队列策略.

    正确的处理context

    你大致注意到了, 在上面的所有示例代码中, 每次都以zmq_ctx_new()函数创建出一个名为context的变量, 目前你不需要了解它的细节, 这只是ZMQ库的标准套路. 甚至于你将来都不需要了解这个context里面到底是什么. 但你必须要遵循zmq中关于这个context的一些编程规定:

    1. 在一个进程起始时调用zmq_ctx_new()创建context
    2. 在进程结束之前调用zmq_ctx_destroy()销毁掉它

    每个进程, 应该持有, 且应该只持有, 一个context. 当然, 目前来说, 你这样理解就行了, 后续章节或许我们会深入探索一下context, 但目前, 请谨记, one context per process.

    如果你在代码中调用了fork系统调用, 那么请在子进程代码区的开始处调用zmq_ctx_new(), 为子进程创建自己的context

  • 相关阅读:
    使用TransactionScope实现事务
    CYQ.Data 框架系列
    MVP
    DYCOM用于开发网络应用程序的通信部分功能的快速开发
    架构师要了解
    Entity Framework资源
    Sina Blogs
    关于TransactionScope出错:“与基础事务管理器的通信失败”的解决方法总结
    在西方的程序员眼里,东方的程序员是什么样的?
    net2.0事务学习
  • 原文地址:https://www.cnblogs.com/mysky007/p/12288729.html
Copyright © 2011-2022 走看看