zoukankan      html  css  js  c++  java
  • NetMQ(四): 推拉模式 Push-Pull

    ZeroMQ系列 之NetMQ

    一:zeromq简介

    二:NetMQ 请求响应模式 Request-Reply

    三:NetMQ 发布订阅模式 Publisher-Subscriber

    四:NetMQ 推拉模式 Push-Pull

    NetMQ 推拉模式 Push-Pull

    1:简介

    推拉模式,也叫 管道模式”Parallel Pipeline”。想象一下这样的场景,如果需要统计各个机器的日志,我们需要将统计任务分发到各个节点机器上,最后收集统计结果,做一个汇总。PipeLine比较适合于这种场景,他的结构图,如图1所示

    图1 官方图

    Ventilator,在管道中生产任务;
    Worker ,处理任务;
    Sink,收集Worker处理的结果。

    2:案例

    下面有三个对象Ventilator 消息分发者,Worker 消息处理者,Sink 接受Worker处理消息后返回的结果,耗时的计算处理工作是交给Worker的,如果开多个Worker.exe,可以提升处理速度,Worker的最终目的是分布式计算,部署到多台PC上面,把计算工作交给他们去做(在分布式爬虫上面,每个Worker相当于一个爬虫)。
    下面案例结构,如图2所示:


    图2

    源码:

    Ventilator

        static void Main(string[] args)
        {
            // Task Ventilator
            // Binds PUSH socket to tcp://localhost:5557
            // Sends batch of tasks to workers via that socket
            Console.WriteLine("====== VENTILATOR ======");
    
    
            //socket to send messages on
            using (NetMQSocket sender = new DealerSocket())
            {
                sender.Bind("tcp://*:5557");
    
                using (var sink = new DealerSocket())
                {
                    sink.Connect("tcp://localhost:5558");
    
                    Console.WriteLine("Press enter when worker are ready");
                    Console.ReadLine();
    
                    //the first message it "0" and signals start of batch
                    //see the Sink.csproj Program.cs file for where this is used
                    Console.WriteLine("Sending start of batch to Sink");
                    sink.SendFrame("0");
    
                    Console.WriteLine("Sending tasks to workers");
    
                    //initialise random number generator
                    Random rand = new Random(0);
    
                    //expected costs in Ms
                    int totalMs = 0;
    
                    //send 100 tasks (workload for tasks, is just some random sleep time that
                    //the workers can perform, in real life each work would do more than sleep
                    for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                    {
                        //Random workload from 1 to 100 msec
                        int workload = rand.Next(0, 100);
                        totalMs += workload;
                        Console.WriteLine("Workload : {0}", workload);
                        sender.SendFrame(workload.ToString());
                    }
                    Console.WriteLine("Total expected cost : {0} msec", totalMs);
                    Console.WriteLine("Press Enter to quit");
                    Console.ReadLine();
                }
            }
        }  
    

    Worker

        static void Main(string[] args)
        {
            // Task Worker
            // Connects PULL socket to tcp://localhost:5557
            // collects workload for socket from Ventilator via that socket
            // Connects PUSH socket to tcp://localhost:5558
            // Sends results to Sink via that socket
            Console.WriteLine("====== WORKER ======");
    
    
            //socket to receive messages on
            using (var receiver = new DealerSocket())
            {
                receiver.Connect("tcp://localhost:5557");
    
                //socket to send messages on
                using (var sender = new DealerSocket())
                {
                    sender.Connect("tcp://localhost:5558");
    
                    //process tasks forever
                    while (true)
                    {
                        //workload from the vetilator is a simple delay
                        //to simulate some work being done, see
                        //Ventilator.csproj Proram.cs for the workload sent
                        //In real life some more meaningful work would be done
                        string workload = receiver.ReceiveString();
    
                        //simulate some work being done
                        Thread.Sleep(int.Parse(workload));
    
                        //send results to sink, sink just needs to know worker
                        //is done, message content is not important, just the precence of
                        //a message means worker is done. 
                        //See Sink.csproj Proram.cs 
                        Console.WriteLine("Sending to Sink");
                        sender.SendFrame(string.Empty);
                    }
                }
            }
        }
    

    Sink

        static void Main(string[] args)
        {
            // Task Sink
            // Bindd PULL socket to tcp://localhost:5558
            // Collects results from workers via that socket
            Console.WriteLine("====== SINK ======");
    
            //socket to receive messages on
            using (var receiver = new DealerSocket())
            {
                receiver.Bind("tcp://localhost:5558");
    
                //wait for start of batch (see Ventilator.csproj Program.cs)
                var startOfBatchTrigger = receiver.ReceiveString();
                Console.WriteLine("Seen start of batch");
    
                //Start our clock now
                Stopwatch watch = new Stopwatch();
                watch.Start();
    
                for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                {
                    var workerDoneTrigger = receiver.ReceiveString();
                    if (taskNumber % 10 == 0)
                    {
                        Console.Write(":");
                    }
                    else
                    {
                        Console.Write(".");
                    }
                }
                watch.Stop();
                //Calculate and report duration of batch
                Console.WriteLine();
                Console.WriteLine("Total elapsed time {0} msec", watch.ElapsedMilliseconds);
                Console.ReadLine();
            }
        }  
    

    效果图:

    处理一个Ventilator任务,可以使用数量不同的worker:

    一个worker:
    在我本地计算机上,耗时 5566 mesc

    二个worker:
    在我本地计算机上,耗时2917 mesc

    三个worker:
    在我本地计算机上,耗时2031 msec

    3:总结

    1. 使用的NetMQ版本是3.3.3.1,实例化DealerSocket,来创建socket。
    2. Ventilator分发工作到不同的Worker,实现负载均衡。
    3. Ventilator和Sink是静态部分,Worker是动态的。开启更多的Worker,理论上完成工作更快。
    4. Sink收集Worker处理的结果.

    4:下载

    NetMQ3.3.3.1例子
    NetMQ3.3.2.2例子

  • 相关阅读:
    docker cacti
    zabbix5.0官方部署+监控nginx+mysql
    CentOS7 Haproxy2.2.2部署示例
    LVS(DR) + keepalived
    linux备份整个系统
    docker部署OceanBase 试用版
    NextCloud开源视频会议平台
    idea使用maven proguard 对ssm项目进行代码混合详细步骤
    C# 范围运算符[1..2]
    对象是否为空的扩展方法
  • 原文地址:https://www.cnblogs.com/weiqinl/p/5461184.html
Copyright © 2011-2022 走看看