zoukankan      html  css  js  c++  java
  • 【领略RxSwift源码】- 变换操作(Operators)

    上一篇中,我们分析了在RxSwift中的整个订阅流程。在开讲变换操作之前,首先要弄清楚Sink的概念,不清楚的同学可以翻看上一篇的分析。简单的来说,在每一次订阅操作之前都会进行一次Sink对流的操作。如果把Rx中的流当做水,那么Sink就相当于每个水管水龙头的滤网,在出水之前进行最后的加工。

     
    Sink.png

    每一次进行subscribe,可以类比成出水,在每一次出水之前,RxSwift都会做一件事情:

        override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
            if !CurrentThreadScheduler.isScheduleRequired {
                // The returned disposable needs to release all references once it was disposed.
                let disposer = SinkDisposer()
                let sinkAndSubscription = run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                return disposer
            }
            else {
                return CurrentThreadScheduler.instance.schedule(()) { _ in
                    let disposer = SinkDisposer()
                    let sinkAndSubscription = self.run(observer, cancel: disposer)
                    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                    return disposer
                }
            }
        }
    

    通过上面的源码我们可以发现,每当一个Observable被订阅,那么该Observable一定会执行run方法,而run方法中做的事情就是Sink的相关处理操作。

    简单的来说Sink主要做两件事情:

    1. 对Next、Complete、Error事件的转发;
    2. 对流转发之前的预先变化。

    而我们的变换操作基本上都是在各种各样的Sink中操作的,为什么说是基本上呢?因为在一些高阶变化(嵌套变换)的情况之下,Sink并不是发生变换的地方,具体的情况在下文会慢慢说到。

    例子

    下面是最简单的一个示例代码,我们先从最常见的map出发,让我们来看看Krunoslav Zaher是如何处理map的。

    Observable.of(1, 2, 3)
        .map { $0 * $0 }
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
    

    我们可以在map方法之上卡一个断点,程序运行之后我们可以看到停在了下面的方法定义。

    extension ObservableType {
        public func map<R>(_ transform: @escaping (E) throws -> R)
            -> Observable<R> {
            return self.asObservable().composeMap(transform)
        }
    }
    

    我们可以看到,这里做了两件事情,首先确保把调用者转化成Observable,因为符合ObservableType的对象有可能是ControlEventControlProperty之类的东西。然后调用composeMap方法,将我们所期望的变换操作的闭包传入。

    OK,我们再进一层,来看看composeMap做了什么:

    internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
            return _map(source: self, transform: transform)
        }
    

    我们可以看到,在这里Observable调用了自身的_map私有方法:

    internal func _map<Element, R>(source: Observable<Element>, transform: @escaping (Element) throws -> R) -> Observable<R> {
        return Map(source: source, transform: transform)
    }
    
    final fileprivate class Map<SourceType, ResultType>: Producer<ResultType> {
        typealias Transform = (SourceType) throws -> ResultType
        private let _source: Observable<SourceType>
        private let _transform: Transform
    
        init(source: Observable<SourceType>, transform: @escaping Transform) {
            _source = source
            _transform = transform
        }
        override func composeMap<R>(_ selector: @escaping (ResultType) throws -> R) -> Observable<R> {
            let originalSelector = _transform
            return Map<SourceType, R>(source: _source, transform: { (s: SourceType) throws -> R in
                let r: ResultType = try originalSelector(s)
                return try selector(r)
            })
        }
        
        override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {
            let sink = MapSink(transform: _transform, observer: observer, cancel: cancel)
            let subscription = _source.subscribe(sink)
            return (sink: sink, subscription: subscription)
        }
    }
    

    我们可以看到,所谓的_map实际上又返回了一个基于Producer类(Producer继承自Observable,而Observable类中又是最开始定义composeMap的地方,这个集成链对于接下来的理解很重要)的Map对象。这里主要做了三件事情:

    1. 首先把通过构造器把“可观察序列”和“变换操作”保存起来备用。
    2. 重写父类的composeMap,从原来的直接使用传入的“变换操作”(transform)构造Map对象变成了先使用Map对象自带的“变换操作”进行一次变换,再使用传入的“变换操作”进行一次变换。这样的递归处理方式就可以达到嵌套处理map操作的目的,就像这样:Observable<Int>.of(1, 3, 5).map(+).map(+)
    3. 重写父类的run方法,就像前文中说的那样,run方法会在订阅之前执行,并且使用各类的Sink在传递数据时对“数据源”进行各类的加工处理。而在这个例子中,这个Sink就是MapSink,这个MapSink在每次的Next事件的时候,使用传入的transform对数据源进行加工,然后再将加工后的数据源传出。

    至此所有的map操作已经全部完成。我们可以看到,map的操作其实是“惰性”的,也就是说,当你使用了map操作除非你使用了嵌套map或者对观察序列进行了订阅,否则他们都不会立刻执行变换操作。

    生产者-消费者模式

    在RxSwift的设计实现过程中,其实也是对生产者-消费者模式(Producer–consumer pattern)实际应用。在RxSwift中,所有的可观察序列都充当着生产者的作用,所以我们可以变换操作最后返回的都是一个继承自Producer类的一个子类(除了一些Subject,Subject比较特殊,之后会好好讨论一下)。

     
    生产者继承概览.png

    上面的脑图大概展示了Producer所派生的子类,我们可以看到,无论是我们常用的“初始化”方法:justoffrom,还是我们常用的变换方法:mapflatMap,merge,他们所对应的实现都是一种Producer

    我们可以看到,也正是得益于生产者-消费者模式的实现,使得RxSwift在可观察序列如同工厂里的流水线一样,可以在每一条流水线结束之前进行自定义的加工。

     
    Work Flow.png

    总结

    接下来我们可以俯瞰一下RxSwift对于事件变换的操作,以下做一些逻辑上的梳理工作,希望大家可以看的更加清楚:

    1. 协议拓展

    从一个协议开始。 ---- WWDC 2015

    我们知道,RxSwift的可观察序列都是基于ObservableType,所以当我们需要给所有的可观察序列添加一个变换操作的时候,我们只需要通过extension来添加一个公开的方法,然后去实现它。

    extension ObservableType {
    
        public func map<R>(_ transform: @escaping (E) throws -> R)
            -> Observable<R> {
            return self.asObservable().composeMap(transform)
        }
    
        public func flatMap<O: ObservableConvertibleType>(_ selector: @escaping (E) throws -> O)
            -> Observable<O.E> {
                return FlatMap(source: asObservable(), selector: selector)
        }
    
        public func concat<O: ObservableConvertibleType>(_ second: O) -> Observable<E> where O.E == E {
            return Observable.concat([self.asObservable(), second.asObservable()])
        }
    
        public static func combineLatest<O1: ObservableType, O2: ObservableType>
            (_ source1: O1, _ source2: O2)
                -> Observable<(O1.E, O2.E)> {
            return CombineLatest2(
                source1: source1.asObservable(), source2: source2.asObservable(),
                resultSelector: { ($0, $1) }
            )
        }
    
        // More and more ....
    }
    

    上面我所列出来的代码是我为了集中展示所以放在同一个extension中,在实际的源码中他们都是分散在不同的swift文件中的。所以我们知道,所有我们所使用的变换操作,都是通过extension拓展到ObservableType协议当中的。

    通过翻看源码我们可以看到,上述的变换操作其实都做了一件事情,那就是返回一个Producer的具体子类。比如map返回的是Map类的实例对象,combineLatest返回的是CombineLatest2类的实例对象。

    2. 具象化的Producer

    那么通过拓展方法所返回的Producer的子类又是做了一些什么事情呢?

    首先,具象化的Producer一定会重写override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Accumulate方法,在该方法中,RxSwift通过具象化的Sink来对数据源进行处理,然后让源可观察序列执行订阅。

    其次,Producer在初始化的时候会至少接收两个参数:一个参数是所传递的可观察序列,另外一个参数是所进行变换操作的闭包。当然,有些变换操作可能由于操作的特性而需要三个的参数。比如Scan操作,不仅仅需要闭包accumulator,而且还需要一个seed,这也是由Scan操作的特性所决定了,在这里不多加赘述。当Producer保存了这些变换所必要的参数之后,在run方法中的sink就能够在订阅输出之前执行这些变换,然后输出给订阅者了。

    值得注意的是,由于run方法和subscribe方法之间的递归调用,所以这样的实现模式也天然的支持嵌套的变换操作。

    3. "苦力"Sink

    所以变换的闭包的执行都是在各类的Sink当中,比如MapSink:

    func on(_ event: Event<SourceType>) {
        switch event {
        case .next(let element):
            do {
                /// 进行变换操作
                let mappedElement = try _selector(element, try incrementChecked(&_index))
                /// 将变换操作之后的事件转发给原来的观察者
                forwardOn(.next(mappedElement))
            }
            catch let e {
                forwardOn(.error(e))
                dispose()
            }
        case .error(let error):
            forwardOn(.error(error))
            dispose()
        case .completed:
            forwardOn(.completed)
            dispose()
        }
    }
    

    我们可以看到,在这里我们终于进行了变换操作,并且变换操作之后将结果转发给了观察者。

    至此,整条变换链都转换完毕。

    设计的遗憾

    composeMap的定义方法之上,我们可以看到如下的一段注释:

        // this is kind of ugly I know :(
        // Swift compiler reports "Not supported yet" when trying to override protocol extensions, so ¯\_(ツ)_/¯
        /// Optimizations for map operator
    

    在上一节的总结中我们知道,在RxSwift中的变换操作的嵌套是通过run方法和subscribe方法的递归调用来解决的。但是这里存在问题,比如,当你嵌套10个map方法的时候,每次发生onNext都会导致10次的变换操作的递归调用,然后再生成最后的值传递给订阅者。用简单的函数式的表达就像这样:

    10(+1)(+1)(+1)(+1)(+1)(+1)(+1)(+1)(+1)(+1) = 20
    

    那么,我们为什么不可以直接这样呢?

    10(+10) = 20
    

    基于这样的考虑,我们可以看到map的默认实现比较特殊,它并不是直接返回一个Map对象,而是通过composeMap返回一个Map对象,然后再在Map对象中重写composeMap以达到当发生嵌套调用的时候可以优化函数式调用:

    final fileprivate class Map<SourceType, ResultType>: Producer<ResultType> {
    
        // ....
    
        override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {
            let sink = MapSink(transform: _transform, observer: observer, cancel: cancel)
            let subscription = _source.subscribe(sink)
            return (sink: sink, subscription: subscription)
        }
    }
    

    也正是为了这样的一个优化,导致似乎看起来很ugly,这也是设计上的遗憾吧。



    作者:Maru
    链接:https://www.jianshu.com/p/a11234b7a089
    來源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
  • 相关阅读:
    Js获取当前日期时间及其它操作
    OpenResty
    Nginx开发从入门到精通
    TengineWeb服务器项目
    VS2012的SVN插件VISUALSVN
    VS项目如何运用svn的忽略列表
    SVN 中trunk、branches、tags都什么意思?
    SVN服务器搭建和使用(一)
    逗号分隔字符串转换为一张表--解决查询in(逗号分隔字符串)出错问题
    判断函数是否存在、判断函数是否存在并执行
  • 原文地址:https://www.cnblogs.com/feng9exe/p/9083834.html
Copyright © 2011-2022 走看看