zoukankan      html  css  js  c++  java
  • ZeroMQ指南第1章基础分而治之 标签: zeromqzeroMQZeroMQZMQzmq 20130217 23:49 4557人阅读

    分而治之

    作为最终示例(你肯定对生动的代码开始生厌并希望回头去钻研关于比较性、抽象性准则的语言学探讨),让我们来做一个小型超级计算。然后喝个咖啡。我们的超级计算程序是个非常典型的并行处理模型。我们有:

    • 一个通风机(ventilator)来产生可以并行处理的任务
    • 一组工人(worker)来处理任务
    • 一个水槽(sink)来回收工人处理的结果

    事实上,工人运行于超快的机子,没准是GPU(图形处理单元)来做困难运算。这是通风机代码,生成100个任务,每个任务都是一条消息告诉工人休眠(sleep)几毫秒。

    taskvent: Parallel task ventilator in C

    //
    // Task ventilator
    // Binds PUSH socket to tcp://localhost:5557
    // Sends batch of tasks to workers via that socket
    //
    #include "zhelpers.h"
    
    int main (void)
    {
        void *context = zmq_ctx_new ();
    
        // Socket to send messages on
        void *sender = zmq_socket (context, ZMQ_PUSH);
        zmq_bind (sender, "tcp://*:5557");
    
        // Socket to send start of batch message on
        void *sink = zmq_socket (context, ZMQ_PUSH);
        zmq_connect (sink, "tcp://localhost:5558");
    
        printf ("Press Enter when the workers are ready: ");
        getchar ();
        printf ("Sending tasks to workers…\n");
    
        // The first message is "0" and signals start of batch
        s_send (sink, "0");
    
        // Initialize random number generator
        srandom ((unsigned) time (NULL));
    
        // Send 100 tasks
        int task_nbr;
        int total_msec = 0; // Total expected cost in msecs
        for (task_nbr = 0; task_nbr < 100; task_nbr++) {
            int workload;
            // Random workload from 1 to 100msecs
            workload = randof (100) + 1;
            total_msec += workload;
            char string [10];
            sprintf (string, "%d", workload);
            s_send (sender, string);
        }
        printf ("Total expected cost: %d msec\n", total_msec);
        sleep (1); // Give 0MQ time to deliver
    
        zmq_close (sink);
        zmq_close (sender);
        zmq_ctx_destroy (context);
        return 0;
    }
    

    图 5 - 并行管道

    这是工人程序。接收消息,休眠指定的时间,然后表明自己完成任务:

    taskwork: Parallel task worker in C

    //
    // Task worker
    // Connects PULL socket to tcp://localhost:5557
    // Collects workloads from ventilator via that socket
    // Connects PUSH socket to tcp://localhost:5558
    // Sends results to sink via that socket
    //
    #include "zhelpers.h"
    
    int main (void)
    {
        void *context = zmq_ctx_new ();
    
        // Socket to receive messages on
        void *receiver = zmq_socket (context, ZMQ_PULL);
        zmq_connect (receiver, "tcp://localhost:5557");
    
        // Socket to send messages to
        void *sender = zmq_socket (context, ZMQ_PUSH);
        zmq_connect (sender, "tcp://localhost:5558");
    
        // Process tasks forever
        while (1) {
            char *string = s_recv (receiver);
            // Simple progress indicator for the viewer
            fflush (stdout);
            printf ("%s.", string);
    
            // Do the work
            s_sleep (atoi (string));
            free (string);
    
            // Send results to sink
            s_send (sender, "");
        }
        zmq_close (receiver);
        zmq_close (sender);
        zmq_ctx_destroy (context);
        return 0;
    }
    

    这是水槽程序。它收集这100个任务,然后计算整个处理消耗的时间,让我们能够证实如果有多个工人时他们真的是并行运转的:

    tasksink: Parallel task sink in C

    //
    // Task sink
    // Binds PULL socket to tcp://localhost:5558
    // Collects results from workers via that socket
    //
    #include "zhelpers.h"
    
    int main (void)
    {
        // Prepare our context and socket
        void *context = zmq_ctx_new ();
        void *receiver = zmq_socket (context, ZMQ_PULL);
        zmq_bind (receiver, "tcp://*:5558");
    
        // Wait for start of batch
        char *string = s_recv (receiver);
        free (string);
    
        // Start our clock now
        int64_t start_time = s_clock ();
    
        // Process 100 confirmations
        int task_nbr;
        for (task_nbr = 0; task_nbr < 100; task_nbr++) {
            char *string = s_recv (receiver);
            free (string);
            if ((task_nbr / 10) * 10 == task_nbr)
                printf (":");
            else
                printf (".");
            fflush (stdout);
        }
        // Calculate and report duration of batch
        printf ("Total elapsed time: %d msec\n",
                (int) (s_clock () - start_time));
    
        zmq_close (receiver);
        zmq_ctx_destroy (context);
        return 0;
    }
    

    批处理的平均消耗为5秒。当我们启动1个、2个、4个工人时,我们从水槽取得的结果是这样的:

    #   1 worker
    Total elapsed time: 5034 msec
    #   2 workers
    Total elapsed time: 2421 msec
    #   4 workers
    Total elapsed time: 1018 msec
    

    让我们更细致的查看这段代码的某些方面:

    • 工人们上游连接通风机,下游连接水槽。这意味着你可以任意添加工人。如果工人绑定到他们的端点,你会需要(a)更多的端点(b)每添加一个工人都得修改通风机或水槽。我们说通风机和水槽是结构中的“稳定”部分,而工人们是“动态”部分。
    • 我们不得不在批次的开始与所有工人们都起来运行两者间做出同步。这是一个ØMQ中特别常见的陷阱,也没有简单方案。“连接”方法需要一定时间。所以当一组工人连接到通风机,第一个成功连接的工人会在瞬间得到消息的全部负载,而其他人仍在连接。如果你总是不去同步批次的开始,系统完全不会并行运转。试着移除等待看看。
    • 通风机的推送(PUSH)套接字均匀的分发任务到工人们(假定批次开始送出之前他们都已连接)。这叫做负载均衡,我们会再详细看看。
    • 水槽的拉取(PULL)套接字均匀的收集工人的成果。这叫做公平队列

    图6 - 公平队列

    管道模式也表现出“迟钝加入者”综合症,导致了对推送套接字不能正确负载均衡的控诉。如果你使用推送和拉取,而且其中一个工人比其他人得到更多的消息,那是因为他的推送套接字比别人连接的更快,然后在其他人连接达成之前捕获了一大堆消息。如果你想要正确的负载均衡,你可能想要看看第3章 - 高级请求应答模式中的小节:负载均衡模式。

  • 相关阅读:
    Ubuntu18.04查看ip地址
    使用Vmware克隆功能快速创建多台虚拟机
    使用Vmware快照功能对虚拟机进行备份还原
    安装Vmware并创建Ubuntu虚拟机
    使用vmware+Ubuntu搭建hadoop集群
    Gitee图床+PicGo+Typora便捷在博客中使用图片
    使用Gitee Pages+hugo免费搭建你的博客
    Scheduler的WaitRun存在卡死的问题
    使用OpenJDK进行Delphi Android开发
    citus
  • 原文地址:https://www.cnblogs.com/zerofire/p/7162161.html
Copyright © 2011-2022 走看看