zoukankan      html  css  js  c++  java
  • 写自己的Socket框架(一)

    本系列仅介绍可用于生产环境的C#异步Socket框架,如果您在其他地方看到类似的代码,不要惊讶,那可能就是我在参考开源代码时,直接“剽窃”过来的。

    1、先在脑海里梳理一下整个Socket连接的处理流程,于是便有了下图。

    2、首先就开始监听,代码如下:

    /// <summary>
    /// Creates the listening socket, binds it to the configured end point, starts
    /// listening and posts the first asynchronous accept.
    /// </summary>
    /// <returns>
    /// <c>true</c> when the listener is bound and the first accept has been posted;
    /// <c>false</c> when binding, listening or posting the accept threw. On failure
    /// the listening socket and the accept event args are released.
    /// </returns>
    public override bool Start()
    {
        this._socket = new System.Net.Sockets.Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        // Enable TCP keep-alive so that a dead peer can be noticed without waiting
        // for the next application-level message.
        this._socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
        this._socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontLinger, true);
        try
        {
            this._socket.Bind(base.SocketConfig.Point);
            this._socket.Listen(base.SocketConfig.Backlog);

            this._socket_args = new SocketAsyncEventArgs();
            this._socket_args.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptSocketCompleted);

            // AcceptAsync returns false when the accept completed synchronously; in that
            // case the Completed event is NOT raised, so the handler is invoked by hand.
            if (!this._socket.AcceptAsync(this._socket_args))
            {
                AcceptSocketCompleted(this._socket, this._socket_args);
            }
            return true;
        }
        catch (Exception ex)
        {
            // Report the reason instead of silently swallowing it.
            System.Diagnostics.Trace.TraceError("Failed to start the socket listener: " + ex);

            // Release what was acquired so the handle (and possibly the port) is not
            // leaked. The fields are left non-null on purpose: Close/Dispose are
            // idempotent, and code outside this method may still dereference them.
            if (this._socket_args != null)
            {
                this._socket_args.Dispose();
            }
            this._socket.Close();
            return false;
        }
    }
    
    
    /// <summary>
    /// Completion handler for the listener's accept operation. Hands the accepted
    /// socket to <c>OnNewClientAccepted</c> and immediately posts the next accept.
    /// </summary>
    /// <param name="sender">Not used.</param>
    /// <param name="e">The event args instance that is reused for every accept.</param>
    void AcceptSocketCompleted(object sender, SocketAsyncEventArgs e)
    {
        // Loop instead of recursing: when accepts keep completing synchronously the
        // recursive form grows the stack without bound.
        while (true)
        {
            System.Net.Sockets.Socket accepted = null;

            if (e.SocketError == SocketError.Success)
            {
                accepted = e.AcceptSocket;
            }
            else if (e.SocketError == SocketError.OperationAborted
                || e.SocketError == SocketError.Interrupted
                || e.SocketError == SocketError.NotSocket)
            {
                // The listening socket was closed; there is nothing left to accept on.
                return;
            }
            else if (e.AcceptSocket != null)
            {
                // A single failed accept (for example a client that reset the
                // connection) must not stop the listener: discard the half-open
                // socket and fall through to post the next accept.
                e.AcceptSocket.Close();
            }

            // The event args are reused, so the previous socket must be cleared first.
            e.AcceptSocket = null;

            bool completedSynchronously = false;
            try
            {
                // Re-arm before running the application callback so that handling one
                // client does not delay accepting the others.
                completedSynchronously = !this._socket.AcceptAsync(e);
            }
            catch (Exception ex)
            {
                // Typically the listener has been disposed; stop accepting.
                System.Diagnostics.Trace.TraceError("Failed to post the next accept: " + ex);
            }

            if (accepted != null)
            {
                OnNewClientAccepted(accepted, null);
            }

            if (!completedSynchronously)
            {
                // Either the Completed event will call this handler again, or
                // re-arming failed and the accept loop ends here.
                return;
            }
        }
    }
    View Code

    3、这个时候连接过来了,就要开始入队列了;如果没有这方面的需求,这一步可以忽略。代码如下:

    /// <summary>
    /// Queue entry that pairs an accepted client socket with the moment the entry
    /// was created.
    /// </summary>
    public class SocketProxy
    {
        /// <summary>The client socket carried by this entry; null until assigned.</summary>
        public System.Net.Sockets.Socket Client;

        /// <summary>Local time captured when this instance was constructed.</summary>
        public DateTime Timeout;

        /// <summary>Creates an entry stamped with the current local time.</summary>
        public SocketProxy()
        {
            Timeout = DateTime.Now;
        }
    }
    
    
    /// <summary>
    /// Buffers accepted connections and hands them, in arrival order, to the
    /// <see cref="Connected"/> callback on a dedicated background thread.
    /// </summary>
    public class SocketConnectionQueue : IDisposable
    {
        private readonly Queue<SocketProxy> _queue = new Queue<SocketProxy>();

        private readonly object _syncObject = new object();

        // volatile: written by Dispose() and read by the worker loop on another
        // thread. Without it the loop is allowed to keep using a stale value and
        // Dispose() could then block in Join() forever.
        private volatile bool _isStop = false;

        private readonly Thread _thread;

        /// <summary>Callback invoked on the worker thread for every queued entry.</summary>
        public Action<SocketProxy> Connected;

        /// <summary>Creates the queue and starts its worker thread.</summary>
        public SocketConnectionQueue()
        {
            _thread = new Thread(Thread_Work)
            {
                IsBackground = true,
                Priority = ThreadPriority.Highest
            };
            _thread.Start();
        }

        /// <summary>Appends an entry to the queue.</summary>
        /// <param name="connect">The entry to enqueue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="connect"/> is null.</exception>
        public void Push(SocketProxy connect)
        {
            if (connect == null)
            {
                // Fail at the call site rather than later, inside the callback on the
                // worker thread.
                throw new ArgumentNullException("connect");
            }

            lock (_syncObject)
            {
                _queue.Enqueue(connect);
            }
        }

        /// <summary>
        /// Worker loop: roughly every 10 ms takes a snapshot of the queued entries and
        /// passes each one to <see cref="Connected"/>. Runs until <see cref="Dispose"/>.
        /// </summary>
        public void Thread_Work()
        {
            while (!_isStop)
            {
                // Read the callback once so it cannot become null between the check
                // and the call.
                Action<SocketProxy> handler = Connected;
                SocketProxy[] batch = null;

                // Only drain while somebody is listening; otherwise entries pushed
                // before Connected is assigned would be dequeued and silently lost.
                if (handler != null)
                {
                    lock (_syncObject)
                    {
                        if (_queue.Count > 0)
                        {
                            batch = _queue.ToArray();
                            _queue.Clear();
                        }
                    }
                }

                if (batch != null)
                {
                    // Invoked outside the lock so the callback may call Push() freely.
                    foreach (SocketProxy entry in batch)
                    {
                        handler(entry);
                    }
                }

                Thread.Sleep(10);
            }
        }

        /// <summary>Stops the worker thread and waits for it to exit.</summary>
        public void Dispose()
        {
            _isStop = true;

            // Joining the current thread would never return, so skip the wait when
            // Dispose() is reached from inside the Connected callback.
            if (Thread.CurrentThread != _thread)
            {
                _thread.Join();
            }
        }
    }
    View Code

    4、入完队列,就要开始从连接池里面分配资源了。你也可以不做连接池,而是在每次请求过来的时候实例化一个连接,然后将这个连接入池;我的做法是在程序初始化的时候就分配好一定的资源,代码如下:

    /// <summary>
    /// Pre-allocates a fixed number of <see cref="SocketConnection"/> objects and
    /// assigns them to accepted sockets that wait in a <see cref="SocketConnectionQueue"/>.
    /// </summary>
    public class SocketConnectionPool : IDisposable
    {
        private ServerConfig _serverConfig;

        public IAppServer AppServer;

        private ConcurrentStack<SocketConnection> _connectPool;

        private long connect_id = 0;
        private readonly object _syncObject = new object();

        private SocketConnectionQueue _queue;

        /// <summary>
        /// Raised when a queued socket has been given a pooled connection.
        /// </summary>
        public Action<System.Net.Sockets.Socket, SocketConnection> Connected;

        /// <summary>
        /// Returns the next connection id. Ids start at 1 and restart from 1 after
        /// <see cref="long.MaxValue"/>.
        /// </summary>
        public long GenerateId()
        {
            // The method is public, so it takes the lock itself instead of relying on
            // every caller to do so.
            lock (_syncObject)
            {
                if (connect_id == long.MaxValue)
                {
                    connect_id = 0;
                }
                connect_id++;
                return connect_id;
            }
        }

        /// <summary>Creates a pool that reads its settings from <paramref name="server"/>.</summary>
        /// <param name="server">The owning server; its <c>AppConfig</c> supplies the pool settings.</param>
        public SocketConnectionPool(IAppServer server)
        {
            this.AppServer = server;
            this._serverConfig = server.AppConfig;
        }

        /// <summary>
        /// Allocates <c>MaxConnectionNumber</c> connections, each with its own receive
        /// buffer of <c>BufferSize</c> bytes, and wires up the waiting queue.
        /// </summary>
        public void Init()
        {
            var connects = new List<SocketConnection>(this._serverConfig.MaxConnectionNumber);
            for (var i = 0; i < this._serverConfig.MaxConnectionNumber; i++)
            {
                // Every connection needs its OWN buffer. Handing the same array and
                // offset to all event args makes concurrent receives overwrite each
                // other's data.
                var buffer = new byte[this._serverConfig.BufferSize];
                var arg = new SocketAsyncEventArgs();
                arg.SetBuffer(buffer, 0, buffer.Length);
                connects.Add(new SocketConnection(arg, this));
            }
            _connectPool = new ConcurrentStack<SocketConnection>(connects);

            if (_queue == null)
            {
                _queue = new SocketConnectionQueue();
            }

            _queue.Connected = OnConnected;
        }

        /// <summary>Queues an accepted socket until a pooled connection is available.</summary>
        /// <param name="socket">The accepted client socket.</param>
        public void Push(System.Net.Sockets.Socket socket)
        {
            SocketProxy proxy = new SocketProxy()
            {
                Client = socket
            };
            _queue.Push(proxy);
        }

        /// <summary>
        /// Queue callback: tries to give a waiting socket a pooled connection.
        /// </summary>
        /// <param name="proxy">The queued entry being processed.</param>
        public void OnConnected(SocketProxy proxy)
        {
            // An entry that has waited for the configured timeout without getting a
            // connection is closed and dropped.
            int waitedSeconds = (int)(DateTime.Now - proxy.Timeout).TotalSeconds;
            if (waitedSeconds >= this._serverConfig.Timeout)
            {
                proxy.Client.Close();
                return;
            }

            // Without a subscriber a popped connection would never be used nor
            // returned, so keep the socket queued until a handler is attached or the
            // timeout above expires.
            Action<System.Net.Sockets.Socket, SocketConnection> handler = this.Connected;
            if (handler == null)
            {
                _queue.Push(proxy);
                return;
            }

            SocketConnection connect = this.GetConnectionFromPool();
            if (connect == null)
            {
                // Pool exhausted: put the entry back and retry on the next pass.
                _queue.Push(proxy);
                return;
            }

            handler(proxy.Client, connect);
        }

        /// <summary>
        /// Takes a connection from the pool (LIFO) and stamps it with a fresh id.
        /// </summary>
        /// <returns>A pooled connection, or null when the pool is empty.</returns>
        public SocketConnection GetConnectionFromPool()
        {
            SocketConnection connect;
            if (!_connectPool.TryPop(out connect))
            {
                return null;
            }
            connect.ConnectId = this.GenerateId();
            return connect;
        }

        /// <summary>
        /// Returns a connection to the pool.
        /// </summary>
        /// <param name="connect">The connection being released.</param>
        public void ReleaseConnection(SocketConnection connect)
        {
            _connectPool.Push(connect);
            LogHelper.Debug(connect.ConnectId + "放回ConnectPool");
        }

        /// <summary>
        /// Stops the waiting queue and disposes the event args of every connection
        /// that is currently idle in the pool.
        /// </summary>
        public void Dispose()
        {
            // Init() may never have been called.
            if (_queue != null)
            {
                _queue.Dispose();
            }

            if (_connectPool != null)
            {
                SocketConnection idle;
                while (_connectPool.TryPop(out idle))
                {
                    if (idle.RecevieEventArgs != null)
                    {
                        idle.RecevieEventArgs.Dispose();
                    }
                }
            }
        }
    }
    View Code

    在Init()里面初始化了很多个SocketConnection,这个就是我们用来管理具体的单个连接的class,代码如下:

    /// <summary>
    /// A pooled, reusable receive slot: owns one <see cref="SocketAsyncEventArgs"/>
    /// and is bound to one <c>SocketSession</c> at a time.
    /// </summary>
    public class SocketConnection
    {
        /// <summary>Current state of this connection.</summary>
        public SocketFlag Flag { get; private set; }

        /// <summary>The pool this connection is returned to when it is closed.</summary>
        public SocketConnectionPool Pool { get { return _pool; } }
        private SocketConnectionPool _pool;

        /// <summary>Event args used for receive operations on this connection.</summary>
        public SocketAsyncEventArgs RecevieEventArgs { get; set; }

        /// <summary>Identifier assigned by the pool each time the connection is handed out.</summary>
        public long ConnectId { get; set; }

        // Source of binding ids; a new id is drawn on every Initialise() call.
        private int _bindingSeed;

        // Id of the binding that currently owns this connection; 0 means "not bound".
        // Used to make Close() run at most once per binding.
        private int _activeBinding;

        /// <summary>Creates a connection without event args or pool, flagged as <c>Error</c>.</summary>
        public SocketConnection()
        {
            this.Flag = SocketFlag.Error;
        }

        /// <summary>Creates a pooled connection around <paramref name="args"/>.</summary>
        /// <param name="args">Event args whose Completed event this connection handles.</param>
        /// <param name="pool">The pool that owns this connection.</param>
        public SocketConnection(SocketAsyncEventArgs args, SocketConnectionPool pool)
        {
            RecevieEventArgs = args;
            RecevieEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(SocketEventArgs_Completed);

            // NOTE(review): a freshly pooled connection starts as Busy although no
            // session is bound yet; Idle looks more accurate - confirm against the
            // readers of Flag before changing it.
            this.Flag = SocketFlag.Busy;
            this._pool = pool;
        }

        /// <summary>
        /// Dispatches a completed receive to the session stored in <c>UserToken</c>.
        /// </summary>
        void SocketEventArgs_Completed(object sender, SocketAsyncEventArgs e)
        {
            var socketSession = e.UserToken as SocketSession;
            if (socketSession == null)
            {
                // No session is attached. Release the connection if a binding is
                // still active; if it has already been released (for example a late
                // completion after the session closed) Close() does nothing, so the
                // connection is not pushed into the pool a second time.
                this.Close();
                return;
            }

            switch (e.LastOperation)
            {
                case SocketAsyncOperation.Receive:
                    socketSession.ReceiveData(e);
                    break;
                default:
                    break;
            }
        }

        /// <summary>
        /// Binds this connection to <paramref name="session"/> and releases it back to
        /// the pool when that session raises <c>Closed</c>.
        /// </summary>
        /// <param name="session">The session that will use this connection.</param>
        public void Initialise(SocketSession session)
        {
            this.RecevieEventArgs.UserToken = session;
            this.Flag = SocketFlag.Busy;

            int binding = System.Threading.Interlocked.Increment(ref _bindingSeed);
            if (binding == 0)
            {
                // 0 is reserved for "not bound"; skip it when the counter wraps around.
                binding = System.Threading.Interlocked.Increment(ref _bindingSeed);
            }
            System.Threading.Interlocked.Exchange(ref _activeBinding, binding);

            // The handler captures its own binding id, so a Closed event raised twice,
            // or raised by an earlier session after the connection has been reused,
            // cannot release the connection again.
            session.Closed += () =>
            {
                this.Close(binding);
            };
        }

        /// <summary>Detaches the session and marks the connection as idle.</summary>
        public void Reset()
        {
            this.RecevieEventArgs.UserToken = null;
            this.Flag = SocketFlag.Idle;
        }

        // Releases whatever binding is active right now, if any.
        private void Close()
        {
            // CompareExchange(ref x, 0, 0) never changes x; it is used as an atomic read.
            this.Close(System.Threading.Interlocked.CompareExchange(ref _activeBinding, 0, 0));
        }

        // Resets the connection and returns it to the pool, but only for the caller
        // that owns the active binding, and only once.
        private void Close(int binding)
        {
            if (binding == 0
                || System.Threading.Interlocked.CompareExchange(ref _activeBinding, 0, binding) != binding)
            {
                return;
            }

            this.Reset();
            LogHelper.Debug(ConnectId + " reset");
            this._pool.ReleaseConnection(this);
        }
    }
    View Code
    本人对代码不做任何知识产权限制,也不保证所有的代码皆为原创。
  • 相关阅读:
    Codeforces Round #251 (Div. 2) A
    topcoder SRM 623 DIV2 CatAndRat
    topcoder SRM 623 DIV2 CatchTheBeatEasy
    topcoder SRM 622 DIV2 FibonacciDiv2
    topcoder SRM 622 DIV2 BoxesDiv2
    Leetcode Linked List Cycle II
    leetcode Linked List Cycle
    Leetcode Search Insert Position
    关于vim插件
    Codeforces Round #248 (Div. 2) B. Kuriyama Mirai's Stones
  • 原文地址:https://www.cnblogs.com/selfteam/p/3874761.html
Copyright © 2011-2022 走看看