zoukankan      html  css  js  c++  java
  • 第3月第1天 GCDAsyncSocket dispatch_source_set_event_handler runloop

    + (void)startCFStreamThreadIfNeeded
    {
    	LogTrace();
    	
    	static dispatch_once_t predicate;
    	dispatch_once(&predicate, ^{
    		
    		cfstreamThreadRetainCount = 0;
    		cfstreamThreadSetupQueue = dispatch_queue_create("GCDAsyncSocket-CFStreamThreadSetup", DISPATCH_QUEUE_SERIAL);
    	});
    	
    	dispatch_sync(cfstreamThreadSetupQueue, ^{ @autoreleasepool {
    		
    		if (++cfstreamThreadRetainCount == 1)
    		{
    			cfstreamThread = [[NSThread alloc] initWithTarget:self
    			                                         selector:@selector(cfstreamThread)
    			                                           object:nil];
    			[cfstreamThread start];
    		}
    	}});
    }
    
    
    + (void)cfstreamThread { @autoreleasepool
    {
    	[[NSThread currentThread] setName:GCDAsyncSocketThreadName];
    	
    	LogInfo(@"CFStreamThread: Started");
    	
    	// We can't run the run loop unless it has an associated input source or a timer.
    	// So we'll just create a timer that will never fire - unless the server runs for decades.
    	[NSTimer scheduledTimerWithTimeInterval:[[NSDate distantFuture] timeIntervalSinceNow]
    	                                 target:self
    	                               selector:@selector(ignore:)
    	                               userInfo:nil
    	                                repeats:YES];
    	
    	NSThread *currentThread = [NSThread currentThread];
    	NSRunLoop *currentRunLoop = [NSRunLoop currentRunLoop];
    	
    	BOOL isCancelled = [currentThread isCancelled];
    	
    	while (!isCancelled && [currentRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]])
    	{
    		isCancelled = [currentThread isCancelled];
    	}
    	
    	LogInfo(@"CFStreamThread: Stopped");
    }}
    
    
    - (BOOL)addStreamsToRunLoop
    {
    	LogTrace();
    	
    	NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
    	NSAssert((readStream != NULL && writeStream != NULL), @"Read/Write stream is null");
    	
    	if (!(flags & kAddedStreamsToRunLoop))
    	{
    		LogVerbose(@"Adding streams to runloop...");
    		
    		[[self class] startCFStreamThreadIfNeeded];
    		[[self class] performSelector:@selector(scheduleCFStreams:)
    		                     onThread:cfstreamThread
    		                   withObject:self
    		                waitUntilDone:YES];
    		
    		flags |= kAddedStreamsToRunLoop;
    	}
    	
    	return YES;
    }
    

      

    一、GCDAsyncSocket的核心就是dispatch_source_set_event_handler

    1.accpet回调

                accept4Source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, socket4FD, 0, socketQueue);
                
                int socketFD = socket4FD;
                dispatch_source_t acceptSource = accept4Source;
                
                dispatch_source_set_event_handler(accept4Source, ^{ @autoreleasepool {
                    
                    LogVerbose(@"event4Block");
                    
                    unsigned long i = 0;
                    unsigned long numPendingConnections = dispatch_source_get_data(acceptSource);
                    
                    LogVerbose(@"numPendingConnections: %lu", numPendingConnections);
                    
                    while ([self doAccept:socketFD] && (++i < numPendingConnections));
                }});

    2.read,write回调

     

        readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, socketFD, 0, socketQueue);
        writeSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, socketFD, 0, socketQueue);
        
        // Setup event handlers
        
        dispatch_source_set_event_handler(readSource, ^{ @autoreleasepool {
            
            LogVerbose(@"readEventBlock");
            
            socketFDBytesAvailable = dispatch_source_get_data(readSource);
            LogVerbose(@"socketFDBytesAvailable: %lu", socketFDBytesAvailable);
            
            if (socketFDBytesAvailable > 0)
                [self doReadData];
            else
                [self doReadEOF];
        }});
        
        dispatch_source_set_event_handler(writeSource, ^{ @autoreleasepool {
            
            LogVerbose(@"writeEventBlock");
            
            flags |= kSocketCanAcceptBytes;
            [self doWriteData];
        }});

    二,缓存区

    1.创建

    GCDAsyncReadPacket没有传入buffer,则readpacket没有缓冲区,socket可读时会放入preBuffer

    - (void)readDataToData:(NSData *)data withTimeout:(NSTimeInterval)timeout tag:(long)tag
    {
        [self readDataToData:data withTimeout:timeout buffer:nil bufferOffset:0 maxLength:0 tag:tag];
    }
    
    - (void)readDataToData:(NSData *)data
               withTimeout:(NSTimeInterval)timeout
                    buffer:(NSMutableData *)buffer
              bufferOffset:(NSUInteger)offset
                       tag:(long)tag
    {
        [self readDataToData:data withTimeout:timeout buffer:buffer bufferOffset:offset maxLength:0 tag:tag];
    }
    
    - (void)readDataToData:(NSData *)data withTimeout:(NSTimeInterval)timeout maxLength:(NSUInteger)length tag:(long)tag
    {
        [self readDataToData:data withTimeout:timeout buffer:nil bufferOffset:0 maxLength:length tag:tag];
    }
    
    - (void)readDataToData:(NSData *)data
               withTimeout:(NSTimeInterval)timeout
                    buffer:(NSMutableData *)buffer
              bufferOffset:(NSUInteger)offset
                 maxLength:(NSUInteger)maxLength
                       tag:(long)tag
    {
        if ([data length] == 0) {
            LogWarn(@"Cannot read: [data length] == 0");
            return;
        }
        if (offset > [buffer length]) {
            LogWarn(@"Cannot read: offset > [buffer length]");
            return;
        }
        if (maxLength > 0 && maxLength < [data length]) {
            LogWarn(@"Cannot read: maxLength > 0 && maxLength < [data length]");
            return;
        }
        
        GCDAsyncReadPacket *packet = [[GCDAsyncReadPacket alloc] initWithData:buffer
                                                                  startOffset:offset
                                                                    maxLength:maxLength
                                                                      timeout:timeout
                                                                   readLength:0
                                                                   terminator:data
                                                                          tag:tag];
        
        dispatch_async(socketQueue, ^{ @autoreleasepool {
            
            LogTrace();
            
            if ((flags & kSocketStarted) && !(flags & kForbidReadsWrites))
            {
                [readQueue addObject:packet];
                [self maybeDequeueRead];
            }
        }});
        
        // Do not rely on the block being run in order to release the packet,
        // as the queue might get released without the block completing.
    }

    三、面向对象封装

    1.socket可读的时候先用preBuffer接收,拷贝到currentRead->buffer中,为生产者-消费者模式.

        GCDAsyncReadPacket *currentRead;
        GCDAsyncWritePacket *currentWrite;
        
    
        
        GCDAsyncSocketPreBuffer *preBuffer;
            uint8_t *buffer;
            
            if (readIntoPreBuffer)
            {
                [preBuffer ensureCapacityForWrite:bytesToRead];
                            
                buffer = [preBuffer writeBuffer];
            }
    
    。。。
    
                int socketFD = (socket4FD == SOCKET_NULL) ? socket6FD : socket4FD;
                
                ssize_t result = read(socketFD, buffer, (size_t)bytesToRead);
                LogVerbose(@"read from socket = %i", (int)result);
                
    。。。
    
                    if (readIntoPreBuffer)
                    {
                        // We just read a big chunk of data into the preBuffer
                        
                        [preBuffer didWrite:bytesRead];
                        LogVerbose(@"read data into preBuffer - preBuffer.length = %zu", [preBuffer availableBytes]);
                        
                        // Search for the terminating sequence
                        
                        bytesToRead = [currentRead readLengthForTermWithPreBuffer:preBuffer found:&done];
                        LogVerbose(@"copying %lu bytes from preBuffer", (unsigned long)bytesToRead);
                        
                        // Ensure there's room on the read packet's buffer
                        
                        [currentRead ensureCapacityForAdditionalDataOfLength:bytesToRead];
                        
                        // Copy bytes from prebuffer into read buffer
                        
                        uint8_t *readBuf = (uint8_t *)[currentRead->buffer mutableBytes] + currentRead->startOffset
                                                                                         + currentRead->bytesDone;
                        
                        memcpy(readBuf, [preBuffer readBuffer], bytesToRead);
                        
                        // Remove the copied bytes from the prebuffer
                        [preBuffer didRead:bytesToRead];
                        LogVerbose(@"preBuffer.length = %zu", [preBuffer availableBytes]);
                        
                        // Update totals
                        currentRead->bytesDone += bytesToRead;
                        totalBytesReadForCurrentRead += bytesToRead;
                        
                        // Our 'done' variable was updated via the readLengthForTermWithPreBuffer:found: method above
                    }

     4.runloop

     

  • 相关阅读:
    基础知识记录
    不同数据库'查询第几行到第几行记录'脚本的区别
    visual studio自动生成的私有内部字段变量以_为前缀
    ASP.NET Core 集成Prometheus+grafana
    netcore命令行运行程序
    RabbitMQ使用建议
    解决Pg新增数据主键冲突
    .Net Core调用第三方WebService
    .Net Core调用oracle存储过程
    call,apply,bind使用区别
  • 原文地址:https://www.cnblogs.com/javastart/p/6123730.html
Copyright © 2011-2022 走看看