zoukankan      html  css  js  c++  java
  • ZeroMQ_12 请求应答---LRU模式

    // 2015-02-27T11:40+08:00
    //  ROUTER-to-DEALER example
    
    #include "../zhelpers.h"
    #include <pthread.h>
    #define NBR_WORKERS 10
    
    static void *
    worker_task(void *args)
    {
        void *context = zmq_ctx_new();
        void *worker = zmq_socket(context, ZMQ_REQ);
    
    #if (defined (WIN32))
        s_set_id(worker, (intptr_t)args);
    #else
        s_set_id(worker);          //  Set a printable identity
    #endif
    
        zmq_connect (worker, "ipc://routing.ipc");
    
        int total = 0;
        while (1) {
            //  Tell the broker we're ready for work
            s_send(worker, "ready");
    
            char *workload = s_recv(worker);
    
            //  .skip
            int finished = (strcmp(workload, "END") == 0);
            free(workload);
            if (finished) {
                printf("Completed: %d tasks
    ", total);
                break;
            }
            total++;
    
            //  Do some random work
            s_sleep(randof(1000) + 1);
        }
        zmq_close(worker);
        zmq_ctx_destroy(context);
        return NULL;
    }
    
    //  .split main task
    //  While this example runs in a single process, that is just to make
    //  it easier to start and stop the example. Each thread has its own
    //  context and conceptually acts as a separate process.
    
    int main(void)
    {
        void *context = zmq_ctx_new();
        void *broker = zmq_socket(context, ZMQ_ROUTER);
    
        zmq_bind(broker, "ipc://routing.ipc");
        srandom((unsigned)time(NULL));
    
        int worker_nbr;
        for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
            pthread_t worker;
            pthread_create(&worker, NULL, worker_task, (void *)(intptr_t)worker_nbr);
        }
    
        int task_nbr;
        for (task_nbr = 0; task_nbr < NBR_WORKERS * 10; task_nbr++) {
            // 最近最少使用的worker就在消息队列中
            char *address = s_recv(broker);
            char *empty = s_recv(broker);
            free(empty);
            char *ready = s_recv(broker);
            free(ready);
     
            s_sendmore(broker, address);
            s_sendmore(broker, "");
            s_send(broker, "This is the workload");
            free(address);
        }
         // 通知所有REQ套接字结束工作
        for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
            char *address = s_recv(broker);
            char *empty = s_recv(broker);
            free(empty);
            char *ready = s_recv(broker);
            free(ready);
     
            s_sendmore(broker, address);
            s_sendmore(broker, "");
            s_send(broker, "END");
            free(address);
        }
        zmq_close(broker);
        zmq_ctx_destroy(context);
        return 0;
    }
    //  .until

    out:

    Completed: 9 tasks
    Completed: 10 tasks
    Completed: 9 tasks
    Completed: 11 tasks
    Completed: 9 tasks
    Completed: 12 tasks
    Completed: 12 tasks
    Completed: 10 tasks
    Completed: 8 tasks
    Completed: 10 tasks

    关于以上代码的几点说明:

    • 我们不需要像前一个例子一样等待一段时间,因为REQ套接字会明确告诉ROUTER它已经准备好了。

    • 我们使用了zhelpers.h提供的s_set_id()函数来为套接字生成一个可打印的字符串标识,这是为了让例子简单一些。在现实环境中,REQ套接字都是匿名的,你需要直接调用zmq_recv()和zmq_send()来处理消息,因为s_recv()和s_send()只能处理字符串标识的套接字。

    • 更糟的是,我们使用了随机的标识,不要在现实环境中使用随机标识的持久套接字,这样做会将节点消耗殆尽。

    • 如果你只是将上面的代码拷贝过来,没有充分理解,那你就像是看到蜘蛛人从屋顶上飞下来,你也照着做了,后果自负吧。

    在将消息路由给REQ套接字时,需要注意一定的格式,即地址-空帧-消息:

  • 相关阅读:
    windows 系统配置(二)MongoDB
    The AJP Connector is configured with secretRequired="true" but the secret attribute is either null or "".
    【读书笔记】C#高级编程 第十三章 异步编程
    【读书笔记】C#高级编程 第十二章 动态语言扩展
    【读书笔记】C#高级编程 第十一章 LINQ
    【读书笔记】C#高级编程 第十章 集合
    【读书笔记】C#高级编程 第九章 字符串和正则表达式
    【读书笔记】C#高级编程 第八章 委托、lambda表达式和事件
    【读书笔记】C#高级编程 第七章 运算符和类型强制转换
    【读书笔记】C#高级编程 第六章 数组
  • 原文地址:https://www.cnblogs.com/vczf/p/12955003.html
Copyright © 2011-2022 走看看