zoukankan      html  css  js  c++  java
  • NetMQ发布订阅C#示例

          NetMQ (ZeroMQ to .Net),ØMQ号称史上最快中间件。它对socket通信进行了封装,使得我们不需要写socket函数调用就能完成复杂的网络通信。和一般意义上的消息队列产品不同的是,它没有消息队列服务器,而更像是一个网络通信库。从网络通信的角度看,它处于会话层之上,应用层之下。【ZeroMQ 官网】:http://zeromq.org

          ØMQ有4个基本通信模型:分别是一对一结对模型(Exclusive-Pair)、请求回应模型(Request-Reply)、发布订阅模型(Publish-Subscribe)、推拉模型(Push-Pull)。

    Request-reply pattern 请求-回复模型 

    • 这种模型主要用于从客户端向一个或多个服务实例发送请求,然后等待紧接着对于每个请求的回复
    • 里面又具体分了ZMQ_REQ ZMQ_REP ZMQ_DEALER ZMQ_ROUTER 
    • REQ 发送完消息后,必须接收一个回应消息后,才能发送新的消息
    • REP当接收消息时,都会返回一个消息 

    Publish-subscribe pattern 发布-订阅模式

    • 这种模式主要用于1对多的数据发布(一个发布者,多个订阅者)
    • 里面又具体分了ZMQ_PUB ZMQ_SUB 
    • PUB发送消息给所有的SUB。如果此时SUB没有启动,下次启动时会漏掉该消息 

    Pipeline pattern 管道模式

    • 这种模式主要用于发布数据到由管道排列的节点上面,数据总是沿着管道流动。每个管道阶段连接了至少一个节点
    • 里面又具体分了ZMQ_PUSH ZMQ_PULL
    • 一个1对N队列的实现,PUSH将数据放入队列,PULL从队列中不取出数据。数据会负载均衡的发送给每一个PULL 

    Exclusive pair pattern 独立对模式

    • peer to peer 模式。主要用于进程内部线程间通信
    • 里面又具体分了ZMQ_PAIR
    • 线程间1-to-1队列的实现,采用了lock free实现,所以速度很快

    下面是订阅发布的示例代码:

    发布服务端:

      public static class NetMQPub
        {
            readonly static ManualResetEvent _terminateEvent = new ManualResetEvent(false);
            /// <summary>
            /// NetMQ 发布模式
            /// </summary>
            public static void Start()
            {
                string[] wethers = new string[5] {"晴朗","多云","阴天","小雨","暴雪" };
    
                //CTRL+C 退出程序
                Console.CancelKeyPress += Console_CancelKeyPress;
                Console.WriteLine("发布多个地区天气预报:");
    
                using (var context = NetMQContext.Create())
                {
                    using (var publisher = context.CreatePublisherSocket())
                    {
                        publisher.Bind("tcp://127.0.0.1:5556");
    
                        var rng = new Random();
                        string msg;
                        int sleeptime = 10;
    
                        while (_terminateEvent.WaitOne(0) == false)
                        {
                            //随机生成天气数据
                            int zipcode = rng.Next(0, 99);
                            int temperature = rng.Next(-50, 50);
                            int wetherId = rng.Next(0, 4);
    
                            msg = string.Format("{0} {1} {2}", zipcode, temperature, wethers[wetherId]);
                            publisher.Send(msg,Encoding.UTF8, zmq.SendReceiveOptions.DontWait);
    
                            Console.WriteLine(msg);
                            Thread.Sleep(sleeptime);
                        }
                    }
                }
            }
    
            static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
            {
                Console.WriteLine("exit...");
                _terminateEvent.Set();
            }
        }
    

      订阅客户端,可启动多个实例来模拟接收天气信息:

      public static class NetMQSub
        {
            public delegate void GetDataHandler(string message);
            public static event GetDataHandler OnGetData;
    
            /// <summary>
            /// NetMQ 订阅模式
            /// </summary>
            public static void Start()
            {            
                var rng = new Random();
                int zipcode = rng.Next(0, 99);
                Console.WriteLine("接收本地天气预报 {0}...", zipcode);
    
                OnGetData += new GetDataHandler(ProcessData);
        
                using (var context = NetMQContext.Create())
                using (var subscriber = context.CreateSubscriberSocket())
                {
                    subscriber.Connect("tcp://127.0.0.1:5556");
                    subscriber.Subscribe(zipcode.ToString(CultureInfo.InvariantCulture));
    
                    while(true)
                    {
                        string results = subscriber.ReceiveString(Encoding.UTF8);
                        Console.Write(".");
    
                        string[] split = results.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
    
                        int zip = int.Parse(split[0]);
                        if (zip == zipcode)
                        {
                            OnGetData(results);
                        }                  
                    }
                }
            }
    
            public static void ProcessData(string msg)
            {
                Console.WriteLine("天气情况:" + msg);
            }
        }
    

      

  • 相关阅读:
    【转载】PyQt QSetting保存设置
    Python WebDriver自动化测试
    Pyqt 控件的信号槽事件定义方法
    Pyqt SpVoice朗读功能
    Pyqt 国际化多语言支持
    MQTT研究之EMQ:【wireshark抓包分析】
    MQTT研究之EMQ:【SSL双向验证】
    ES6模板字符串【${}配合反单引号一起用】
    express中遇到的一个小问题“403”
    MQTT研究之EMQ:【基础研究】
  • 原文地址:https://www.cnblogs.com/dreign/p/4260306.html
Copyright © 2011-2022 走看看