zoukankan      html  css  js  c++  java
  • Beetle使用Protobuf.net进行对象序列化传输

        Beetle虽然提供了性能出色的二进制序列化功能,但毕竟需要用户通过writer和reader的方法来手动描述过程;事实上计较这些性能的场景并不多,很多时候一个自动序列化功能对程序的编写和维护都起到极其方便的作用。在设计的时候组件是通过接口的方式来描述消息读写操作,因此在扩展对Protobuf支持也是比较方便的。

        Protobuf则Googler制定的一种对象序列化和反序列化方案,他在c++,java,net等不同语言平台都有相关的实现。而在.net下的实现分别有protobuf-net和protobuf-csharp-port;在这里选择了protobuf-net。从测试来看protobuf-net序列化体积和性能都比较出色,详细可以查看:http://www.servicestack.net/benchmarks/NorthwindDatabaseRowsSerialization.1000000-times.2010-02-06.html 

        由于组件只支持实现了IMessage的对象进行处理,所以必须对protobuf-net序列化进行一个IMessage的实现。protobuf-net序列化的信息紧紧是对象的内容不包括对象类型信息在内,简单的说就是protobuf-net是无法根据序列化的信息来反序列化对象,在反序列化的过程必须明确指定这些数据是基于那个类型对象,因此用protobuf-net序列化对象进行网络传输还需制定一个简单协议。以下实现一个MessageAdapter来使用Protobuf.net序列化对象进行对象数据传输。

        以下是实现一个基于protobuf-net的Message

        public class ProtobufAdapter:IMessage
        {
            public object Message
            {
                get;
                set;
            }
            public static bool Send(Beetle.TcpChannel channel, object msg)
            {
                ProtobufAdapter adapter = new ProtobufAdapter();
                adapter.Message = msg;
                return channel.Send(adapter);
    
            }
            public void Load(Beetle.BufferReader reader)
            {
                string type = reader.ReadString();
                Beetle.ByteArraySegment segment = mArrayPool.Pop();
                reader.ReadByteArray(segment);
                using (System.IO.Stream stream = new System.IO.MemoryStream(segment.Array,0,segment.Count))
                {
                    Message = ProtoBuf.Meta.RuntimeTypeModel.Default.Deserialize(stream, null, Type.GetType(type));
                }
                mArrayPool.Push(segment);
            }
            public void Save(Beetle.BufferWriter writer)
            {
                writer.Write(Message.GetType().FullName);
                Beetle.ByteArraySegment segment = mArrayPool.Pop();
                using(System.IO.Stream stream = new System.IO.MemoryStream(segment.Array))
                {
                    ProtoBuf.Meta.RuntimeTypeModel.Default.Serialize(stream, Message);
                    segment.SetInfo(0, (int)stream.Position);
                    
                }
                writer.Write(segment);
                mArrayPool.Push(segment);
                
            }
            private static ByteArrayPool mArrayPool = new ByteArrayPool(100, 1024 * 8);
        }

         协议的封装很简单,在写入 Protobuf序列化对象之前先写入一个带头长度描述的消息类型名称,然后再写入一个带头长度描述的Protobuf对象序列信息流。而在读取的时候则先把类型描述名称读取出来,然后构建相关名称的类型结合信息流反序列化成具体的对象。

        接下来是针对这个消息实现一个基于头4字节描述大小的协义分包器

        public class HeadSizePackage:Beetle.HeadSizeOfPackage
        {
            public HeadSizePackage(Beetle.TcpChannel channel)
                : base(channel)
            {
            }
            protected override Beetle.IMessage ReadMessageByType(Beetle.BufferReader reader, out object typeTag)
            {
                typeTag = "ProtobufAdapter";
                return new ProtobufAdapter();
              
            }
    
            protected override void WriteMessageType(Beetle.IMessage msg, Beetle.BufferWriter writer)
            {
               
            }
        }

        通过以上协议封装后,具体所产生持协议如下

        协议制定后就可似使用Beetle搭建基于protobuf序列化的对象传输了。

    首先是制定一个Tcp服务

                TcpUtils.Setup(200, 1, 1);
                TcpServer server = new TcpServer();
                server.ChannelConnected += OnConnected;
                server.ChannelDisposed += OnDisposed;
                server.Open(8340);

        以上代码很简单初始化组件信息,构建一个TcpServer并绑定连接接入事件和连接断开事件;然后在所有IP的8340端绑定tcp服务。在连接接入的时候我们需要做些事情。

            static void OnConnected(object sender, ChannelEventArgs e)
            {
                e.Channel.SetPackage<Messages.HeadSizePackage>().ReceiveMessage = OnMessageReceive;
                e.Channel.ChannelError += OnError;
                e.Channel.BeginReceive();
                Console.WriteLine("{0} connected!", e.Channel.EndPoint);
            }

        在接入的事件里针对当前的Tcp通道设置一个协议分包器,并指定对应接收消息事件;同样给Tcp通道绑定一件错误事件,在消息事件处理过程中只要你不开启线程去处理逻辑,其过程发生的异常都会触发这个事件;不必担心异常会影响整个服这方面组件都做到足够安全保证服务可以不间断运行。Tcp通道相关信息设置完成后就可以调用BeginReceive()方法进入数据接收状态。接下来是消息处理事件的代码:

        static void OnMessageReceive(PacketRecieveMessagerArgs e)
            {
                Messages.ProtobufAdapter adapter = (Messages.ProtobufAdapter)e.Message;
                if (adapter.Message is Messages.Register)
                {
                    OnRegister(e.Channel, (Messages.Register)adapter.Message);
                }
                else if (adapter.Message is Messages.Query)
                {
                    OnQuery(e.Channel, (Messages.Query)adapter.Message);
                }
                else{
                }
                
            }
            static void OnRegister(Beetle.TcpChannel channel, Messages.Register e)
            {
                Console.WriteLine("{0} Register\t UserName:{1};PWD:{2};EMail:{3}", channel.EndPoint, e.UserName, e.PWD, e.EMail);
            }
            static void OnQuery(Beetle.TcpChannel channel, Messages.Query e)
            {
                Console.WriteLine("{0} Query \t Customer:{1}", channel.EndPoint, e.CustomerID);
                Messages.Order order = new Messages.Order();
                order.OrderID = 10248;
                order.CustomerID = "WILMK";
                order.EmployeeID = 5;
                order.OrderDate = 629720352000000000;
                order.RequiredDate = 629744544000000000;
                order.ShipAddress = "59 rue de l'Abbaye";
                order.ShipCity = "Reims";
                order.ShipCountry = "France";
                order.ShipName = "Vins et alcools Chevalier";
                order.ShipPostalCode = "51100";
                order.ShipRegion = "RJ";
                Messages.ProtobufAdapter.Send(channel, order);
            }
    

        在这个例子中只处理了两种消息对象,分别是Register和Query;接收到Register只做了一个简单的输出,而在接收到Query则会返回一个Order对象。

    Client实现

        Client要做的事件是先接入这个服务

    private void cmdConnect_Click(object sender, EventArgs e)
            {
                try
                {
                    mChannel = TcpServer.CreateClient(txtIPAddress.Text, 8340);
                    mChannel.SetPackage<Messages.HeadSizePackage>().ReceiveMessage = ReceiveMessage;
                    mChannel.ChannelDisposed += OnDisplsed;
                    mChannel.BeginReceive();
                    cmdRegister.Enabled = true;
                    cmdQuery.Enabled = true;
                    cmdConnect.Enabled = false;
    
                }
                catch (Exception e_)
                {
                    MessageBox.Show(e_.Message);
                }
            }

        在连接创建成功后,同样要做的事情就是设置分包对象和消息接收事件,完成后就调用BeginReceive()连接进行接收数据状态。这个时候我们就可以向服务器发送相关对象。  

    private void cmdRegister_Click(object sender, EventArgs e)
            {
                Messages.Register reg = new Messages.Register();
                reg.EMail = txtEMail.Text;
                reg.PWD = txtPWD.Text;
                reg.UserName = txtUserName.Text;
                Messages.ProtobufAdapter.Send(mChannel, reg);
            }
            private void cmdQuery_Click(object sender, EventArgs e)
            {
                Messages.Query query = new Messages.Query();
                query.CustomerID = txtCustomerID.Text;
                Messages.ProtobufAdapter.Send(mChannel, query);
            }

        这样通过Beetle对Protobuf.net序列化对象进行传输的例子就完成了

    下载完整代码

    C#游戏服务器MMRPG交流群:136485198

    访问Beetlex的Github
  • 相关阅读:
    Android 最火框架XUtils之注解机制具体解释
    Oracle GoldenGate从oracle db 到非oracle db的初始化数据同步的方法
    Java中接口和抽象类的比較
    spring+springmvc+hibernate架构、maven分模块开发样例小项目案例
    配置Java连接池的两种方式:tomcat方式以及spring方式
    Ant报错之out of memory
    Mybatis 框架文档 超具体笔记
    jsp
    HDU 1251 统计难题(字典树)
    HDU 1251 统计难题(字典树)
  • 原文地址:https://www.cnblogs.com/smark/p/2520844.html
Copyright © 2011-2022 走看看