namespace Test
{
    using System;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using System.IO;
    using System.Threading;
    using Microshaoft;

    /// <summary>
    /// Console echo server. Accepts TCP connections synchronously and hands each one to a
    /// <see cref="SocketDataHandler{T}"/>, which delivers received bytes through an
    /// asynchronous queue; every received chunk is echoed back to the sender.
    /// </summary>
    public class SyncSocketAsyncQueueHandlerEchoServer
    {
        /// <summary>
        /// Binds to 127.0.0.1:12021 and accepts connections forever. Returns only by
        /// throwing; the listening socket is closed in that case.
        /// </summary>
        public static void StartListening()
        {
            // IPAddress.Loopback is 127.0.0.1, so no string parsing (and no ignored
            // TryParse result) is needed.
            IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Loopback, 12021);
            Socket listener = new Socket
                                    (
                                        AddressFamily.InterNetwork
                                        , SocketType.Stream
                                        , ProtocolType.Tcp
                                    );
            try
            {
                listener.Bind(localEndPoint);
                Console.WriteLine("Listening ...");
                listener.Listen(10);
                int i = 0;
                while (true)
                {
                    int id = i++;
                    UserToken token = new UserToken();
                    token.ConnectionID = id;
                    Socket socket = listener.Accept();
                    Console.WriteLine("accept");
                    SocketDataHandler<UserToken> handler = new SocketDataHandler<UserToken>(socket, token);
                    // Deliberately tiny buffer so that messages arrive in several chunks.
                    handler.BufferLength = 10;
                    handler.ID = id;
                    handler.DataReceivedAsync += new SocketDataHandler<UserToken>.DataReceivedEventHandler(DataReceivedProcessAsync);
                    handler.StartReceiveData();
                }
            }
            finally
            {
                // The accept loop never exits normally; release the port if it fails.
                listener.Close();
            }
        }

        /// <summary>
        /// Handles one received chunk: prints the connection id, the chunk decoded as
        /// ASCII and its length, then queues the same bytes to be sent back.
        /// </summary>
        /// <param name="sender">Handler that owns the connection.</param>
        /// <param name="data">Bytes received in this chunk.</param>
        public static void DataReceivedProcessAsync(SocketDataHandler<UserToken> sender, byte[] data)
        {
            string s = Encoding.ASCII.GetString(data);
            Console.WriteLine("[{0}],[{1}],[{2}]", sender.Token.ConnectionID, s, data.Length);
            sender.SendDataAsync(data);
        }

        /// <summary>Program entry point; runs the server.</summary>
        public static int Main(String[] args)
        {
            StartListening();
            return 0;
        }
    }

    /// <summary>Per-connection state attached to a <see cref="SocketDataHandler{T}"/>.</summary>
    public class UserToken
    {
        public string UserID;
        public int ConnectionID;
    }
}

namespace Microshaoft
{
    using System;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using System.IO;
    using System.Threading;

    /// <summary>
    /// Wraps a socket with a dedicated receive thread plus two <see cref="AsyncQueue{T}"/>
    /// instances: one that raises <see cref="DataReceivedAsync"/> for received chunks and
    /// one that performs queued sends.
    /// </summary>
    /// <typeparam name="T">Type of the caller-defined token carried with the connection.</typeparam>
    public class SocketDataHandler<T>
    {
        private AsyncQueue<byte[]> _sendQueue;
        private AsyncQueue<byte[]> _receiveQueue;
        private T _token;
        private Thread _thread;
        private Socket _socket;
        private int _id;
        private int _bufferLength = 1024;

        /// <summary>Token supplied to the constructor.</summary>
        public T Token
        {
            get { return _token; }
        }

        /// <summary>Creates a handler for <paramref name="socket"/>; no thread is started yet.</summary>
        /// <param name="socket">Socket to read from and write to.</param>
        /// <param name="token">Caller-defined state exposed through <see cref="Token"/>.</param>
        public SocketDataHandler(Socket socket, T token)
        {
            _sendQueue = new AsyncQueue<byte[]>();
            _sendQueue.OnDequeue += new AsyncQueue<byte[]>.QueueEventHandler(SendQueue_OnDequeue);
            _sendQueue.OnException += new AsyncQueue<byte[]>.ExceptionEventHandler(Queue_OnException);
            // One sender at a time so queued buffers cannot interleave on the wire.
            _sendQueue.MaxConcurrentThreadsCount = 1;

            _receiveQueue = new AsyncQueue<byte[]>();
            _receiveQueue.OnDequeue += new AsyncQueue<byte[]>.QueueEventHandler(ReceiveQueue_OnDequeue);
            _receiveQueue.OnException += new AsyncQueue<byte[]>.ExceptionEventHandler(Queue_OnException);
            // Queue tracing is off; subscribe Queue_OnQueueLog to OnQueueLog on either
            // queue to enable it.

            _socket = socket;
            _token = token;
        }

