zoukankan      html  css  js  c++  java
  • Dart3_异步编程

    参考

    https://dart.cn/guides/language/language-tour#asynchrony-support

    https://dart.cn/guides/libraries/library-tour#dartasync---asynchronous-programming

    https://dart.cn/codelabs/async-await

    异步编程

    在dart中是没有线程的概念的,取而代之的是异步编程。

    可以使用两种方式创建异步函数:

    • 通过async和await关键字
    • 通过Future api来链式创建

    async和await的操作和Future是分不开的,只是async和await更简化了逻辑,看起来像是同步代码一样。

    在直接使用 Future API 前,首先应该考虑 await 来替代。代码中使用 await 表达式会比直接使用 Future API 更容易理解。

    async和await

    声明一个异步函数,在方法名后边添加上关键字async即可,同时返回值要修改成用Future<T>,T是函数体要返回的值,如下例:

    Future<void> main() async {
      var future = getName();
      future.then((value) => print('getName:$value'));
      print("after get name");
    }
    
    Future<String> getName() async {
      print("getName start");
      return "wz";
    }

    输出结果是:

    getName start
    
    after get name
    
    getName:wz

    上边的代码其实并不会异步执行,因为异步执行是在遇到第一await时会挂起函数,并直接返回Future<T>,同时函数会等待await指定的操作完成后才继续方法后边的代码。

    修改一下上边的getName的代码:

    Future<String> getName() async {
      print("getName start");
      await Future.delayed(Duration(seconds: 2));
      print("after delay");
      return "wz";
    }

    输出结果是:

    getName start
    
    after get name
    
    after delay
    
    getName:wz

    await使用说明:

    • await只能用在async中。
    • await后可以跟普通函数(但没有意义的),也可以跟异步函数(返回值是Future<T>的函数)。
    • await等待的操作结果是可以赋值给一个变量的,此时会把后边的Future<T>的T取出来,这样我们就可以用同步的方式编写异步调用,
    var entrypoint = await findEntrypoint();
    
    var exitCode = await runExecutable(entrypoint, args);
    
    await flushThenExit(exitCode);

    处理异常

    使用 try、catch 以及 finally 来处理使用 await 导致的异常:

    try {
      version = await lookUpVersion();
      。。。
    } catch (e) {
      // 无法找到版本时做出的反应
    }

    如果有异常发生的话,lookUpVersion下边的代码就不会执行了,会直接跳到catch中。

    Future

    https://api.dart.cn/stable/2.10.4/dart-async/Future-class.html

    从 Dart 2.1 开始,使用 Future 和 Stream 不需要导入 dart:async ,因为 dart:core 库 export 了这些类。

    上边通过async和await返回了一个Future对象,可以通过此对象注册一些监听,比如执行完后再去执行其他,异常处理等。

    Future<int> future = getFuture();
    future.then((value) => handleValue(value))
          .catchError((error) => handleError(error));

    FutureOr<T>

    先说一下这个类,因为Future中有很多方法都有此类,

    此类表示的是一个值,这个值可以是Future<T>或T。

    此类声明是内部future或value泛型类型的公共代理。对此类的引用被解析为内部类型。

    任何类extend、mixin或implement 此类都是一个编译时错误。

    Future的构造函数

    • Future(FutureOr<T> computation())

    创建一个Future,其中包含Timer.run异步调用computation()的结果。

    • Future.delayed(Duration duration, [FutureOr<T> computation()])

    创建一个Future,会在指定的延时后执行computation()

    • Future.microtask(FutureOr<T> computation())

    其中包含scheduleMicrotask异步调用computation()的结果。

    microtask是比普通的事件(比如Timer事件)更先执行的一种任务。

    • Future.sync(FutureOr<T> computation())

    包含立即调用computation()的结果。

    If calling computation throws, the returned future is completed with the error.

    If calling computation returns a Future<T>, that future is returned.

    If calling computation returns a non-future value, a future is returned which has been completed with that value.

    • Future.error(Object error, [StackTrace? stackTrace])

    创建一个Future,此Future有一个error的结果。

    • Future.value([FutureOr<T>? value])

    如果value是一个Future,那么会创建一个等待value的future执行完成的Future。

    如果value是一个值,那么就相当于new Future<T>.sync(() => value)

    Future的成员方法

    • asStream() → Stream<T>

    Creates a Stream containing the result of this future.

    • catchError(Function onError, {bootest(Object error)}) → Future<T>

    Handles errors emitted by this Future.

    确保调用 catchError() 方式在 then() 的结果上,而不是在原来的 Future 对象上调用。否则的话,catchError() 就只能处理原来 Future 对象抛出的异常,而无法处理 then() 代码里面的异常。

    • then<R>(FutureOr<R> onValue(T value), {Function? onError}) → Future<R>

    Register callbacks to be called when this future completes.

    • timeout(Duration timeLimit, {FutureOr<T> onTimeout()}) → Future<T>

    Time-out the future computation after timeLimit has passed.

    • whenComplete(FutureOr<void> action()) → Future<T>

    Registers a function to be called when this future completes.

    Future的static方法

    • any<T>(Iterable<Future<T>> futures) → Future<T>

    Returns the result of the first future in futures to complete.

    • doWhile(FutureOr<bool> action()) → Future

    Performs an operation repeatedly untiit returns false.

    • forEach<T>(Iterable<T> elements, FutureOr action(T element)) → Future

    Performs an action for each element of the iterable, in turn.

    • wait<T>(Iterable<Future<T>> futures, {booeagerError: false, void cleanUp(T successValue)}) → Future<List<T>>

    Waits for multiple futures to complete and collects their results.

    Stream

    在 Dart API 中 Stream 对象随处可见,Stream 用来表示一系列数据。例如,HTM中的按钮点击就是通过 stream 传递的。同样也可以将文件作为数据流来读取。

    如果想从 Stream 中获取值,可以有两种选择:

    • 使用 async 关键字和一个 异步循环(使用 await for 关键字标识)。
    • 使用 Stream API。详情参考库概览。

    await for

    使用 await for 定义异步循环看起来是这样的:

    await for (varOrType identifier in expression) {
      // 每当 Stream 发出一个值时会执行
    }

    expression的类型必须是 Stream。执行流程如下:

    1. 等待直到 Stream 返回一个数据。

    2. 使用 1 中 Stream 返回的数据执行循环体。

    3. 重复 1、2 过程直到 Stream close。

    使用 break 和 return 语句可以停止接收 Stream 数据,这样就跳出了循环并取消注册监听 Stream。

    Stream

    https://api.dart.cn/stable/2.10.4/dart-async/Stream-class.html

    Stream提供了一种接收事件序列的方法。每个事件要么是一个数据事件,也称为流的一个元素,要么是一个错误事件,它是一个失败的通知。

    当Stream已发出其所有事件时,单个“done”事件将通知侦听器已到达结束。

    你可以通过Stream.listen()来监听Stream的事件,它会返回一个StreamSubscription对象,你可以在此对象上添加一些监听,暂停恢复事件的监听,或cancel取消监听。

    当“done”事件被触发时,订阅者在接收事件之前被取消订阅。事件发送后,Stream就没有订阅者了。允许在此时之后向广播Stream添加新订阅者,但他们将尽快接收到新的“done”事件。

    有两种类型的Stream:

    • 单订阅Stream,

    是只能被listen一次,多次调用是不允许的,即使在取消第一个监听之后在调用也不允许。

    并且此Stream是冷流,即只有当有监听者时才会开始产生事件,取消监听后Stream停止发送事件,即使事件源仍然可以提供更多。

    单订阅流通常用于流式传输较大的连续数据块,如文件I/O。

    • 广播Stream。

    可以被多个监听者监听,

    并且此Stream是热流,即只要有事件就发送,不管有没有监听者,

    单订阅Stream也可以通过asBroadcastStream()来转换成广播Stream。

    当注册监听者时,只能接收到注册之后发送的事件,而不能接收到之前已经发送过的。

    Stream的Api

    属性

    • first → Future<T>

    监听Stream,只要接收到第一个事件就立马取消监听。

    • isBroadcast → bool

    Whether this stream is a broadcast stream.

    • last → Future<T>

    The last element of this stream.

    • length → Future<int>

    The number of elements in this stream.

    方法

    • asBroadcastStream({void onListen(StreamSubscription<T> subscription), void onCancel(StreamSubscription<T> subscription)}) → Stream<T>

    Returns a multi-subscription stream that produces the same events as this.

    • drain<E>([E? futureValue]) → Future<E>

    Discards aldata on this stream, but signals when it is done or an error occurred.

    • handleError(Function onError, {bootest(dynamic error)}) → Stream<T>

    Creates a wrapper Stream that intercepts some errors from this stream.

    • listen(void onData(T event), {Function? onError, void onDone(), bool? cancelOnError}) → StreamSubscription<T>

    Adds a subscription to this stream.

    • skip(int count) → Stream<T>

    Skips the first count data events from this stream.

    StreamSubscription

    • Future<void> cancel();
    • void onData(void handleData(T data)?);
    • void onError(Function? handleError);
    • void onDone(void handleDone()?);
    • void pause([Future<void>? resumeSignal]);
    • void resume();
    • Future<E> asFuture<E>([E? futureValue]);
    • booget isPaused;

    Generators

    当你需要延迟地生成一连串的值时,可以考虑使用 生成器函数。

    Dart 内置支持两种形式的生成器方法:

    • 同步 生成器:返回一个 Iterable 对象。
    • 异步 生成器:返回一个 Stream 对象。

    通过在函数上加 sync* 关键字并将返回值类型设置为 Iterable 来实现一个 同步 生成器函数,在函数中使用 yield 语句来传递值:

    Iterable<int> naturalsTo(int n) sync* {
      int k = 0;
      while (k < n) yield k++;
    }

    实现 异步 生成器函数与同步类似,只不过关键字为 async* 并且返回值为 Stream:

    Stream<int> asynchronousNaturalsTo(int n) async* {
      int k = 0;
      while (k < n) yield k++;
    }

    如果生成器是递归调用的,可是使用 yield* 语句提升执行性能:

    Iterable<int> naturalsDownFrom(int n) sync* {
      if (n > 0) {
        yield n;
        yield* naturalsDownFrom(n - 1);
      }
    }

    Isolates

    https://www.bilibili.com/video/BV1Y7411V7iP/?spm_id_from=333.788.videocard.0

    大多数计算机中,甚至在移动平台上,都在使用多核 CPU。为了有效利用多核性能,开发者一般使用共享内存的方式让线程并发地运行。然而,多线程共享数据通常会导致很多潜在的问题,并导致代码运行出错。

    为了解决多线程带来的并发问题,Dart 使用 isolates 替代线程。

    • 每一个 isolate 有它自己的堆内存以确保其状态不被其它 isolates 访问。公共变量的值都是isolate 独有的,类似于java的ThreadLocal。
    • 每一个 isolate都有自己的线程和一个eventloop事件循环系统。
    • isolate之间想要沟通是通过ReceivePort, SendPort发送接收事件来解决的。都会发送到对方isolate的eventloop。

    可以通过Isolate的spawn来创建一个isloate,

    external static Future<Isolate> spawn<T>(
        void entryPoint(T message), T message,
        {bool paused = false,
        bool errorsAreFatal = true,
        SendPort? onExit,
        SendPort? onError,
        @Since("2.3") String? debugName});
    • entryPoint:是新的isolate要运行的方法,或者初始化方法,它接收第二个参数类型的值。
    • message:用来发送给新isolate的一个消息,通常会把当前isolate创建的SendPort包装在message内传递给新的isolate,那么新isolate就可以用SendPort去通知父isolate。
    • debugName:是debug时的名字,一般Isolate.current.debugName来获取到当前Isolate的名字。

    static属性

    • external static Isolate get current;

    Return an Isolate object representing the current isolate.

    方法

    kill({int priority: beforeNextEvent}) → void

    Requests the isolate to shut down.

    ReceivePort

    • abstract class ReceivePort implements Stream<dynamic>

    可以看到它实现了Stream,所以可以调用Stream.listen来监听SendPort发送过来的信息,

    此Stream不是广播Stream,所以如果需要的话调用asBroadcastStream()来转换成广播Stream。

    • close() → void

    Closes this.

    • SendPort get sendPort;

    Returns a [SendPort] that sends to this receive port.

    SendPort

    • send(Object? message) → void

    Sends an asynchronous message through this send port, to its corresponding ReceivePort.

    例子

    int age = 18;
    
    void main() {
      age++;
      print("Isolate:${Isolate.current.debugName},age:$age");
    
      ReceivePort receivePort = ReceivePort();
      receivePort.listen((message) {
        print("receive data:$message");
        receivePort.close();
      });
    
      Isolate.spawn(initSubIsolate, Data(receivePort.sendPort, "from main", 1),
          debugName: "sub_isolate");
    }
    
    void initSubIsolate(Data data) {
      print("Isolate:${Isolate.current.debugName},age:$age");
      print("Isolate:${Isolate.current.debugName}:$data");
    
      Future.delayed(Duration(seconds: 2), () {
        data.sendPort.send(Data(null, "fuck you", data.requestCode));
      });
    }
    
    class Data {
      final SendPort sendPort;
      final String msg;
      final int requestCode;
    
      Data(this.sendPort, this.msg, this.requestCode);
    
      @override
      String toString() {
        return 'Data{msg: $msg, requestCode: $requestCode}';
      }
    }

    输出:

    Isolate:main,age:19
    Isolate:sub_isolate,age:18
    Isolate:sub_isolate:Data{msg: from main, requestCode: 1}
    receive data:Data{msg: fuck you, requestCode: 1}

    eventloop

    wps1

  • 相关阅读:
    刷题笔记
    布隆过滤器
    单例模式,堆,BST,AVL树,红黑树
    B树、B-树、B+树、B*树【转】,mysql索引
    数据结构与算法80道
    海量数据处理【转】
    volcanol的工控博客
    volcanol的工控博客
    volcanol的工控博客
    volcanol的工控博客
  • 原文地址:https://www.cnblogs.com/muouren/p/14009783.html
Copyright © 2011-2022 走看看