zoukankan      html  css  js  c++  java
  • 一个Socket数据处理模型

    Socket编程中,如何高效地接收和处理数据,这里介绍一个简单的编程模型。

    Socket索引 - SocketId


    在给出编程模型之前,先提这样一个问题,程序中如何描述Socket连接?

    为什么这么问呢,大家可以翻看我之前在项目总结(一)中给出的一个简单的基本架构,其中的网络层用来管理Socket的连接,并负责接收发送Socket数据,这个模块中可以直接使用建立的Socket连接对象。但如果上层需要给某个Socket发送数据怎么办,如果直接把Socket对象传送给上层,就破坏了面向对象中封装原则,上层甚至可以直接绕过网络层操作Socket数据收发,显然不是我们希望看到的。

    既然这样不能直接传递Socket对象,那么就要给上层传递一个能够标识这个对象的一个标识,这就是我要说的这个SocketId。

    SocketId实际上就是一个无符号整形数据,在网络层维护一个SocketId与Socket对象的映射表,上层通过SocketId通知网络层向对应的Socket发送数据。

    Socket索引 - 如何建立SocketId


    SocketId并不是说简单的从1开始,然后来一个Socket连接就直接加1作为对应的SocketId,我希望能够标识更多的东西。

    如图所示,我在网络层建立了一个SocketMark数组,长度是经过配置的允许Socket连接的最大个数。其中每个SocketMark包含两个主要的成员,连接的Socket对象,和对应的索引SocketId,如上所示。对于SocketId我不仅希望能够标识出在SocketMark数组中的的位置index(最终找到Socket发出数据),还希望标识出这个SocketMark被使用了多少次(在项目中有特殊用处,在这不做过多说明)。

    那么,怎么用index和usetimes表示SocketId呢?具体来说,SocketId是一个无符号整形数据,也就是有4个字节,我使用2个高字节来表示index,两个低字节来表示usetimes。那么,SocketId就是 index * 65536 + usetimes % 65536,相应的index = socketId / 65536, usetimes = socketId % 65536。

    SocketMark代码如下所示:

    public sealed class SocketMark
    {
    //用于线程同步
    public readonly object m_SyncLock = new object(); public uint SocketId = 0; public Socket WorkSocket = null;
    //用于接收Socket数据
    public byte[] Buffer;
    //当前WorkSocket是否连接
    public bool Connected = false; public SocketMark(int index, int bufferSize) {
    //默认情况下usetimes为0 SocketId
    = Convert.ToUInt32(index * 65536); Buffer = new byte[bufferSize]; } public void IncReuseTimes() { int reuseTimes = GetReuseTimes(SocketId) + 1; SocketId = Convert.ToUInt32(GetIndex(SocketId) * 65536 + reuseTimes % 65536); } public static int GetIndex(uint socketId) { return Convert.ToInt32(socketId / 65536); } public static int GetReuseTimes(uint socketId) { return Convert.ToInt32(socketId % 65536); } }

    当有Socket连接建立时,通过查询SocketMark数组中Connected字段值为false的元素(可以直接遍历查找,也可以采取其他方式,我使用的是建立一个对应SOcketMark的栈,保存index,有新连接index就出栈,然后设置SocketMark[index]的WorkSocket为这个连接;Socket断开后index再入栈),设置相应的WorkSocket后,同时要调用一次IncReuseTimes()函数,使用次数加1,并更新SocketId。

    在这里,网络层就可以使用Socket连接对象接收数据存储在Buffer中,并把数据连同SocketId传送给数据协议层。

    数据模型


    接下来是我要说的重点,在数据协议层这里,我需要定义一个新的结构,用来接收Socket数据,并尽可能地使处理高效。

    public sealed class ConnCache
    {
        public uint SocketId;                     // 连接标识
    
        public byte[] RecvBuffer;                 // 接收数据缓存,传输层抵达的数据,首先进入此缓存       
        public int RecvLen;                       // 接收数据缓存中的有效数据长度
        public readonly object RecvLock;          // 数据接收锁
    
        public byte[] WaitBuffer;                 // 待处理数据缓存,首先要将数据从RecvBuffer转移到该缓存,数据处理线程才能进行处理
        public int WaitLen;                       // 待处理数据缓存中的有效数据长度
        public readonly object AnalyzeLock;       // 数据分析锁
    
        public ConnCache(int recvBuffSize, int waitBuffSize)
        {
            SocketId = 65535;
            RecvBuffer = new byte[recvBuffSize];
            RecvLen = 0;
            RecvLock = new object();
            WaitBuffer = new byte[waitBuffSize];
            WaitLen = 0;
            AnalyzeLock = new object();
        }
    }

    解释一下:ConnCache用于管理从网络层接收的数据,并维护一个SocketId用来标识数据的归属。

    在这其中,包括上面SocketMark都定义了一个公共的只读对象,用来提供多线程时数据同步,但你应该注意到这几个锁的对象全都是public类型的,实际上这样并不好。因为这样,对象就无法控制程序对锁的使用,一旦锁的使用不符合预期,就很有可能造成程序出现死锁,所以建议大家在使用的时候还是考虑使用private修饰符,尽量由对象来完成资源的同步。

    但是,很不幸,我在项目中发现这样做有点不现实,使用private可能破坏了整个系统的结构。。而使用public只要能完全掌控代码,对不产生死锁有信心,还是非常方便的,基于这个理由,最终放弃了使用private的想法。

    继续回来说这个结构,在这里,我把从网络层接收的数据存储在RecvBuffer中,但我的解析线程并不直接访问这个数组,而是另外建立一个新的数据WaitBuffer,这个WaitBuffer的用处就是从RecvBuffer Copy一定的数据,然后提供给解析线程处理。这样做有两个好处,第一,避免了接收线程和处理线程直接争抢Buffer资源,能够提高处理性能。第二,额。。我看着挺清晰的,一个用来接收,一个用来处理,不是么

    注:当初在设计这个模型的时候,还不知道专门有个ReaderWriterLockSlim,我想如果能够代替上面的接收锁和分析锁,效果应该更好一点。

    模型使用


    介绍了上面两个主要的结构后,我们来看下如何写代码简单使用上述模型

    首先,实现客户端,参考项目总结 - 异步中的客户端代码,将代码修改为定时发送数据到服务器,并删除一些无关的代码

    class Program
    {
        static Socket socket;
        static void Main(string[] args)
        {
            socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            socket.Connect(IPAddress.Parse("127.0.0.1"), 1234);
    
            //启动一个线程定时向服务器发送数据
            ThreadPool.QueueUserWorkItem(state => {
                int index = 0;
    
                while (true)
                {
                    byte[] senddata = Encoding.Default.GetBytes("我们都有" + index++ + "个家,名字叫中国");
                    socket.BeginSend(senddata, 0, senddata.Length, SocketFlags.None, new AsyncCallback(Send), null);
    
                    Thread.Sleep(1000);
                }
            });
    
            Console.ReadKey();
        }
    
        static void Send(IAsyncResult ar)
        {
            socket.EndSend(ar);
        }
    }

    对于服务器,新建一个类NetLayer,用来模拟网络层,网络层建立Socket连接后,启动异步接收,并把接收到的数据通过委托传给处理层,传送时发送SocketId,代码如下:

    public delegate void ArrivedData(uint socketId, byte[] buffer);
    
    class NetLayer
    {
        Socket socket;
        SocketMark[] socketMarks;
    
        public event ArrivedData arrivedData;
    
        public NetLayer(int maxConnNum)
        {
            //初始化SocketMark
            //最大允许连接数
            socketMarks = new SocketMark[maxConnNum];
            for (int i = 0; i < socketMarks.Length; i++)
                socketMarks[i] = new SocketMark(i, 1024);
        }
    
        public void Start()
        {
            //新建Socket,并开始监听连接
            socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            socket.Bind(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1234));
            socket.Listen(100);
            socket.BeginAccept(new AsyncCallback(Accept), "new socket connect");
        }
    
        public void Accept(IAsyncResult ar)
        {
            Console.WriteLine(ar.AsyncState.ToString());
    
            //结束监听
            Socket _socket = socket.EndAccept(ar);
    
            //这里为了方便直接通过循环的方式查找可用的SocketMark
            SocketMark socketMark = null;
            for (int i = 0; i < socketMarks.Length; i++)
            {
                if (!socketMarks[i].Connected)
                {
                    socketMark = socketMarks[i];
                    break;
                }
            }
            //如果没有找到可用的SocketMark,说明达到最大连接数,关闭该连接
            if (socketMark == null)
            {
                _socket.Close();
                return;
            }
    
            socketMark.WorkSocket = _socket;
            socketMark.Connected = true;
            socketMark.IncReuseTimes();
    
            _socket.BeginReceive(socketMark.Buffer, 0, socketMark.Buffer.Length, SocketFlags.None, new AsyncCallback(Receive), socketMark);
        }
    
        public void Receive(IAsyncResult ar)
        {
            SocketMark mark = (SocketMark)ar.AsyncState;
    
            int length = mark.WorkSocket.EndReceive(ar);
            if (length > 0)
            {
                //多线程下资源同步
                lock (mark.m_SyncLock)
                {
                    byte[] data = new byte[length];
                    Buffer.BlockCopy(mark.Buffer, 0, data, 0, length);
    
                    if (arrivedData != null)
                        arrivedData(mark.SocketId, data);
    
                    //再次投递接收申请
                    mark.WorkSocket.BeginReceive(mark.Buffer, 0, mark.Buffer.Length, SocketFlags.None, new AsyncCallback(Receive), mark);
                }
            }
        }
    }

    数据处理层收到数据后,先把数据存到对应ConnCache中的RecvBuffer中,并向队列Queue<ConnCache>写入一个标记,告诉处理线程应该处理哪个ConnCache的数据,在这里大家会看到,我在之前的文章中讨论的lock和Monitor是如何使用的。

    class Program
    {
        static ConnCache[] connCaches;
        
        //处理线程通过这个队列知道有数据需要处理
        static Queue<ConnCache> tokenQueue;
        //接收到数据后,同时通知处理线程处理数据
        static AutoResetEvent tokenEvent;
    
        static void Main(string[] args)
        {
            //最大允许连接数
            int maxConnNum = 10;
    
            //要和底层SocketMark数组的个数相同
            connCaches = new ConnCache[maxConnNum];
            for (int i = 0; i < maxConnNum; i++)
                connCaches[i] = new ConnCache(1024, 2048);
            
            tokenQueue = new Queue<ConnCache>();
            tokenEvent = new AutoResetEvent(false);
    
            NetLayer netLayer = new NetLayer(maxConnNum);
            netLayer.arrivedData += new ArrivedData(netLayer_arrivedData);
            netLayer.Start();
    
            //处理线程
            ThreadPool.QueueUserWorkItem(new WaitCallback(AnalyzeThrd), null);
    
            Console.ReadKey();
        }
    
        static void netLayer_arrivedData(uint socketId, byte[] buffer)
        {
            int index = (int)(socketId / 65536);
            int reusetimes = (int)(socketId % 65536);
    
            Console.WriteLine("recv data from - index = {0}, reusetimes = {1}", index, reusetimes);
    
            int dataLen = buffer.Length;
            //仅使用了RecvLock,不影响WaitBuffer中的数据处理
            lock (connCaches[index].RecvLock)
            {
                //说明已经是一个新的Socket连接了,需要清理之前的数据
                if (connCaches[index].SocketId != socketId)
                {
                    connCaches[index].SocketId = socketId;
                    connCaches[index].RecvLen = 0;
                    connCaches[index].WaitLen = 0;
                }
    
                //如果收到的数据超过了可以接收的长度,截断
                if (dataLen > connCaches[index].RecvBuffer.Length - connCaches[index].RecvLen)
                    dataLen = connCaches[index].RecvBuffer.Length - connCaches[index].RecvLen;
                if (dataLen > 0)
                {
                    //接收数据到RecvBuffer中,并更新已接收的长度值
                    Buffer.BlockCopy(buffer, 0, connCaches[index].RecvBuffer, connCaches[index].RecvLen, dataLen);
                    connCaches[index].RecvLen += dataLen;
                }
            }
    
            lock (((ICollection)tokenQueue).SyncRoot)
            {
                tokenQueue.Enqueue(connCaches[index]);
            }
            tokenEvent.Set();
        }
    
        static void AnalyzeThrd(object state)
        {
            ConnCache connCache;
    
            while (true)
            {
                Monitor.Enter(((ICollection)tokenQueue).SyncRoot);
                if (tokenQueue.Count > 0)
                {
                    connCache = tokenQueue.Dequeue();
                    Monitor.Exit(((ICollection)tokenQueue).SyncRoot);
                }
                else
                {
                    Monitor.Exit(((ICollection)tokenQueue).SyncRoot);
                    //如果没有需要处理的数据,等待15秒后再运行
                    tokenEvent.WaitOne(15000, false);
                    continue;
                }
    
                //这里就需要使用两个锁,只要保证使用这两个锁的顺序不变,就不会出现死锁问题
                lock (connCache.AnalyzeLock)
                {
                    while (connCache.RecvLen > 0)
                    {
                        lock (connCache.RecvBuffer)
                        {
                            //这里把接收到的数据COPY到待处理数组
                            int copyLen = connCache.WaitBuffer.Length - connCache.WaitLen;
                            if (copyLen > connCache.RecvLen)
                                copyLen = connCache.RecvLen;
                            Buffer.BlockCopy(connCache.RecvBuffer, 0, connCache.WaitBuffer, connCache.WaitLen, copyLen);
                            connCache.WaitLen += copyLen;
                            connCache.RecvLen -= copyLen;
                            //如果RecvBuffer中还有数据没有COPY完,把它们提到数组开始位置
                            if (connCache.RecvLen > 0)
                                Buffer.BlockCopy(connCache.RecvBuffer, copyLen, connCache.RecvBuffer, 0, connCache.RecvLen);
                        }
                    }
    
                    //这里就是解析数据的地方,在这我直接把收到的数据打印出来(注意:如果客户端数据发送很快,有可能打印出乱码)
                    //还在AnalyzeLock锁中
                    {
                        string data = Encoding.Default.GetString(connCache.WaitBuffer, 0, connCache.WaitLen);
                        Console.WriteLine("analyzed: " + data);
    
                        //WaitLen置0,相当于清理了WaitBuffer中的数据
                        connCache.WaitLen = 0;
                    }
                }
    
            }
        }
    }

    至此,整个模型的使用就完成了。代码图省事就直接放上去了,见谅!

    结果如下:

    大家可以试着修改下代码使发送更快,一次发送数据更多,再来多个客户端试一下效果。 

    注:本文中的代码是用来进行演示的简化后的代码,并不保证没有缺陷,仅为了阐述这一模型。

  • 相关阅读:
    在IDEA中使用maven
    使用IDEA创建JavaWeb项目 部署本地tomcat并运行
    Java
    c++
    Vue学习
    svn 小程序地址
    SVN 上传代码
    eclipse插件 --js
    https抓包
    eclipse 断点位置发生莫名其妙的位移
  • 原文地址:https://www.cnblogs.com/houkui/p/4236923.html
Copyright © 2011-2022 走看看