zoukankan      html  css  js  c++  java
  • asp.net core 3.1 源码分析之KestrelServer

    /// <summary>
    /// <see cref="IServer"/> implementation that binds the configured endpoints through an
    /// <see cref="IConnectionListenerFactory"/> and hands every bound listener to a
    /// <see cref="ConnectionDispatcher"/> that runs its accept loop.
    /// </summary>
    public class KestrelServer : IServer
        {
            // One entry per bound endpoint: the listener and the task of its accept loop.
            private readonly List<(IConnectionListener, Task)> _transports = new List<(IConnectionListener, Task)>();
            private readonly IServerAddressesFeature _serverAddresses;
            private readonly IConnectionListenerFactory _transportFactory;

            private bool _hasStarted;
            // 0 until StopAsync is entered for the first time, 1 afterwards (set with Interlocked.Exchange).
            private int _stopping;
            // Completed (or faulted) by the first StopAsync call; later callers await it.
            private readonly TaskCompletionSource<object> _stoppedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

            /// <summary>
            /// Creates the server from its options, transport factory and logger factory.
            /// </summary>
            /// <param name="options">The Kestrel options; a default instance is used when <c>Value</c> is null.</param>
            /// <param name="transportFactory">The factory used to bind endpoints.</param>
            /// <param name="loggerFactory">The factory used to create the server logger.</param>
            /// <exception cref="ArgumentNullException">Any argument is null.</exception>
            public KestrelServer(
                IOptions<KestrelServerOptions> options,
                IConnectionListenerFactory transportFactory,
                ILoggerFactory loggerFactory)
                : this(transportFactory, CreateServiceContext(options, loggerFactory))
            {
            }

            // For testing
            internal KestrelServer(IConnectionListenerFactory transportFactory, ServiceContext serviceContext)
            {
                if (transportFactory == null)
                {
                    throw new ArgumentNullException(nameof(transportFactory));
                }

                _transportFactory = transportFactory;
                ServiceContext = serviceContext;

                // Expose the server addresses through the feature collection.
                Features = new FeatureCollection();
                _serverAddresses = new ServerAddressesFeature();
                Features.Set(_serverAddresses);

                HttpCharacters.Initialize();
            }

            /// <summary>
            /// Builds the <see cref="ServiceContext"/> shared by the server: trace, connection manager,
            /// heartbeat (driving the date header manager and the heartbeat manager) and the HTTP parser.
            /// </summary>
            /// <exception cref="ArgumentNullException"><paramref name="options"/> or <paramref name="loggerFactory"/> is null.</exception>
            private static ServiceContext CreateServiceContext(IOptions<KestrelServerOptions> options, ILoggerFactory loggerFactory)
            {
                if (options == null)
                {
                    throw new ArgumentNullException(nameof(options));
                }
                if (loggerFactory == null)
                {
                    throw new ArgumentNullException(nameof(loggerFactory));
                }

                var serverOptions = options.Value ?? new KestrelServerOptions();
                var logger = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel");
                var trace = new KestrelTrace(logger);
                var connectionManager = new ConnectionManager(
                    trace,
                    serverOptions.Limits.MaxConcurrentUpgradedConnections);

                var heartbeatManager = new HeartbeatManager(connectionManager);
                var dateHeaderValueManager = new DateHeaderValueManager();
                var heartbeat = new Heartbeat(
                    new IHeartbeatHandler[] { dateHeaderValueManager, heartbeatManager },
                    new SystemClock(),
                    DebuggerWrapper.Singleton,
                    trace);

                return new ServiceContext
                {
                    Log = trace,
                    HttpParser = new HttpParser<Http1ParsingHandler>(trace.IsEnabled(LogLevel.Information)),
                    Scheduler = PipeScheduler.ThreadPool,
                    // The heartbeat manager is also what the rest of the server uses as its clock.
                    SystemClock = heartbeatManager,
                    DateHeaderValueManager = dateHeaderValueManager,
                    ConnectionManager = connectionManager,
                    Heartbeat = heartbeat,
                    ServerOptions = serverOptions,
                };
            }

            /// <summary>Gets the server feature collection; it contains the <see cref="IServerAddressesFeature"/>.</summary>
            public IFeatureCollection Features { get; }

            /// <summary>Gets the options stored in the service context.</summary>
            public KestrelServerOptions Options => ServiceContext.ServerOptions;

            private ServiceContext ServiceContext { get; }

            private IKestrelTrace Trace => ServiceContext.Log;

            private ConnectionManager ConnectionManager => ServiceContext.ConnectionManager;

            /// <summary>
            /// Validates the options, starts the heartbeat and binds every configured endpoint.
            /// For each endpoint an accept loop is started and recorded so it can be stopped later.
            /// </summary>
            /// <param name="application">The HTTP application that the terminal connection middleware invokes.</param>
            /// <param name="cancellationToken">Forwarded to the transport factory while binding endpoints.</param>
            /// <exception cref="PlatformNotSupportedException">The platform is big-endian.</exception>
            /// <exception cref="InvalidOperationException">The server was already started, or the options are inconsistent.</exception>
            public async Task StartAsync<TContext>(IHttpApplication<TContext> application, CancellationToken cancellationToken)
            {
                try
                {
                    if (!BitConverter.IsLittleEndian)
                    {
                        throw new PlatformNotSupportedException(CoreStrings.BigEndianNotSupported);
                    }

                    ValidateOptions();

                    if (_hasStarted)
                    {
                        // The server has already started and/or has not been cleaned up yet
                        throw new InvalidOperationException(CoreStrings.ServerAlreadyStarted);
                    }
                    _hasStarted = true;

                    ServiceContext.Heartbeat?.Start();

                    // Invoked by AddressBinder once per endpoint that has to be bound.
                    async Task OnBind(ListenOptions options)
                    {
                        // Add the HTTP middleware as the terminal connection middleware
                        options.UseHttpServer(ServiceContext, application, options.Protocols);

                        var connectionDelegate = options.Build();

                        // Add the connection limit middleware
                        if (Options.Limits.MaxConcurrentConnections.HasValue)
                        {
                            connectionDelegate = new ConnectionLimitMiddleware(connectionDelegate, Options.Limits.MaxConcurrentConnections.Value, Trace).OnConnectionAsync;
                        }

                        var connectionDispatcher = new ConnectionDispatcher(ServiceContext, connectionDelegate);

                        // Forward the caller's token so that a transport which honours it can abandon the bind.
                        var transport = await _transportFactory.BindAsync(options.EndPoint, cancellationToken).ConfigureAwait(false);

                        // Update the endpoint
                        options.EndPoint = transport.EndPoint;
                        var acceptLoopTask = connectionDispatcher.StartAcceptingConnections(transport);

                        _transports.Add((transport, acceptLoopTask));
                    }

                    await AddressBinder.BindAsync(_serverAddresses, Options, Trace, OnBind).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    Trace.LogCritical(0, ex, "Unable to start Kestrel.");
                    // Tear down whatever was bound before the failure, then surface the original error.
                    Dispose();
                    throw;
                }
            }

            /// <summary>
            /// Graceful shutdown if possible: unbinds all listeners and waits for their accept loops,
            /// closes (and, if that fails, aborts) the remaining connections, disposes the listeners
            /// and finally the heartbeat. Only the first caller does the work; later callers await its outcome.
            /// </summary>
            /// <param name="cancellationToken">Passed to the unbind and close-all-connections steps.</param>
            public async Task StopAsync(CancellationToken cancellationToken)
            {
                if (Interlocked.Exchange(ref _stopping, 1) == 1)
                {
                    await _stoppedTcs.Task.ConfigureAwait(false);
                    return;
                }

                try
                {
                    var tasks = new Task[_transports.Count];
                    for (int i = 0; i < _transports.Count; i++)
                    {
                        (IConnectionListener listener, Task acceptLoop) = _transports[i];
                        tasks[i] = Task.WhenAll(listener.UnbindAsync(cancellationToken).AsTask(), acceptLoop);
                    }

                    await Task.WhenAll(tasks).ConfigureAwait(false);

                    if (!await ConnectionManager.CloseAllConnectionsAsync(cancellationToken).ConfigureAwait(false))
                    {
                        Trace.NotAllConnectionsClosedGracefully();

                        if (!await ConnectionManager.AbortAllConnectionsAsync().ConfigureAwait(false))
                        {
                            Trace.NotAllConnectionsAborted();
                        }
                    }

                    // The array is reused: every slot is overwritten with the listener's dispose task.
                    for (int i = 0; i < _transports.Count; i++)
                    {
                        var (listener, _) = _transports[i];
                        tasks[i] = listener.DisposeAsync().AsTask();
                    }

                    await Task.WhenAll(tasks).ConfigureAwait(false);

                    ServiceContext.Heartbeat?.Dispose();
                }
                catch (Exception ex)
                {
                    // Propagate the failure to concurrent StopAsync callers as well.
                    _stoppedTcs.TrySetException(ex);
                    throw;
                }

                _stoppedTcs.TrySetResult(null);
            }

            /// <summary>
            /// Ungraceful shutdown: runs <see cref="StopAsync"/> synchronously with a token that is
            /// already cancelled.
            /// </summary>
            public void Dispose()
            {
                // An already-cancelled token needs no CancellationTokenSource (which would have to be disposed).
                StopAsync(new CancellationToken(canceled: true)).GetAwaiter().GetResult();
            }

            /// <summary>
            /// Loads the configuration loader (if any) and checks that the request buffer limit, when set,
            /// is not smaller than the request line limit or the total request headers limit.
            /// </summary>
            /// <exception cref="InvalidOperationException">One of the limits is inconsistent.</exception>
            private void ValidateOptions()
            {
                Options.ConfigurationLoader?.Load();

                if (Options.Limits.MaxRequestBufferSize.HasValue &&
                    Options.Limits.MaxRequestBufferSize < Options.Limits.MaxRequestLineSize)
                {
                    throw new InvalidOperationException(
                        CoreStrings.FormatMaxRequestBufferSmallerThanRequestLineBuffer(Options.Limits.MaxRequestBufferSize.Value, Options.Limits.MaxRequestLineSize));
                }

                if (Options.Limits.MaxRequestBufferSize.HasValue &&
                    Options.Limits.MaxRequestBufferSize < Options.Limits.MaxRequestHeadersTotalSize)
                {
                    throw new InvalidOperationException(
                        CoreStrings.FormatMaxRequestBufferSmallerThanRequestHeaderBuffer(Options.Limits.MaxRequestBufferSize.Value, Options.Limits.MaxRequestHeadersTotalSize));
                }
            }
        }

    KestrelServer类本身的代码并不多

    主要看下StartAsync核心方法,内部有个OnBind方法

    // Local function of StartAsync, called once per endpoint to bind. It builds the connection
    // pipeline for the endpoint, binds the transport and starts the accept loop.
    // `application` and `cancellationToken` are the parameters of the enclosing StartAsync.
    async Task OnBind(ListenOptions options)
                    {
                        // Add the HTTP middleware as the terminal connection middleware
                        options.UseHttpServer(ServiceContext, application, options.Protocols);

                        var connectionDelegate = options.Build();

                        // Add the connection limit middleware
                        if (Options.Limits.MaxConcurrentConnections.HasValue)
                        {
                            connectionDelegate = new ConnectionLimitMiddleware(connectionDelegate, Options.Limits.MaxConcurrentConnections.Value, Trace).OnConnectionAsync;
                        }

                        var connectionDispatcher = new ConnectionDispatcher(ServiceContext, connectionDelegate);

                        // Forward the caller's token so that a transport which honours it can abandon the bind.
                        var transport = await _transportFactory.BindAsync(options.EndPoint, cancellationToken).ConfigureAwait(false);

                        // Update the endpoint
                        options.EndPoint = transport.EndPoint;
                        var acceptLoopTask = connectionDispatcher.StartAcceptingConnections(transport);

                        // Remember the listener and its accept loop so that they can be stopped later.
                        _transports.Add((transport, acceptLoopTask));
                    }

    看下ListenOptions参数:OnBind中调用的options.UseHttpServer是IConnectionBuilder的扩展方法,也就是说ListenOptions本身就是一个IConnectionBuilder

    /// <summary>
    /// Extension methods that plug the HTTP server into an <see cref="IConnectionBuilder"/> pipeline.
    /// </summary>
    internal static class HttpConnectionBuilderExtensions
        {
            /// <summary>
            /// Registers an <see cref="HttpConnectionMiddleware{TContext}"/> on the builder.
            /// </summary>
            /// <param name="builder">The connection pipeline builder.</param>
            /// <param name="serviceContext">The service context passed to the middleware.</param>
            /// <param name="application">The HTTP application passed to the middleware.</param>
            /// <param name="protocols">The HTTP protocols passed to the middleware.</param>
            /// <returns>The value returned by <see cref="IConnectionBuilder.Use"/>.</returns>
            public static IConnectionBuilder UseHttpServer<TContext>(
                this IConnectionBuilder builder,
                ServiceContext serviceContext,
                IHttpApplication<TContext> application,
                HttpProtocols protocols)
            {
                var httpMiddleware = new HttpConnectionMiddleware<TContext>(serviceContext, application, protocols);

                // The `next` delegate is deliberately ignored: this middleware terminates the pipeline.
                return builder.Use(next => httpMiddleware.OnConnectionAsync);
            }
        }
    /// <summary>
    /// Defines a builder for a connection-processing pipeline: middleware is registered with
    /// <see cref="Use"/> and the resulting delegate is produced by <see cref="Build"/>.
    /// </summary>
    public interface IConnectionBuilder
        {
            /// <summary>
            /// Gets the <see cref="IServiceProvider"/> that provides access to the application's service container.
            /// </summary>
            IServiceProvider ApplicationServices { get; }
    
            /// <summary>
            /// Adds a middleware delegate to the application's connection pipeline.
            /// </summary>
            /// <param name="middleware">The middleware delegate: a function that takes a <see cref="ConnectionDelegate"/> and returns a <see cref="ConnectionDelegate"/>.</param>
            /// <returns>The <see cref="IConnectionBuilder"/>.</returns>
            IConnectionBuilder Use(Func<ConnectionDelegate, ConnectionDelegate> middleware);
    
            /// <summary>
            /// Builds the delegate used by this application to process connections.
            /// </summary>
            /// <returns>The connection handling delegate.</returns>
            ConnectionDelegate Build();
        }
    /// <summary>
    /// <see cref="IConnectionBuilder"/> implementation that collects middleware in registration order
    /// and composes it into a single <see cref="ConnectionDelegate"/>.
    /// </summary>
    public class ConnectionBuilder : IConnectionBuilder
        {
            private readonly IList<Func<ConnectionDelegate, ConnectionDelegate>> _components = new List<Func<ConnectionDelegate, ConnectionDelegate>>();

            /// <summary>
            /// Creates a builder that exposes <paramref name="applicationServices"/> to middleware.
            /// </summary>
            public ConnectionBuilder(IServiceProvider applicationServices)
            {
                ApplicationServices = applicationServices;
            }

            /// <summary>Gets the service provider supplied to the constructor.</summary>
            public IServiceProvider ApplicationServices { get; }

            /// <summary>
            /// Appends <paramref name="middleware"/> to the list of registered components.
            /// </summary>
            /// <returns>This builder, to allow chaining.</returns>
            public IConnectionBuilder Use(Func<ConnectionDelegate, ConnectionDelegate> middleware)
            {
                _components.Add(middleware);
                return this;
            }

            /// <summary>
            /// Composes the registered components into one delegate. The innermost delegate is a no-op
            /// that returns a completed task; components are applied from last to first, so the first
            /// registered component ends up outermost.
            /// </summary>
            public ConnectionDelegate Build()
            {
                ConnectionDelegate pipeline = _ => Task.CompletedTask;

                // Walk the registrations backwards so that each component wraps the ones added after it.
                for (var index = _components.Count - 1; index >= 0; index--)
                {
                    pipeline = _components[index](pipeline);
                }

                return pipeline;
            }
        }

    ConnectionBuilder构建一个处理http连接的委托链

    /// <summary>
    /// A function that processes a connection.
    /// </summary>
    /// <param name="connection">The <see cref="ConnectionContext"/> of the connection to process.</param>
    /// <returns>A <see cref="Task"/> representing the asynchronous processing of the connection.</returns>
    public delegate Task ConnectionDelegate(ConnectionContext connection);

    HttpConnectionMiddleware建立一个HttpConnection对象,调用ProcessRequestsAsync方法处理请求

    先看下服务器是如何监听http请求的

    /// <summary>
    /// Defines a factory that creates <see cref="IConnectionListener"/> instances bound to an <see cref="EndPoint"/>.
    /// </summary>
    public interface IConnectionListenerFactory
        {
            /// <summary>
            /// Creates an <see cref="IConnectionListener"/> bound to the specified <see cref="EndPoint"/>.
            /// </summary>
            /// <param name="endpoint">The <see cref="EndPoint" /> to bind to.</param>
            /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
            /// <returns>A <see cref="ValueTask{IConnectionListener}"/> that completes when the listener has been bound, yielding a <see cref="IConnectionListener" /> representing the new listener.</returns>
            ValueTask<IConnectionListener> BindAsync(EndPoint endpoint, CancellationToken cancellationToken = default);
        }
    /// <summary>
    /// <see cref="IConnectionListenerFactory"/> that creates socket-based listeners
    /// (<see cref="SocketConnectionListener"/>).
    /// </summary>
    public sealed class SocketTransportFactory : IConnectionListenerFactory
        {
            private readonly SocketTransportOptions _options;
            private readonly SocketsTrace _trace;

            /// <summary>
            /// Creates the factory from the socket transport options and a logger factory.
            /// </summary>
            /// <exception cref="ArgumentNullException"><paramref name="options"/> or <paramref name="loggerFactory"/> is null.</exception>
            public SocketTransportFactory(
                IOptions<SocketTransportOptions> options,
                ILoggerFactory loggerFactory)
            {
                if (options == null)
                {
                    throw new ArgumentNullException(nameof(options));
                }

                if (loggerFactory == null)
                {
                    throw new ArgumentNullException(nameof(loggerFactory));
                }

                _options = options.Value;
                var logger = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets");
                _trace = new SocketsTrace(logger);
            }

            /// <summary>
            /// Creates a <see cref="SocketConnectionListener"/> for <paramref name="endpoint"/> and binds it
            /// synchronously; the returned <see cref="ValueTask{TResult}"/> is already completed.
            /// </summary>
            /// <param name="endpoint">The endpoint to bind to.</param>
            /// <param name="cancellationToken">Not used: binding is performed synchronously.</param>
            /// <returns>The bound listener.</returns>
            public ValueTask<IConnectionListener> BindAsync(EndPoint endpoint, CancellationToken cancellationToken = default)
            {
                var transport = new SocketConnectionListener(endpoint, _options, _trace);

                try
                {
                    transport.Bind();
                }
                catch
                {
                    // The listener's constructor has already obtained a memory pool; release it instead of
                    // abandoning the listener. SocketConnectionListener.DisposeAsync completes synchronously,
                    // so getting the result here does not block.
                    transport.DisposeAsync().GetAwaiter().GetResult();
                    throw;
                }

                return new ValueTask<IConnectionListener>(transport);
            }
        }
    /// <summary>
    /// <see cref="IConnectionListener"/> backed by a listening <see cref="Socket"/>. Each accepted socket
    /// is wrapped in a <see cref="SocketConnection"/> and assigned one of the schedulers in round-robin order.
    /// </summary>
    internal sealed class SocketConnectionListener : IConnectionListener
        {
            // Backlog length passed to Socket.Listen.
            private const int ListenBacklog = 512;

            private readonly MemoryPool<byte> _memoryPool;
            private readonly int _numSchedulers;
            private readonly PipeScheduler[] _schedulers;
            private readonly ISocketsTrace _trace;
            private Socket _listenSocket;
            // Index of the scheduler handed to the next accepted connection.
            private int _schedulerIndex;
            private readonly SocketTransportOptions _options;

            /// <summary>
            /// Gets the endpoint of this listener. After <see cref="Bind"/> it is the local endpoint
            /// reported by the listening socket.
            /// </summary>
            public EndPoint EndPoint { get; private set; }

            /// <summary>
            /// Creates an unbound listener. Obtains the memory pool from the options and creates
            /// <c>IOQueueCount</c> <see cref="IOQueue"/> schedulers, or falls back to the thread pool
            /// scheduler when that count is not positive.
            /// </summary>
            internal SocketConnectionListener(
                EndPoint endpoint,
                SocketTransportOptions options,
                ISocketsTrace trace)
            {
                EndPoint = endpoint;
                _trace = trace;
                _options = options;
                _memoryPool = _options.MemoryPoolFactory();
                var ioQueueCount = options.IOQueueCount;

                if (ioQueueCount > 0)
                {
                    _numSchedulers = ioQueueCount;
                    _schedulers = new IOQueue[_numSchedulers];

                    for (var i = 0; i < _numSchedulers; i++)
                    {
                        _schedulers[i] = new IOQueue();
                    }
                }
                else
                {
                    var directScheduler = new PipeScheduler[] { PipeScheduler.ThreadPool };
                    _numSchedulers = directScheduler.Length;
                    _schedulers = directScheduler;
                }
            }

            /// <summary>
            /// Creates the listening socket, binds it to <see cref="EndPoint"/> and starts listening.
            /// </summary>
            /// <exception cref="InvalidOperationException">The listener is already bound.</exception>
            /// <exception cref="AddressInUseException">The address is already in use.</exception>
            internal void Bind()
            {
                if (_listenSocket != null)
                {
                    throw new InvalidOperationException(SocketsStrings.TransportAlreadyBound);
                }

                // Unix domain sockets are unspecified
                var protocolType = EndPoint is UnixDomainSocketEndPoint ? ProtocolType.Unspecified : ProtocolType.Tcp;

                var listenSocket = new Socket(EndPoint.AddressFamily, SocketType.Stream, protocolType);

                try
                {
                    // Kestrel expects IPv6Any to bind to both IPv6 and IPv4.
                    // IPAddress does not overload ==, so compare by value: an address equal to "::" that is
                    // not the very IPAddress.IPv6Any instance must enable dual mode as well.
                    if (EndPoint is IPEndPoint ip && ip.Address.Equals(IPAddress.IPv6Any))
                    {
                        listenSocket.DualMode = true;
                    }

                    try
                    {
                        listenSocket.Bind(EndPoint);
                    }
                    catch (SocketException e) when (e.SocketErrorCode == SocketError.AddressAlreadyInUse)
                    {
                        throw new AddressInUseException(e.Message, e);
                    }

                    EndPoint = listenSocket.LocalEndPoint;

                    listenSocket.Listen(ListenBacklog);
                }
                catch
                {
                    // The socket is not stored in _listenSocket yet, so nothing else would ever dispose it.
                    listenSocket.Dispose();
                    throw;
                }

                _listenSocket = listenSocket;
            }

            /// <summary>
            /// Accepts the next connection and returns it as a started <see cref="SocketConnection"/>.
            /// </summary>
            /// <param name="cancellationToken">Not used by this implementation.</param>
            /// <returns>
            /// The accepted connection, or null once the listening socket has been disposed
            /// (which is what <see cref="UnbindAsync"/> and <see cref="DisposeAsync"/> do).
            /// </returns>
            public async ValueTask<ConnectionContext> AcceptAsync(CancellationToken cancellationToken = default)
            {
                while (true)
                {
                    try
                    {
                        var acceptSocket = await _listenSocket.AcceptAsync();

                        // Only apply no delay to Tcp based endpoints
                        if (acceptSocket.LocalEndPoint is IPEndPoint)
                        {
                            acceptSocket.NoDelay = _options.NoDelay;
                        }

                        var connection = new SocketConnection(acceptSocket, _memoryPool, _schedulers[_schedulerIndex], _trace, _options.MaxReadBufferSize, _options.MaxWriteBufferSize);

                        connection.Start();

                        // Round-robin over the schedulers.
                        _schedulerIndex = (_schedulerIndex + 1) % _numSchedulers;

                        return connection;
                    }
                    catch (ObjectDisposedException)
                    {
                        // A call was made to UnbindAsync/DisposeAsync just return null which signals we're done
                        return null;
                    }
                    catch (SocketException e) when (e.SocketErrorCode == SocketError.OperationAborted)
                    {
                        // A call was made to UnbindAsync/DisposeAsync just return null which signals we're done
                        return null;
                    }
                    catch (SocketException)
                    {
                        // The connection got reset while it was in the backlog, so we try again.
                        _trace.ConnectionReset(connectionId: "(null)");
                    }
                }
            }

            /// <summary>
            /// Stops listening by disposing the listening socket. Completes synchronously.
            /// </summary>
            public ValueTask UnbindAsync(CancellationToken cancellationToken = default)
            {
                _listenSocket?.Dispose();
                return default;
            }

            /// <summary>
            /// Disposes the listening socket (if any) and the memory pool. Completes synchronously.
            /// </summary>
            public ValueTask DisposeAsync()
            {
                _listenSocket?.Dispose();
                // Dispose the memory pool
                _memoryPool.Dispose();
                return default;
            }
        }

    SocketTransportFactory创建SocketConnectionListener,监听对应的端口

    每接受一个客户端连接,AcceptAsync方法就创建并返回一个SocketConnection

     /// <summary>
        /// Base type that carries everything known about a single connection.
        /// </summary>
        public abstract class ConnectionContext : IAsyncDisposable
        {
            /// <summary>
            /// Gets or sets the unique identifier used for this connection in trace logs.
            /// </summary>
            public abstract string ConnectionId { get; set; }

            /// <summary>
            /// Gets the features that the server and middleware expose on this connection.
            /// </summary>
            public abstract IFeatureCollection Features { get; }

            /// <summary>
            /// Gets or sets a key/value bag for sharing data within the scope of this connection.
            /// </summary>
            public abstract IDictionary<object, object> Items { get; set; }

            /// <summary>
            /// Gets or sets the <see cref="IDuplexPipe"/> used to read from and write to this connection.
            /// </summary>
            public abstract IDuplexPipe Transport { get; set; }

            /// <summary>
            /// Gets or sets the token that is triggered when the client connection is closed.
            /// </summary>
            public virtual CancellationToken ConnectionClosed { get; set; }

            /// <summary>
            /// Gets or sets the local endpoint of this connection.
            /// </summary>
            public virtual EndPoint LocalEndPoint { get; set; }

            /// <summary>
            /// Gets or sets the remote endpoint of this connection.
            /// </summary>
            public virtual EndPoint RemoteEndPoint { get; set; }

            /// <summary>
            /// Aborts the underlying connection.
            /// </summary>
            /// <param name="abortReason">An optional <see cref="ConnectionAbortedException"/> describing why the connection is being terminated. This base implementation does not use it.</param>
            public virtual void Abort(ConnectionAbortedException abortReason)
            {
                // Overrides are expected; this fallback keeps implementations written before
                // ConnectionContext.Abort() existed working by going through the lifetime feature.
                var lifetimeFeature = Features.Get<IConnectionLifetimeFeature>();
                if (lifetimeFeature != null)
                {
                    lifetimeFeature.Abort();
                }
            }

            /// <summary>
            /// Aborts the underlying connection with a default <see cref="ConnectionAbortedException"/>.
            /// </summary>
            public virtual void Abort()
            {
                Abort(new ConnectionAbortedException("The connection was aborted by the application via ConnectionContext.Abort()."));
            }

            /// <summary>
            /// Releases resources for the underlying connection. This base implementation does nothing.
            /// </summary>
            /// <returns>A <see cref="ValueTask"/> that completes when resources have been released.</returns>
            public virtual ValueTask DisposeAsync() => default;
        }
    /// <summary>
    /// <see cref="ConnectionContext"/> base for transport connections. The connection id and the items
    /// dictionary are created lazily, and the instance serves as its own feature collection.
    /// </summary>
    internal abstract partial class TransportConnection : ConnectionContext
        {
            private string _connectionId;
            private IDictionary<object, object> _items;

            /// <summary>Initializes the feature slots through <c>FastReset</c>.</summary>
            public TransportConnection()
            {
                FastReset();
            }

            /// <summary>
            /// Gets or sets the connection id. When none has been assigned, the first read generates one
            /// with <c>CorrelationIdGenerator.GetNextId()</c>.
            /// </summary>
            public override string ConnectionId
            {
                get => _connectionId ?? (_connectionId = CorrelationIdGenerator.GetNextId());
                set => _connectionId = value;
            }

            /// <summary>Gets the feature collection, which is this instance itself.</summary>
            public override IFeatureCollection Features => this;

            public override EndPoint LocalEndPoint { get; set; }
            public override EndPoint RemoteEndPoint { get; set; }

            public virtual MemoryPool<byte> MemoryPool { get; }

            public override IDuplexPipe Transport { get; set; }

            public IDuplexPipe Application { get; set; }

            /// <summary>
            /// Gets or sets the per-connection items. A <c>ConnectionItems</c> instance is allocated
            /// on first read when nothing has been assigned.
            /// </summary>
            public override IDictionary<object, object> Items
            {
                get
                {
                    // Lazily allocate connection metadata
                    if (_items == null)
                    {
                        _items = new ConnectionItems();
                    }

                    return _items;
                }
                set => _items = value;
            }

            public override CancellationToken ConnectionClosed { get; set; }

            // DO NOT remove this override of ConnectionContext.Abort. Without it, any
            // TransportConnection that does not override Abort (or that calls base.Abort)
            // would overflow the stack when IConnectionLifetimeFeature.Abort() is called.
            // That said, all derived types should override this implementation of Abort,
            // because canceling pending output reads is not sufficient to abort the
            // connection if there is backpressure.
            public override void Abort(ConnectionAbortedException abortReason)
            {
                var applicationInput = Application.Input;
                applicationInput.CancelPendingRead();
            }
        }
    /// <summary>
    /// Explicit implementations of the connection feature interfaces; each one forwards to the
    /// corresponding public member of the connection.
    /// </summary>
    internal partial class TransportConnection : IConnectionIdFeature,
                                                 IConnectionTransportFeature,
                                                 IConnectionItemsFeature,
                                                 IMemoryPoolFeature,
                                                 IConnectionLifetimeFeature
        {
            // NOTE: When feature interfaces are added to or removed from this TransportConnection class implementation,
            // then the list of `features` in the generated code project MUST also be updated.
            // See also: tools/CodeGenerator/TransportConnectionFeatureCollection.cs

            MemoryPool<byte> IMemoryPoolFeature.MemoryPool
            {
                get { return MemoryPool; }
            }

            IDuplexPipe IConnectionTransportFeature.Transport
            {
                get { return Transport; }
                set { Transport = value; }
            }

            IDictionary<object, object> IConnectionItemsFeature.Items
            {
                get { return Items; }
                set { Items = value; }
            }

            CancellationToken IConnectionLifetimeFeature.ConnectionClosed
            {
                get { return ConnectionClosed; }
                set { ConnectionClosed = value; }
            }

            // Routes the feature-level abort to the Abort(ConnectionAbortedException) overload.
            void IConnectionLifetimeFeature.Abort()
            {
                Abort(new ConnectionAbortedException("The connection was aborted by the application via IConnectionLifetimeFeature.Abort()."));
            }
        }
     internal partial class TransportConnection : IFeatureCollection
        {
            private static readonly Type IConnectionIdFeatureType = typeof(IConnectionIdFeature);
            private static readonly Type IConnectionTransportFeatureType = typeof(IConnectionTransportFeature);
            private static readonly Type IConnectionItemsFeatureType = typeof(IConnectionItemsFeature);
            private static readonly Type IMemoryPoolFeatureType = typeof(IMemoryPoolFeature);
            private static readonly Type IConnectionLifetimeFeatureType = typeof(IConnectionLifetimeFeature);
    
            private object _currentIConnectionIdFeature;
            private object _currentIConnectionTransportFeature;
            private object _currentIConnectionItemsFeature;
            private object _currentIMemoryPoolFeature;
            private object _currentIConnectionLifetimeFeature;
    
            private int _featureRevision;
    
            private List<KeyValuePair<Type, object>> MaybeExtra;
    
            // Points every built-in feature slot back at this instance.
            private void FastReset()
            {
                _currentIConnectionIdFeature =
                    _currentIConnectionTransportFeature =
                    _currentIConnectionItemsFeature =
                    _currentIMemoryPoolFeature =
                    _currentIConnectionLifetimeFeature = this;
            }
    
            // Internal for testing.
            // Restores the built-in feature slots, drops any extra features and bumps the revision.
            internal void ResetFeatureCollection()
            {
                FastReset();

                if (MaybeExtra != null)
                {
                    MaybeExtra.Clear();
                }

                _featureRevision += 1;
            }
    
            // Looks up a feature that is not one of the built-in slots.
            // Returns null when no extra features exist or none is stored under the key.
            private object ExtraFeatureGet(Type key)
            {
                if (MaybeExtra != null)
                {
                    foreach (var entry in MaybeExtra)
                    {
                        if (entry.Key == key)
                        {
                            return entry.Value;
                        }
                    }
                }

                return null;
            }
    
            // Stores a feature that is not one of the built-in slots: replaces the entry already
            // stored under the key, or appends a new one. The list is allocated on first use.
            private void ExtraFeatureSet(Type key, object value)
            {
                var extras = MaybeExtra ?? (MaybeExtra = new List<KeyValuePair<Type, object>>(2));
                var entry = new KeyValuePair<Type, object>(key, value);

                // Find the position of the first entry stored under the same key, if any.
                var index = 0;
                while (index < extras.Count && extras[index].Key != key)
                {
                    index++;
                }

                if (index < extras.Count)
                {
                    extras[index] = entry;
                }
                else
                {
                    extras.Add(entry);
                }
            }
    
            bool IFeatureCollection.IsReadOnly => false;
    
            int IFeatureCollection.Revision => _featureRevision;
    
            // Gets or sets a feature by its interface type. The five built-in feature types map to
            // dedicated fields; every other type goes through the extra-feature list.
            object IFeatureCollection.this[Type key]
            {
                get
                {
                    if (key == IConnectionIdFeatureType)
                    {
                        return _currentIConnectionIdFeature;
                    }

                    if (key == IConnectionTransportFeatureType)
                    {
                        return _currentIConnectionTransportFeature;
                    }

                    if (key == IConnectionItemsFeatureType)
                    {
                        return _currentIConnectionItemsFeature;
                    }

                    if (key == IMemoryPoolFeatureType)
                    {
                        return _currentIMemoryPoolFeature;
                    }

                    if (key == IConnectionLifetimeFeatureType)
                    {
                        return _currentIConnectionLifetimeFeature;
                    }

                    // ExtraFeatureGet itself yields null when no extra features were ever stored.
                    return ExtraFeatureGet(key);
                }

                set
                {
                    // Every write counts as a new revision, whichever slot it lands in.
                    _featureRevision++;

                    if (key == IConnectionIdFeatureType)
                    {
                        _currentIConnectionIdFeature = value;
                        return;
                    }

                    if (key == IConnectionTransportFeatureType)
                    {
                        _currentIConnectionTransportFeature = value;
                        return;
                    }

                    if (key == IConnectionItemsFeatureType)
                    {
                        _currentIConnectionItemsFeature = value;
                        return;
                    }

                    if (key == IMemoryPoolFeatureType)
                    {
                        _currentIMemoryPoolFeature = value;
                        return;
                    }

                    if (key == IConnectionLifetimeFeatureType)
                    {
                        _currentIConnectionLifetimeFeature = value;
                        return;
                    }

                    ExtraFeatureSet(key, value);
                }
            }
    
            /// <summary>
            /// Typed lookup of a feature. Well-known feature types are served from their dedicated fields;
            /// other types fall back to <c>ExtraFeatureGet</c> when the extra storage exists.
            /// </summary>
            /// <typeparam name="TFeature">The feature interface being requested.</typeparam>
            /// <returns>The stored feature, or <c>default</c> when none is available.</returns>
            TFeature IFeatureCollection.Get<TFeature>()
            {
                if (typeof(TFeature) == typeof(IConnectionIdFeature))
                {
                    return (TFeature)_currentIConnectionIdFeature;
                }

                if (typeof(TFeature) == typeof(IConnectionTransportFeature))
                {
                    return (TFeature)_currentIConnectionTransportFeature;
                }

                if (typeof(TFeature) == typeof(IConnectionItemsFeature))
                {
                    return (TFeature)_currentIConnectionItemsFeature;
                }

                if (typeof(TFeature) == typeof(IMemoryPoolFeature))
                {
                    return (TFeature)_currentIMemoryPoolFeature;
                }

                if (typeof(TFeature) == typeof(IConnectionLifetimeFeature))
                {
                    return (TFeature)_currentIConnectionLifetimeFeature;
                }

                if (MaybeExtra != null)
                {
                    return (TFeature)(ExtraFeatureGet(typeof(TFeature)));
                }

                return default;
            }
    
            /// <summary>
            /// Typed assignment of a feature. Well-known feature types are written to their dedicated fields;
            /// any other type is handed to <c>ExtraFeatureSet</c>.
            /// </summary>
            /// <typeparam name="TFeature">The feature interface being assigned.</typeparam>
            /// <param name="feature">The feature instance to store.</param>
            void IFeatureCollection.Set<TFeature>(TFeature feature)
            {
                // The revision is bumped before dispatching, so it changes on every call.
                _featureRevision++;

                if (typeof(TFeature) == typeof(IConnectionIdFeature))
                {
                    _currentIConnectionIdFeature = feature;
                    return;
                }

                if (typeof(TFeature) == typeof(IConnectionTransportFeature))
                {
                    _currentIConnectionTransportFeature = feature;
                    return;
                }

                if (typeof(TFeature) == typeof(IConnectionItemsFeature))
                {
                    _currentIConnectionItemsFeature = feature;
                    return;
                }

                if (typeof(TFeature) == typeof(IMemoryPoolFeature))
                {
                    _currentIMemoryPoolFeature = feature;
                    return;
                }

                if (typeof(TFeature) == typeof(IConnectionLifetimeFeature))
                {
                    _currentIConnectionLifetimeFeature = feature;
                    return;
                }

                ExtraFeatureSet(typeof(TFeature), feature);
            }
    
            /// <summary>
            /// Lazily enumerates the stored features: first the well-known ones whose field is non-null
            /// (in a fixed order), then every entry of the extra storage, if it exists.
            /// </summary>
            private IEnumerable<KeyValuePair<Type, object>> FastEnumerable()
            {
                // Builds the (type, instance) pair yielded for a well-known feature.
                KeyValuePair<Type, object> Entry(Type featureType, object instance)
                {
                    return new KeyValuePair<Type, object>(featureType, instance);
                }

                if (_currentIConnectionIdFeature != null)
                {
                    yield return Entry(IConnectionIdFeatureType, _currentIConnectionIdFeature);
                }

                if (_currentIConnectionTransportFeature != null)
                {
                    yield return Entry(IConnectionTransportFeatureType, _currentIConnectionTransportFeature);
                }

                if (_currentIConnectionItemsFeature != null)
                {
                    yield return Entry(IConnectionItemsFeatureType, _currentIConnectionItemsFeature);
                }

                if (_currentIMemoryPoolFeature != null)
                {
                    yield return Entry(IMemoryPoolFeatureType, _currentIMemoryPoolFeature);
                }

                if (_currentIConnectionLifetimeFeature != null)
                {
                    yield return Entry(IConnectionLifetimeFeatureType, _currentIConnectionLifetimeFeature);
                }

                if (MaybeExtra == null)
                {
                    yield break;
                }

                foreach (var extra in MaybeExtra)
                {
                    yield return extra;
                }
            }
    
            /// <summary>Generic enumerator over the (type, feature) pairs produced by <c>FastEnumerable</c>.</summary>
            IEnumerator<KeyValuePair<Type, object>> IEnumerable<KeyValuePair<Type, object>>.GetEnumerator()
            {
                return FastEnumerable().GetEnumerator();
            }

            /// <summary>Non-generic enumerator over the same sequence.</summary>
            IEnumerator IEnumerable.GetEnumerator()
            {
                return FastEnumerable().GetEnumerator();
            }
        }
    internal sealed class SocketConnection : TransportConnection
        {
            // Minimum buffer size requested from the input pipe before each receive: half of a SlabMemoryPool block.
            private static readonly int MinAllocBufferSize = SlabMemoryPool.BlockSize / 2;
            // Platform flags, evaluated once. They select the awaiter scheduler in the constructor and
            // influence how socket error codes are classified in IsConnectionResetError/IsConnectionAbortError.
            private static readonly bool IsWindows = RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
            private static readonly bool IsMacOS = RuntimeInformation.IsOSPlatform(OSPlatform.OSX);
    
            // The connected socket plus the helpers that perform the awaitable receive/send operations on it.
            private readonly Socket _socket;
            private readonly ISocketsTrace _trace;
            private readonly SocketReceiver _receiver;
            private readonly SocketSender _sender;
            // Source of the ConnectionClosed token; cancelled by CancelConnectionClosedToken, disposed in DisposeAsync.
            private readonly CancellationTokenSource _connectionClosedTokenSource = new CancellationTokenSource();
    
            // Serializes Shutdown() so the socket is shut down and disposed at most once.
            private readonly object _shutdownLock = new object();
            // Written inside Shutdown() under _shutdownLock; read without the lock by the receive loop (hence volatile).
            private volatile bool _socketDisposed;
            // Reason recorded by Shutdown(); preferred over the receive loop's own error when completing the input pipe.
            private volatile Exception _shutdownReason;
            // Task returned by StartAsync(); assigned in Start() and awaited in DisposeAsync().
            private Task _processingTask;
            // Completed by the work item queued in FireConnectionClosed(); DoReceive() awaits it before finishing.
            private readonly TaskCompletionSource<object> _waitForConnectionClosedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
            // Guard flag so FireConnectionClosed() queues its work item only once.
            private bool _connectionClosed;
    
            /// <summary>
            /// Wraps a connected socket in a transport connection: records its endpoints, creates the
            /// receiver/sender helpers and builds the duplex pipe pair that links transport and application.
            /// </summary>
            /// <param name="socket">The connected socket; must not be null.</param>
            /// <param name="memoryPool">Pool backing both pipes; must not be null.</param>
            /// <param name="scheduler">Scheduler used for the transport-facing side of each pipe.</param>
            /// <param name="trace">Trace sink for connection events; must not be null.</param>
            /// <param name="maxReadBufferSize">Pause threshold for the input pipe; <c>null</c> is passed on as 0.</param>
            /// <param name="maxWriteBufferSize">Pause threshold for the output pipe; <c>null</c> is passed on as 0.</param>
            internal SocketConnection(Socket socket,
                                      MemoryPool<byte> memoryPool,
                                      PipeScheduler scheduler,
                                      ISocketsTrace trace,
                                      long? maxReadBufferSize = null,
                                      long? maxWriteBufferSize = null)
            {
                Debug.Assert(socket != null);
                Debug.Assert(memoryPool != null);
                Debug.Assert(trace != null);

                _socket = socket;
                MemoryPool = memoryPool;
                _trace = trace;

                LocalEndPoint = _socket.LocalEndPoint;
                RemoteEndPoint = _socket.RemoteEndPoint;

                ConnectionClosed = _connectionClosedTokenSource.Token;

                // On *nix platforms, Sockets already dispatches to the ThreadPool, so the awaiters run inline there.
                // The IOQueues are still used for the PipeSchedulers below; this is intentional.
                // https://github.com/aspnet/KestrelHttpServer/issues/2573
                PipeScheduler awaiterScheduler;
                if (IsWindows)
                {
                    awaiterScheduler = scheduler;
                }
                else
                {
                    awaiterScheduler = PipeScheduler.Inline;
                }

                _receiver = new SocketReceiver(_socket, awaiterScheduler);
                _sender = new SocketSender(_socket, awaiterScheduler);

                // A missing limit becomes 0; the resume threshold is always half of the pause threshold.
                long readLimit = maxReadBufferSize ?? 0;
                long writeLimit = maxWriteBufferSize ?? 0;

                var inputOptions = new PipeOptions(
                    pool: MemoryPool,
                    readerScheduler: PipeScheduler.ThreadPool,
                    writerScheduler: scheduler,
                    pauseWriterThreshold: readLimit,
                    resumeWriterThreshold: readLimit / 2,
                    useSynchronizationContext: false);

                var outputOptions = new PipeOptions(
                    pool: MemoryPool,
                    readerScheduler: scheduler,
                    writerScheduler: PipeScheduler.ThreadPool,
                    pauseWriterThreshold: writeLimit,
                    resumeWriterThreshold: writeLimit / 2,
                    useSynchronizationContext: false);

                // Expose both ends of the pipe pair built from the options above.
                var pipes = DuplexPipe.CreateConnectionPair(inputOptions, outputOptions);
                Transport = pipes.Transport;
                Application = pipes.Application;
            }
    
            /// <summary>
            /// Writer the receive loop uses to publish bytes read from the socket
            /// (the output side of <c>Application</c>).
            /// </summary>
            public PipeWriter Input
            {
                get { return Application.Output; }
            }

            /// <summary>
            /// Reader the send loop drains to obtain bytes to write to the socket
            /// (the input side of <c>Application</c>).
            /// </summary>
            public PipeReader Output
            {
                get { return Application.Input; }
            }
    
            /// <summary>
            /// The memory pool supplied to the constructor; it is also the pool passed to both pipes' options.
            /// </summary>
            public override MemoryPool<byte> MemoryPool { get; }
    
            /// <summary>
            /// Kicks off the send/receive processing and remembers the resulting task so that
            /// <c>DisposeAsync</c> can await it later.
            /// </summary>
            public void Start() => _processingTask = StartAsync();
    
            /// <summary>
            /// Starts the receive and send loops concurrently and completes once both have finished.
            /// Any exception that escapes either loop is caught and logged here rather than propagated.
            /// </summary>
            private async Task StartAsync()
            {
                try
                {
                    // Spawn send and receive logic
                    var receiveTask = DoReceive();
                    var sendTask = DoSend();
    
                    // Now wait for both to complete
                    await receiveTask;
                    await sendTask;
    
                    // Reached only when both awaits above completed without throwing.
                    // NOTE(review): on the exception path the receiver and sender are not disposed here — confirm that is acceptable.
                    _receiver.Dispose();
                    _sender.Dispose();
                }
                catch (Exception ex)
                {
                    _trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(StartAsync)}.");
                }
            }
    
            /// <summary>
            /// Aborts the connection: shuts the socket down with the supplied reason, then cancels the
            /// send loop's pending read so it can exit.
            /// </summary>
            /// <param name="abortReason">The reason passed to <c>Shutdown</c>.</param>
            public override void Abort(ConnectionAbortedException abortReason)
            {
                // Try to gracefully close the socket to match libuv behavior.
                Shutdown(abortReason);
    
                // Cancel ProcessSends loop after calling shutdown to ensure the correct _shutdownReason gets set.
                Output.CancelPendingRead();
            }
    
            /// <summary>
            /// Completes both transport pipe ends, waits for the processing task (if the connection was
            /// started) and finally disposes the connection-closed token source.
            /// </summary>
            /// <remarks>
            /// Only called after connection middleware is complete which means the ConnectionClosed token has fired.
            /// </remarks>
            public override async ValueTask DisposeAsync()
            {
                Transport.Input.Complete();
                Transport.Output.Complete();

                // Start() may never have been called, in which case there is nothing to wait for.
                if (_processingTask is Task processing)
                {
                    await processing;
                }

                _connectionClosedTokenSource.Dispose();
            }
    
            /// <summary>
            /// Runs the receive loop and translates its outcome: classifies any exception, completes the
            /// input pipe with the most relevant error, fires the connection-closed notification and waits
            /// for that notification to have run.
            /// </summary>
            private async Task DoReceive()
            {
                // Error to complete the input pipe with when Shutdown() has not recorded a reason.
                Exception error = null;
    
                try
                {
                    await ProcessReceives();
                }
                catch (SocketException ex) when (IsConnectionResetError(ex.SocketErrorCode))
                {
                    // This could be ignored if _shutdownReason is already set.
                    error = new ConnectionResetException(ex.Message, ex);
    
                    // There's still a small chance that both DoReceive() and DoSend() can log the same connection reset.
                    // Both logs will have the same ConnectionId. I don't think it's worthwhile to lock just to avoid this.
                    if (!_socketDisposed)
                    {
                        _trace.ConnectionReset(ConnectionId);
                    }
                }
                catch (Exception ex)
                    when ((ex is SocketException socketEx && IsConnectionAbortError(socketEx.SocketErrorCode)) ||
                           ex is ObjectDisposedException)
                {
                    // This exception should always be ignored because _shutdownReason should be set.
                    error = ex;
    
                    if (!_socketDisposed)
                    {
                        // This is unexpected if the socket hasn't been disposed yet.
                        _trace.ConnectionError(ConnectionId, error);
                    }
                }
                catch (Exception ex)
                {
                    // This is unexpected.
                    error = ex;
                    _trace.ConnectionError(ConnectionId, error);
                }
                finally
                {
                    // If Shutdown() has already been called, assume that was the reason ProcessReceives() exited.
                    Input.Complete(_shutdownReason ?? error);
    
                    FireConnectionClosed();
    
                    // Do not finish until the work item queued by FireConnectionClosed() has signalled completion.
                    await _waitForConnectionClosedTcs.Task;
                }
            }
    
            /// <summary>
            /// Receive loop: waits for data on the socket, reads it into memory obtained from the input
            /// pipe and flushes it to the reader. Ends when the peer sends FIN (a zero-byte receive) or
            /// when the flush reports that the pipe is completed or cancelled.
            /// </summary>
            private async Task ProcessReceives()
            {
                // Read the Input property once up front instead of on every iteration.
                var writer = Input;

                for (;;)
                {
                    // Hold off on taking a buffer until the socket actually has data.
                    await _receiver.WaitForDataAsync();

                    // Ask the pipe for a reasonably sized chunk of memory to receive into.
                    var memory = writer.GetMemory(MinAllocBufferSize);

                    var received = await _receiver.ReceiveAsync(memory);
                    if (received == 0)
                    {
                        // FIN
                        _trace.ConnectionReadFin(ConnectionId);
                        return;
                    }

                    writer.Advance(received);

                    // A flush that does not complete synchronously is traced as a pause/resume pair.
                    var pendingFlush = writer.FlushAsync();
                    var throttled = !pendingFlush.IsCompleted;

                    if (throttled)
                    {
                        _trace.ConnectionPause(ConnectionId);
                    }

                    var flushed = await pendingFlush;

                    if (throttled)
                    {
                        _trace.ConnectionResume(ConnectionId);
                    }

                    if (flushed.IsCompleted || flushed.IsCanceled)
                    {
                        // The pipe consumer is shut down, so stop writing.
                        return;
                    }
                }
            }
    
            /// <summary>
            /// Runs the send loop and translates its outcome: classifies any exception, shuts the socket
            /// down with the resulting reason, completes the output pipe and cancels any pending flush on
            /// the input side.
            /// </summary>
            private async Task DoSend()
            {
                // Reason handed to Shutdown(); stays null when the send loop ended without an exception.
                Exception shutdownReason = null;
                // Only set for exceptions that are neither connection resets nor abort/disposed errors.
                Exception unexpectedError = null;
    
                try
                {
                    await ProcessSends();
                }
                catch (SocketException ex) when (IsConnectionResetError(ex.SocketErrorCode))
                {
                    shutdownReason = new ConnectionResetException(ex.Message, ex);
                    _trace.ConnectionReset(ConnectionId);
                }
                catch (Exception ex)
                    when ((ex is SocketException socketEx && IsConnectionAbortError(socketEx.SocketErrorCode)) ||
                           ex is ObjectDisposedException)
                {
                    // This should always be ignored since Shutdown() must have already been called by Abort().
                    shutdownReason = ex;
                }
                catch (Exception ex)
                {
                    shutdownReason = ex;
                    unexpectedError = ex;
                    _trace.ConnectionError(ConnectionId, unexpectedError);
                }
                finally
                {
                    Shutdown(shutdownReason);
    
                    // Complete the output after disposing the socket
                    Output.Complete(unexpectedError);
    
                    // Cancel any pending flushes so that the input loop is un-paused
                    Input.CancelPendingFlush();
                }
            }
    
            /// <summary>
            /// Send loop: reads buffered data from the output pipe, writes it to the socket and marks it
            /// consumed. Ends when a read is cancelled or when the pipe reports completion.
            /// </summary>
            private async Task ProcessSends()
            {
                // Read the Output property once up front instead of on every iteration.
                var reader = Output;
                var finished = false;

                do
                {
                    var read = await reader.ReadAsync();
                    if (read.IsCanceled)
                    {
                        break;
                    }

                    // Capture everything needed from the read result before awaiting the send.
                    var pending = read.Buffer;
                    var consumedTo = pending.End;
                    finished = read.IsCompleted;

                    if (!pending.IsEmpty)
                    {
                        await _sender.SendAsync(pending);
                    }

                    reader.AdvanceTo(consumedTo);
                }
                while (!finished);
            }
    
            /// <summary>
            /// Queues, at most once, a thread-pool work item that cancels the connection-closed token and
            /// then completes <c>_waitForConnectionClosedTcs</c>.
            /// </summary>
            private void FireConnectionClosed()
            {
                // Only the first call schedules the notification; later calls do nothing.
                if (!_connectionClosed)
                {
                    _connectionClosed = true;

                    // The connection is passed as state so the callback does not capture anything.
                    ThreadPool.UnsafeQueueUserWorkItem(
                        connection =>
                        {
                            connection.CancelConnectionClosedToken();

                            connection._waitForConnectionClosedTcs.TrySetResult(null);
                        },
                        this,
                        preferLocal: false);
                }
            }
    
            /// <summary>
            /// Shuts down and disposes the socket exactly once, recording why. Calls made after the first
            /// one return without doing anything.
            /// </summary>
            /// <param name="shutdownReason">
            /// The reason to record; when <c>null</c>, a generic <c>ConnectionAbortedException</c> is recorded instead.
            /// </param>
            private void Shutdown(Exception shutdownReason)
            {
                lock (_shutdownLock)
                {
                    if (_socketDisposed)
                    {
                        return;
                    }
    
                    // Make sure to close the connection only after the _aborted flag is set.
                    // Without this, the RequestsCanBeAbortedMidRead test will sometimes fail when
                    // a BadHttpRequestException is thrown instead of a TaskCanceledException.
                    // NOTE(review): no `_aborted` field is declared in this class; the flag set here is `_socketDisposed`.
                    _socketDisposed = true;
    
                    // shutdownReason should only be null if the output was completed gracefully, so no one should
                    // ever observe the nondescript ConnectionAbortedException except for connection middleware attempting
                    // to half close the connection which is currently unsupported.
                    _shutdownReason = shutdownReason ?? new ConnectionAbortedException("The Socket transport's send loop completed gracefully.");
    
                    _trace.ConnectionWriteFin(ConnectionId, _shutdownReason.Message);
    
                    try
                    {
                        // Try to gracefully close the socket even for aborts to match libuv behavior.
                        _socket.Shutdown(SocketShutdown.Both);
                    }
                    catch
                    {
                        // Ignore any errors from Socket.Shutdown() since we're tearing down the connection anyway.
                    }
    
                    _socket.Dispose();
                }
            }
    
            /// <summary>
            /// Cancels the connection-closed token source. Any exception raised while cancelling
            /// is logged and swallowed.
            /// </summary>
            private void CancelConnectionClosedToken()
            {
                try
                {
                    _connectionClosedTokenSource.Cancel();
                }
                catch (Exception cancelError)
                {
                    var message = $"Unexpected exception in {nameof(SocketConnection)}.{nameof(CancelConnectionClosedToken)}.";
                    _trace.LogError(0, cancelError, message);
                }
            }
    
            /// <summary>
            /// Tells whether a socket error code should be treated as a connection reset.
            /// </summary>
            /// <param name="errorCode">The error code reported by the socket.</param>
            /// <returns><c>true</c> when the code counts as a reset on the current platform.</returns>
            private static bool IsConnectionResetError(SocketError errorCode)
            {
                switch (errorCode)
                {
                    case SocketError.ConnectionReset:
                    case SocketError.Shutdown:
                        return true;

                    case SocketError.ConnectionAborted:
                        // A connection reset can be reported as SocketError.ConnectionAborted on Windows.
                        return IsWindows;

                    case SocketError.ProtocolType:
                        // ProtocolType can be removed once https://github.com/dotnet/corefx/issues/31927 is fixed.
                        return IsMacOS;

                    default:
                        return false;
                }
            }
    
            /// <summary>
            /// Tells whether a socket error code should be treated as a connection abort.
            /// </summary>
            /// <param name="errorCode">The error code reported by the socket.</param>
            /// <returns><c>true</c> when the code counts as an abort on the current platform.</returns>
            private static bool IsConnectionAbortError(SocketError errorCode)
            {
                switch (errorCode)
                {
                    case SocketError.OperationAborted:
                    case SocketError.Interrupted:
                        return true;

                    case SocketError.InvalidArgument:
                        // Calling Dispose after ReceiveAsync can cause an "InvalidArgument" error on *nix.
                        return !IsWindows;

                    default:
                        return false;
                }
            }
        }

    再来看下HttpConnection如何处理请求

    如果是 HTTP/1.x 协议(HTTP/1.0 或 HTTP/1.1),则调用Http1Connection的ProcessRequestsAsync方法处理请求

     Http1Connection继承HttpProtocol并实现了以上的所有接口

    当调用IHttpApplication的CreateContext方法时,需要的IFeatureCollection参数其实就是Http1Connection对象

    通过Http1Connection对象来创建HttpContext

    然后就进入到了asp.net的处理管道中

  • 相关阅读:
    HDU 2047 阿牛的EOF牛肉串
    HDU 2015 偶数求和
    HDU 2029 算菜价
    HDU 2028 Lowest Common Multiple Plus
    动态函数库设计
    静态函数库设计
    Linux编程规范
    Linux应用程序地址布局
    Core Dump 程序故障分析
    W-D-S-UART编程
  • 原文地址:https://www.cnblogs.com/lanpingwang/p/12641429.html
Copyright © 2011-2022 走看看