zoukankan      html  css  js  c++  java
  • Sync Socket but Async Queue handler Echo Server

    
    namespace Test
    {
        using System;
        using System.Net;
        using System.Net.Sockets;
        using System.Text;
        using System.IO;
        using System.Threading;
        using Microshaoft;
        public class SyncSocketAsyncQueueHandlerEchoServer
        {
            public static void StartListening()
            {
                //IPHostEntry ipHostInfo = Dns.Resolve(Dns.GetHostName());
                IPAddress ipAddress;
                IPAddress.TryParse("127.0.0.1", out ipAddress);
                IPEndPoint localEndPoint = new IPEndPoint(ipAddress, 12021);
                Socket listener = new Socket
                                        (
                                            AddressFamily.InterNetwork
                                            , SocketType.Stream
                                            , ProtocolType.Tcp
                                        );
                    listener.Bind(localEndPoint);
                    Console.WriteLine("Listening ...");
                    listener.Listen(10);
                    int i = 0;
                    while (true)
                    {
                        int id = i++;
                        UserToken token = new UserToken();
                        token.ConnectionID = id;
                        Socket socket = listener.Accept();
                        Console.WriteLine("accept");
                        SocketDataHandler<UserToken> handler = new SocketDataHandler<UserToken>(socket, token);
                        handler.BufferLength = 10;
                        handler.ID = id;
                        handler.DataReceivedAsync += new SocketDataHandler<UserToken>.DataReceivedEventHandler(DataReceivedProcessAsync);
                        handler.StartReceiveData();
                    }
            }
            public static void DataReceivedProcessAsync(SocketDataHandler<UserToken> sender, byte[] data)
            {
                string s = Encoding.ASCII.GetString(data);
                Console.WriteLine("[{0}],[{1}],[{2}]", sender.Token.ConnectionID, s, data.Length);
                sender.SendDataAsync(data);
            }
            public static int Main(String[] args)
            {
                StartListening();
                return 0;
            }
        }
        public class UserToken
        {
            public string UserID;
            public int ConnectionID;
        }
    }
    namespace Microshaoft
    {
        using System;
        using System.Net;
        using System.Net.Sockets;
        using System.Text;
        using System.IO;
        using System.Threading;
        public class SocketDataHandler<T>
        {
            private AsyncQueue<byte[]> _sendQueue;
            private AsyncQueue<byte[]> _receiveQueue;
            private T _token;
            public T Token
            {
                get
                {
                    return _token;
                }
            }
            public SocketDataHandler(Socket socket, T token)
            {
                _sendQueue = new AsyncQueue<byte[]>();
                _sendQueue.OnDequeue += new AsyncQueue<byte[]>.QueueEventHandler(SendQueue_OnDequeue);
                //_sendQueue.OnQueueLog += new AsyncQueue<byte[]>.QueueLogEventHandler(Queue_OnQueueLog);
                _sendQueue.OnException += new AsyncQueue<byte[]>.ExceptionEventHandler(Queue_OnException);
                _sendQueue.MaxConcurrentThreadsCount = 1;
                _receiveQueue = new AsyncQueue<byte[]>();
                _receiveQueue.OnDequeue += new AsyncQueue<byte[]>.QueueEventHandler(ReceiveQueue_OnDequeue);
                //_receiveQueue.OnQueueLog += new AsyncQueue<byte[]>.QueueLogEventHandler(Queue_OnQueueLog);
                _receiveQueue.OnException += new AsyncQueue<byte[]>.ExceptionEventHandler(Queue_OnException);
                _socket = socket;
                _token = token;
            }
            void Queue_OnException(Exception e)
            {
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.WriteLine(e.Message);
                Console.ResetColor();
            }
            void Queue_OnQueueLog(string logMessage)
            {
    //            Console.WriteLine(logMessage);
            }
            void SendQueue_OnDequeue(byte[] element)
            {
                _socket.Send(element);
            }
            void ReceiveQueue_OnDequeue(byte[] element)
            {
                if (DataReceivedAsync != null)
                {
                    DataReceivedAsync(this, element);
                }
            }
            public enum ExceptionHandleAction : int
            {
                Ignore
                ,
                ReThrow
                ,
                Abort
            }
            private Thread _thread;
            private Socket _socket;
            private int _id;
            public int ID
            {
                set
                {
                    _id = value;
                }
                get
                {
                    return _id;
                }
            }
            private int _bufferLength = 1024;
            public int BufferLength
            {
                set
                {
                    _bufferLength = value;
                }
            }
            public void SendDataAsync(byte[] data)
            {
                _sendQueue.Enqueue(data);
            }
            public void SendData(byte[] data)
            {
                _socket.Send(data);
            }
            public delegate void DataReceivedEventHandler
                                                    (
                                                        SocketDataHandler<T> sender
                                                        , byte[] data
                                                    );
            public event DataReceivedEventHandler DataReceived;
            public event DataReceivedEventHandler DataReceivedAsync;
            public delegate void ExceptionOccursEventHandler
                                                    (
                                                        SocketDataHandler<T> sender
                                                        , Exception exception
                                                        , ExceptionHandleAction action
                                                    );
            public ExceptionOccursEventHandler ExceptionOccurs;
            public void StartReceiveData()
            {
                ThreadStart ts = new ThreadStart(RecieveDataThreadProcess);
                _thread = new Thread(ts);
                _thread.Start();
            }
            private void RecieveDataThreadProcess()
            {
                while (true)
                {
                    byte[] buffer = null;
                    try
                    {
                        buffer = SocketDataHelper.ReadDataToBytes(_bufferLength, _socket);
                    }
                    catch (Exception e)
                    {
                        if (ExceptionOccurs != null)
                        {
                            ExceptionHandleAction a = ExceptionHandleAction.Ignore;
                            ExceptionOccurs(this, e, a);
                            if (a == ExceptionHandleAction.ReThrow)
                            {
                                throw;
                            }
                            else if (a == ExceptionHandleAction.Abort)
                            {
                                break;
                            }
                        }
                    }
                    if (buffer != null)
                    {
                        if (buffer.Length > 0)
                        {
                            _receiveQueue.Enqueue(buffer);
                            if (DataReceived != null)
                            {
                                DataReceived(this, buffer);
                            }
                        }
                    }
                    Thread.Sleep(100);
                }
            }
        }
    }
    namespace Microshaoft
    {
        using System;
        using System.IO;
        using System.Net.Sockets;
        public static class SocketDataHelper
        {
            public static void ReadDataToFixedLengthBytes
                                    (
                                        Socket socket ,
                                        ref byte[] buffer
                                    )
            {
                int p = 0;
                int l = buffer.Length;
                while (p < l)
                {
                    int r = socket.Receive
                                        (
                                            buffer
                                            , p
                                            , l - p
                                            , SocketFlags.None
                                        );
                    p += r;
                }
            }
            public static byte[] ReadDataToFixedLengthBytes
                                        (
                                            int length ,
                                            Socket socket 
                                        )
            {
                int p = 0;
                byte[] data = new byte[length];
                while (p < length)
                {
                    int r = socket.Receive
                                        (
                                            data
                                            , p
                                            , length - p
                                            , SocketFlags.None
                                        );
                    p += r;
                }
                return data;
            }
            public static byte[] ReadDataToBytes
                                        (
                                            int length,
                                            Socket socket
                                        )
            {
                byte[] data = new byte[length];
                int r = 0;
                //SocketError error;
                r = socket.Receive
                                    (
                                        data
                                        , 0
                                        , length
                                        , SocketFlags.None
                                        //, out error
                                    );
                //if (error != SocketError.Success)
                //{
                //    Console.WriteLine("socket error: {0}", Enum.GetName(typeof(SocketError), error));
                //    Thread.Sleep(100);
                //}
                if (r > 0)
                {
                    Array.Resize<byte>(ref data, r);
                }
                else
                {
                    data = null;
                }
                return data;
            }
            public static int ReadDataToBytes
                                    (
                                        Socket socket ,
                                        ref byte[] buffer
                                    )
            {
                int r = 0;
                //SocketError error;
                int l = buffer.Length;
                r = socket.Receive
                                (
                                    buffer
                                    , 0
                                    , l
                                    , SocketFlags.None
                                    //, out error
                                );
                //if (error != SocketError.Success)
                //{
                //    Console.WriteLine("socket error: {0}", Enum.GetName(typeof(SocketError), error));
                //    Thread.Sleep(100);
                //}
                if (r > 0)
                {
                    Array.Resize<byte>(ref buffer, r);
                }
                else
                {
                    buffer = null;
                }
                return r;
            }
        }
        public static class StreamDataHelper
        {
            private static byte[] ReadDataToBytes(Stream stream)
            {
                byte[] buffer = new byte[64 * 1024];
                MemoryStream ms = new MemoryStream();
                int r = 0;
                int l = 0;
                long position = -1;
                if (stream.CanSeek)
                {
                    position = stream.Position;
                    stream.Position = 0;
                }
                while (true)
                {
                    r = stream.Read(buffer, 0, buffer.Length);
                    if (r > 0)
                    {
                        l += r;
                        ms.Write(buffer, 0, r);
                    }
                    else
                    {
                        break;
                    }
                }
                byte[] bytes = new byte[l];
                ms.Position = 0;
                ms.Read(bytes, 0, (int) l);
                ms.Close();
                ms = null;
                if (position >= 0)
                {
                    stream.Position = position;
                }
                return bytes;
            }
        }
    }
    namespace Microshaoft
    {
        using System;
        using System.Threading;
        using System.Collections.Generic;
        public class AsyncQueue<T>
                            where T : class
        {
            public delegate void QueueEventHandler(T element);
            public event QueueEventHandler OnDequeue;
            public delegate void QueueLogEventHandler(string logMessage);
            public event QueueLogEventHandler OnQueueLog;
            public delegate void ExceptionEventHandler(Exception exception);
            public event ExceptionEventHandler OnException;
            private Queue<T> _queue = new Queue<T>();
            private static object _SyncLockObject = new object();
            private int _concurrentThreadsCount = 0; //Microshaoft 用于控制并发线程数
            private volatile bool _queueRuning = false;
            private int _maxConcurrentThreadsCount = 1; //Microshaoft 允许并发出列处理线程数为 1
            public int MaxConcurrentThreadsCount
            {
                set
                {
                    _maxConcurrentThreadsCount = value;
                }
                get
                {
                    return _maxConcurrentThreadsCount;
                }
            }
            private long _EnqueueCount = 0; //入列计数器
            public long EnqueueCount
            {
                get
                {
                    return _EnqueueCount;
                }
            }
            private long _DequeueCount = 0; //出列计数器
            public long DequeueCount
            {
                get
                {
                    return _DequeueCount;
                }
            }
            //Microshaoft 服务启动后可立即开启新的线程调用此方法(死循环)
            private void QueueRun() //Microshaoft ThreadStart
            {
                if (!_queueRuning)
                {
                    _queueRuning = true;
                    lock (_SyncLockObject)
                    {
                        ThreadStart ts = new ThreadStart(QueueRunThreadProcess);
                        Thread t = new Thread(ts);
                        t.Name = "QueueRunThreadProcess";
                        t.Start();
                    }
                }
            }
            public int Count
            {
                get
                {
                    return _queue.Count;
                }
            }
            public int ConcurrentThreadsCount
            {
                get
                {
                    return _concurrentThreadsCount;
                }
            }
            private void QueueRunThreadProcess()
            {
                if (OnQueueLog != null)
                {
                    OnQueueLog
                        (
                            string.Format
                                    (
                                        "{0} Threads Count {1},Queue Count {2},Current Thread: {3}"
                                        , "Queue Runing Start ..."
                                        , _concurrentThreadsCount
                                        , _queue.Count
                                        , Thread.CurrentThread.Name
                                    )
                        );
                }
                while (_queue.Count > 0) //Microshaoft 死循环
                {
                    T element = null;
                    int threadID = -1;
                    lock (_SyncLockObject)
                    {
                        if (_concurrentThreadsCount < _maxConcurrentThreadsCount)
                        {
                            if (_queue.Count > 0)
                            {
                                Interlocked.Increment(ref _concurrentThreadsCount);
                                threadID = _concurrentThreadsCount;
                                if (_concurrentThreadsCount >= _maxConcurrentThreadsCount)
                                {
                                    if (OnQueueLog != null)
                                    {
                                        OnQueueLog
                                            (
                                                string.Format
                                                        (
                                                            "{0} Threads Count {1},Queue Count {2},Current Thread: {3}"
                                                            , "Threads is Full!"
                                                            , _concurrentThreadsCount
                                                            , _queue.Count
                                                            , Thread.CurrentThread.Name
                                                        )
                                            );
                                    }
                                }
                                if (OnQueueLog != null)
                                {
                                    OnQueueLog
                                        (
                                            string.Format
                                                    (
                                                        "{0} Threads Count {1},Queue Count {2},Current Thread: {3}"
                                                        , "Threads ++ !"
                                                        , _concurrentThreadsCount
                                                        , _queue.Count
                                                        , Thread.CurrentThread.Name
                                                    )
                                        );
                                }
                                element = _queue.Dequeue();
                            }
                        }
                    }
                    if (element != null)
                    {
                        //Microshaoft ThreadPool.QueueUserWorkelement(new WaitCallback(OnDequeueThreadProcess), element);
                        ThreadProcessState tps = new ThreadProcessState();
                        tps.element = element;
                        tps.Sender = this;
                        Thread t = new Thread(new ThreadStart(tps.ThreadProcess));
                        t.Name = string.Format("ConcurrentThread[{0}]", threadID);
                        t.Start();
                    }
                }
                _queueRuning = false;
                if (OnQueueLog != null)
                {
                    OnQueueLog
                        (
                            string.Format
                                (
                                    "{0} Threads Count {1},Queue Count {2},Current Thread: {3}"
                                    , "Queue Runing Stopped!"
                                    , _concurrentThreadsCount
                                    , _queue.Count
                                    , Thread.CurrentThread.Name
                                )
                        );
                }
            }
            public void Enqueue(T element)
            {
                try
                {
                    lock (_SyncLockObject) //还算并发吗?
                    {
                        _queue.Enqueue(element);
                    }
                    Interlocked.Increment(ref _EnqueueCount);
                }
                catch (Exception e)
                {
                    if (OnException != null)
                    {
                        OnException(e);
                    }
                }
                if (!_queueRuning)
                {
                    QueueRun();
                }
            }
            private void OnDequeueThreadProcess(T element)
            {
                try
                {
                    if (OnDequeue != null)
                    {
                        OnDequeue(element);
                    }
                    Interlocked.Increment(ref _DequeueCount);
                    DequeueProcess();
                }
                catch (Exception e)
                {
                    if (OnException != null)
                    {
                        OnException(e);
                    }
                }
                finally
                {
                    Interlocked.Decrement(ref _concurrentThreadsCount);
                    if (_concurrentThreadsCount == 0)
                    {
                        if (OnQueueLog != null)
                        {
                            OnQueueLog
                                (
                                    string.Format
                                            (
                                                "{0} Threads Count {1},Queue Count {2},Current Thread: {3}"
                                                , "All Threads Finished!"
                                                , _concurrentThreadsCount
                                                , _queue.Count
                                                , Thread.CurrentThread.Name
                                            )
                                );
                        }
                    }
                    if (OnQueueLog != null)
                    {
                        OnQueueLog
                            (
                                string.Format
                                        (
                                            "{0} Threads Count {1},Queue Count {2},Current Thread: {3}"
                                            , "Threads -- !"
                                            , _concurrentThreadsCount
                                            , _queue.Count
                                            , Thread.CurrentThread.Name
                                        )
                            );
                    }
                }
            }
            private void DequeueProcess()
            {
                while (_queue.Count > 0)
                {
                    T element = null;
                    lock (_SyncLockObject)
                    {
                        if (_queue.Count > 0)
                        {
                            element = _queue.Dequeue();
                        }
                    }
                    if (element != null)
                    {
                        if (OnDequeue != null)
                        {
                            OnDequeue(element);
                        }
                        Interlocked.Increment(ref _DequeueCount);
                    }
                }
            }
            private class ThreadProcessState
            {
                private AsyncQueue<T> _sender;
                public AsyncQueue<T> Sender
                {
                    get
                    {
                        return _sender;
                    }
                    set
                    {
                        _sender = value;
                    }
                }
                private T _element;
                public T element
                {
                    get
                    {
                        return _element;
                    }
                    set
                    {
                        _element = value;
                    }
                }
                public void ThreadProcess()
                {
                    _sender.OnDequeueThreadProcess(_element);
                }
            }
        }
    }
    
    
  • 相关阅读:
    c#中对rgb的使用
    编写高质量代码改善C#程序的157个建议——建议141:不知道该不该用大括号时,就用
    编写高质量代码改善C#程序的157个建议——建议140:使用默认的访问修饰符
    编写高质量代码改善C#程序的157个建议——建议139:事件处理器命名采用组合方式
    编写高质量代码改善C#程序的157个建议——建议138:事件和委托变量使用动词或形容词短语命名
    编写高质量代码改善C#程序的157个建议——建议137:委托和事件类型应添加上级后缀
    编写高质量代码改善C#程序的157个建议——建议136:优先使用后缀表示已有类型的新版本
    编写高质量代码改善C#程序的157个建议——建议135: 考虑使用肯定性的短语命名布尔属性
    编写高质量代码改善C#程序的157个建议——建议134:有条件地使用前缀
    编写高质量代码改善C#程序的157个建议——建议133:用camelCasing命名私有字段和局部变量
  • 原文地址:https://www.cnblogs.com/Microshaoft/p/1411216.html
Copyright © 2011-2022 走看看