zoukankan      html  css  js  c++  java
  • SocketAsyncDataHandler SocketAsyncEventArgs ConcurrentAsyncQueue Server/Client

    
    //===========================================================================================
    // Share.cs
    // csc.exe /t:library Share.cs
    //#define c4 //C# 4.0+
    #define c4
    namespace Microshaoft
    {
        using System;
        using System.Threading;
        using System.Diagnostics;
        using System.Collections.Generic;
    #if c4
        using System.Collections.Concurrent;
    #endif
        using Microshaoft;
        public class ConcurrentAsyncQueue<T>
    #if c2
                                    where T : class
    #endif
        {
            // Handler signature for consumers of dequeued elements.
            public delegate void QueueEventHandler(T element);
            // Raised by DequeueThreadProcess for each element taken off the queue.
            public event QueueEventHandler OnDequeue;
            // Handler signature for the textual lifecycle notifications below.
            public delegate void QueueLogEventHandler(string logMessage);
            //public event QueueLogEventHandler OnQueueLog;
            // Raised when the dispatcher thread (QueueRunThreadProcess) starts / is about to finish.
            public event QueueLogEventHandler OnQueueRunningThreadStart;
            public event QueueLogEventHandler OnQueueRunningThreadEnd;
            // Raised when a dequeue worker thread (DequeueThreadProcess) starts / ends.
            public event QueueLogEventHandler OnDequeueThreadStart;
            public event QueueLogEventHandler OnDequeueThreadEnd;
            // Raised by a dequeue worker when its exit brings the worker count down to zero.
            public event QueueLogEventHandler OnDequeueAllThreadsEnd;
            // Handler signature for exceptions caught inside Enqueue and the dequeue loop.
            public delegate void ExceptionEventHandler(Exception exception);
            public event ExceptionEventHandler OnException;
    #if c2
            // C# 2.0 build: plain Queue<T> guarded by an explicit lock object.
            private Queue<T> _queue = new Queue<T>();
            private object _syncQueueLockObject = new object();
    #elif c4
            // C# 4.0+ build: lock-free ConcurrentQueue<T>.
            private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    #endif
            //private object _syncQueueRunningLockObject = new object();
            // Flag accessed through Interlocked; reset to 0 at the end of QueueRunThreadProcess.
            private long _isQueueRunning = 0;
            private long _concurrentDequeueThreadsCount = 0; //Microshaoft used to cap the number of concurrent dequeue threads
            // Performance counters; all of them are created in AttachPerformanceCounters
            // and only touched when _isAttachedPerformanceCounters is true.
            private PerformanceCounter _enqueuePerformanceCounter;
            private PerformanceCounter _dequeuePerformanceCounter;
            private PerformanceCounter _dequeueProcessedPerformanceCounter;
            private PerformanceCounter _queueLengthPerformanceCounter;
            private PerformanceCounter _dequeueThreadStartPerformanceCounter;
            private PerformanceCounter _dequeueThreadEndPerformanceCounter;
            private PerformanceCounter _dequeueThreadsCountPerformanceCounter;
            private PerformanceCounter _queueRunningThreadStartPerformanceCounter;
            private PerformanceCounter _queueRunningThreadEndPerformanceCounter;
            private PerformanceCounter _queueRunningThreadsCountPerformanceCounter;
            // Set to true as the last step of AttachPerformanceCounters.
            private bool _isAttachedPerformanceCounters = false;
            /// <summary>
            /// Recreates the performance-counter category used by this queue and binds one
            /// writable counter instance per metric.
            /// </summary>
            /// <param name="instanceNamePrefix">
            /// Prefix of the counter instance name; the instance is named
            /// "{instanceNamePrefix}-{current process name}".
            /// </param>
            /// <remarks>
            /// If the category already exists it is deleted and created again, exactly as before.
            /// Counter updates elsewhere in the class are enabled only after every counter
            /// has been created.
            /// </remarks>
            public void AttachPerformanceCounters(string instanceNamePrefix)
            {
                string category = "Microshaoft AsyncConurrentQueue Counters";
                string processName;
                // Process is IDisposable; only its name is needed.
                using (Process process = Process.GetCurrentProcess())
                {
                    processName = process.ProcessName;
                }
                string instanceName = string.Format("{0}-{1}", instanceNamePrefix, processName);
                // Registration order is kept identical to the previous implementation.
                string[] counterNames = new string[]
                                            {
                                                "EnqueueCounter"
                                                , "DequeueCounter"
                                                , "QueueLengthCounter"
                                                , "DequeueProcessedCounter"
                                                , "DequeueThreadStartCounter"
                                                , "DequeueThreadEndCounter"
                                                , "DequeueThreadsCountCounter"
                                                , "QueueRunningThreadStartCounter"
                                                , "QueueRunningThreadEndCounter"
                                                , "QueueRunningThreadsCountCounter"
                                            };
                CounterCreationDataCollection ccdc = new CounterCreationDataCollection();
                if (PerformanceCounterCategory.Exists(category))
                {
                    PerformanceCounterCategory.Delete(category);
                }
                foreach (string counterName in counterNames)
                {
                    ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counterName, PerformanceCounterType.NumberOfItems64));
                }
                PerformanceCounterCategory.Create
                                                (
                                                    category,
                                                    string.Format("{0} Category Help.", category),
                                                    PerformanceCounterCategoryType.MultiInstance,
                                                    ccdc
                                                );
                _enqueuePerformanceCounter = CreateWritableCounter(category, "EnqueueCounter", instanceName);
                _dequeuePerformanceCounter = CreateWritableCounter(category, "DequeueCounter", instanceName);
                _dequeueProcessedPerformanceCounter = CreateWritableCounter(category, "DequeueProcessedCounter", instanceName);
                _queueLengthPerformanceCounter = CreateWritableCounter(category, "QueueLengthCounter", instanceName);
                _dequeueThreadStartPerformanceCounter = CreateWritableCounter(category, "DequeueThreadStartCounter", instanceName);
                _dequeueThreadEndPerformanceCounter = CreateWritableCounter(category, "DequeueThreadEndCounter", instanceName);
                _dequeueThreadsCountPerformanceCounter = CreateWritableCounter(category, "DequeueThreadsCountCounter", instanceName);
                _queueRunningThreadStartPerformanceCounter = CreateWritableCounter(category, "QueueRunningThreadStartCounter", instanceName);
                _queueRunningThreadEndPerformanceCounter = CreateWritableCounter(category, "QueueRunningThreadEndCounter", instanceName);
                _queueRunningThreadsCountPerformanceCounter = CreateWritableCounter(category, "QueueRunningThreadsCountCounter", instanceName);
                // Flip the switch last so other methods never see a half-initialised set.
                _isAttachedPerformanceCounters = true;
            }
            // Builds one writable, process-lifetime counter instance and zeroes its raw value.
            private static PerformanceCounter CreateWritableCounter(string category, string counter, string instanceName)
            {
                PerformanceCounter performanceCounter = new PerformanceCounter();
                performanceCounter.CategoryName = category;
                performanceCounter.CounterName = counter;
                performanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
                performanceCounter.InstanceName = instanceName;
                performanceCounter.ReadOnly = false;
                performanceCounter.RawValue = 0;
                return performanceCounter;
            }
            // Microshaoft: number of dequeue worker threads allowed to run concurrently (defaults to 1).
            private int _maxConcurrentDequeueThreadsCount = 1;
            /// <summary>
            /// Gets or sets the limit compared against the running dequeue thread count
            /// before another dequeue thread is started.
            /// </summary>
            public int MaxConcurrentDequeueThreadsCount
            {
                get { return _maxConcurrentDequeueThreadsCount; }
                set { _maxConcurrentDequeueThreadsCount = value; }
            }
            /// <summary>
            /// Starts the dispatcher thread (<see cref="QueueRunThreadProcess"/>) unless the
            /// worker limit is already reached or a dispatcher is already running.
            /// </summary>
            /// <remarks>
            /// Called after every Enqueue and when a dequeue worker that processed something exits.
            /// </remarks>
            private void QueueRun() //Microshaoft ThreadStart
            {
                if (Interlocked.Read(ref _concurrentDequeueThreadsCount) >= _maxConcurrentDequeueThreadsCount)
                {
                    return;
                }
                // CompareExchange(location, value, comparand): atomically flip 0 -> 1.
                // Only the caller that observes the previous value 0 may start the dispatcher.
                // (The arguments used to be swapped, so the flag was never set.)
                if (Interlocked.CompareExchange(ref _isQueueRunning, 1, 0) != 0)
                {
                    return;
                }
                try
                {
                    Thread t = new Thread(new ThreadStart(QueueRunThreadProcess));
                    t.Name = "QueueRunningThreadProcess";
                    t.Start();
                }
                catch
                {
                    // The dispatcher never started, so nobody else will release the flag.
                    Interlocked.Exchange(ref _isQueueRunning, 0);
                    throw;
                }
            }
            /// <summary>Gets the number of elements currently held in the queue.</summary>
            public int Count
            {
                get
                {
                    return _queue.Count;
                }
            }
            /// <summary>Gets the number of dequeue worker threads currently accounted for.</summary>
            public long ConcurrentThreadsCount
            {
                get
                {
                    // 64-bit reads are not atomic on 32-bit platforms without Interlocked.
                    return Interlocked.Read(ref _concurrentDequeueThreadsCount);
                }
            }
            /// <summary>
            /// Dispatcher thread body: while the queue is non-empty, starts dequeue worker
            /// threads until <see cref="MaxConcurrentDequeueThreadsCount"/> is reached, then
            /// releases the running flag.
            /// </summary>
            private void QueueRunThreadProcess()
            {
                try
                {
                    if (_isAttachedPerformanceCounters)
                    {
                        _queueRunningThreadStartPerformanceCounter.Increment();
                        _queueRunningThreadsCountPerformanceCounter.Increment();
                    }
                    // Snapshot the delegate so an unsubscribe between check and call cannot null it.
                    QueueLogEventHandler startHandler = OnQueueRunningThreadStart;
                    if (startHandler != null)
                    {
                        startHandler
                            (
                                string.Format
                                        (
                                            "{0} Threads Count {1},Queue Count {2},Current Thread: {3}({4}) at {5}"
                                            , "Queue Running Start ..."
                                            , Interlocked.Read(ref _concurrentDequeueThreadsCount)
                                            , _queue.Count
                                            , Thread.CurrentThread.Name
                                            , Thread.CurrentThread.ManagedThreadId
                                            , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
                                        )
                            );
                    }
    #if c2
                    while ((_queue.Count > 0))
    #elif c4
                    while (!_queue.IsEmpty)
    #endif
                    {
                        if (Interlocked.Read(ref _concurrentDequeueThreadsCount) >= _maxConcurrentDequeueThreadsCount)
                        {
                            break;
                        }
                        // Use the value returned by Increment; re-reading the shared field could
                        // observe another thread's update and hand out a wrong/duplicate id.
                        int threadID = (int)Interlocked.Increment(ref _concurrentDequeueThreadsCount);
                        try
                        {
                            Thread t = new Thread(new ThreadStart(DequeueThreadProcess));
                            t.TrySetApartmentState(ApartmentState.STA);
                            t.Name = string.Format("ConcurrentDequeueProcessThread[{0}]", threadID);
                            t.Start();
                        }
                        catch
                        {
                            // The worker will never run its own decrement; undo the reservation.
                            Interlocked.Decrement(ref _concurrentDequeueThreadsCount);
                            throw;
                        }
                    }
                    QueueLogEventHandler endHandler = OnQueueRunningThreadEnd;
                    if (endHandler != null)
                    {
                        int r = (int)Interlocked.Read(ref _concurrentDequeueThreadsCount);
                        endHandler
                                    (
                                        string.Format
                                                (
                                                    "{0} Threads Count {1}, Queue Count {2}, Current Thread: {3}({4}) at {5}"
                                                    , "Queue Running Stop ..."
                                                    , r
                                                    , _queue.Count
                                                    , Thread.CurrentThread.Name
                                                    , Thread.CurrentThread.ManagedThreadId
                                                    , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
                                                )
                                    );
                    }
                    if (_isAttachedPerformanceCounters)
                    {
                        _queueRunningThreadEndPerformanceCounter.Increment();
                        _queueRunningThreadsCountPerformanceCounter.Decrement();
                    }
                }
                finally
                {
                    // Always release the flag, even if an event handler threw.
                    Interlocked.Exchange(ref _isQueueRunning, 0);
                }
                // An element may have been enqueued after the loop above ended but before the
                // flag was released; that caller's QueueRun() was rejected, so re-check here.
    #if c2
                if (_queue.Count > 0)
    #elif c4
                if (!_queue.IsEmpty)
    #endif
                {
                    QueueRun();
                }
            }
            /// <summary>
            /// Adds an element to the queue and then asks <c>QueueRun</c> to start processing.
            /// </summary>
            /// <param name="element">The element to enqueue.</param>
            /// <remarks>
            /// Exceptions thrown while enqueuing or updating counters are not propagated;
            /// they are forwarded to <see cref="OnException"/> when a handler is attached.
            /// </remarks>
            public void Enqueue(T element)
            {
                try
                {
    #if c2
                    lock (_syncQueueLockObject) // original author's note: "does this still count as concurrent?"
    #endif
                    {
                        _queue.Enqueue(element);
                    }
                    if (_isAttachedPerformanceCounters)
                    {
                        _enqueuePerformanceCounter.Increment();
                        _queueLengthPerformanceCounter.Increment();
                    }
                }
                catch (Exception e)
                {
                    // Snapshot the delegate: checking the event field and then invoking it
                    // can throw NullReferenceException if the last handler unsubscribes in between.
                    ExceptionEventHandler handler = OnException;
                    if (handler != null)
                    {
                        handler(e);
                    }
                }
                QueueRun();
            }
            // Dequeue worker thread body: drains the queue, raising OnDequeue for every element,
            // then decrements the worker count and raises the thread-end notifications.
            // The worker count itself is incremented by QueueRunThreadProcess before this thread starts.
            private void DequeueThreadProcess()
            {
                if (_isAttachedPerformanceCounters)
                {
                    _dequeueThreadStartPerformanceCounter.Increment();
                    _dequeueThreadsCountPerformanceCounter.Increment();
                }
                if (OnDequeueThreadStart != null)
                {
                    int r = (int)Interlocked.Read(ref _concurrentDequeueThreadsCount);
                    OnDequeueThreadStart
                                    (
                                        string.Format
                                                (
                                                    "{0} Threads Count {1},Queue Count {2},Current Thread: {3} at {4}"
                                                    , "Threads ++ !"
                                                    , r
                                                    , _queue.Count
                                                    , Thread.CurrentThread.Name
                                                    , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
                                                )
                                    );
                }
                // Becomes true once this worker has dequeued at least one element;
                // it decides whether QueueRun() is called again in the finally block.
                bool queueWasNotEmpty = false;
                try
                {
                    // NOTE: the braces below are split across the c2 / c4 preprocessor branches;
                    // each build flavour is balanced on its own, so keep the directives in step.
    #if c2
                    while (true)
    #elif c4
                    while (!_queue.IsEmpty)
    #endif
                    {
                        T element
    #if c2
                                = null
    #endif
    ;
    #if c2
                        lock (_syncQueueLockObject)
                        {
                            if (_queue.Count > 0)
                            {
                                element = _queue.Dequeue();
                            }
                            else
                            {
                                // queue is empty: leave the while(true) loop (avoids QueueRun looping forever)
                                break;
                            }
                        }
    #elif c4
                        if (_queue.TryDequeue(out element))
                        {
    #elif c2
                            if (element != null)
                            {
    #endif
                            if (!queueWasNotEmpty)
                            {
                                queueWasNotEmpty = true;
                            }
                            if (_isAttachedPerformanceCounters)
                            {
                                _dequeuePerformanceCounter.Increment();
                                _queueLengthPerformanceCounter.Decrement();
                            }
                            // Hand the element to the subscriber(s); an exception here is caught
                            // below and ends this worker's loop.
                            if (OnDequeue != null)
                            {
                                OnDequeue(element);
                            }
                            if (_isAttachedPerformanceCounters)
                            {
                                _dequeueProcessedPerformanceCounter.Increment();
                            }
    #if c2
                            }
    #elif c4
                        }
                    }
    #endif
                }
                catch (Exception e)
                {
                    // Exceptions are reported through OnException when a handler is attached; otherwise they are dropped.
                    if (OnException != null)
                    {
                        OnException(e);
                    }
                }
                finally
                {
                    // r is the number of dequeue workers still accounted for after this one leaves.
                    int r = (int)Interlocked.Decrement(ref _concurrentDequeueThreadsCount);
                    if (OnDequeueThreadEnd != null)
                    {
                        OnDequeueThreadEnd
                                    (
                                        string.Format
                                                (
                                                    "{0} Threads Count {1},Queue Count {2},Current Thread: {3} at {4}"
                                                    , "Threads--"
                                                    , r
                                                    , _queue.Count
                                                    , Thread.CurrentThread.Name
                                                    , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
                                                )
                                    );
                    }
                    if (r == 0)
                    {
                        if (OnDequeueAllThreadsEnd != null)
                        {
                            OnDequeueAllThreadsEnd
                                        (
                                            string.Format
                                                    (
                                                        "{0} Threads Count {1},Queue Count {2},Current Thread: {3} at {4}"
                                                        , "All Threads End"
                                                        , r
                                                        , _queue.Count
                                                        , Thread.CurrentThread.Name
                                                        , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
                                                    )
                                        );
                        }
                    }
                    if (_isAttachedPerformanceCounters)
                    {
                        _dequeueThreadEndPerformanceCounter.Increment();
                        _dequeueThreadsCountPerformanceCounter.Decrement();
                    }
                    // Only re-trigger the dispatcher if this worker actually processed something.
                    if (queueWasNotEmpty)
                    {
                        QueueRun(); // original author's note: "infinite loop???"
                    }
                }
            }
        }
    }
    namespace Microshaoft
    {
        using System;
        using System.Diagnostics;
        /// <summary>
        /// Small factory for <see cref="CounterCreationData"/> definitions.
        /// </summary>
        public static class PerformanceCounterHelper
        {
            /// <summary>
            /// Builds a counter definition whose help text is "<paramref name="counterName"/> Help".
            /// </summary>
            /// <param name="counterName">Name of the counter.</param>
            /// <param name="performanceCounterType">Type of the counter.</param>
            /// <returns>A new, populated <see cref="CounterCreationData"/>.</returns>
            public static CounterCreationData GetCounterCreationData(string counterName, PerformanceCounterType performanceCounterType)
            {
                // Initializer members are assigned in the order written: name, help, type.
                return new CounterCreationData
                {
                    CounterName = counterName,
                    CounterHelp = string.Format("{0} Help", counterName),
                    CounterType = performanceCounterType
                };
            }
        }
    }
    namespace Microshaoft
    {
        using System;
        using System.Xml;
        using System.Xml.Schema;
        using System.Xml.Serialization;
        /// <summary>
        /// XML-serializable wrapper that writes its text as a CDATA section and, when
        /// reading, strips a single enclosing CDATA wrapper from the element content.
        /// </summary>
        public class CDATA : IXmlSerializable
        {
            private const string StartTag = "<![CDATA[";
            private const string EndTag = "]]>";
            private string _text;
            /// <summary>Parameterless constructor required for XML deserialization.</summary>
            public CDATA()
            {
            }
            /// <summary>Creates an instance wrapping <paramref name="text"/>.</summary>
            public CDATA(string text)
            {
                this._text = text;
            }
            /// <summary>Gets the wrapped text (without the CDATA markers).</summary>
            public string Text
            {
                get
                {
                    return _text;
                }
            }
            XmlSchema IXmlSerializable.GetSchema()
            {
                // No schema is supplied for this type.
                return null;
            }
            void IXmlSerializable.ReadXml(XmlReader reader)
            {
                string s = reader.ReadInnerXml();
                s = s.Trim(new char[] { '\r', '\n', '\t', ' ' });
                // Markup tokens must be matched ordinally; the culture-sensitive overloads
                // depend on the current culture's collation rules.
                if
                    (
                        s.StartsWith(StartTag, StringComparison.Ordinal)
                        && s.EndsWith(EndTag, StringComparison.Ordinal)
                    )
                {
                    // Both markers were just verified, so the payload is everything in between.
                    s = s.Substring(StartTag.Length, s.Length - StartTag.Length - EndTag.Length);
                }
                this._text = s;
            }
            void IXmlSerializable.WriteXml(XmlWriter writer)
            {
                writer.WriteCData(this._text);
            }
        }
    }
    namespace Microshaoft
    {
        using System;
        using System.Net;
        using System.Net.Sockets;
        //using System.Threading;
        public class SocketAsyncDataHandler<T>
        {
            // The socket this handler sends and receives on; assigned in the constructor.
            public Socket _socket;
            // Value passed to the constructor as receiveBufferLength.
            public int ReceiveBufferLength
            {
                get;
                private set;
            }
            // Caller-defined token associated with this connection.
            public T ConnectionToken
            {
                get;
                set;
            }
            // Queues created in the constructor; their OnDequeue events are wired to
            // ReceivedQueue_OnDequeue and SendAsyncQueue_OnDequeue respectively.
            private ConcurrentAsyncQueue<byte[]> _receivedAsyncQueue;//= new ConcurrentAsyncQueue<byte[]>();
            private ConcurrentAsyncQueue<byte[]> _sendAsyncQueue;// = new ConcurrentAsyncQueue<byte[]>();
            // Handler signature for received data packs.
            public delegate void DataPackReceivedEventHandler(SocketAsyncDataHandler<T> sender, byte[] data);
            public event DataPackReceivedEventHandler OnSyncDataPackReceived;
            // receive_OnCompleted only enqueues a completed data pack when this event has a subscriber.
            public event DataPackReceivedEventHandler OnAsyncQueueDataPackReceived;
            // Address part of the socket's remote end point (which is cast to IPEndPoint).
            public IPAddress RemoteIPAddress
            {
                get
                {
                    return ((IPEndPoint)_socket.RemoteEndPoint).Address;
                }
            }
            // Address part of the socket's local end point (which is cast to IPEndPoint).
            public IPAddress LocalIPAddress
            {
                get
                {
                    return ((IPEndPoint)_socket.LocalEndPoint).Address;
                }
            }
            // Identifier passed to the constructor as socketID.
            public int SocketID
            {
                get;
                private set;
            }
            /// <summary>
            /// Wraps <paramref name="socket"/>, creates the receive and send queues and
            /// prepares the shared send <see cref="SocketAsyncEventArgs"/> with a 1024-byte buffer.
            /// </summary>
            /// <param name="socket">The socket to operate on.</param>
            /// <param name="socketID">Identifier exposed through <see cref="SocketID"/>.</param>
            /// <param name="receiveBufferLength">Value exposed through <see cref="ReceiveBufferLength"/>.</param>
            public SocketAsyncDataHandler
                                (
                                    Socket socket
                                    , int socketID
                                    , int receiveBufferLength
                                )
            {
                // Plain state captured from the arguments.
                _socket = socket;
                ReceiveBufferLength = receiveBufferLength;
                SocketID = socketID;

                // Inbound queue: dequeued packs go to ReceivedQueue_OnDequeue.
                _receivedAsyncQueue = new ConcurrentAsyncQueue<byte[]>();
                _receivedAsyncQueue.OnDequeue += ReceivedQueue_OnDequeue;

                // Outbound queue: dequeued packs go to SendAsyncQueue_OnDequeue.
                _sendAsyncQueue = new ConcurrentAsyncQueue<byte[]>();
                _sendAsyncQueue.OnDequeue += SendAsyncQueue_OnDequeue;

                // Shared send args with their own buffer and completion callback.
                byte[] sendBuffer = new byte[1024];
                SocketAsyncEventArgs sendArgs = new SocketAsyncEventArgs();
                sendArgs.SetBuffer(sendBuffer, _sendSocketAsyncEventArgsBufferOffset, sendBuffer.Length);
                sendArgs.Completed += send_OnCompleted;
                _sendSocketAsyncEventArgs = sendArgs;
            }
            /// <summary>
            /// Completion callback of the shared send <see cref="SocketAsyncEventArgs"/>:
            /// marks the send buffer as free again.
            /// </summary>
            void send_OnCompleted(object sender, SocketAsyncEventArgs e)
            {
                _isSendingAsync = false;
            }
            // True while the shared send args/buffer are owned by an in-flight SendAsync.
            // Written by the sending thread and by the completion callback, hence volatile.
            private volatile bool _isSendingAsync = false;
            private SocketAsyncEventArgs _sendSocketAsyncEventArgs;
            private int _sendSocketAsyncEventArgsBufferOffset = 0;
            /// <summary>
            /// Copies <paramref name="data"/> into the shared send buffer and starts an
            /// asynchronous send of exactly <c>data.Length</c> bytes.
            /// </summary>
            /// <param name="data">Bytes to send.</param>
            /// <returns>
            /// <c>true</c> when the send was started (or completed synchronously without a
            /// socket error); <c>false</c> when a previous send is still in flight, when
            /// <paramref name="data"/> does not fit into the send buffer, or when a
            /// synchronous completion reported a socket error.
            /// </returns>
            private bool SendDataAsync(byte[] data)
            {
                //lock (_sendSyncLockObject)
                {
                    if (_isSendingAsync)
                    {
                        // The buffer belongs to the pending operation; writing into it would
                        // corrupt the bytes that are still being sent.
                        return false;
                    }
                    int offset = _sendSocketAsyncEventArgsBufferOffset;
                    int count = data.Length;
                    byte[] buffer = _sendSocketAsyncEventArgs.Buffer;
                    // Bounds check and copy now use the same offset.
                    if (offset + count > buffer.Length)
                    {
                        return false;
                    }
                    Buffer.BlockCopy(data, 0, buffer, offset, count);
                    // Restrict the transfer to the payload; otherwise the whole buffer
                    // (including stale bytes) would be sent.
                    _sendSocketAsyncEventArgs.SetBuffer(offset, count);
                    _isSendingAsync = true;
                    bool isPending;
                    try
                    {
                        isPending = _socket.SendAsync(_sendSocketAsyncEventArgs);
                    }
                    catch
                    {
                        _isSendingAsync = false;
                        throw;
                    }
                    if (!isPending)
                    {
                        // Completed synchronously: the Completed event is NOT raised in this
                        // case, so release the buffer here.
                        _isSendingAsync = false;
                        return _sendSocketAsyncEventArgs.SocketError == SocketError.Success;
                    }
                    return true;
                }
            }
            /// <summary>Gets the header length passed to <see cref="StartReceiveData"/>.</summary>
            public int HeaderBytesLength
            {
                get;
                private set;
            }
            // Guards against starting the receive loop more than once.
            private bool _isStartedReceiveData = false;
            /// <summary>
            /// Starts receiving on the socket by posting an asynchronous receive for the first
            /// <paramref name="headerBytesLength"/> bytes. Calls after the first successful one are ignored.
            /// </summary>
            /// <param name="headerBytesLength">Number of header bytes to read first; must fit in the 1024-byte receive buffer.</param>
            public void StartReceiveData(int headerBytesLength)
            {
                if (_isStartedReceiveData)
                {
                    return;
                }
                HeaderBytesLength = headerBytesLength;
                var saeaReceive = new SocketAsyncEventArgs();
                saeaReceive.Completed += receive_OnCompleted;
                var buffer = new byte[1024];
                saeaReceive.SetBuffer(buffer, 0, HeaderBytesLength);
                bool isPending = _socket.ReceiveAsync(saeaReceive);
                _isStartedReceiveData = true;
                if (!isPending)
                {
                    // ReceiveAsync returned false: the operation completed synchronously and the
                    // Completed event will not be raised, so process the result right here.
                    receive_OnCompleted(_socket, saeaReceive);
                }
            }
            // True while the header of the next data pack is being read, false while its body is being read.
            private bool _isHeader = true;
            /// <summary>
            /// Completion callback of the receive <see cref="SocketAsyncEventArgs"/>. Assembles
            /// length-prefixed data packs (header followed by body) and re-arms the receive.
            /// </summary>
            /// <remarks>
            /// Receiving stops when the socket reports an error, when the peer closes the connection
            /// (zero bytes transferred) or when the header announces an invalid body length.
            /// </remarks>
            void receive_OnCompleted(object sender, SocketAsyncEventArgs e)
            {
                var socket = sender as Socket;
                if (socket == null)
                {
                    return;
                }
                bool completedSynchronously;
                do
                {
                    if (e.SocketError != SocketError.Success || e.BytesTransferred <= 0)
                    {
                        // Failed or closed connection: re-arming would only spin on the dead socket.
                        return;
                    }
                    if (!AdvanceReceiveState(e))
                    {
                        Console.WriteLine("err");
                        return;
                    }
                    // ReceiveAsync returns false when it completed synchronously; Completed is not
                    // raised then, so the result is processed in this loop instead.
                    completedSynchronously = !socket.ReceiveAsync(e);
                }
                while (completedSynchronously);
            }
            /// <summary>
            /// Consumes the bytes of one completed receive: requests the rest of a partially received
            /// header or body, switches from header to body, or publishes a finished data pack and
            /// resets the buffer window for the next header.
            /// </summary>
            /// <returns><c>false</c> when the header carries an unusable body length.</returns>
            private bool AdvanceReceiveState(SocketAsyncEventArgs e)
            {
                byte[] buffer = e.Buffer;
                int headerLength = HeaderBytesLength;
                // Every data pack starts at buffer index 0, so this is the pack's byte count so far.
                int received = e.Offset + e.BytesTransferred;
                int bodyLength;
                if (_isHeader)
                {
                    if (received < headerLength)
                    {
                        e.SetBuffer(received, headerLength - received);
                        return true;
                    }
                    // The first four header bytes hold the body length.
                    bodyLength = BitConverter.ToInt32(buffer, 0);
                    if (bodyLength < 0 || bodyLength > int.MaxValue - headerLength)
                    {
                        return false;
                    }
                    if (bodyLength > 0)
                    {
                        int packLength = headerLength + bodyLength;
                        if (packLength > buffer.Length)
                        {
                            // NOTE(review): the length comes from the peer and is not capped here;
                            // consider enforcing a maximum data pack size.
                            var larger = new byte[packLength];
                            Buffer.BlockCopy(buffer, 0, larger, 0, headerLength);
                            e.SetBuffer(larger, headerLength, bodyLength);
                        }
                        else
                        {
                            e.SetBuffer(headerLength, bodyLength);
                        }
                        _isHeader = false;
                        return true;
                    }
                    // An empty body means the pack is already complete.
                }
                else
                {
                    bodyLength = BitConverter.ToInt32(buffer, 0);
                    int expected = headerLength + bodyLength;
                    if (received < expected)
                    {
                        e.SetBuffer(received, expected - received);
                        return true;
                    }
                }
                byte[] data = new byte[headerLength + bodyLength];
                Buffer.BlockCopy(buffer, 0, data, 0, data.Length);
                _isHeader = true;
                e.SetBuffer(0, headerLength);
                if (OnAsyncQueueDataPackReceived != null)
                {
                    _receivedAsyncQueue.Enqueue(data);
                }
                var syncHandler = OnSyncDataPackReceived;
                if (syncHandler != null)
                {
                    syncHandler(this, data);
                }
                return true;
            }
            // Monitor object held for the duration of each blocking send.
            private object _sendSyncLockObject = new object();
            /// <summary>
            /// Sends <paramref name="data"/> with a blocking <c>Socket.Send</c> call while holding the send lock.
            /// </summary>
            /// <param name="data">Bytes to write to the socket.</param>
            private void SendDataSync(byte[] data)
            {
                object gate = this._sendSyncLockObject;
                lock (gate)
                {
                    this._socket.Send(data);
                }
            }
            /// <summary>
            /// Places <paramref name="data"/> on the send queue; the queue's dequeue handler performs the actual send.
            /// </summary>
            /// <param name="data">Bytes to be sent.</param>
            public void SendDataAsyncQueue(byte[] data)
            {
                this._sendAsyncQueue.Enqueue(data);
            }
            /// <summary>
            /// Dequeue handler of the send queue: forwards each queued buffer to the blocking send.
            /// </summary>
            /// <param name="element">Buffer taken from the send queue.</param>
            private void SendAsyncQueue_OnDequeue(byte[] element)
            {
                this.SendDataSync(element);
            }
            /// <summary>
            /// Dequeue handler of the received-data queue: raises <c>OnAsyncQueueDataPackReceived</c>
            /// for the dequeued data pack when the event has subscribers.
            /// </summary>
            /// <param name="element">Data pack taken from the received-data queue.</param>
            private void ReceivedQueue_OnDequeue(byte[] element)
            {
                if (OnAsyncQueueDataPackReceived == null)
                {
                    return;
                }
                OnAsyncQueueDataPackReceived(this, element);
            }
        }
    }
    // Message and subscription data types shared by the server and the client.
    namespace Test.Share
    {
        using System;
        using System.Xml.Serialization;
        using System.Collections.Generic;
        using Microshaoft;
        /// <summary>
        /// Message exchanged between client and server, serialized as an
        /// <c>XmlObjectMessage</c> XML element.
        /// </summary>
        [XmlRoot("XmlObjectMessage")]
        [Serializable]
        public class XmlObjectMessage
        {
            /// <summary>Sender name; written as the <c>From</c> XML attribute.</summary>
            [XmlAttribute("From")]
            public string From;
            /// <summary>Recipient names; written as <c>To</c> elements inside a <c>ToList</c> element.</summary>
            [XmlArrayItem("To", typeof(string))]
            [XmlArray("ToList")]
            public string[] ToList;
            /// <summary>
            /// Message text, written as the <c>Text</c> element.
            /// NOTE(review): <c>CDATA</c> is declared outside this section; presumably it emits the
            /// text as a CDATA section - confirm against its definition.
            /// </summary>
            [XmlElement("Text", typeof(CDATA))]
            public CDATA Text;
        }
        /// <summary>
        /// Root of the publish/subscribe configuration document (<c>PublishSubscribeData</c> element).
        /// </summary>
        [XmlRoot("PublishSubscribeData")]
        [Serializable]
        public class PublishSubscribeData
        {
            /// <summary>Publishers, written as <c>Publisher</c> elements inside <c>Publishers</c>.</summary>
            [XmlArrayItem("Publisher", typeof(Publisher))]
            [XmlArray("Publishers")]
            public List<Publisher> Publishers = new List<Publisher>();
        }
        /// <summary>A named publisher together with its subscribers.</summary>
        [Serializable]
        public class Publisher
        {
            /// <summary>Publisher name; written as the <c>Name</c> XML attribute.</summary>
            [XmlAttribute("Name")]
            public string Name;
            /// <summary>Subscribers, written as <c>Subscriber</c> elements inside <c>Subscribers</c>.</summary>
            [XmlArrayItem("Subscriber", typeof(Subscriber))]
            [XmlArray("Subscribers")]
            public List<Subscriber> Subscribers = new List<Subscriber>();
        }
        /// <summary>A named subscriber of a <see cref="Publisher"/>.</summary>
        [Serializable]
        public class Subscriber
        {
            /// <summary>Subscriber name; written as the <c>Name</c> XML attribute.</summary>
            [XmlAttribute("Name")]
            public string Name;
        }
    }
    namespace Microshaoft
    {
        using System;
        using System.IO;
        using System.Text;
        using System.Xml;
        using System.Xml.Serialization;
        using System.Runtime.Serialization;
        using System.Runtime.Serialization.Json;
        using System.Runtime.Serialization.Formatters.Binary;
        using System.Runtime.Serialization.Formatters.Soap;
        /// <summary>
        /// Convenience wrappers around the XML, data-contract (XML and JSON), SOAP and binary
        /// serializers of the framework. Text results and inputs of the data-contract and SOAP
        /// helpers are UTF-8.
        /// </summary>
        public static class SerializerHelper
        {
            /// <summary>Deserializes <paramref name="Xml"/> with a new <see cref="XmlSerializer"/> for <typeparamref name="T"/>.</summary>
            public static T XmlSerializerXmlToObject<T>(string Xml)
            {
                XmlSerializer serializer = new XmlSerializer(typeof(T));
                return XmlSerializerXmlToObject<T>(Xml, serializer);
            }
            /// <summary>Deserializes <paramref name="Xml"/> with the supplied <paramref name="serializer"/>.</summary>
            public static T XmlSerializerXmlToObject<T>(string Xml, XmlSerializer serializer)
            {
                using (StringReader stringReader = new StringReader(Xml))
                using (XmlReader xmlReader = XmlReader.Create(stringReader))
                {
                    return (T) serializer.Deserialize(xmlReader);
                }
            }
            /// <summary>
            /// Serializes <paramref name="Object"/> through <paramref name="writer"/> and returns the
            /// written document as a string with the byte-order mark removed.
            /// </summary>
            /// <param name="Object">Object to serialize.</param>
            /// <param name="writer">Writer whose base stream must be a <see cref="MemoryStream"/>.</param>
            /// <param name="serializer">Serializer used for the object.</param>
            /// <exception cref="ArgumentException">The writer is not backed by a <see cref="MemoryStream"/>.</exception>
            public static string XmlSerializerObjectToXml<T>
                                        (
                                            T Object
                                            , XmlTextWriter writer
                                            , XmlSerializer serializer
                                        )
            {
                MemoryStream stream = writer.BaseStream as MemoryStream;
                if (stream == null)
                {
                    throw new ArgumentException("The writer must be backed by a MemoryStream.", "writer");
                }
                serializer.Serialize(writer, Object);
                writer.Flush();
                byte[] bytes = stream.ToArray();
                // The encoding is recognized by its byte-order mark; gb2312 is assumed when none matches.
                Encoding e = EncodingHelper.IdentifyEncoding
                                                (
                                                    bytes
                                                    , Encoding.GetEncoding("gb2312")
                                                );
                int offset = e.GetPreamble().Length;
                return e.GetString(bytes, offset, bytes.Length - offset);
            }
            /// <summary>Serializes <paramref name="Object"/> to a UTF-8 encoded XML document string.</summary>
            public static string XmlSerializerObjectToXml<T>(T Object, XmlSerializer serializer)
            {
                return XmlSerializerObjectToXml<T>(Object, Encoding.UTF8, serializer);
            }
            /// <summary>Serializes <paramref name="Object"/> to an XML document string written with encoding <paramref name="e"/>.</summary>
            public static string XmlSerializerObjectToXml<T>(T Object, Encoding e, XmlSerializer serializer)
            {
                using (MemoryStream stream = new MemoryStream())
                using (XmlTextWriter writer = new XmlTextWriter(stream, e))
                {
                    return XmlSerializerObjectToXml<T>
                                        (
                                            Object
                                            , writer
                                            , serializer
                                        );
                }
            }
            /// <summary>
            /// Serializes <paramref name="Object"/> with a new <see cref="XmlSerializer"/> to an XML
            /// document string written with encoding <paramref name="e"/>.
            /// </summary>
            public static string XmlSerializerObjectToXml<T>(T Object, Encoding e)
            {
                XmlSerializer serializer = new XmlSerializer(typeof(T));
                return XmlSerializerObjectToXml<T>(Object, e, serializer);
            }
            /// <summary>Writes <paramref name="Object"/> with <paramref name="serializer"/> and returns the XML as a string.</summary>
            public static string DataContractSerializerObjectToXml<T>(T Object, DataContractSerializer serializer)
            {
                using (MemoryStream ms = new MemoryStream())
                {
                    serializer.WriteObject(ms, Object);
                    return Encoding.UTF8.GetString(ms.ToArray());
                }
            }
            /// <summary>Writes <paramref name="Object"/> with a new <see cref="DataContractSerializer"/> and returns the XML as a string.</summary>
            public static string DataContractSerializerObjectToXml<T>(T Object)
            {
                DataContractSerializer serializer = new DataContractSerializer(typeof(T));
                return DataContractSerializerObjectToXml<T>(Object, serializer);
            }
            /// <summary>Reads an object of type <typeparamref name="T"/> from <paramref name="Xml"/> with <paramref name="serializer"/>.</summary>
            public static T DataContractSerializerXmlToObject<T>(string Xml, DataContractSerializer serializer)
            {
                using (MemoryStream ms = new MemoryStream(Encoding.UTF8.GetBytes(Xml)))
                {
                    return (T) serializer.ReadObject(ms);
                }
            }
            /// <summary>Reads an object of type <typeparamref name="T"/> from <paramref name="Xml"/> with a new <see cref="DataContractSerializer"/>.</summary>
            public static T DataContractSerializerXmlToObject<T>(string Xml)
            {
                DataContractSerializer serializer = new DataContractSerializer(typeof(T));
                return DataContractSerializerXmlToObject<T>(Xml, serializer);
            }
            /// <summary>Serializes <paramref name="Object"/> with <see cref="SoapFormatter"/> and returns the SOAP text.</summary>
            public static string FormatterObjectToSoap<T>
                                 (
                                     T Object
                                 )
            {
                using (MemoryStream stream = new MemoryStream())
                {
                    SoapFormatter formatter = new SoapFormatter();
                    formatter.Serialize(stream, Object);
                    // ToArray returns only the bytes written; GetBuffer would also return the
                    // unused capacity and append NUL characters to the text.
                    return Encoding.UTF8.GetString(stream.ToArray());
                }
            }
            /// <summary>Deserializes an object of type <typeparamref name="T"/> from SOAP text.</summary>
            public static T FormatterSoapToObject<T>
                                        (
                                            string soap
                                        )
            {
                using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(soap)))
                {
                    SoapFormatter formatter = new SoapFormatter();
                    return (T) formatter.Deserialize(stream);
                }
            }
            /// <summary>Serializes <paramref name="Object"/> with <see cref="BinaryFormatter"/>.</summary>
            public static byte[] FormatterObjectToBinary<T>
                                        (
                                            T Object
                                        )
            {
                using (MemoryStream stream = new MemoryStream())
                {
                    BinaryFormatter formatter = new BinaryFormatter();
                    formatter.Serialize(stream, Object);
                    return stream.ToArray();
                }
            }
            /// <summary>
            /// Deserializes an object of type <typeparamref name="T"/> from <see cref="BinaryFormatter"/> data.
            /// </summary>
            /// <remarks>
            /// NOTE(review): BinaryFormatter deserialization is unsafe for untrusted input; only pass
            /// data that originates from a trusted source.
            /// </remarks>
            public static T FormatterBinaryToObject<T>
                                        (
                                            byte[] data
                                        )
            {
                using (MemoryStream stream = new MemoryStream(data))
                {
                    BinaryFormatter formatter = new BinaryFormatter();
                    return (T) formatter.Deserialize(stream);
                }
            }
            /// <summary>Writes <paramref name="Object"/> as JSON with a new <see cref="DataContractJsonSerializer"/>.</summary>
            public static string DataContractSerializerObjectToJson<T>(T Object)
            {
                DataContractJsonSerializer serializer = new DataContractJsonSerializer(typeof(T));
                // Pass the serializer on; calling the one-argument overload here would recurse forever.
                return DataContractSerializerObjectToJson<T>(Object, serializer);
            }
            /// <summary>Writes <paramref name="Object"/> as JSON with <paramref name="serializer"/>.</summary>
            public static string DataContractSerializerObjectToJson<T>(T Object, DataContractJsonSerializer serializer)
            {
                using (MemoryStream ms = new MemoryStream())
                {
                    serializer.WriteObject(ms, Object);
                    // ToArray instead of GetBuffer: no trailing NUL padding in the JSON text.
                    return Encoding.UTF8.GetString(ms.ToArray());
                }
            }
            /// <summary>Reads an object of type <typeparamref name="T"/> from JSON with a new <see cref="DataContractJsonSerializer"/>.</summary>
            public static T DataContractSerializerJsonToObject<T>(string json)
            {
                DataContractJsonSerializer serializer = new DataContractJsonSerializer(typeof(T));
                return DataContractSerializerJsonToObject<T>(json, serializer);
            }
            /// <summary>Reads an object of type <typeparamref name="T"/> from JSON with <paramref name="serializer"/>.</summary>
            public static T DataContractSerializerJsonToObject<T>(string json, DataContractJsonSerializer serializer)
            {
                using (MemoryStream ms = new MemoryStream(Encoding.UTF8.GetBytes(json)))
                {
                    return (T) serializer.ReadObject(ms);
                }
            }
        }
    }
    namespace Microshaoft
    {
        using System.IO;
        using System.Text;
        using System.Collections.Generic;
        /// <summary>
        /// Recognizes the text encoding of a byte sequence from its leading byte-order mark (preamble).
        /// </summary>
        public static class EncodingHelper
        {
            /// <summary>
            /// Reads <paramref name="stream"/> completely and identifies its encoding among
            /// <paramref name="identifyEncodings"/>.
            /// </summary>
            public static Encoding IdentifyEncoding
                                        (
                                            Stream stream
                                            , Encoding defaultEncoding
                                            , Encoding[] identifyEncodings
                                        )
            {
                byte[] data = StreamDataHelper.ReadDataToBytes(stream);
                return IdentifyEncoding
                            (
                                data
                                , defaultEncoding
                                , identifyEncodings
                            );
            }
            /// <summary>
            /// Reads <paramref name="stream"/> completely and identifies its encoding among all
            /// installed encodings that define a preamble.
            /// </summary>
            public static Encoding IdentifyEncoding
                                        (
                                            Stream stream
                                            , Encoding defaultEncoding
                                        )
            {
                byte[] data = StreamDataHelper.ReadDataToBytes(stream);
                return IdentifyEncoding
                            (
                                data
                                , defaultEncoding
                            );
            }
            /// <summary>
            /// Identifies the encoding of <paramref name="data"/> among all encodings reported by
            /// <see cref="Encoding.GetEncodings"/> that define a preamble.
            /// </summary>
            public static Encoding IdentifyEncoding
                                        (
                                            byte[] data
                                            , Encoding defaultEncoding
                                        )
            {
                List<Encoding> candidates = new List<Encoding>();
                foreach (EncodingInfo info in Encoding.GetEncodings())
                {
                    Encoding e = info.GetEncoding();
                    if (e.GetPreamble().Length > 0)
                    {
                        candidates.Add(e);
                    }
                }
                return IdentifyEncoding
                            (
                                data
                                , defaultEncoding
                                , candidates.ToArray()
                            );
            }
            /// <summary>
            /// Returns the candidate whose preamble is the longest one found at the start of
            /// <paramref name="data"/>; among equally long matches the last candidate wins.
            /// </summary>
            /// <param name="data">Bytes to inspect; may be shorter than a preamble.</param>
            /// <param name="defaultEncoding">Returned when no candidate preamble matches.</param>
            /// <param name="identifyEncodings">Candidate encodings; those without a preamble are ignored.</param>
            /// <returns>The matching encoding, or <paramref name="defaultEncoding"/>.</returns>
            public static Encoding IdentifyEncoding
                                        (
                                            byte[] data
                                            , Encoding defaultEncoding
                                            , Encoding[] identifyEncodings
                                        )
            {
                Encoding encoding = defaultEncoding;
                if (data == null || identifyEncodings == null)
                {
                    return encoding;
                }
                int matchedLength = 0;
                foreach (Encoding e in identifyEncodings)
                {
                    byte[] preamble = e.GetPreamble();
                    int l = preamble.Length;
                    // Skip candidates that cannot match (no preamble, or data too short to hold it)
                    // and candidates shorter than the best match so far: a short mark such as
                    // FF FE is a prefix of the longer FF FE 00 00 and must not override it.
                    if (l == 0 || l > data.Length || l < matchedLength)
                    {
                        continue;
                    }
                    bool matches = true;
                    for (int i = 0; i < l; i++)
                    {
                        if (preamble[i] != data[i])
                        {
                            matches = false;
                            break;
                        }
                    }
                    if (matches)
                    {
                        encoding = e;
                        matchedLength = l;
                    }
                }
                return encoding;
            }
        }
    }
    namespace Microshaoft
    {
        using System.IO;
        /// <summary>
        /// Helper for pulling the complete content of a stream into memory.
        /// </summary>
        public static class StreamDataHelper
        {
            /// <summary>
            /// Returns every byte of <paramref name="stream"/>. A seekable stream is read from its
            /// beginning and its position is put back afterwards; a non-seekable stream is read
            /// from wherever it currently stands.
            /// </summary>
            /// <param name="stream">Stream to read until it reports end of data.</param>
            /// <returns>A new array holding the bytes that were read.</returns>
            public static byte[] ReadDataToBytes(Stream stream)
            {
                long originalPosition = -1;
                if (stream.CanSeek)
                {
                    originalPosition = stream.Position;
                    stream.Position = 0;
                }
                byte[] chunk = new byte[64 * 1024];
                byte[] result;
                using (MemoryStream collected = new MemoryStream())
                {
                    for
                        (
                            int read = stream.Read(chunk, 0, chunk.Length);
                            read > 0;
                            read = stream.Read(chunk, 0, chunk.Length)
                        )
                    {
                        collected.Write(chunk, 0, read);
                    }
                    result = collected.ToArray();
                }
                if (originalPosition >= 0)
                {
                    stream.Position = originalPosition;
                }
                return result;
            }
        }
    }
    //===========================================================================================
    //===========================================================================================
    // Server.cs
    // csc.exe Server.cs /r:Share.dll
    namespace Server
    {
        using System;
        using System.Text;
        using System.Net;
        using System.Net.Sockets;
        using System.Linq;
        using System.IO;
        using System.Collections.Concurrent;
        using System.Collections.Generic;
        using System.Threading;
        using Microshaoft;
        using Test.Share;
        /// <summary>
        /// Console server: accepts TCP connections on 127.0.0.1:12021, wraps each one in a
        /// <c>SocketAsyncDataHandler</c> and forwards every received message to the subscribers
        /// configured in PublishSubscribeData.xml.
        /// </summary>
        public class SyncSocketAsyncQueueHandlerServer
        {
            /// <summary>Binds the listening socket and starts accepting connections asynchronously.</summary>
            public static void StartListening()
            {
                Console.Title = "Server";
                IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Loopback, 12021);
                Socket listener = new Socket
                                        (
                                            AddressFamily.InterNetwork
                                            , SocketType.Stream
                                            , ProtocolType.Tcp
                                        );
                listener.Bind(localEndPoint);
                listener.Listen(5);
                Console.WriteLine("Listening ...");
                accept_Async(listener);
            }
            /// <summary>
            /// Posts an accept on <paramref name="listener"/>. Accepts that complete synchronously do
            /// not raise <c>Completed</c>, so they are handled here and the next accept is posted
            /// right away.
            /// </summary>
            private static void accept_Async(Socket listener)
            {
                while (true)
                {
                    var acceptSocketAsyncEventArgs = new SocketAsyncEventArgs();
                    acceptSocketAsyncEventArgs.Completed += accept_OnCompleted;
                    if (listener.AcceptAsync(acceptSocketAsyncEventArgs))
                    {
                        // Pending: accept_OnCompleted will run when a client connects.
                        return;
                    }
                    acceptSocketAsyncEventArgs.Completed -= accept_OnCompleted;
                    StartClientHandler(acceptSocketAsyncEventArgs);
                }
            }
            // Next socket id; incremented atomically because accept callbacks may run concurrently.
            private static int _i = 0;
            /// <summary>Accept completion callback: re-arms the listener, then serves the new client.</summary>
            static void accept_OnCompleted(object sender, SocketAsyncEventArgs e)
            {
                e.Completed -= accept_OnCompleted;
                Socket listener = sender as Socket;
                accept_Async(listener);
                StartClientHandler(e);
            }
            /// <summary>Creates the data handler for a successfully accepted client and starts receiving.</summary>
            private static void StartClientHandler(SocketAsyncEventArgs e)
            {
                Socket client = e.AcceptSocket;
                if (e.SocketError != SocketError.Success || client == null)
                {
                    // Failed accept: there is no client socket to serve.
                    return;
                }
                int socketID = Interlocked.Increment(ref _i) - 1;
                SocketAsyncDataHandler<int> handler = new SocketAsyncDataHandler<int>(client, socketID, 1024);
                handler.OnAsyncQueueDataPackReceived += new SocketAsyncDataHandler<int>.DataPackReceivedEventHandler(DataPackAsyncReceivedProcess);
                handler.StartReceiveData(10);
            }
            /// <summary>
            /// Handles one received data pack: logs it, parses the body as an
            /// <see cref="XmlObjectMessage"/> and passes the pack on to
            /// <c>SessionsManager.SendDataTo</c>.
            /// </summary>
            /// <param name="sender">Handler of the connection the data pack arrived on.</param>
            /// <param name="data">Complete data pack (header followed by the UTF-8 XML body).</param>
            public static void DataPackAsyncReceivedProcess(SocketAsyncDataHandler<int> sender, byte[] data)
            {
                string s = Encoding.UTF8.GetString(data, sender.HeaderBytesLength, data.Length - sender.HeaderBytesLength);
                Console.WriteLine
                                (
                                    "received: {1}{0}{2}{0}{3}{0}[{4}]"
                                    , "\n"
                                    , sender.RemoteIPAddress.ToString()
                                    , sender.SocketID
                                    , data.Length
                                    , s
                                );
                XmlObjectMessage message = SerializerHelper.XmlSerializerXmlToObject<XmlObjectMessage>(s);
                SessionsManager.SendDataTo
                                        (
                                            sender
                                            , message.From
                                            , message.ToList
                                            , data
                                        );
            }
            /// <summary>
            /// Entry point: loads the subscriptions from PublishSubscribeData.xml, starts listening
            /// and runs until a line is entered on the console.
            /// </summary>
            public static int Main(String[] args)
            {
                // ReadAllText closes the file; the previous StreamReader was never disposed.
                string xml = File.ReadAllText(@"PublishSubscribeData.xml");
                PublishSubscribeData data = SerializerHelper.XmlSerializerXmlToObject<PublishSubscribeData>(xml);
                var pubsub = new ConcurrentDictionary<string, List<string>>();
                foreach (Publisher entry in data.Publishers)
                {
                    List<string> subscribers = entry.Subscribers.ConvertAll(x => x.Name);
                    // A publisher listed more than once keeps its last subscriber list.
                    pubsub[entry.Name] = subscribers;
                }
                SessionsManager.Subscriptions = pubsub;
                StartListening();
                Console.ReadLine();
                return 0;
            }
        }
    }
    namespace Server
    {
        using System;
        using System.Threading;
        using System.Threading.Tasks;
        using System.Collections.Generic;
        using System.Collections.Concurrent;
        using Microshaoft;
        /// <summary>
        /// Keeps the publisher-to-subscribers table and the name-to-connection table, and relays
        /// data packs from a publisher to its connected subscribers.
        /// </summary>
        public static class SessionsManager
        {
            // Publisher name -> names of its subscribers.
            private static ConcurrentDictionary<string, List<string>> _subscriptions = new ConcurrentDictionary<string, List<string>>();
            /// <summary>Subscription table; assigning replaces the whole table atomically.</summary>
            public static ConcurrentDictionary<string, List<string>> Subscriptions
            {
                get { return _subscriptions; }
                set { Interlocked.Exchange(ref _subscriptions, value); }
            }
            // Sender name -> handler of the connection that first sent under that name.
            private static ConcurrentDictionary<string, SocketAsyncDataHandler<int>> _connections = new ConcurrentDictionary<string, SocketAsyncDataHandler<int>>();
            /// <summary>
            /// NOTE(review): despite its name this property returns the subscription table, not the
            /// connection table; it looks like a copy/paste slip. Correcting it would change the
            /// property type, so confirm with callers first.
            /// </summary>
            public static ConcurrentDictionary<string, List<string>> Connections
            {
                get { return _subscriptions; }
            }
            /// <summary>
            /// Registers <paramref name="sender"/> under <paramref name="from"/> (an existing
            /// registration for that name is kept) and queues <paramref name="data"/> on the
            /// connection of every subscriber of <paramref name="from"/> that is currently registered.
            /// </summary>
            /// <param name="sender">Connection the data pack arrived on.</param>
            /// <param name="from">Publisher name used for the registration and the subscription lookup.</param>
            /// <param name="toList">Recipient list carried by the message; not consulted here.</param>
            /// <param name="data">Data pack to relay unchanged.</param>
            public static void SendDataTo
                                        (
                                            SocketAsyncDataHandler<int> sender
                                            , string from
                                            , string[] toList
                                            , byte[] data
                                        )
            {
                _connections.GetOrAdd(from, sender);
                List<string> subscribers;
                if (!Subscriptions.TryGetValue(from, out subscribers))
                {
                    return;
                }
                Parallel.ForEach
                            (
                                subscribers
                                , subscriberName =>
                                    {
                                        SocketAsyncDataHandler<int> target;
                                        if (!_connections.TryGetValue(subscriberName, out target))
                                        {
                                            // Subscriber has no registered connection; nothing to send.
                                            return;
                                        }
                                        Console.WriteLine
                                                        (
                                                            "send: {1}{0}{2}{0}{3}"
                                                            , "\n"
                                                            , target.RemoteIPAddress.ToString()
                                                            , target.SocketID
                                                            , data.Length
                                                        );
                                        target.SendDataAsyncQueue(data);
                                    }
                            );
            }
        }
    }
    //===========================================================================================
    //===========================================================================================
    // Client.cs
    // csc.exe Client.cs /r:Share.dll
    namespace Client
    {
        using System;
        using System.Collections.Generic;
        using System.Linq;
        using System.Text;
        using System.Net.Sockets;
        using Microshaoft;
        using Test.Share;
        class Program
        {
            static void Main(string[] args)
            {
                Socket socket = new Socket
                                        (
                                            AddressFamily.InterNetwork
                                            , SocketType.Stream
                                            , ProtocolType.Tcp
                                        );
                socket.Connect("127.0.0.1", 12021);
                SocketAsyncDataHandler<int> handler = new SocketAsyncDataHandler<int>(socket, 1, 512);
                handler.OnSyncDataPackReceived += new SocketAsyncDataHandler<int>.DataPackReceivedEventHandler(handler_OnSyncDataPackReceived);
                handler.StartReceiveData(10);
                Console.WriteLine("please input your name:");
                string from = Console.ReadLine();
                Console.Title = "Client: " + from;
                string s = string.Empty;
                string text = string.Empty;
                string[] to = null;
                while ((s = Console.ReadLine()) != "q")
                {
                    if (s.StartsWith("to:"))
                    {
                        s = s.Substring(3); 
                        to = s.Split(new char[] {  ';' });
                        Console.WriteLine("\"to\" are ready, please input your text");
                    }
                    else if (to == null)
                    {
                        Console.WriteLine("please input \"to\" like: \n\"to:user1;user2;user3 \"");
                    }
                    else
                    {
                        text = s;
                        XmlObjectMessage message = new XmlObjectMessage();
                        message.ToList = to;
                        message.From = from;
                        message.Text = new CDATA(text);
                        text = SerializerHelper.XmlSerializerObjectToXml<XmlObjectMessage>
                                                                (
                                                                    message
                                                                    , Encoding.UTF8
                                                                );
                        byte[] bodyData = Encoding.UTF8.GetBytes(text);
                        byte[] headerData = new byte[handler.HeaderBytesLength];
                        BitConverter.GetBytes(bodyData.Length).CopyTo(headerData, 0);
                        socket.Send(headerData);
                        socket.Send(bodyData);
                        Console.WriteLine("Sended!");
                    }
                }
            }
            /// <summary>
            /// Event callback that writes a received data pack to the console:
            /// the sender's socket id, the total pack size in bytes, and the
            /// bytes after the header decoded as UTF-8 text.
            /// </summary>
            /// <param name="sender">Handler that raised the event; its HeaderBytesLength and SocketID are read.</param>
            /// <param name="data">Raw pack bytes; the span starting at offset HeaderBytesLength is decoded.</param>
            static void handler_OnSyncDataPackReceived(SocketAsyncDataHandler<int> sender, byte[] data)
            {
                // The body starts right after the header and runs to the end of the buffer.
                int bodyOffset = sender.HeaderBytesLength;
                int bodyLength = data.Length - bodyOffset;
                string body = Encoding.UTF8.GetString(data, bodyOffset, bodyLength);

                Console.WriteLine("received: {0},{1},[{2}]", sender.SocketID, data.Length, body);
            }
        }
    }
    //===========================================================================================
    //===========================================================================================
    /*
    PublishSubscribeData.xml
    <?xml version="1.0" encoding="utf-8"?>
    <PublishSubscribeData xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
        <Publishers>
            <Publisher Name="yxy0">
                <Subscribers>
                    <Subscriber Name="yxy1" />
                    <Subscriber Name="yxy" />
                </Subscribers>
            </Publisher>
            <Publisher Name="yxy1">
                <Subscribers>
                    <Subscriber Name="yxy0" />
                    <Subscriber Name="yxy" />
                </Subscribers>
            </Publisher>
            <Publisher Name="yxy2">
                <Subscribers>
                    <Subscriber Name="yxy3" />
                    <Subscriber Name="yxy" />
                </Subscribers>
            </Publisher>
            <Publisher Name="yxy3">
                <Subscribers>
                    <Subscriber Name="yxy2" />
                    <Subscriber Name="yxy" />
                </Subscribers>
            </Publisher>
            <Publisher Name="yxy4">
                <Subscribers>
                    <Subscriber Name="yxy4" />
                    <Subscriber Name="yxy" />
                </Subscribers>
            </Publisher>
            <Publisher Name="yxy5">
                <Subscribers>
                    <Subscriber Name="yxy4" />
                    <Subscriber Name="yxy" />
                </Subscribers>
            </Publisher>
        </Publishers>
    </PublishSubscribeData>
    */
    
    
  • 相关阅读:
    搭建es7.5的配置文件
    kafka的暂停消费和重新开始消费问题
    hive sparksession查询只显示default库问题
    flink window & window function & 水印
    flink支持的数据类型讲解(可序列化) 和 内置累加器的运用
    mysql tar安装模式
    Permission denied: user=root, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
    错误Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream排查思路
    SPSS非参数检验
    SPSS回归分析
  • 原文地址:https://www.cnblogs.com/Microshaoft/p/2365369.html
Copyright © 2011-2022 走看看