线程
使用Instruments的CPU strategy view查看代码如何在多核CPU中执行。创建线程可以使用POSIX 线程API,或者NSThread(封装POSIX 线程API)。下面是并发4个线程在一百万个数字中找最小值和最大值的pthread例子:
#import <pthread.h> struct threadInfo { uint32_t * inputValues; size_t count; }; struct threadResult { uint32_t min; uint32_t max; }; void * findMinAndMax(void *arg) { struct threadInfo const * const info = (struct threadInfo *) arg; uint32_t min = UINT32_MAX; uint32_t max = 0; for (size_t i = 0; i < info->count; ++i) { uint32_t v = info->inputValues[i]; min = MIN(min, v); max = MAX(max, v); } free(arg); struct threadResult * const result = (struct threadResult *) malloc(sizeof(*result)); result->min = min; result->max = max; return result; } int main(int argc, const char * argv[]) { size_t const count = 1000000; uint32_t inputValues[count]; // 使用随机数字填充 inputValues for (size_t i = 0; i < count; ++i) { inputValues[i] = arc4random(); } // 开始4个寻找最小值和最大值的线程 size_t const threadCount = 4; pthread_t tid[threadCount]; for (size_t i = 0; i < threadCount; ++i) { struct threadInfo * const info = (struct threadInfo *) malloc(sizeof(*info)); size_t offset = (count / threadCount) * i; info->inputValues = inputValues + offset; info->count = MIN(count - offset, count / threadCount); int err = pthread_create(tid + i, NULL, &findMinAndMax, info); NSCAssert(err == 0, @"pthread_create() failed: %d", err); } // 等待线程退出 struct threadResult * results[threadCount]; for (size_t i = 0; i < threadCount; ++i) { int err = pthread_join(tid[i], (void **) &(results[i])); NSCAssert(err == 0, @"pthread_join() failed: %d", err); } // 寻找 min 和 max uint32_t min = UINT32_MAX; uint32_t max = 0; for (size_t i = 0; i < threadCount; ++i) { min = MIN(min, results[i]->min); max = MAX(max, results[i]->max); free(results[i]); results[i] = NULL; } NSLog(@"min = %u", min); NSLog(@"max = %u", max); return 0; }
使用NSThread来写
@interface FindMinMaxThread : NSThread @property (nonatomic) NSUInteger min; @property (nonatomic) NSUInteger max; - (instancetype)initWithNumbers:(NSArray *)numbers; @end @implementation FindMinMaxThread { NSArray *_numbers; } - (instancetype)initWithNumbers:(NSArray *)numbers { self = [super init]; if (self) { _numbers = numbers; } return self; } - (void)main { NSUInteger min; NSUInteger max; // 进行相关数据的处理 self.min = min; self.max = max; } @end //启动一个新的线程,创建一个线程对象 SMutableSet *threads = [NSMutableSet set]; NSUInteger numberCount = self.numbers.count; NSUInteger threadCount = 4; for (NSUInteger i = 0; i < threadCount; i++) { NSUInteger offset = (count / threadCount) * i; NSUInteger count = MIN(numberCount - offset, numberCount / threadCount); NSRange range = NSMakeRange(offset, count); NSArray *subset = [self.numbers subarrayWithRange:range]; FindMinMaxThread *thread = [[FindMinMaxThread alloc] initWithNumbers:subset]; [threads addObject:thread]; [thread start]; }
Grand Central Dispatch
GCD概要
- 和operation queue一样都是基于队列的并发编程API,他们通过集中管理大家协同使用的线程池。
- 公开的5个不同队列:运行在主线程中的main queue,3个不同优先级的后台队列(High Priority Queue,Default Priority Queue,Low Priority Queue),以及一个优先级更低的后台队列Background Priority Queue(用于I/O)
- 可创建自定义队列:串行或并列队列。自定义一般放在Default Priority Queue和Main Queue里。
dispatch_once用法
+ (UIColor *)boringColor; { static UIColor *color; //只运行一次 static dispatch_once_t onceToken; dispatch_once(&onceToken, ^{ color = [UIColor colorWithRed:0.380f green:0.376f blue:0.376f alpha:1.000f]; }); return color; }
延后执行
使用dispatch_after
- (void)foo { double delayInSeconds = 2.0; dispatch_time_t popTime = dispatch_time(DISPATCH_TIME_NOW, (int64_t) (delayInSeconds * NSEC_PER_SEC)); dispatch_after(popTime, dispatch_get_main_queue(), ^(void){ [self bar]; }); }
GCD队列
队列默认是串行的,只能执行一个单独的block,队列也可以是并行的,同一时间执行多个block
- (id)init; { self = [super init]; if (self != nil) { NSString *label = [NSString stringWithFormat:@"%@.isolation.%p", [self class], self]; self.isolationQueue = dispatch_queue_create([label UTF8String], 0); label = [NSString stringWithFormat:@"%@.work.%p", [self class], self]; self.workQueue = dispatch_queue_create([label UTF8String], 0); } return self; }
多线程并发读写同一个资源
//创建队列 self.isolationQueue = dispatch_queue_create([label UTF8String], DISPATCH_QUEUE_CONCURRENT); //改变setter - (void)setCount:(NSUInteger)count forKey:(NSString *)key { key = [key copy]; //确保所有barrier都是async异步的 dispatch_barrier_async(self.isolationQueue, ^(){ if (count == 0) { [self.counts removeObjectForKey:key]; } else { self.counts[key] = @(count); } }); }
都用异步处理避免死锁,异步的缺点在于调试不方便,但是比起同步容易产生死锁这个副作用还算小的。
异步API写法
设计一个异步的API调用dispatch_async(),这个调用放在API的方法或函数中做。让API的使用者设置一个回调处理队列
- (void)processImage:(UIImage *)image completionHandler:(void(^)(BOOL success))handler; { dispatch_async(self.isolationQueue, ^(void){ // do actual processing here dispatch_async(self.resultQueue, ^(void){ handler(YES); }); }); }
dispatch_apply进行快速迭代
for (size_t y = 0; y < height; ++y) { for (size_t x = 0; x < width; ++x) { // Do something with x and y here } } //使用dispatch_apply可以运行的更快 dispatch_apply(height, dispatch_get_global_queue(0, 0), ^(size_t y) { for (size_t x = 0; x < width; x += 2) { // Do something with x and y here } });
Block组合
dispatch_group_t group = dispatch_group_create(); dispatch_queue_t queue = dispatch_get_global_queue(0, 0); dispatch_group_async(group, queue, ^(){ // 会处理一会 [self doSomeFoo]; dispatch_group_async(group, dispatch_get_main_queue(), ^(){ self.foo = 42; }); }); dispatch_group_async(group, queue, ^(){ // 处理一会儿 [self doSomeBar]; dispatch_group_async(group, dispatch_get_main_queue(), ^(){ self.bar = 1; }); }); // 上面的都搞定后这里会执行一次 dispatch_group_notify(group, dispatch_get_main_queue(), ^(){ NSLog(@"foo: %d", self.foo); NSLog(@"bar: %d", self.bar); });
如何对现有API使用dispatch_group_t
//给Core Data的-performBlock:添加groups。组合完成任务后使用dispatch_group_notify来运行一个block即可。 - (void)withGroup:(dispatch_group_t)group performBlock:(dispatch_block_t)block { if (group == NULL) { [self performBlock:block]; } else { dispatch_group_enter(group); [self performBlock:^(){ block(); dispatch_group_leave(group); }]; } } //NSURLConnection也可以这样做 + (void)withGroup:(dispatch_group_t)group sendAsynchronousRequest:(NSURLRequest *)request queue:(NSOperationQueue *)queue completionHandler:(void (^)(NSURLResponse*, NSData*, NSError*))handler { if (group == NULL) { [self sendAsynchronousRequest:request queue:queue completionHandler:handler]; } else { dispatch_group_enter(group); [self sendAsynchronousRequest:request queue:queue completionHandler:^(NSURLResponse *response, NSData *data, NSError *error){ handler(response, data, error); dispatch_group_leave(group); }]; } }
注意事项
- dispatch_group_enter() 必须运行在 dispatch_group_leave() 之前。
- dispatch_group_enter() 和 dispatch_group_leave() 需要成对出现的
用GCD监视进程
NSRunningApplication *mail = [NSRunningApplication runningApplicationsWithBundleIdentifier:@"com.apple.mail"]; if (mail == nil) { return; } pid_t const pid = mail.processIdentifier; self.source = dispatch_source_create(DISPATCH_SOURCE_TYPE_PROC, pid, DISPATCH_PROC_EXIT, DISPATCH_TARGET_QUEUE_DEFAULT); dispatch_source_set_event_handler(self.source, ^(){ NSLog(@"Mail quit."); }); //在事件源传到你的事件处理前需要调用dispatch_resume()这个方法 dispatch_resume(self.source);
监视文件夹内文件变化
NSURL *directoryURL; // assume this is set to a directory int const fd = open([[directoryURL path] fileSystemRepresentation], O_EVTONLY); if (fd < 0) { char buffer[80]; strerror_r(errno, buffer, sizeof(buffer)); NSLog(@"Unable to open "%@": %s (%d)", [directoryURL path], buffer, errno); return; } dispatch_source_t source = dispatch_source_create(DISPATCH_SOURCE_TYPE_VNODE, fd, DISPATCH_VNODE_WRITE | DISPATCH_VNODE_DELETE, DISPATCH_TARGET_QUEUE_DEFAULT); dispatch_source_set_event_handler(source, ^(){ unsigned long const data = dispatch_source_get_data(source); if (data & DISPATCH_VNODE_WRITE) { NSLog(@"The directory changed."); } if (data & DISPATCH_VNODE_DELETE) { NSLog(@"The directory has been deleted."); } }); dispatch_source_set_cancel_handler(source, ^(){ close(fd); }); self.source = source; dispatch_resume(self.source); //还要注意需要用DISPATCH_VNODE_DELETE 去检查监视的文件或文件夹是否被删除,如果删除了就停止监听
GCD版定时器
dispatch_source_t source = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,0, 0, DISPATCH_TARGET_QUEUE_DEFAULT); dispatch_source_set_event_handler(source, ^(){ NSLog(@"Time flies."); }); dispatch_time_t start dispatch_source_set_timer(source, DISPATCH_TIME_NOW, 5ull * NSEC_PER_SEC,100ull * NSEC_PER_MSEC); self.source = source; dispatch_resume(self.source);
GCD深入操作
- 缓冲区:dispatch_data_t基于零碎的内存区域,使用dispatch_data_apply来遍历,还可以用dispatch_data_create_subrange来创建一个不做任何拷贝的子区域
- I/O调度:使用GCD提供的dispatch_io_read,dispatch_io_write和dispatch_io_close
- 测试:使用dispatch_benchmark小工具
- 原子操作: libkern/OSAtomic.h里可以查看那些函数,用于底层多线程编程。
Operation Queues
- Operation Queue是在GCD上实现了一些方便的功能。
- NSOperationQueue有主队列和自定义队列两种类型队列。主队列在主线程上运行,自定义队列在后台。
- 重写main方法自定义自己的operations。较简单,不需要管理isExecuting和isFinished,main返回时operation就结束了。
@implementation YourOperation - (void)main { // 进行处理 ... } @end
- 重写start方法能够获得更多的控制权,还可以在一个操作中执行异步任务
@implementation YourOperation - (void)start { self.isExecuting = YES; self.isFinished = NO; // 开始处理,在结束时应该调用 finished ... } - (void)finished { self.isExecuting = NO; self.isFinished = YES; } @end //使操作队列有取消功能,需要不断检查isCancelled属性 - (void)main { while (notDone && !self.isCancelled) { // 进行处理 } }
- 定义好operation类以后,将一个operation加到队列里:
NSOperationQueue *queue = [[NSOperationQueue alloc] init]; YourOperation *operation = [[YourOperation alloc] init]; [queue addOperation:operation];
- 如果是在主队列中进行一个一次性任务,可以将block加到操作队列
[[NSOperationQueue mainQueue] addOperationWithBlock:^{ // 代码... }];
- 通过maxConcurrentOperationCount属性控制一个特定队列中并发执行操作的数量。设置为1就是串行队列。
- 对operation优先级排序,指定operation之间的依赖关系。
//确保operation1和operation2是在intermediateOperation和finishOperation之前执行 [intermediateOperation addDependency:operation1]; [intermediateOperation addDependency:operation2]; [finishedOperation addDependency:intermediateOperation];
Run Loops
- Run loop比GCD和操作队列要容易,不必处理并发中复杂情况就能异步执行。
- 主线程配置main run loop,其它线程默认都没有配置run loop。一般都在主线程中调用后分配给其它队列。如果要在其它线程添加run loop至少添加一个input source,不然一运行就会退出。
在后台操作UI
使用操作队列处理
//weak引用参照self避免循环引用,及block持有self,operationQueue retain了block,而self有retain了operationQueue。 __weak id weakSelf = self; [self.operationQueue addOperationWithBlock:^{ NSNumber* result = findLargestMersennePrime(); [[NSOperationQueue mainQueue] addOperationWithBlock:^{ MyClass* strongSelf = weakSelf; strongSelf.textLabel.text = [result stringValue]; }]; }];
drawRect在后台绘制
drawRect:方法会影响性能,所以可以放到后台执行。
//使用UIGraphicsBeginImageContextWithOptions取代UIGraphicsGetCurrentContext:方法 UIGraphicsBeginImageContextWithOptions(size, NO, 0); // drawing code here UIImage *i = UIGraphicsGetImageFromCurrentImageContext(); UIGraphicsEndImageContext(); return i;
可以把这个方法运用到table view中,使table view的cell在滚出边界时能在didEndDisplayingCell委托方法中取消。WWDC中有讲解:Session 211 -- Building Concurrent User Interfaces on iOS https://developer.apple.com/videos/wwdc/2012/
还有个使用CALayer里drawsAsynchronously属性的方法。不过有时work,有时不一定。
网络异步请求
网络都要使用异步方式,但是不要直接使用dispatch_async,这样没法取消这个网络请求。dataWithContentsOfURL:的超时是30秒,那么这个线程需要干等到超时完。解决办法就是使用NSURLConnection的异步方法,把所有操作转化成operation来执行。NSURLConnection是通过run loop来发送事件的。AFNetworking是建立一个独立的线程设置一个非main run loop。下面是处理URL连接重写自定义operation子类里的start方法
- (void)start { NSURLRequest* request = [NSURLRequest requestWithURL:self.url]; self.isExecuting = YES; self.isFinished = NO; [[NSOperationQueue mainQueue] addOperationWithBlock:^ { self.connection = [NSURLConnectionconnectionWithRequest:request delegate:self]; }]; }
重写start方法需要管理isExecuting和isFinished状态。下面是取消操作的方法
- (void)cancel { [super cancel]; [self.connection cancel]; self.isFinished = YES; self.isExecuting = NO; } //连接完成发送回调 - (void)connectionDidFinishLoading:(NSURLConnection *)connection { self.data = self.buffer; self.buffer = nil; self.isExecuting = NO; self.isFinished = YES; }
后台处理I/O
异步处理文件可以使用NSInputStream。官方文档:http://developer.apple.com/library/ios/#documentation/FileManagement/Conceptual/FileSystemProgrammingGUide/TechniquesforReadingandWritingCustomFiles/TechniquesforReadingandWritingCustomFiles.html 实例:https://github.com/objcio/issue-2-background-file-io
@interface Reader : NSObject - (void)enumerateLines:(void (^)(NSString*))block completion:(void (^)())completion; - (id)initWithFileAtPath:(NSString*)path; //采用main run loop的事件将数据发到后台操作线程去处理 - (void)enumerateLines:(void (^)(NSString*))block completion:(void (^)())completion { if (self.queue == nil) { self.queue = [[NSOperationQueue alloc] init]; self.queue.maxConcurrentOperationCount = 1; } self.callback = block; self.completion = completion; self.inputStream = [NSInputStream inputStreamWithURL:self.fileURL]; self.inputStream.delegate = self; [self.inputStream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode]; [self.inputStream open]; } @end //input stream在主线程中发送代理消息,接着就可以在操作队列加入block操作 - (void)stream:(NSStream*)stream handleEvent:(NSStreamEvent)eventCode { switch (eventCode) { ... case NSStreamEventHasBytesAvailable: { NSMutableData *buffer = [NSMutableData dataWithLength:4 * 1024]; NSUInteger length = [self.inputStream read:[buffer mutableBytes] maxLength:[buffer length]]; if (0 < length) { [buffer setLength:length]; __weak id weakSelf = self; [self.queue addOperationWithBlock:^{ [weakSelf processDataChunk:buffer]; }]; } break; } ... } } //处理数据chunk,原理就是把数据切成很多小块,然后不断更新和处理buffer缓冲区,逐块读取和存入方式来处理大文件响应快而且内存开销也小。 - (void)processDataChunk:(NSMutableData *)buffer; { if (self.remainder != nil) { [self.remainder appendData:buffer]; } else { self.remainder = buffer; } [self.remainder obj_enumerateComponentsSeparatedBy:self.delimiter usingBlock:^(NSData* component, BOOL last) { if (!last) { [self emitLineWithData:component]; } else if (0 < [component length]) { self.remainder = [component mutableCopy]; } else { self.remainder = nil; } }]; }
并发开发会遇到的困难问题
多个线程访问共享资源
比如两个线程都会把计算结果写到一个整型数中。为了防止,需要一种互斥机制来访问共享资源
互斥锁
同一时刻只能有一个线程访问某个资源。某线程要访问某个共享资源先获得共享资源的互斥锁,完成操作再释放这个互斥锁,然后其它线程就能访问这个共享资源。
还有需要解决无序执行问题,这时就需要引入内存屏障。
在Objective-C中如果属性声明为atomic就能够支持互斥锁,但是因为加解锁会有性能代价,所以一般是声明noatomic的。
死锁
当多个线程在相互等待对方锁结束时就会发生死锁,程序可能会卡住。
void swap(A, B) { lock(lockA); lock(lockB); int a = A; int b = B; A = b; B = a; unlock(lockB); unlock(lockA); }
//一般没问题,但是如果两个线程使用相反的值同时调用上面这个方法就可能会死锁。线程1获得X的一个锁,线程2获得Y的一个锁,它们会同时等待另一个锁的释放,但是却是没法等到的。
swap(X, Y); // 线程 1
swap(Y, X); // 线程 2
为了防止死锁,需要使用比简单读写锁更好的办法,比如write preferencehttp://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock,或read-copy-update算法http://en.wikipedia.org/wiki/Read-copy-update
优先级反转
运行时低优先级任务由于先取得了释放了锁的共享资源而阻塞了高优先级任务,这种情况叫做优先级反转
最佳安全实践避免问题的方法
从主线程中取到数据,利用一个操作队列在后台处理数据,完后返回后台队列中得到的数据到主队列中。这样的操作不会有任何锁操作。
并发测试
- 使用SenTestingKit框架测试:https://github.com/nxtbgthng/SenTestingKitAsync
- kiwi:https://github.com/allending/Kiwi
- GHunit:https://github.com/gabriel/gh-unit/