zoukankan      html  css  js  c++  java
  • ZeroMQ_11 ROUTER-DEALER路由

    ROUTER-DEALER是一种最简单的路由方式。将ROUTER和多个DEALER相连接,用一种合适的算法来决定如何分发消息给DEALER。DEALER可以是一个黑洞(只负责处理消息,不给任何返回)、代理(将消息转发给其他节点)或是服务(会发送返回信息)。

    如果你要求DEALER能够进行回复,那就要保证只有一个ROUTER连接到DEALER,因为DEALER并不知道哪个特定的节点在联系它,如果有多个节点,它会做负载均衡,将消息分发出去。但如果DEALER是一个黑洞,那就可以连接任何数量的节点。

    ROUTER-DEALER路由可以用来做什么呢?如果DEALER会将它完成任务的时间回复给ROUTER,那ROUTER就可以知道这个DEALER的处理速度有多快了。因为ROUTER和DEALER都是异步的套接字,所以我们要用zmq_poll()来处理这种情况。

    下面例子中的两个DEALER不会返回消息给ROUTER,我们的路由采用加权随机算法:发送两倍多的信息给其中的一个DEALER。

     rtdealer.c

    // 2015-02-27T11:40+08:00
    //  ROUTER-to-DEALER example
    
    #include "../zhelpers.h"
    #include <pthread.h>
    #define NBR_WORKERS 10
    
    static void *
    worker_task(void *args)
    {
        void *context = zmq_ctx_new();
        void *worker = zmq_socket(context, ZMQ_DEALER);
    
    #if (defined (WIN32))
        s_set_id(worker, (intptr_t)args);
    #else
        s_set_id(worker);          //  Set a printable identity
    #endif
    
        zmq_connect (worker, "tcp://localhost:5671");
    
        int total = 0;
        while (1) {
            //  Tell the broker we're ready for work
            s_sendmore(worker, "");
            s_send(worker, "Hi Boss");
    
            //  Get workload from broker, until finished
            free(s_recv(worker));     //  Envelope delimiter
            char *workload = s_recv(worker);
            //  .skip
            int finished = (strcmp(workload, "Fired!") == 0);
            free(workload);
            if (finished) {
                printf("Completed: %d tasks
    ", total);
                break;
            }
            total++;
    
            //  Do some random work
            s_sleep(randof(500) + 1);
        }
        zmq_close(worker);
        zmq_ctx_destroy(context);
        return NULL;
    }
    
    //  .split main task
    //  While this example runs in a single process, that is just to make
    //  it easier to start and stop the example. Each thread has its own
    //  context and conceptually acts as a separate process.
    
    int main(void)
    {
        void *context = zmq_ctx_new();
        void *broker = zmq_socket(context, ZMQ_ROUTER);
    
        zmq_bind(broker, "tcp://*:5671");
        srandom((unsigned)time(NULL));
    
        int worker_nbr;
        for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
            pthread_t worker;
            pthread_create(&worker, NULL, worker_task, (void *)(intptr_t)worker_nbr);
        }
        //  Run for five seconds and then tell workers to end
        int64_t end_time = s_clock() + 5000;
        int workers_fired = 0;
        while (1) {
            //  Next message gives us least recently used worker
            char *identity = s_recv(broker);
            s_sendmore(broker, identity);
            free(identity);
            free(s_recv(broker));     //  Envelope delimiter
            free(s_recv(broker));     //  Response from worker
            s_sendmore(broker, "");
    
            //  Encourage workers until it's time to fire them
            if (s_clock() < end_time)
                s_send(broker, "Work harder");
            else {
                s_send(broker, "Fired!");
                if (++workers_fired == NBR_WORKERS)
                    break;
            }
        }
        zmq_close(broker);
        zmq_ctx_destroy(context);
        return 0;
    }
    //  .until

    out:

    Completed: 16 tasks
    Completed: 20 tasks
    Completed: 18 tasks
    Completed: 17 tasks
    Completed: 21 tasks
    Completed: 15 tasks
    Completed: 18 tasks
    Completed: 21 tasks
    Completed: 22 tasks
    Completed: 18 tasks
  • 相关阅读:
    线程(C++11)
    初始化、赋值
    优质学习资料总结
    移植之乱谈
    采集音频和摄像头视频并实时H264编码及AAC编码
    H264解码器源码(Android 1.6 版)
    移植ffmpeg到VC环境心得
    收集的网络上大型的开源图像处理软件代码(提供下载链接)
    C++开源库,欢迎补充。
    一个轻量级AOP的实现(开源)
  • 原文地址:https://www.cnblogs.com/vczf/p/12942635.html
Copyright © 2011-2022 走看看