源码:https://github.com/dagger9527/vertx_demo
vert.x核心包是vertx-core
。通常,只需要引入这个依赖就足以创建vert.x的http服务了。不过,vert.x为用户提供了更为强大的扩展模块,例如:vertx-web
(创建http server,提供更强大的http服务),vertx-web-client
(http客户端)。本章内容将围绕vertx-core模块做详细解释。
首先引入vertx-core依赖
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.0.0-milestone4</version>
</dependency>
在quickstart
模块已经介绍了如何去创建一个verticle实例,创建一个类去继承AbstractVerticle,在start方法中创建一个http服务,由verticle实例管理HTTP服务的生命周期。一个verticle就可以看作一个线程。可以创建多个verticle为用户提供服务。
创建Vertx时配置参数
VertxOptions
对象使用
VertxOptions
的方法都会返回当前VertxOptions
对象实例,所以可以像这样进行链式调用。
Vertx vertx = Vertx.vertx(
new VertxOptions()
.setWorkerPoolSize(10)
.setEventBusOptions(
new EventBusOptions()
)
);
AbstractVerticle类
在quickstart
例子中可以看到,创建一个类通过继承AbstractVerticle,然后重写start方法来创建http服务,这样做可以让http服务的生命周期完全由verticle管理。
AbstractVerticle类里面有两个属性,分别是Vertx和Context的对象,开发者在继承AbstractVerticle的子类里面可以直接使用这两个对象。
例如vertx可以创建定时任务,向event bus中推送消息之类的事情。
context可以存储公用数据,像servlet里面的HttpServletRequest、Session、ApplicationContext这些对象,在请求一个url时保存数据,在请求另一个url时再将数据取出来。
看一个context对象的使用例子,在请求/
路径,向context对象存放name=zhangsan数据。在请求/user
拿到name值,并显示到页面上。
public class AbstractVerticleDemo extends AbstractVerticle {
private Logger logger = LoggerFactory.getLogger(AbstractVerticleDemo.class);
public static void main(String[] args) {
Vertx.vertx().deployVerticle(AbstractVerticleDemo.class.getName());
}
@Override
public void start() throws Exception {
// 创建一个http服务器,并监听8080端口
vertx.createHttpServer(
new HttpServerOptions()
.setPort(8080)
).requestHandler(request -> {
logger.info(request.path());
String content = "<h1>Hello World</h1>";
switch (request.path()) {
case "/":
context.put("name", "zhangsan");
break;
case "/user":
content = "<h1>Hello World " + context.get("name") + "</h1>";
break;
default:
break;
}
request
.response()
.putHeader("Content-Type", "text/html;charset=UTF-8")
.end(content);
}).listen();
logger.info("MainVerticle Server Starting..");
}
}
上面的例子展示了通过request.path()
方法获取请求路径,通过判断路径来执行不同的处理逻辑。
vertx-core并不是处理http请求专业的模块,后面vert.x提供了更专业的http server模块 -> vertx-web
vertx创建定时任务
每隔delay毫秒执行一次handler方法。
long setPeriodic(long delay, Handler<Long> handler);
delay毫秒后,只执行一次handler方法。
long setTimer(long delay, Handler<Long> handler);
handler方法的参数和setTimer、setPeriodic的返回值是这个定时任务的id,可以通过这个id来开启或取消这个任务。
可以通过cancelTimer方法并传入定时任务的id来取消这个定时任务。
boolean cancelTimer(long id);
详细例子
public class TimerTest {
private static Logger logger = LoggerFactory.getLogger(VerticleDemo.class);
public static void main(String[] args) {
// 定时器
timerTest();
}
/**
* 间隔执行,执行多次
*/
public static void timerTest() {
Vertx vertx = Vertx.vertx();
// 每隔1s执行一次,回调函数中的参数timerId和返回值timerId2是一样的,可以通过这个值关闭这个定时器
long timerId2 = vertx.setPeriodic(1000, timerId -> {
System.out.println("当前定时器id是: " + timerId);
});
logger.info(timerId2);
// 情况和setPeriodic方法类似,只不过setTimer方法只执行一次
long timerId3 = vertx.setTimer(2000, timerId -> {
// 关闭上面setPeriodic的定时任务
vertx.cancelTimer(timerId2);
// 关闭服务,不然控制台会一直等待
vertx.close();
});
}
}
Event Bus
可以在event bus发送消息,进行消息通信。
发送的消息类型可以是基本类型、String、Buffer、json。
注册消息地址,第一个参数是注册地址,可通过send方法向这个地址上发送消息。第二个参数是回调参数,接收send方法发送的消息后,handler方法会被执行。
<T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler);
向地址发送消息
EventBus send(String address, @Nullable Object message);
简单的demo
Vertx vertx = Vertx.vertx();
EventBus eventBus = vertx.eventBus();
// 注册消费者,地址是local.message.address
MessageConsumer<Object> consumer = eventBus.consumer("local.message.address", message -> {
// 打印消息内容
logger.info(message.body());
});
/*
* Vert.x 默认允许任何基本/简单类型、String 或 Buffer 作为消息发送。
* 不过在 Vert.x 中的通常做法是使用 JSON 格式来发送消息。
*/
// 向local.message.address发送一条字符串消息
eventBus.send("local.message.address", "First Message");
发送buffer,json及自定义对象的例子在github上,可以通过下载源码看见。
public class EventBusDemo {
private Logger logger = LoggerFactory.getLogger(EventBusDemo.class.getName());
/**
* send方法向消息消费者发送消息
* send方法发送的消息只会传递给在该地址注册的其中一个处理器,这就是点对点模式。Vert.x 使用不严格的轮询算法来选择绑定的处理器。
*/
@Test
public void sendMessageTest() throws InterruptedException {
Vertx vertx = Vertx.vertx();
EventBus eventBus = vertx.eventBus();
// 注册消费者,地址是local.message.address
MessageConsumer<Object> consumer = eventBus.consumer("local.message.address", message -> {
// 打印消息内容
logger.info(message.body());
});
/*
* Vert.x 默认允许任何基本/简单类型、String 或 Buffer 作为消息发送。
* 不过在 Vert.x 中的通常做法是使用 JSON 格式来发送消息。
*/
// 向local.message.address发送一条字符串消息
eventBus.send("local.message.address", "First Message");
// 发送Buffer类型消息
BufferImpl buffer = new BufferImpl();
buffer.appendString("vert.x 你好?");
eventBus.send("local.message.address", buffer);
// 发送一条json类型消息
eventBus.send("local.message.address", new JsonObject().put("username", "张三").put("age", 18));
// 向向local.message.address发送一条自定义对象消息,需要自定义消息解码器,实现MessageCodec
// 注册自定义消息解码器
CousumerMessageCodec cousumerMessageCodec = new CousumerMessageCodec();
eventBus.registerCodec(cousumerMessageCodec);
CousumerSendMessage cousumerSendMessage = new CousumerSendMessage();
cousumerSendMessage.setUsername("大嘎嘎");
cousumerSendMessage.setAge(18);
// 发送自定义消息时,需要指定解码器
eventBus.send("local.message.address", cousumerSendMessage, new DeliveryOptions().setCodecName(cousumerMessageCodec.name()));
// 注销自定义消息解码器
eventBus.unregisterCodec(cousumerMessageCodec.name());
// 因为eventBus.consumer中的回调方法是异步的,只有再发送消息时回调函数才会触发,所以这里让主程序先等待一会儿,不然程序马上就会关闭,看不到回调函数的输出结果
Thread.sleep(1000);
// 注销消费处理器
consumer.unregister(result -> {
logger.info("消费者是否注销成功:" + result.succeeded());
});
}
/**
* publish方法向消息消费者发送消息
* 消息将会传递给所有在地址 local.message.address 上注册过的处理器。
*/
@Test
public void publishMessageTest() throws InterruptedException {
Vertx vertx = Vertx.vertx();
EventBus eventBus = vertx.eventBus();
// 接收send方法的消费者
MessageConsumer<Object> sendConsumer = eventBus.consumer("send.message.address", message -> {
logger.info("Send1:" + message.body());
});
MessageConsumer<Object> sendConsumer2 = eventBus.consumer("send.message.address", message -> {
logger.info("Send2:" + message.body());
});
// 接收publish方法的消费者
MessageConsumer<Object> publishConsumer = eventBus.consumer("publish.message.address", message -> {
logger.info("Publish1:" + message.body());
});
MessageConsumer<Object> publishConsumer2 = eventBus.consumer("publish.message.address", message -> {
logger.info("Publish2:" + message.body());
});
// send方法只会给其中一个在send.message.address地址上注册过的处理器发送消息
eventBus.send("send.message.address", "Send Message");
// publish方法会将消息发送给所有在publish.message.address上注册的处理器
eventBus.publish("publish.message.address", "Publish Message");
Thread.sleep(1000);
}
/**
* 设置消息头
*/
@Test
public void headerTest() throws InterruptedException {
Vertx vertx = Vertx.vertx();
EventBus eventBus = vertx.eventBus();
eventBus.consumer("message.address", message -> {
// 通过headers方法获取头信息
logger.info(message.headers());
logger.info(message.headers().get("some-header"));
});
// 设置头信息
DeliveryOptions options = new DeliveryOptions();
options.addHeader("some-header", "some-value");
// 还可以设置消息的超时时间,如果在这个时间内没有收到应答,则会以失败为参数调用应答处理器,默认30s
options.setSendTimeout(1000 * 30);
eventBus.send("message.address", "Yay! Someone kicked a ball", options);
TimeUnit.SECONDS.sleep(1);
}
/**
* 回复消息
*/
@Test
public void replyMessageTest() throws InterruptedException {
Vertx vertx = Vertx.vertx();
EventBus eventBus = vertx.eventBus();
eventBus.consumer("message.address", message -> {
// 通过headers方法获取头信息
logger.info("handler1: " + message.body());
message.reply("reply message");
});
eventBus.consumer("message.address", message -> {
// 通过headers方法获取头信息
logger.info("hander2: " + message.body());
message.reply("reply message");
});
// 想接收回复的消息请用request方法,注意request只给其中一个在相同地址上注册过的处理器发送消息
eventBus.request("message.address", "Yay! Someone kicked a ball", message -> {
logger.info(message.result().body());
});
TimeUnit.SECONDS.sleep(1);
}
/**
* 集群模式启动EventBus
*/
@Test
public void clusterEventBus() {
VertxOptions options = new VertxOptions();
// 对EventBus的一些配置
options.setEventBusOptions(
new EventBusOptions()
.setSsl(true)
.setKeyStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
.setTrustStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
.setClientAuth(ClientAuth.REQUIRED)
.setClusterPublicHost("whatever")
.setClusterPublicPort(1234)
);
// 创建集群模式的Verticle
Vertx.clusteredVertx(options, res -> {
if (res.succeeded()) {
Vertx vertx = res.result();
// 获取集群模式下的EventBus
EventBus eventBus = vertx.eventBus();
System.out.println("We now have a clustered event bus: " + eventBus);
} else {
System.out.println("Failed: " + res.cause());
}
});
}
}
注*:如果是junit测试方法,vertx线程不会被阻塞,所以会出现已经向eventbus发送了消息,消息处理端还没接收到消息程序便关闭了。
如果用的是main方法测试,则不会有这个问题。
Buffer
可以通过Buffer.buffer()
创建一个buffer对象。这时创建的buffer对象大小默认是0且内容为空,在往buffer里添加内容时buffer会自动扩容。当然也可以指定buffer对象的默认值和默认大小。如果一开始就知道buffer会有一定的内容的话,推荐在一开始便指定buffer的初始值或大小,这样可以避免在添加的时候又动态扩容buffer而造成不必要的资源消耗。
// 指定初始大小
static Buffer buffer(int initialSizeHint);
// 初始值
static Buffer buffer(String string);
// 初始值和编码
static Buffer buffer(String string, String enc);
// 初始内容,用bytes数组表示
static Buffer buffer(byte[] bytes);
// 初始内容用ByteBuf表示
static Buffer buffer(ByteBuf byteBuf);
详细例子请参见BufferDemo测试类
public class BufferDemo {
Logger logger = LoggerFactory.getLogger(BufferDemo.class);
/**
* 创建Buffer对象
*/
@Test
public void createBufferTest() {
// 通过静态方法创建Buffer对象
Buffer emptyBuffer = Buffer.buffer();
emptyBuffer.appendString("Hello Buffer");
// 创建带有初始值的Buffer对象,字符默认以UTF-8编码
Buffer helloBuffer = Buffer.buffer("Hello Buffer");
// 创建初始值,并指定编码
Buffer encodingBuffer = Buffer.buffer("哈哈哈", "UTF-8");
// 通过字节数组创建Buffer
byte[] bytes = new byte[]{1, 3, 5};
Buffer byteBuffer = Buffer.buffer(bytes);
// 创建一个指定大小的Buffer,创建时使得这个buffer被分配到了更多的内存,比数据写入时再动态的调整内存大小效率要高的多
Buffer customizerSizeBuffer = Buffer.buffer(4);
customizerSizeBuffer.appendString("Hello Buffer");
logger.info(emptyBuffer);
logger.info(helloBuffer);
logger.info(encodingBuffer);
logger.info(byteBuffer);
logger.info(customizerSizeBuffer);
}
/**
* 向Buffer中写入数据
*/
@Test
public void writeBufferTest() {
Buffer buffer = Buffer.buffer();
// 向Buffer中写入数据
buffer.appendString("string");
buffer.appendBytes(new byte[]{1, 3, 5});
buffer.appendDouble(1.3);
buffer.appendFloat(1.2F);
buffer.appendInt(1);
buffer.appendLong(1);
buffer.appendShort((short) 1);
// 写入无符号数
buffer.appendUnsignedByte((short) 1);
buffer.appendUnsignedInt(1);
buffer.appendUnsignedShort(1);
// 还可以向Buffer中再写入一个Buffer
buffer.appendBuffer(Buffer.buffer("append Buffer"));
logger.info(buffer);
// 再Buffer指定位置写入数据,将会替换原来位置上的值
buffer.setString(6, "哇哈哈");
// buffer.setUnsignedInt(6, 1);
logger.info(buffer);
// 长度
logger.info(buffer.length());
// 拷贝
logger.info(buffer.copy());
// 裁剪[0,6)
logger.info(buffer.slice(0, 6));
}
/**
* 从Buffer中读取数据
*/
@Test
public void readerBufferTest() {
Buffer buffer = Buffer.buffer("Hello Buffer");
// Hello,读取开始位置到结束位置的数据[0, 5)
logger.info(buffer.getString(0, 5));
// 循环读取
for (int i = 0; i < buffer.length(); i += 4) {
logger.info("int value at " + i + " is: " + buffer.getString(i, i + 4));
}
}
JSON
vert.x内置了json模块,可以方便vert.x程序对json的处理,它内置的json工具类依赖于jackson。
vert.x的JSON模块有两个工具类,JsonObject和JsonArray
通过JsonObject
的构造方法可以将字符串、map对象、buffer对象转成json对象
如果要将自定义对象转成json类型,通过JsonObject.mapFrom(Object)
方法,需要引入jackson-databind依赖,否则会抛出java.lang.UnsupportedOperationException: Mapping is not available without Jackson Databind on the classpath
异常
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.9</version>
</dependency>
JsonArray对象的用法与JsonObject类似,详细demo参见JsonDemo测试类
public class JsonDemo {
Logger logger = LoggerFactory.getLogger(JsonDemo.class);
/**
* 其它类型转成json
*/
@Test
public void otherType2jsonObjectTest() {
// 将字符串转换成json对象
String jsonString = "{"foo":"bar"}";
JsonObject strJson = new JsonObject(jsonString);
logger.info(strJson);
// 将map转换成json对象
Map<String, Object> map = new HashMap<>();
map.put("foo", "bar");
map.put("xyz", 3);
JsonObject mapJson = new JsonObject(map);
logger.info(mapJson);
// 将buffer转成json
BufferImpl buffer = new BufferImpl();
buffer.appendString("{"foo":"bar"}");
JsonObject bufferJson = new JsonObject(buffer);
logger.info(bufferJson);
// 将自定义对象转换成json
// 需要添加jackson-databind依赖,否则会发生java.lang.UnsupportedOperationException: Mapping is not available without Jackson Databind on the classpath
User user = new User();
user.setUsername("大哥哥");
user.setAge(18);
JsonObject customizerJson = JsonObject.mapFrom(user);
logger.info(customizerJson);
}
/**
* json转其它类型
*/
@Test
public void jsonObject2otherTypeTest() {
JsonObject json = new JsonObject().put("username", "张嘎").put("age", 6);
// json转自定义类型
User user = json.mapTo(User.class);
logger.info(user);
// json转map类型
Map<String, Object> map = json.getMap();
logger.info(map);
// json转string
String str = json.toString();
logger.info(str);
// json转buffer
Buffer buffer = json.toBuffer();
logger.info(buffer);
}
/**
* JsonArray的使用和JsonObject大体类似
*/
@Test
public void jsonArrayTest() {
// 通过字符串创建 JSON 数组
String jsonString = "["foo","bar"]";
JsonArray array = new JsonArray(jsonString);
logger.info(array);
// 使用add方法给json数组添加数据
JsonArray array2 = new JsonArray().add("a").add("b").add("c");
logger.info(array2);
// 获取第一个元素
logger.info(array2.getString(0));
}
}
运行阻塞式代码
vert.x是异步处理请求的关键是因为所有任务都在Event Loop中被轮询处理,每个任务的处理效率几乎都是毫秒级的。异步的好处是可以更快的响应用户请求。如果已知某一个代码块是耗时操作,耗时操作会阻塞Event Loop中的其它任务,那么需要使用executeBlocking
方法将这块会造成阻塞的代码块包起来。
blocking.complete(obj)
声明当前阻塞代码块已执行成功,然后将成功的结果作为传递给第二个回调函数,第二个回调函数通过result.result()
即可获得。
多个executeBlocking代码块会从上至下依次执行。
详细例子:
public class ProcessBlockCodeDemo extends AbstractVerticle {
public static void main(String[] args) {
Vertx.vertx().deployVerticle(ProcessBlockCodeDemo.class.getName());
}
@Override
public void start() throws Exception {
// 创建http server
this.vertx.createHttpServer(
new HttpServerOptions()
.setPort(8080)
).requestHandler(request -> {
Map<String, Object> map = new HashMap<>();
vertx.executeBlocking(blocking -> {
map.put("sex", "男");
blocking.complete();
});
vertx.executeBlocking(blocking -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put("name", "张三");
map.put("age", 18);
blocking.complete(map);
}, result -> {
JsonObject entries = new JsonObject((Map) result.result());
request
.response()
.putHeader("Content-Type", "application/json;charset=UTF-8")
.end(entries.toString());
});
}).listen();
}
}
参考:https://vertxchina.github.io/vertx-translation-chinese/core/Core.html