/**
 * Registers one more user of the shared CFStream thread.
 *
 * On first use this creates the serial setup queue and zeroes the retain
 * count. Every call then increments the count on that queue; the call that
 * takes the count to 1 also creates and starts the thread.
 */
+ (void)startCFStreamThreadIfNeeded
{
    LogTrace();

    // One-time creation of the counter and of the serial queue that guards it.
    static dispatch_once_t predicate;
    dispatch_once(&predicate, ^{
        cfstreamThreadRetainCount = 0;
        cfstreamThreadSetupQueue = dispatch_queue_create("GCDAsyncSocket-CFStreamThreadSetup", DISPATCH_QUEUE_SERIAL);
    });

    // dispatch_sync: the caller does not continue until the thread (if needed) has been started.
    dispatch_sync(cfstreamThreadSetupQueue, ^{ @autoreleasepool {

        if (++cfstreamThreadRetainCount == 1)
        {
            cfstreamThread = [[NSThread alloc] initWithTarget:self
                                                     selector:@selector(cfstreamThread)
                                                       object:nil];
            [cfstreamThread start];
        }
    }});
}

/**
 * Entry point of the CFStream thread: names the thread, installs a keep-alive
 * timer, and spins the run loop in the default mode until the thread is
 * cancelled or the run loop reports that it could not be run.
 */
+ (void)cfstreamThread { @autoreleasepool
{
    [[NSThread currentThread] setName:GCDAsyncSocketThreadName];

    LogInfo(@"CFStreamThread: Started");

    // We can't run the run loop unless it has an associated input source or a timer.
    // So we'll just create a timer that will never fire - unless the server runs for decades.
    [NSTimer scheduledTimerWithTimeInterval:[[NSDate distantFuture] timeIntervalSinceNow]
                                     target:self
                                   selector:@selector(ignore:)
                                   userInfo:nil
                                    repeats:YES];

    NSThread *currentThread = [NSThread currentThread];
    NSRunLoop *currentRunLoop = [NSRunLoop currentRunLoop];

    BOOL isCancelled = [currentThread isCancelled];

    // Re-check the cancellation flag every time runMode:beforeDate: returns.
    while (!isCancelled && [currentRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]])
    {
        isCancelled = [currentThread isCancelled];
    }

    LogInfo(@"CFStreamThread: Stopped");
}}

/**
 * Schedules this socket's CFStreams on the shared CFStream thread, once.
 *
 * Asserts that it is running on the socket queue (or its target queue) and
 * that both streams exist. If kAddedStreamsToRunLoop is not yet set, it makes
 * sure the thread is running, synchronously performs +scheduleCFStreams: on
 * that thread with this socket as the argument, and then sets the flag.
 *
 * @return Always YES.
 */
- (BOOL)addStreamsToRunLoop
{
    LogTrace();

    NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
    NSAssert((readStream != NULL && writeStream != NULL), @"Read/Write stream is null");

    if (!(flags & kAddedStreamsToRunLoop))
    {
        LogVerbose(@"Adding streams to runloop...");

        [[self class] startCFStreamThreadIfNeeded];

        // waitUntilDone:YES blocks here until scheduleCFStreams: has finished on the stream thread.
        [[self class] performSelector:@selector(scheduleCFStreams:)
                             onThread:cfstreamThread
                           withObject:self
                        waitUntilDone:YES];

        flags |= kAddedStreamsToRunLoop;
    }

    return YES;
}
一、GCDAsyncSocket的核心就是dispatch_source_set_event_handler
1.accept回调
// Create a read source on the IPv4 listening descriptor; its handler runs on socketQueue.
accept4Source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, socket4FD, 0, socketQueue);

// Local copies so the handler captures plain values rather than reading them through self.
int socketFD = socket4FD;
dispatch_source_t acceptSource = accept4Source;

// The socket owns the source and the source owns its handler block, so the
// block must not hold self strongly - otherwise the socket cannot be
// deallocated until the source is explicitly cancelled.
__weak typeof(self) weakSelf = self;

