// 2015-02-27T11:40+08:00 // ROUTER-to-DEALER example #include "../zhelpers.h" #include <pthread.h> #define NBR_WORKERS 10 static void * worker_task(void *args) { void *context = zmq_ctx_new(); void *worker = zmq_socket(context, ZMQ_REQ); #if (defined (WIN32)) s_set_id(worker, (intptr_t)args); #else s_set_id(worker); // Set a printable identity #endif zmq_connect (worker, "ipc://routing.ipc"); int total = 0; while (1) { // Tell the broker we're ready for work s_send(worker, "ready"); char *workload = s_recv(worker); // .skip int finished = (strcmp(workload, "END") == 0); free(workload); if (finished) { printf("Completed: %d tasks ", total); break; } total++; // Do some random work s_sleep(randof(1000) + 1); } zmq_close(worker); zmq_ctx_destroy(context); return NULL; } // .split main task // While this example runs in a single process, that is just to make // it easier to start and stop the example. Each thread has its own // context and conceptually acts as a separate process. int main(void) { void *context = zmq_ctx_new(); void *broker = zmq_socket(context, ZMQ_ROUTER); zmq_bind(broker, "ipc://routing.ipc"); srandom((unsigned)time(NULL)); int worker_nbr; for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) { pthread_t worker; pthread_create(&worker, NULL, worker_task, (void *)(intptr_t)worker_nbr); } int task_nbr; for (task_nbr = 0; task_nbr < NBR_WORKERS * 10; task_nbr++) { // 最近最少使用的worker就在消息队列中 char *address = s_recv(broker); char *empty = s_recv(broker); free(empty); char *ready = s_recv(broker); free(ready); s_sendmore(broker, address); s_sendmore(broker, ""); s_send(broker, "This is the workload"); free(address); } // 通知所有REQ套接字结束工作 for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) { char *address = s_recv(broker); char *empty = s_recv(broker); free(empty); char *ready = s_recv(broker); free(ready); s_sendmore(broker, address); s_sendmore(broker, ""); s_send(broker, "END"); free(address); } zmq_close(broker); zmq_ctx_destroy(context); return 0; } // .until
out:
Completed: 9 tasks Completed: 10 tasks Completed: 9 tasks Completed: 11 tasks Completed: 9 tasks Completed: 12 tasks Completed: 12 tasks Completed: 10 tasks Completed: 8 tasks Completed: 10 tasks
关于以上代码的几点说明:
-
我们不需要像前一个例子一样等待一段时间,因为REQ套接字会明确告诉ROUTER它已经准备好了。
-
我们使用了zhelpers.h提供的s_set_id()函数来为套接字生成一个可打印的字符串标识,这是为了让例子简单一些。在现实环境中,REQ套接字都是匿名的,你需要直接调用zmq_recv()和zmq_send()来处理消息,因为s_recv()和s_send()只能处理字符串标识的套接字。
-
更糟的是,我们使用了随机的标识,不要在现实环境中使用随机标识的持久套接字,这样做会将节点消耗殆尽。
-
如果你只是将上面的代码拷贝过来,没有充分理解,那你就像是看到蜘蛛人从屋顶上飞下来,你也照着做了,后果自负吧。
在将消息路由给REQ套接字时,需要注意一定的格式,即地址-空帧-消息: