zoukankan      html  css  js  c++  java
  • iOS即时通讯之CocoaAsyncSocket源码解析五

    接上篇:iOS即时通讯之CocoaAsyncSocket源码解析四         原文

    前言:

    本文为CocoaAsyncSocket Read篇终,将重点涉及该框架是如何利用缓冲区对数据进行读取、以及各种情况下的数据包处理,其中还包括普通的、和基于TLS的不同读取操作等等。

    正文:

    前文讲完了两次TLS建立连接的流程,接着就是本篇的重头戏了:doReadData方法。在这里我不准备直接把这个整个方法列出来,因为就光这一个方法,加上注释有1200行,整个贴过来也无法展开描述,所以在这里我打算对它分段进行讲解:

    注:以下代码整个包括在doReadData大括号中:

    //读取数据
    - (void)doReadData
    {
      ....
    }

    Part1.无法正常读取数据时的前置处理:

     1 //如果当前读取的包为空,或者flag为读取停止,这两种情况是不能去读取数据的
     2 if ((currentRead == nil) || (flags & kReadsPaused))
     3 {
     4      LogVerbose(@"No currentRead or kReadsPaused");
     5 
     6      // Unable to read at this time
     7      //如果是安全的通信,通过TLS/SSL
     8      if (flags & kSocketSecure)
     9      {
    10        //刷新SSLBuffer,把数据从链路上移到prebuffer中 (当前不读取数据的时候做)
    11           [self flushSSLBuffers];
    12      }
    13 
    14    //判断是否用的是 CFStream的TLS
    15      if ([self usingCFStreamForTLS])
    16      {
    17 
    18      }
    19      else
    20      {
    21        //挂起source
    22           if (socketFDBytesAvailable > 0)
    23           {
    24                [self suspendReadSource];
    25           }
    26      }
    27      return;
    28 }

    当我们当前读取的包是空或者标记为读停止状态的时候,则不会去读取数据。
    前者不难理解,因为我们要读取的数据最终是要传给currentRead中去的,所以如果currentRead为空,我们去读数据也没有意义。
    后者kReadsPaused标记是从哪里加上的呢?我们全局搜索一下,发现它才read超时的时候被添加。
    讲到这我们顺便来看这个读取超时的一个逻辑,我们每次做读取任务传进来的超时,都会调用这么一个方法:

    [self setupReadTimerWithTimeout:currentRead->timeout];
     1 //初始化读的超时
     2 - (void)setupReadTimerWithTimeout:(NSTimeInterval)timeout
     3 {
     4     if (timeout >= 0.0)
     5     {
     6         //生成一个定时器source
     7         readTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, socketQueue);
     8 
     9         __weak GCDAsyncSocket *weakSelf = self;
    10 
    11         //句柄
    12         dispatch_source_set_event_handler(readTimer, ^{ @autoreleasepool {
    13         #pragma clang diagnostic push
    14         #pragma clang diagnostic warning "-Wimplicit-retain-self"
    15 
    16             __strong GCDAsyncSocket *strongSelf = weakSelf;
    17             if (strongSelf == nil) return_from_block;
    18 
    19             //执行超时操作
    20             [strongSelf doReadTimeout];
    21 
    22         #pragma clang diagnostic pop
    23         }});
    24 
    25         #if !OS_OBJECT_USE_OBJC
    26         dispatch_source_t theReadTimer = readTimer;
    27 
    28         //取消的句柄
    29         dispatch_source_set_cancel_handler(readTimer, ^{
    30         #pragma clang diagnostic push
    31         #pragma clang diagnostic warning "-Wimplicit-retain-self"
    32 
    33             LogVerbose(@"dispatch_release(readTimer)");
    34             dispatch_release(theReadTimer);
    35 
    36         #pragma clang diagnostic pop
    37         });
    38         #endif
    39 
    40         //定时器延时 timeout时间执行
    41         dispatch_time_t tt = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(timeout * NSEC_PER_SEC));
    42         //间隔为永远,即只执行一次
    43         dispatch_source_set_timer(readTimer, tt, DISPATCH_TIME_FOREVER, 0);
    44         dispatch_resume(readTimer);
    45     }
    46

    这个方法定义了一个GCD定时器,这个定时器只执行一次,间隔就是我们的超时,很显然这是一个延时执行,那小伙伴要问了,这里为什么我们不用NSTimer或者下面这种方式:

    [self performSelector:<#(nonnull SEL)#> withObject:<#(nullable id)#> afterDelay:<#(NSTimeInterval)#>

    原因很简单,performSelector是基于runloop才能使用的,它本质是转化成runloop基于非端口的源source0。很显然我们所在的socketQueue开辟出来的线程,并没有添加一个runloop。而NSTimer也是一样。

    所以这里我们用GCD Timer,因为它是基于XNU内核来实现的,并不需要借助于runloop

    这里当超时时间间隔到达时,我们会执行超时操作:

    [strongSelf doReadTimeout];
     1 /执行超时操作
     2 - (void)doReadTimeout
     3 {
     4     // This is a little bit tricky.
     5     // Ideally we'd like to synchronously query the delegate about a timeout extension.
     6     // But if we do so synchronously we risk a possible deadlock.
     7     // So instead we have to do so asynchronously, and callback to ourselves from within the delegate block.
     8 
     9     //因为这里用同步容易死锁,所以用异步从代理中回调
    10 
    11     //标记读暂停
    12     flags |= kReadsPaused;
    13 
    14     __strong id theDelegate = delegate;
    15 
    16     //判断是否实现了延时  补时的代理
    17     if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:shouldTimeoutReadWithTag:elapsed:bytesDone:)])
    18     {
    19         //拿到当前读的包
    20         GCDAsyncReadPacket *theRead = currentRead;
    21 
    22         //代理queue中回调
    23         dispatch_async(delegateQueue, ^{ @autoreleasepool {
    24 
    25             NSTimeInterval timeoutExtension = 0.0;
    26 
    27             //调用代理方法,拿到续的时长
    28             timeoutExtension = [theDelegate socket:self shouldTimeoutReadWithTag:theRead->tag
    29                                                                          elapsed:theRead->timeout
    30                                                                        bytesDone:theRead->bytesDone];
    31 
    32             //socketQueue中,做延时
    33             dispatch_async(socketQueue, ^{ @autoreleasepool {
    34 
    35                 [self doReadTimeoutWithExtension:timeoutExtension];
    36             }});
    37         }});
    38     }
    39     else
    40     {
    41         [self doReadTimeoutWithExtension:0.0];
    42     }
    43 }
    //做读取数据延时
    - (void)doReadTimeoutWithExtension:(NSTimeInterval)timeoutExtension
    {
        if (currentRead)
        {
            if (timeoutExtension > 0.0)
            {
                //把超时加上
                currentRead->timeout += timeoutExtension;
    
                // Reschedule the timer
                //重新生成时间
                dispatch_time_t tt = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(timeoutExtension * NSEC_PER_SEC));
                //重置timer时间
                dispatch_source_set_timer(readTimer, tt, DISPATCH_TIME_FOREVER, 0);
    
                // Unpause reads, and continue
                //在把paused标记移除
                flags &= ~kReadsPaused;
                //继续去读取数据
                [self doReadData];
            }
            else
            {
                //输出读取超时,并断开连接
                LogVerbose(@"ReadTimeout");
    
                [self closeWithError:[self readTimeoutError]];
            }
        }
    }

    这里调用了续时代理,如果我们实现了这个代理,则可以增加这个超时时间,然后重新生成超时定时器,移除读取停止的标记kReadsPaused。继续去读取数据。
    否则我们就断开socket
    注意:这个定时器会被取消,如果当前数据包被读取完成,这样就不会走到定时器超时的时间,则不会断开socket。讲到这是不是大家就有印象了?这个就是之前在楼主:
    iOS即时通讯,从入门到“放弃”?中讲过的可以被用来做PingPong机制的原理。

    我们接着回到doReadData中,我们讲到如果当前读取包为空或者状态为kReadsPaused,我们就去执行一些非读取数据的处理。
    这里我们第一步去判断当前连接是否为kSocketSecure,也就是安全通道的TLS。如果是我们则调用

    if (flags & kSocketSecure)
    {
         //刷新,把TLS加密型的数据从链路上移到prebuffer中 (当前暂停的时候做)
         [self flushSSLBuffers];
    }

    按理说,我们有当前读取包的时候,在去从prebuffersocket中去读取,但是这里为什么要提前去读呢?
    我们来看看这个框架作者的解释:

    // Here's the situation:
    // We have an established secure connection.
    // There may not be a currentRead, but there might be encrypted data sitting around for us.
    // When the user does get around to issuing a read, that encrypted data will need to be decrypted.
    // So why make the user wait?
    // We might as well get a head start on decrypting some data now.
    // The other reason we do this has to do with detecting a socket disconnection.
    // The SSL/TLS protocol has it's own disconnection handshake.
    // So when a secure socket is closed, a "goodbye" packet comes across the wire.
    // We want to make sure we read the "goodbye" packet so we can properly detect the TCP disconnection.

    简单来讲,就是我们用TLS类型的Socket,读取数据的时候需要解密的过程,而这个过程是费时的,我们没必要让用户在读取数据的时候去等待这个解密的过程,我们可以提前在数据一到达,就去读取解密。
    而且这种方式,还能时刻根据TLSgoodbye包来准确的检测到TCP断开连接。

    在我们来看flushSSLBuffers方法之前,我们先来看看这个一直提到的全局缓冲区prebuffer的定义,它其实就是下面这么一个类的实例:

    Part3.GCDAsyncSocketPreBuffer的定义

    @interface GCDAsyncSocketPreBuffer : NSObject
    {
        //unsigned char
        //提前的指针,指向这块提前的缓冲区
        uint8_t *preBuffer;
        //size_t 它是一个与机器相关的unsigned类型,其大小足以保证存储内存中对象的大小。
        //它可以存储在理论上是可能的任何类型的数组的最大大小
        size_t preBufferSize;
        //读的指针
        uint8_t *readPointer;
        //写的指针
        uint8_t *writePointer;
    }

    里面存了3个指针,包括preBuffer起点指针、当前读写所处位置指针、以及一个preBufferSize,这个sizepreBuffer所指向的位置,在内存中分配的空间大小。

    我们来看看它的几个方法:

    //初始化
    - (id)initWithCapacity:(size_t)numBytes
    {
        if ((self = [super init]))
        {
            //设置size
            preBufferSize = numBytes;
            //申请size大小的内存给preBuffer
            preBuffer = malloc(preBufferSize);
    
            //为同一个值
            readPointer = preBuffer;
            writePointer = preBuffer;
        }
        return self;
    }

    包括一个初始化方法,去初始化preBufferSize大小的一块内存空间。然后3个指针都指向这个空间。

    - (void)dealloc
    {
        if (preBuffer)
            free(preBuffer);
    }

    销毁的方法:释放preBuffer。

     1 //确认读的大小
     2 - (void)ensureCapacityForWrite:(size_t)numBytes
     3 {
     4     //拿到当前可用的空间大小
     5     size_t availableSpace = [self availableSpace];
     6 
     7     //如果申请的大小大于可用的大小
     8     if (numBytes > availableSpace)
     9     {
    10         //需要多出来的大小
    11         size_t additionalBytes = numBytes - availableSpace;
    12         //新的总大小
    13         size_t newPreBufferSize = preBufferSize + additionalBytes;
    14         //重新去分配preBuffer
    15         uint8_t *newPreBuffer = realloc(preBuffer, newPreBufferSize);
    16 
    17         //读的指针偏移量(已读大小)
    18         size_t readPointerOffset = readPointer - preBuffer;
    19         //写的指针偏移量(已写大小)
    20         size_t writePointerOffset = writePointer - preBuffer;
    21         //提前的Buffer重新复制
    22         preBuffer = newPreBuffer;
    23         //大小重新赋值
    24         preBufferSize = newPreBufferSize;
    25 
    26         //读写指针重新赋值 + 上偏移量
    27         readPointer = preBuffer + readPointerOffset;
    28         writePointer = preBuffer + writePointerOffset;
    29     }
    30 }

    确保prebuffer可用空间的方法:这个方法会重新分配preBuffer,直到可用大小等于传递进来的numBytes,已用大小不会变。

     1 //仍然可读的数据,过程是先写后读,只有写的大于读的,才能让你继续去读,不然没数据可读了
     2 - (size_t)availableBytes
     3 {
     4     return writePointer - readPointer;
     5 }
     6 
     7 - (uint8_t *)readBuffer
     8 {
     9     return readPointer;
    10 }
    11 
    12 - (void)getReadBuffer:(uint8_t **)bufferPtr availableBytes:(size_t *)availableBytesPtr
    13 {
    14     if (bufferPtr) *bufferPtr = readPointer;
    15     if (availableBytesPtr) *availableBytesPtr = [self availableBytes];
    16 }
    17 
    18 //读数据的指针
    19 - (void)didRead:(size_t)bytesRead
    20 {
    21     readPointer += bytesRead;
    22     //如果读了这么多,指针和写的指针还相同的话,说明已经读完,重置指针到最初的位置
    23     if (readPointer == writePointer)
    24     {
    25         // The prebuffer has been drained. Reset pointers.
    26         readPointer  = preBuffer;
    27         writePointer = preBuffer;
    28     }
    29 }
    30 //prebuffer的剩余空间  = preBufferSize(总大小) - (写的头指针 - preBuffer一开的指针,即已被写的大小)
    31 
    32 - (size_t)availableSpace
    33 {
    34     return preBufferSize - (writePointer - preBuffer);
    35 }
    36 
    37 - (uint8_t *)writeBuffer
    38 {
    39     return writePointer;
    40 }
    41 
    42 - (void)getWriteBuffer:(uint8_t **)bufferPtr availableSpace:(size_t *)availableSpacePtr
    43 {
    44     if (bufferPtr) *bufferPtr = writePointer;
    45     if (availableSpacePtr) *availableSpacePtr = [self availableSpace];
    46 }
    47 
    48 - (void)didWrite:(size_t)bytesWritten
    49 {
    50     writePointer += bytesWritten;
    51 }
    52 
    53 - (void)reset
    54 {
    55     readPointer  = preBuffer;
    56     writePointer = preBuffer;
    57 }

    然后就是对读写指针进行处理的方法,如果读了多少数据readPointer就后移多少,写也是一样。
    而获取当前未读数据,则是用已写指针-已读指针,得到的差值,当已读=已写的时候,说明prebuffer数据读完,则重置读写指针的位置,还是指向初始化位置。

    讲完全局缓冲区对于指针的处理,我们接着往下说

    Part4.flushSSLBuffers方法:

      1 //缓冲ssl数据
      2 - (void)flushSSLBuffers
      3 {
      4      LogTrace();
      5      //断言为安全Socket
      6      NSAssert((flags & kSocketSecure), @"Cannot flush ssl buffers on non-secure socket");
      7      //如果preBuffer有数据可读,直接返回
      8      if ([preBuffer availableBytes] > 0)
      9      {
     10           return;
     11      }
     12 
     13      #if TARGET_OS_IPHONE
     14      //如果用的CFStream的TLS,把数据用CFStream的方式搬运到preBuffer中
     15      if ([self usingCFStreamForTLS])
     16      {
     17         //如果flag为kSecureSocketHasBytesAvailable,而且readStream有数据可读
     18           if ((flags & kSecureSocketHasBytesAvailable) && CFReadStreamHasBytesAvailable(readStream))
     19           {
     20                LogVerbose(@"%@ - Flushing ssl buffers into prebuffer...", THIS_METHOD);
     21 
     22             //默认一次读的大小为4KB??
     23                CFIndex defaultBytesToRead = (1024 * 4);
     24 
     25             //用来确保有这么大的提前buffer缓冲空间
     26                [preBuffer ensureCapacityForWrite:defaultBytesToRead];
     27                //拿到写的buffer
     28                uint8_t *buffer = [preBuffer writeBuffer];
     29 
     30             //从readStream中去读, 一次就读4KB,读到数据后,把数据写到writeBuffer中去   如果读的大小小于readStream中数据流大小,则会不停的触发callback,直到把数据读完为止。
     31                CFIndex result = CFReadStreamRead(readStream, buffer, defaultBytesToRead);
     32             //打印结果
     33                LogVerbose(@"%@ - CFReadStreamRead(): result = %i", THIS_METHOD, (int)result);
     34 
     35             //大于0,说明读写成功
     36                if (result > 0)
     37                {
     38                 //把写的buffer头指针,移动result个偏移量
     39                     [preBuffer didWrite:result];
     40                }
     41 
     42             //把kSecureSocketHasBytesAvailable 仍然可读的标记移除
     43                flags &= ~kSecureSocketHasBytesAvailable;
     44           }
     45 
     46           return;
     47      }
     48 
     49      #endif
     50 
     51     //不用CFStream的处理方法
     52 
     53     //先设置一个预估可用的大小
     54      __block NSUInteger estimatedBytesAvailable = 0;
     55      //更新预估可用的Block
     56      dispatch_block_t updateEstimatedBytesAvailable = ^{
     57 
     58         //预估大小 = 未读的大小 + SSL的可读大小
     59           estimatedBytesAvailable = socketFDBytesAvailable + [sslPreBuffer availableBytes];
     60 
     61 
     62           size_t sslInternalBufSize = 0;
     63         //获取到ssl上下文的大小,从sslContext中
     64           SSLGetBufferedReadSize(sslContext, &sslInternalBufSize);
     65           //再加上下文的大小
     66           estimatedBytesAvailable += sslInternalBufSize;
     67      };
     68 
     69     //调用这个Block
     70      updateEstimatedBytesAvailable();
     71 
     72     //如果大于0,说明有数据可读
     73      if (estimatedBytesAvailable > 0)
     74      {
     75 
     76           LogVerbose(@"%@ - Flushing ssl buffers into prebuffer...", THIS_METHOD);
     77 
     78         //标志,循环是否结束,SSL的方式是会阻塞的,直到读的数据有estimatedBytesAvailable大小为止,或者出错
     79           BOOL done = NO;
     80           do
     81           {
     82                LogVerbose(@"%@ - estimatedBytesAvailable = %lu", THIS_METHOD, (unsigned long)estimatedBytesAvailable);
     83 
     84                // Make sure there's enough room in the prebuffer
     85                //确保有足够的空间给prebuffer
     86                [preBuffer ensureCapacityForWrite:estimatedBytesAvailable];
     87 
     88                // Read data into prebuffer
     89                //拿到写的buffer
     90                uint8_t *buffer = [preBuffer writeBuffer];
     91                size_t bytesRead = 0;
     92                //用SSLRead函数去读,读到后,把数据写到buffer中,estimatedBytesAvailable为需要读的大小,bytesRead这一次实际读到字节大小,为sslContext上下文
     93                OSStatus result = SSLRead(sslContext, buffer, (size_t)estimatedBytesAvailable, &bytesRead);
     94                LogVerbose(@"%@ - read from secure socket = %u", THIS_METHOD, (unsigned)bytesRead);
     95 
     96             //把写指针后移bytesRead大小
     97                if (bytesRead > 0)
     98                {
     99                     [preBuffer didWrite:bytesRead];
    100                }
    101 
    102                LogVerbose(@"%@ - prebuffer.length = %zu", THIS_METHOD, [preBuffer availableBytes]);
    103 
    104             //如果读数据出现错误
    105                if (result != noErr)
    106                {
    107                     done = YES;
    108                }
    109                else
    110                {
    111                 //在更新一下可读的数据大小
    112                     updateEstimatedBytesAvailable();
    113                }
    114 
    115           }
    116         //只有done为NO,而且 estimatedBytesAvailable大于0才继续循环
    117         while (!done && estimatedBytesAvailable > 0);
    118      }
    119 }

    这个方法有点略长,包含了两种SSL的数据处理:

    1. CFStream类型:我们会调用下面这个函数去从stream并且读取数据并解密:
    2. CFIndex result = CFReadStreamRead(readStream, buffer, defaultBytesToRead);

      数据被读取到后,直接转移到了prebuffer中,并且调用:

    3. [preBuffer didWrite:result];

      让写指针后移读取到的数据大小。
      这里有两个关于CFReadStreamRead方法,需要注意的问题:
      1)就是我们调用它去读取4KB数据,并不仅仅是只读这么多,而是因为这个方法是会递归调用的,它每次只读4KB,直到把stream中的数据读完。
      2)我们之前设置的CFStream函数的回调,在数据来了之后只会被触发一次,以后数据再来都不会触发。直到我们调用这个方法,把stream中的数据读完,下次再来数据才会触发函数回调。这也是我们在使用CFStream的时候,不需要担心像source那样,有数据会不断的被触发回调,而需要挂起像source那样挂起stream(实际也没有这样的方法)。

    2. SSL安全通道类型:这里我们主要是循环去调用下面这个函数去读取数据:

    OSStatus result = SSLRead(sslContext, buffer, (size_t)estimatedBytesAvailable, &bytesRead);

    其他的基本和CFStream一致

    这里需要注意的是SSLRead这个方法,并不是直接从我们的socket中获取到的数据,而是从我们一开始绑定的SSL回调函数中,得到数据。而回调函数本身,也需要调用read函数从socket中获取到加密的数据。然后再经由SSLRead这个方法,数据被解密,并且传递给buffer

    至于SSLRead绑定的回调函数,是怎么处理数据读取的,因为它处理数据的流程,和我们doReadData后续数据读取处理基本相似,所以现在暂时不提。

    我们绕了一圈,讲完了这个包为空或者当前暂停状态下的前置处理,总结一下:

    1. 就是如果是SSL类型的数据,那么先解密了,缓冲到prebuffer中去。
    2. 判断当前socket可读数据大于0,非CFStreamSSL类型,则挂起source,防止反复触发。

    Part5.接着我们开始doReadData正常数据处理流程:

    首先它大的方向,依然是分为3种类型的数据处理:
    1.SSL安全通道; 2.CFStream类型SSL; 3.普通数据传输。
    因为这3种类型的代码,重复部分较大,处理流程基本类似,只不过调用读取方法所有区别:

    //1.
    OSStatus result = SSLRead(sslContext, buffer, (size_t)estimatedBytesAvailable, &bytesRead);
    //2.
    CFIndex result = CFReadStreamRead(readStream, buffer, defaultBytesToRead);
    //3.
    ssize_t result = read(socketFD, buffer, (size_t)bytesToRead);

    SSLRead回调函数内部,也调用了第3种read读取,这个我们后面会说。
    现在这里我们将跳过前两种(方法部分调用可以见上面的flushSSLBuffers方法),只讲第3种普通数据的读取操作,而SSL的读取操作,基本一致。

    先来看看当前数据包任务是否完成,是如何定义的:

    由于框架提供的对外read接口:

    - (void)readDataWithTimeout:(NSTimeInterval)timeout tag:(long)tag;
    - (void)readDataToLength:(NSUInteger)length withTimeout:(NSTimeInterval)timeout tag:(long)tag;
    - (void)readDataToData:(NSData *)data withTimeout:(NSTimeInterval)timeout tag:(long)tag;

    将数据读取是否完成的操作,大致分为这3个类型:
    1.全读;2读取一定的长度;3读取到某个标记符为止。 

    当且仅当上面3种类型对应的操作完成,才视作当前包任务完成,才会回调我们在类中声明的读取消息的代理:

    - (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag

    否则就等待着,直到当前数据包任务完成。

    然后我们读取数据的流程大致如下:

    先从prebuffer中去读取,如果读完了,当前数据包任务仍未完成,那么再从socket中去读取。
    而判断包是否读完,都是用我们上面的3种类型,来对应处理的。

    讲了半天理论,想必大家看的有点不耐烦了,接下来看看代码实际是如何处理的吧:

    step1:从prebuffer中读取数据:

     1 //先从提前缓冲区去读,如果缓冲区可读大小大于0
     2 if ([preBuffer availableBytes] > 0)
     3 {
     4      // There are 3 types of read packets:
     5      // 
     6      // 1) Read all available data.
     7      // 2) Read a specific length of data.
     8      // 3) Read up to a particular terminator.
     9      //3种类型的读法,1、全读、2、读取特定长度、3、读取到一个明确的界限
    10 
    11      NSUInteger bytesToCopy;
    12 
    13    //如果当前读的数据界限不为空
    14      if (currentRead->term != nil)
    15      {
    16           // Read type #3 - read up to a terminator
    17           //直接读到界限
    18           bytesToCopy = [currentRead readLengthForTermWithPreBuffer:preBuffer found:&done];
    19      }
    20      else
    21      {
    22           // Read type #1 or #2
    23           //读取数据,读到指定长度或者数据包的长度为止
    24           bytesToCopy = [currentRead readLengthForNonTermWithHint:[preBuffer availableBytes]];
    25      }
    26 
    27      // Make sure we have enough room in the buffer for our read.
    28      //从上两步拿到我们需要读的长度,去看看有没有空间去存储
    29      [currentRead ensureCapacityForAdditionalDataOfLength:bytesToCopy];
    30 
    31      // Copy bytes from prebuffer into packet buffer
    32 
    33    //拿到我们需要追加数据的指针位置
    34 #pragma mark - 不明白
    35    //当前读的数据 + 开始偏移 + 已经读完的??
    36      uint8_t *buffer = (uint8_t *)[currentRead->buffer mutableBytes] + currentRead->startOffset +
    37                                                                        currentRead->bytesDone;
    38      //从prebuffer处复制过来数据,bytesToCopy长度
    39      memcpy(buffer, [preBuffer readBuffer], bytesToCopy);
    40 
    41      // Remove the copied bytes from the preBuffer
    42    //从preBuffer移除掉已经复制的数据
    43      [preBuffer didRead:bytesToCopy];
    44 
    45 
    46      LogVerbose(@"copied(%lu) preBufferLength(%zu)", (unsigned long)bytesToCopy, [preBuffer availableBytes]);
    47 
    48      // Update totals
    49 
    50    //已读的数据加上
    51      currentRead->bytesDone += bytesToCopy;
    52    //当前已读的数据加上
    53      totalBytesReadForCurrentRead += bytesToCopy;
    54 
    55      // Check to see if the read operation is done
    56      //判断是不是读完了
    57      if (currentRead->readLength > 0)
    58      {
    59           // Read type #2 - read a specific length of data
    60           //如果已读 == 需要读的长度,说明已经读完
    61           done = (currentRead->bytesDone == currentRead->readLength);
    62      }
    63    //判断界限标记
    64      else if (currentRead->term != nil)
    65      {
    66           // Read type #3 - read up to a terminator
    67 
    68           // Our 'done' variable was updated via the readLengthForTermWithPreBuffer:found: method
    69           //如果没做完,且读的最大长度大于0,去判断是否溢出
    70           if (!done && currentRead->maxLength > 0)
    71           {
    72                // We're not done and there's a set maxLength.
    73                // Have we reached that maxLength yet?
    74 
    75            //如果已读的大小大于最大的大小,则报溢出错误
    76                if (currentRead->bytesDone >= currentRead->maxLength)
    77                {
    78                     error = [self readMaxedOutError];
    79                }
    80           }
    81      }
    82      else
    83      {
    84           // Read type #1 - read all available data
    85           // 
    86           // We're done as soon as
    87           // - we've read all available data (in prebuffer and socket)
    88           // - we've read the maxLength of read packet.
    89           //判断已读大小和最大大小是否相同,相同则读完
    90           done = ((currentRead->maxLength > 0) && (currentRead->bytesDone == currentRead->maxLength));
    91      }
    92 
    93 }

    这个方法就是利用我们之前提到的3种类型,来判断数据包需要读取的长度,然后调用:

    memcpy(buffer, [preBuffer readBuffer], bytesToCopy);

    把数据从preBuffer中,移到了currentRead数据包中。

    step2:从socket中读取数据:

      1 // 从socket中去读取
      2 
      3 //是否读到EOFException ,这个错误指的是文件结尾了还在继续读,就会导致这个错误被抛出
      4 BOOL socketEOF = (flags & kSocketHasReadEOF) ? YES : NO;  // Nothing more to read via socket (end of file)
      5 
      6 //如果没完成,且没错,没读到结尾,且没有可读数据了
      7 BOOL waiting   = !done && !error && !socketEOF && !hasBytesAvailable; // Ran out of data, waiting for more
      8 
      9 //如果没完成,且没错,没读到结尾,有可读数据
     10 if (!done && !error && !socketEOF && hasBytesAvailable)
     11 {
     12    //断言,有可读数据
     13      NSAssert(([preBuffer availableBytes] == 0), @"Invalid logic");
     14    //是否读到preBuffer中去
     15    BOOL readIntoPreBuffer = NO;
     16      uint8_t *buffer = NULL;
     17      size_t bytesRead = 0;
     18 
     19    //如果flag标记为安全socket
     20      if (flags & kSocketSecure)
     21      {
     22        //...类似flushSSLBuffer的一系列操作
     23      }
     24      else
     25      {
     26           // Normal socket operation
     27           //普通的socket 操作
     28 
     29           NSUInteger bytesToRead;
     30 
     31           // There are 3 types of read packets:
     32           //
     33           // 1) Read all available data.
     34           // 2) Read a specific length of data.
     35           // 3) Read up to a particular terminator.
     36 
     37        //和上面类似,读取到边界标记??不是吧
     38           if (currentRead->term != nil)
     39           {
     40                // Read type #3 - read up to a terminator
     41 
     42            //读这个长度,如果到maxlength,就用maxlength。看如果可用空间大于需要读的空间,则不用prebuffer
     43                bytesToRead = [currentRead readLengthForTermWithHint:estimatedBytesAvailable
     44                                                     shouldPreBuffer:&readIntoPreBuffer];
     45           }
     46 
     47           else
     48           {
     49                // Read type #1 or #2
     50                //直接读这个长度,如果到maxlength,就用maxlength
     51                bytesToRead = [currentRead readLengthForNonTermWithHint:estimatedBytesAvailable];
     52           }
     53 
     54        //大于最大值,则先读最大值
     55           if (bytesToRead > SIZE_MAX) { // NSUInteger may be bigger than size_t (read param 3)
     56                bytesToRead = SIZE_MAX;
     57           }
     58 
     59           // Make sure we have enough room in the buffer for our read.
     60           //
     61           // We are either reading directly into the currentRead->buffer,
     62           // or we're reading into the temporary preBuffer.
     63 
     64           if (readIntoPreBuffer)
     65           {
     66                [preBuffer ensureCapacityForWrite:bytesToRead];
     67 
     68                buffer = [preBuffer writeBuffer];
     69           }
     70           else
     71           {
     72                [currentRead ensureCapacityForAdditionalDataOfLength:bytesToRead];
     73 
     74                buffer = (uint8_t *)[currentRead->buffer mutableBytes]
     75                       + currentRead->startOffset
     76                       + currentRead->bytesDone;
     77           }
     78 
     79           // Read data into buffer
     80 
     81           int socketFD = (socket4FD != SOCKET_NULL) ? socket4FD : (socket6FD != SOCKET_NULL) ? socket6FD : socketUN;
     82 #pragma mark - 开始读取数据,最普通的形式 read
     83 
     84        //读数据
     85           ssize_t result = read(socketFD, buffer, (size_t)bytesToRead);
     86           LogVerbose(@"read from socket = %i", (int)result);
     87        //读取错误
     88           if (result < 0)
     89           {
     90            //EWOULDBLOCK IO阻塞
     91                if (errno == EWOULDBLOCK)
     92                //先等待
     93                     waiting = YES;
     94                else
     95                //得到错误
     96                     error = [self errnoErrorWithReason:@"Error in read() function"];
     97                //把可读取的长度设置为0
     98                socketFDBytesAvailable = 0;
     99           }
    100        //读到边界了
    101           else if (result == 0)
    102           {
    103                socketEOF = YES;
    104                socketFDBytesAvailable = 0;
    105           }
    106        //正常
    107           else
    108           {
    109            //设置读到的数据长度
    110                bytesRead = result;
    111 
    112            //如果读到的数据小于应该读的长度,说明这个包没读完
    113                if (bytesRead < bytesToRead)
    114                {
    115                     // The read returned less data than requested.
    116                     // This means socketFDBytesAvailable was a bit off due to timing,
    117                     // because we read from the socket right when the readSource event was firing.
    118                     socketFDBytesAvailable = 0;
    119                }
    120            //正常
    121                else
    122                {
    123                //如果 socketFDBytesAvailable比读了的数据小的话,直接置为0
    124                     if (socketFDBytesAvailable <= bytesRead)
    125                          socketFDBytesAvailable = 0;
    126                //减去已读大小
    127                     else
    128                          socketFDBytesAvailable -= bytesRead;
    129                }
    130                //如果 socketFDBytesAvailable 可读数量为0,把读的状态切换为等待
    131                if (socketFDBytesAvailable == 0)
    132                {
    133                     waiting = YES;
    134                }
    135           }
    136      }

    本来想讲点什么。。发现确实没什么好讲的,无非就是判断应该读取的长度,然后调用: 

    ssize_t result = read(socketFD, buffer, (size_t)bytesToRead);

    socket中得到读取的实际长度。

    唯一需要讲一下的可能是数据流向的问题,这里调用:

    bytesToRead = [currentRead readLengthForTermWithHint:estimatedBytesAvailable shouldPreBuffer:&readIntoPreBuffer];

    来判断数据是否先流向prebuffer,还是直接流向currentRead,而SSL的读取中也有类似方法: 

    - (NSUInteger)optimalReadLengthWithDefault:(NSUInteger)defaultValue shouldPreBuffer:(BOOL *)shouldPreBufferPtr

    这个方法核心的思路就是,如果当前读取包,长度给明了,则直接流向currentRead,如果数据长度不清楚,那么则去判断这一次读取的长度,和currentRead可用空间长度去对比,如果长度比currentRead可用空间小,则流向currentRead,否则先用prebuffer来缓冲。

    至于细节方面,大家对着github中的源码注释看看吧,这么大篇幅的业务代码,一行行讲确实没什么意义。

    走完这两步读取,接着就是第三步:

     step3:判断数据包完成程度:

    这里有3种情况:

    1.数据包刚好读完;2.数据粘包;3.数据断包;
    注:这里判断粘包断包的长度,都是我们一开始调用read方法给的长度或者分界符得出的。

    很显然,第一种就什么都不用处理,完美匹配。
    第二种情况,我们把需要的长度放到currentRead,多余的长度放到prebuffer中去。
    第三种情况,数据还没读完,我们暂时为未读完。

    这里就不贴代码了。

    就这样普通读取数据的整个流程就走完了,而SSL的两种模式,和上述基本一致。

    我们接着根据之前读取的结果,来判断数据是否读完:

    //检查是否读完
    if (done)
    {
        //完成这次数据的读取
        [self completeCurrentRead];
        //如果没出错,没有到边界,prebuffer中还有可读数据
        if (!error && (!socketEOF || [preBuffer availableBytes] > 0))
        {
            //让读操作离队,继续进行下一次读取
            [self maybeDequeueRead];
        }
    }

    如果读完,则去做读完的操作,并且进行下一次读取。

    我们来看看读完的操作:

     1 //完成了这次的读数据
     2 - (void)completeCurrentRead
     3 {
     4     LogTrace();
     5     //断言currentRead
     6     NSAssert(currentRead, @"Trying to complete current read when there is no current read.");
     7 
     8     //结果数据
     9     NSData *result = nil;
    10 
    11     //如果是我们自己创建的Buffer
    12     if (currentRead->bufferOwner)
    13     {
    14         // We created the buffer on behalf of the user.
    15         // Trim our buffer to be the proper size.
    16         //修剪buffer到合适的大小
    17         //把大小设置到我们读取到的大小
    18         [currentRead->buffer setLength:currentRead->bytesDone];
    19         //赋值给result
    20         result = currentRead->buffer;
    21     }
    22     else
    23     {
    24         // We did NOT create the buffer.
    25         // The buffer is owned by the caller.
    26         // Only trim the buffer if we had to increase its size.
    27         //这是调用者的data,我们只会去加大尺寸
    28         if ([currentRead->buffer length] > currentRead->originalBufferLength)
    29         {
    30             //拿到的读的size
    31             NSUInteger readSize = currentRead->startOffset + currentRead->bytesDone;
    32             //拿到原始尺寸
    33             NSUInteger origSize = currentRead->originalBufferLength;
    34 
    35             //取得最大的
    36             NSUInteger buffSize = MAX(readSize, origSize);
    37             //把buffer设置为较大的尺寸
    38             [currentRead->buffer setLength:buffSize];
    39         }
    40         //拿到数据的头指针
    41         uint8_t *buffer = (uint8_t *)[currentRead->buffer mutableBytes] + currentRead->startOffset;
    42 
    43         //reslut为,从头指针开始到长度为写的长度 freeWhenDone为YES,创建完就释放buffer
    44         result = [NSData dataWithBytesNoCopy:buffer length:currentRead->bytesDone freeWhenDone:NO];
    45     }
    46 
    47     __strong id theDelegate = delegate;
    48 
    49 #pragma mark -总算到调用代理方法,接受到数据了
    50     if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:didReadData:withTag:)])
    51     {
    52         //拿到当前的数据包
    53         GCDAsyncReadPacket *theRead = currentRead; // Ensure currentRead retained since result may not own buffer
    54 
    55         dispatch_async(delegateQueue, ^{ @autoreleasepool {
    56             //把result在代理queue中回调出去。
    57             [theDelegate socket:self didReadData:result withTag:theRead->tag];
    58         }});
    59     }
    60     //取消掉读取超时
    61     [self endCurrentRead];
    62 }

    这里对currentReaddata做了个长度的设置。然后调用代理把最终包给回调出去。最后关掉我们之前提到的读取超时。

    还是回到doReadData,就剩下最后一点处理了:

    //如果这次读的数量大于0
    else if (totalBytesReadForCurrentRead > 0)
    {
        // We're not done read type #2 or #3 yet, but we have read in some bytes
    
        __strong id theDelegate = delegate;
    
        //如果响应读数据进度的代理
        if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:didReadPartialDataOfLength:tag:)])
        {
            long theReadTag = currentRead->tag;
    
            //代理queue中回调出去
            dispatch_async(delegateQueue, ^{ @autoreleasepool {
    
                [theDelegate socket:self didReadPartialDataOfLength:totalBytesReadForCurrentRead tag:theReadTag];
            }});
        }
    }

    这里未完成,如果这次读取大于0,如果响应读取进度的代理,则把当前进度回调出去。

    最后检查错误:

     1 //检查错误
     2 if (error)
     3 {
     4     //如果有错直接报错断开连接
     5     [self closeWithError:error];
     6 }
     7 //如果是读到边界错误
     8 else if (socketEOF)
     9 {
    10     [self doReadEOF];
    11 }
    12 
    13 //如果是等待
    14 else if (waiting)
    15 {
    16     //如果用的是CFStream,则读取数据和source无关
    17     //非CFStream形式
    18     if (![self usingCFStreamForTLS])
    19     {
    20         // Monitor the socket for readability (if we're not already doing so)
    21         //重新恢复source
    22         [self resumeReadSource];
    23     }
    24 }

    如果有错,直接断开socket,如果是边界错误,调用边界错误处理,如果是等待,说明当前包还没读完,如果非CFStreamTLS,则恢复source,等待下一次数据到达的触发。

    关于这个读取边界错误EOF,这里我简单的提下,其实它就是服务端发出一个边界错误,说明不会再有数据发送给我们了。我们讲无法再接收到数据,但是我们其实还是可以写数据,发送给服务端的。

    doReadEOF这个方法的处理,就是做了这么一件事。判断我们是否需要这种不可读,只能写的连接。

    我们来简单看看这个方法:

    Part6.读取边界错误处理:

      1 //读到EOFException,边界错误
      2 - (void)doReadEOF
      3 {
      4     LogTrace();
      5    //这个方法可能被调用很多次,如果读到EOF的时候,还有数据在prebuffer中,在调用doReadData之后?? 这个方法可能被持续的调用
      6 
      7     //标记为读EOF
      8     flags |= kSocketHasReadEOF;
      9 
     10     //如果是安全socket
     11     if (flags & kSocketSecure)
     12     {
     13         //去刷新sslbuffer中的数据
     14         [self flushSSLBuffers];
     15     }
     16 
     17     //标记是否应该断开连接
     18     BOOL shouldDisconnect = NO;
     19     NSError *error = nil;
     20 
     21     //如果状态为开始读写TLS
     22     if ((flags & kStartingReadTLS) || (flags & kStartingWriteTLS))
     23     {
     24         //我们得到EOF在开启TLS之前,这个TLS握手是不可能的,因此这是不可恢复的错误
     25 
     26         //标记断开连接
     27         shouldDisconnect = YES;
     28         //如果是安全的TLS,赋值错误
     29         if ([self usingSecureTransportForTLS])
     30         {
     31             error = [self sslError:errSSLClosedAbort];
     32         }
     33     }
     34     //如果是读流关闭状态
     35     else if (flags & kReadStreamClosed)
     36     {
     37 
     38         //不应该被关闭
     39         shouldDisconnect = NO;
     40     }
     41     else if ([preBuffer availableBytes] > 0)
     42     {
     43         //仍然有数据可读的时候不关闭
     44         shouldDisconnect = NO;
     45     }
     46     else if (config & kAllowHalfDuplexConnection)
     47     {
     48 
     49         //拿到socket
     50         int socketFD = (socket4FD != SOCKET_NULL) ? socket4FD : (socket6FD != SOCKET_NULL) ? socket6FD : socketUN;
     51 
     52         //轮询用的结构体
     53 
     54         /*
     55          struct pollfd {
     56          int fd;        //文件描述符
     57          short events;  //要求查询的事件掩码  监听的
     58          short revents; //返回的事件掩码   实际发生的
     59          };
     60          */
     61 
     62         struct pollfd pfd[1];
     63         pfd[0].fd = socketFD;
     64         //写数据不会导致阻塞。
     65         pfd[0].events = POLLOUT;
     66         //这个为当前实际发生的事情
     67         pfd[0].revents = 0;
     68 
     69         /*
     70          poll函数使用pollfd类型的结构来监控一组文件句柄,ufds是要监控的文件句柄集合,nfds是监控的文件句柄数量,timeout是等待的毫秒数,这段时间内无论I/O是否准备好,poll都会返回。timeout为负数表示无线等待,timeout为0表示调用后立即返回。执行结果:为0表示超时前没有任何事件发生;-1表示失败;成功则返回结构体中revents不为0的文件描述符个数。pollfd结构监控的事件类型如下:
     71          int poll(struct pollfd *ufds, unsigned int nfds, int timeout);
     72          */
     73         //阻塞的,但是timeout为0,则不阻塞,直接返回
     74         poll(pfd, 1, 0);
     75 
     76         //如果被触发的事件是写数据
     77         if (pfd[0].revents & POLLOUT)
     78         {
     79             // Socket appears to still be writeable
     80 
     81             //则标记为不关闭
     82             shouldDisconnect = NO;
     83             //标记为读流关闭
     84             flags |= kReadStreamClosed;
     85 
     86             // Notify the delegate that we're going half-duplex
     87             //通知代理,我们开始半双工
     88             __strong id theDelegate = delegate;
     89 
     90             //调用已经关闭读流的代理方法
     91             if (delegateQueue && [theDelegate respondsToSelector:@selector(socketDidCloseReadStream:)])
     92             {
     93                 dispatch_async(delegateQueue, ^{ @autoreleasepool {
     94 
     95                     [theDelegate socketDidCloseReadStream:self];
     96                 }});
     97             }
     98         }
     99         else
    100         {
    101             //标记为断开
    102             shouldDisconnect = YES;
    103         }
    104     }
    105     else
    106     {
    107         shouldDisconnect = YES;
    108     }
    109 
    110     //如果应该断开
    111     if (shouldDisconnect)
    112     {
    113         if (error == nil)
    114         {
    115             //判断是否是安全TLS传输
    116             if ([self usingSecureTransportForTLS])
    117             {
    118                 ///标记错误信息
    119                 if (sslErrCode != noErr && sslErrCode != errSSLClosedGraceful)
    120                 {
    121                     error = [self sslError:sslErrCode];
    122                 }
    123                 else
    124                 {
    125                     error = [self connectionClosedError];
    126                 }
    127             }
    128             else
    129             {
    130                 error = [self connectionClosedError];
    131             }
    132         }
    133         //关闭socket
    134         [self closeWithError:error];
    135     }
    136     //不断开
    137     else
    138     {
    139         //如果不是用CFStream流
    140         if (![self usingCFStreamForTLS])
    141         {
    142             // Suspend the read source (if needed)
    143             //挂起读source
    144             [self suspendReadSource];
    145         }
    146     }
    147 }

    简单说一下,这个方法主要是对socket是否需要主动关闭进行了判断:这里仅仅以下3种情况,不会关闭socket

    1. 读流已经是关闭状态(如果加了这个标记,说明为半双工连接状态)。
    2. preBuffer中还有可读数据,我们需要等数据读完才能关闭连接。
    3. 配置标记为kAllowHalfDuplexConnection,我们则要开始半双工处理。我们调用了:
    4. poll(pfd, 1, 0);
      函数,如果触发了写事件POLLOUT,说明我们半双工连接成功,则我们可以在读流关闭的状态下,仍然可以向服务器写数据。

    其他情况下,一律直接关闭socket
    而不关闭的情况下,我们会挂起source。这样我们就只能可写不可读了。

    最后还是提下SSL的回调方法,数据解密的地方。两种模式的回调;

    Part7.两种SSL数据解密位置:

    1.CFStream:当我们调用:

    CFIndex result = CFReadStreamRead(readStream, buffer, defaultBytesToRead);

    数据就会被解密。
    2.SSL安全通道:当我们调用:

    OSStatus result = SSLRead(sslContext, buffer, (size_t)estimatedBytesAvailable, &bytesRead);

    会触发SSL绑定的函数回调:

    //读函数
    static OSStatus SSLReadFunction(SSLConnectionRef connection, void *data, size_t *dataLength)
    {
        //拿到socket
        GCDAsyncSocket *asyncSocket = (__bridge GCDAsyncSocket *)connection;
    
        //断言当前为socketQueue
        NSCAssert(dispatch_get_specific(asyncSocket->IsOnSocketQueueOrTargetQueueKey), @"What the deuce?");
    
        //读取数据,并且返回状态码
        return [asyncSocket sslReadWithBuffer:data length:dataLength];
    }

    接着我们在下面的方法进行了数据读取:

    //SSL读取数据最终方法
    - (OSStatus)sslReadWithBuffer:(void *)buffer length:(size_t *)bufferLength
    {
        //...
        ssize_t result = read(socketFD, buf, bytesToRead);
        //....
    }

    其实read这一步,数据是没有被解密的,然后传递回SSLReadFunction,在传递到SSLRead内部,数据被解密。

    尾声:

    这个系列就剩下最后一篇Write了。由于内容相对比较简单,预计就一篇写完了。
    如果一直看到这里的朋友,会发现,相对之前有些内容,讲解没那么详细了。其实原因主要有两点,一是代码数量庞大,确实无法详细。二是楼主对这个系列写的有点不耐烦,想要尽快结束了..
    不过至少整篇的源码注释在github上是有的,我觉得大家自己去对着源码去阅读理解同样重要,如果一直逐字逐行的去讲,那就真的没什么意义了。

     

  • 相关阅读:
    2020春Python程序设计_练习1
    热词分析——性能战术
    《淘宝网》质量属性
    架构师应该如何工作?
    寒假学习(十四)
    寒假学习(十三)
    灭霸冲刺(3)
    灭霸冲刺(2)
    灭霸冲刺(1)
    灭霸计划会议
  • 原文地址:https://www.cnblogs.com/francisblogs/p/6860347.html
Copyright © 2011-2022 走看看