zoukankan      html  css  js  c++  java
  • [IO]System.IO.Pipelines

      System.IO.Pipelines 是一个新库,旨在使在 .NET 中执行高性能 I/O 更加容易。 该库的目标为适用于所有 .NET 实现的 .NET Standard。

    System.IO.Pipelines 解决什么问题

      System.IO.Pipelines 已构建为:

    • 具有高性能的流数据分析功能。
    • 减少代码复杂性。

    下面的代码是典型的 TCP 服务器,它从客户机接收行分隔的消息(由 ' ' 分隔):

    async Task ProcessLinesAsync(NetworkStream stream)
    {
        var buffer = new byte[1024];
        await stream.ReadAsync(buffer, 0, buffer.Length);
        
        // Process a single line from the buffer
        ProcessLine(buffer);
    }

    前面的代码有几个问题:

    • 单次调用 ReadAsync 可能无法接收整条消息(行尾)。
    • 忽略了 stream.ReadAsync 的结果。 stream.ReadAsync 返回读取的数据量。
    • 它不能处理在单个 ReadAsync 调用中读取多行的情况。
    • 它为每次读取分配一个 byte 数组。

    要解决上述问题,需要进行以下更改:

    • 缓冲传入的数据,直到找到新行。

    • 分析缓冲区中返回的所有行。

    • 该行可能大于 1KB(1024 字节)。 找到需要调整输入缓冲区大小的代码(一行完整的代码)。

      • 如果调整缓冲区的大小,当输入中出现较长的行时,将生成更多缓冲区副本。
      • 压缩用于读取行的缓冲区,以减少空余。
    • 请考虑使用缓冲池来避免重复分配内存。

    下面的代码解决了其中一些问题:

     1 async Task ProcessLinesAsync(NetworkStream stream)
     2 {
     3     byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
     4     var bytesBuffered = 0;
     5     var bytesConsumed = 0;
     6 
     7     while (true)
     8     {
     9         // Calculate the amount of bytes remaining in the buffer.
    10         var bytesRemaining = buffer.Length - bytesBuffered;
    11 
    12         if (bytesRemaining == 0)
    13         {
    14             // Double the buffer size and copy the previously buffered data into the new buffer.
    15             var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
    16             Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
    17             // Return the old buffer to the pool.
    18             ArrayPool<byte>.Shared.Return(buffer);
    19             buffer = newBuffer;
    20             bytesRemaining = buffer.Length - bytesBuffered;
    21         }
    22 
    23         var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
    24         if (bytesRead == 0)
    25         {
    26             // EOF
    27             break;
    28         }
    29 
    30         // Keep track of the amount of buffered bytes.
    31         bytesBuffered += bytesRead;
    32         var linePosition = -1;
    33 
    34         do
    35         {
    36             // Look for a EOL in the buffered data.
    37             linePosition = Array.IndexOf(buffer, (byte)'
    ', bytesConsumed,
    38                                          bytesBuffered - bytesConsumed);
    39 
    40             if (linePosition >= 0)
    41             {
    42                 // Calculate the length of the line based on the offset.
    43                 var lineLength = linePosition - bytesConsumed;
    44 
    45                 // Process the line.
    46                 ProcessLine(buffer, bytesConsumed, lineLength);
    47 
    48                 // Move the bytesConsumed to skip past the line consumed (including 
    ).
    49                 bytesConsumed += lineLength + 1;
    50             }
    51         }
    52         while (linePosition >= 0);
    53     }
    54 }
    View Code

    Pipe

     Pipe 类可用于创建 PipeWriter/PipeReader 对。 写入 PipeWriter 的所有数据都可用于 PipeReader

    var pipe = new Pipe();
    PipeReader reader = pipe.Reader;
    PipeWriter writer = pipe.Writer;
    

    Pipe基本用法

     1 async Task ProcessLinesAsync(Socket socket)
     2 {
     3     var pipe = new Pipe();
     4     Task writing = FillPipeAsync(socket, pipe.Writer);
     5     Task reading = ReadPipeAsync(pipe.Reader);
     6 
     7     await Task.WhenAll(reading, writing);
     8 }
     9 
    10 async Task FillPipeAsync(Socket socket, PipeWriter writer)
    11 {
    12     const int minimumBufferSize = 512;
    13 
    14     while (true)
    15     {
    16         // Allocate at least 512 bytes from the PipeWriter.
    17         Memory<byte> memory = writer.GetMemory(minimumBufferSize);
    18         try
    19         {
    20             int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
    21             if (bytesRead == 0)
    22             {
    23                 break;
    24             }
    25             // Tell the PipeWriter how much was read from the Socket.
    26             writer.Advance(bytesRead);
    27         }
    28         catch (Exception ex)
    29         {
    30             LogError(ex);
    31             break;
    32         }
    33 
    34         // Make the data available to the PipeReader.
    35         FlushResult result = await writer.FlushAsync();
    36 
    37         if (result.IsCompleted)
    38         {
    39             break;
    40         }
    41     }
    42 
    43      // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    44     await writer.CompleteAsync();
    45 }
    46 
    47 async Task ReadPipeAsync(PipeReader reader)
    48 {
    49     while (true)
    50     {
    51         ReadResult result = await reader.ReadAsync();
    52         ReadOnlySequence<byte> buffer = result.Buffer;
    53 
    54         while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
    55         {
    56             // Process the line.
    57             ProcessLine(line);
    58         }
    59 
    60         // Tell the PipeReader how much of the buffer has been consumed.
    61         reader.AdvanceTo(buffer.Start, buffer.End);
    62 
    63         // Stop reading if there's no more data coming.
    64         if (result.IsCompleted)
    65         {
    66             break;
    67         }
    68     }
    69 
    70     // Mark the PipeReader as complete.
    71     await reader.CompleteAsync();
    72 }
    73 
    74 bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
    75 {
    76     // Look for a EOL in the buffer.
    77     SequencePosition? position = buffer.PositionOf((byte)'
    ');
    78 
    79     if (position == null)
    80     {
    81         line = default;
    82         return false;
    83     }
    84 
    85     // Skip the line + the 
    .
    86     line = buffer.Slice(0, position.Value);
    87     buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    88     return true;
    89 }
    View Code

    有两个循环:

    • FillPipeAsync 从 Socket 读取并写入 PipeWriter
    • ReadPipeAsync 从 PipeReader 读取并分析传入的行。

    没有分配显式缓冲区。 所有缓冲区管理都委托给 PipeReader 和 PipeWriter 实现。 委派缓冲区管理使使用代码更容易集中关注业务逻辑。

    在第一个循环中:

    在第二个循环中,PipeReader 使用由 PipeWriter 写入的缓冲区。 缓冲区来自套接字。 对 PipeReader.ReadAsync 的调用:

    • 返回包含两条重要信息的 ReadResult

      • 以 ReadOnlySequence<byte> 形式读取的数据。
      • 布尔值 IsCompleted,指示是否已到达数据结尾 (EOF)。

    找到行尾 (EOL) 分隔符并分析该行后:

    • 该逻辑处理缓冲区以跳过已处理的内容。
    • 调用 PipeReader.AdvanceTo 以告知 PipeReader 已消耗和检查了多少数据。

    读取器和编写器循环通过调用 Complete 结束。 Complete 使基础管道释放其分配的内存。

    反压和流量控制

    理想情况下,读取和分析可协同工作:

    • 写入线程使用来自网络的数据并将其放入缓冲区。
    • 分析线程负责构造适当的数据结构。

    通常,分析所花费的时间比仅从网络复制数据块所用时间更长:

    • 读取线程领先于分析线程。
    • 读取线程必须减缓或分配更多内存来存储用于分析线程的数据。

    为了获得最佳性能,需要在频繁暂停和分配更多内存之间取得平衡。

    为解决上述问题,Pipe 提供了两个设置来控制数据流:

    具有 ResumeWriterThreshold 和 PauseWriterThreshold 的图

    PipeWriter.FlushAsync

    • 当 Pipe 中的数据量超过 PauseWriterThreshold 时,返回不完整的 ValueTask<FlushResult>
    • 低于 ResumeWriterThreshold 时,返回完整的 ValueTask<FlushResult>

    使用两个值可防止快速循环,如果只使用一个值,则可能发生这种循环。

  • 相关阅读:
    PHP之旅3 php数组以及遍历数组 以及each() list() foreach()
    NSSetUncaughtExceptionHandler
    runtime
    Objective-C中的instancetype和id区别
    tableView 局部刷新
    CGAffineTransform
    iOS中文本属性Attributes
    ios 相机 自定义 相片的截取
    php程序的生命周期
    PHP代码执行流程
  • 原文地址:https://www.cnblogs.com/amytal/p/11723026.html
Copyright © 2011-2022 走看看