zoukankan      html  css  js  c++  java
  • 消息中间件NetMQ结合Protobuf简介

    概述

      对于稍微熟悉这两个优秀的项目来说,每个内容单独介绍都不为过,本文只是简介并探讨如何将两部分内容合并起来,使其在某些场景下更适合、更高效。

      NetMQ:ZeroMQ的.Net版本,ZeroMQ简单来说就是局域网内的消息中间件(与MSMQ类似),包括了进程间通讯、点对点通讯、订阅模式通讯等等,底层用更“完美”的Socket实现,ZeroMQ实现了多语言、跨平台、高效率等诸多优势。详细介绍请参考ZeroMQ和NetMQ官方文档:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns,http://netmq.readthedocs.org/en/latest/introduction/

      Protocol Buffer:源自与Google内部的开源项目,作为高效的RPC消息协议,相比较Json、XML协议的消息格式,Protobuf在序列化以及数据大小上都具有十分明显的优势,跨平台,协议可读性也接近于Json等等。这里也推荐一篇文章:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/

    定义Protobuf协议

      Protocol Buffer(简称Protobuf)是以.proto的脚本形式实现的通用语义形式,类似于Json格式:

    message  WeatherMessage 
    { 
        enum CommandType 
        {
            Debug=0;
            Weather=1;
            Other=2;
        }
     
        required CommandType Command=1 [default=Weather];
        optional string Content=2;
    
        message Loaction 
        {
            required int32 East=1;
            required int32 North=2;
        }
    
        repeated Loaction UserLocation=3;
    }

      这里的Message、required(必选属性)、optional(可有可无属性)、repeated(内部嵌套的类型属性)等都是proto的关键字,具体意义以及为关键字的功能大家可以查看官方文档,这里只介绍如何应用,或者Stephen Liu的文章也不错。

      当然,光定义脚本是不能实现应用的,还需要根据特定的编码语言进行描述,这里利用Protobuf-Net来实现.Net平台的协议实现。

      首先,下载软件包:https://code.google.com/p/protobuf-net/(肯能需要FQ)

      然后,解压并将刚才的.proto文件复制到文件夹ProtoGen下。

      最后,启动CMD并cd到ProtoGen文件夹目录下,运行命令:

      protogen -i: PBWeatherMessage.proto -0: PBWeatherMessage.cs -ns:ProtobufNameSpace

    (-i指定了输入,-o指定了输出,-ns指定了生成代码的namespace)

      如果,正确的话(当然了,我给出的脚本是不会错的),就会生成一个PBWeatherMessage.cs文件,这样的话就可以将.cs文件加入到项目中当做一个纯粹的类来使用了。

    代码中使用,就是类似于二进制序列化一样,只是这回序列化的是Protobuf专用的序列化方式而已。

      序列化:

                            #region Protobuf
                            var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage()
                            {
                                Command = PBProtocol.WeatherMessage.CommandType.Weather,
                                Content = string.Format("{0} {1} {2}", zipcode, temperature, relhumidity), 
                            };
                            
                            using (var sm = new MemoryStream())
                            {
                                ProtoBuf.Serializer.Serialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm, weatherMsg);
                                publisher.Send(sm.ToArray());
                            }
                            #endregion

      反序列化:

                          var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage();
                          var receivedBytes = subscriber.Receive();
                          using (var sm = new MemoryStream(receivedBytes))
                          {
                              weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm);
                          }

      这里就简单介绍完了protobuf协议的使用,下面介绍一下NetMQ+Protobuf的使用。

    NetMQ+Protobuf

      接下来我们来改造下NetMQ Sample中的Publisher-Subscriber模式:

      首先下载从GitHub上下载NetMQ Sample: https://github.com/zeromq/netmq

    或者下载我的示例代码,其中包含了一个No Protobuf的工程,这个是直接摘自原作者的示例代码。

      服务端Publisher:

                using (var context = NetMQContext.Create())// NetMQ全局维护的Content上下文,建议只有一个并且使用完毕后及时回收。
                using (var publisher = context.CreatePublisherSocket())// 从Content上下文中创建CreatePublisherSocket,这里如果用其他四种模式之一需要Create其他类型。
                {
                    publisher.Bind("tcp://127.0.0.1:5556");// Bind到指定的IP及端口。
    
                    var rng = new Random();
    
                    while (!stopRequested)
                    {
                        int zipcode =  rng.Next(0, 99999);// 这里模拟一个随机命令编号(如果非10001,客户端直接丢弃此Publisher发布的消息,实现消息过滤)
                        int temperature = rng.Next(-80, 135);
                        int relhumidity = rng.Next(0, 90);
    
                        publisher.Send(string.Format("{0} {1} {2}", zipcode, temperature, relhumidity));// 直接Send,干净整洁。
                    }
                }

      客户端Subscriber:

     using (var context = NetMQContext.Create())// 创建全局NetMQ句柄,建议唯一,使用完毕及时回收。 
                using (var subscriber = context.CreateSubscriberSocket())// 创建Publisher-Subscriber模式的客户端监听。
                {
                    subscriber.Connect("tcp://127.0.0.1:5556");// 连接到指定Socket
                    subscriber.Subscribe(zipToSubscribeTo.ToString(CultureInfo.InvariantCulture));// 这里创建消息内容的过滤,如果不包含“zipToSubscribeTo”值则不接收消息。
    
                    for (int i = 0; i < iterations; i++)
                    {
                        string results = subscriber.ReceiveString(); // 如果消息以“zipToSubscribeTo”开头,则会返回整条信息。
                        Console.Write(".");
    
                        // "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"]
                        string[] split = results.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);// 按照固定模式解码。
    
                        int zip = int.Parse(split[0]);
                        if (zip != zipToSubscribeTo)
                        {
                            throw new Exception(string.Format("Received message for unexpected zipcode: {0} (expected {1})", zip, zipToSubscribeTo));
                        }
    
                        totalTemp += int.Parse(split[1]);
                        totalHumidity += int.Parse(split[2]);
                    }
                }

      这就是四种模式之一的发布者模式,使用起来很方便,但是这仅仅传递的是基于String的字符串,还不是一个可以序列化的对象,下一步我们将把消息字符串用Protobuf进行序列化与反序列化,来优化我们的消息格式。

    请参考,我的示例代码中的Publisher Pattern工程:

      服务端Publisher:

                    using (var context = NetMQContext.Create())
                    using (var publisher = context.CreatePublisherSocket()) 
                    {
                        publisher.Bind("tcp://127.0.0.1:5556");
                        var rng = new Random();
                        while (!stopRequested)
                        { 
                            int zipcode = rng.Next(10000,10010); //Relpace: rng.Next(0, 99999);
                            int temperature = rng.Next(-80, 135);
                            int relhumidity = rng.Next(0, 90);
    
                            #region Protobuf
                            var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage()
                            {
                                Command = PBProtocol.WeatherMessage.CommandType.Weather,
                                Content = string.Format("{0} {1} {2}", zipcode, temperature, relhumidity), 
                            };
                            
                            using (var sm = new MemoryStream())
                            {
                                ProtoBuf.Serializer.Serialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm, weatherMsg);
                                publisher.Send(sm.ToArray());
                            }
                            #endregion
    
                            // publisher.Send(string.Format("{0} {1} {2}", zipcode, temperature, relhumidity));
                           
                            WriteLine(string.Format("Publisher send message: {0} {1} {2}", zipcode, temperature, relhumidity));
                            System.Threading.Thread.Sleep(100);
                        }
                    }
    View Code

      客户端Subscriber:

     using (var context = NetMQContext.Create())
                  using (var subscriber = context.CreateSubscriberSocket())
                  {
                      subscriber.Connect("tcp://127.0.0.1:5556");
                      subscriber.SubscribeToAnyTopic(); // No Command Filter, warn if not set thie method SubscribeToAnyTopic, it will receive nothing.
    
                      while (true)
                      {
                          if (curIndex > iterations) break;
    
                          var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage();
                          var receivedBytes = subscriber.Receive();
                          using (var sm = new MemoryStream(receivedBytes))
                          {
                              weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm);
                          }
    
                          // "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"]
                          string[] split = weatherMsg.Content.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
                          int cmdId = int.Parse(split[0]);
                          if (weatherMsg.Command == PBProtocol.WeatherMessage.CommandType.Weather)
                          {
                              if (cmdId == zipToSubscribeTo)
                              {
                                  curIndex++;
                                  WriteLine(string.Format("Subscriber receive message: {0}", weatherMsg.Content));
                                  totalTemp += int.Parse(split[1]);
                                  totalHumidity += int.Parse(split[2]);
                              }
                          }
                      }

      好了,其实单独来看,这两部分内容并为涉及的很深入,只是作为一个技术实践、技术储备,希望其中有问题或者有更好的应用场景,还请各位留言,不胜感谢!

      我的示例代码下载

    冷静下来

    这里补充一些不足:

    1. NetMQ中的过滤:默认NetMQ支持过滤,可是当我们摒弃String类型传递而转向Protobuf格式的时候NetMQ通道是无法解析其内容的,所以我们需要先解析内容,然后手写一些过滤代码,放弃了原生的支持。subscriber.SubscribeToAnyTopic()监听所有非过滤模式。
    2. NetMQ消息持久化:基于ZMQ的NetMQ设计理念中均不支持数据持久化(相比MSMQ而言,NetMQ不能接收当客户端不在线情况下的消息),所以如果需要持久化还需要做其他工作或者转战其他MQ家族。

    引用

    ZMQ:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns

    NetMQ:http://netmq.readthedocs.org/en/latest/introduction/

    Protocol Buffer:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/

    Stephen Liu:http://www.cnblogs.com/stephen-liu74/archive/2013/01/02/2841485.html

    Protobuf-Net:https://code.google.com/p/protobuf-net/

  • 相关阅读:
    Neurosurgeon: Collaborative Intelligence Between the Cloud and Mobile Edge
    Adversarial Attack Type I: Cheat Classifiers by Significant Changes
    Federated Optimization for Heterogeneous Networks
    On the Convergence of FedAvg on Non-IID Data
    联邦学习综述
    Federated Learning: Challenges, Methods, and Future Directions
    Hop: Heterogeneity-aware Decentralized Training
    C++文件操作
    c++: internal compiler error: Killed (program cc1plus)
    SCAFFOLD: Stochastic Controlled Averaging for On-Device Federated Learning
  • 原文地址:https://www.cnblogs.com/cuiyansong/p/4326047.html
Copyright © 2011-2022 走看看