zoukankan      html  css  js  c++  java
  • vertx 异步编程指南 step8-使用RxJava进行反应式编程

    vertx 异步编程指南 step8-使用RxJava进行反应式编程

    到目前为止,我们已经使用基于回调的API探索了Vert.x堆栈的各个领域。它只是起作用,并且这种编程模型在很多语言的开发人员中都是众所周知的。然而,它可能会变得有点乏味,特别是当你结合几个事件源或处理复杂的数据流时。

    这正是RxJava发挥的作用,Vert.x可以无缝集成它。

    注意
    在本指南中,使用了RxJava 2.x,但Vert.x也适用于RxJava 1.x. RxJava 2.x在Reactive-Streams规范的基础上完全重写。在2.0 wiki页面的不同之处了解更多信息。

    启用RxJava API

    除了基于回调的API之外,Vert.x模块还提供了“Rxified” API。要启用它,首先将vertx-rx-java2模块添加到Maven POM文件中:

     
    <dependency>
     
    <groupId>io.vertx</groupId>
     
    <artifactId>vertx-rx-java2</artifactId>
     
    </dependency>

    Verticle必须进行修改,以便扩展io.vertx.reactivex.core.AbstractVerticle而不是io.vertx.core.AbstractVerticle。这有什么不同?前一类扩展后者并暴露一个io.vertx.reactivex.core.Vertx领域。

    io.vertx.reactivex.core.Vertx定义rxSomething(…​)与基于回调的对应方法等效的额外方法。

    让我们来看看MainVerticle如何更好地了解它在实践中的工作原理:

    Single<String> dbVerticleDeployment = vertx.rxDeployVerticle(

    "io.vertx.guides.wiki.database.WikiDatabaseVerticle");

    rxDeploy方法不会将Handler<AsyncResult<String>>最终参数作为参数。相反,它返回一个Single<String>

    此外,调用该方法时操作不会启动。它在你订阅时开始Single。操作完成后,它会发出部署id或使用a指示问题的原因Throwable

    按顺序部署Verticle

    要最终确定MainVerticle重构,我们必须确保部署操作按顺序触发并发生:

    dbVerticleDeployment 
    .flatMap(id -> { (1) 
    Single<String> httpVerticleDeployment = vertx.rxDeployVerticle( 
    "io.vertx.guides.wiki.http.HttpServerVerticle", 
    new DeploymentOptions().setInstances(2));  
    return httpVerticleDeployment; 
    })
     
    .subscribe(id -> startFuture.complete(), startFuture::fail); (2)
    1. flatMap运营商应用功能的结果dbVerticleDeployment。它在这里安排的部署HttpServerVerticle

    2. 订阅时开始操作。在成功或错误时,MainVerticle开始未来可能完成或失败。

    部分“Rxifying” HttpServerVerticle

    如果按顺序跟随指南,随时编辑代码,那么HttpServerVerticle类仍然使用基于回调的API。在您可以使用RxJava API 自然执行异步操作之前,即 同时需要重构HttpServerVerticle

    导入Vert.x类的RxJava版本

     
    import io.vertx.reactivex.core.AbstractVerticle;
     
    import io.vertx.reactivex.core.http.HttpServer;
     
    import io.vertx.reactivex.ext.auth.AuthProvider;
     
    import io.vertx.reactivex.ext.auth.User;
     
    import io.vertx.reactivex.ext.auth.jwt.JWTAuth;
     
    import io.vertx.reactivex.ext.auth.shiro.ShiroAuth;
     
    import io.vertx.reactivex.ext.web.Router;
     
    import io.vertx.reactivex.ext.web.RoutingContext;
     
    import io.vertx.reactivex.ext.web.client.WebClient;
     
    import io.vertx.reactivex.ext.web.client.HttpResponse; (1)
     
    import io.vertx.reactivex.ext.web.codec.BodyCodec;
     
    import io.vertx.reactivex.ext.web.handler.*;
     
    import io.vertx.reactivex.ext.web.sstore.LocalSessionStore;
     
    import io.vertx.reactivex.ext.web.templ.FreeMarkerTemplateEngine;
     
    import org.slf4j.Logger;
     
    import org.slf4j.LoggerFactory;
    1. 我们的backupHandler()方法仍然使用HttpResponse类,所以它必须导入。事实证明,HttpResponseVert.x提供的RxJava版本可以作为这种特定情况下的替代品。所述“Rxified”在代码step-8作为响应类型由lambda表达式推断引导库的文件夹不导入此类。

     

    “Rxified” vertx实例上使用委托

    要调用一个方法,io.vertx.core.Vertx当你有一个方法时io.vertx.reactivex.core.Vertx,调用该getDelegate()方法。start()在创建以下实例时需要调整Verticle的方法 WikiDatabaseService

    1.  
      @Override
    2.  
      public void start(Future<Void> startFuture) throws Exception {
    3.  
       
    4.  
      String wikiDbQueue = config().getString(CONFIG_WIKIDB_QUEUE, "wikidb.queue");
    5.  
      dbService = io.vertx.guides.wiki.database.WikiDatabaseService.createProxy(vertx.getDelegate(), wikiDbQueue);

    同时执行授权查询

    在前面的例子中,我们看到了如何使用RxJava操作符和Rxified Vert.x API按顺序执行异步操作。但有时候这种担保不是必需的,或者您只是希望它们为了性能原因而同时运行。

    JWT令牌生成过程HttpServerVerticle就是这种情况的一个很好的例子。要创建令牌,我们需要完成所有授权查询,但查询彼此独立:

    1.  
      auth.rxAuthenticate(creds).flatMap(user -> {
    2.  
      Single<Boolean> create = user.rxIsAuthorised("create"); (1)
    3.  
      Single<Boolean> delete = user.rxIsAuthorised("delete");
    4.  
      Single<Boolean> update = user.rxIsAuthorised("update");
    5.  
       
    6.  
      return Single.zip(create, delete, update, (canCreate, canDelete, canUpdate) -> { (2)
    7.  
      return jwtAuth.generateToken(
    8.  
      new JsonObject()
    9.  
      .put("username", context.request().getHeader("login"))
    10.  
      .put("canCreate", canCreate)
    11.  
      .put("canDelete", canDelete)
    12.  
      .put("canUpdate", canUpdate),
    13.  
      new JWTOptions()
    14.  
      .setSubject("Wiki API")
    15.  
      .setIssuer("Vert.x"));
    16.  
      });
    17.  
      }).subscribe(token -> {
    18.  
      context.response().putHeader("Content-Type", "text/plain").end(token);
    19.  
      }, t -> context.fail(401));
    1. Single创建三个对象,表示不同的授权查询。

    2. 当三个操作成功完成时,zip运算符回调将与结果一起调用。

    查询数据库

    直接查询

    通常,需要单个数据库查询来准备对用户的响应。对于这样简单的情况,JDBCClient提供rxQueryXXXrxUpdateXXX方法:

    1.  
      String query = sqlQueries.get(SqlQuery.GET_PAGE_BY_ID);
    2.  
      JsonArray params = new JsonArray().add(id);
    3.  
      Single<ResultSet> resultSet = dbClient.rxQueryWithParams(query, params);

    使用数据库连接

    当直接查询不适合时(例如,当多个查询必须参与相同的事务时),您可以从池中获取数据库连接。所有你需要做的是呼吁rxGetConnectionJDBCClient

    Single<SQLConnection> connection = dbClient.rxGetConnection();

    该方法返回一个Single<Connection>您可以轻松转换flatMapXXX以执行SQL查询的方法:

    1.  
      connection
    2.  
      .flatMapCompletable(conn -> conn.rxExecute(sqlQueries.get(SqlQuery.CREATE_PAGES_TABLE)))

    但是如果SQLConnection参考不再可达,我们怎么能释放连接呢?一个简单而方便的方法是closedoFinally回调中调用:

    1.  
      private Single<SQLConnection> getConnection() {
    2.  
      return dbClient.rxGetConnection().flatMap(conn -> {
    3.  
      Single<SQLConnection> connectionSingle = Single.just(conn); (1)
    4.  
      return connectionSingle.doFinally(conn::close); (2)
    5.  
      });
    6.  
      }
    1. 在获得连接后,我们将其包装成一个 Single

    2. Single被修改,以调用close一个doFinally回调

    现在我们将getConnection随时使用我们需要的数据库连接。

    缩小回调和RxJava之间的差距

    有时,您可能必须将RxJava代码与基于回调的API混合使用。例如,服务代理接口只能用回调来定义,但实现使用Vert.x Rxified API。

    在这种情况下,io.vertx.reactivex.SingleHelper.toObserver类可以适应Handler<AsyncResult<T>>RxJava SingleObserver<T>

    1.  
      @Override
    2.  
      public WikiDatabaseService fetchAllPagesData(Handler<AsyncResult<List<JsonObject>>> resultHandler) { (1)
    3.  
      dbClient.rxQuery(sqlQueries.get(SqlQuery.ALL_PAGES_DATA))
    4.  
      .map(ResultSet::getRows)
    5.  
      .subscribe(SingleHelper.toObserver(resultHandler)); (2)
    6.  
      return this;
    7.  
      }
    1. fetchAllPagesData是一种异步服务代理操作,用Handler<AsyncResult<List<JsonObject>>>回调定义。

    2. toObserver方法适用resultHandler于a SingleObserver<List<JsonObject>>,以便在发布行列表时调用处理程序。

    注意
    io.vertx.reactivex.CompletableHelperio.vertx.reactivex.MaybeHelper提供适配器CompletableMaybe

    数据流

    RxJava不仅善于组合不同的事件源,而且对数据流也非常有帮助。与Vert.x或JDK未来不同,a Flowable不仅发布一个事件流,而且还发布一系列事件。它配备了大量的数据操作操作符。

    我们可以使用其中的一些来重构fetchAllPages数据库垂直方法:

    1.  
      public WikiDatabaseService fetchAllPages(Handler<AsyncResult<JsonArray>> resultHandler) {
    2.  
      dbClient.rxQuery(sqlQueries.get(SqlQuery.ALL_PAGES))
    3.  
      .flatMapPublisher(res -> { (1)
    4.  
      List<JsonArray> results = res.getResults();
    5.  
      return Flowable.fromIterable(results); (2)
    6.  
      })
    7.  
      .map(json -> json.getString(0)) (3)
    8.  
      .sorted() (4)
    9.  
      .collect(JsonArray::new, JsonArray::add) (5)
    10.  
      .subscribe(SingleHelper.toObserver(resultHandler));
    11.  
      return this;
    12.  
      }
    1. flatMapPublisher我们将创建一个Flowable从发射的项目Single<Result>

    2. fromIterable将数据库结果Iterable转换为Flowable发出数据库行项目。

    3. 由于我们只需要页面名称,我们可以将map每一JsonObject行记录到第一列。

    4. 客户希望数据sorted按字母顺序排列。

    5. 事件巴士服务答复包含在一个单一的JsonArraycollect创建一个新的JsonArray::new项目,随后添加项目JsonArray::add

  • 相关阅读:
    招财铃计划 与 实现,
    present, visible,覆盖,系统行为,
    windows,navigationcontroller,stausbar, 20,充满,
    CSS伪类选择器:is、not
    CSS实现常用组件特效(不依赖JS)
    Iconfont技术
    Axios的基本使用
    axios和ajax,fetch的区别
    axios全攻略
    vue-loader作用
  • 原文地址:https://www.cnblogs.com/endv/p/11956451.html
Copyright © 2011-2022 走看看