  • corefx source study: how NetworkStream.ReadAsync reads data asynchronously from a Socket

    I recently ran into a problem with NetworkStream.ReadAsync when reading data under high concurrency on Linux, which prompted me to read the System.Net.Sockets implementation in corefx (based on corefx 2.2).

    This post is a set of brief notes on the source code behind NetworkStream.ReadAsync, focused on the case of running on Linux.

    NetworkStream inherits from System.IO.Stream. The signature of System.IO.Stream.ReadAsync is

    public Task<int> ReadAsync(byte[] buffer, int offset, int count);

    which actually calls

    public virtual Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)

    NetworkStream overrides this method, and its override calls the Socket's ReceiveAsync method

    return _streamSocket.ReceiveAsync(
        new Memory<byte>(buffer, offset, size),
        SocketFlags.None,
        fromNetworkStream: true,
        cancellationToken).AsTask();
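
    To see this call chain from the caller's side, here is a minimal sketch that exercises the byte[]-based ReadAsync overload over a loopback connection. It assumes .NET 6 or later (for top-level statements); the loopback listener, the "hello" payload, and all variable names are illustrative and do not come from the corefx source.

    ```csharp
    using System;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    // Listen on an OS-assigned loopback port so the sketch needs no external server.
    var listener = new TcpListener(IPAddress.Loopback, 0);
    listener.Start();
    int port = ((IPEndPoint)listener.LocalEndpoint).Port;

    using var client = new TcpClient();
    Task connectTask = client.ConnectAsync(IPAddress.Loopback, port);
    using TcpClient server = await listener.AcceptTcpClientAsync();
    await connectTask;

    // The accepted (server) side writes a short payload.
    byte[] payload = Encoding.UTF8.GetBytes("hello");
    await server.GetStream().WriteAsync(payload, 0, payload.Length);

    // The client side reads through the byte[]/offset/count overload discussed above.
    NetworkStream stream = client.GetStream();
    byte[] buffer = new byte[1024];
    int total = 0;
    while (total < payload.Length)
    {
        int read = await stream.ReadAsync(buffer, total, buffer.Length - total, CancellationToken.None);
        if (read == 0) break; // the peer closed the connection
        total += read;
    }

    string received = Encoding.UTF8.GetString(buffer, 0, total);
    Console.WriteLine(received);
    listener.Stop();
    ```

    The read loop is there because a single ReadAsync call may return fewer bytes than were written; a return value of 0 means the peer closed the connection.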

    The signature of Socket.ReceiveAsync is

    internal ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags, bool fromNetworkStream, CancellationToken cancellationToken)

    The core of its implementation is

    AwaitableSocketAsyncEventArgs saea = LazyInitializer.EnsureInitialized(ref LazyInitializer.EnsureInitialized(ref _cachedTaskEventArgs).ValueTaskReceive);
    if (saea.Reserve())
    {
        saea.SetBuffer(buffer);
        saea.SocketFlags = socketFlags;
        saea.WrapExceptionsInIOExceptions = fromNetworkStream;
        var result = saea.ReceiveAsync(this);
        return result;
    }
    else
    {
        // We couldn't get a cached instance, due to a concurrent receive operation on the socket.
        // Fall back to wrapping APM.
        return new ValueTask<int>(ReceiveAsyncApm(buffer, socketFlags));
    }
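
    The Reserve-or-fall-back shape in the snippet above can be sketched as follows. This is not the corefx implementation of Reserve: it assumes a simple interlocked flag guarding one cached instance, and the names inUse, TryReserve, Release, and Receive are hypothetical. It assumes .NET 6 or later for top-level statements.

    ```csharp
    using System;
    using System.Threading;

    // 0 = the cached instance is free, 1 = an operation currently owns it.
    // Hypothetical state; corefx tracks this differently inside the event args.
    int inUse = 0;

    // Atomically claim the cached instance; only one caller can flip 0 -> 1.
    bool TryReserve() => Interlocked.CompareExchange(ref inUse, 1, 0) == 0;

    // Hand the instance back once the operation has completed.
    void Release() => Volatile.Write(ref inUse, 0);

    // Use the cached path when it is free, otherwise take the slower fallback path.
    string Receive() => TryReserve() ? "cached" : "fallback";

    string first = Receive();   // claims the cached instance
    string second = Receive();  // overlaps with the first, so it falls back
    Release();                  // the first operation completes
    string third = Receive();   // the cached instance is available again

    Console.WriteLine($"{first}, {second}, {third}");
    ```

    The point of the pattern is that the common case (one receive at a time per socket) reuses a single cached object, while a concurrent receive still works by taking the fallback path.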

    In the usual case the data is read asynchronously through AwaitableSocketAsyncEventArgs, so from here on we only follow saea.ReceiveAsync.

    saea.ReceiveAsync calls Socket.ReceiveAsync(SocketAsyncEventArgs e), which in turn calls SocketAsyncEventArgs.DoOperationReceive(SafeCloseSocket handle).

    On Linux, DoOperationReceive is implemented in SocketAsyncEventArgs.Unix.cs. The main code is

    internal unsafe SocketError DoOperationReceive(SafeCloseSocket handle)
    {
        //...
        if (_bufferList == null)
        {
            errorCode = handle.AsyncContext.ReceiveAsync(_buffer.Slice(_offset, _count), _socketFlags, out bytesReceived, out flags, TransferCompletionCallback);
        }
        else
        {
            errorCode = handle.AsyncContext.ReceiveAsync(_bufferListInternal, _socketFlags, out bytesReceived, out flags, TransferCompletionCallback);
        }
    
        if (errorCode != SocketError.IOPending)
        {
            CompleteTransferOperation(bytesReceived, null, 0, flags, errorCode);
            FinishOperationSync(errorCode, bytesReceived, flags);
        }
    
        return errorCode;
    }
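
    The same split between "completed inline" and "IOPending" is visible through the public API: Socket.ReceiveAsync(SocketAsyncEventArgs) returns false when the operation finished synchronously (in which case the Completed event is not raised) and true when it is still pending. The sketch below handles both cases. It assumes .NET 6 or later; the loopback sockets, the "ping" payload, and the Complete helper are illustrative only.

    ```csharp
    using System;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading.Tasks;

    using var listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    listenSocket.Bind(new IPEndPoint(IPAddress.Loopback, 0));
    listenSocket.Listen(1);

    using var sender = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    sender.Connect(listenSocket.LocalEndPoint!);
    using Socket receiver = listenSocket.Accept();

    var done = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Shared completion logic for both the synchronous and the asynchronous path.
    void Complete(SocketAsyncEventArgs e)
    {
        if (e.SocketError == SocketError.Success) done.TrySetResult(e.BytesTransferred);
        else done.TrySetException(new SocketException((int)e.SocketError));
    }

    byte[] buffer = new byte[64];
    using var args = new SocketAsyncEventArgs();
    args.SetBuffer(buffer, 0, buffer.Length);
    args.Completed += (_, e) => Complete(e); // raised only when the operation was pending

    sender.Send(Encoding.ASCII.GetBytes("ping"));

    // false: the receive completed inline, so we must process the result ourselves.
    // true:  the receive is pending and Completed will fire later.
    bool pending = receiver.ReceiveAsync(args);
    if (!pending) Complete(args);

    int bytesReceived = await done.Task;
    string text = Encoding.ASCII.GetString(buffer, 0, bytesReceived);
    Console.WriteLine($"pending={pending}, text={text}");
    ```

    Whether pending is true or false depends on timing (whether the data has already arrived when ReceiveAsync is called), which is why both paths funnel into the same Complete helper.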

    On Linux, handle.AsyncContext.ReceiveAsync is implemented in SocketAsyncContext.Unix.cs. It calls SocketAsyncContext's ReceiveFromAsync method, whose main implementation is

    public SocketError ReceiveFromAsync(Memory<byte> buffer,  SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, Action<int, byte[], int, SocketFlags, SocketError> callback)
    {
        SetNonBlocking();
    
        SocketError errorCode;
        int observedSequenceNumber;
        if (_receiveQueue.IsReady(this, out observedSequenceNumber) &&
            SocketPal.TryCompleteReceiveFrom(_socket, buffer.Span, flags, socketAddress, ref socketAddressLen, out bytesReceived, out receivedFlags, out errorCode))
        {
            return errorCode;
        }
    
        BufferMemoryReceiveOperation operation = RentBufferMemoryReceiveOperation();
        operation.Callback = callback;
        operation.Buffer = buffer;
        operation.Flags = flags;
        operation.SocketAddress = socketAddress;
        operation.SocketAddressLen = socketAddressLen;
    
        if (!_receiveQueue.StartAsyncOperation(this, operation, observedSequenceNumber))
        {
            receivedFlags = operation.ReceivedFlags;
            bytesReceived = operation.BytesTransferred;
            errorCode = operation.ErrorCode;
    
            ReturnOperation(operation);
            return errorCode;
        }
    
        bytesReceived = 0;
        receivedFlags = SocketFlags.None;
        return SocketError.IOPending;
    }

    SocketPal.TryCompleteReceiveFrom is implemented in SocketPal.Unix.cs. It calls another TryCompleteReceiveFrom overload with the signature

    public static unsafe bool TryCompleteReceiveFrom(SafeCloseSocket socket, Span<byte> buffer, IList<ArraySegment<byte>> buffers, SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, out SocketError errorCode)

    That overload calls the Receive method

    private static unsafe int Receive(SafeCloseSocket socket, SocketFlags flags, IList<ArraySegment<byte>> buffers, byte[] socketAddress, ref int socketAddressLen, out SocketFlags receivedFlags, out Interop.Error errno)

    Inside Receive, the following call is made

    errno = Interop.Sys.ReceiveMessage(
        socket.DangerousGetHandle(), 
        &messageHeader,
        flags,
        &received);

    Interop.Sys.ReceiveMessage maps to a function in a native library on Linux

    internal static partial class Sys
    {
        [DllImport(Libraries.SystemNative, EntryPoint = "SystemNative_ReceiveMessage")]
        internal static extern unsafe Error ReceiveMessage(IntPtr socket, MessageHeader* messageHeader, SocketFlags flags, long* received);
    }

    Which library does Libraries.SystemNative refer to?

    It is System.Native.so

    $ find /usr/share/dotnet/ -name System.Native.so
    /usr/share/dotnet/shared/Microsoft.NETCore.App/2.2.0/System.Native.so

    Next, let's read the source along the SocketError.IOPending path.

    When SocketAsyncEventArgs.DoOperationReceive calls SocketAsyncContext.ReceiveFromAsync (via handle.AsyncContext.ReceiveAsync), it passes TransferCompletionCallback as the callback argument. In the asynchronous case the socket data is read through this callback, which corresponds to the TransferCompletionCallbackCore method.

    private void TransferCompletionCallbackCore(int bytesTransferred, byte[] socketAddress, int socketAddressSize, SocketFlags receivedFlags, SocketError socketError)
    {
        CompleteTransferOperation(bytesTransferred, socketAddress, socketAddressSize, receivedFlags, socketError);
    
        CompletionCallback(bytesTransferred, receivedFlags, socketError);
    }

    TransferCompletionCallbackCore then calls CompletionCallback

    private void CompletionCallback(int bytesTransferred, SocketFlags flags, SocketError socketError)
    {
        if (socketError == SocketError.Success)
        {
            FinishOperationAsyncSuccess(bytesTransferred, flags);
        }
        else
        {
            if (_currentSocket.CleanedUp)
            {
                socketError = SocketError.OperationAborted;
            }
    
            FinishOperationAsyncFailure(socketError, bytesTransferred, flags);
        }
    }

    In CompletionCallback, when the result is SocketError.Success, FinishOperationAsyncSuccess is called

    internal void FinishOperationAsyncSuccess(int bytesTransferred, SocketFlags flags)
    {
        FinishOperationSyncSuccess(bytesTransferred, flags);
    
        // Raise completion event.
        if (_context == null)
        {
            OnCompleted(this);
        }
        else
        {
            ExecutionContext.Run(_context, s_executionCallback, this);
        }
    }

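    The code above shows that FinishOperationAsyncSuccess also ends up calling FinishOperationSyncSuccess, so the asynchronous and synchronous read paths finish in the same method.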
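
    The _context branch in FinishOperationAsyncSuccess uses ExecutionContext.Run to raise the completion under a previously captured context. As a minimal sketch of what that call does in general (not of how SocketAsyncEventArgs captures its context), the example below captures a context, changes the ambient state, and then runs a callback under the captured context. It assumes .NET 6 or later; requestId and the string values are illustrative.

    ```csharp
    using System;
    using System.Threading;

    // Ambient state that flows with the ExecutionContext.
    var requestId = new AsyncLocal<string>();
    requestId.Value = "request-1";

    // Capture the context at the point where the operation is started.
    var captured = ExecutionContext.Capture();

    // Simulate the completion arriving later under different ambient state.
    requestId.Value = "io-thread";

    string seenInCallback = "";
    if (captured == null)
    {
        // No context was captured (flow suppressed): invoke the callback directly.
        seenInCallback = requestId.Value;
    }
    else
    {
        // Run the callback under the captured context, as the _context branch does.
        ExecutionContext.Run(captured, _ => seenInCallback = requestId.Value, null);
    }

    string seenAfterRun = requestId.Value;
    Console.WriteLine(seenInCallback); // value visible inside the callback
    Console.WriteLine(seenAfterRun);   // ambient value once Run has returned
    ```

    Inside the callback the AsyncLocal holds the value from capture time ("request-1"), and once Run returns the caller's own value ("io-thread") is back in place.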

  • Original article: https://www.cnblogs.com/dudu/p/10804477.html