zoukankan      html  css  js  c++  java
  • ReactiveCocoa源码解读(二)

    上一篇解读了ReactiveCocoa的三个重要的类的底层实现,本篇继续。

    一、RACMulticastConnection

    1.应用

    RACMulticastConnection: 用于当一个信号被多次订阅时,为了保证创建信号时,避免多次调用创建信号的block造成副作用,可以使用该类处理,保证创建信号的block执行一次。

    // 创建信号
    RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id subscriber) {
    	NSLog(@"发送请求");
    	[subscriber sendNext:@1];
    	return nil;
    }];
    
    // 创建连接
    RACMulticastConnection *connect = [signal publish];
    
    // 订阅连接的信号
    [connect.signal subscribeNext:^(id x) {
    	NSLog(@"connect 第一次订阅信号: %@", x);
    }];
    
    [connect.signal subscribeNext:^(id x) {
    	NSLog(@"connect 第二次订阅信号: %@", x);
    }];
    
    // 连接
    [connect connect];
    

    2.源码实现

    • 底层原理
    1.创建connect,connect.sourceSignal -> RACSignal(原始信号) connect.signal -> RACSubject
    2.订阅connect.signal,会调用RACSubject的subscribeNext,创建订阅者,而且把订阅者保存起来,不会执行block。
    3.[connect connect]内部会订阅RACSignal(原始信号),并且订阅者是RACSubject
    	3.1.订阅原始信号,就会调用原始信号中的didSubscribe
    	3.2 didSubscribe,拿到订阅者调用sendNext,其实是调用RACSubject的sendNext
    4.RACSubject的sendNext,会遍历RACSubject所有订阅者发送信号。
    	4.1 因为刚刚第二步,都是在订阅RACSubject,因此会拿到第二步所有的订阅者,调用他们的nextBlock
    
    • 创建信号

    + (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe

    // RACDynamicSignal.m
    + (RACSignal *)createSignal:(RACDisposable * (^)(id subscriber))didSubscribe {
    	RACDynamicSignal *signal = [[self alloc] init];
    	//将代码块保存到信号里面(但此时仅仅是保存,没有调用),所以信号还是冷信号
    	signal->_didSubscribe = [didSubscribe copy];
    	return [signal setNameWithFormat:@"+createSignal:"];
    }
    
    • 创建连接

    [signal publish]

    // RACSignal+Operations.m
    - (RACMulticastConnection *)publish {
    	// 创建订阅者
    	RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
    	// 创建connection,参数是刚才创建的订阅者
    	RACMulticastConnection *connection = [self multicast:subject];
    	return connection;
    }
    
    - (RACMulticastConnection *)multicast:(RACSubject *)subject {
    	[subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
    	RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
    	return connection;
    }
    
    // RACMulticastConnection.m
    - (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
    	NSCParameterAssert(source != nil);
    	NSCParameterAssert(subject != nil);
    
    	self = [super init];
    	if (self == nil) return nil;
    	// 保存原始信号
    	_sourceSignal = source;
    	_serialDisposable = [[RACSerialDisposable alloc] init];
    	// 保存订阅者,即_signal是RACSubject对象
    	_signal = subject;
    
    	return self;
    }
    
    • 订阅信号

    (RACDisposable *)subscribeNext:(void (^ )(id x))nextBlock;

    // RACSignal.m
    - (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
    	NSCParameterAssert(nextBlock != NULL);
    
    	RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
    	return [self subscribe:o];
    }
    
    // RACSubscriber.m
    + (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
    	RACSubscriber *subscriber = [[self alloc] init];
    
    	subscriber->_next = [next copy];
    	subscriber->_error = [error copy];
    	subscriber->_completed = [completed copy];
    
    	return subscriber;
    }
    
    // RACSubject.m
    - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    	NSCParameterAssert(subscriber != nil);
    
    	RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    	subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
    
    	NSMutableArray *subscribers = self.subscribers;
    	@synchronized (subscribers) {
    		[subscribers addObject:subscriber];
    	}
    	
    	return [RACDisposable disposableWithBlock:^{
    		@synchronized (subscribers) {
    			// Since newer subscribers are generally shorter-lived, search
    			// starting from the end of the list.
    			NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
    				return obj == subscriber;
    			}];
    
    			if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
    		}
    	}];
    }
    
    • 连接信号

    [connect connect];

    // RACMulticastConnection.m
    - (RACDisposable *)connect {
    	BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);
    
    	if (shouldConnect) {
    		// 订阅原生信号
    		self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
    	}
    
    	return self.serialDisposable;
    }
    
    // RACDynamicSignal.m
    - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    	NSCParameterAssert(subscriber != nil);
    
    	RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    	subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
    
    	if (self.didSubscribe != NULL) {
    		RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
    			RACDisposable *innerDisposable = self.didSubscribe(subscriber);
    			[disposable addDisposable:innerDisposable];
    		}];
    
    		[disposable addDisposable:schedulingDisposable];
    	}
    	
    	return disposable;
    }
    
    // RACSubject.m
    - (void)sendNext:(id)value {
    	// 遍历_subscribers数组,执行nextBlock
    	[self enumerateSubscribersUsingBlock:^(id subscriber) {
    		[subscriber sendNext:value];
    	}];
    }
    

    3.流程图

    RACMulticastConnection

    4.总结

    RACMulticastConnection利用RACSubject实现了创建信号的block只执行一次的功能。对于需要对此订阅信号,但是不希望多次创建信号的应用场合,可以RACMulticastConnection解决。

    二、RACCommand

    1.应用

    RACCommand类用来表示动作的执行, 是对动作触发后的连锁事件的封装。常用在封装网络请求,按钮点击等等场合。

    RACCommand *command = [[RACCommand alloc] initWithSignalBlock:^RACSignal *(id input) {
    	return [RACSignal createSignal:^RACDisposable *(id subscriber) {
    
    		if (/* DISABLES CODE */ (YES)) {
    		// 正常发送数据,必须发送完成信号
    			[subscriber sendNext:@"Smile"];
    			[subscriber sendCompleted];
    		} else {
    			// 发送错误信号
    			[subscriber sendError:[NSError errorWithDomain:@"Network failed" code:0005 userInfo:nil]];
    		}
    
    		// 信号被销毁前,做一些清理的工作;如果不需要,可以 return nil
    		return [RACDisposable disposableWithBlock:^{
    			NSLog(@"信号被销毁了");
    		}];
    	}];
    }];
    
    // 执行信号并订阅
    [[command execute:nil] subscribeNext:^(id x) {
    	NSLog(@"receive data: %@", x);
    }];
    

    2.源码实现

    RACCommand底层实现

    1. 创建命令,保存signalBlock
    2. 执行命令
    * 2.1 调用signalBlock
    * 2.2 创建connect,传入RACReplaySubject对象,然后连接信号
    3. 订阅信号
    * 3.1 创建订阅者,保存到RACReplaySubject对象的_subscribers数组中
    * 3.2 遍历valuesReceived数组,调用订阅者发送数据
    
    • 创建command

    - (id)initWithSignalBlock:(RACSignal * (^)(id input))signalBlock

    // RACCommand.m
    - (id)initWithSignalBlock:(RACSignal * (^)(id input))signalBlock {
    	return [self initWithEnabled:nil signalBlock:signalBlock];
    }
    
    - (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id input))signalBlock {
    	NSCParameterAssert(signalBlock != nil);
    
    	self = [super init];
    	if (self == nil) return nil;
    
    	_activeExecutionSignals = [[NSMutableArray alloc] init];
    	// 保存创建信号的block
    	_signalBlock = [signalBlock copy];
    	......
    }
    
    • 执行command

    - (RACSignal *)execute:(id)input

    // RACCommand.m
    - (RACSignal *)execute:(id)input {
    	// `immediateEnabled` is guaranteed to send a value upon subscription, so
    	// -first is acceptable here.
    	BOOL enabled = [[self.immediateEnabled first] boolValue];
    	if (!enabled) {
    		NSError *error = [NSError errorWithDomain:RACCommandErrorDomain code:RACCommandErrorNotEnabled userInfo:@{
    			NSLocalizedDescriptionKey: NSLocalizedString(@"The command is disabled and cannot be executed", nil),
    			RACUnderlyingCommandErrorKey: self
    		}];
    
    		return [RACSignal error:error];
    	}
    
    	RACSignal *signal = self.signalBlock(input);
    	......
    	// 创建连接,用RACReplaySubject作为订阅者
    	RACMulticastConnection *connection = [[signal
    	subscribeOn:RACScheduler.mainThreadScheduler]
    	multicast:[RACReplaySubject subject]];
    
    	......
    	// 连接信号
    	[connection connect];
    		return [connection.signal setNameWithFormat:@"%@ -execute: %@", self, [input rac_description]];
    }
    
    // RACMulticastConnection.m
    - (RACDisposable *)connect {
    	BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &amp;_hasConnected);
    
    	if (shouldConnect) {
    		// 执行创建信号的block
    		self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
    	}
    
    	return self.serialDisposable;
    }
    
    • 订阅command

    - (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock

    // RACSignal.m
    - (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
    	NSCParameterAssert(nextBlock != NULL);
    
    	RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
    	return [self subscribe:o];
    }
    
    // RACReplaySubject.m
    - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    	RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
    
    	RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
    		@synchronized (self) {
    			for (id value in self.valuesReceived) {
    				if (compoundDisposable.disposed) return;
    				// 调用订阅者,发送数据 "Smile"
    				[subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
    			}
    
    			if (compoundDisposable.disposed) return;
    
    			if (self.hasCompleted) {
    				[subscriber sendCompleted];
    			} else if (self.hasError) {
    				[subscriber sendError:self.error];
    			} else {
    				// 调用父类方法,保存订阅者到_subscribers数组
    				RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
    				[compoundDisposable addDisposable:subscriptionDisposable];
    			}
    		}
    	}];
    
    	[compoundDisposable addDisposable:schedulingDisposable];
    
    	return compoundDisposable;
    }
    

    3.流程图

    RACCommand

    4.总结

    RACCommand用来封装事件时,还可以订阅信号(executionSignals)、订阅最新信号(switchToLatest)、跳过几次信号(skip)或信号是否正在执行(executing),在执行信号时,还可以监听错误信号和完成信号,请参考demo例子。


    ReactiveCocoa框架的源码分析暂告一段落,如有分析不足之处,欢迎互相交流。

    Demo地址:

    RACDemo

  • 相关阅读:
    ZOJ 4097 Rescue the Princess
    最大值最小化 最小值最大化
    SD第九届省赛B题 Bullet
    Euler Circuit UVA
    bzoj 1878
    随笔
    BZOJ
    主席树模板
    AC自动机模板
    BZOJ
  • 原文地址:https://www.cnblogs.com/fishbay/p/7206768.html
Copyright © 2011-2022 走看看