zoukankan      html  css  js  c++  java
  • Future.dart阅读

    Future主要用于处理flutter中的异步输入事件, 通过指定不同的初始化构造函数将异步任务转化为同步执行, 提交到当前Isolate的microTask队列或eventTask队列
    此外还提供了遍历的方法对future进行管理, 错捕捉和拦截,future遍历, 合并, 同时还可以转换为更为灵活的Stream处理。在实际使用中应特殊注意异常的捕捉。它和Timer,Zone, Isolate密切相关.

    Future执行顺序

    根据执行效率来分, Future有3种创建方法, 它们的优先级按如下先后顺序

    1. Future.sync: 绝对同步指令, 在当前函数栈中同步执行, 优先级最高
    2. Future.microTask/Future.value: 优先级相同, FIFO
    3. Future(()=> { }): 异步创建, 优先级最低

    顺序补充说明, 其实在理解上面 1, 2, 3之后就很容推导了

    1. 同优先级别按照先后顺序执行
    2. 同优先级内层嵌套Future优先级比外层底
    3. Future.sync后接then, 它的优先级降级为 mircoTask 级别, 其它的Future创建then的优先级沿用创建时的优先级。

    以下是测试代码:

    void main() {
      print('start ....');
      
      Future.microtask(() => print('microtask0'));
    
      //通过Future.value的方式直接返回一个完成的Future,它直接插入到micoktask中
      Future.value('feature 0').then((value) => print('feature0->1')).then((value) => print('feature0->2'));
    
      //插入到当前Isolate EventLoop的EventQueue中
      Future(()=> print('feature 1'));
      
      //同步执行插入到当前Isolate EventLoop microTask队列中
      Future.microtask(() => print('microtask2'));
    
      //Future.sync为同步执行指令,和函数内的同步方法具有相同优先级
      final syncFuture = Future.sync(() => print('sync feature2'));
      syncFuture.then((value) => print('sync then feature3')).then((value) => print('sync then feature4'));
      Future.microtask(() => print('microtask4'));
    
      //顺序执行,插入到EventQueue中,执行顺序 `feature4` -> `feature5` -> feature6`,小于microTask
      final future4 = Future(() => print('feature4'));
      final microtask = Future.microtask(() => print('microtask5'));
      microtask.then((value) => print('microtask5-0'));
    
      future4.then((value) => print('feature5')).then((value) => print('feature6'));
      Future.microtask(() => print('microtask6'));
      
      //同步等待 `feature7` 和 `feature8` 执行完毕后再执行后面的 `feature9`
      Future.wait([Future(() => print('feature7')), Future(() => print('feature8'))]).then((value) => print('feature 9'));
      
      //同层级的Future执行按先进先出的规则,嵌套的内层feature11,feature12的future要慢于外层 `feature15`
      Future((){
         print('feature 10');
         Future.microtask(() => 'feature 10 mictask');
         Future(() => print('feature11')).then((value) => print('feature 12')); //内层feature11,feature12的future要慢于外层 `feature15`
      }).then((value) => print('feature 13'));
    
      Future(() => print('feature 15'));
    
      Future.microtask(() => print('microtask15')).then((value) => print('microtask15 then'));
    
      Future.microtask(() => print('microtask16'));
    
      print('end ....');
    
    //Log输出
    /**
    start ....
    sync feature2
    end ....
    sync feature3
    sync feature4
    microtask1
    microtask1 then
    microtask2 then
    microtask2
    feature 1
    feature4
    feature5
    feature6
    feature7
    feature8
    feature 9
    feature 10
      */
    }
    

    相关的类介绍

    FutureOr<T> //用于描述一个Future对象,返回Future和Future的值
    
    //T可以是任意的数据类型
    Comparable (dart.core)
        BigInt (dart.core)  //大整数,hexacdemical literal
        DateTime (dart.core)
        Duration (dart.core)
        num (dart.core)
            double (dart.core)
            int (dart.core)
        String (dart.core)
    ````
    
    ``` dart
    Future (dart.async)
        _Future (dart.async)  //一个延迟计算的对象,它是一个抽象类
        SynchronousFuture (synchronous_future.dart) //提供一个同步执行的futrue
        TickerFuture (ticker.dart) //提供一个间隔执行的Future对象,动画vsyn信号回调专用
        DelegatingFuture (future.dart) //对 `_Future` 类的包装,简化用户调用接口
            ResultFuture (future.dart) //用于提供Future value,当future完成时可以同步获取到它对应的value
    

    Future源码解读

    abstract class Future<T> { 
      //在当前函数栈中同步创建一个带有数据的Future,访问时会被提交到microTaskQueue中
      _Future.value(T value) : this.zoneValue(value, Zone.current);
     
      //计算Future执行的命令,如果有嵌套则会异步全部执行完毕
      factory Future(FutureOr<T> computation()) {
        Timer.run(() { //通过Timer插入到EventLoop的EventQueue中,在CPU分配的时间片内执行本次任务
            result._complete(computation());//串联所有的Future,之后计算出最后的结果 
    
      //直接将计算任务添加到当前Isolate的microTaskQueue中
      factory Future.microtask(FutureOr<T> computation()) {
        scheduleMicrotask(() {
            //在本次调度的时间片内,完成计算操作
            result._complete(computation());
       
      //同步执行,直接采用当前函数分配的时间片执行
      factory Future.sync(FutureOr<T> computation()) { ...
          var result = computation();  ...
      
      //提交任务到microTask完成和value绑定
      @pragma("vm:entry-point")
      factory Future.value([FutureOr<T> value]) {
        return new _Future<T>.immediate(value); 
      //提交一个错误的值到 microTaskQueue中
      factory Future.error(Object error, [StackTrace stackTrace]) { ...
        return new _Future<T>.immediateError(error, stackTrace); 
      
      //提交一延迟执行的timer source到 EventLoop的EventQueue,
      factory Future.delayed(Duration duration, [FutureOr<T> computation()]) { ...
        new Timer(duration, () { 
    
      //合并多个Future事件,任何一个错误会导致整个list future抛出异常,通过指定cleanUp可以挽回部分成功的数据
      static Future<List<T>> wait<T>(Iterable<Future<T>> futures,
          {bool eagerError: false, void cleanUp(T successValue)}) { ...
        handleError(Object theError, StackTrace theStackTrace) {  
                    cleanUp(value); ...
          //遍历合并数据
          for (var future in futures) {
            int pos = remaining;
            future.then((T value) { ...
                  result._completeWithValue(values);
      
    
      /**
       * Returns the result of the first future in [futures] to complete.
       *
       * The returned future is completed with the result of the first
       * future in [futures] to report that it is complete,
       * whether it's with a value or an error.
       * The results of all the other futures are discarded.
       *
       * If [futures] is empty, or if none of its futures complete,
       * the returned future never completes.
       */
       //返回第一个future的执行结果
      static Future<T> any<T>(Iterable<Future<T>> futures) {
        var completer = new Completer<T>.sync();
        var onValue = (T value) {
          if (!completer.isCompleted) completer.complete(value);
        };
        var onError = (error, StackTrace stack) {
          if (!completer.isCompleted) completer.completeError(error, stack);
        };
        for (var future in futures) {
          future.then(onValue, onError: onError);
        }
        return completer.future;
      }
     
      //遍历执行 `elements` ,可以支持同步和异步
      static Future forEach<T>(Iterable<T> elements, FutureOr action(T element)) { ...
      
      //配合forEach对elements进行遍历,并执行
      static Future doWhile(FutureOr<bool> action()) {
        _Future doneSignal = new _Future();
        void Function(bool) nextIteration;
        nextIteration = Zone.current.bindUnaryCallbackGuarded((bool keepGoing) {
          while (keepGoing) {
            FutureOr<bool> result;
            try {
              result = action(); //具体实现暴露给开发者,对数组类的元素进行转换 
    
      //callbak注册 订阅成功/失败/完成的几件
      Future<R> then<R>(FutureOr<R> onValue(T value), {Function onError});
      Future<T> catchError(Function onError, {bool test(Object error)});
      Future<T> whenComplete(FutureOr action());
    
      //转化为流事件
      Stream<T> asStream();
    
      //注册timeout事件,通常用于对突发异常进行retry操作
      Future<T> timeout(Duration timeLimit, {FutureOr<T> onTimeout()});
    }
    

    _Future

    它是对 Future 的具体实现, 对future输出的控制

    class _Future<T> implements Future<T> { ...
      //状态标志位,代表了当前 `_future` 执行的状态
      static const int _stateIncomplete = 0;
      static const int _statePendingComplete = 1;
      static const int _stateChained = 2;
      static const int _stateValue = 4;
      static const int _stateError = 8;
    
      //初始化当前future的状态,当有订阅时开始变更
      int _state = _stateIncomplete;
      //当前Future执行的zone,用于提交then的microTask事件,绑定success和error事件
      final Zone _zone; 
    
      //存储当前的result或listeners,中间存储值
      @pragma("vm:entry-point")
      var _resultOrListeners;
    
      
      static List<Function> _continuationFunctions(_Future<Object> future) {
        List<Function> result = null;
        while (true) {
          //当pendding的时候才能添加future的listener
          if (future._mayAddListener) return result; ...
          //遍历执行listenr的绑定操作,Future每次listener操作完成之后返回一个新的Future用于串联future
            (result ??= <Function>[]).add(listener.handleValue);
            future = listener.result; 
      
    
      Future<R> then<R>(FutureOr<R> f(T value), {Function onError}) { ...
         //绑定成功的订阅事件到CurrentZone
          f = currentZone.registerUnaryCallback<FutureOr<R>, T>(f); 
          //选择性绑定onError事件到CurrentZone
          onError = _registerErrorHandler(onError, currentZone); 
        }
        //添加listener,每个listen持有了 then注册的回掉函数,并且在执行完成之后会返回一个新的Future,这也是为什么then能够连续串联好多个,由于 `f` 绑定的是同一个zone,所以他们执行在同一个zone下
        _Future<R> result = new _Future<R>();
        _addListener(new _FutureListener<T, R>.then(result, f, onError)); ...
    
      /// Registers a system created result and error continuation.
      ///
      /// Used by the implementation of `await` to listen to a future.
      /// The system created liseners are not registered in the zone,
      /// and the listener is marked as being from an `await` .
      /// This marker is used in [_continuationFunctions].
      //注册处一个系统回调事件,返回result由 `await` 关键字决定, `await future` 就相当于执行这个方法
      Future<E> _thenAwait<E>(FutureOr<E> f(T value), Function onError) {
        _Future<E> result = new _Future<E>();
        _addListener(new _FutureListener<T, E>.thenAwait(result, f, onError));
        return result;
      }
    
      Future<T> catchError(Function onError, {bool test(error)}) {  
      Future<T> whenComplete(dynamic action()) { 
      
      //转换成一个future对象
      Stream<T> asStream() => new Stream<T>.fromFuture(this);
      
      //添加listener对象
      void _addListener(_FutureListener listener) {...
          _zone.scheduleMicrotask(() {  ... 
    
       //反转链表,确保先添加的先执行FIFO
      _FutureListener _reverseListeners(_FutureListener listeners) { ...
    
      //传递future用
      static void _chainForeignFuture(Future source, _Future target) { ...
      static void _chainCoreFuture(_Future source, _Future target) { ...
      
      void _complete(FutureOr<T> value) { 
      /**
       * Propagates the value/error of [source] to its [listeners], executing the
       * listeners' callbacks.
       */
       //在future完成时遍历listener发送事件给订阅者
      static void _propagateToListeners(_Future source, _FutureListener listeners) {
        while (true) { ...
          assert(source._isComplete);  ...
          while (listeners._nextListener != null) { ...
            void handleValueCallback() { ...
                listenerValueOrError = listener.handleValue(sourceResult);
            void handleError() {
                  listenerValueOrError = listener.handleError(asyncError);
      
      //注册timeout订阅事件
      Future<T> timeout(Duration timeLimit, {FutureOr<T> onTimeout()}) { ...
          timer = new Timer(timeLimit, () { ...
              result._complete(zone.run(onTimeout)); ..
              result._completeError(e, s); 
        //自动实现了它的链式订阅
        this.then((T v) { ...
            result._completeWithValue(v);
            result._completeError(e, s); 
    

    SynchronousFuture

    从名字上看, 它是一个同步执行的future

    class SynchronousFuture<T> implements Future<T> { ...
      @override
      Future<T> catchError(Function onError, { bool test(Object error) }) => Completer<T>().future;
      
      //f函数执行执行,没有绑定zone的复杂流程,Future.sync
      @override
      Future<E> then<E>(FutureOr<E> f(T value), { Function onError }) {...
        final dynamic result = f(_value);   
      @override
      Future<T> whenComplete(FutureOr<dynamic> action()) { ...
    }
    

    DelegatingFuture

    代理Future, ResultFuture继承于它, 提供了 Future直接访问reuslt的能力, 具体实现在 Result 类中

    class DelegatingFuture<T> implements Future<T> { ...
      DelegatingFuture(this._future);
      @override
      Stream<T> asStream() => _future.asStream(); ...
      @override
      Future<T> catchError(Function onError, {bool Function(Object error) test}) =>  ...
      @override
      Future<S> then<S>(FutureOr<S> Function(T) onValue, {Function onError}) => ...
      @override
      Future<T> whenComplete(FutureOr Function() action) => ...
      @override
      Future<T> timeout(Duration timeLimit, {FutureOr<T> Function() onTimeout})  ...
    
    class ResultFuture<T> extends DelegatingFuture<T> { 
      bool get isComplete => result != null; 
      Result<T> get result => _result;
      Result<T> _result;
    
      ResultFuture(Future<T> future) : super(future) {
        Result.capture(future).then((result) {
          _result = result;
        });
      }
    }
    
    abstract class Result<T> { ....
      final T value; ...
      static Future<Result<T>> capture<T>(Future<T> future) {
        return future.then((value) => ValueResult(value),
            onError: (error, StackTrace stackTrace) =>
                ErrorResult(error, stackTrace));
      }
    
    

    Completer

    用来包装future, 避免then嵌套, 优化代码结构

    Completer (dart.async)
        _Completer (dart.async)
            _AsyncCompleter (dart.async)
            _SyncCompleter (dart.async)
    

    与FutureBuilder的结合使用

    • FutureBuilder用于实现一个异步构建的widget, 根据当前异步事件执行的状态显示不同的widget, 在实际工作中会经常用到, 它的本质就是将 需要传递的数据分成不同的的阶段获取, 然后再拿到每个阶段的数据后调用setState去触发Widget的build, 实际的widget封装在它的构造然后中,

    • 下面是一个简易的FutureBuider, 只是为了理解其中的运作流程, 具体细节暂时忽略

    //提供构建的四个阶段的数据,根据实际需要还可以定义得更多。
    enum FutureStatus {
      initial,
      done,
      error,
      pending,
    }
    //包装状态和数据,用于构建每个阶段的widget
    class FutureSnapshot {
      dynamic data;
      FutureStatus status;
      FutureSnapshot({this.data, this.status});
    }
    
    typedef WidgetBuilder = Widget Function(BuildContext, FutureSnapshot);
    
    class CustomFutureBuilder extends StatefulWidget {
      final Future future;
      final WidgetBuilder builder;
      CustomFutureBuilder({this.future, this.builder});
      @override
      CustomFutureBuilderState createState() => CustomFutureBuilderState();
    }
    
    class CustomFutureBuilderState extends State<CustomFutureBuilder> {
      FutureSnapshot snapshot;
    
      @override
      Widget build(BuildContext context) => widget.builder(context, snapshot);
      @override
      void initState() {
        super.initState();
        snapshot = FutureSnapshot(status: FutureStatus.initial, data: null); //初始化阶段,
        subscribe();
      }
    
      void subscribe() {
        widget.future.then((value) {
          setState(() {
            snapshot = FutureSnapshot(status: FutureStatus.initial, data: value); //数据加载成功阶段
          });
        }, onError: (error, stackTrace) {
          setState(() {
            snapshot = FutureSnapshot(
                status: FutureStatus.initial,
                data: FlutterErrorDetails(exception: error, stack: stackTrace)); //数据加载失败阶段
          });
        });
        snapshot = FutureSnapshot(
            status: FutureStatus.initial, data: FlutterErrorDetails()); //等待阶段
      }
    }
    
    class CustomFutureBuilderDemo extends StatelessWidget {
      @override
      Widget build(BuildContext context) {
        return CustomFutureBuilder(     //具体使用.
            future: Future.value('xxxxx'),
            builder: (contxt, snapShot) {
              switch (snapShot.status) {
                case FutureStatus.error:
                  return Text('error');
                case FutureStatus.initial:
                  return Text('initial');
                case FutureStatus.done:
                  return Text('done');
                case FutureStatus.pending:
                  return CircularProgressIndicator();
              }
              return Container();
            });
      }
    }
    

    上面的例子主要介绍了FutureBuilder的一些基本功能, 在实际的使用中, 还需要考虑到widget的刷新, future的变化, 如何去解除订阅, 重新初始化构建树, 下面是标准的futurebuilder实现。

    class _FutureBuilderState<T> extends State<FutureBuilder<T>> {
     
      Object _activeCallbackIdentity; //增加了Callback标志符,来控制future的同步
      AsyncSnapshot<T> _snapshot; //同上面的FutureSnapShot一样,包装状态和值
    
      @override
      void initState() {
        super.initState();
        _snapshot = AsyncSnapshot<T>.withData(ConnectionState.none, widget.initialData); //初始化阶段状态和值
        _subscribe();  //订阅Future
      }
    
      @override
      void didUpdateWidget(FutureBuilder<T> oldWidget) {
        super.didUpdateWidget(oldWidget);
        if (oldWidget.future != widget.future) {  //视图更新后对future解绑操作,重新订阅新的future
          if (_activeCallbackIdentity != null) {
            _unsubscribe();
            _snapshot = _snapshot.inState(ConnectionState.none); //重制阶段状态和值
          }
          _subscribe();
        }
      }
    
      @override
      Widget build(BuildContext context) => widget.builder(context, _snapshot);
    
      @override
      void dispose() {
        _unsubscribe();
        super.dispose();
      }
    
      void _subscribe() {
        if (widget.future != null) {
          final Object callbackIdentity = Object();
          _activeCallbackIdentity = callbackIdentity;
          widget.future.then<void>((T data) {
            if (_activeCallbackIdentity == callbackIdentity) {
              setState(() {
                _snapshot = AsyncSnapshot<T>.withData(ConnectionState.done, data); //完成阶段状态值
              });
            }
          }, onError: (Object error) {
            if (_activeCallbackIdentity == callbackIdentity) {
              setState(() {
                _snapshot = AsyncSnapshot<T>.withError(ConnectionState.done, error); //错误阶段状态值
              });
            }
          });
          _snapshot = _snapshot.inState(ConnectionState.waiting); //等待阶段状态值
        }
      }
    
      void _unsubscribe() {
        _activeCallbackIdentity = null;  //控制future和build同步的标志位,类似信号量(不过flutter ui线程为单线程)避免future执行时widget已经被移除了.
      }
    }
    

    StreamBuilder与Future应用

    和Future的设计方式基本一致, 只不过他的状态多一点点, 数据源是持续的,不想future的数据是单次的,StreamBuilder继承了 _StreamBuilderBase , _StreamBuilderBase 用来管理stream流的订阅和更新, StreamBuilder 则更多的是用来实现暴露给用户的接口,状态值的变化, Widget的builder实现.

    相比较FutureBuilder主要差异主要在这里,状态值多一些.

    class _StreamBuilderBaseState<T, S> extends State<StreamBuilderBase<T, S>> ...
    void _subscribe() {
        if (widget.stream != null) {
          _subscription = widget.stream.listen((T data) {
            setState(() {
              _summary = widget.afterData(_summary, data);
            });
          }, onError: (Object error) {
            setState(() {
              _summary = widget.afterError(_summary, error);
            });
          }, onDone: () {
            setState(() {
              _summary = widget.afterDone(_summary);
            });
          });
          _summary = widget.afterConnected(_summary);
        }
      }
    

    通常而言, 处理简单的网络回调请求一般用 FutureBuider 就能满组需求了,如果是涉及到持续的数据订阅如蓝牙数据, 定位信息等, 可以采用StreamBuilder. 在使用过程中需要特别注意 异步Future/Stream对象变更后, 他所构建的wiget的作用域问题. 因为widget切换会导致他们所依赖的 InheritedWidget 被移除, 如果是项目中使用了Bloc应特别注意。

    小结

    Future是为适配Isolate中的 microTask和eventTask2而涉及, 包含了常规的数据订阅,输入输出, 异常捕捉, 多个异步数据合并, 通过异步数据顺序执行, 并且提供了便利的Completer对future的使用进行解耦操作, 基于Future的基本功能, 同时也为Stream流和RxData的封装奠定了基石。

  • 相关阅读:
    vue_03
    vue03
    vue2
    vue02
    vue 01
    JavaScript要点 (一) 变量-作用域
    在iOS应用程序中打开设备设置界面及其中某指定的选项界面
    多线程操作Coredata(转)
    iOS_城市定位
    本地验证码
  • 原文地址:https://www.cnblogs.com/wwoo/p/futuredart-yue-du.html
Copyright © 2011-2022 走看看