  • Common Communication Patterns of the zmq Message Queue

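    zmq is a message queue. It can carry data in-process, between processes, over TCP, and over multicast, and it transfers data as whole messages rather than as a raw socket byte stream. The official home page offers downloads, usage guides, and documentation, and is fairly complete.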
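
    To make "whole messages rather than a byte stream" concrete, the following is a minimal sketch of the length-prefix framing an application would otherwise have to write by hand on top of a raw stream socket. The helper names `frame` and `unframe` and the 4-byte big-endian header are assumptions chosen for illustration; this is not zmq's actual wire format.

    ```cpp
    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <string>
    #include <vector>

    // Hypothetical helper: prepend a 4-byte big-endian length to a payload.
    std::string frame(const std::string& payload) {
        assert(payload.size() <= 0xFFFFFFFFu);  // length must fit in the header
        const std::uint32_t n = static_cast<std::uint32_t>(payload.size());
        std::string out;
        out.push_back(static_cast<char>((n >> 24) & 0xFF));
        out.push_back(static_cast<char>((n >> 16) & 0xFF));
        out.push_back(static_cast<char>((n >> 8) & 0xFF));
        out.push_back(static_cast<char>(n & 0xFF));
        out += payload;
        return out;
    }

    // Hypothetical helper: extract every complete message from a byte stream.
    // Bytes belonging to a trailing incomplete message stay in `stream`.
    std::vector<std::string> unframe(std::string& stream) {
        std::vector<std::string> messages;
        std::size_t pos = 0;
        while (stream.size() - pos >= 4) {
            const auto byte = [&](std::size_t i) {
                return static_cast<std::uint32_t>(
                    static_cast<unsigned char>(stream[pos + i]));
            };
            const std::uint32_t n =
                (byte(0) << 24) | (byte(1) << 16) | (byte(2) << 8) | byte(3);
            if (stream.size() - pos - 4 < n) break;  // payload not fully arrived
            messages.push_back(stream.substr(pos + 4, n));
            pos += 4 + n;
        }
        stream.erase(0, pos);  // drop the bytes that were consumed
        return messages;
    }
    ```

    With a message-oriented library the receiver gets "Hello" and "World" as two separate units, so bookkeeping of this kind does not appear in application code.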

    The commonly used patterns are Request-Reply, Publish-Subscribe, and Parallel Pipeline.

    Request-Reply

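    request (client)

    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_REQ);
    //  Connect to the server before sending
    socket.connect ("tcp://localhost:5555");
    //  Send the request (5 bytes, no trailing '\0')
    zmq::message_t request (5);
    memcpy ((void *) request.data (), "Hello", 5);
    socket.send (request);
    //  Get the reply
    zmq::message_t reply;
    socket.recv (&reply);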


    server

    zmq::context_t context (1);
    zmq::socket_t socket(context, ZMQ_REP);
    socket.bind ("tcp://*:5555");
    while (true) {
      zmq::message_t request;
      //  Wait for next request from client
      socket.recv (&request);
      std::cout << "Received Hello" << std::endl;
      //  Do some 'work'
      sleep (1);
      //  Send reply back to client
      zmq::message_t reply (5);
      memcpy ((void *) reply.data (), "World", 5);
      socket.send (reply);
    }
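
    The client and server above depend on a strict alternation: a REQ socket sends and then receives, while a REP socket receives and then sends. The sketch below models only that lockstep rule for the REQ side, using the C++ standard library alone. The class name `ReqState` and the use of exceptions are assumptions for illustration; a real socket reports the violation through its own error mechanism.

    ```cpp
    #include <stdexcept>

    // Hypothetical model of a REQ socket's send/recv alternation.
    class ReqState {
    public:
        // A request may only be sent when no reply is outstanding.
        void send() {
            if (awaiting_reply_)
                throw std::logic_error("send() called while a reply is pending");
            awaiting_reply_ = true;
        }

        // A reply may only be received after a request was sent.
        void recv() {
            if (!awaiting_reply_)
                throw std::logic_error("recv() called before any request");
            awaiting_reply_ = false;
        }

        bool awaiting_reply() const { return awaiting_reply_; }

    private:
        bool awaiting_reply_ = false;
    };
    ```

    Calling `send()` twice in a row on this model throws, which mirrors why the client above always pairs each `send` with a `recv`.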

    Publish-Subscribe

    publisher

    zmq::context_t context (1);
    zmq::socket_t publisher (context, ZMQ_PUB);
    publisher.bind("tcp://*:5556");
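    //  Build and publish one update (the payload is only an example)
    zmq::message_t message (5);
    memcpy (message.data (), "hello", 5);
    publisher.send (message);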

    subscriber

    zmq::context_t context (1);
    zmq::socket_t subscriber (context, ZMQ_SUB);
    subscriber.connect("tcp://localhost:5556");
    //  An empty filter subscribes to every message
    const char *filter = "";
    subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));
    zmq::message_t update;
    subscriber.recv(&update);
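
    The subscriber above passes an empty filter. As the `ZMQ_SUBSCRIBE` option is documented, a filter is compared against the leading bytes of each message, a zero-length filter accepts everything, and a message is delivered if any registered filter matches. The sketch below reproduces that rule with the standard library only; the function name `matches_subscription` is hypothetical and is not part of zmq.

    ```cpp
    #include <string>
    #include <vector>

    // Hypothetical helper: true if `message` starts with any of `filters`.
    // An empty filter is a prefix of every message, so it matches all of them.
    bool matches_subscription(const std::vector<std::string>& filters,
                              const std::string& message) {
        for (const std::string& f : filters) {
            // compare() looks at no more than f.size() leading characters;
            // a message shorter than the filter can never compare equal.
            if (message.compare(0, f.size(), f) == 0) return true;
        }
        return false;  // with no filters registered, nothing is delivered
    }
    ```

    Under this rule a filter such as "10001 " would accept only updates whose payload begins with that text, whereas the empty filter used above accepts all updates.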

    Parallel Pipeline

    ventilator

    zmq::context_t context (1);
    //  Socket to send messages on
    zmq::socket_t sender (context, ZMQ_PUSH);
    sender.bind ("tcp://*:5557");
    //  Tell the sink that a batch is about to start
    zmq::socket_t sink (context, ZMQ_PUSH);
    sink.connect ("tcp://localhost:5558");
    zmq::message_t message (2);
    memcpy (message.data (), "0", 2);   //  copies '0' and the terminating '\0'
    sink.send (message);
    //  Start sending data into the pipeline
    int workload = 100;                 //  example task value (assumed)
    message.rebuild (10);
    snprintf ((char *) message.data (), 10, "%d", workload);
    sender.send (message);

    worker

    zmq::context_t context(1);
    
    //  Socket to receive messages on
    zmq::socket_t receiver(context, ZMQ_PULL);
    receiver.connect("tcp://localhost:5557");
    
    //  Socket to send messages to
    zmq::socket_t sender(context, ZMQ_PUSH);
    sender.connect("tcp://localhost:5558");
    
    //  Process tasks forever
    while (1) {
      zmq::message_t message;
      receiver.recv (&message);
      //  Send results to sink (an empty message signals completion)
      message.rebuild ();
      sender.send (message);
    }

    sink

    //  Prepare our context and socket
    zmq::context_t context(1);
    zmq::socket_t receiver(context,ZMQ_PULL);
    receiver.bind("tcp://*:5558");
    
    //  Wait for start of batch
    zmq::message_t message;
    receiver.recv(&message);
    //  Receive a result from a worker
    receiver.recv(&message);
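
    The three roles above form a fan-out and fan-in: the ventilator's PUSH socket spreads tasks across the connected workers in round-robin order, and the sink's PULL socket gathers their results. The sketch below models only that data flow with the standard library, assuming every worker is already connected before the first task is sent and ignoring buffering and timing. The names `distribute` and `collect` are hypothetical.

    ```cpp
    #include <cassert>
    #include <cstddef>
    #include <vector>

    // Hypothetical model of the ventilator: task i goes to worker i % workers.
    std::vector<std::vector<int>> distribute(const std::vector<int>& tasks,
                                             std::size_t workers) {
        assert(workers > 0);  // at least one worker must be connected
        std::vector<std::vector<int>> queues(workers);
        for (std::size_t i = 0; i < tasks.size(); ++i) {
            queues[i % workers].push_back(tasks[i]);
        }
        return queues;
    }

    // Hypothetical model of the sink: sums one result per task from all workers.
    int collect(const std::vector<std::vector<int>>& queues) {
        int total = 0;
        for (const auto& queue : queues) {
            for (int task : queue) total += task;
        }
        return total;
    }
    ```

    With five tasks and two workers, the first worker receives the first, third, and fifth tasks and the second worker receives the rest, while the sink still sees every result exactly once.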




  • Original article: https://www.cnblogs.com/whuqin/p/4982023.html