下面一个示例程序中,我们将使用ZMQ进行超级计算,也就是并行处理模型:
- 任务分发器会生成大量可以并行计算的任务;
- 有一组worker会处理这些任务;
- 结果收集器会在末端接收所有worker的处理结果,进行汇总。
taskvent:
#include <stdlib.h> #include <zmq.h> #include <string.h> #include <unistd.h> #include <time.h> #include <assert.h> static int s_send (void *socket, char *string) { int size = zmq_send (socket, string, strlen (string), 0); return size; } #define randof(num) (int) ((float) (num) * random () / (RAND_MAX + 1.0)) int main (void) { // [0]创建对象 void *context = zmq_ctx_new (); // [1]发送消息的嵌套字 void *sender = zmq_socket (context, ZMQ_PUSH); zmq_bind (sender, "tcp://*:5557"); // [2]分发消息的嵌套字 void *sink = zmq_socket (context, ZMQ_PUSH); zmq_connect (sink, "tcp://localhost:5558"); printf ("Press Enter when the workers are ready: "); getchar (); printf ("Sending tasks to workers... "); // [3]发送开始信号 s_send (sink, "0"); // [4]初始化随机数 srandom ((unsigned) time (NULL)); // [5]发送100个任务 int task_nbr; int total_msec = 0; // 预计执行时间(毫秒) for (task_nbr = 0; task_nbr < 100; task_nbr++) { int workload; // 随机产生1-100毫秒 workload = randof (100) + 1; total_msec += workload; char string [10]; sprintf (string, "%d", workload); s_send (sender, string); } printf ("Total expected cost: %d msec ", total_msec); zmq_close (sink); zmq_close (sender); zmq_ctx_destroy (context); return 0; }
taskwork:
#include <stdlib.h> #include <zmq.h> #include <string.h> #include <unistd.h> #include <time.h> #include <assert.h> void s_sleep (int msecs) { struct timespec t; t.tv_sec = msecs / 1000; t.tv_nsec = (msecs % 1000) * 1000000; nanosleep (&t, NULL); } int s_send (void *socket, char *string) { int size = zmq_send (socket, string, strlen (string), 0); return size; } char *s_recv (void *socket) { char buffer [256]; int size = zmq_recv (socket, buffer, 255, 0); if (size == -1) return NULL; buffer[size] = '