zoukankan      html  css  js  c++  java
  • NetMQ(三): 发布订阅模式 Publisher-Subscriber

    ZeroMQ系列 之NetMQ

    一:zeromq简介

    二:NetMQ 请求响应模式 Request-Reply

    三:NetMQ 发布订阅模式 Publisher-Subscriber

    四:NetMQ 推拉模式 Push-Pull

    NetMQ 发布订阅模式 Publisher-Subscriber

    1:简单介绍

    PUB-SUB模式一般处理的都不是系统的关键数据。发布者不关注订阅者是否收到发布的消息,订阅者也不知道自己是否收到了发布者发出的所有消息。你也不知道订阅者何时开始收到消息。类似于广播,收音机。因此逻辑上,它都不是可靠的。这个可以通过与请求响应模型组合来解决。

    简单的发布订阅模式
    图1:简单的发布订阅模式


    图2:与请求响应模式组合的发布订阅模式

    2:案例

    接下来,我们通过写一个天气预报的例子,来说明发布订阅模式。发布端一直在发布大量的天气信息,订阅端通过过滤字段,接收到想要的数据。

    使用的NetMQ版本是3.3.2.2

    发布端代码:

    主程序:

    class Program
    {
        static void Main(string[] args)
        {
            NetMQPub.Start();
        }
    }  
    

    发布类:

    public class NetMQPub
    {
        readonly static ManualResetEvent _terminateEvent = new ManualResetEvent(false);
        /// <summary>
        /// NetMQ 发布端
        /// </summary>
        public static void Start()
        {
            string[] weathers = new string[6] { "晴朗", "多云", "阴天", "霾", "雨", "雪" };
             
            Console.WriteLine("发布多个地区天气预报:");
    
            using (NetMQContext context = NetMQContext.Create())
            {
                using (var publisher = context.CreatePublisherSocket())
                {
                    publisher.Bind("tcp://127.0.0.1:5556");
    
                    var rng = new Random();
                    string msg;
                    int sleeptime = 1000;//1秒
    
                    ///指定发布的时间间隔,1秒
                    while (_terminateEvent.WaitOne(1000) == false)
                    {
                        //随机生成天气数据
                        int zipcode = rng.Next(0, 99);
                        int temperature = rng.Next(-50, 50);
                        int weatherId = rng.Next(0, 5);
    
                        msg = string.Format("{0} {1} {2}", zipcode, temperature, weathers[weatherId]);
                        publisher.SendFrame(msg);
    
                        Console.WriteLine(msg);
                        Thread.Sleep(sleeptime);
                    }
                }
            }
        }
    
        private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
        {
            Console.WriteLine("exit……");
            _terminateEvent.Set();
        }
    }
    

    订阅端代码

    主程序:

    class Program
    {
        static void Main(string[] args)
        {
            NetMQSub.Start();
        }
    }
    

    订阅类:

    public class NetMQSub
    {
        public delegate void GetDataHandler(string message);
        public static event GetDataHandler OnGetData;
        /// <summary>
        /// NetMQ 订阅端
        /// </summary>
        public static void Start()
        {
            var rng = new Random();
            int zipcode = rng.Next(0, 99);
            Console.WriteLine("接收本地天气预报{0}……", zipcode);
    
            OnGetData += new GetDataHandler(ProcessData);
    
            using (var context = NetMQContext.Create())
            {
                using (var subscriber = context.CreateSubscriberSocket())
                {
                    subscriber.Connect("tcp://127.0.0.1:5556");
    				//设置过滤字符串
                    subscriber.Subscribe(zipcode.ToString(CultureInfo.InvariantCulture));
    				//订阅所有的发布端内容
    				//subscriber.Subscribe("");
                    while (true)
                    {
                        string results = subscriber.ReceiveFrameString(Encoding.UTF8);
                        Console.WriteLine(".");
    
                        string[] split = results.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
    
                        int zip = int.Parse(split[0]);
                        if (zip == zipcode)
                        {
                            OnGetData(results);
                        }
                    }
                }
            }
        }
    
        private static void ProcessData(string message)
        {
            Console.WriteLine("天气情况:" + message);
        }
    }
    

    3:总结

    1. 一个发布端可以有多个订阅端
    2. 如果只想要接收指定的数据,订阅端必须要设置过滤字符
    3. 订阅端设置空字符串,订阅所有的发布内容。【You can set topic an empty string to subscribe to everything】
    4. 发布端和订阅端的套接字绑定的地址必须一样的。比如:tcp://127.0.0.1:5556,使用tcp协议,监听端口5556

    4:下载

    NetMQ3.3.3.1例子
    NetMQ3.3.2.2例子

  • 相关阅读:
    java 将表情转换成字符串存入数据库
    java html websocket简单实现
    Java poi读取Excel表格中公式的计算值
    转盘代码,自己搞了一个
    html5 canvas画布
    cat命令查看文件指定行数
    CentOS7 安装 gpbackup 和 gpbackup-s3-plugin 来备份和还原 Greenplum 数据库
    QT5 打包发布Release应用程序
    CentOS7 安装Redis6.0.10
    ES系列(二):基于多播的集群发现实现原理解析
  • 原文地址:https://www.cnblogs.com/weiqinl/p/5461060.html
Copyright © 2011-2022 走看看