zoukankan      html  css  js  c++  java
  • 搭建消息队列

    搭建消息队列(一)

    针对高并发,可扩展的互联网架构,搭建消息队列(一)

      想开发高并发可扩展的互联网架构,消息队列是不可缺少的,目前主流的消息队列,有windows自带的MSMQ,还有跨平台的强大的ZeroMQ,这里我们就选用ZeroMQ.

      ZeroMQ介绍:(也拼写作 ØMQ、 0MQ 或 ZMQ) 是个非常轻量级的开源消息队列软件。它没有独立的服务器,消息直接从一个应用程序被发送到另一个应用程序。ZeroMQ的学习和应用也非常简单,它只有一个 C++ 编写成的单个库文件libzmq.dll, 可以链接到任何应用程序中。如果要在.NET 环境中使用,我们需要用到一个C#编写的名为 clrzmq.dll 包装库。ZeroMQ可以在 Windows、 OS X 和 Linux 等多种操作系统上运行, C、 C++、C#、 Java、 Python 等语言都可以编写ZeroMQ 应用程序这使得不同平台上的不同应用程序之间可以相互通讯。

    1、环境搭建:

      codeproject专题,下载对应的Download binaries - 377.6 KB,解压缩到你的指定路径。

      这里我们就不详细介绍,主要说一下C#封装好的版本,NetMQ,是基于ZeroMQ进行封装的。就不需要下载了,直接nuget上获取:

      PM> Install-Package NetMQ

      为什么不直接用ZeroMQ,而使用NetMQ,运行非托管代码的托管应用程序内可能会出现许多想不到的问题,像内存泄漏和奇怪的没有访问错误。而NetMQ使用原生的C#语言,它更容易调试原生C#代码,你可以下载代码,调试你的系统。你可以在github上贡献。

      待安装好后,系统会自动添加NetMQ的引用。

      

      可以看到,NetMQ是基于zmq进行开发的,其实就是ZeroMQ了,并且已经为我们封装了各种功能的MQ对象,比如REP/REQ ,PUB/SUB(主题式订阅),XPUB/XSUB(非主题订阅),Push/Pull,甚至还有路由模式等,从字面意义上,应该能看出个大概,后面我们一个一个进行测试使用。

      先看个简单的demo,初步了解一下:

      

    复制代码
     1 class Program
     2 {
     3   static void Main(string[] args)
     4   {
     5     using (NetMQContext context = NetMQContext.Create())
     6     {
     7       Task serverTask = Task.Factory.StartNew(() =>Server(context));
     8       Task clientTask = Task.Factory.StartNew(() => Client(context));
     9       Task.WaitAll(serverTask, clientTask);
    10     }
    11   }
    12  
    13   static void Server(NetMQContext context)
    14   {
    15     using (NetMQSocket serverSocket = context.CreateResponseSocket())
    16     {
    17       serverSocket.Bind("tcp://*:5555");
    18  
    19       while (true)
    20       {
    21         string message = serverSocket.ReceiveString();
    22  
    23         Console.WriteLine("Receive message {0}", message);
    24  
    25         serverSocket.Send("World");          
    26  
    27         if (message == "exit")
    28         {
    29           break;
    30         }
    31       }
    32     }      
    33   }
    34  
    35   static void Client(NetMQContext context)
    36   {
    37     using (NetMQSocket clientSocket = context.CreateRequestSocket())
    38     {
    39       clientSocket.Connect("tcp://127.0.0.1:5555");
    40  
    41       while (true)
    42       {
    43         Console.WriteLine("Please enter your message:");
    44         string message = Console.ReadLine();
    45         clientSocket.Send(message);
    46  
    47         string answer = clientSocket.ReceiveString();
    48  
    49         Console.WriteLine("Answer from server: {0}", answer);
    50  
    51         if (message == "exit")
    52         {
    53           break;
    54         }
    55       }
    56     }
    57   }
    58 }
    复制代码

      代码比较简洁的介绍了REP/REQ模式下NetMQ的使用,而且我们可以看到,这个Mq对象是可以在不同的线程间切换使用的,也许你会测试中文,那就先序列化再反序列化吧,因为可能会出现乱码哟。

      这里,我先简单根据NetMQ,封装一个Server端和一个Client端,方便后面使用,当然也可以不封装,直接使用:

      Server:

      

    复制代码
     1 /// <summary>
     2     /// Mq服务端
     3     /// </summary>
     4     public class OctMQServer : IDisposable
     5     {
     6         public event EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> OnReceive;
     7 
     8         protected virtual void OnOnReceive(DataEventArgs<NetMQSocket, NetMQMessage> e)
     9         {
    10             EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> handler = OnReceive;
    11             if (handler != null) handler(this, e);
    12         }
    13 
    14         private int _port;
    15         private NetMQSocket _serverSocket;
    16         private ServerType _type;
    17         private NetMQContext _context;
    18 
    19         public void Init(int port, ServerType type)
    20         {
    21             _type = type;
    22             _port = port;
    23             _context = NetMQContext.Create();
    24             CreateServer();
    25         }
    26 
    27         void CreateServer()
    28         {
    29             switch (_type)
    30             {
    31                 case ServerType.Response:
    32                     _serverSocket = _context.CreateResponseSocket(); break;
    33                 case ServerType.Pub:
    34                     _serverSocket = _context.CreatePushSocket(); break;
    35                 case ServerType.Router:
    36                     _serverSocket = _context.CreateRouterSocket(); break;
    37                 case ServerType.Stream:
    38                     _serverSocket = _context.CreateStreamSocket(); break;
    39                 case ServerType.Push:
    40                     _serverSocket = _context.CreatePushSocket(); break;
    41                 case ServerType.XPub:
    42                     _serverSocket = _context.CreateXPublisherSocket(); break;
    43                 default:
    44                     _serverSocket = _context.CreateResponseSocket(); break;
    45             }
    46             _serverSocket.Bind("tcp://*:" + _port);
    47             Task.Factory.StartNew(() =>
    48             AsyncRead(_serverSocket), TaskCreationOptions.LongRunning);
    49         }
    50 
    51         private void AsyncRead(NetMQSocket serverSocket)
    52         {
    53             while (true)
    54             {
    55                 var msg = serverSocket.ReceiveMessage();
    56                 OnOnReceive(new DataEventArgs<NetMQSocket, NetMQMessage>(serverSocket, msg));
    57             }
    58         }
    59 
    60 
    61         public NetMQSocket Server
    62         {
    63             get { return _serverSocket; }
    64         }
    65 
    66         public void Dispose()
    67         {
    68             _serverSocket.Dispose();
    69             _context.Dispose();
    70         }
    71 
    72         public void Send(NetMQMessage msg)
    73         {
    74             _serverSocket.SendMessage(msg);
    75         }
    76 
    77         public NetMQMessage CreateMessage()
    78         {
    79             return new NetMQMessage();
    80         }
    81     }
    复制代码

      这样,使用者就可以根据枚举进行服务端的创建, 不用纠结到底用哪一种服务端,并且封装了一些消息的异步事件,方便在开发中使用,可以使用多播委托,针对不同的消息进行不同的处理,我这里使用的while循环,当然,在netmq内部提供了循环器和心跳等,都可以在实际的开发中进行扩展和使用:Poller和NetMQTimer。

      Client:

      

    复制代码
      1  /// <summary>
      2     /// MQ客户端
      3     /// </summary>
      4     public class OctMQClient:IDisposable
      5     {
      6         public event EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> OnReceive;
      7 
      8         protected virtual void OnOnReceive(DataEventArgs<NetMQSocket, NetMQMessage> e)
      9         {
     10             EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> handler = OnReceive;
     11             if (handler != null) handler(this, e);
     12         }
     13 
     14         private int _port;
     15         private NetMQSocket _clientSocket;
     16         private ClientType _type;
     17         private NetMQContext _context;
     18         private string _ip;
     19         private Task task;
     20         public void Init(string ip, int port, ClientType type)
     21         {
     22             _type = type;
     23             _ip = ip;
     24             _port = port;
     25             _context = NetMQContext.Create();
     26             CreateClient();
     27         }
     28 
     29         void CreateClient()
     30         {
     31             switch (_type)
     32             {
     33                 case ClientType.Request:
     34                     _clientSocket = _context.CreateRequestSocket(); break;
     35                 case ClientType.Sub:
     36                     _clientSocket = _context.CreateSubscriberSocket(); break;
     37                 case ClientType.Dealer:
     38                     _clientSocket = _context.CreateDealerSocket(); break;
     39                 case ClientType.Stream:
     40                     _clientSocket = _context.CreateStreamSocket(); break;
     41                 case ClientType.Pull:
     42                     _clientSocket = _context.CreatePullSocket(); break;
     43                 case ClientType.XSub:
     44                     _clientSocket = _context.CreateXSubscriberSocket(); break;
     45                 default:
     46                     _clientSocket = _context.CreateRequestSocket(); break;
     47             }
     48             _clientSocket.Connect("tcp://" + _ip + ":" + _port);
     49         }
     50 
     51         public void StartAsyncReceive()
     52         {
     53             task = Task.Factory.StartNew(() =>
     54          AsyncRead(_clientSocket), TaskCreationOptions.LongRunning);
     55            
     56         }
     57 
     58         private void AsyncRead(NetMQSocket cSocket)
     59         {
     60             while (true)
     61             {
     62                 var msg = cSocket.ReceiveMessage();
     63                 OnOnReceive(new DataEventArgs<NetMQSocket, NetMQMessage>(cSocket, msg));
     64             }
     65         }
     66 
     67         public NetMQSocket Client
     68         {
     69             get { return _clientSocket; }
     70         }
     71 
     72         public T GetClient<T>() where T : NetMQSocket
     73         {
     74             return (T)_clientSocket;
     75         }
     76 
     77         public void Send(NetMQMessage msg)
     78         {
     79             _clientSocket.SendMessage(msg);
     80         }
     81 
     82         public NetMQMessage CreateMessage()
     83         {
     84             return new NetMQMessage();
     85         }
     86 
     87         public NetMQMessage ReceiveMessage()
     88         {
     89             return _clientSocket.ReceiveMessage();
     90         }
     91 
     92         public void Dispose()
     93         {
     94             _clientSocket.Dispose();
     95             _context.Dispose();
     96             if (task != null)
     97             {
     98                 task.Dispose();
     99             }
    100         }
    101     }
    复制代码

      客户端提供了,同步接受消息和异步接收消息两种方式,当启动异步时,就开始循环的读取消息了,当读到消息时抛出事件,并且针对任务等做了资源的释放。并提供创建消息和返回MQ对象等公共方法,可以在开发过程中快速的入手和使用。

      先简单说一下response和request模式,就是响应模式,当mq客户端向mq的服务端发送消息时,需要得到及时的响应,并返回给使用者或者是用户,这就需要及时响应的服务端程序,一般的MQ都会有这种功能,也是使用最广泛的,我们就先写一个这种类型的demo,基于我们前面提供的客户端和服务端。

      Server Console

      这里我提供了2种也是最常用的2种服务端方式,并且提供了不同的处理方式。

    复制代码
      1 class Program
      2     {
      3         private static OctMQServer _server;
      4         static ServerType _type;
      5         static void Main(string[] args)
      6         {
      7             AppDomain.CurrentDomain.UnhandledException += CurrentDomain_UnhandledException;
      8             CreateCmd();
      9 
     10         }
     11 
     12         /// <summary>
     13         /// 创建mq对象
     14         /// </summary>
     15         static void Create()
     16         {
     17             _server = new OctMQServer();
     18             _server.OnReceive += server_OnReceive;
     19             _server.Init(5555, _type);
     20            
     21         }
     22 
     23         /// <summary>
     24         /// 选择类型
     25         /// </summary>
     26         private static void CreateCmd()
     27         {
     28             Csl.Wl(ConsoleColor.Red, "请选择您要创建的MQ服务端类型");
     29             Csl.Wl(ConsoleColor.Yellow, "1.PUB   2.REP");
     30             var key = System.Console.ReadLine();
     31             switch (key)
     32             {
     33                 case "1":
     34                     {
     35                         _type = ServerType.Pub;
     36                         Create();
     37                         Cmd();
     38                     }
     39 
     40                     break;
     41                 case "2":
     42                     _type = ServerType.Response;
     43                     Create();
     44                     Cmd();
     45                     break;
     46                 default:
     47                     {
     48                         CreateCmd();
     49                        
     50                     }
     51                     break;
     52             }
     53         }
     54 
     55         static void CurrentDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e)
     56         {
     57             Csl.WlEx((Exception)e.ExceptionObject);
     58         }
     59 
     60         /// <summary>
     61         /// 接收消息
     62         /// </summary>
     63         private static void Cmd()
     64         {
     65             if (_type == ServerType.Pub)
     66             {
     67                 Csl.Wl(ConsoleColor.Red, "请输入您要发个订阅者的信息主题与信息用空格分开");
     68             }
     69             else
     70             {
     71                 Csl.Wl(ConsoleColor.Red, "等待消息");
     72             }
     73             var cmd = System.Console.ReadLine();
     74 
     75             switch (cmd)
     76             {
     77                 case "exit":
     78                     Csl.Wl("正在关闭应用程序。。。等待最后一个心跳执行完成。。。");
     79                     _server.Dispose();
     80                     break;
     81 
     82                 default:
     83                     {
     84                         var str = cmd.Split(' ');
     85                         var msg = _server.CreateMessage();
     86                         msg.Append(str[0],Encoding.UTF8);
     87                         msg.Append(str[1],Encoding.UTF8);
     88                         _server.Send(msg);
     89                         Cmd();
     90                         break;
     91                     }
     92                     return;
     93             }
     94         }
     95 
     96         static void server_OnReceive(object sender, DataEventArgs<NetMQ.NetMQSocket, NetMQ.NetMQMessage> e)
     97         {
     98             var msg = e.Arg2;
     99             var server = e.Arg1;
    100             Csl.Wl(msg.Pop().ConvertToString(Encoding.UTF8));
    101             server.Send("你好,您的请求已处理,并返回消息及处理结果",Encoding.UTF8);
    102         }
    103     }
    复制代码

      Client Form 

      客户端,我使用winform来处理,并且配合控制台使用,这个用法有些巧妙,不会的同学可以私密我,嘿嘿,先上截图,也是可以同时处理两种方式,给个demo,方便大家在实际项目中使用:

      响应式:

      

      订阅者式:

      

      不会做gif ,我就逐步说吧,从订阅者模式中我们可以看到,我的打开顺序1-》2->3,先打开1,订阅了t的主题,发了2个消息,内容1和内容2,第一个程序均收到,这时我启动另外一个程序,同样订阅t这个主题,发现消息是通过轮询的方式分别向两个订阅者发送,这样,我们在处理一些比较耗时的业务逻辑,并且不会因为并发出现问题时,就可以使用多个订阅者,分别处理业务从而大大的提高我们的系统性能。

      然后打开第三个,订阅y这个主题,这时发送y的主题消息,前2个订阅者就无法收到了,这样我们还可以区分业务,进行多进程的处理,更高的提高可用性和可扩展性,并结合高性能的缓存解决方案处理高并发的业务逻辑。

      贴出客户端代码:

      

    复制代码
     1 public partial class Form1 : Form
     2     {
     3         public Form1()
     4         {
     5             InitializeComponent();
     6             Csl.Init();
     7         }
     8 
     9         /// <summary>
    10         /// mq客户端
    11         /// </summary>
    12         private OctMQClient _client;
    13 
    14         /// <summary>
    15         /// 订阅者模式连接
    16         /// </summary>
    17         /// <param name="sender"></param>
    18         /// <param name="e"></param>
    19         private void btnConn_Click(object sender, EventArgs e)
    20         {
    21             _client = new OctMQClient();
    22             _client.OnReceive += _client_OnReceive;
    23 
    24             _client.Init(txtip.Text,int.Parse(txtport.Text),ClientType.Sub);
    25             var sub = (SubscriberSocket) _client.Client;
    26             sub.Subscribe(txtTop.Text);
    27             _client.StartAsyncReceive();
    28             
    29         }
    30 
    31         /// <summary>
    32         /// 订阅者模式受到消息
    33         /// </summary>
    34         /// <param name="sender"></param>
    35         /// <param name="e"></param>
    36         void _client_OnReceive(object sender, Core.Args.DataEventArgs<NetMQ.NetMQSocket, NetMQ.NetMQMessage> e)
    37         {
    38             var msg = e.Arg2;
    39             Csl.Wl("主题:"+msg.Pop().ConvertToString(Encoding.UTF8));
    40             Csl.Wl("内容:" + msg.Pop().ConvertToString(Encoding.UTF8));
    41         }
    42 
    43         /// <summary>
    44         /// 发送响应消息
    45         /// </summary>
    46         /// <param name="sender"></param>
    47         /// <param name="e"></param>
    48         private void btnSend_Click(object sender, EventArgs e)
    49         {
    50             using (_client = new OctMQClient())
    51             {
    52                 _client.Init(txtip.Text, int.Parse(txtport.Text), ClientType.Request);
    53                 var content = txtContent.Text;
    54                 var msg = _client.CreateMessage();
    55                 msg.Append(content, Encoding.UTF8);
    56                 _client.Send(msg);
    57                 var rmsg = _client.ReceiveMessage();
    58                 var reqStr = rmsg.Pop().ConvertToString(Encoding.UTF8);
    59                 Csl.Wl(reqStr);
    60             }
    61             
    62         }
    63 
    64         /// <summary>
    65         /// 释放资源
    66         /// </summary>
    67         /// <param name="e"></param>
    68         protected override void OnClosed(EventArgs e)
    69         {
    70             base.OnClosed(e);
    71             if (_client != null)
    72             {
    73                 _client.Dispose();                
    74             }
    75         }
    76     }
    复制代码

      好了,大家先消化一下,等系列写完了,我会提交到github上。下一期,会写一些并发情况下的应用。

     
     
    分类: ZeroMQ
  • 相关阅读:
    C/C++知识点清单01
    数独GUI程序项目实现
    第三章--Win32程序的执行单元(部分概念及代码讲解)(中-线程同步
    《JAVA程序设计与实例》记录与归纳--继承与多态
    《JAVA程序设计与实例》记录与归纳--类与对象
    C语言范例学习06-上
    单一职责原则
    牙疼真要命,牙坏真烧钱
    跳槽 & 思维导图
    成熟的 Git 分支模型
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/4173263.html
Copyright © 2011-2022 走看看