zoukankan      html  css  js  c++  java
  • Vert.x中EventBus中的使用

    注意:使用的是vert.x3.0 仅支持到java8当中有一些lambda表达式。如不明确请自补java8新特性。

    The Event Bus

    event bus 是vert.x的神经系统。

    每个vert.x的实例都有一个单一的event bus 实例。它是使用vertx.eventBus()方法获得的。

    event bus 同意程序中的不同语言编写的模块进行通信。不论他们是同样的vert.x实例。还是不同的vert.x实例。

    它甚至能够桥接浏览器中执行的Javascript通信。

    event bus能够在分布式系统中的多个server节点之间进行点对点通信和多个浏览器。

    event bus支持公布/订阅模式。点对点模式,和请求/响应模式。

    event bus的API是很easy的。它主要包含注冊消息处理事件、取消处理事件、发送和公布消息。

    首先理论

    寻址

    event bus上的消息被发送到一个地址。

    vert.x不包括不论什么花哨的寻址方案。

    在vert.x中,一个地址就是一个简单的String字符串。不论什么字符串都是有效的。只是最好的方法是使用某种有计划或者有规则的方案,比方使用一个私有的空间名称。

    一些有參考价值的样例:europe.news.feed1, acme.games.pacman, sausages, and X。

    事件-消息的处理程序

    收到消息的处理程序,你在一个地址上注冊一个处理程序,来消息后将触发这个处理程序。

    同一个消息处理程序能够注冊到不同的地址上,相同同一个地址也能注冊多个处理程序。

    公布/订阅模式

    event bus 支持公布消息

    消息被公布到一个地址。公布意味着将消息交给全部订阅并注冊处理程序的地址来处理。

    这跟大家熟悉的公布/订阅模式没有什么不同。

    点对点和请求/响应模式

    event bus 支持点对点消息传递。

    消息被发送到一个地址。

    vert.x将发送它到一个注冊消息处理程序的地址。

    假设有多个处理程序注冊地址,vert.x将选择一个来处理(採用非严格循环算法)。

    强烈不推荐。

    当接收到消息的程序处理完毕后,能够决定是否回复。发送程序接到回复后也能够进行响应回复,假设他们这样做应答处理程序将被调用。

    当接收方到返回发送方。这样能够无限反复,这又是一种常见的消息传递模式:请求/响应模式

    最优传输

    vert.x可以做到最优传输。不会有意识的丢失消息。这是很重要的。

    然而,event bus的部分或所有失败还是有可能造成消息丢失的。

    假设你的应用程序很在乎消息的完整性和时序性。那么你的代码处理应该是幂等的。以便在消息处理程序复苏后又一次发送消息。

    消息类型

    开箱同意vert.x使用不论什么的原始/简单类型,字符串或者缓冲区发送消息。

    然而这里有一个不成文的规定或者说建议。那就是最好使用JSON格式的子串来进行消息的传递。

    JSON字串在全部的编程语言中都是很easy创建。读取和解析的。在vert.x下它已经变成一种通用语言了。

    假设你不是必需使用JSON或者说你不想。

    event bus 很灵活。

    它还支持发送随意对象。还能够定义您想要发送的对象的编解码器。

    EVENT BUS 的API

    让我们跳进event bus的API。

    获得event bus 的对象

    你能够通过例如以下代码获得event bus的单一对象:

    EventBus eb = vertx.eventBus();
    

    注冊处理事件

    使用以下这个简单方法注冊一个消费处理程序:

    /**
     * 接受来自msg.test地址的消息并处理,handler处理
     */
    vertx.eventBus().consumer("msg.test", handler -> {
    	Message msg = (Message) handler.body();
    	System.out.println(msg.getBody());
    
    	msg.setBody("消息已收到!!!这是反馈消息");
    
    	// 消息反馈
    	handler.reply(msg);
    });
    

    当一个消息到达,你的事件将被激活。并处理这个消息。

    consumer() 方法返回一个MessageConsumer的对象实例。这个对象随后用于注销处理程序,或者用处理程序作为流。

    然而您也能够使用consumer() 返回MessageConsumer没有处理程序,然后单独设置处理程序。比如:

    EventBus eb = vertx.eventBus();
    
    MessageConsumer<Message> consumer = eb.consumer("msg.test");
    consumer.handler(handler -> {
    	Message msg = (Message) handler.body();
    	System.out.println(msg.getBody());
    
    	msg.setBody("消息已收到!!!这是反馈消息");
    
    	// 消息反馈
    	handler.reply(msg);
    });
    

    当在集群事件总线上注冊一个处理程序时,它能够花一些时间登记到集群的全部节点上。

    假设你希望在注冊完毕时得到通知的话,你能够在MessageConsumer上注冊一个注冊完毕的处理程序:

    consumer.completionHandler(res -> {
    	if (res.succeeded()) {
    		System.out.println("The handler registration has reached allnodes");
    	} else {
    		System.out.println("Registration failed!");
    	}
    });
    

    注销处理事件

    去除处理事件。叫做注销。

    假设你是集群事件总线, 假设你想当这个过程完毕时通知注销,你能够使用以下的方法:

    consumer.unregister(res -> {
    	if (res.succeeded()) {
    		System.out.println("The handler un-registration has reached all nodes");
    	} else {
    		System.out.println("Un-registration failed!");
    	}
    });
    

    公布消息

    公布消息很easy。仅仅须要把它公布到指定地址就可以:

    eventBus.publish("msg.test","hello world");
    

    这一消息将被交付全部订阅msg.test地址处理。

    发送消息

    发送消息将导致仅仅有一个注冊地址的处理程序接收到消息(多个注冊地址也仅仅有一个能收到)。

    这就是点对点模式,选择处理程序的方法採用非严格循环方式。

    你可用用send()方法发送一条消息。

    eventBus.send("msg.test","hello world");
    

    未解决的指令包含在-include::override/eventbus_headers.adoc[]
    ==== The Message object

    你的消息处理程序收到的是一个Message。

    消息的body相应着是应该发送或是公布的消息内容。

    消息的headers是可用的。

    回复消息

    有时你发送消息后希望得到接收到消息的人的回复。

    这就须要你使用请求-响应模式。

    要做到这一点,在消息发送的时候,你能够指定一个回复事件。

    当你接收到消息的时候,你能够通过调用reply()方法来应答。

    当这一切发生的时候它会导致一个答复发送回发送方,发送方收到应答消息再做处理。

    接收方:

    vertx.eventBus().consumer("msg.test", handler -> {
    	Message msg = (Message) handler.body();
    	System.out.println(msg.getBody());
    
    	msg.setBody("消息已收到!!!这是反馈消息");
    
    	// 消息反馈
    	handler.reply(msg);
    });
    

    发送方:

    vertx.eventBus().send("msg.test", message, replyHandler -> {
    	Message msg = (Message) replyHandler.result().body();
    	System.out.println(msg.getBody());
    });
    

    相应答也能够做应答。这样你就能够在两个不同的程序中创建一个包括多个回合的对话。

    发送超时

    当你发送消息时和指定应答事件时你能够通过DeliveryOptions指定超时时间。

    假设应答事件不少于超时时间,这个应答事件将失败。

    默认的超时时间是30S。

    发送失败

    消息发送失败的其它原因,包含:

    没有可用的事件去发送消息

    接收者已经明白使用失败:失败的消息

    在全部情况下。应答事件将回复特定的失败。

    未解决的指令包括在 - include::override/eventbus.adoc[]==== Clustered Event Bus

    event bus 不只存在于一个单一的Vert.x实例中,在一个集群中不同的Vert.x实例也能够形成一个单一的,分布的事件总线。

    集群编程

    假设你创建一个Vert.x实例用于集群编程。你须要的得到一个关于集群事件总线配置

    VertxOptions options = new VertxOptions();
    
    Vertx.clusteredVertx(options, res -> {
    	if (res.succeeded()) {
    		Vertx vertx = res.result();
    		EventBus eventBus = vertx.eventBus();
    		System.out.println("We now have a clustered event bus: " + eventBus);
    	} else {
    		System.out.println("Failed: " + res.cause());
    	}
    });
    
  • 相关阅读:
    ASP.NET MVC Razor 视图引擎
    Asp.net MVC3 Razor语法小记
    @RenderPage用法
    余数算法
    Linux命令行下运行java.class文件
    Java学习---9.GUI编程
    Java学习---8.线程同步
    Java学习---7.多线程
    Java学习---6.常用的容器,流
    Java学习---5.数组
  • 原文地址:https://www.cnblogs.com/gmhappy/p/11864014.html
Copyright © 2011-2022 走看看