zoukankan      html  css  js  c++  java
  • SuperSocket与SuperSocket.ClientEngine实现Protobuf协议

    • 参考资料说明

    SuperSocket文档 http://docs.supersocket.net/

    Protobuf语言参考 https://developers.google.com/protocol-buffers/docs/proto

    单消息多类型解决方案 https://developers.google.com/protocol-buffers/docs/techniques#

    主要资料(非常感谢) http://www.cnblogs.com/caipeiyu/p/5559112.html

    使用的ProtocolBuffers http://code.google.com/p/protobuf-csharp-port

    关于MsgPack的协议 https://my.oschina.net/caipeiyu/blog/512437


    • Proto
     1 message CallMessage
     2 {
     3     optional string content = 1;
     4 }
     5 
     6 message BackMessage
     7 {
     8     optional string content = 1;
     9 }
    10 
    11 message PersonMessage
    12 {
    13     required int32 id = 1;
    14     required string name = 2;
    15     enum Sex
    16     {
    17         Male = 1;
    18         Female = 2;
    19     }
    20     required Sex sex = 3 [default = Male];
    21     required uint32 age = 4;
    22     required string phone = 5;
    23 }
    24 
    25 import "BackMessage.proto";
    26 import "CallMessage.proto";
    27 import "PersonMessage.proto";
    28 
    29 message DefeatMessage
    30 {
    31     enum Type
    32     {
    33         CallMessage = 1;
    34         BackMessage = 2;
    35         PersonMessage = 3;
    36     }
    37     required Type type = 1;
    38     optional CallMessage callMessage = 2;
    39     optional BackMessage backMessage = 3;
    40     optional PersonMessage personMessage = 4;
    41 }
    • 生成C#代码
    1 protoc --descriptor_set_out=DefeatMessage.protobin --proto_path=./ --include_imports DefeatMessage.proto
    2 protogen DefeatMessage.protobin
    • Server
      1 namespace SuperSocketProtoServer.Protocol
      2 {
      3     public class ProtobufRequestInfo: IRequestInfo
      4     {
      5         public string Key { get; }
      6         public DefeatMessage.Types.Type Type { get; }
      7 
      8         public DefeatMessage Body { get; }
      9 
     10         public ProtobufRequestInfo(DefeatMessage.Types.Type type, DefeatMessage body)
     11         {
     12             Type = type;
     13             Key = type.ToString();
     14             Body = body;
     15         }
     16     }
     17 }
     18 
     19 namespace SuperSocketProtoServer.Protocol
     20 {
     21     public class ProtobufReceiveFilter: IReceiveFilter<ProtobufRequestInfo>, IOffsetAdapter, IReceiveFilterInitializer
     22     {
     23         private int _origOffset;
     24         private int _offsetDelta;
     25         private int _leftBufferSize;
     26 
     27         public void Initialize(IAppServer appServer, IAppSession session)
     28         {
     29             _origOffset = session.SocketSession.OrigReceiveOffset;
     30         }
     31 
     32         public int OffsetDelta
     33         {
     34             get { return _offsetDelta; }
     35         }
     36 
     37         /// <summary>
     38         /// 数据包解析
     39         /// </summary>
     40         /// <param name="readBuffer">接收缓冲区</param>
     41         /// <param name="offset">接收到的数据在缓冲区的起始位置</param>
     42         /// <param name="length">本轮接收到的数据长度</param>
     43         /// <param name="toBeCopied">为接收到的数据重新创建一个备份而不是直接使用接收缓冲区</param>
     44         /// <param name="rest">接收缓冲区未被解析的数据</param>
     45         /// <returns></returns>
     46         public ProtobufRequestInfo Filter(byte[] readBuffer, int offset, int length, bool toBeCopied, out int rest)
     47         {
     48             rest = 0;
     49             // 重新计算缓冲区的起始位置,前一次解析还有剩下没有解析的数据就需要把起始位置移到之前最后要解析的那个位置
     50             var readOffset = offset - _offsetDelta;
     51             // 由google.protocolbuffers提供
     52             CodedInputStream cis = CodedInputStream.CreateInstance(readBuffer, readOffset, length);
     53             // 计算数据包的长度,不包含Length本身
     54             int varint32 = (int) cis.ReadRawVarint32();
     55             if (varint32 <= 0) return null;
     56 
     57             // 计算协议里面Length占用字节
     58             int headLen = (int) (cis.Position - readOffset);
     59             // 本轮解析完缓冲后剩余没有解析的数据大小
     60             rest = length - varint32 - headLen + _leftBufferSize;
     61 
     62             // 缓冲里面的数据足够本轮解析
     63             if (rest >= 0)
     64             {
     65                 byte[] body = cis.ReadRawBytes(varint32);
     66                 DefeatMessage message = DefeatMessage.ParseFrom(body);
     67                 ProtobufRequestInfo requestInfo = new ProtobufRequestInfo(message.Type, message);
     68                 _offsetDelta = 0;
     69                 _leftBufferSize = 0;
     70 
     71                 return requestInfo;
     72             }
     73             // 缓冲里面的数据不够本次解析[tcp分包传送]
     74             else
     75             {
     76                 _leftBufferSize += length;
     77                 _offsetDelta = _leftBufferSize;
     78                 rest = 0;
     79 
     80                 var expectedOffset = offset + length;
     81                 var newOffset = _origOffset + _offsetDelta;
     82                 if (newOffset < expectedOffset) Buffer.BlockCopy(readBuffer, offset - _leftBufferSize + length, readBuffer, _origOffset, _leftBufferSize);
     83             }
     84 
     85             return null;
     86         }
     87 
     88         public void Reset()
     89         {
     90             _offsetDelta = 0;
     91             _leftBufferSize = 0;
     92         }
     93 
     94         public int LeftBufferSize
     95         {
     96             get { return _leftBufferSize; }
     97         }
     98 
     99         public IReceiveFilter<ProtobufRequestInfo> NextReceiveFilter { get; }
    100 
    101         public FilterState State { get; }
    102     }
    103 }
    104 
    105 namespace SuperSocketProtoServer.Protocol
    106 {
    107     public class ProtobufAppSession:AppSession<ProtobufAppSession, ProtobufRequestInfo>
    108     {
    109         public ProtobufAppSession() { }
    110     }
    111 }
    112 
    113 namespace SuperSocketProtoServer.Protocol
    114 {
    115     public class ProtobufAppServer: AppServer<ProtobufAppSession, ProtobufRequestInfo>
    116     {
    117         public ProtobufAppServer()
    118             : base(new DefaultReceiveFilterFactory<ProtobufReceiveFilter, ProtobufRequestInfo>())
    119         {
    120             
    121         }
    122     }
    123 }
    • Server Command
     1 namespace SuperSocketProtoServer.Protocol.Command
     2 {
     3     public class BackMessage : CommandBase<ProtobufAppSession, ProtobufRequestInfo>
     4     {
     5         public override void ExecuteCommand(ProtobufAppSession session, ProtobufRequestInfo requestInfo)
     6         {
     7             Console.WriteLine("BackMessage:{0}", requestInfo.Body.BackMessage.Content);
     8         }
     9     }
    10 }
    11 
    12 namespace SuperSocketProtoServer.Protocol.Command
    13 {
    14     public class CallMessage : CommandBase<ProtobufAppSession, ProtobufRequestInfo>
    15     {
    16         public override void ExecuteCommand(ProtobufAppSession session, ProtobufRequestInfo requestInfo)
    17         {            
    18             Console.WriteLine("CallMessage:{0}", requestInfo.Body.CallMessage.Content);
    19             var backMessage = global::BackMessage.CreateBuilder().SetContent("Hello I am from C# server by SuperSocket")
    20                 .Build();
    21             var message = DefeatMessage.CreateBuilder().SetType(DefeatMessage.Types.Type.BackMessage)
    22                 .SetBackMessage(backMessage).Build();
    23             using (var stream = new MemoryStream())
    24             {
    25                 CodedOutputStream cos = CodedOutputStream.CreateInstance(stream);
    26                 cos.WriteMessageNoTag(message);
    27                 cos.Flush();
    28                 byte[] data = stream.ToArray();
    29                 session.Send(new ArraySegment<byte>(data));
    30             }
    31         }
    32     }
    33 }
    34 
    35 namespace SuperSocketProtoServer.Protocol.Command
    36 {
    37     public class PersonMessage:CommandBase<ProtobufAppSession, ProtobufRequestInfo>
    38     {
    39         public override void ExecuteCommand(ProtobufAppSession session, ProtobufRequestInfo requestInfo)
    40         {
    41             Console.WriteLine("Recv Person Message From Client.");
    42             Console.WriteLine("person's id = {0}, person's name = {1}, person's sex = {2}, person's phone = {3}", 
    43                 requestInfo.Body.PersonMessage.Id, 
    44                 requestInfo.Body.PersonMessage.Name, 
    45                 requestInfo.Body.PersonMessage.Sex, 
    46                 requestInfo.Body.PersonMessage.Phone);
    47         }
    48     }
    49 }
    • Client
     1 namespace SuperSocketProtoClient.Protocol
     2 {
     3     public class ProtobufPackageInfo: IPackageInfo
     4     {
     5         public string Key { get; }
     6         public DefeatMessage.Types.Type Type { get; }
     7         public DefeatMessage Body { get; }
     8 
     9         public ProtobufPackageInfo(DefeatMessage.Types.Type type, DefeatMessage body)
    10         {
    11             Type = type;
    12             Key = type.ToString();
    13             Body = body;
    14         }
    15     }
    16 }
    17 
    18 namespace SuperSocketProtoClient.Protocol
    19 {
    20     public class ProtobufReceiveFilter: IReceiveFilter<ProtobufPackageInfo>
    21     {
    22         /// <summary>
    23         /// 数据解析
    24         /// BufferList已经实现了分包处理
    25         /// </summary>
    26         /// <param name="data">数据缓冲区</param>
    27         /// <param name="rest">缓冲区剩余数据</param>
    28         public ProtobufPackageInfo Filter(BufferList data, out int rest)
    29         {
    30             rest = 0;
    31             var buffStream = new BufferStream();
    32             buffStream.Initialize(data);
    33 
    34             var stream = CodedInputStream.CreateInstance(buffStream);
    35             var varint32 = (int)stream.ReadRawVarint32();
    36             if (varint32 <= 0) return default(ProtobufPackageInfo);
    37 
    38             var total = data.Total;
    39             var packageLen = varint32 + (int)stream.Position;
    40 
    41             if (total >= packageLen)
    42             {
    43                 rest = total - packageLen;
    44                 var body = stream.ReadRawBytes(varint32);
    45                 var message = DefeatMessage.ParseFrom(body);
    46                 var requestInfo = new ProtobufPackageInfo(message.Type, message);
    47                 return requestInfo;
    48             }
    49 
    50             return default(ProtobufPackageInfo);
    51         }
    52 
    53         public void Reset()
    54         {
    55             NextReceiveFilter = null;
    56             State = FilterState.Normal;
    57         }
    58 
    59         public IReceiveFilter<ProtobufPackageInfo> NextReceiveFilter { get; protected set; }
    60         public FilterState State { get; protected set; }
    61     }
    62 }
    • Server Entrance
     1 namespace SuperSocketProtoServer
     2 {
     3     class Program
     4     {
     5         static void Main(string[] args)
     6         {
     7             Console.WriteLine("Press any key to start the server!");
     8 
     9             Console.ReadKey();
    10             Console.WriteLine();
    11 
    12             var appServer = new ProtobufAppServer();
    13             //appServer.NewRequestReceived += AppServerOnNewRequestReceived;
    14 
    15             try
    16             {
    17                 //Setup the appServer
    18                 if (!appServer.Setup(2017)) //Setup with listening port
    19                 {
    20                     Console.WriteLine("Failed to setup!");
    21                     Console.ReadKey();
    22                     return;
    23                 }
    24             }catch(Exception e) { Console.WriteLine(e);}
    25 
    26             Console.WriteLine();
    27 
    28             //Try to start the appServer
    29             if (!appServer.Start())
    30             {
    31                 Console.WriteLine("Failed to start!");
    32                 Console.ReadKey();
    33                 return;
    34             }
    35 
    36             Console.WriteLine("The server started successfully, press key 'q' to stop it!");
    37 
    38             while (Console.ReadKey().KeyChar != 'q')
    39             {
    40                 Console.WriteLine();
    41                 continue;
    42             }
    43 
    44             //Stop the appServer
    45             appServer.Stop();
    46 
    47             Console.WriteLine("The server was stopped!");
    48             Console.ReadKey();
    49         }
    50 
    51         private static void AppServerOnNewRequestReceived(ProtobufAppSession session, ProtobufRequestInfo requestinfo)
    52         {
    53             switch (requestinfo.Type)
    54             {
    55                 case DefeatMessage.Types.Type.BackMessage:
    56                     Console.WriteLine("BackMessage:{0}", requestinfo.Body.BackMessage.Content);
    57                     break;
    58                 case DefeatMessage.Types.Type.CallMessage:
    59                     Console.WriteLine("CallMessage:{0}", requestinfo.Body.CallMessage.Content);
    60                     var backMessage = BackMessage.CreateBuilder().SetContent("Hello I am from C# server by SuperSocket")
    61                         .Build();
    62                     var message = DefeatMessage.CreateBuilder().SetType(DefeatMessage.Types.Type.BackMessage)
    63                         .SetBackMessage(backMessage).Build();
    64                     using (var stream = new MemoryStream())
    65                     {
    66                         CodedOutputStream cos = CodedOutputStream.CreateInstance(stream);
    67                         cos.WriteMessageNoTag(message);
    68                         cos.Flush();
    69                         byte[] data = stream.ToArray();
    70                         session.Send(new ArraySegment<byte>(data));
    71                     }
    72                     break;
    73             }
    74         }
    75     }
    76 }
    View Code
    • Client Entrance
     1 namespace SuperSocketProtoClient
     2 {
     3     class Program
     4     {
     5         static void Main(string[] args)
     6         {
     7             EasyClient client = new EasyClient();
     8             client.Initialize(new ProtobufReceiveFilter(), packageInfo =>
     9             {
    10                 switch (packageInfo.Type)
    11                 {
    12                     case DefeatMessage.Types.Type.BackMessage:
    13                         Console.WriteLine("BackMessage:{0}", packageInfo.Body.BackMessage.Content);
    14                         break;
    15                     case DefeatMessage.Types.Type.CallMessage:
    16                         Console.WriteLine("CallMessage:{0}", packageInfo.Body.CallMessage.Content);
    17                         break;
    18 
    19                 }
    20             });
    21             var flag = client.ConnectAsync(new DnsEndPoint("127.0.0.1", 2017));
    22             if (flag.Result)
    23             {
    24                 var callMessage = CallMessage.CreateBuilder()
    25                     .SetContent("Hello I am form C# client by SuperSocket ClientEngine").Build();
    26                 var message = DefeatMessage.CreateBuilder()
    27                     .SetType(DefeatMessage.Types.Type.CallMessage)
    28                     .SetCallMessage(callMessage).Build();
    29 
    30                 using (var stream = new MemoryStream())
    31                 {
    32 
    33                     CodedOutputStream os = CodedOutputStream.CreateInstance(stream);
    34 
    35                     os.WriteMessageNoTag(message);
    36 
    37                     os.Flush();
    38 
    39                     byte[] data = stream.ToArray();
    40                     client.Send(new ArraySegment<byte>(data));
    41 
    42                 }
    43 
    44                 Thread.Sleep(2000);
    45 
    46                 // 发送PersonMessage
    47                 var personMessage = PersonMessage.CreateBuilder()
    48                     .SetId(123).SetAge(33).SetSex(PersonMessage.Types.Sex.Male).SetName("zstudio").SetPhone("110").Build();
    49                 message = DefeatMessage.CreateBuilder().SetType(DefeatMessage.Types.Type.PersonMessage)
    50                     .SetPersonMessage(personMessage).Build();
    51                 using (var stream = new MemoryStream())
    52                 {
    53                     CodedOutputStream cos = CodedOutputStream.CreateInstance(stream);
    54                     cos.WriteMessageNoTag(message);
    55                     cos.Flush();
    56                     byte[] data = stream.ToArray();
    57                     client.Send(new ArraySegment<byte>(data));
    58                 }
    59 
    60             }
    61             Console.ReadKey();
    62         }
    63     }
    64 }
    View Code
    • 执行结果

    • 工程、资源、资料打包:http://pan.baidu.com/s/1qXB9aEg
    • 更多项目相关细节和详情参考博客:http://www.cnblogs.com/caipeiyu/p/5559112.html, 在此也表示对博主由衷的感谢!!!

     

  • 相关阅读:
    SDOI2008]仪仗队
    洛谷P1414 又是毕业季II
    P3865 【模板】ST表
    [HAOI2007]理想的正方形
    noip 2011 选择客栈
    [AHOI2009]中国象棋
    洛谷P3387 【模板】缩点
    [SCOI2005]最大子矩阵
    [CQOI2009]叶子的染色
    LibreOJ #116. 有源汇有上下界最大流
  • 原文地址:https://www.cnblogs.com/linxmouse/p/7905575.html
Copyright © 2011-2022 走看看