zoukankan      html  css  js  c++  java
  • 消息中间件

    消息中间件NetMQ结合Protobuf简介

     

    概述

      对于稍微熟悉这两个优秀的项目来说,每个内容单独介绍都不为过,本文只是简介并探讨如何将两部分内容合并起来,使其在某些场景下更适合、更高效。

      NetMQ:ZeroMQ的.Net版本,ZeroMQ简单来说就是局域网内的消息中间件(与MSMQ类似),包括了进程间通讯、点对点通讯、订阅模式通讯等等,底层用更“完美”的Socket实现,ZeroMQ实现了多语言、跨平台、高效率等诸多优势。详细介绍请参考ZeroMQ和NetMQ官方文档:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns,http://netmq.readthedocs.org/en/latest/introduction/

      Protocol Buffer:源自与Google内部的开源项目,作为高效的RPC消息协议,相比较Json、XML协议的消息格式,Protobuf在序列化以及数据大小上都具有十分明显的优势,跨平台,协议可读性也接近于Json等等。这里也推荐一篇文章:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/

    定义Protobuf协议

      Protocol Buffer(简称Protobuf)是以.proto的脚本形式实现的通用语义形式,类似于Json格式:

    复制代码
    message  WeatherMessage 
    { 
        enum CommandType 
        {
            Debug=0;
            Weather=1;
            Other=2;
        }
     
        required CommandType Command=1 [default=Weather];
        optional string Content=2;
    
        message Loaction 
        {
            required int32 East=1;
            required int32 North=2;
        }
    
        repeated Loaction UserLocation=3;
    }
    复制代码

      这里的Message、required(必选属性)、optional(可有可无属性)、repeated(内部嵌套的类型属性)等都是proto的关键字,具体意义以及为关键字的功能大家可以查看官方文档,这里只介绍如何应用,或者Stephen Liu的文章也不错。

      当然,光定义脚本是不能实现应用的,还需要根据特定的编码语言进行描述,这里利用Protobuf-Net来实现.Net平台的协议实现。

      首先,下载软件包:https://code.google.com/p/protobuf-net/(肯能需要FQ)

      然后,解压并将刚才的.proto文件复制到文件夹ProtoGen下。

      最后,启动CMD并cd到ProtoGen文件夹目录下,运行命令:

      protogen -i: PBWeatherMessage.proto -0: PBWeatherMessage.cs -ns:ProtobufNameSpace

    (-i指定了输入,-o指定了输出,-ns指定了生成代码的namespace)

      如果,正确的话(当然了,我给出的脚本是不会错的),就会生成一个PBWeatherMessage.cs文件,这样的话就可以将.cs文件加入到项目中当做一个纯粹的类来使用了。

    代码中使用,就是类似于二进制序列化一样,只是这回序列化的是Protobuf专用的序列化方式而已。

      序列化:

    复制代码
                            #region Protobuf
                            var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage()
                            {
                                Command = PBProtocol.WeatherMessage.CommandType.Weather,
                                Content = string.Format("{0} {1} {2}", zipcode, temperature, relhumidity), 
                            };
                            
                            using (var sm = new MemoryStream())
                            {
                                ProtoBuf.Serializer.Serialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm, weatherMsg);
                                publisher.Send(sm.ToArray());
                            }
                            #endregion
    复制代码

      反序列化:

    复制代码
                          var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage();
                          var receivedBytes = subscriber.Receive();
                          using (var sm = new MemoryStream(receivedBytes))
                          {
                              weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm);
                          }
    复制代码

      这里就简单介绍完了protobuf协议的使用,下面介绍一下NetMQ+Protobuf的使用。

    NetMQ+Protobuf

      接下来我们来改造下NetMQ Sample中的Publisher-Subscriber模式:

      首先下载从GitHub上下载NetMQ Sample: https://github.com/zeromq/netmq

    或者下载我的示例代码,其中包含了一个No Protobuf的工程,这个是直接摘自原作者的示例代码。

      服务端Publisher:

    复制代码
                using (var context = NetMQContext.Create())// NetMQ全局维护的Content上下文,建议只有一个并且使用完毕后及时回收。
                using (var publisher = context.CreatePublisherSocket())// 从Content上下文中创建CreatePublisherSocket,这里如果用其他四种模式之一需要Create其他类型。
                {
                    publisher.Bind("tcp://127.0.0.1:5556");// Bind到指定的IP及端口。
    
                    var rng = new Random();
    
                    while (!stopRequested)
                    {
                        int zipcode =  rng.Next(0, 99999);// 这里模拟一个随机命令编号(如果非10001,客户端直接丢弃此Publisher发布的消息,实现消息过滤)
                        int temperature = rng.Next(-80, 135);
                        int relhumidity = rng.Next(0, 90);
    
                        publisher.Send(string.Format("{0} {1} {2}", zipcode, temperature, relhumidity));// 直接Send,干净整洁。
                    }
                }
    复制代码

      客户端Subscriber:

    复制代码
     using (var context = NetMQContext.Create())// 创建全局NetMQ句柄,建议唯一,使用完毕及时回收。 
                using (var subscriber = context.CreateSubscriberSocket())// 创建Publisher-Subscriber模式的客户端监听。
                {
                    subscriber.Connect("tcp://127.0.0.1:5556");// 连接到指定Socket
                    subscriber.Subscribe(zipToSubscribeTo.ToString(CultureInfo.InvariantCulture));// 这里创建消息内容的过滤,如果不包含“zipToSubscribeTo”值则不接收消息。
    
                    for (int i = 0; i < iterations; i++)
                    {
                        string results = subscriber.ReceiveString(); // 如果消息以“zipToSubscribeTo”开头,则会返回整条信息。
                        Console.Write(".");
    
                        // "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"]
                        string[] split = results.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);// 按照固定模式解码。
    
                        int zip = int.Parse(split[0]);
                        if (zip != zipToSubscribeTo)
                        {
                            throw new Exception(string.Format("Received message for unexpected zipcode: {0} (expected {1})", zip, zipToSubscribeTo));
                        }
    
                        totalTemp += int.Parse(split[1]);
                        totalHumidity += int.Parse(split[2]);
                    }
                }
    复制代码

      这就是四种模式之一的发布者模式,使用起来很方便,但是这仅仅传递的是基于String的字符串,还不是一个可以序列化的对象,下一步我们将把消息字符串用Protobuf进行序列化与反序列化,来优化我们的消息格式。

    请参考,我的示例代码中的Publisher Pattern工程:

      服务端Publisher:

     View Code

      客户端Subscriber:

    复制代码
     using (var context = NetMQContext.Create())
                  using (var subscriber = context.CreateSubscriberSocket())
                  {
                      subscriber.Connect("tcp://127.0.0.1:5556");
                      subscriber.SubscribeToAnyTopic(); // No Command Filter, warn if not set thie method SubscribeToAnyTopic, it will receive nothing.
    
                      while (true)
                      {
                          if (curIndex > iterations) break;
    
                          var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage();
                          var receivedBytes = subscriber.Receive();
                          using (var sm = new MemoryStream(receivedBytes))
                          {
                              weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm);
                          }
    
                          // "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"]
                          string[] split = weatherMsg.Content.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
                          int cmdId = int.Parse(split[0]);
                          if (weatherMsg.Command == PBProtocol.WeatherMessage.CommandType.Weather)
                          {
                              if (cmdId == zipToSubscribeTo)
                              {
                                  curIndex++;
                                  WriteLine(string.Format("Subscriber receive message: {0}", weatherMsg.Content));
                                  totalTemp += int.Parse(split[1]);
                                  totalHumidity += int.Parse(split[2]);
                              }
                          }
                      }
    复制代码

      好了,其实单独来看,这两部分内容并为涉及的很深入,只是作为一个技术实践、技术储备,希望其中有问题或者有更好的应用场景,还请各位留言,不胜感谢!

      我的示例代码下载

    冷静下来

    这里补充一些不足:

    1. NetMQ中的过滤:默认NetMQ支持过滤,可是当我们摒弃String类型传递而转向Protobuf格式的时候NetMQ通道是无法解析其内容的,所以我们需要先解析内容,然后手写一些过滤代码,放弃了原生的支持。subscriber.SubscribeToAnyTopic()监听所有非过滤模式。
    2. NetMQ消息持久化:基于ZMQ的NetMQ设计理念中均不支持数据持久化(相比MSMQ而言,NetMQ不能接收当客户端不在线情况下的消息),所以如果需要持久化还需要做其他工作或者转战其他MQ家族。

    引用

    ZMQ:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns

    NetMQ:http://netmq.readthedocs.org/en/latest/introduction/

    Protocol Buffer:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/

    Stephen Liu:http://www.cnblogs.com/stephen-liu74/archive/2013/01/02/2841485.html

    Protobuf-Net:https://code.google.com/p/protobuf-net/

    作者:Stephen Cui 
    出处:http://www.cnblogs.com/cuiyansong 
    版权声明:文章属于本人及博客园共有,凡是没有标注[转载]的,请在文章末尾加入我的博客地址。 
    如果您觉得文章写的还不错,请点击“推荐一下”,谢谢。

     
    标签: ProtobufNetMQ
  • 相关阅读:
    Recommended Books for Algo Trading in 2020
    Market Making is simpler than you think!
    Top Crypto Market Makers of 2020
    Top Crypto Market Makers, Rated and Reviewed
    爬取伯乐在线文章(五)itemloader
    爬取伯乐在线文章(四)将爬取结果保存到MySQL
    爬取伯乐在线文章(三)爬取所有页面的文章
    爬取伯乐在线文章(二)通过xpath提取源文件中需要的内容
    爬取伯乐在线文章(一)
    爬虫去重策略
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/4340628.html
Copyright © 2011-2022 走看看