zoukankan      html  css  js  c++  java
  • Vert.x核心包各功能模块详解

    源码:https://github.com/dagger9527/vertx_demo

    vert.x核心包是vertx-core。通常,只需要引入这个依赖就足以创建vert.x的http服务了。不过,vert.x为用户提供了更为强大的扩展模块,例如:vertx-web(创建http server,提供更强大的http服务),vertx-web-client(http客户端)。本章内容将围绕vertx-core模块做详细解释。

    首先引入vertx-core依赖

    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-core</artifactId>
      <version>4.0.0-milestone4</version>
    </dependency>
    

    quickstart模块已经介绍了如何去创建一个verticle实例,创建一个类去继承AbstractVerticle,在start方法中创建一个http服务,由verticle实例管理HTTP服务的生命周期。一个verticle就可以看作一个线程。可以创建多个verticle为用户提供服务。

    创建Vertx时配置参数

    VertxOptions对象使用

    VertxOptions的方法都会返回当前VertxOptions对象实例,所以可以像这样进行链式调用。

    Vertx vertx = Vertx.vertx(
        new VertxOptions()
        .setWorkerPoolSize(10)
        .setEventBusOptions(
            new EventBusOptions()
        )
    );
    

    AbstractVerticle类

    quickstart例子中可以看到,创建一个类通过继承AbstractVerticle,然后重写start方法来创建http服务,这样做可以让http服务的生命周期完全由verticle管理。

    AbstractVerticle类里面有两个属性,分别是Vertx和Context的对象,开发者在继承AbstractVerticle的子类里面可以直接使用这两个对象。

    例如vertx可以创建定时任务,向event bus中推送消息之类的事情。

    context可以存储公用数据,像servlet里面的HttpServletRequest、Session、ApplicationContext这些对象,在请求一个url时保存数据,在请求另一个url时再将数据取出来。

    看一个context对象的使用例子,在请求/路径,向context对象存放name=zhangsan数据。在请求/user拿到name值,并显示到页面上。

    public class AbstractVerticleDemo extends AbstractVerticle {
    
      private Logger logger = LoggerFactory.getLogger(AbstractVerticleDemo.class);
    
      public static void main(String[] args) {
        Vertx.vertx().deployVerticle(AbstractVerticleDemo.class.getName());
      }
    
      @Override
      public void start() throws Exception {
        // 创建一个http服务器,并监听8080端口
        vertx.createHttpServer(
          new HttpServerOptions()
            .setPort(8080)
        ).requestHandler(request -> {
          logger.info(request.path());
          String content = "<h1>Hello World</h1>";
          switch (request.path()) {
            case "/":
              context.put("name", "zhangsan");
              break;
            case "/user":
              content = "<h1>Hello World " + context.get("name") + "</h1>";
              break;
            default:
              break;
          }
          request
            .response()
            .putHeader("Content-Type", "text/html;charset=UTF-8")
            .end(content);
        }).listen();
        logger.info("MainVerticle Server Starting..");
      }
    }
    

    上面的例子展示了通过request.path()方法获取请求路径,通过判断路径来执行不同的处理逻辑。

    vertx-core并不是处理http请求专业的模块,后面vert.x提供了更专业的http server模块 -> vertx-web

    vertx创建定时任务

    每隔delay毫秒执行一次handler方法。

    long setPeriodic(long delay, Handler<Long> handler);
    

    delay毫秒后,只执行一次handler方法。

    long setTimer(long delay, Handler<Long> handler);
    

    handler方法的参数和setTimer、setPeriodic的返回值是这个定时任务的id,可以通过这个id来开启或取消这个任务。

    可以通过cancelTimer方法并传入定时任务的id来取消这个定时任务。

    boolean cancelTimer(long id);
    

    详细例子

    public class TimerTest {
    
      private static Logger logger = LoggerFactory.getLogger(VerticleDemo.class);
    
      public static void main(String[] args) {
        // 定时器
        timerTest();
      }
    
      /**
       * 间隔执行,执行多次
       */
      public static void timerTest() {
        Vertx vertx = Vertx.vertx();
        // 每隔1s执行一次,回调函数中的参数timerId和返回值timerId2是一样的,可以通过这个值关闭这个定时器
        long timerId2 = vertx.setPeriodic(1000, timerId -> {
          System.out.println("当前定时器id是: " + timerId);
        });
        logger.info(timerId2);
    
        // 情况和setPeriodic方法类似,只不过setTimer方法只执行一次
        long timerId3 = vertx.setTimer(2000, timerId -> {
          // 关闭上面setPeriodic的定时任务
          vertx.cancelTimer(timerId2);
          // 关闭服务,不然控制台会一直等待
          vertx.close();
        });
    
      }
    
    }
    

    Event Bus

    可以在event bus发送消息,进行消息通信。

    发送的消息类型可以是基本类型、String、Buffer、json。

    注册消息地址,第一个参数是注册地址,可通过send方法向这个地址上发送消息。第二个参数是回调参数,接收send方法发送的消息后,handler方法会被执行。

    <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler);
    

    向地址发送消息

    EventBus send(String address, @Nullable Object message);
    

    简单的demo

    Vertx vertx = Vertx.vertx();
    EventBus eventBus = vertx.eventBus();
    // 注册消费者,地址是local.message.address
    MessageConsumer<Object> consumer = eventBus.consumer("local.message.address", message -> {
      // 打印消息内容
      logger.info(message.body());
    });
    
    /*
     * Vert.x 默认允许任何基本/简单类型、String 或 Buffer 作为消息发送。
     * 不过在 Vert.x 中的通常做法是使用 JSON 格式来发送消息。
     */
    // 向local.message.address发送一条字符串消息
    eventBus.send("local.message.address", "First Message");
    

    发送buffer,json及自定义对象的例子在github上,可以通过下载源码看见。

    public class EventBusDemo {
    
      private Logger logger = LoggerFactory.getLogger(EventBusDemo.class.getName());
    
      /**
       * send方法向消息消费者发送消息
       * send方法发送的消息只会传递给在该地址注册的其中一个处理器,这就是点对点模式。Vert.x 使用不严格的轮询算法来选择绑定的处理器。
       */
      @Test
      public void sendMessageTest() throws InterruptedException {
        Vertx vertx = Vertx.vertx();
        EventBus eventBus = vertx.eventBus();
        // 注册消费者,地址是local.message.address
        MessageConsumer<Object> consumer = eventBus.consumer("local.message.address", message -> {
          // 打印消息内容
          logger.info(message.body());
        });
    
        /*
         * Vert.x 默认允许任何基本/简单类型、String 或 Buffer 作为消息发送。
         * 不过在 Vert.x 中的通常做法是使用 JSON 格式来发送消息。
         */
        // 向local.message.address发送一条字符串消息
        eventBus.send("local.message.address", "First Message");
        // 发送Buffer类型消息
        BufferImpl buffer = new BufferImpl();
        buffer.appendString("vert.x 你好?");
        eventBus.send("local.message.address", buffer);
        // 发送一条json类型消息
        eventBus.send("local.message.address", new JsonObject().put("username", "张三").put("age", 18));
        // 向向local.message.address发送一条自定义对象消息,需要自定义消息解码器,实现MessageCodec
        // 注册自定义消息解码器
        CousumerMessageCodec cousumerMessageCodec = new CousumerMessageCodec();
        eventBus.registerCodec(cousumerMessageCodec);
    
        CousumerSendMessage cousumerSendMessage = new CousumerSendMessage();
        cousumerSendMessage.setUsername("大嘎嘎");
        cousumerSendMessage.setAge(18);
        // 发送自定义消息时,需要指定解码器
        eventBus.send("local.message.address", cousumerSendMessage, new DeliveryOptions().setCodecName(cousumerMessageCodec.name()));
    
        // 注销自定义消息解码器
        eventBus.unregisterCodec(cousumerMessageCodec.name());
    
        // 因为eventBus.consumer中的回调方法是异步的,只有再发送消息时回调函数才会触发,所以这里让主程序先等待一会儿,不然程序马上就会关闭,看不到回调函数的输出结果
        Thread.sleep(1000);
    
        // 注销消费处理器
        consumer.unregister(result -> {
          logger.info("消费者是否注销成功:" + result.succeeded());
        });
      }
    
      /**
       * publish方法向消息消费者发送消息
       * 消息将会传递给所有在地址 local.message.address 上注册过的处理器。
       */
      @Test
      public void publishMessageTest() throws InterruptedException {
        Vertx vertx = Vertx.vertx();
        EventBus eventBus = vertx.eventBus();
        // 接收send方法的消费者
        MessageConsumer<Object> sendConsumer = eventBus.consumer("send.message.address", message -> {
          logger.info("Send1:" + message.body());
        });
        MessageConsumer<Object> sendConsumer2 = eventBus.consumer("send.message.address", message -> {
          logger.info("Send2:" + message.body());
        });
    
        // 接收publish方法的消费者
        MessageConsumer<Object> publishConsumer = eventBus.consumer("publish.message.address", message -> {
          logger.info("Publish1:" + message.body());
        });
        MessageConsumer<Object> publishConsumer2 = eventBus.consumer("publish.message.address", message -> {
          logger.info("Publish2:" + message.body());
        });
    
        // send方法只会给其中一个在send.message.address地址上注册过的处理器发送消息
        eventBus.send("send.message.address", "Send Message");
        // publish方法会将消息发送给所有在publish.message.address上注册的处理器
        eventBus.publish("publish.message.address", "Publish Message");
    
        Thread.sleep(1000);
      }
    
      /**
       * 设置消息头
       */
      @Test
      public void headerTest() throws InterruptedException {
        Vertx vertx = Vertx.vertx();
        EventBus eventBus = vertx.eventBus();
    
        eventBus.consumer("message.address", message -> {
          // 通过headers方法获取头信息
          logger.info(message.headers());
          logger.info(message.headers().get("some-header"));
        });
    
        // 设置头信息
        DeliveryOptions options = new DeliveryOptions();
        options.addHeader("some-header", "some-value");
        // 还可以设置消息的超时时间,如果在这个时间内没有收到应答,则会以失败为参数调用应答处理器,默认30s
        options.setSendTimeout(1000 * 30);
    
        eventBus.send("message.address", "Yay! Someone kicked a ball", options);
    
        TimeUnit.SECONDS.sleep(1);
      }
    
      /**
       * 回复消息
       */
      @Test
      public void replyMessageTest() throws InterruptedException {
        Vertx vertx = Vertx.vertx();
        EventBus eventBus = vertx.eventBus();
    
        eventBus.consumer("message.address", message -> {
          // 通过headers方法获取头信息
          logger.info("handler1: " + message.body());
          message.reply("reply message");
        });
    
        eventBus.consumer("message.address", message -> {
          // 通过headers方法获取头信息
          logger.info("hander2: " + message.body());
          message.reply("reply message");
        });
    
        // 想接收回复的消息请用request方法,注意request只给其中一个在相同地址上注册过的处理器发送消息
        eventBus.request("message.address", "Yay! Someone kicked a ball", message -> {
          logger.info(message.result().body());
        });
    
        TimeUnit.SECONDS.sleep(1);
      }
    
      /**
       * 集群模式启动EventBus
       */
      @Test
      public void clusterEventBus() {
        VertxOptions options = new VertxOptions();
    
        // 对EventBus的一些配置
        options.setEventBusOptions(
          new EventBusOptions()
            .setSsl(true)
            .setKeyStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
            .setTrustStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
            .setClientAuth(ClientAuth.REQUIRED)
            .setClusterPublicHost("whatever")
            .setClusterPublicPort(1234)
        );
    
        // 创建集群模式的Verticle
        Vertx.clusteredVertx(options, res -> {
          if (res.succeeded()) {
            Vertx vertx = res.result();
            // 获取集群模式下的EventBus
            EventBus eventBus = vertx.eventBus();
            System.out.println("We now have a clustered event bus: " + eventBus);
          } else {
            System.out.println("Failed: " + res.cause());
          }
        });
      }
    }
    

    注*:如果是junit测试方法,vertx线程不会被阻塞,所以会出现已经向eventbus发送了消息,消息处理端还没接收到消息程序便关闭了。

    如果用的是main方法测试,则不会有这个问题。

    Buffer

    可以通过Buffer.buffer()创建一个buffer对象。这时创建的buffer对象大小默认是0且内容为空,在往buffer里添加内容时buffer会自动扩容。当然也可以指定buffer对象的默认值和默认大小。如果一开始就知道buffer会有一定的内容的话,推荐在一开始便指定buffer的初始值或大小,这样可以避免在添加的时候又动态扩容buffer而造成不必要的资源消耗。

    // 指定初始大小
    static Buffer buffer(int initialSizeHint);
    // 初始值
    static Buffer buffer(String string);
    // 初始值和编码
    static Buffer buffer(String string, String enc);
    // 初始内容,用bytes数组表示
    static Buffer buffer(byte[] bytes);
    // 初始内容用ByteBuf表示
    static Buffer buffer(ByteBuf byteBuf);
    

    详细例子请参见BufferDemo测试类

    public class BufferDemo {
    
      Logger logger = LoggerFactory.getLogger(BufferDemo.class);
    
      /**
       * 创建Buffer对象
       */
      @Test
      public void createBufferTest() {
        // 通过静态方法创建Buffer对象
        Buffer emptyBuffer = Buffer.buffer();
        emptyBuffer.appendString("Hello Buffer");
        // 创建带有初始值的Buffer对象,字符默认以UTF-8编码
        Buffer helloBuffer = Buffer.buffer("Hello Buffer");
        // 创建初始值,并指定编码
        Buffer encodingBuffer = Buffer.buffer("哈哈哈", "UTF-8");
    
        // 通过字节数组创建Buffer
        byte[] bytes = new byte[]{1, 3, 5};
        Buffer byteBuffer = Buffer.buffer(bytes);
    
        // 创建一个指定大小的Buffer,创建时使得这个buffer被分配到了更多的内存,比数据写入时再动态的调整内存大小效率要高的多
        Buffer customizerSizeBuffer = Buffer.buffer(4);
        customizerSizeBuffer.appendString("Hello Buffer");
    
        logger.info(emptyBuffer);
        logger.info(helloBuffer);
        logger.info(encodingBuffer);
        logger.info(byteBuffer);
        logger.info(customizerSizeBuffer);
      }
    
      /**
       * 向Buffer中写入数据
       */
      @Test
      public void writeBufferTest() {
        Buffer buffer = Buffer.buffer();
    
        // 向Buffer中写入数据
        buffer.appendString("string");
        buffer.appendBytes(new byte[]{1, 3, 5});
        buffer.appendDouble(1.3);
        buffer.appendFloat(1.2F);
        buffer.appendInt(1);
        buffer.appendLong(1);
        buffer.appendShort((short) 1);
    
        // 写入无符号数
        buffer.appendUnsignedByte((short) 1);
        buffer.appendUnsignedInt(1);
        buffer.appendUnsignedShort(1);
    
        // 还可以向Buffer中再写入一个Buffer
        buffer.appendBuffer(Buffer.buffer("append Buffer"));
        logger.info(buffer);
    
        // 再Buffer指定位置写入数据,将会替换原来位置上的值
        buffer.setString(6, "哇哈哈");
    //    buffer.setUnsignedInt(6, 1);
        logger.info(buffer);
        // 长度
        logger.info(buffer.length());
        // 拷贝
        logger.info(buffer.copy());
        // 裁剪[0,6)
        logger.info(buffer.slice(0, 6));
      }
    
      /**
       * 从Buffer中读取数据
       */
      @Test
      public void readerBufferTest() {
        Buffer buffer = Buffer.buffer("Hello Buffer");
        // Hello,读取开始位置到结束位置的数据[0, 5)
        logger.info(buffer.getString(0, 5));
    
        // 循环读取
        for (int i = 0; i < buffer.length(); i += 4) {
          logger.info("int value at " + i + " is: " + buffer.getString(i, i + 4));
        }
      }
    

    JSON

    vert.x内置了json模块,可以方便vert.x程序对json的处理,它内置的json工具类依赖于jackson。

    vert.x的JSON模块有两个工具类,JsonObject和JsonArray

    通过JsonObject的构造方法可以将字符串、map对象、buffer对象转成json对象

    如果要将自定义对象转成json类型,通过JsonObject.mapFrom(Object)方法,需要引入jackson-databind依赖,否则会抛出java.lang.UnsupportedOperationException: Mapping is not available without Jackson Databind on the classpath异常

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.9</version>    
    </dependency>
    

    JsonArray对象的用法与JsonObject类似,详细demo参见JsonDemo测试类

    public class JsonDemo {
    
      Logger logger = LoggerFactory.getLogger(JsonDemo.class);
    
      /**
       * 其它类型转成json
       */
      @Test
      public void otherType2jsonObjectTest() {
        // 将字符串转换成json对象
        String jsonString = "{"foo":"bar"}";
        JsonObject strJson = new JsonObject(jsonString);
        logger.info(strJson);
    
        // 将map转换成json对象
        Map<String, Object> map = new HashMap<>();
        map.put("foo", "bar");
        map.put("xyz", 3);
        JsonObject mapJson = new JsonObject(map);
        logger.info(mapJson);
    
        // 将buffer转成json
        BufferImpl buffer = new BufferImpl();
        buffer.appendString("{"foo":"bar"}");
        JsonObject bufferJson = new JsonObject(buffer);
        logger.info(bufferJson);
    
        // 将自定义对象转换成json
        // 需要添加jackson-databind依赖,否则会发生java.lang.UnsupportedOperationException: Mapping is not available without Jackson Databind on the classpath
        User user = new User();
        user.setUsername("大哥哥");
        user.setAge(18);
        JsonObject customizerJson = JsonObject.mapFrom(user);
        logger.info(customizerJson);
      }
    
      /**
       * json转其它类型
       */
      @Test
      public void jsonObject2otherTypeTest() {
        JsonObject json = new JsonObject().put("username", "张嘎").put("age", 6);
        // json转自定义类型
        User user = json.mapTo(User.class);
        logger.info(user);
    
        // json转map类型
        Map<String, Object> map = json.getMap();
        logger.info(map);
    
        // json转string
        String str = json.toString();
        logger.info(str);
    
        // json转buffer
        Buffer buffer = json.toBuffer();
        logger.info(buffer);
      }
    
      /**
       * JsonArray的使用和JsonObject大体类似
       */
      @Test
      public void jsonArrayTest() {
        // 通过字符串创建 JSON 数组
        String jsonString = "["foo","bar"]";
        JsonArray array = new JsonArray(jsonString);
        logger.info(array);
    
        // 使用add方法给json数组添加数据
        JsonArray array2 = new JsonArray().add("a").add("b").add("c");
        logger.info(array2);
    
        // 获取第一个元素
        logger.info(array2.getString(0));
      }
    
    }
    

    运行阻塞式代码

    vert.x是异步处理请求的关键是因为所有任务都在Event Loop中被轮询处理,每个任务的处理效率几乎都是毫秒级的。异步的好处是可以更快的响应用户请求。如果已知某一个代码块是耗时操作,耗时操作会阻塞Event Loop中的其它任务,那么需要使用executeBlocking方法将这块会造成阻塞的代码块包起来。

    blocking.complete(obj)声明当前阻塞代码块已执行成功,然后将成功的结果作为传递给第二个回调函数,第二个回调函数通过result.result()即可获得。

    多个executeBlocking代码块会从上至下依次执行。

    详细例子:

    public class ProcessBlockCodeDemo extends AbstractVerticle {
    
      public static void main(String[] args) {
        Vertx.vertx().deployVerticle(ProcessBlockCodeDemo.class.getName());
      }
    
      @Override
      public void start() throws Exception {
        // 创建http server
        this.vertx.createHttpServer(
          new HttpServerOptions()
            .setPort(8080)
        ).requestHandler(request -> {
          Map<String, Object> map = new HashMap<>();
          vertx.executeBlocking(blocking -> {
            map.put("sex", "男");
            blocking.complete();
          });
          vertx.executeBlocking(blocking -> {
            try {
              TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
            map.put("name", "张三");
            map.put("age", 18);
            blocking.complete(map);
          }, result -> {
            JsonObject entries = new JsonObject((Map) result.result());
            request
              .response()
              .putHeader("Content-Type", "application/json;charset=UTF-8")
              .end(entries.toString());
          });
        }).listen();
      }
    }
    

    参考:https://vertxchina.github.io/vertx-translation-chinese/core/Core.html

  • 相关阅读:
    Get-CrmSetting返回Unable to connect to the remote server的解决办法
    Dynamics 365中的常用Associate和Disassociate消息汇总
    Dynamics 365 Customer Engagement V9 活动源功能报错的解决方法
    Dynamics Customer Engagement V9版本配置面向Internet的部署时候下一步按钮不可点击的解决办法
    Dynamics 365检查工作流、SDK插件步骤是否选中运行成功后自动删除系统作业记录
    注意,更改团队所属业务部门用Update消息无效!
    Dynamics 365的审核日志分区删除超时报错怎么办?
    Dynamics 365使用Execute Multiple Request删除系统作业实体记录
    Dynamics 365的系统作业实体记录增长太快怎么回事?
    Dynamics CRM日期字段查询使用时分秒的方法
  • 原文地址:https://www.cnblogs.com/dagger9527/p/12286808.html
Copyright © 2011-2022 走看看