ROUTER-DEALER是一种最简单的路由方式。将ROUTER和多个DEALER相连接,用一种合适的算法来决定如何分发消息给DEALER。DEALER可以是一个黑洞(只负责处理消息,不给任何返回)、代理(将消息转发给其他节点)或是服务(会发送返回信息)。
如果你要求DEALER能够进行回复,那就要保证只有一个ROUTER连接到DEALER,因为DEALER并不知道哪个特定的节点在联系它,如果有多个节点,它会做负载均衡,将消息分发出去。但如果DEALER是一个黑洞,那就可以连接任何数量的节点。
ROUTER-DEALER路由可以用来做什么呢?如果DEALER会将它完成任务的时间回复给ROUTER,那ROUTER就可以知道这个DEALER的处理速度有多快了。因为ROUTER和DEALER都是异步的套接字,所以我们要用zmq_poll()来处理这种情况。
下面例子中的两个DEALER不会返回消息给ROUTER,我们的路由采用加权随机算法:发送两倍多的信息给其中的一个DEALER。
rtdealer.c
// 2015-02-27T11:40+08:00 // ROUTER-to-DEALER example #include "../zhelpers.h" #include <pthread.h> #define NBR_WORKERS 10 static void * worker_task(void *args) { void *context = zmq_ctx_new(); void *worker = zmq_socket(context, ZMQ_DEALER); #if (defined (WIN32)) s_set_id(worker, (intptr_t)args); #else s_set_id(worker); // Set a printable identity #endif zmq_connect (worker, "tcp://localhost:5671"); int total = 0; while (1) { // Tell the broker we're ready for work s_sendmore(worker, ""); s_send(worker, "Hi Boss"); // Get workload from broker, until finished free(s_recv(worker)); // Envelope delimiter char *workload = s_recv(worker); // .skip int finished = (strcmp(workload, "Fired!") == 0); free(workload); if (finished) { printf("Completed: %d tasks ", total); break; } total++; // Do some random work s_sleep(randof(500) + 1); } zmq_close(worker); zmq_ctx_destroy(context); return NULL; } // .split main task // While this example runs in a single process, that is just to make // it easier to start and stop the example. Each thread has its own // context and conceptually acts as a separate process. int main(void) { void *context = zmq_ctx_new(); void *broker = zmq_socket(context, ZMQ_ROUTER); zmq_bind(broker, "tcp://*:5671"); srandom((unsigned)time(NULL)); int worker_nbr; for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) { pthread_t worker; pthread_create(&worker, NULL, worker_task, (void *)(intptr_t)worker_nbr); } // Run for five seconds and then tell workers to end int64_t end_time = s_clock() + 5000; int workers_fired = 0; while (1) { // Next message gives us least recently used worker char *identity = s_recv(broker); s_sendmore(broker, identity); free(identity); free(s_recv(broker)); // Envelope delimiter free(s_recv(broker)); // Response from worker s_sendmore(broker, ""); // Encourage workers until it's time to fire them if (s_clock() < end_time) s_send(broker, "Work harder"); else { s_send(broker, "Fired!"); if (++workers_fired == NBR_WORKERS) break; } } zmq_close(broker); zmq_ctx_destroy(context); return 0; } // .until
out:
Completed: 16 tasks Completed: 20 tasks Completed: 18 tasks Completed: 17 tasks Completed: 21 tasks Completed: 15 tasks Completed: 18 tasks Completed: 21 tasks Completed: 22 tasks Completed: 18 tasks