zoukankan      html  css  js  c++  java
  • zeromq学习记录(三)使用ZMQ_PULL ZMQ_PUSH

    /**************************************************************
    技术博客
    http://www.cnblogs.com/itdef/
     
    技术交流群
    群号码:324164944
     
    欢迎c c++ windows驱动爱好者 服务器程序员沟通交流
    **************************************************************/
     
    zeromq 指南里第三个例子
     
    socket在代码中标记为ZMQ_PULL ZMQ_PUSH
    // taskvent_cpp.cpp : 定义控制台应用程序的入口点。
    //
    
    #include "stdafx.h"
    //
    //  Task ventilator in C++
    //  Binds PUSH socket to tcp://*:5557 (all local interfaces)
    //  Sends batch of tasks to workers via that socket
    //
    //  Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
    //
    #include <zhelpers.hpp>
    #include <zmq.hpp>
    #include <stdlib.h>
    #include <stdio.h>
    #include <iostream>
    
    
    
    /**
     * Task ventilator entry point.
     *
     * Binds a PUSH socket on tcp://*:5557 for the workers, waits for the
     * operator to press Enter, sends a start-of-batch marker to the sink at
     * tcp://localhost:5558, then pushes 100 tasks to the workers. Each task
     * is a 10-byte, NUL-padded buffer holding the workload as decimal text.
     *
     * @param argc unused
     * @param argv unused
     * @return 0 once the batch has been sent and the operator pressed Enter.
     */
    int main(int argc, char *argv[])
    {
        zmq::context_t context(1);

        //  Socket to send tasks on
        zmq::socket_t  sender(context, ZMQ_PUSH);
        sender.bind("tcp://*:5557");

        std::cout << "Press Enter when the workers are ready: " << std::endl;
        getchar();
        std::cout << "Sending tasks to workers…\n" << std::endl;

        //  The first message is "0" and signals start of batch
        zmq::socket_t sink(context, ZMQ_PUSH);
        sink.connect("tcp://localhost:5558");
        zmq::message_t message(2);
        //  Copy the terminating NUL as well so that both bytes of the
        //  2-byte message are initialized before it goes on the wire.
        memcpy(message.data(), "0", 2);
        sink.send(message);

        //  Initialize random number generator
        srandom((unsigned)time(NULL));

        //  Send 100 tasks
        int total_msec = 0;     //  Total expected cost in msecs
        for (int task_nbr = 0; task_nbr < 100; task_nbr++) {
            //  Random workload from 1 to 100msecs
            //  (assumes within(n) yields a value in [0, n) -- it is defined
            //  outside this file; TODO confirm)
            int workload = within(100) + 1;
            total_msec += workload;

            //  Fixed-size, zero-filled payload; the workload (at most three
            //  digits) always fits together with its terminating NUL.
            message.rebuild(10);
            memset(message.data(), '\0', 10);
            sprintf((char *)message.data(), "%d", workload);
            sender.send(message);
        }
        std::cout << "Total expected cost: " << total_msec << " msec" << std::endl;
        //  Give 0MQ time to deliver. Win32 Sleep() takes milliseconds, so
        //  one second is 1000, not 1.
        Sleep(1000);
        getchar();
        return 0;
    }
    View Code
    // taskwork_cpp.cpp : 定义控制台应用程序的入口点。
    //
    
    #include "stdafx.h"
    #include "zhelpers.hpp"
    #include <string>
    
    /**
     * Task worker entry point.
     *
     * Connects a PULL socket to tcp://localhost:5557 to receive tasks and a
     * PUSH socket to tcp://localhost:5558 to report results. For every task
     * it parses the payload as a decimal integer, passes that value to
     * s_sleep(), sends an empty message to the sink and prints a dot.
     *
     * The processing loop has no exit condition, so this function does not
     * return.
     *
     * @param argc unused
     * @param argv unused
     */
    int main(int argc, char *argv[])
    {
        zmq::context_t context(1);

        //  Socket to receive messages on
        zmq::socket_t receiver(context, ZMQ_PULL);
        receiver.connect("tcp://localhost:5557");

        //  Socket to send messages to
        zmq::socket_t sender(context, ZMQ_PUSH);
        sender.connect("tcp://localhost:5558");

        //  Process tasks forever
        while (1) {

            zmq::message_t message;
            receiver.recv(&message);
            std::string smessage(static_cast<char*>(message.data()), message.size());

            //  Workload in msecs. Start from 0 so the value is well defined
            //  even if the payload does not parse as an integer.
            int workload = 0;
            std::istringstream iss(smessage);
            iss >> workload;

            //  Do the work
            //  (s_sleep comes from zhelpers.hpp; presumably sleeps for the
            //  given number of milliseconds -- TODO confirm)
            s_sleep(workload);

            //  Send results to sink: an empty message is the confirmation
            message.rebuild();
            sender.send(message);

            //  Simple progress indicator for the viewer
            std::cout << "." << std::flush;
        }
        //  Not reached: the loop above never terminates, so the former
        //  trailing getchar()/return were dead code and have been removed.
    }
    View Code
    // tasksink_cpp.cpp : 定义控制台应用程序的入口点。
    //
    
    #include "stdafx.h"
    //
    //  Task sink in C++
    //  Binds PULL socket to tcp://*:5558 (all local interfaces)
    //  Collects results from workers via that socket
    //
    //  Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
    //
    //#include <zmq.hpp>
    #include <time.h>
    #include "zhelpers.hpp"
    //#include <sys/time.h>
    #include <iostream>
    
    int main(int argc, char *argv[])
    {
        //  Prepare our context and socket
        zmq::context_t context(1);
        zmq::socket_t receiver(context, ZMQ_PULL);
        receiver.bind("tcp://*:5558");
    
        //  Wait for start of batch
        zmq::message_t message;
        receiver.recv(&message);
    
        //  Start our clock now
        struct timeval tstart;
        win_gettimeofday(&tstart);
    
    
        //  Process 100 confirmations
        int task_nbr;
        int total_msec = 0;     //  Total calculated cost in msecs
        for (task_nbr = 0; task_nbr < 100; task_nbr++) {
    
            receiver.recv(&message);
            if ((task_nbr / 10) * 10 == task_nbr)
                std::cout << ":" << std::flush;
            else
                std::cout << "." << std::flush;
        }
        //  Calculate and report duration of batch
        struct timeval tend, tdiff;
        win_gettimeofday(&tend);
    
        if (tend.tv_usec < tstart.tv_usec) {
            tdiff.tv_sec = tend.tv_sec - tstart.tv_sec - 1;
            tdiff.tv_usec = 1000000 + tend.tv_usec - tstart.tv_usec;
        }
        else {
            tdiff.tv_sec = tend.tv_sec - tstart.tv_sec;
            tdiff.tv_usec = tend.tv_usec - tstart.tv_usec;
        }
        total_msec = tdiff.tv_sec * 1000 + tdiff.tv_usec / 1000;
        std::cout << "\nTotal elapsed time: " << total_msec << " msec\n" << std::endl;
        getchar();
        return 0;
    }
    View Code
  • 相关阅读:
    PHP迭代生成器---yield
    array_chunk — 将一个数组分割成多个
    php array_change_key_case
    PHP trait介绍
    mysql视图
    mysql常见内置函数
    MySQL表复制
    二分查找算法(折半查找算法)
    使用SplFixedArray创建固定大小的数组
    Frameset Example
  • 原文地址:https://www.cnblogs.com/itdef/p/5344866.html
Copyright © 2011-2022 走看看