分而治之
作为最终示例(你肯定对生动的代码开始生厌并希望回头去钻研关于比较性、抽象性准则的语言学探讨),让我们来做一个小型超级计算。然后喝个咖啡。我们的超级计算程序是个非常典型的并行处理模型。我们有:
- 一个通风机(ventilator)来产生可以并行处理的任务
- 一组工人(worker)来处理任务
- 一个水槽(sink)来回收工人处理的结果
事实上,工人运行于超快的机子,没准是GPU(图形处理单元)来做困难运算。这是通风机代码,生成100个任务,每个任务都是一条消息告诉工人休眠(sleep)几毫秒。
taskvent: Parallel task ventilator in C
// // Task ventilator // Binds PUSH socket to tcp://localhost:5557 // Sends batch of tasks to workers via that socket // #include "zhelpers.h" int main (void) { void *context = zmq_ctx_new (); // Socket to send messages on void *sender = zmq_socket (context, ZMQ_PUSH); zmq_bind (sender, "tcp://*:5557"); // Socket to send start of batch message on void *sink = zmq_socket (context, ZMQ_PUSH); zmq_connect (sink, "tcp://localhost:5558"); printf ("Press Enter when the workers are ready: "); getchar (); printf ("Sending tasks to workers…\n"); // The first message is "0" and signals start of batch s_send (sink, "0"); // Initialize random number generator srandom ((unsigned) time (NULL)); // Send 100 tasks int task_nbr; int total_msec = 0; // Total expected cost in msecs for (task_nbr = 0; task_nbr < 100; task_nbr++) { int workload; // Random workload from 1 to 100msecs workload = randof (100) + 1; total_msec += workload; char string [10]; sprintf (string, "%d", workload); s_send (sender, string); } printf ("Total expected cost: %d msec\n", total_msec); sleep (1); // Give 0MQ time to deliver zmq_close (sink); zmq_close (sender); zmq_ctx_destroy (context); return 0; }
图 5 - 并行管道
这是工人程序。接收消息,休眠指定的时间,然后表明自己完成任务:
taskwork: Parallel task worker in C
// // Task worker // Connects PULL socket to tcp://localhost:5557 // Collects workloads from ventilator via that socket // Connects PUSH socket to tcp://localhost:5558 // Sends results to sink via that socket // #include "zhelpers.h" int main (void) { void *context = zmq_ctx_new (); // Socket to receive messages on void *receiver = zmq_socket (context, ZMQ_PULL); zmq_connect (receiver, "tcp://localhost:5557"); // Socket to send messages to void *sender = zmq_socket (context, ZMQ_PUSH); zmq_connect (sender, "tcp://localhost:5558"); // Process tasks forever while (1) { char *string = s_recv (receiver); // Simple progress indicator for the viewer fflush (stdout); printf ("%s.", string); // Do the work s_sleep (atoi (string)); free (string); // Send results to sink s_send (sender, ""); } zmq_close (receiver); zmq_close (sender); zmq_ctx_destroy (context); return 0; }
这是水槽程序。它收集这100个任务,然后计算整个处理消耗的时间,让我们能够证实如果有多个工人时他们真的是并行运转的:
tasksink: Parallel task sink in C
// // Task sink // Binds PULL socket to tcp://localhost:5558 // Collects results from workers via that socket // #include "zhelpers.h" int main (void) { // Prepare our context and socket void *context = zmq_ctx_new (); void *receiver = zmq_socket (context, ZMQ_PULL); zmq_bind (receiver, "tcp://*:5558"); // Wait for start of batch char *string = s_recv (receiver); free (string); // Start our clock now int64_t start_time = s_clock (); // Process 100 confirmations int task_nbr; for (task_nbr = 0; task_nbr < 100; task_nbr++) { char *string = s_recv (receiver); free (string); if ((task_nbr / 10) * 10 == task_nbr) printf (":"); else printf ("."); fflush (stdout); } // Calculate and report duration of batch printf ("Total elapsed time: %d msec\n", (int) (s_clock () - start_time)); zmq_close (receiver); zmq_ctx_destroy (context); return 0; }
批处理的平均消耗为5秒。当我们启动1个、2个、4个工人时,我们从水槽取得的结果是这样的:
# 1 worker Total elapsed time: 5034 msec # 2 workers Total elapsed time: 2421 msec # 4 workers Total elapsed time: 1018 msec
让我们更细致的查看这段代码的某些方面:
- 工人们上游连接通风机,下游连接水槽。这意味着你可以任意添加工人。如果工人绑定到他们的端点,你会需要(a)更多的端点(b)每添加一个工人都得修改通风机或水槽。我们说通风机和水槽是结构中的“稳定”部分,而工人们是“动态”部分。
- 我们不得不在批次的开始与所有工人们都起来运行两者间做出同步。这是一个ØMQ中特别常见的陷阱,也没有简单方案。“连接”方法需要一定时间。所以当一组工人连接到通风机,第一个成功连接的工人会在瞬间得到消息的全部负载,而其他人仍在连接。如果你总是不去同步批次的开始,系统完全不会并行运转。试着移除等待看看。
- 通风机的推送(PUSH)套接字均匀的分发任务到工人们(假定批次开始送出之前他们都已连接)。这叫做负载均衡,我们会再详细看看。
- 水槽的拉取(PULL)套接字均匀的收集工人的成果。这叫做公平队列。
图6 - 公平队列
管道模式也表现出“迟钝加入者”综合症,导致了对推送套接字不能正确负载均衡的控诉。如果你使用推送和拉取,而且其中一个工人比其他人得到更多的消息,那是因为他的推送套接字比别人连接的更快,然后在其他人连接达成之前捕获了一大堆消息。如果你想要正确的负载均衡,你可能想要看看第3章 - 高级请求应答模式中的小节:负载均衡模式。