internal sealed class MySocketAsyncEventArgs : SocketAsyncEventArgs { internal string UID; private string Property; internal MySocketAsyncEventArgs(string property){ this.Property = property; } }
internal sealed class SocketAsyncEventArgsWithId:IDisposable { private string uid = "-1"; private bool state = false; private MySocketAsyncEventArgs receivesaea; private MySocketAsyncEventArgs sendsaea; internal string UID { get { return uid; } set { uid = value; ReceiveSAEA.UID = value; SendSAEA.UID = value; } } }
- Stack<SocketAsyncEventArgsWithId> pool : 从字面意思上就知道这是一个连接栈,用来存放空闲的连接的,使用时pop出来,使用完后push进去。
- IDictionary<string, SocketAsyncEventArgsWithId> busypool :这个也很好理解,busypool是一个字典类型的,用来存放正在使用的连接的,key是用户标识,设计的目的是为了统计在线用户数目和查找相应用户的连接,当然这是很重要的,为什么设计成字典类型的,是因为我们查找时遍历字典的关键字就行了而不用遍历每一项的UID,这样效率会有所提高。
- string[] keys:这是一个存放用户标识的数组,起一个辅助的功能。
- Count属性:返回连接池中可用的连接数。
- OnlineUID属性:返回在线用户的标识列表。
- Pop(string uid)方法:用于获取一个可用连接给用户。
- Push(SocketAsyncEventArgsWithId item)方法:把一个使用完的连接放回连接池。
- FindByUID(string uid)方法:查找在线用户连接,返回这个连接。
- BusyPoolContains(string uid)方法:判断某个用户的连接是否在线。
internal sealed class SocketAsyncEventArgsPool:IDisposable { internal Stack<SocketAsyncEventArgsWithId> pool; internal IDictionary<string, SocketAsyncEventArgsWithId> busypool; private string[] keys; internal Int32 Count { get { lock (this.pool) { return this.pool.Count; } } } internal string[] OnlineUID { get { lock (this.busypool) { busypool.Keys.CopyTo(keys, 0); } return keys; } } internal SocketAsyncEventArgsPool(Int32 capacity) { keys = new string[capacity]; this.pool = new Stack<SocketAsyncEventArgsWithId>(capacity); this.busypool = new Dictionary<string, SocketAsyncEventArgsWithId>(capacity); } internal SocketAsyncEventArgsWithId Pop(string uid) { if (uid == string.Empty || uid == "") return null; SocketAsyncEventArgsWithId si = null; lock (this.pool) { si = this.pool.Pop(); } si.UID = uid; si.State = true; //mark the state of pool is not the initial step busypool.Add(uid, si); return si; } internal void Push(SocketAsyncEventArgsWithId item) { if (item == null) throw new ArgumentNullException("SocketAsyncEventArgsWithId对象为空"); if (item.State == true) { if (busypool.Keys.Count != 0) { if (busypool.Keys.Contains(item.UID)) busypool.Remove(item.UID); else throw new ArgumentException("SocketAsyncEventWithId不在忙碌队列中"); } else throw new ArgumentException("忙碌队列为空"); } item.UID = "-1"; item.State = false; lock (this.pool) { this.pool.Push(item); } } internal SocketAsyncEventArgsWithId FindByUID(string uid) { if (uid == string.Empty || uid == "") return null; SocketAsyncEventArgsWithId si = null; foreach (string key in this.OnlineUID) { if (key == uid) { si = busypool[uid]; break; } } return si; } internal bool BusyPoolContains(string uid) { lock (this.busypool) { return busypool.Keys.Contains(uid); } } }
internal sealed class BufferManager:IDisposable { private Byte[] buffer; private Int32 bufferSize; private Int32 numSize; private Int32 currentIndex; private Stack<Int32> freeIndexPool; internal Boolean SetBuffer(SocketAsyncEventArgs args) { if (this.freeIndexPool.Count > 0) { args.SetBuffer(this.buffer, this.freeIndexPool.Pop(), this.bufferSize); } else { if ((this.numSize - this.bufferSize) < this.currentIndex) { return false; } args.SetBuffer(this.buffer, this.currentIndex, this.bufferSize); this.currentIndex += this.bufferSize; } return true; } }
5.RequestHandler类:这里代码就不贴了,这个类也比较简单。比如发送方要发送的内容为:hello,nice to meet you那么真正发送的内容是:[length=22]hello,nice to meet you,length后的数字是字符串的长度,接收方接收到消息后根据长度检验和获取信息。
public sealed class SocketListener:IDisposable { /// <summary> /// 缓冲区 /// </summary> private BufferManager bufferManager; /// <summary> /// 服务器端Socket /// </summary> private Socket listenSocket; /// <summary> /// 服务同步锁 /// </summary> private static Mutex mutex = new Mutex(); /// <summary> /// 当前连接数 /// </summary> private Int32 numConnections; /// <summary> /// 最大并发量 /// </summary> private Int32 numConcurrence; /// <summary> /// 服务器状态 /// </summary> private ServerState serverstate; /// <summary> /// 读取写入字节 /// </summary> private const Int32 opsToPreAlloc = 1; /// <summary> /// Socket连接池 /// </summary> private SocketAsyncEventArgsPool readWritePool; /// <summary> /// 并发控制信号量 /// </summary> private Semaphore semaphoreAcceptedClients; /// <summary> /// 通信协议 /// </summary> private RequestHandler handler; /// <summary> /// 回调委托 /// </summary> /// <param name="IP"></param> /// <returns></returns> public delegate string GetIDByIPFun(string IP); /// <summary> /// 回调方法实例 /// </summary> private GetIDByIPFun GetIDByIP; /// <summary> /// 接收到信息时的事件委托 /// </summary> /// <param name="info"></param> public delegate void ReceiveMsgHandler(string uid, string info); /// <summary> /// 接收到信息时的事件 /// </summary> public event ReceiveMsgHandler OnMsgReceived; /// <summary> /// 开始监听数据的委托 /// </summary> public delegate void StartListenHandler(); /// <summary> /// 开始监听数据的事件 /// </summary> public event StartListenHandler StartListenThread; /// <summary> /// 发送信息完成后的委托 /// </summary> /// <param name="successorfalse"></param> public delegate void SendCompletedHandler(string uid,string exception); /// <summary> /// 发送信息完成后的事件 /// </summary> public event SendCompletedHandler OnSended; /// <summary> /// 获取当前的并发数 /// </summary> public Int32 NumConnections { get { return this.numConnections; } } /// <summary> /// 最大并发数 /// </summary> public Int32 MaxConcurrence { get { return this.numConcurrence; } } /// <summary> /// 返回服务器状态 /// </summary> public ServerState State { get { return serverstate; } } /// <summary> /// 获取当前在线用户的UID /// </summary> public string[] OnlineUID { get { return readWritePool.OnlineUID; } } /// <summary> /// 初始化服务器端 /// </summary> /// <param name="numConcurrence">并发的连接数量(1000以上)</param> /// <param name="receiveBufferSize">每一个收发缓冲区的大小(32768)</param> public SocketListener(Int32 numConcurrence, Int32 receiveBufferSize, GetIDByIPFun GetIDByIP) { serverstate = ServerState.Initialing; this.numConnections = 0; this.numConcurrence = numConcurrence; this.bufferManager = new BufferManager(receiveBufferSize * numConcurrence * opsToPreAlloc, receiveBufferSize); this.readWritePool = new SocketAsyncEventArgsPool(numConcurrence); this.semaphoreAcceptedClients = new Semaphore(numConcurrence, numConcurrence); handler = new RequestHandler(); this.GetIDByIP = GetIDByIP; } /// <summary> /// 服务端初始化 /// </summary> public void Init() { this.bufferManager.InitBuffer(); SocketAsyncEventArgsWithId readWriteEventArgWithId; for (Int32 i = 0; i < this.numConcurrence; i++) { readWriteEventArgWithId = new SocketAsyncEventArgsWithId(); readWriteEventArgWithId.ReceiveSAEA.Completed += new EventHandler<SocketAsyncEventArgs>(OnReceiveCompleted); readWriteEventArgWithId.SendSAEA.Completed += new EventHandler<SocketAsyncEventArgs>(OnSendCompleted); //只给接收的SocketAsyncEventArgs设置缓冲区 this.bufferManager.SetBuffer(readWriteEventArgWithId.ReceiveSAEA); this.readWritePool.Push(readWriteEventArgWithId); } serverstate = ServerState.Inited; } /// <summary> /// 启动服务器 /// </summary> /// <param name="data">端口号</param> public void Start(Object data) { Int32 port = (Int32)data; IPAddress[] addresslist = Dns.GetHostEntry(Environment.MachineName).AddressList; IPEndPoint localEndPoint = new IPEndPoint(addresslist[addresslist.Length - 1], port); this.listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); if (localEndPoint.AddressFamily == AddressFamily.InterNetworkV6) { this.listenSocket.SetSocketOption(SocketOptionLevel.IPv6, (SocketOptionName)27, false); this.listenSocket.Bind(new IPEndPoint(IPAddress.IPv6Any, localEndPoint.Port)); } else { this.listenSocket.Bind(localEndPoint); } this.listenSocket.Listen(100); this.StartAccept(null); //开始监听已连接用户的发送数据 StartListenThread(); serverstate = ServerState.Running; mutex.WaitOne(); } /// <summary> /// 开始监听线程的入口函数 /// </summary> public void Listen() { while (true) { string[] keys = readWritePool.OnlineUID; foreach (string uid in keys) { if (uid != null && readWritePool.busypool[uid].ReceiveSAEA.LastOperation != SocketAsyncOperation.Receive) { Boolean willRaiseEvent = (readWritePool.busypool[uid].ReceiveSAEA.UserToken as Socket).ReceiveAsync(readWritePool.busypool[uid].ReceiveSAEA); if (!willRaiseEvent) ProcessReceive(readWritePool.busypool[uid].ReceiveSAEA); } } } } /// <summary> /// 发送信息 /// </summary> /// <param name="uid">要发送的用户的uid</param> /// <param name="msg">消息体</param> public void Send(string uid, string msg) { if (uid == string.Empty || uid == "" || msg == string.Empty || msg == "") return; SocketAsyncEventArgsWithId socketWithId = readWritePool.FindByUID(uid); if (socketWithId == null) //说明用户已经断开 //100 发送成功 //200 发送失败 //300 用户不在线 //其它 表示异常的信息 OnSended(uid, "300"); else { MySocketAsyncEventArgs e = socketWithId.SendSAEA; if (e.SocketError == SocketError.Success) { int i = 0; try { string message = @"[lenght=" + msg.Length + @"]" + msg; byte[] sendbuffer = Encoding.Unicode.GetBytes(message); e.SetBuffer(sendbuffer, 0, sendbuffer.Length); Boolean willRaiseEvent = (e.UserToken as Socket).SendAsync(e); if (!willRaiseEvent) { this.ProcessSend(e); } } catch (Exception ex) { if (i <= 5) { i++; //如果发送出现异常就延迟0.01秒再发 Thread.Sleep(10); Send(uid, msg); } else { OnSended(uid, ex.ToString()); } } } else { OnSended(uid, "200"); this.CloseClientSocket(((MySocketAsyncEventArgs)e).UID); } } } /// <summary> /// 停止服务器 /// </summary> public void Stop() { if(listenSocket!=null) listenSocket.Close(); listenSocket = null; Dispose(); mutex.ReleaseMutex(); serverstate = ServerState.Stoped; } private void StartAccept(SocketAsyncEventArgs acceptEventArg) { if (acceptEventArg == null) { acceptEventArg = new SocketAsyncEventArgs(); acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnAcceptCompleted); } else acceptEventArg.AcceptSocket = null; this.semaphoreAcceptedClients.WaitOne(); Boolean willRaiseEvent = this.listenSocket.AcceptAsync(acceptEventArg); if (!willRaiseEvent) { this.ProcessAccept(acceptEventArg); } } private void OnAcceptCompleted(object sender, SocketAsyncEventArgs e) { this.ProcessAccept(e); } private void ProcessAccept(SocketAsyncEventArgs e) { if (e.LastOperation != SocketAsyncOperation.Accept) //检查上一次操作是否是Accept,不是就返回 return; if (e.BytesTransferred <= 0) //检查发送的长度是否大于0,不是就返回 return; string UID = GetIDByIP((e.AcceptSocket.RemoteEndPoint as IPEndPoint).Address.ToString()); //根据IP获取用户的UID if (UID == string.Empty || UID == null || UID == "") return; if (readWritePool.BusyPoolContains(UID)) //判断现在的用户是否已经连接,避免同一用户开两个连接 return; SocketAsyncEventArgsWithId readEventArgsWithId = this.readWritePool.Pop(UID); readEventArgsWithId.ReceiveSAEA.UserToken = e.AcceptSocket; readEventArgsWithId.SendSAEA.UserToken = e.AcceptSocket; Interlocked.Increment(ref this.numConnections); this.StartAccept(e); } private void OnReceiveCompleted(object sender, SocketAsyncEventArgs e) { ProcessReceive(e); } private void OnSendCompleted(object sender, SocketAsyncEventArgs e) { ProcessSend(e); } private void ProcessReceive(SocketAsyncEventArgs e) { if (e.LastOperation != SocketAsyncOperation.Receive) return; if (e.BytesTransferred > 0) { if (e.SocketError == SocketError.Success) { Int32 byteTransferred = e.BytesTransferred; string received = Encoding.Unicode.GetString(e.Buffer, e.Offset, byteTransferred); //检查消息的准确性 string[] msg = handler.GetActualString(received); foreach (string m in msg) OnMsgReceived(((MySocketAsyncEventArgs)e).UID, m); //可以在这里设一个停顿来实现间隔时间段监听,这里的停顿是单个用户间的监听间隔 //发送一个异步接受请求,并获取请求是否为成功 Boolean willRaiseEvent = (e.UserToken as Socket).ReceiveAsync(e); if (!willRaiseEvent) ProcessReceive(e); } } else this.CloseClientSocket(((MySocketAsyncEventArgs)e).UID); } private void ProcessSend(SocketAsyncEventArgs e) { if (e.LastOperation != SocketAsyncOperation.Send) return; if (e.BytesTransferred > 0) { if (e.SocketError == SocketError.Success) OnSended(((MySocketAsyncEventArgs)e).UID, "100"); else OnSended(((MySocketAsyncEventArgs)e).UID, "200"); } else this.CloseClientSocket(((MySocketAsyncEventArgs)e).UID); } private void CloseClientSocket(string uid) { if (uid == string.Empty || uid == "") return; SocketAsyncEventArgsWithId saeaw = readWritePool.FindByUID(uid); if (saeaw == null) return; Socket s = saeaw.ReceiveSAEA.UserToken as Socket; try { s.Shutdown(SocketShutdown.Both); } catch (Exception) { //客户端已经关闭 } this.semaphoreAcceptedClients.Release(); Interlocked.Decrement(ref this.numConnections); this.readWritePool.Push(saeaw); } #region IDisposable Members public void Dispose() { bufferManager.Dispose(); bufferManager = null; readWritePool.Dispose(); readWritePool = null; } #endregion }