zoukankan      html  css  js  c++  java
  • 学习RxJS: 导入

    原文地址:http://www.moye.me/2016/05/31/learning_rxjs_part_one_preliminary/

    引子

    新手们在异步编程里跌倒时,永远会有这么一个经典问题:怎么在一次异步调用里return一个结果啊?

    老司机说要用回调函数,然后有条件判断的嵌套回调(回调地狱)问题来了;

    老司机推荐用事件,然后异步流程里有顺序依赖;

    老司机推荐用Promise,然后有顺序依赖的流程里,居然还想订阅事件;

    老司机建议试试协程,谁知对方想要合并两个异步调用;

    ……

    以上,是异步编程里要面对的一些难题,也是ReactiveX API 所致力解决的

    是什么

    知道有 ReactiveX 这么一回事, 源于一位巨硬铁粉的安利演示:Reactive LINQ 加持的C#,简洁且颇具表达力;随后,便是万众瞩目的 Angular 2,这货的标配大礼包里就有RxJS,比比皆是的 api.invocation.map(...).subscribe(fn, fn, fn) 片断,让jQuery青年们一头雾水。

    落伍总是不好的,林子里的鸟都在讨论FRP时,我们也要跟上:

    Rx_Logo_SReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io

    

    概念

    ReactiveX 的自述是  “An API for asynchronous programming with observable streams”,那么,什么是Observable,什么又是Stream呢?

    Stream

    Erik Meijer发过一篇paper: “Your Mouse Is a Database”,大概意思是说,用户的鼠标点击其实是一个无穷而实时的事件序列,可以将其视为一个与时间线相关的数据流,我们 可以查询并操作这个数据流,在它可用时(或者等待数据流可用):

    flowing_sequence_of_data

     在 Rx 编程中,任何数据都可以被表达为数据流的形式,我们要做的是,对数据流进行订阅、查询、过滤、打平、归并等各种操作。

    由于是对 数据流序列 进行订阅(观察),Rx 的编程模型实际是基于 Observer pattern 和 Iterator pattern 这两种设计模式构建的。

    Observable

    在Rx中,Observable就是一个序列,它按序对订阅方(subscriber)进行值的推送,遵循一套 “Don’t call us; we’ll call you.” 的基本法。

    基于 Observable 的模式, 和传统的 Observer pattern有两点本质的不同:

    1. Observable 只在至少有一个订阅者时,数据才开始流动
    2. 在数据流结束(iterator 不再hasNext)时,Observable会发出通知(onCompleted)

    怎么用

    程序员们大都被调教成了马基雅维利主义者:“整点有用的”。所以,还是来看看RxJS怎么用吧:

    场景

    我有一个C类局域网,想要挨个ping一下网络内的设备,看看哪些IP在线(并不知道DHCP的客户端列表,所以得从 xxx.xxx.xxx.2 ping到 xxx.xxx.xxx.254)

    思路

    很明显,ping 是一个异步的操作,这里大概有 254 - 2 = 252 个异步操作,难点不在于异步,而在于流程控制,在RxJS 里,可以很方便的把Observable源进行归并(merge),从而让异步数据流可控且有序

    RxJS方案

    依赖
    "dependencies": {
        "ping": "^0.1.10",
        "ramda": "^0.21.0",
        "rx": "^4.1.0",
    }
    代码
    var Rx = require('rx')
    var R = require('ramda')
    var pingCommand = require('ping')
    
    var config = {
      timeout: 10,       // 超时为10秒
      extra: ["-i 2"],   // 每次发包间隔时长
    }
    function promisablePing(host) {
      return new Promise((resolve, reject) => {
        pingCommand.sys.probe(host
          , isAlive => isAlive ? resolve(host) : reject(`${host}: unreachable.`)
          , config)
      })
    }
    function ping(host) {
      return Rx.Observable.create(observer => {
        return promisablePing(host)
          .then(host => observer.onNext(host))
          .then(_ => observer.onCompleted())
          .catch(err => observer.onError(err))
      })
    }
    
    var tasks = R.range(2, 254).map(i => ping(`192.168.50.${i}`))
    Rx.Observable
      .merge(...tasks)
      .subscribe(
        host => console.log(`pong: ${host}`),
        err => console.error(err)
      )

    说明

    代码足够简单,值得说明的是:

      1. Rx.Observable.create会创建一个Observable对象,对它进行订阅的观察者即create函数中回调的入值observer,观察者有三个方法:onNext, onCompleted, and onError:
        1. onNext :在序列推送一个新值时被调用,对应到观察者的subscribe第一个函参
        2. onCompleted:序列中已无值可用,对应到观察者的subscribe第三个函参
        3. onError:序列中发生了错误,对应到观察者的subscribe第二个函参
      2. merge合并后的操作流,是一个对IP在:192.168.50.2 - 254 范围内的设备进行Ping的操作序列,但是Observable有一个特点,就是任何时候触发了 错误回调(即Rx.Observable.create创建那个的观察者,进行了onError通知,从而触发了消费者提供给subscribe函数第二个参数)那么整个Observable序列就此结束。比如,我的C类子网就两台设备在线:xxx.xxx.xxx..100 和 xxx.xxx.xxx.200,然后 xxx.xxx.xxx.2 在10秒后超时报错,那这条Observable时间线看起来就是这样的:
        BEGIN-> .100-.200---------------------[.2 error] ->END
      3. Ramda 是一个优秀的函数式JS库,当然,用成了lodash也不坏

    小结

    以上,只是冰山一角,下回,想聊聊基于RxJS的Web框架:Cycle.js

     

    更多文章请移步我的blog新地址: http://www.moye.me/ 

  • 相关阅读:
    java 深克隆(深拷贝)与浅克隆(拷贝)详解
    设计模式之单例模式
    设计模式之工厂模式
    批量下载google 字体小工具
    LBPL--基于Asp.net、 quartz.net 快速开发定时服务的插件化项目
    测试
    WCF 生产json对外的接口
    四舍五入小算法 (以前写的,采用拆分)
    自己动手写控件(模仿mvc htmlhelper的类)
    步骤详解安装Apache web服务器
  • 原文地址:https://www.cnblogs.com/moye/p/learning_rxjs_part_one_preliminary.html
Copyright © 2011-2022 走看看