zoukankan      html  css  js  c++  java
  • 【领略RxSwift源码】- 变换操作(Operators)

    上一篇中,我们分析了在RxSwift中的整个订阅流程。在开讲变换操作之前,首先要弄清楚Sink的概念,不清楚的同学可以翻看上一篇的分析。简单的来说,在每一次订阅操作之前都会进行一次Sink对流的操作。如果把Rx中的流当做水,那么Sink就相当于每个水管水龙头的滤网,在出水之前进行最后的加工。

     
    Sink.png

    每一次进行subscribe,可以类比成出水,在每一次出水之前,RxSwift都会做一件事情:

        override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
            if !CurrentThreadScheduler.isScheduleRequired {
                // The returned disposable needs to release all references once it was disposed.
                let disposer = SinkDisposer()
                let sinkAndSubscription = run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                return disposer
            }
            else {
                return CurrentThreadScheduler.instance.schedule(()) { _ in
                    let disposer = SinkDisposer()
                    let sinkAndSubscription = self.run(observer, cancel: disposer)
                    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                    return disposer
                }
            }
        }
    

    通过上面的源码我们可以发现,每当一个Observable被订阅,那么该Observable一定会执行run方法,而run方法中做的事情就是Sink的相关处理操作。

    简单的来说Sink主要做两件事情:

    1. 对Next、Complete、Error事件的转发;
    2. 对流转发之前的预先变化。

    而我们的变换操作基本上都是在各种各样的Sink中操作的,为什么说是基本上呢?因为在一些高阶变化(嵌套变换)的情况之下,Sink并不是发生变换的地方,具体的情况在下文会慢慢说到。

    例子

    下面是最简单的一个示例代码,我们先从最常见的map出发,让我们来看看Krunoslav Zaher是如何处理map的。

    Observable.of(1, 2, 3)
        .map { $0 * $0 }
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
    

    我们可以在map方法之上卡一个断点,程序运行之后我们可以看到停在了下面的方法定义。

    extension ObservableType {
        public func map<R>(_ transform: @escaping (E) throws -> R)
            -> Observable<R> {
            return self.asObservable().composeMap(transform)
        }
    }
    

    我们可以看到,这里做了两件事情,首先确保把调用者转化成Observable,因为符合ObservableType的对象有可能是ControlEventControlProperty之类的东西。然后调用composeMap方法,将我们所期望的变换操作的闭包传入。

    OK,我们再进一层,来看看composeMap做了什么:

    internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
            return _map(source: self, transform: transform)
        }
    

    我们可以看到,在这里Observable调用了自身的_map私有方法:

    internal func _map<Element, R>(source: Observable<Element>, transform: @escaping (Element) throws -> R) -> Observable<R> {
        return Map(source: source, transform: transform)
    }
    
    final fileprivate class Map<SourceType, ResultType>: Producer<ResultType> {
        typealias Transform = (SourceType) throws -> ResultType
        private let _source: Observable<SourceType>
        private let _transform: Transform
    
        init(source: Observable<SourceType>, transform: @escaping Transform) {
            _source = source
            _transform = transform
        }
        override func composeMap<R>(_ selector: @escaping (ResultType) throws -> R) -> Observable<R> {
            let originalSelector = _transform
            return Map<SourceType, R>(source: _source, transform: { (s: SourceType) throws -> R in
                let r: ResultType = try originalSelector(s)
                return try selector(r)
            })
        }
        
        override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {
            let sink = MapSink(transform: _transform, observer: observer, cancel: cancel)
            let subscription = _source.subscribe(sink)
            return (sink: sink, subscription: subscription)
        }
    }
    

    我们可以看到,所谓的_map实际上又返回了一个基于Producer类(Producer继承自Observable,而Observable类中又是最开始定义composeMap的地方,这个集成链对于接下来的理解很重要)的Map对象。这里主要做了三件事情:

    1. 首先把通过构造器把“可观察序列”和“变换操作”保存起来备用。
    2. 重写父类的composeMap,从原来的直接使用传入的“变换操作”(transform)构造Map对象变成了先使用Map对象自带的“变换操作”进行一次变换,再使用传入的“变换操作”进行一次变换。这样的递归处理方式就可以达到嵌套处理map操作的目的,就像这样:Observable<Int>.of(1, 3, 5).map(+).map(+)
    3. 重写父类的run方法,就像前文中说的那样,run方法会在订阅之前执行,并且使用各类的Sink在传递数据时对“数据源”进行各类的加工处理。而在这个例子中,这个Sink就是MapSink,这个MapSink在每次的Next事件的时候,使用传入的transform对数据源进行加工,然后再将加工后的数据源传出。

    至此所有的map操作已经全部完成。我们可以看到,map的操作其实是“惰性”的,也就是说,当你使用了map操作除非你使用了嵌套map或者对观察序列进行了订阅,否则他们都不会立刻执行变换操作。

    生产者-消费者模式

    在RxSwift的设计实现过程中,其实也是对生产者-消费者模式(Producer–consumer pattern)实际应用。在RxSwift中,所有的可观察序列都充当着生产者的作用,所以我们可以变换操作最后返回的都是一个继承自Producer类的一个子类(除了一些Subject,Subject比较特殊,之后会好好讨论一下)。

     
    生产者继承概览.png

    上面的脑图大概展示了Producer所派生的子类,我们可以看到,无论是我们常用的“初始化”方法:justoffrom,还是我们常用的变换方法:mapflatMap,merge,他们所对应的实现都是一种Producer

    我们可以看到,也正是得益于生产者-消费者模式的实现,使得RxSwift在可观察序列如同工厂里的流水线一样,可以在每一条流水线结束之前进行自定义的加工。

     
    Work Flow.png

    总结

    接下来我们可以俯瞰一下RxSwift对于事件变换的操作,以下做一些逻辑上的梳理工作,希望大家可以看的更加清楚:

    1. 协议拓展

    从一个协议开始。 ---- WWDC 2015

    我们知道,RxSwift的可观察序列都是基于ObservableType,所以当我们需要给所有的可观察序列添加一个变换操作的时候,我们只需要通过extension来添加一个公开的方法,然后去实现它。

    extension ObservableType {
    
        public func map<R>(_ transform: @escaping (E) throws -> R)
            -> Observable<R> {
            return self.asObservable().composeMap(transform)
        }
    
        public func flatMap<O: ObservableConvertibleType>(_ selector: @escaping (E) throws -> O)
            -> Observable<O.E> {
                return FlatMap(source: asObservable(), selector: selector)
        }
    
        public func concat<O: ObservableConvertibleType>(_ second: O) -> Observable<E> where O.E == E {
            return Observable.concat([self.asObservable(), second.asObservable()])
        }
    
        public static func combineLatest<O1: ObservableType, O2: ObservableType>
            (_ source1: O1, _ source2: O2)
                -> Observable<(O1.E, O2.E)> {
            return CombineLatest2(
                source1: source1.asObservable(), source2: source2.asObservable(),
                resultSelector: { ($0, $1) }
            )
        }
    
        // More and more ....
    }
    

    上面我所列出来的代码是我为了集中展示所以放在同一个extension中,在实际的源码中他们都是分散在不同的swift文件中的。所以我们知道,所有我们所使用的变换操作,都是通过extension拓展到ObservableType协议当中的。

    通过翻看源码我们可以看到,上述的变换操作其实都做了一件事情,那就是返回一个Producer的具体子类。比如map返回的是Map类的实例对象,combineLatest返回的是CombineLatest2类的实例对象。

    2. 具象化的Producer

    那么通过拓展方法所返回的Producer的子类又是做了一些什么事情呢?

    首先,具象化的Producer一定会重写override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Accumulate方法,在该方法中,RxSwift通过具象化的Sink来对数据源进行处理,然后让源可观察序列执行订阅。

    其次,Producer在初始化的时候会至少接收两个参数:一个参数是所传递的可观察序列,另外一个参数是所进行变换操作的闭包。当然,有些变换操作可能由于操作的特性而需要三个的参数。比如Scan操作,不仅仅需要闭包accumulator,而且还需要一个seed,这也是由Scan操作的特性所决定了,在这里不多加赘述。当Producer保存了这些变换所必要的参数之后,在run方法中的sink就能够在订阅输出之前执行这些变换,然后输出给订阅者了。

    值得注意的是,由于run方法和subscribe方法之间的递归调用,所以这样的实现模式也天然的支持嵌套的变换操作。

    3. "苦力"Sink

    所以变换的闭包的执行都是在各类的Sink当中,比如MapSink:

    func on(_ event: Event<SourceType>) {
        switch event {
        case .next(let element):
            do {
                /// 进行变换操作
                let mappedElement = try _selector(element, try incrementChecked(&_index))
                /// 将变换操作之后的事件转发给原来的观察者
                forwardOn(.next(mappedElement))
            }
            catch let e {
                forwardOn(.error(e))
                dispose()
            }
        case .error(let error):
            forwardOn(.error(error))
            dispose()
        case .completed:
            forwardOn(.completed)
            dispose()
        }
    }
    

    我们可以看到,在这里我们终于进行了变换操作,并且变换操作之后将结果转发给了观察者。

    至此,整条变换链都转换完毕。

    设计的遗憾

    composeMap的定义方法之上,我们可以看到如下的一段注释:

        // this is kind of ugly I know :(
        // Swift compiler reports "Not supported yet" when trying to override protocol extensions, so ¯\_(ツ)_/¯
        /// Optimizations for map operator
    

    在上一节的总结中我们知道,在RxSwift中的变换操作的嵌套是通过run方法和subscribe方法的递归调用来解决的。但是这里存在问题,比如,当你嵌套10个map方法的时候,每次发生onNext都会导致10次的变换操作的递归调用,然后再生成最后的值传递给订阅者。用简单的函数式的表达就像这样:

    10(+1)(+1)(+1)(+1)(+1)(+1)(+1)(+1)(+1)(+1) = 20
    

    那么,我们为什么不可以直接这样呢?

    10(+10) = 20
    

    基于这样的考虑,我们可以看到map的默认实现比较特殊,它并不是直接返回一个Map对象,而是通过composeMap返回一个Map对象,然后再在Map对象中重写composeMap以达到当发生嵌套调用的时候可以优化函数式调用:

    final fileprivate class Map<SourceType, ResultType>: Producer<ResultType> {
    
        // ....
    
        override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {
            let sink = MapSink(transform: _transform, observer: observer, cancel: cancel)
            let subscription = _source.subscribe(sink)
            return (sink: sink, subscription: subscription)
        }
    }
    

    也正是为了这样的一个优化,导致似乎看起来很ugly,这也是设计上的遗憾吧。



    作者:Maru
    链接:https://www.jianshu.com/p/a11234b7a089
    來源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
  • 相关阅读:
    ClickOnce發布經驗
    reporting Server組件不全引起的致命錯誤
    異步調用
    Usercontrol Hosted in IE
    MATLAB命令大全(转载)
    一种保护眼睛的好方法
    关于oracle自动编号
    An Algorithm Summary of Programming Collective Intelligence (1)
    An Algorithm Summary of Programming Collective Intelligence (3)
    An Algorithm Summary of Programming Collective Intelligence (4)
  • 原文地址:https://www.cnblogs.com/feng9exe/p/9083834.html
Copyright © 2011-2022 走看看