dispatch_source_set_event_handler(accept4Source, ^{ @autoreleasepool {

    __strong typeof(weakSelf) strongSelf = weakSelf;
    if (strongSelf == nil) return;

    LogVerbose(@"event4Block");

    unsigned long i = 0;
    unsigned long numPendingConnections = dispatch_source_get_data(acceptSource);

    LogVerbose(@"numPendingConnections: %lu", numPendingConnections);

    // Accept at least once, then keep going until doAccept: fails or the
    // number of attempts reaches the count reported by the source.
    while ([strongSelf doAccept:socketFD] && (++i < numPendingConnections));
}});
2.read,write回调
// One read source and one write source on the connected descriptor, both targeting socketQueue.
readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, socketFD, 0, socketQueue);
writeSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, socketFD, 0, socketQueue);

// Setup event handlers

// The socket owns both sources and each source owns its handler block, so
// the blocks capture self weakly. Ivars are reached through strongSelf,
// because a bare ivar reference inside a block captures self strongly.
__weak typeof(self) weakSelf = self;

dispatch_source_set_event_handler(readSource, ^{ @autoreleasepool {

    __strong typeof(weakSelf) strongSelf = weakSelf;
    if (strongSelf == nil) return;

    LogVerbose(@"readEventBlock");

    // Record the value reported by the read source for this event.
    strongSelf->socketFDBytesAvailable = dispatch_source_get_data(strongSelf->readSource);
    LogVerbose(@"socketFDBytesAvailable: %lu", strongSelf->socketFDBytesAvailable);

    // A read event that reports zero is handed to the EOF path.
    if (strongSelf->socketFDBytesAvailable > 0)
        [strongSelf doReadData];
    else
        [strongSelf doReadEOF];
}});

dispatch_source_set_event_handler(writeSource, ^{ @autoreleasepool {

    __strong typeof(weakSelf) strongSelf = weakSelf;
    if (strongSelf == nil) return;

    LogVerbose(@"writeEventBlock");

    // Set the flag first, then run the write path.
    strongSelf->flags |= kSocketCanAcceptBytes;
    [strongSelf doWriteData];
}});
二、缓冲区
1.创建
如果创建GCDAsyncReadPacket时没有传入buffer(即buffer为nil),该read packet就没有自己的缓冲区;socket可读时,数据会先读入preBuffer。
/**
 * Convenience form: no caller-supplied buffer, offset 0, maxLength 0.
 */
- (void)readDataToData:(NSData *)data withTimeout:(NSTimeInterval)timeout tag:(long)tag
{
    [self readDataToData:data withTimeout:timeout buffer:nil bufferOffset:0 maxLength:0 tag:tag];
}

/**
 * Convenience form: caller-supplied buffer and offset, maxLength 0.
 */
- (void)readDataToData:(NSData *)data
           withTimeout:(NSTimeInterval)timeout
                buffer:(NSMutableData *)buffer
          bufferOffset:(NSUInteger)offset
                   tag:(long)tag
{
    [self readDataToData:data withTimeout:timeout buffer:buffer bufferOffset:offset maxLength:0 tag:tag];
}

/**
 * Convenience form: no caller-supplied buffer, offset 0, explicit maxLength.
 */
- (void)readDataToData:(NSData *)data withTimeout:(NSTimeInterval)timeout maxLength:(NSUInteger)length tag:(long)tag
{
    [self readDataToData:data withTimeout:timeout buffer:nil bufferOffset:0 maxLength:length tag:tag];
}

/**
 * Validates the arguments, builds a GCDAsyncReadPacket that uses `data` as
 * its terminator, and asynchronously enqueues it on socketQueue.
 *
 * The request is dropped with a warning log (nothing is returned to the
 * caller) when:
 *   - `data` is empty,
 *   - `offset` is greater than the length of `buffer`, or
 *   - `maxLength` is non-zero and smaller than the length of `data`.
 *
 * @param data      Terminator passed to the packet.
 * @param timeout   Timeout passed to the packet.
 * @param buffer    Buffer passed to the packet; may be nil.
 * @param offset    Start offset into `buffer`.
 * @param maxLength Maximum length passed to the packet. The validation above
 *                  skips the check when this is 0, so 0 presumably means
 *                  "no limit" - confirm against GCDAsyncReadPacket.
 * @param tag       Tag passed to the packet.
 */