        // Reports a queue-side failure on the console in yellow.
        void Queue_OnException(Exception e)
        {
            Console.ForegroundColor = ConsoleColor.Yellow;
            Console.WriteLine(e.Message);
            Console.ResetColor();
        }

        // Sink for queue trace messages; intentionally a no-op.
        void Queue_OnQueueLog(string logMessage)
        {
        }

        // Send-queue consumer: writes one queued buffer to the socket.
        void SendQueue_OnDequeue(byte[] element)
        {
            _socket.Send(element);
        }

        // Receive-queue consumer: forwards one received chunk to DataReceivedAsync.
        void ReceiveQueue_OnDequeue(byte[] element)
        {
            DataReceivedEventHandler handler = DataReceivedAsync;
            if (handler != null)
            {
                handler(this, element);
            }
        }

        /// <summary>Action value passed to <see cref="ExceptionOccurs"/> handlers.</summary>
        public enum ExceptionHandleAction : int
        {
            Ignore
            , ReThrow
            , Abort
        }

        /// <summary>Caller-assigned identifier; not used by the handler itself.</summary>
        public int ID
        {
            set { _id = value; }
            get { return _id; }
        }

        /// <summary>Maximum number of bytes requested per receive call (default 1024).</summary>
        public int BufferLength
        {
            set { _bufferLength = value; }
        }

        /// <summary>Queues <paramref name="data"/> to be sent on a queue worker thread.</summary>
        public void SendDataAsync(byte[] data)
        {
            _sendQueue.Enqueue(data);
        }

        /// <summary>Sends <paramref name="data"/> on the calling thread.</summary>
        public void SendData(byte[] data)
        {
            _socket.Send(data);
        }

        public delegate void DataReceivedEventHandler
                                        (
                                            SocketDataHandler<T> sender
                                            , byte[] data
                                        );

        /// <summary>Raised on the receive thread for every non-empty chunk.</summary>
        public event DataReceivedEventHandler DataReceived;

        /// <summary>Raised on a receive-queue worker thread for every non-empty chunk.</summary>
        public event DataReceivedEventHandler DataReceivedAsync;

        public delegate void ExceptionOccursEventHandler
                                        (
                                            SocketDataHandler<T> sender
                                            , Exception exception
                                            , ExceptionHandleAction action
                                        );

        /// <summary>
        /// Invoked when reading from the socket throws. The action argument is passed by
        /// value and is always <see cref="ExceptionHandleAction.Ignore"/>; a handler cannot
        /// change what the receive loop does next.
        /// </summary>
        public ExceptionOccursEventHandler ExceptionOccurs;

        /// <summary>Starts the thread that reads from the socket.</summary>
        public void StartReceiveData()
        {
            ThreadStart ts = new ThreadStart(RecieveDataThreadProcess);
            _thread = new Thread(ts);
            _thread.Start();
        }

        // Receive loop. Ends when the peer closes the connection or the socket becomes
        // unusable; previously it kept polling a dead socket forever. The socket is not
        // closed here because buffers queued through SendDataAsync may still be waiting
        // to be written.
        private void RecieveDataThreadProcess()
        {
            while (true)
            {
                byte[] buffer;
                try
                {
                    buffer = SocketDataHelper.ReadDataToBytes(_bufferLength, _socket);
                }
                catch (Exception e)
                {
                    NotifyException(e);
                    if (e is ObjectDisposedException || !_socket.Connected)
                    {
                        // Nothing more can ever be read from this socket.
                        break;
                    }
                    // Possibly transient (for example a non-blocking socket with no data
                    // yet): back off briefly, then retry.
                    Thread.Sleep(100);
                    continue;
                }

                if (buffer == null)
                {
                    if (_bufferLength > 0)
                    {
                        // Zero bytes were received although some were requested: the peer
                        // has shut the connection down.
                        break;
                    }
                    // Nothing was requested, so nothing can arrive; idle as before.
                    Thread.Sleep(100);
                    continue;
                }

                if (buffer.Length > 0)
                {
                    _receiveQueue.Enqueue(buffer);
                    DataReceivedEventHandler handler = DataReceived;
                    if (handler != null)
                    {
                        handler(this, buffer);
                    }
                }
                // No delay here: a blocking Receive already waits for the next chunk.
            }
        }

