处理原理:
半包:即一条消息底层分几次发送,先有个头包读取整条消息的长度,当不满足长度时,将消息临时缓存起来,直到满足长度再解码
粘包:两条完整/不完整消息粘在一起,一般是解码完上一条消息,然后再判断是否有剩余字节,有的话缓存起来,循环半包处理
客户端接收代码:
private void callReceived(object sender, SocketAsyncEventArgs args) { var socket = sender as Socket; var bb = args.UserToken as ByteBuffer; if (args.SocketError == SocketError.Success) { bb.WriteBytes(args.Buffer, args.Offset, args.BytesTransferred); bb.MarkReaderIndex(); int headLength = bb.ReadInt(); int msgLength = headLength;//长度已-4 int readByteLength = bb.ReadableBytes(); //解决半包 if (msgLength > readByteLength) { //还原读取索引记录 bb.ResetReaderIndex(); } else { //是否去掉包头 byte[] filthyBytes= bb.ToArray(); System.Console.WriteLine(System.Text.Encoding.UTF8.GetString(filthyBytes)); //解决粘包剩余 bb.Clear(); int useLength = filthyBytes.Length; int lastOffSetLength = filthyBytes.Length - useLength; if (lastOffSetLength > 0) { bb.WriteBytes(filthyBytes, lastOffSetLength, filthyBytes.Length); } } } else { //丢去byte处理 System.Console.WriteLine("error callReceived"); } _socket.ReceiveAsync(args); }
服务端发送代码:
ByteBuffer bb = ByteBuffer.Allocate(50); byte[] sendBytes=System.Text.Encoding.UTF8.GetBytes("1234567890abcdefg"); Console.WriteLine("send msg length : " + sendBytes.Length); bb.WriteInt(sendBytes.Length); bb.WriteBytes(sendBytes); Send(bb.ToArray(), _clients.ToArray()); public void Send(byte[] msg, params SocketAsyncEventArgs[] sockets) { System.Console.WriteLine(" Send msg :"); foreach (SocketAsyncEventArgs s in sockets) { SocketAsyncEventArgs args = new SocketAsyncEventArgs(); args.SetBuffer(msg, 0, msg.Length); //args.RemoteEndPoint = s.RemoteEndPoint; args.AcceptSocket = s.AcceptSocket; args.Completed += new EventHandler<SocketAsyncEventArgs>(this.callSended); System.Console.WriteLine(" AcceptSocket :" + s.AcceptSocket.RemoteEndPoint.ToString()); args.AcceptSocket.SendAsync(args); } }
ByteBuffer 类基础跟netty相同,网上复制的
using System; public class ByteBuffer { //字节缓存区 private byte[] buf; //读取索引 private int readIndex = 0; //写入索引 private int writeIndex = 0; //读取索引标记 private int markReadIndex = 0; //写入索引标记 private int markWirteIndex = 0; //缓存区字节数组的长度 private int capacity; /** * 构造方法 */ private ByteBuffer(int capacity) { buf = new byte[capacity]; this.capacity = capacity; } /** * 构造方法 */ private ByteBuffer(byte[] bytes) { buf = bytes; this.capacity = bytes.Length; } /** * 构建一个capacity长度的字节缓存区ByteBuffer对象 */ public static ByteBuffer Allocate(int capacity) { return new ByteBuffer(capacity); } /** * 构建一个以bytes为字节缓存区的ByteBuffer对象,一般不推荐使用 */ public static ByteBuffer Allocate(byte[] bytes) { return new ByteBuffer(bytes); } /** * 翻转字节数组,如果本地字节序列为低字节序列,则进行翻转以转换为高字节序列 */ private byte[] flip(byte[] bytes) { if (BitConverter.IsLittleEndian) { Array.Reverse(bytes); } return bytes; } /** * 确定内部字节缓存数组的大小 */ private int FixSizeAndReset(int currLen, int futureLen) { if (futureLen > currLen) { //以原大小的2次方数的两倍确定内部字节缓存区大小 int size = FixLength(currLen) * 2; if (futureLen > size) { //以将来的大小的2次方的两倍确定内部字节缓存区大小 size = FixLength(futureLen) * 2; } byte[] newbuf = new byte[size]; Array.Copy(buf, 0, newbuf, 0, currLen); buf = newbuf; capacity = newbuf.Length; } return futureLen; } /** * 根据length长度,确定大于此leng的最近的2次方数,如length=7,则返回值为8 */ private int FixLength(int length) { int n = 2; int b = 2; while (b < length) { b = 2 << n; n++; } return b; } /** * 将bytes字节数组从startIndex开始的length字节写入到此缓存区 */ public ByteBuffer WriteBytes(byte[] bytes, int startIndex, int length) { lock (this) { int offset = length - startIndex; if (offset <= 0) return this; int total = offset + writeIndex; int len = buf.Length; FixSizeAndReset(len, total); for (int i = writeIndex, j = startIndex; i < total; i++, j++) { this.buf[i] = bytes[j]; } writeIndex = total; } return this; } /** * 将字节数组中从0到length的元素写入缓存区 */ public ByteBuffer WriteBytes(byte[] bytes, int length) { return WriteBytes(bytes, 0, length); } /** * 将字节数组全部写入缓存区 */ public ByteBuffer WriteBytes(byte[] bytes) { return WriteBytes(bytes, bytes.Length); } /** * 将一个ByteBuffer的有效字节区写入此缓存区中 */ public ByteBuffer Write(ByteBuffer buffer) { if (buffer == null) return this; if (buffer.ReadableBytes() <= 0) return this; return WriteBytes(buffer.ToArray()); } /** * 写入一个int16数据 */ public ByteBuffer WriteShort(short value) { return WriteBytes(flip(BitConverter.GetBytes(value))); } /** * 写入一个uint16数据 */ public ByteBuffer WriteUshort(ushort value) { return WriteBytes(flip(BitConverter.GetBytes(value))); } /**写入字符串*/ public ByteBuffer WriteString(string value) { int len = value.Length; WriteInt(len); //System.Text.Encoding.BigEndianUnicode.GetBytes WriteBytes(System.Text.Encoding.UTF8.GetBytes(value)); return this; } /**读取字符串*/ public String ReadString() { int len =ReadInt(); byte[] bytes =new byte[len]; ReadBytes(bytes,0,len); return System.Text.Encoding.UTF8.GetString(bytes); } /** * 写入一个int32数据 */ public ByteBuffer WriteInt(int value) { //byte[] array = new byte[4]; //for (int i = 3; i >= 0; i--) //{ // array[i] = (byte)(value & 0xff); // value = value >> 8; //} //Array.Reverse(array); //Write(array); return WriteBytes(flip(BitConverter.GetBytes(value))); } /** * 写入一个uint32数据 */ public ByteBuffer WriteUint(uint value) { return WriteBytes(flip(BitConverter.GetBytes(value))); } /** * 写入一个int64数据 */ public ByteBuffer WriteLong(long value) { return WriteBytes(flip(BitConverter.GetBytes(value))); } /** * 写入一个uint64数据 */ public ByteBuffer WriteUlong(ulong value) { return WriteBytes(flip(BitConverter.GetBytes(value))); } /** * 写入一个float数据 */ public ByteBuffer WriteFloat(float value) { return WriteBytes(flip(BitConverter.GetBytes(value))); } /** * 写入一个byte数据 */ public ByteBuffer WriteByte(byte value) { lock (this) { int afterLen = writeIndex + 1; int len = buf.Length; FixSizeAndReset(len, afterLen); buf[writeIndex] = value; writeIndex = afterLen; } return this; } /** * 写入一个double类型数据 */ public ByteBuffer WriteDouble(double value) { return WriteBytes(flip(BitConverter.GetBytes(value))); } /** * 读取一个字节 */ public byte ReadByte() { byte b = buf[readIndex]; readIndex++; return b; } /** * 从读取索引位置开始读取len长度的字节数组 */ private byte[] Read(int len) { byte[] bytes = new byte[len]; Array.Copy(buf, readIndex, bytes, 0, len); if (BitConverter.IsLittleEndian) { Array.Reverse(bytes); } readIndex += len; return bytes; } /** * 读取一个uint16数据 */ public ushort ReadUshort() { return BitConverter.ToUInt16(Read(2), 0); } /** * 读取一个int16数据 */ public short ReadShort() { return BitConverter.ToInt16(Read(2), 0); } /** * 读取一个uint32数据 */ public uint ReadUint() { return BitConverter.ToUInt32(Read(4), 0); } /** * 读取一个int32数据 */ public int ReadInt() { return BitConverter.ToInt32(Read(4), 0); } /** * 读取一个uint64数据 */ public ulong ReadUlong() { return BitConverter.ToUInt64(Read(8), 0); } /** * 读取一个long数据 */ public long ReadLong() { return BitConverter.ToInt64(Read(8), 0); } /** * 读取一个float数据 */ public float ReadFloat() { return BitConverter.ToSingle(Read(4), 0); } /** * 读取一个double数据 */ public double ReadDouble() { return BitConverter.ToDouble(Read(8), 0); } /** * 从读取索引位置开始读取len长度的字节到disbytes目标字节数组中 * @params disstart 目标字节数组的写入索引 */ public void ReadBytes(byte[] disbytes, int disstart, int len) { int size = disstart + len; for (int i = disstart; i < size; i++) { disbytes[i] = this.ReadByte(); } } /** * 清除已读字节并重建缓存区 */ public void DiscardReadBytes() { if(readIndex <= 0) return; int len = buf.Length - readIndex; byte[] newbuf = new byte[len]; Array.Copy(buf, readIndex, newbuf, 0, len); buf = newbuf; writeIndex -= readIndex; markReadIndex -= readIndex; if (markReadIndex < 0) { markReadIndex = readIndex; } markWirteIndex -= readIndex; if (markWirteIndex < 0 || markWirteIndex < readIndex || markWirteIndex < markReadIndex) { markWirteIndex = writeIndex; } readIndex = 0; } /** * 清空此对象 */ public void Clear() { buf = new byte[buf.Length]; readIndex = 0; writeIndex = 0; markReadIndex = 0; markWirteIndex = 0; } /** * 设置开始读取的索引 */ public void SetReaderIndex(int index) { if (index < 0) return; readIndex = index; } /** * 标记读取的索引位置 */ public int MarkReaderIndex() { markReadIndex = readIndex; return markReadIndex; } /** * 标记写入的索引位置 */ public void MarkWriterIndex() { markWirteIndex = writeIndex; } /** * 将读取的索引位置重置为标记的读取索引位置 */ public void ResetReaderIndex() { readIndex = markReadIndex; } /** * 将写入的索引位置重置为标记的写入索引位置 */ public void ResetWriterIndex() { writeIndex = markWirteIndex; } /** * 可读的有效字节数 */ public int ReadableBytes() { return writeIndex - readIndex; } /** * 获取可读的字节数组 */ public byte[] ToArray() { byte[] bytes = new byte[writeIndex]; Array.Copy(buf, 0, bytes, 0, bytes.Length); return bytes; } /** * 获取缓存区大小 */ public int GetCapacity() { return this.capacity; } }
最后小结一下:
相信大家都看过TCP编程的书,书本上的理论废话长篇,说了一堆又给出一堆无用的代码。只有看源码才知道原理是怎样,源码实现一切。。。
在看的过程发现原作者写得也不是很好,简单的事情总是搞得那么复杂。。。。。
这行真是水深火热啊