zoukankan      html  css  js  c++  java
  • SuperSocket源码解析之消息处理

    一 简述

      Tcp消息的处理本身是与Tcp消息传输过程独立的,是消息的两个不同阶段,从前面的会话生命周期我们已经知道消息的传输主要有SocketSession实现,而真正处理则交由AppSession实现,SuperSocket的层次划分也是非常清晰明了。

      SuperSocket消息处理主要流程:接收=》原始过滤=》协议解析=》命令路由并执行=》找不到命令则直接一分不动发给客户端

    二 消息接收

    1 开始接收

    代码位置:AsyncSocketSession=》StartReceive

     1  private void StartReceive(SocketAsyncEventArgs e)
     2         {
     3             StartReceive(e, 0);
     4         }
     5 
     6         private void StartReceive(SocketAsyncEventArgs e, int offsetDelta)
     7         {
     8             bool willRaiseEvent = false;
     9 
    10             try
    11             {
    12                 if (offsetDelta < 0 || offsetDelta >= Config.ReceiveBufferSize)
    13                     throw new ArgumentException(string.Format("Illigal offsetDelta: {0}", offsetDelta), "offsetDelta");
    14 
    15                 var predictOffset = SocketAsyncProxy.OrigOffset + offsetDelta;
    16 
    17                 if (e.Offset != predictOffset)
    18                 {
    19                     e.SetBuffer(predictOffset, Config.ReceiveBufferSize - offsetDelta);
    20                 }
    21 
    22                 if (IsInClosingOrClosed)
    23                     return;
    24 
    25                 OnReceiveStarted();
    26                 willRaiseEvent = Client.ReceiveAsync(e);
    27             }
    28             catch (Exception exc)
    29             {
    30                 LogError(exc);
    31 
    32                 OnReceiveError(CloseReason.SocketError);
    33                 return;
    34             }
    35 
    36             if (!willRaiseEvent)
    37             {
    38                 ProcessReceive(e);
    39             }
    40         }
    View Code

    在接收数据前后触发ReceiveStarted,ReceiveEnd事件

    2 接收有消息处理并转入处理

     1 public void ProcessReceive(SocketAsyncEventArgs e)
     2         {
     3             if (!ProcessCompleted(e))
     4             {
     5                 OnReceiveError(CloseReason.ClientClosing);
     6                 return;
     7             }
     8 
     9             OnReceiveEnded();
    10 
    11             int offsetDelta;
    12 
    13             try
    14             {
    15                 //交给app会话处理接收到的数据,而appsession又交给接收请求处理器处理
    16                 offsetDelta = this.AppSession.ProcessRequest(e.Buffer, e.Offset, e.BytesTransferred, true);
    17             }
    18             catch (Exception exc)
    19             {
    20                 LogError("Protocol error", exc);
    21                 this.Close(CloseReason.ProtocolError);
    22                 return;
    23             }
    24 
    25             //read the next block of data sent from the client
    26             StartReceive(e, offsetDelta);
    27         }
    View Code

    三 消息处理

    1 入口:按照协议解析,每次只处理一个数据包,因此便有了如下的入口代码

     1 int IAppSession.ProcessRequest(byte[] readBuffer, int offset, int length, bool toBeCopied)
     2         {
     3             int rest, offsetDelta;
     4 
     5             while (true)
     6             {
     7                 var requestInfo = FilterRequest(readBuffer, offset, length, toBeCopied, out rest, out offsetDelta);
     8 
     9                 if (requestInfo != null)
    10                 {
    11                     try
    12                     {
    13                         AppServer.ExecuteCommand(this, requestInfo);
    14                     }
    15                     catch (Exception e)
    16                     {
    17                         HandleException(e);
    18                     }
    19                 }
    20 
    21                 if (rest <= 0)
    22                 {
    23                     return offsetDelta;
    24                 }
    25 
    26                 //Still have data has not been processed
    27                 offset = offset + length - rest;
    28                 length = rest;
    29             }
    30         }
    View Code

    2 原始过滤

     此处以原始数据接收事件方式,预留给AppServer子类处理该原始数据包,意味着可以对原始数据包执行一次拦截处理,如果经过一些逻辑处理后不能满足,则将终止该数据包继续传播

    3 协议解析

    协议解析由AppSession的m_ReceiveFilter成员完成,该成员的实例化有2种实现方式

    默认实例化 :默认命令行协议,该协议举例

    下面是2行命令,行使用 作为结束标识,也就是回车换行,命令行内部使用空格分隔,第一个控制之前 如echo 为消息头也就是key,其后的字符串也使用空格分隔,作为参数,其整体=body

    echo cc xxd    
    cdc ds mmm

    解析后为2个StringRequestInfo对象

     

     默认的协议解析工厂

     

     实例化默认协议解析对象

     通过AppServer构造函数传递解析工厂

     4 命令路由

    上面我们已经解析到客户端发来的2条命令分别为echo 参数为cc xxd;cdc ds mmm;

    其中key分别为echo和cdc,对于命令模式来说命令本身使用name字段进行标识,如果我们的key与name匹配那么我们即可路由到一个已有的命令,进而执行该命令,来看代码

     对于能够路由到的命令我们执行命令

     对于路由失败来说SuperSocket又是怎么做的呢?

    找不到命令来处理该消息,将该命令名字发送会客户端,意思说明服务器没有实现该命令,那么命令从何而来?

    5  命令

    这还的从CommandLoader说起,CommandLoader又追溯到AppServer的构建过程

     

    在没有显示配置CommandLoader的情况下默认为ReflectCommandLoader

     ReflectCommandLoader创建命令

      ReflectCommandLoader将扫描应用程序根目录下所有程序,并将实现了命令接口的实例通过反射创建出来

     1 public override bool TryLoadCommands(out IEnumerable<TCommand> commands)
     2         {
     3             commands = null;
     4 
     5             var commandAssemblies = new List<Assembly>();
     6 
     7             if (m_AppServer.GetType().Assembly != this.GetType().Assembly)
     8                 commandAssemblies.Add(m_AppServer.GetType().Assembly);
     9 
    10             string commandAssembly = m_AppServer.Config.Options.GetValue("commandAssembly");
    11 
    12             if (!string.IsNullOrEmpty(commandAssembly))
    13             {
    14                 OnError("The configuration attribute 'commandAssembly' is not in used, please try to use the child node 'commandAssemblies' instead!");
    15                 return false;
    16             }
    17 
    18 
    19             if (m_AppServer.Config.CommandAssemblies != null && m_AppServer.Config.CommandAssemblies.Any())
    20             {
    21                 try
    22                 {
    23                     var definedAssemblies = AssemblyUtil.GetAssembliesFromStrings(m_AppServer.Config.CommandAssemblies.Select(a => a.Assembly).ToArray());
    24 
    25                     if (definedAssemblies.Any())
    26                         commandAssemblies.AddRange(definedAssemblies);
    27                 }
    28                 catch (Exception e)
    29                 {
    30                     OnError(new Exception("Failed to load defined command assemblies!", e));
    31                     return false;
    32                 }
    33             }
    34 
    35             if (!commandAssemblies.Any())
    36             {
    37                 commandAssemblies.Add(Assembly.GetEntryAssembly());
    38             }
    39 
    40             var outputCommands = new List<TCommand>();
    41 
    42             foreach (var assembly in commandAssemblies)
    43             {
    44                 try
    45                 {
    46                     outputCommands.AddRange(assembly.GetImplementedObjectsByInterface<TCommand>());
    47                 }
    48                 catch (Exception exc)
    49                 {
    50                     OnError(new Exception(string.Format("Failed to get commands from the assembly {0}!", assembly.FullName), exc));
    51                     return false;
    52                 }
    53             }
    54 
    55             commands = outputCommands;
    56 
    57             return true;
    58         }
    View Code

    因此默认的我们只需要定义一些实现命令接口ICommand<TAppSession, TRequestInfo>的命令出来,

    6 自定义命令

     直接继承CommandBase抽象类即可

     到此SuperSocket对消息的处理流程差不多就是这样了,SuperSocket的框架的使用 我们只需要自定义自己的AppServer,以及AppServer配套的AppSession,ReciverFilter,以及命令等即可,这些在官方提供的例子中已经很清晰

  • 相关阅读:
    Delphi接口
    delphi cxgrid导出excel去除货币符号
    DelphiXE4- System.IOUtils.TDirectory笔记查询后缀名为dll的文件
    Kivy中文显示
    AppDomain与进程、线程、Assembly之间关系
    Attributes(2): Displaying attributes for a class.(显示类属性)
    Attributes(2): Displaying attributes for a class.(显示类属性)
    Attributes(1):反射Attribute并输出
    大数据乘法
    Qt中利用QDomDocument读写xml小Demo
  • 原文地址:https://www.cnblogs.com/rjjs/p/5623932.html
Copyright © 2011-2022 走看看