        // Forwards a receive-side exception to ExceptionOccurs, if anything is attached.
        private void NotifyException(Exception e)
        {
            ExceptionOccursEventHandler handler = ExceptionOccurs;
            if (handler != null)
            {
                handler(this, e, ExceptionHandleAction.Ignore);
            }
        }
    }
}

namespace Microshaoft
{
    using System;
    using System.IO;
    using System.Net.Sockets;

    /// <summary>Helpers for reading raw bytes from a <see cref="Socket"/>.</summary>
    public static class SocketDataHelper
    {
        /// <summary>Receives until <paramref name="buffer"/> is completely filled.</summary>
        /// <param name="socket">Socket to read from.</param>
        /// <param name="buffer">Destination; its length is the number of bytes to read.</param>
        /// <exception cref="EndOfStreamException">
        /// The socket returned zero bytes before the buffer was full.
        /// </exception>
        public static void ReadDataToFixedLengthBytes
                                    (
                                        Socket socket
                                        , ref byte[] buffer
                                    )
        {
            ReceiveExactly(socket, buffer);
        }

        /// <summary>Receives exactly <paramref name="length"/> bytes.</summary>
        /// <param name="length">Number of bytes to read.</param>
        /// <param name="socket">Socket to read from.</param>
        /// <returns>A new array of <paramref name="length"/> bytes.</returns>
        /// <exception cref="EndOfStreamException">
        /// The socket returned zero bytes before <paramref name="length"/> bytes arrived.
        /// </exception>
        public static byte[] ReadDataToFixedLengthBytes
                                    (
                                        int length
                                        , Socket socket
                                    )
        {
            byte[] data = new byte[length];
            ReceiveExactly(socket, data);
            return data;
        }

        /// <summary>Performs a single receive of at most <paramref name="length"/> bytes.</summary>
        /// <param name="length">Maximum number of bytes to read.</param>
        /// <param name="socket">Socket to read from.</param>
        /// <returns>
        /// The bytes received, trimmed to the received count, or <c>null</c> when the
        /// receive call returned zero bytes.
        /// </returns>
        public static byte[] ReadDataToBytes
                                    (
                                        int length
                                        , Socket socket
                                    )
        {
            byte[] data = new byte[length];
            int received = socket.Receive(data, 0, length, SocketFlags.None);
            if (received > 0)
            {
                Array.Resize<byte>(ref data, received);
                return data;
            }
            return null;
        }

        /// <summary>
        /// Performs a single receive into <paramref name="buffer"/>, then shrinks the
        /// array to the received count, or sets it to <c>null</c> when zero bytes arrived.
        /// </summary>
        /// <param name="socket">Socket to read from.</param>
        /// <param name="buffer">Destination; replaced as described above.</param>
        /// <returns>The number of bytes received.</returns>
        public static int ReadDataToBytes
                                    (
                                        Socket socket
                                        , ref byte[] buffer
                                    )
        {
            int received = socket.Receive(buffer, 0, buffer.Length, SocketFlags.None);
            if (received > 0)
            {
                Array.Resize<byte>(ref buffer, received);
            }
            else
            {
                buffer = null;
            }
            return received;
        }

        // Fills the whole buffer. A zero-byte receive while bytes are still outstanding
        // means no more data will come, so fail instead of spinning forever.
        private static void ReceiveExactly(Socket socket, byte[] buffer)
        {
            int offset = 0;
            while (offset < buffer.Length)
            {
                int received = socket.Receive
                                        (
                                            buffer
                                            , offset
                                            , buffer.Length - offset
                                            , SocketFlags.None
                                        );
                if (received == 0)
                {
                    throw new EndOfStreamException
                                (
                                    string.Format
                                        (
                                            "Connection closed after {0} of {1} bytes."
                                            , offset
                                            , buffer.Length
                                        )
                                );
                }
                offset += received;
            }
        }
    }

    /// <summary>Helpers for reading raw bytes from a <see cref="Stream"/>.</summary>
    public static class StreamDataHelper
    {
        // Reads a stream to its end and returns the content. A seekable stream is read
        // from position 0 and has its original position restored afterwards, even when
        // reading fails.
        private static byte[] ReadDataToBytes(Stream stream)
        {
            long position = -1;
            if (stream.CanSeek)
            {
                position = stream.Position;
                stream.Position = 0;
            }
            try
            {
                byte[] buffer = new byte[64 * 1024];
                using (MemoryStream ms = new MemoryStream())
                {
                    int read;
                    while ((read = stream.Read(buffer, 0, buffer.Length)) > 0)
                    {
                        ms.Write(buffer, 0, read);
                    }
                    return ms.ToArray();
                }
            }
            finally
            {
                if (position >= 0)
                {
                    stream.Position = position;
                }
            }
        }
    }
}

