ESFramework对Tcp和Udp协议都提供了完整的支持,在ESFramework介绍之(21)-- Tcp组件接口ITcp介绍 和 ESFramework介绍之(23)―― AgileTcp 两篇文章中介绍了Tcp组件,相对于Tcp来说,Udp要简单许多,所以我在这里打算用一篇文章来介绍它。需要首先提出的是,ESFramework.Network命名空间下的所有直接类(即,不包括ESFramework.Network.Tcp和ESFramework.Network.Udp命名空间下的类)即可用于Tcp也可用于Udp。这中可复用性,将Tcp和Udp的区别仅仅隔离在最底层的通信组件中。
由于Udp组件相对简单,所以其实现策略与Tcp组件稍有不同,最主要的不同之处在于,Udp组件不需要分配器组件,而是直接将接收到消息转交给处理器工厂。大家应该还记得这个图:
当将其应用于Udp时,我们可以认为,消息分派器组件已经直接内嵌于Udp组件中了,所以上图对于Udp还是适用的。
下面我们首先看看Udp组件的接口定义IEsbUdp:
对应的代码如下:
public interface IEsbUdp :IEsbUdpSender
{
int Port{set ;}
IContractHelper ContractHelper{set ;}
INetMessageHook NetMessageHook{set ;}
IDataDealerFactory DataDealerFactory{set ;}
void Initialize() ;
void Start() ;
void Stop() ;
event CbInvalidMsg InvalidMsgRecieved ;
event CbServiceCommitted ServiceCommitted ;
}
public delegate void CbServiceCommitted(string userID ,NetMessage msg) ;
public delegate void CbInvalidMsg(byte[] data ,IPEndPoint remoteIPE) ;
public interface IEsbUdpSender
{
void HookAndSendNetMessage(IPEndPoint remoteIpe ,NetMessage msg) ;
}
对于IEsbUdp接口有如下几点说明:
(1)同Tcp组件一样,Udp组件仍然可以使用Hook,由NetMessageHook属性体现。
(2)IEsbUdp有DataDealerFactory属性注入,而没有依赖与分派器组件,这个上面已经提到了。
(3)由于Udp不保证数据传输的正确、完整性,所以当接收到一个无效的消息时会触发InvalidMsgRecieved事件。而ServiceCommitted事件的含义与Tcp组件是完全一致的。
(4)IEsbUdp接口继承了IEsbUdpSender接口,IEsbUdpSender与Tcp组件中的IHookSender接口的目的是一致的。它用于保证在udp组件之外通过udp组件(有点绕口)发送消息时都能经过Hook链而不会有漏网之Message。
通过接口我们了解了Udp组件的职责后,我们就可以实现这个组件了,ESFramework中默认的IEsbUdp接口的实现是EsbUdp,它通过一个循环的接收线程来专门接收消息,这个循环中的流程如下:
(1)接收一个Udp消息。
(2)检查这个消息的消息头的完整/正确性,如果不正确、则触发InvalidMsgRecieved事件。否则,进入下一步。
(3)解析消息头,并从中获取消息主体长度,然后判断消息主体是否完整,如果不完整、则触发InvalidMsgRecieved事件。否则,进入下一步。
(4)调用Hook链CaptureRecievedMsg。
(5)通过异步的方式,将消息的具体处理工作放到后台线程池的某个线程中处理。
(6)回到第一步,继续接收下个消息。
之所以第五步将消息放到后台线程中处理,是为了避免阻塞接收线程,因为处理消息可能需要大量的cpu时间。我们继续看后台线程处理消息的流程:
(1)根据消息类型从处理器工厂获取对应的消息处理器。
(2)调用消息处理器处理消息
(3)调用Hook链CaptureBeforeSendMsg
(4)发送回复消息。
(5)触发ServiceCommitted事件。
上面两个就是Udp组件的核心流程了,由于Udp是基于非连接的,所以不需要在Udp组件中进行像Tcp那样复杂的连接管理,当然,Udp协议仍然可以进行“伪在线”用户管理,这通过另外一个组件IUserManager做到,这个组件将在后面的文章中介绍。
最后,给出EsbUdp的实现源码:
EsbUdp
public class EsbUdp :IEsbUdp
{
private UdpClient udpClient ;
private int port = 10000;
private IContractHelper contractHelper ;
private INetMessageHook netMessageHook = new EmptyNetMessageHook();
private IDataDealerFactory dataDealerFactory ;
private volatile bool goToStop = true ;
private ManualResetEvent stopEvent = new ManualResetEvent(false) ;
public event CbServiceCommitted ServiceCommitted ;
public EsbUdp()
{
this.netMessageHook = new EmptyNetMessageHook() ;
}
public void Initialize()
{
this.udpClient = new UdpClient(this.port) ;
}
#region IEsbUdp 成员
public event CbInvalidMsg InvalidMsgRecieved ;
#region property
public int Port
{
set
{
this.port = value ;
}
}
public IContractHelper ContractHelper
{
set
{
this.contractHelper = value ;
}
}
public IDataDealerFactory DataDealerFactory
{
set
{
this.dataDealerFactory = value ;
}
}
public INetMessageHook NetMessageHook
{
set
{
if(value != null)
{
this.netMessageHook = value ;
}
else
{
this.netMessageHook = new EmptyNetMessageHook() ;
}
}
}
#endregion
#region Method
public void Start()
{
this.goToStop = false ;
this.stopEvent.Reset() ;
CbSimple cb = new CbSimple(this.Worker) ;
cb.BeginInvoke(null ,null) ;
}
public void Stop()
{
this.goToStop = true ;
//发送eof消息给自己,使Receive不再阻塞
byte[] eof = new byte[4] ;
this.udpClient.Send(eof ,4 ,new IPEndPoint(IPAddress.Parse("127.0.0.1") ,this.port)) ;
this.stopEvent.WaitOne() ;
}
public void HookAndSendNetMessage(IPEndPoint remoteIpe ,NetMessage msg)
{
byte[] data = this.netMessageHook.CaptureBeforeSendMsg(msg).ToStream() ;
this.udpClient.Send(data ,data.Length ,remoteIpe) ;
}
#region Worker
private void Worker()
{
while(! this.goToStop)
{
IPEndPoint remoteIPE = null ;
byte[] data = this.udpClient.Receive(ref remoteIPE ) ;
if(data.Length < this.contractHelper.MessageHeaderLength)
{
this.ActivateInvalidMsg(data ,remoteIPE) ;
continue ;
}
IMessageHeader header = this.contractHelper.ParseMessageHeader(data ,0) ;
if(((this.contractHelper.MessageHeaderLength + header.MessageBodyLength) > data.Length) || (! this.contractHelper.ValidateMessageToken(header)))
{
this.ActivateInvalidMsg(data ,remoteIPE) ;
continue ;
}
byte[] body = null ;
if(header.MessageBodyLength > 0)
{
body = new byte[header.MessageBodyLength] ;
for(int i=0 ;i<body.Length ;i++)
{
body[i] = data[this.contractHelper.MessageHeaderLength + i] ;
}
}
NetMessage msg = new NetMessage(header ,body) ;
msg.Tag = remoteIPE ;
NetMessage hookedMsg = this.netMessageHook.CaptureRecievedMsg(msg) ;
if(this.goToStop)
{
break ;
}
CbMessageDealing cb = new CbMessageDealing(this.MessageDealing) ;
cb.BeginInvoke(hookedMsg ,null ,null) ;
}
this.stopEvent.Set() ;
}
//异步处理消息
private void MessageDealing(NetMessage msg)
{
IDataDealer dealer = this.dataDealerFactory.CreateDealer(msg.Header.ServiceKey ,msg.Header.TypeKey) ;
NetMessage resMsg = dealer.DealRequestMessage(msg) ;
if(resMsg != null)
{
NetMessage hookedResMsg = this.netMessageHook.CaptureBeforeSendMsg(resMsg) ;
byte[] bRes = hookedResMsg.ToStream() ;
this.udpClient.Send(bRes,bRes.Length ,(IPEndPoint)msg.Tag) ;
if(this.ServiceCommitted != null)
{
this.ServiceCommitted(resMsg.Header.UserID ,hookedResMsg) ;
}
}
}
private void ActivateInvalidMsg(byte[] data ,IPEndPoint remoteIPE)
{
if(this.InvalidMsgRecieved != null)
{
this.InvalidMsgRecieved(data ,remoteIPE) ;
}
}
#endregion
#endregion
#endregion
}
public delegate void CbMessageDealing(NetMessage msg ) ;
关于ESFramework对Udp的支持,还有几个重要的主题需要介绍,除了上面提到的IUserManager组件,还有IUdpServerAgent组件(还记得ESFramework介绍之(7)-- 服务端代理IServerAgent),另外还有非常重要的一块--基于Udp的NAPT穿透。如果要了解这些内容,请继续关注本系列的文章。
上一篇:ESFramework介绍之(27)-- 支持OverdueMessage
转到 :ESFramework 可复用的通信框架(序)