zoukankan      html  css  js  c++  java
  • Vert.x核心包各功能模块详解

    源码:https://github.com/dagger9527/vertx_demo

    vert.x核心包是vertx-core。通常,只需要引入这个依赖就足以创建vert.x的http服务了。不过,vert.x为用户提供了更为强大的扩展模块,例如:vertx-web(创建http server,提供更强大的http服务),vertx-web-client(http客户端)。本章内容将围绕vertx-core模块做详细解释。

    首先引入vertx-core依赖

    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-core</artifactId>
      <version>4.0.0-milestone4</version>
    </dependency>
    

    quickstart模块已经介绍了如何去创建一个verticle实例,创建一个类去继承AbstractVerticle,在start方法中创建一个http服务,由verticle实例管理HTTP服务的生命周期。一个verticle就可以看作一个线程。可以创建多个verticle为用户提供服务。

    创建Vertx时配置参数

    VertxOptions对象使用

    VertxOptions的方法都会返回当前VertxOptions对象实例,所以可以像这样进行链式调用。

    Vertx vertx = Vertx.vertx(
        new VertxOptions()
        .setWorkerPoolSize(10)
        .setEventBusOptions(
            new EventBusOptions()
        )
    );
    

    AbstractVerticle类

    quickstart例子中可以看到,创建一个类通过继承AbstractVerticle,然后重写start方法来创建http服务,这样做可以让http服务的生命周期完全由verticle管理。

    AbstractVerticle类里面有两个属性,分别是Vertx和Context的对象,开发者在继承AbstractVerticle的子类里面可以直接使用这两个对象。

    例如vertx可以创建定时任务,向event bus中推送消息之类的事情。

    context可以存储公用数据,像servlet里面的HttpServletRequest、Session、ApplicationContext这些对象,在请求一个url时保存数据,在请求另一个url时再将数据取出来。

    看一个context对象的使用例子,在请求/路径,向context对象存放name=zhangsan数据。在请求/user拿到name值,并显示到页面上。

    public class AbstractVerticleDemo extends AbstractVerticle {
    
      private Logger logger = LoggerFactory.getLogger(AbstractVerticleDemo.class);
    
      public static void main(String[] args) {
        Vertx.vertx().deployVerticle(AbstractVerticleDemo.class.getName());
      }
    
      @Override
      public void start() throws Exception {
        // 创建一个http服务器,并监听8080端口
        vertx.createHttpServer(
          new HttpServerOptions()
            .setPort(8080)
        ).requestHandler(request -> {
          logger.info(request.path());
          String content = "<h1>Hello World</h1>";
          switch (request.path()) {
            case "/":
              context.put("name", "zhangsan");
              break;
            case "/user":
              content = "<h1>Hello World " + context.get("name") + "</h1>";
              break;
            default:
              break;
          }
          request
            .response()
            .putHeader("Content-Type", "text/html;charset=UTF-8")
            .end(content);
        }).listen();
        logger.info("MainVerticle Server Starting..");
      }
    }
    

    上面的例子展示了通过request.path()方法获取请求路径,通过判断路径来执行不同的处理逻辑。

    vertx-core并不是处理http请求专业的模块,后面vert.x提供了更专业的http server模块 -> vertx-web

    vertx创建定时任务

    每隔delay毫秒执行一次handler方法。

    long setPeriodic(long delay, Handler<Long> handler);
    

    delay毫秒后,只执行一次handler方法。

    long setTimer(long delay, Handler<Long> handler);
    

    handler方法的参数和setTimer、setPeriodic的返回值是这个定时任务的id,可以通过这个id来开启或取消这个任务。

    可以通过cancelTimer方法并传入定时任务的id来取消这个定时任务。

    boolean cancelTimer(long id);
    

    详细例子

    public class TimerTest {
    
      private static Logger logger = LoggerFactory.getLogger(VerticleDemo.class);
    
      public static void main(String[] args) {
        // 定时器
        timerTest();
      }
    
      /**
       * 间隔执行,执行多次
       */
      public static void timerTest() {
        Vertx vertx = Vertx.vertx();
        // 每隔1s执行一次,回调函数中的参数timerId和返回值timerId2是一样的,可以通过这个值关闭这个定时器
        long timerId2 = vertx.setPeriodic(1000, timerId -> {
          System.out.println("当前定时器id是: " + timerId);
        });
        logger.info(timerId2);
    
        // 情况和setPeriodic方法类似,只不过setTimer方法只执行一次
        long timerId3 = vertx.setTimer(2000, timerId -> {
          // 关闭上面setPeriodic的定时任务
          vertx.cancelTimer(timerId2);
          // 关闭服务,不然控制台会一直等待
          vertx.close();
        });
    
      }
    
    }
    

    Event Bus

    可以在event bus发送消息,进行消息通信。

    发送的消息类型可以是基本类型、String、Buffer、json。

    注册消息地址,第一个参数是注册地址,可通过send方法向这个地址上发送消息。第二个参数是回调参数,接收send方法发送的消息后,handler方法会被执行。

    <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler);
    

    向地址发送消息

    EventBus send(String address, @Nullable Object message);
    

    简单的demo

    Vertx vertx = Vertx.vertx();
    EventBus eventBus = vertx.eventBus();
    // 注册消费者,地址是local.message.address
    MessageConsumer<Object> consumer = eventBus.consumer("local.message.address", message -> {
      // 打印消息内容
      logger.info(message.body());
    });
    
    /*
     * Vert.x 默认允许任何基本/简单类型、String 或 Buffer 作为消息发送。
     * 不过在 Vert.x 中的通常做法是使用 JSON 格式来发送消息。
     */
    // 向local.message.address发送一条字符串消息
    eventBus.send("local.message.address", "First Message");
    

    发送buffer,json及自定义对象的例子在github上,可以通过下载源码看见。

    public class EventBusDemo {
    
      private Logger logger = LoggerFactory.getLogger(EventBusDemo.class.getName());
    
      /**
       * send方法向消息消费者发送消息
       * send方法发送的消息只会传递给在该地址注册的其中一个处理器,这就是点对点模式。Vert.x 使用不严格的轮询算法来选择绑定的处理器。
       */
      @Test
      public void sendMessageTest() throws InterruptedException {
        Vertx vertx = Vertx.vertx();
        EventBus eventBus = vertx.eventBus();
        // 注册消费者,地址是local.message.address
        MessageConsumer<Object> consumer = eventBus.consumer("local.message.address", message -> {
          // 打印消息内容
          logger.info(message.body());
        });
    
        /*
         * Vert.x 默认允许任何基本/简单类型、String 或 Buffer 作为消息发送。
         * 不过在 Vert.x 中的通常做法是使用 JSON 格式来发送消息。
         */
        // 向local.message.address发送一条字符串消息
        eventBus.send("local.message.address", "First Message");
        // 发送Buffer类型消息
        BufferImpl buffer = new BufferImpl();
        buffer.appendString("vert.x 你好?");
        eventBus.send("local.message.address", buffer);
        // 发送一条json类型消息
        eventBus.send("local.message.address", new JsonObject().put("username", "张三").put("age", 18));
        // 向向local.message.address发送一条自定义对象消息,需要自定义消息解码器,实现MessageCodec
        // 注册自定义消息解码器
        CousumerMessageCodec cousumerMessageCodec = new CousumerMessageCodec();
        eventBus.registerCodec(cousumerMessageCodec);
    
        CousumerSendMessage cousumerSendMessage = new CousumerSendMessage();
        cousumerSendMessage.setUsername("大嘎嘎");
        cousumerSendMessage.setAge(18);
        // 发送自定义消息时,需要指定解码器
        eventBus.send("local.message.address", cousumerSendMessage, new DeliveryOptions().setCodecName(cousumerMessageCodec.name()));
    
        // 注销自定义消息解码器
        eventBus.unregisterCodec(cousumerMessageCodec.name());
    
        // 因为eventBus.consumer中的回调方法是异步的,只有再发送消息时回调函数才会触发,所以这里让主程序先等待一会儿,不然程序马上就会关闭,看不到回调函数的输出结果
        Thread.sleep(1000);
    
        // 注销消费处理器
        consumer.unregister(result -> {
          logger.info("消费者是否注销成功:" + result.succeeded());
        });
      }
    
      /**
       * publish方法向消息消费者发送消息
       * 消息将会传递给所有在地址 local.message.address 上注册过的处理器。
       */
      @Test
      public void publishMessageTest() throws InterruptedException {
        Vertx vertx = Vertx.vertx();
        EventBus eventBus = vertx.eventBus();
        // 接收send方法的消费者
        MessageConsumer<Object> sendConsumer = eventBus.consumer("send.message.address", message -> {
          logger.info("Send1:" + message.body());
        });
        MessageConsumer<Object> sendConsumer2 = eventBus.consumer("send.message.address", message -> {
          logger.info("Send2:" + message.body());
        });
    
        // 接收publish方法的消费者
        MessageConsumer<Object> publishConsumer = eventBus.consumer("publish.message.address", message -> {
          logger.info("Publish1:" + message.body());
        });
        MessageConsumer<Object> publishConsumer2 = eventBus.consumer("publish.message.address", message -> {
          logger.info("Publish2:" + message.body());
        });
    
        // send方法只会给其中一个在send.message.address地址上注册过的处理器发送消息
        eventBus.send("send.message.address", "Send Message");
        // publish方法会将消息发送给所有在publish.message.address上注册的处理器
        eventBus.publish("publish.message.address", "Publish Message");
    
        Thread.sleep(1000);
      }
    
      /**
       * 设置消息头
       */
      @Test
      public void headerTest() throws InterruptedException {
        Vertx vertx = Vertx.vertx();
        EventBus eventBus = vertx.eventBus();
    
        eventBus.consumer("message.address", message -> {
          // 通过headers方法获取头信息
          logger.info(message.headers());
          logger.info(message.headers().get("some-header"));
        });
    
        // 设置头信息
        DeliveryOptions options = new DeliveryOptions();
        options.addHeader("some-header", "some-value");
        // 还可以设置消息的超时时间,如果在这个时间内没有收到应答,则会以失败为参数调用应答处理器,默认30s
        options.setSendTimeout(1000 * 30);
    
        eventBus.send("message.address", "Yay! Someone kicked a ball", options);
    
        TimeUnit.SECONDS.sleep(1);
      }
    
      /**
       * 回复消息
       */
      @Test
      public void replyMessageTest() throws InterruptedException {
        Vertx vertx = Vertx.vertx();
        EventBus eventBus = vertx.eventBus();
    
        eventBus.consumer("message.address", message -> {
          // 通过headers方法获取头信息
          logger.info("handler1: " + message.body());
          message.reply("reply message");
        });
    
        eventBus.consumer("message.address", message -> {
          // 通过headers方法获取头信息
          logger.info("hander2: " + message.body());
          message.reply("reply message");
        });
    
        // 想接收回复的消息请用request方法,注意request只给其中一个在相同地址上注册过的处理器发送消息
        eventBus.request("message.address", "Yay! Someone kicked a ball", message -> {
          logger.info(message.result().body());
        });
    
        TimeUnit.SECONDS.sleep(1);
      }
    
      /**
       * 集群模式启动EventBus
       */
      @Test
      public void clusterEventBus() {
        VertxOptions options = new VertxOptions();
    
        // 对EventBus的一些配置
        options.setEventBusOptions(
          new EventBusOptions()
            .setSsl(true)
            .setKeyStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
            .setTrustStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
            .setClientAuth(ClientAuth.REQUIRED)
            .setClusterPublicHost("whatever")
            .setClusterPublicPort(1234)
        );
    
        // 创建集群模式的Verticle
        Vertx.clusteredVertx(options, res -> {
          if (res.succeeded()) {
            Vertx vertx = res.result();
            // 获取集群模式下的EventBus
            EventBus eventBus = vertx.eventBus();
            System.out.println("We now have a clustered event bus: " + eventBus);
          } else {
            System.out.println("Failed: " + res.cause());
          }
        });
      }
    }
    

    注*:如果是junit测试方法,vertx线程不会被阻塞,所以会出现已经向eventbus发送了消息,消息处理端还没接收到消息程序便关闭了。

    如果用的是main方法测试,则不会有这个问题。

    Buffer

    可以通过Buffer.buffer()创建一个buffer对象。这时创建的buffer对象大小默认是0且内容为空,在往buffer里添加内容时buffer会自动扩容。当然也可以指定buffer对象的默认值和默认大小。如果一开始就知道buffer会有一定的内容的话,推荐在一开始便指定buffer的初始值或大小,这样可以避免在添加的时候又动态扩容buffer而造成不必要的资源消耗。

    // 指定初始大小
    static Buffer buffer(int initialSizeHint);
    // 初始值
    static Buffer buffer(String string);
    // 初始值和编码
    static Buffer buffer(String string, String enc);
    // 初始内容,用bytes数组表示
    static Buffer buffer(byte[] bytes);
    // 初始内容用ByteBuf表示
    static Buffer buffer(ByteBuf byteBuf);
    

    详细例子请参见BufferDemo测试类

    public class BufferDemo {
    
      Logger logger = LoggerFactory.getLogger(BufferDemo.class);
    
      /**
       * 创建Buffer对象
       */
      @Test
      public void createBufferTest() {
        // 通过静态方法创建Buffer对象
        Buffer emptyBuffer = Buffer.buffer();
        emptyBuffer.appendString("Hello Buffer");
        // 创建带有初始值的Buffer对象,字符默认以UTF-8编码
        Buffer helloBuffer = Buffer.buffer("Hello Buffer");
        // 创建初始值,并指定编码
        Buffer encodingBuffer = Buffer.buffer("哈哈哈", "UTF-8");
    
        // 通过字节数组创建Buffer
        byte[] bytes = new byte[]{1, 3, 5};
        Buffer byteBuffer = Buffer.buffer(bytes);
    
        // 创建一个指定大小的Buffer,创建时使得这个buffer被分配到了更多的内存,比数据写入时再动态的调整内存大小效率要高的多
        Buffer customizerSizeBuffer = Buffer.buffer(4);
        customizerSizeBuffer.appendString("Hello Buffer");
    
        logger.info(emptyBuffer);
        logger.info(helloBuffer);
        logger.info(encodingBuffer);
        logger.info(byteBuffer);
        logger.info(customizerSizeBuffer);
      }
    
      /**
       * 向Buffer中写入数据
       */
      @Test
      public void writeBufferTest() {
        Buffer buffer = Buffer.buffer();
    
        // 向Buffer中写入数据
        buffer.appendString("string");
        buffer.appendBytes(new byte[]{1, 3, 5});
        buffer.appendDouble(1.3);
        buffer.appendFloat(1.2F);
        buffer.appendInt(1);
        buffer.appendLong(1);
        buffer.appendShort((short) 1);
    
        // 写入无符号数
        buffer.appendUnsignedByte((short) 1);
        buffer.appendUnsignedInt(1);
        buffer.appendUnsignedShort(1);
    
        // 还可以向Buffer中再写入一个Buffer
        buffer.appendBuffer(Buffer.buffer("append Buffer"));
        logger.info(buffer);
    
        // 再Buffer指定位置写入数据,将会替换原来位置上的值
        buffer.setString(6, "哇哈哈");
    //    buffer.setUnsignedInt(6, 1);
        logger.info(buffer);
        // 长度
        logger.info(buffer.length());
        // 拷贝
        logger.info(buffer.copy());
        // 裁剪[0,6)
        logger.info(buffer.slice(0, 6));
      }
    
      /**
       * 从Buffer中读取数据
       */
      @Test
      public void readerBufferTest() {
        Buffer buffer = Buffer.buffer("Hello Buffer");
        // Hello,读取开始位置到结束位置的数据[0, 5)
        logger.info(buffer.getString(0, 5));
    
        // 循环读取
        for (int i = 0; i < buffer.length(); i += 4) {
          logger.info("int value at " + i + " is: " + buffer.getString(i, i + 4));
        }
      }
    

    JSON

    vert.x内置了json模块,可以方便vert.x程序对json的处理,它内置的json工具类依赖于jackson。

    vert.x的JSON模块有两个工具类,JsonObject和JsonArray

    通过JsonObject的构造方法可以将字符串、map对象、buffer对象转成json对象

    如果要将自定义对象转成json类型,通过JsonObject.mapFrom(Object)方法,需要引入jackson-databind依赖,否则会抛出java.lang.UnsupportedOperationException: Mapping is not available without Jackson Databind on the classpath异常

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.9</version>    
    </dependency>
    

    JsonArray对象的用法与JsonObject类似,详细demo参见JsonDemo测试类

    public class JsonDemo {
    
      Logger logger = LoggerFactory.getLogger(JsonDemo.class);
    
      /**
       * 其它类型转成json
       */
      @Test
      public void otherType2jsonObjectTest() {
        // 将字符串转换成json对象
        String jsonString = "{"foo":"bar"}";
        JsonObject strJson = new JsonObject(jsonString);
        logger.info(strJson);
    
        // 将map转换成json对象
        Map<String, Object> map = new HashMap<>();
        map.put("foo", "bar");
        map.put("xyz", 3);
        JsonObject mapJson = new JsonObject(map);
        logger.info(mapJson);
    
        // 将buffer转成json
        BufferImpl buffer = new BufferImpl();
        buffer.appendString("{"foo":"bar"}");
        JsonObject bufferJson = new JsonObject(buffer);
        logger.info(bufferJson);
    
        // 将自定义对象转换成json
        // 需要添加jackson-databind依赖,否则会发生java.lang.UnsupportedOperationException: Mapping is not available without Jackson Databind on the classpath
        User user = new User();
        user.setUsername("大哥哥");
        user.setAge(18);
        JsonObject customizerJson = JsonObject.mapFrom(user);
        logger.info(customizerJson);
      }
    
      /**
       * json转其它类型
       */
      @Test
      public void jsonObject2otherTypeTest() {
        JsonObject json = new JsonObject().put("username", "张嘎").put("age", 6);
        // json转自定义类型
        User user = json.mapTo(User.class);
        logger.info(user);
    
        // json转map类型
        Map<String, Object> map = json.getMap();
        logger.info(map);
    
        // json转string
        String str = json.toString();
        logger.info(str);
    
        // json转buffer
        Buffer buffer = json.toBuffer();
        logger.info(buffer);
      }
    
      /**
       * JsonArray的使用和JsonObject大体类似
       */
      @Test
      public void jsonArrayTest() {
        // 通过字符串创建 JSON 数组
        String jsonString = "["foo","bar"]";
        JsonArray array = new JsonArray(jsonString);
        logger.info(array);
    
        // 使用add方法给json数组添加数据
        JsonArray array2 = new JsonArray().add("a").add("b").add("c");
        logger.info(array2);
    
        // 获取第一个元素
        logger.info(array2.getString(0));
      }
    
    }
    

    运行阻塞式代码

    vert.x是异步处理请求的关键是因为所有任务都在Event Loop中被轮询处理,每个任务的处理效率几乎都是毫秒级的。异步的好处是可以更快的响应用户请求。如果已知某一个代码块是耗时操作,耗时操作会阻塞Event Loop中的其它任务,那么需要使用executeBlocking方法将这块会造成阻塞的代码块包起来。

    blocking.complete(obj)声明当前阻塞代码块已执行成功,然后将成功的结果作为传递给第二个回调函数,第二个回调函数通过result.result()即可获得。

    多个executeBlocking代码块会从上至下依次执行。

    详细例子:

    public class ProcessBlockCodeDemo extends AbstractVerticle {
    
      public static void main(String[] args) {
        Vertx.vertx().deployVerticle(ProcessBlockCodeDemo.class.getName());
      }
    
      @Override
      public void start() throws Exception {
        // 创建http server
        this.vertx.createHttpServer(
          new HttpServerOptions()
            .setPort(8080)
        ).requestHandler(request -> {
          Map<String, Object> map = new HashMap<>();
          vertx.executeBlocking(blocking -> {
            map.put("sex", "男");
            blocking.complete();
          });
          vertx.executeBlocking(blocking -> {
            try {
              TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
            map.put("name", "张三");
            map.put("age", 18);
            blocking.complete(map);
          }, result -> {
            JsonObject entries = new JsonObject((Map) result.result());
            request
              .response()
              .putHeader("Content-Type", "application/json;charset=UTF-8")
              .end(entries.toString());
          });
        }).listen();
      }
    }
    

    参考:https://vertxchina.github.io/vertx-translation-chinese/core/Core.html

  • 相关阅读:
    [posix]Posix多线程编程
    [Makefile]多文件的通用Makefile
    表格花式效果
    JavaScript实现按键精灵
    JavaScript中几个相似方法对比
    谨慎能捕千秋蝉(三)——界面操作劫持与HTML5安全
    谨慎能捕千秋蝉(二)——CSRF
    日月如梭,玩转JavaScript日期
    Wireshark网络抓包(四)——工具
    Wireshark网络抓包(三)——网络协议
  • 原文地址:https://www.cnblogs.com/dagger9527/p/12286808.html
Copyright © 2011-2022 走看看