using System; using System.Linq; using System.Net.Sockets; using System.Text; using System.Threading; namespace Player.Common.Sockets { /// <summary> /// Socket连接,双向通信 /// </summary> public class TcpSocketConnection { #region 构造函数 /// <summary> /// 构造函数 /// </summary> /// <param name="socket">维护的Socket对象</param> /// <param name="server">维护此连接的服务对象</param> /// <param name="recLength">接受缓冲区大小</param> public TcpSocketConnection(Socket socket, TcpSocketServer server, int recLength) { _socket = socket; _server = server; _recLength = recLength; } #endregion #region 私有成员 private Socket _socket { get; } private bool _isRec { get; set; } = true; private TcpSocketServer _server { get; set; } = null; private bool _isClosed { get; set; } = false; private string _connectionId { get; set; } = Guid.NewGuid().ToString(); private ManualResetEvent allDone { get; set; } = new ManualResetEvent(false); /// <summary> /// 接收区大小,单位:字节 /// </summary> private int _recLength { get; set; } #endregion #region 外部接口 int headSize = 4;//包头长度 固定4 byte[] surplusBuffer = null;//不完整的数据包,即用户自定义缓冲区 /// <summary> /// 开始接受客户端消息 /// </summary> public void StartRecMsg() { try { while (true) { allDone.Reset(); byte[] container = new byte[_recLength]; _socket.BeginReceive(container, 0, container.Length, SocketFlags.None, asyncResult => { try { //bytes 为系统缓冲区数据 //bytesRead为系统缓冲区长度 int bytesRead = _socket.EndReceive(asyncResult); if (bytesRead > 0) { if (surplusBuffer == null)//判断是不是第一次接收,为空说是第一次 { surplusBuffer = new byte[bytesRead]; Array.Copy(container, 0, surplusBuffer, 0, bytesRead); } else { byte[] container1 = new byte[bytesRead]; Array.Copy(container, 0, container1, 0, bytesRead); surplusBuffer = surplusBuffer.Concat(container1).ToArray();//拼接上一次剩余的包,已经完成读取每个数据包长度 } int haveRead = 0; //这里totalLen的长度有可能大于缓冲区大小的(因为 这里的surplusBuffer 是系统缓冲区+不完整的数据包) int totalLen = surplusBuffer.Length; while (haveRead <= totalLen) { //如果在N此拆解后剩余的数据包连一个包头的长度都不够 //说明是上次读取N个完整数据包后,剩下的最后一个非完整的数据包 if (totalLen - haveRead < headSize) { byte[] byteSub = new byte[totalLen - haveRead]; //把剩下不够一个完整的数据包存起来 Buffer.BlockCopy(surplusBuffer, haveRead, byteSub, 0, totalLen - haveRead); surplusBuffer = byteSub; totalLen = 0; break; } //如果够了一个完整包,则读取包头的数据 byte[] headByte = new byte[headSize]; Buffer.BlockCopy(surplusBuffer, haveRead, headByte, 0, headSize);//从缓冲区里读取包头的字节 int bodySize = BitConverter.ToInt32(headByte, 0);//从包头里面分析出包体的长度 //这里的 haveRead=等于N个数据包的长度 从0开始;0,1,2,3....N //如果自定义缓冲区拆解N个包后的长度 大于 总长度,说最后一段数据不够一个完整的包了,拆出来保存 if (haveRead + headSize + bodySize > totalLen) { byte[] byteSub = new byte[totalLen - haveRead]; Buffer.BlockCopy(surplusBuffer, haveRead, byteSub, 0, totalLen - haveRead); surplusBuffer = byteSub; break; } if (bodySize < 2) { WebLogHelper.Error(LogType.SysLog, $"bytesRead:{bytesRead},bodySize:{bodySize}"); Close(); break; } //处理正常消息 byte[] recBytes = new byte[bodySize + headSize]; Array.Copy(surplusBuffer, haveRead, recBytes, 0, bodySize + headSize); //取出消息内容 HandleRecMsg?.Invoke(_server, this, recBytes); //依次累加当前的数据包的长度 haveRead = haveRead + headSize + bodySize; if (headSize + bodySize == bytesRead)//如果当前接收的数据包长度正好等于缓冲区长度,则待拼接的不规则数据长度归0 { surplusBuffer = null;//设置空 回到原始状态 totalLen = 0;//清0 } } } //开始下一轮接收 if (_isRec && IsSocketConnected()) { allDone.Set(); //StartRecMsg(); } } catch (ObjectDisposedException ) { } catch (Exception ex) { HandleException?.Invoke(ex); Close(); } }, null); allDone.WaitOne(); } } catch (Exception ex) { HandleException?.Invoke(ex); Close(); } } /// <summary> /// 发送数据 /// </summary> /// <param name="bytes">数据字节</param> public void Send(byte[] bytes) { try { _socket.BeginSend(bytes, 0, bytes.Length, SocketFlags.None, asyncResult => { try { int length = _socket.EndSend(asyncResult); HandleSendMsg?.Invoke(_server, this, bytes); } catch (Exception ex) { HandleException?.Invoke(ex); } }, null); } catch (Exception ex) { HandleException?.Invoke(ex); } } /// <summary> /// 发送字符串(默认使用UTF-8编码) /// </summary> /// <param name="msgStr">字符串</param> public void Send(string msgStr) { Send(Encoding.UTF8.GetBytes(msgStr)); } /// <summary> /// 发送字符串(使用自定义编码) /// </summary> /// <param name="msgStr">字符串消息</param> /// <param name="encoding">使用的编码</param> public void Send(string msgStr, Encoding encoding) { Send(encoding.GetBytes(msgStr)); } /// <summary> /// 连接标识Id /// 注:用于标识与客户端的连接 /// </summary> public string ConnectionId { get { return _connectionId; } set { string oldConnectionId = _connectionId; _connectionId = value; _server?.SetConnectionId(this, oldConnectionId, value); } } /// <summary> /// 关闭当前连接 /// </summary> public void Close() { WebLogHelper.Error(LogType.SysLog, "自动关闭"); if (_isClosed) return; try { _isClosed = true; _server.RemoveConnection(this); _isRec = false; _socket.BeginDisconnect(false, (asyncCallback) => { try { _socket.EndDisconnect(asyncCallback); } catch (Exception ex) { HandleException?.Invoke(ex); } finally { _socket.Dispose(); } }, null); } catch (Exception ex) { WebLogHelper.Info(LogType.SysLog, "断开连接报错"); HandleException?.Invoke(ex); } finally { try { HandleClientClose?.Invoke(_server, this); } catch (Exception ex) { WebLogHelper.Info(LogType.SysLog, "调用关闭事件报错"); HandleException?.Invoke(ex); } } } /// <summary> /// 判断是否处于已连接状态 /// </summary> /// <returns></returns> public bool IsSocketConnected() { return !((_socket.Poll(1000, SelectMode.SelectRead) && (_socket.Available == 0)) || !_socket.Connected); } #endregion #region 事件处理 /// <summary> /// 客户端连接接受新的消息后调用 /// </summary> public Action<TcpSocketServer, TcpSocketConnection, byte[]> HandleRecMsg { get; set; } /// <summary> /// 客户端连接发送消息后回调 /// </summary> public Action<TcpSocketServer, TcpSocketConnection, byte[]> HandleSendMsg { get; set; } /// <summary> /// 客户端连接关闭后回调 /// </summary> public Action<TcpSocketServer, TcpSocketConnection> HandleClientClose { get; set; } /// <summary> /// 异常处理程序 /// </summary> public Action<Exception> HandleException { get; set; } #endregion } }