- (void)readDataToData:(NSData *)data
           withTimeout:(NSTimeInterval)timeout
                buffer:(NSMutableData *)buffer
          bufferOffset:(NSUInteger)offset
             maxLength:(NSUInteger)maxLength
                   tag:(long)tag
{
    if ([data length] == 0) {
        LogWarn(@"Cannot read: [data length] == 0");
        return;
    }
    if (offset > [buffer length]) {
        LogWarn(@"Cannot read: offset > [buffer length]");
        return;
    }
    if (maxLength > 0 && maxLength < [data length]) {
        LogWarn(@"Cannot read: maxLength > 0 && maxLength < [data length]");
        return;
    }

    GCDAsyncReadPacket *packet = [[GCDAsyncReadPacket alloc] initWithData:buffer
                                                              startOffset:offset
                                                                maxLength:maxLength
                                                                  timeout:timeout
                                                               readLength:0
                                                               terminator:data
                                                                      tag:tag];

    dispatch_async(socketQueue, ^{ @autoreleasepool {

        LogTrace();

        // Only enqueue while the socket is started and reads/writes are not forbidden;
        // otherwise the packet is silently discarded.
        if ((flags & kSocketStarted) && !(flags & kForbidReadsWrites))
        {
            [readQueue addObject:packet];
            [self maybeDequeueRead];
        }
    }});

    // Do not rely on the block being run in order to release the packet,
    // as the queue might get released without the block completing.
}
三、面向对象封装
1.socket可读的时候先用preBuffer接收,拷贝到currentRead->buffer中,为生产者-消费者模式.
// Read packet, write packet and pre-buffer referenced by the read/write excerpts in these notes.
// The read excerpt copies bytes out of preBuffer into currentRead->buffer.
// NOTE(review): presumably currentRead/currentWrite are the packets currently being serviced - confirm against the class.
GCDAsyncReadPacket *currentRead; GCDAsyncWritePacket *currentWrite; GCDAsyncSocketPreBuffer *preBuffer;
// Destination for the upcoming read() call.
// NOTE(review): in this excerpt `buffer` is only assigned on the preBuffer
// path; the other path is presumably in the omitted code - confirm.
uint8_t *buffer;

if (readIntoPreBuffer)
{
    // Make room in the preBuffer and read straight into its write region.
    [preBuffer ensureCapacityForWrite:bytesToRead];

    buffer = [preBuffer writeBuffer];
}

// ... (code omitted in this excerpt) ...

// Use the IPv6 descriptor when there is no IPv4 descriptor.
int socketFD = (socket4FD == SOCKET_NULL) ? socket6FD : socket4FD;

ssize_t result = read(socketFD, buffer, (size_t)bytesToRead);
LogVerbose(@"read from socket = %i", (int)result);

// ... (code omitted in this excerpt) ...

if (readIntoPreBuffer)
{
    // We just read a big chunk of data into the preBuffer

    [preBuffer didWrite:bytesRead];
    LogVerbose(@"read data into preBuffer - preBuffer.length = %zu", [preBuffer availableBytes]);

    // Search for the terminating sequence

    bytesToRead = [currentRead readLengthForTermWithPreBuffer:preBuffer found:&done];
    LogVerbose(@"copying %lu bytes from preBuffer", (unsigned long)bytesToRead);

    // Ensure there's room on the read packet's buffer

    [currentRead ensureCapacityForAdditionalDataOfLength:bytesToRead];

    // Copy bytes from prebuffer into read buffer

    uint8_t *readBuf = (uint8_t *)[currentRead->buffer mutableBytes] + currentRead->startOffset
                                                                     + currentRead->bytesDone;

    memcpy(readBuf, [preBuffer readBuffer], bytesToRead);

    // Remove the copied bytes from the prebuffer
    [preBuffer didRead:bytesToRead];
    LogVerbose(@"preBuffer.length = %zu", [preBuffer availableBytes]);

    // Update totals
    currentRead->bytesDone += bytesToRead;
    totalBytesReadForCurrentRead += bytesToRead;

    // Our 'done' variable was updated via the readLengthForTermWithPreBuffer:found: method above
}
4.runloop