namespace Microshaoft
{
    using System;
    using System.Threading;
    using System.Collections.Generic;

    /// <summary>
    /// In-memory queue whose elements are delivered to <see cref="OnDequeue"/> on
    /// background-started worker threads. A dispatcher thread is started on demand by
    /// <see cref="Enqueue"/>; it starts at most <see cref="MaxConcurrentThreadsCount"/>
    /// workers at a time and each worker keeps draining the queue until it is empty.
    /// Null elements are discarded without being delivered.
    /// </summary>
    /// <typeparam name="T">Element type.</typeparam>
    public class AsyncQueue<T>
                        where T : class
    {
        public delegate void QueueEventHandler(T element);

        /// <summary>Raised on a worker thread for every non-null element.</summary>
        public event QueueEventHandler OnDequeue;

        public delegate void QueueLogEventHandler(string logMessage);

        /// <summary>Raised with trace messages about dispatcher and worker activity.</summary>
        public event QueueLogEventHandler OnQueueLog;

        public delegate void ExceptionEventHandler(Exception exception);

        /// <summary>Raised when enqueuing or an <see cref="OnDequeue"/> handler throws.</summary>
        public event ExceptionEventHandler OnException;

        private readonly Queue<T> _queue = new Queue<T>();

        // Per-instance lock. It guards _queue, _queueRunning, _concurrentThreadsCount and
        // _maxConcurrentThreadsCount, and doubles as the monitor the dispatcher waits on.
        // (It used to be static, which made unrelated queues contend with each other.)
        private readonly object _syncLock = new object();

        // Number of worker threads currently alive; limits concurrency.
        private int _concurrentThreadsCount = 0;

        // True while a dispatcher thread exists or is about to be started.
        private bool _queueRunning = false;

        // Number of concurrent dequeue-processing threads allowed; 1 by default.
        private int _maxConcurrentThreadsCount = 1;

        private long _EnqueueCount = 0; // enqueue counter
        private long _DequeueCount = 0; // dequeue counter

        /// <summary>Maximum number of worker threads running at the same time.</summary>
        public int MaxConcurrentThreadsCount
        {
            set
            {
                lock (_syncLock)
                {
                    _maxConcurrentThreadsCount = value;
                    // A dispatcher may be waiting for a free slot; let it re-check.
                    Monitor.PulseAll(_syncLock);
                }
            }
            get
            {
                return _maxConcurrentThreadsCount;
            }
        }

        /// <summary>Number of successful <see cref="Enqueue"/> calls so far.</summary>
        public long EnqueueCount
        {
            get { return Interlocked.Read(ref _EnqueueCount); }
        }

        /// <summary>Number of elements handed to <see cref="OnDequeue"/> so far.</summary>
        public long DequeueCount
        {
            get { return Interlocked.Read(ref _DequeueCount); }
        }

        /// <summary>Number of elements currently waiting in the queue.</summary>
        public int Count
        {
            get
            {
                lock (_syncLock)
                {
                    return _queue.Count;
                }
            }
        }

        /// <summary>Number of worker threads currently alive.</summary>
        public int ConcurrentThreadsCount
        {
            get { return _concurrentThreadsCount; }
        }

        /// <summary>
        /// Adds <paramref name="element"/> to the queue and starts a dispatcher thread if
        /// none is running. A failure while enqueuing is reported through
        /// <see cref="OnException"/> instead of being thrown.
        /// </summary>
        /// <param name="element">Element to deliver; null elements are dropped later.</param>
        public void Enqueue(T element)
        {
            bool startDispatcher = false;
            try
            {
                lock (_syncLock)
                {
                    _queue.Enqueue(element);
                    // Decide under the lock so that exactly one dispatcher is started and
                    // an element can never be stranded by a dispatcher that is just stopping.
                    if (!_queueRunning)
                    {
                        _queueRunning = true;
                        startDispatcher = true;
                    }
                }
                Interlocked.Increment(ref _EnqueueCount);
            }
            catch (Exception e)
            {
                RaiseException(e);
            }
            if (startDispatcher)
            {
                QueueRun();
            }
        }

        // Starts the dispatcher thread. The caller has already set _queueRunning.
        private void QueueRun()
        {
            try
            {
                Thread t = new Thread(new ThreadStart(QueueRunThreadProcess));
                t.Name = "QueueRunThreadProcess";
                t.Start();
            }
            catch
            {
                // No dispatcher exists after all; allow the next Enqueue to try again.
                lock (_syncLock)
                {
                    _queueRunning = false;
                }
                throw;
            }
        }

        // Dispatcher: while elements are queued, starts one worker per free slot; when
        // every slot is taken it sleeps on the monitor until a worker finishes. Stops
        // once the queue is empty.
        private void QueueRunThreadProcess()
        {
            WriteLog("Queue Runing Start ...");
            while (true)
            {
                T element = null;
                int threadID = -1;
                int queued = 0;
                lock (_syncLock)
                {
                    if (_queue.Count == 0)
                    {
                        _queueRunning = false;
                        break;
                    }
                    if (_concurrentThreadsCount >= _maxConcurrentThreadsCount)
                    {
                        // Running workers drain the queue themselves; wait for one of
                        // them to finish rather than spinning.
                        Monitor.Wait(_syncLock);
                        continue;
                    }
                    queued = _queue.Count;
                    element = _queue.Dequeue();
                    if (element != null)
                    {
                        // Reserve the slot only when a worker really will be started, so
                        // a null element can no longer leak a slot and stall the queue.
                        threadID = ++_concurrentThreadsCount;
                    }
                }
                if (element == null)
                {
                    continue;
                }

                if (threadID >= _maxConcurrentThreadsCount)
                {
                    WriteLog("Threads is Full!", threadID, queued);
                }
                WriteLog("Threads ++ !", threadID, queued);

                ThreadProcessState tps = new ThreadProcessState();
                tps.element = element;
                tps.Sender = this;
                Thread t = new Thread(new ThreadStart(tps.ThreadProcess));
                t.Name = string.Format("ConcurrentThread[{0}]", threadID);
                t.Start();
            }
            WriteLog("Queue Runing Stopped!");
        }

        // Worker body: delivers the element it was started with, then keeps draining the
        // queue. Always releases its slot and wakes the dispatcher when done.
        private void OnDequeueThreadProcess(T element)
        {
            try
            {
                Deliver(element);
                DequeueProcess();
            }
            catch (Exception e)
            {
                RaiseException(e);
            }
            finally
            {
                int remaining;
                lock (_syncLock)
                {
                    remaining = --_concurrentThreadsCount;
                    Monitor.PulseAll(_syncLock);
                }
                if (remaining == 0)
                {
                    WriteLog("All Threads Finished!");
                }
                WriteLog("Threads -- !");
            }
        }

        // Delivers queued elements on the calling worker thread until the queue is empty.
        private void DequeueProcess()
        {
            while (true)
            {
                T element;
                lock (_syncLock)
                {
                    if (_queue.Count == 0)
                    {
                        return;
                    }
                    element = _queue.Dequeue();
                }
                if (element != null)
                {
                    Deliver(element);
                }
            }
        }

        // Hands one element to OnDequeue and counts it.
        private void Deliver(T element)
        {
            QueueEventHandler handler = OnDequeue;
            if (handler != null)
            {
                handler(element);
            }
            Interlocked.Increment(ref _DequeueCount);
        }

        // Forwards an exception to OnException, if anything is attached.
        private void RaiseException(Exception e)
        {
            ExceptionEventHandler handler = OnException;
            if (handler != null)
            {
                handler(e);
            }
        }

        // Emits a trace message using the current thread and queue counts.
        private void WriteLog(string stage)
        {
            if (OnQueueLog != null)
            {
                WriteLog(stage, _concurrentThreadsCount, Count);
            }
        }

        // Emits a trace message using counts captured by the caller.
        private void WriteLog(string stage, int threads, int queued)
        {
            QueueLogEventHandler handler = OnQueueLog;
            if (handler != null)
            {
                handler
                    (
                        string.Format
                            (
                                "{0} Threads Count {1},Queue Count {2},Current Thread: {3}"
                                , stage
                                , threads
                                , queued
                                , Thread.CurrentThread.Name
                            )
                    );
            }
        }

        // Carries the element and owning queue into a parameterless ThreadStart.
        private class ThreadProcessState
        {
            private AsyncQueue<T> _sender;
            private T _element;

            public AsyncQueue<T> Sender
            {
                get { return _sender; }
                set { _sender = value; }
            }

            public T element
            {
                get { return _element; }
                set { _element = value; }
            }

            public void ThreadProcess()
            {
                _sender.OnDequeueThreadProcess(_element);
            }
        }
    }
}