zoukankan      html  css  js  c++  java
  • SupperSocket深入浅出

    这篇文章出要是SuperSocket底层如何接收数据

    Process(ArraySegment<byte> segment) 获取加载数据(直到数据全部接收后返回)
    namespace SuperSocket.ProtoBase
    {
        /// <summary>
        /// The default pipeline processor
        /// </summary>
        /// <typeparam name="TPackageInfo">The type of the package info.</typeparam>
        public class DefaultPipelineProcessor<TPackageInfo> : IPipelineProcessor
            where TPackageInfo : IPackageInfo
        {
            private IReceiveFilter<TPackageInfo> m_ReceiveFilter;
    
            private IReceiveFilter<TPackageInfo> m_FirstReceiveFilter;
    
            private BufferList m_ReceiveCache;
    
            private int m_MaxPackageLength;
    
            /// <summary>
            /// Initializes a new instance of the <see cref="DefaultPipelineProcessor{TPackageInfo}"/> class.
            /// </summary>
            /// <param name="receiveFilter">The initializing receive filter.</param>
            /// <param name="maxPackageLength">The max package size.</param>
            public DefaultPipelineProcessor(IReceiveFilter<TPackageInfo> receiveFilter, int maxPackageLength = 0)
            {
                m_FirstReceiveFilter = m_ReceiveFilter = receiveFilter;
                m_ReceiveCache = new BufferList();
                m_MaxPackageLength = maxPackageLength;
            }
    
            private void PushResetData(ArraySegment<byte> raw, int rest)
            {
                var segment = new ArraySegment<byte>(raw.Array, raw.Offset + raw.Count - rest, rest);
                m_ReceiveCache.Add(segment);
            }
    
            private IList<IPackageInfo> GetNotNullOne(IList<IPackageInfo> left, IList<IPackageInfo> right)
            {
                if (left != null)
                    return left;
    
                return right;
            }
    
    
            /// <summary>
            /// Processes the input segment.
            /// </summary>
            /// <param name="segment">The input segment.</param>
            /// <returns>
            /// the processing result
            /// </returns>
            public virtual ProcessResult Process(ArraySegment<byte> segment)
            {
                var receiveCache = m_ReceiveCache;
    
                receiveCache.Add(segment);
    
                var rest = 0;
    
                var currentReceiveFilter = m_ReceiveFilter;
    
                SingleItemList<IPackageInfo> singlePackage = null;
    
                List<IPackageInfo> packageList = null;
    
                while (true)
                {
                    var lastItemLength = receiveCache.Last.Count;
                    var packageInfo = currentReceiveFilter.Filter(receiveCache, out rest);
    
                    if (currentReceiveFilter.State == FilterState.Error)
                    {
                        return ProcessResult.Create(ProcessState.Error);
                    }
                    //最大缓存空间
                    if (m_MaxPackageLength > 0)
                    {
                        var length = receiveCache.Total;
    
                        if (length > m_MaxPackageLength)
                        {
                            return ProcessResult.Create(ProcessState.Error, string.Format("Max package length: {0}, current processed length: {1}", m_MaxPackageLength, length));
                        }
                    }
    
    
                    var nextReceiveFilter = currentReceiveFilter.NextReceiveFilter;
    
                    // don't reset the filter if no request is resolved
                    if(packageInfo != null)
                        currentReceiveFilter.Reset();
    
                    if (nextReceiveFilter != null)
                    {
                        currentReceiveFilter = nextReceiveFilter;
                        m_ReceiveFilter = currentReceiveFilter;
                    }                    
    
                    // continue receive
                    if (packageInfo == null)
                    {
                        if (rest > 0)
                        {
                            var last = receiveCache.Last;
                            
                            if(rest != lastItemLength)
                            {
                                PushResetData(segment, rest);
                            }
                            
                            continue;
                        }
    
                        return ProcessResult.Create(ProcessState.Cached, GetNotNullOne(packageList, singlePackage));
                    }
    
                    if (packageList != null)
                    {
                        packageList.Add(packageInfo);
                    }
                    else if (singlePackage == null)
                        singlePackage = new SingleItemList<IPackageInfo>(packageInfo);
                    else
                    {
                        if (packageList == null)
                            packageList = new List<IPackageInfo>();
    
                        packageList.Add(singlePackage[0]);
                        packageList.Add(packageInfo);
                        singlePackage = null;
                    }
    
                    if (packageInfo is IBufferedPackageInfo // is a buffered package
                            && (packageInfo as IBufferedPackageInfo).Data is BufferList) // and it uses receive buffer directly
                    {
                        // so we need to create a new receive buffer container to use
                        m_ReceiveCache = receiveCache = new BufferList();
    
                        if (rest <= 0)
                        {
                            return ProcessResult.Create(ProcessState.Cached, GetNotNullOne(packageList, singlePackage));
                        }
                    }
                    else
                    {
                        m_ReceiveCache.Clear();
    
                        if (rest <= 0)
                        {
                            return ProcessResult.Create(ProcessState.Completed, GetNotNullOne(packageList, singlePackage));
                        }
                    }
    
                    PushResetData(segment, rest);
                }
            }
    
    
            /// <summary>
            /// cleanup the cached the buffer by resolving them into one package at the end of the piple line
            /// </summary>
            /// <returns>return the processing result</returns>
            public void Reset()
            {
                m_ReceiveCache.Clear();
                m_FirstReceiveFilter.Reset();
    
                if (m_ReceiveFilter != m_FirstReceiveFilter)
                    m_ReceiveFilter = m_FirstReceiveFilter;
            }
    
    
            /// <summary>
            /// Gets the received cache.
            /// </summary>
            /// <value>
            /// The cache.
            /// </value>
            public BufferList Cache
            {
                get { return m_ReceiveCache; }
            }
        }
    }
  • 相关阅读:
    Noip2015总结
    BZOJ2457 BeiJing2011 双端队列
    Noip模拟考第三题——饥饿游戏
    HDU 2196 求树上所有点能到达的最远距离
    O(V*n)的多重背包问题
    Noip2008双栈排序
    USACO 4.1.2 栅栏的木料
    字符串专题
    网络流24题刷题记录
    解模线性方程组 非互质中国剩余定理
  • 原文地址:https://www.cnblogs.com/qq247039968/p/7841088.html
Copyright © 2011-2022 走看看