zoukankan      html  css  js  c++  java
  • Dart3_异步编程

    参考

    https://dart.cn/guides/language/language-tour#asynchrony-support

    https://dart.cn/guides/libraries/library-tour#dartasync---asynchronous-programming

    https://dart.cn/codelabs/async-await

    异步编程

    在dart中是没有线程的概念的,取而代之的是异步编程。

    可以使用两种方式创建异步函数:

    • 通过async和await关键字
    • 通过Future api来链式创建

    async和await的操作和Future是分不开的,只是async和await更简化了逻辑,看起来像是同步代码一样。

    在直接使用 Future API 前,首先应该考虑 await 来替代。代码中使用 await 表达式会比直接使用 Future API 更容易理解。

    async和await

    声明一个异步函数,在方法名后边添加上关键字async即可,同时返回值要修改成用Future<T>,T是函数体要返回的值,如下例:

    Future<void> main() async {
      var future = getName();
      future.then((value) => print('getName:$value'));
      print("after get name");
    }
    
    Future<String> getName() async {
      print("getName start");
      return "wz";
    }

    输出结果是:

    getName start
    
    after get name
    
    getName:wz

    上边的代码其实并不会异步执行,因为异步执行是在遇到第一await时会挂起函数,并直接返回Future<T>,同时函数会等待await指定的操作完成后才继续方法后边的代码。

    修改一下上边的getName的代码:

    Future<String> getName() async {
      print("getName start");
      await Future.delayed(Duration(seconds: 2));
      print("after delay");
      return "wz";
    }

    输出结果是:

    getName start
    
    after get name
    
    after delay
    
    getName:wz

    await使用说明:

    • await只能用在async中。
    • await后可以跟普通函数(但没有意义的),也可以跟异步函数(返回值是Future<T>的函数)。
    • await等待的操作结果是可以赋值给一个变量的,此时会把后边的Future<T>的T取出来,这样我们就可以用同步的方式编写异步调用,
    var entrypoint = await findEntrypoint();
    
    var exitCode = await runExecutable(entrypoint, args);
    
    await flushThenExit(exitCode);

    处理异常

    使用 try、catch 以及 finally 来处理使用 await 导致的异常:

    try {
      version = await lookUpVersion();
      。。。
    } catch (e) {
      // 无法找到版本时做出的反应
    }

    如果有异常发生的话,lookUpVersion下边的代码就不会执行了,会直接跳到catch中。

    Future

    https://api.dart.cn/stable/2.10.4/dart-async/Future-class.html

    从 Dart 2.1 开始,使用 Future 和 Stream 不需要导入 dart:async ,因为 dart:core 库 export 了这些类。

    上边通过async和await返回了一个Future对象,可以通过此对象注册一些监听,比如执行完后再去执行其他,异常处理等。

    Future<int> future = getFuture();
    future.then((value) => handleValue(value))
          .catchError((error) => handleError(error));

    FutureOr<T>

    先说一下这个类,因为Future中有很多方法都有此类,

    此类表示的是一个值,这个值可以是Future<T>或T。

    此类声明是内部future或value泛型类型的公共代理。对此类的引用被解析为内部类型。

    任何类extend、mixin或implement 此类都是一个编译时错误。

    Future的构造函数

    • Future(FutureOr<T> computation())

    创建一个Future,其中包含Timer.run异步调用computation()的结果。

    • Future.delayed(Duration duration, [FutureOr<T> computation()])

    创建一个Future,会在指定的延时后执行computation()

    • Future.microtask(FutureOr<T> computation())

    其中包含scheduleMicrotask异步调用computation()的结果。

    microtask是比普通的事件(比如Timer事件)更先执行的一种任务。

    • Future.sync(FutureOr<T> computation())

    包含立即调用computation()的结果。

    If calling computation throws, the returned future is completed with the error.

    If calling computation returns a Future<T>, that future is returned.

    If calling computation returns a non-future value, a future is returned which has been completed with that value.

    • Future.error(Object error, [StackTrace? stackTrace])

    创建一个Future,此Future有一个error的结果。

    • Future.value([FutureOr<T>? value])

    如果value是一个Future,那么会创建一个等待value的future执行完成的Future。

    如果value是一个值,那么就相当于new Future<T>.sync(() => value)

    Future的成员方法

    • asStream() → Stream<T>

    Creates a Stream containing the result of this future.

    • catchError(Function onError, {bootest(Object error)}) → Future<T>

    Handles errors emitted by this Future.

    确保调用 catchError() 方式在 then() 的结果上,而不是在原来的 Future 对象上调用。否则的话,catchError() 就只能处理原来 Future 对象抛出的异常,而无法处理 then() 代码里面的异常。

    • then<R>(FutureOr<R> onValue(T value), {Function? onError}) → Future<R>

    Register callbacks to be called when this future completes.

    • timeout(Duration timeLimit, {FutureOr<T> onTimeout()}) → Future<T>

    Time-out the future computation after timeLimit has passed.

    • whenComplete(FutureOr<void> action()) → Future<T>

    Registers a function to be called when this future completes.

    Future的static方法

    • any<T>(Iterable<Future<T>> futures) → Future<T>

    Returns the result of the first future in futures to complete.

    • doWhile(FutureOr<bool> action()) → Future

    Performs an operation repeatedly untiit returns false.

    • forEach<T>(Iterable<T> elements, FutureOr action(T element)) → Future

    Performs an action for each element of the iterable, in turn.

    • wait<T>(Iterable<Future<T>> futures, {booeagerError: false, void cleanUp(T successValue)}) → Future<List<T>>

    Waits for multiple futures to complete and collects their results.

    Stream

    在 Dart API 中 Stream 对象随处可见,Stream 用来表示一系列数据。例如,HTM中的按钮点击就是通过 stream 传递的。同样也可以将文件作为数据流来读取。

    如果想从 Stream 中获取值,可以有两种选择:

    • 使用 async 关键字和一个 异步循环(使用 await for 关键字标识)。
    • 使用 Stream API。详情参考库概览。

    await for

    使用 await for 定义异步循环看起来是这样的:

    await for (varOrType identifier in expression) {
      // 每当 Stream 发出一个值时会执行
    }

    expression的类型必须是 Stream。执行流程如下:

    1. 等待直到 Stream 返回一个数据。

    2. 使用 1 中 Stream 返回的数据执行循环体。

    3. 重复 1、2 过程直到 Stream close。

    使用 break 和 return 语句可以停止接收 Stream 数据,这样就跳出了循环并取消注册监听 Stream。

    Stream

    https://api.dart.cn/stable/2.10.4/dart-async/Stream-class.html

    Stream提供了一种接收事件序列的方法。每个事件要么是一个数据事件,也称为流的一个元素,要么是一个错误事件,它是一个失败的通知。

    当Stream已发出其所有事件时,单个“done”事件将通知侦听器已到达结束。

    你可以通过Stream.listen()来监听Stream的事件,它会返回一个StreamSubscription对象,你可以在此对象上添加一些监听,暂停恢复事件的监听,或cancel取消监听。

    当“done”事件被触发时,订阅者在接收事件之前被取消订阅。事件发送后,Stream就没有订阅者了。允许在此时之后向广播Stream添加新订阅者,但他们将尽快接收到新的“done”事件。

    有两种类型的Stream:

    • 单订阅Stream,

    是只能被listen一次,多次调用是不允许的,即使在取消第一个监听之后在调用也不允许。

    并且此Stream是冷流,即只有当有监听者时才会开始产生事件,取消监听后Stream停止发送事件,即使事件源仍然可以提供更多。

    单订阅流通常用于流式传输较大的连续数据块,如文件I/O。

    • 广播Stream。

    可以被多个监听者监听,

    并且此Stream是热流,即只要有事件就发送,不管有没有监听者,

    单订阅Stream也可以通过asBroadcastStream()来转换成广播Stream。

    当注册监听者时,只能接收到注册之后发送的事件,而不能接收到之前已经发送过的。

    Stream的Api

    属性

    • first → Future<T>

    监听Stream,只要接收到第一个事件就立马取消监听。

    • isBroadcast → bool

    Whether this stream is a broadcast stream.

    • last → Future<T>

    The last element of this stream.

    • length → Future<int>

    The number of elements in this stream.

    方法

    • asBroadcastStream({void onListen(StreamSubscription<T> subscription), void onCancel(StreamSubscription<T> subscription)}) → Stream<T>

    Returns a multi-subscription stream that produces the same events as this.

    • drain<E>([E? futureValue]) → Future<E>

    Discards aldata on this stream, but signals when it is done or an error occurred.

    • handleError(Function onError, {bootest(dynamic error)}) → Stream<T>

    Creates a wrapper Stream that intercepts some errors from this stream.

    • listen(void onData(T event), {Function? onError, void onDone(), bool? cancelOnError}) → StreamSubscription<T>

    Adds a subscription to this stream.

    • skip(int count) → Stream<T>

    Skips the first count data events from this stream.

    StreamSubscription

    • Future<void> cancel();
    • void onData(void handleData(T data)?);
    • void onError(Function? handleError);
    • void onDone(void handleDone()?);
    • void pause([Future<void>? resumeSignal]);
    • void resume();
    • Future<E> asFuture<E>([E? futureValue]);
    • booget isPaused;

    Generators

    当你需要延迟地生成一连串的值时,可以考虑使用 生成器函数。

    Dart 内置支持两种形式的生成器方法:

    • 同步 生成器:返回一个 Iterable 对象。
    • 异步 生成器:返回一个 Stream 对象。

    通过在函数上加 sync* 关键字并将返回值类型设置为 Iterable 来实现一个 同步 生成器函数,在函数中使用 yield 语句来传递值:

    Iterable<int> naturalsTo(int n) sync* {
      int k = 0;
      while (k < n) yield k++;
    }

    实现 异步 生成器函数与同步类似,只不过关键字为 async* 并且返回值为 Stream:

    Stream<int> asynchronousNaturalsTo(int n) async* {
      int k = 0;
      while (k < n) yield k++;
    }

    如果生成器是递归调用的,可是使用 yield* 语句提升执行性能:

    Iterable<int> naturalsDownFrom(int n) sync* {
      if (n > 0) {
        yield n;
        yield* naturalsDownFrom(n - 1);
      }
    }

    Isolates

    https://www.bilibili.com/video/BV1Y7411V7iP/?spm_id_from=333.788.videocard.0

    大多数计算机中,甚至在移动平台上,都在使用多核 CPU。为了有效利用多核性能,开发者一般使用共享内存的方式让线程并发地运行。然而,多线程共享数据通常会导致很多潜在的问题,并导致代码运行出错。

    为了解决多线程带来的并发问题,Dart 使用 isolates 替代线程。

    • 每一个 isolate 有它自己的堆内存以确保其状态不被其它 isolates 访问。公共变量的值都是isolate 独有的,类似于java的ThreadLocal。
    • 每一个 isolate都有自己的线程和一个eventloop事件循环系统。
    • isolate之间想要沟通是通过ReceivePort, SendPort发送接收事件来解决的。都会发送到对方isolate的eventloop。

    可以通过Isolate的spawn来创建一个isloate,

    external static Future<Isolate> spawn<T>(
        void entryPoint(T message), T message,
        {bool paused = false,
        bool errorsAreFatal = true,
        SendPort? onExit,
        SendPort? onError,
        @Since("2.3") String? debugName});
    • entryPoint:是新的isolate要运行的方法,或者初始化方法,它接收第二个参数类型的值。
    • message:用来发送给新isolate的一个消息,通常会把当前isolate创建的SendPort包装在message内传递给新的isolate,那么新isolate就可以用SendPort去通知父isolate。
    • debugName:是debug时的名字,一般Isolate.current.debugName来获取到当前Isolate的名字。

    static属性

    • external static Isolate get current;

    Return an Isolate object representing the current isolate.

    方法

    kill({int priority: beforeNextEvent}) → void

    Requests the isolate to shut down.

    ReceivePort

    • abstract class ReceivePort implements Stream<dynamic>

    可以看到它实现了Stream,所以可以调用Stream.listen来监听SendPort发送过来的信息,

    此Stream不是广播Stream,所以如果需要的话调用asBroadcastStream()来转换成广播Stream。

    • close() → void

    Closes this.

    • SendPort get sendPort;

    Returns a [SendPort] that sends to this receive port.

    SendPort

    • send(Object? message) → void

    Sends an asynchronous message through this send port, to its corresponding ReceivePort.

    例子

    int age = 18;
    
    void main() {
      age++;
      print("Isolate:${Isolate.current.debugName},age:$age");
    
      ReceivePort receivePort = ReceivePort();
      receivePort.listen((message) {
        print("receive data:$message");
        receivePort.close();
      });
    
      Isolate.spawn(initSubIsolate, Data(receivePort.sendPort, "from main", 1),
          debugName: "sub_isolate");
    }
    
    void initSubIsolate(Data data) {
      print("Isolate:${Isolate.current.debugName},age:$age");
      print("Isolate:${Isolate.current.debugName}:$data");
    
      Future.delayed(Duration(seconds: 2), () {
        data.sendPort.send(Data(null, "fuck you", data.requestCode));
      });
    }
    
    class Data {
      final SendPort sendPort;
      final String msg;
      final int requestCode;
    
      Data(this.sendPort, this.msg, this.requestCode);
    
      @override
      String toString() {
        return 'Data{msg: $msg, requestCode: $requestCode}';
      }
    }

    输出:

    Isolate:main,age:19
    Isolate:sub_isolate,age:18
    Isolate:sub_isolate:Data{msg: from main, requestCode: 1}
    receive data:Data{msg: fuck you, requestCode: 1}

    eventloop

    wps1

  • 相关阅读:
    Java实现 LeetCode 735 行星碰撞(栈)
    Java实现 LeetCode 735 行星碰撞(栈)
    Java实现 LeetCode 887 鸡蛋掉落(动态规划,谷歌面试题,蓝桥杯真题)
    Java实现 LeetCode 887 鸡蛋掉落(动态规划,谷歌面试题,蓝桥杯真题)
    Java实现 LeetCode 887 鸡蛋掉落(动态规划,谷歌面试题,蓝桥杯真题)
    Java实现 蓝桥杯算法提高 求最大值
    Java实现 蓝桥杯算法提高 求最大值
    Java实现 蓝桥杯算法提高 求最大值
    Python eval() 函数
    Python repr() 函数
  • 原文地址:https://www.cnblogs.com/muouren/p/14009783.html
Copyright © 2011-2022 走看看