zoukankan      html  css  js  c++  java
  • NetMQ 发布订阅模式 Publisher-Subscriber

    第一部分引用于:点击打开

    1:简单介绍

    PUB-SUB模式一般处理的都不是系统的关键数据。发布者不关注订阅者是否收到发布的消息,订阅者也不知道自己是否收到了发布者发出的所有消息。你也不知道订阅者何时开始收到消息。类似于广播,收音机。因此逻辑上,它都不是可靠的。这个可以通过与请求响应模型组合来解决。


    图1:简单的发布订阅模式


    图2:与请求响应模式组合的发布订阅模式

    2:案例

    定义IPublishser接口

    namespace NetMQDemoPublisher
    {
        public interface IPublisher:IDisposable
        {
            /// <summary>
            /// 发布消息
            /// </summary>
            /// <param name="topicName">主题</param>
            /// <param name="data">内容</param>
            void Publish(string topicName, string data);
        }
    }

    Publisher实现类

    namespace NetMQDemoPublisher
    {
        public class Publisher:IPublisher
        {
            private object _lockObject = new object();
    
            private PublisherSocket _publisherSocket;
    
            public Publisher(string endPoint)
            {
                _publisherSocket = new PublisherSocket();
                _publisherSocket.Options.SendHighWatermark = 1000;
                _publisherSocket.Bind(endPoint);
            }
            #region Implementation of IDisposable
    
            /// <summary>
            /// 执行与释放或重置非托管资源相关的应用程序定义的任务。
            /// </summary>
            public void Dispose()
            {
                lock (_lockObject)
                {
                    _publisherSocket.Close();
                    _publisherSocket.Dispose();
                }
            }
    
            /// <summary>
            /// 发布消息
            /// </summary>
            /// <param name="topicName">主题</param>
            /// <param name="data">内容</param>
            public void Publish(string topicName, string data)
            {
                lock (_lockObject)
                {
                    _publisherSocket.SendMoreFrame(topicName).SendFrame(data);
                }
            }
    
            #endregion
        }
    }

    Publisher窗口界面

    界面中实现的功能代码

    namespace NetMQDemoPublisher
    {
        public partial class PublisherForm : Form
        {
            private IPublisher publisher;
            public PublisherForm()
            {
                InitializeComponent();
                publisher = new Publisher("tcp://127.0.0.1:8888");
            }
    
            private void button1_Click(object sender, EventArgs e)
            {
                string strContent = textBox1.Text;
                ListViewItem item = new ListViewItem(string.Format("topic:NetMQ,Data:{0}",  strContent));
                listView1.Items.Add(item);
                publisher.Publish("NetMQ", strContent);
            }
        }
    }

    定义ISubscriber接口

    namespace NetMQDemoSubscriber
    {
        public interface ISubscriber:IDisposable
        {
            /// <summary>
            /// 事件
            /// </summary>
            event Action<string, string> Nofity;
    
            /// <summary>
            /// 注册订阅主题
            /// </summary>
            /// <param name="topics"></param>
            void RegisterSubscriber(List<string> topics);
    
            /// <summary>
            /// 注册订阅
            /// </summary>
            void RegisterSbuscriberAll();
    
            /// <summary>
            /// 移除所有订阅消息,并关闭
            /// </summary>
            void RemoveSbuscriberAll();
        }
    }

    Subscriber实现类

    namespace NetMQDemoSubscriber
    {
        public class Subscriber:ISubscriber
        {
            private SubscriberSocket _subscriberSocket = null;
            private string _endpoint = @"tcp://127.0.0.1:9876";
    
            public Subscriber(string endPoint)
            {
                _subscriberSocket = new SubscriberSocket();
                _endpoint = endPoint;
            }
            #region Implementation of IDisposable
    
            /// <summary>
            /// 执行与释放或重置非托管资源相关的应用程序定义的任务。
            /// </summary>
            public void Dispose()
            {
                throw new NotImplementedException();
            }
    
            #endregion
    
            #region Implementation of ISubscriber
    
            public event Action<string, string> Nofity = delegate { };
    
            /// <summary>
            /// 注册订阅主题
            /// </summary>
            /// <param name="topics"></param>
            public void RegisterSubscriber(List<string> topics)
            {
                InnerRegisterSubscriber(topics);
            }
    
            /// <summary>
            /// 注册订阅
            /// </summary>
            public void RegisterSbuscriberAll()
            {
                InnerRegisterSubscriber();
            }
    
            /// <summary>
            /// 移除所有订阅消息,并关闭
            /// </summary>
            public void RemoveSbuscriberAll()
            {
                InnerStop();
            }
    
            #endregion
    
            #region 内部实现
    
            /// <summary>
            /// 注册订阅消息
            /// </summary>
            /// <param name="topics">订阅的主题</param>
            private void InnerRegisterSubscriber(List<string> topics = null)
            {
                InnerStop();
                _subscriberSocket = new SubscriberSocket();
                _subscriberSocket.Options.ReceiveHighWatermark = 1000;
                _subscriberSocket.Connect(_endpoint);
                if (null == topics)
                {
                    _subscriberSocket.SubscribeToAnyTopic();
                }
                else
                {
                    topics.ForEach(item => _subscriberSocket.Subscribe(item));
                }
                Task.Factory.StartNew(() =>
                {
                    while (true)
                    {
                        string messageTopicReceived = _subscriberSocket.ReceiveFrameString();
                        string messageReceived = _subscriberSocket.ReceiveFrameString();
                        Nofity(messageTopicReceived, messageReceived);
                    }
                });
            }
    
            /// <summary>
            /// 关闭订阅
            /// </summary>
            private void InnerStop()
            {
                _subscriberSocket.Close();
            }
    
            #endregion
        }
    }

    Subscriber窗口界面

    窗体功能代码

    namespace NetMQDemoSubscriber
    {
        public partial class SubscriberForm : Form
        {
            private ISubscriber subscriber;
            public SubscriberForm()
            {
                InitializeComponent();
            }
    
            private void SubscriberForm_Load(object sender, EventArgs e)
            {
                subscriber = new Subscriber("tcp://127.0.0.1:8888");
                subscriber.RegisterSbuscriberAll();
                subscriber.Nofity+= delegate(string s, string s1)
                {
                    ListViewItem item = new ListViewItem(string.Format("topic:{0},Data:{1}", s, s1));
                    listView1.Items.Add(item);
                };
            }
        }
    }

    运行后,Publiser开启一个,Subscirber开启三个,进行测试如图

    源码下载

    如果觉得文章好,记得关注一下公众号哟!

    作者:芝麻科技
    出处:芝麻麻雀-Asp.Net学习之路
    技术:C++,C#
    向我打赏
    加我微信,聊一聊技术
  • 相关阅读:
    ps 快捷键
    python中== 和 is 的区别
    微信开发者工具快捷键汇总
    IDEA常用快捷键
    非前后端分离项目使用vue继承,提取公共方法和filters
    Plupload上传插件中文帮助文档
    idea 提示 string template are not supported current JavaScrip Version 的解决
    nginx的分配方式
    nginx相关配置的内容
    Deepin启动界面个性化
  • 原文地址:https://www.cnblogs.com/mzy-google/p/7665656.html
Copyright © 2011-2022 走看看