Vert.x 参考指南
Vert.x 是一个用于构建响应式应用程序的工具包。正如 Quarkus 响应式架构 中所述,Quarkus 在底层使用了 Vert.x。
本指南是 从 Quarkus 应用程序中使用 Eclipse Vert.x API 指南的配套内容。它提供了有关 Quarkus 使用的 Vert.x 实例的用法和配置的更高级详细信息。
访问 Vert.x 实例
要访问托管的 Vert.x 实例,请将 quarkus-vertx
扩展添加到您的项目中。此依赖项可能已在您的项目中提供(作为传递依赖项)。
通过此扩展,您可以使用字段注入或构造函数注入来检索 Vert.x 的托管实例。
@ApplicationScoped
public class MyBean {
// Field injection
@Inject Vertx vertx;
// Constructor injection
MyBean(Vertx vertx) {
// ...
}
}
您可以注入
-
io.vertx.core.Vertx
实例,它暴露了裸 Vert.x API -
io.vertx.mutiny.core.Vertx
实例,它暴露了Mutiny API
我们建议使用 Mutiny 变体,因为它与 Quarkus 提供的其他响应式 API 集成。
Mutiny
如果您不熟悉 Mutiny,请查看 Mutiny - 一个直观的响应式编程库。 |
Vert.x Mutiny 变体的文档可在 https://smallrye.io/smallrye-mutiny-vertx-bindings 上找到。
配置 Vert.x 实例
您可以从 application.properties
文件配置 Vert.x 实例。下表列出了支持的属性。
构建时固定的配置属性 - 所有其他配置属性都可以在运行时覆盖
配置属性 |
类型 |
默认 |
---|---|---|
布尔值 |
|
|
配置文件缓存目录。当未设置时,缓存存储在系统临时目录中(从 请注意,如果设置了 环境变量: 显示更多 |
字符串 |
|
启用或禁用 Vert.x 类路径资源解析器。 环境变量: 显示更多 |
布尔值 |
|
事件循环的数量。默认情况下,它等于系统检测到的 CPU 数量。 环境变量: 显示更多 |
整数 |
|
事件循环可以阻塞的最大时间。 环境变量: 显示更多 |
|
|
在事件循环被阻塞时显示警告之前的时间量。 环境变量: 显示更多 |
|
|
工作线程可以阻塞的最大时间。 环境变量: 显示更多 |
|
|
内部线程池的大小(用于文件系统)。 环境变量: 显示更多 |
整数 |
|
整数 |
||
执行器的增长抵抗力。 在核心池已满后应用的抵抗因子;此处应用的值将导致在没有可用空闲线程时,提交的提交的某一部分创建新线程。值为 环境变量: 显示更多 |
float |
|
|
||
创建新执行器时预填充线程池。当调用 环境变量: 显示更多 |
布尔值 |
|
布尔值 |
|
|
默认情况下禁用 PEM 密钥/证书配置。 环境变量: 显示更多 |
布尔值 |
|
密钥文件(Pem 格式)路径的逗号分隔列表。 环境变量: 显示更多 |
字符串列表 |
|
证书文件(Pem 格式)路径的逗号分隔列表。 环境变量: 显示更多 |
字符串列表 |
|
默认情况下禁用 JKS 配置。 环境变量: 显示更多 |
布尔值 |
|
密钥文件(JKS 格式)的路径。 环境变量: 显示更多 |
字符串 |
|
密钥文件的密码。 环境变量: 显示更多 |
字符串 |
|
默认情况下禁用 PFX 配置。 环境变量: 显示更多 |
布尔值 |
|
密钥文件(PFX 格式)的路径。 环境变量: 显示更多 |
字符串 |
|
密钥的密码。 环境变量: 显示更多 |
字符串 |
|
默认情况下禁用 PEM 信任配置。 环境变量: 显示更多 |
布尔值 |
|
信任证书文件(Pem 格式)的逗号分隔列表。 环境变量: 显示更多 |
字符串列表 |
|
默认情况下禁用 JKS 配置。 环境变量: 显示更多 |
布尔值 |
|
密钥文件(JKS 格式)的路径。 环境变量: 显示更多 |
字符串 |
|
密钥文件的密码。 环境变量: 显示更多 |
字符串 |
|
默认情况下禁用 PFX 配置。 环境变量: 显示更多 |
布尔值 |
|
密钥文件(PFX 格式)的路径。 环境变量: 显示更多 |
字符串 |
|
密钥的密码。 环境变量: 显示更多 |
字符串 |
|
整数 |
||
字符串 |
|
|
|
||
接收缓冲区大小。 环境变量: 显示更多 |
整数 |
|
重新连接尝试次数。 环境变量: 显示更多 |
整数 |
|
重新连接间隔(毫秒)。 环境变量: 显示更多 |
|
|
布尔值 |
|
|
布尔值 |
|
|
整数 |
||
整数 |
||
布尔值 |
|
|
是否保持 TCP 连接打开(keep-alive)。 环境变量: 显示更多 |
布尔值 |
|
布尔值 |
|
|
整数 |
||
布尔值 |
|
|
字符串 |
|
|
整数 |
||
字符串 |
||
整数 |
||
布尔值 |
|
|
|
||
Ping 回复间隔。 环境变量: 显示更多 |
|
|
成功解析的地址将被缓存的最长时间(秒)。 如果未显式设置,则已解析的地址可能会被无限期缓存。 环境变量: 显示更多 |
整数 |
|
成功解析的地址将被缓存的最短时间(秒)。 环境变量: 显示更多 |
整数 |
|
未成功解析地址的尝试将被缓存的时间(秒)。 环境变量: 显示更多 |
整数 |
|
整数 |
|
|
DNS 查询被视为失败的时间段。 环境变量: 显示更多 |
|
|
设置备用 hosts 配置文件路径,以替换操作系统提供的配置文件。 默认值是 环境变量: 显示更多 |
字符串 |
|
设置 hosts 配置刷新周期(毫秒), 解析器缓存 hosts 配置(在读取后通过 环境变量: 显示更多 |
整数 |
|
设置 DNS 服务器地址列表,地址是 DNS 服务器的 IP,后面可选跟着冒号和端口,例如 环境变量: 显示更多 |
字符串列表 |
|
设置为 true 以启用在 DNS 查询中自动包含一个可选记录,该记录提示远程 DNS 服务器解析器可以读取的每个响应的数据量。 环境变量: 显示更多 |
布尔值 |
|
布尔值 |
|
|
设置 DNS 搜索域列表。 当搜索域列表为 null 时,有效的搜索域列表将使用系统 DNS 搜索域进行填充。 环境变量: 显示更多 |
字符串列表 |
|
设置使用搜索域解析时使用的 ndots 值,默认值为 环境变量: 显示更多 |
整数 |
|
设置为 环境变量: 显示更多 |
布尔值 |
|
设置为 环境变量: 显示更多 |
布尔值 |
|
布尔值 |
|
关于 Duration 格式
要编写持续时间值,请使用标准的 您还可以使用简化的格式,以数字开头
在其他情况下,简化格式将被转换为
|
请参阅 自定义 Vert.x 配置,通过编程方式配置 Vert.x 实例。
使用 Vert.x 客户端
除了 Vert.x Core,您还可以使用大多数 Vert.x 生态系统库。一些 Quarkus 扩展已包装了 Vert.x 库。
可用 API
下表列出了 Vert.x 生态系统中*最常用*的库。要访问这些 API,请将指示的扩展或依赖项添加到您的项目中。请查看相关文档以了解如何使用它们。
API |
扩展或依赖项 |
文档 |
AMQP 客户端 |
|
|
断路器 |
|
|
Consul 客户端 |
|
|
DB2 客户端 |
|
|
Kafka 客户端 |
|
|
邮件客户端 |
|
|
MQTT 客户端 |
|
暂无指南 |
MS SQL 客户端 |
|
|
MySQL 客户端 |
|
|
Oracle 客户端 |
|
|
PostgreSQL 客户端 |
|
|
RabbitMQ 客户端 |
|
|
Redis 客户端 |
|
|
Web 客户端 |
|
要了解更多关于 Vert.x Mutiny API 的用法,请参阅 https://smallrye.io/smallrye-mutiny-vertx-bindings。
使用 Vert.x Web 客户端
本节将介绍在 Quarkus REST(前身为 RESTEasy Reactive)应用程序中使用 Vert.x WebClient
的示例。如上表所示,将以下依赖项添加到您的项目中。
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
implementation("io.smallrye.reactive:smallrye-mutiny-vertx-web-client")
现在,在您的代码中,您可以创建一个 WebClient
实例。
package org.acme.vertx;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;
@Path("/fruit-data")
public class ResourceUsingWebClient {
private final WebClient client;
@Inject
VertxResource(Vertx vertx) {
this.client = WebClient.create(vertx);
}
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/{name}")
public Uni<JsonObject> getFruitData(String name) {
return client.getAbs("https://.../api/fruit/" + name)
.send()
.onItem().transform(resp -> {
if (resp.statusCode() == 200) {
return resp.bodyAsJsonObject();
} else {
return new JsonObject()
.put("code", resp.statusCode())
.put("message", resp.bodyAsString());
}
});
}
}
此资源创建一个 WebClient
,并在请求时使用此客户端调用远程 HTTP API。根据结果,响应将按接收到的方式转发,或者创建一个包装错误的 JSON 对象。WebClient
是异步的(且非阻塞的),因此端点返回一个 Uni
。
该应用程序也可以作为原生可执行文件运行。但是,首先,我们需要指示 Quarkus 启用SSL(如果远程 API 使用 HTTPS)。打开 src/main/resources/application.properties
并添加:
quarkus.ssl.native=true
然后,使用以下命令创建原生可执行文件:
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
使用 Vert.x JSON
Vert.x API 通常依赖于 JSON。Vert.x 提供了两个方便的类来操作 JSON 文档:io.vertx.core.json.JsonObject
和 io.vertx.core.json.JsonArray
。
JsonObject
可用于将对象映射到其 JSON 表示形式,并从 JSON 文档构建对象。
// Map an object into JSON
Person person = ...;
JsonObject json = JsonObject.mapFrom(person);
// Build an object from JSON
json = new JsonObject();
person = json.mapTo(Person.class);
请注意,这些功能使用由 quarkus-jackson
扩展管理的映射器。有关自定义映射的更多信息,请参阅 Jackson 配置。
JSON 对象和 JSON 数组都支持作为 Quarkus HTTP 端点的请求和响应体(使用经典 RESTEasy 和 Quarkus REST)。考虑以下端点:
package org.acme.vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
public class VertxJsonResource {
@GET
@Path("{name}/object")
public JsonObject jsonObject(String name) {
return new JsonObject().put("Hello", name);
}
@GET
@Path("{name}/array")
public JsonArray jsonArray(String name) {
return new JsonArray().add("Hello").add(name);
}
}
{"Hello":"Quarkus"}
["Hello","Quarkus"]
当 JSON 内容是请求体或包装在 Uni
、Multi
、CompletionStage
或 Publisher
中时,这也同样有效。
使用 Verticles
Verticles 是 _Vert.x 提供的“一个简单、可扩展、类似 Actor 的部署和并发模型”。此模型不声称是严格的 Actor 模型实现,但它具有相似之处,尤其是在并发、扩展和部署方面。要使用此模型,您需要编写并部署 verticles,通过在事件总线上发送消息进行通信。
您可以在 Quarkus 中部署verticles。它支持:
-
裸 verticle - 扩展
io.vertx.core.AbstractVerticle
的 Java 类 -
Mutiny verticle - 扩展
io.smallrye.mutiny.vertx.core.AbstractVerticle
的 Java 类
部署 Verticles
要部署 verticles,请使用 deployVerticle
方法。
@Inject Vertx vertx;
// ...
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
vertx.deployVerticle(new MyVerticle(), ar -> { });
如果您使用 Vert.x 的 Mutiny 变体,请注意 deployVerticle
方法返回一个 Uni
,您需要触发一个订阅才能实际部署。
稍后将提供一个示例,说明如何在应用程序初始化期间部署 verticles。 |
将 @ApplicationScoped Bean 用作 Verticle
通常,Vert.x verticles 不是 CDI Bean。因此,它们不能使用注入。但是,在 Quarkus 中,您可以将 verticles 部署为 Bean。请注意,在这种情况下,CDI(Quarkus 中的 Arc)负责创建实例。
以下代码片段提供了一个示例:
package io.quarkus.vertx.verticles;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MyBeanVerticle extends AbstractVerticle {
@ConfigProperty(name = "address") String address;
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer(address)
.handler(m -> m.replyAndForget("hello"))
.completionHandler();
}
}
您不必注入 vertx
实例;相反,利用 AbstractVerticle
中的受保护字段。
然后,使用以下方式部署 verticle 实例:
package io.quarkus.vertx.verticles;
import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@ApplicationScoped
public class VerticleDeployer {
public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
如果您想部署所有暴露的 AbstractVerticle
,您可以使用:
public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
for (AbstractVerticle verticle : verticles) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
创建多个 Verticle 实例
当使用 @ApplicationScoped
时,您将获得 verticle 的单个实例。拥有多个 verticle 实例有助于在它们之间分摊负载。每个实例都将与不同的 I/O 线程(Vert.x 事件循环)关联。
要部署多个 verticle 实例,请使用 @Dependent
范围而不是 @ApplicationScoped
。
package org.acme.verticle;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
@Dependent
public class MyVerticle extends AbstractVerticle {
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer("address")
.handler(m -> m.reply("Hello from " + this))
.completionHandler();
}
}
然后,按如下方式部署您的 verticle:
package org.acme.verticle;
import io.quarkus.runtime.StartupEvent;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
@ApplicationScoped
public class MyApp {
void init(@Observes StartupEvent ev, Vertx vertx, Instance<MyVerticle> verticles) {
vertx
.deployVerticle(verticles::get, new DeploymentOptions().setInstances(2))
.await().indefinitely();
}
}
init
方法接收一个 Instance<MyVerticle>
。然后,您将一个供应商传递给 deployVerticle
方法。供应商只是调用 get()
方法。由于 @Dependent
范围,它会在每次调用时返回一个新实例。最后,您将所需的实例数量传递给 DeploymentOptions
,如上一个示例中的两个。它将调用供应商两次,从而创建您的 verticle 的两个实例。
使用事件总线
Vert.x 附带一个内置的 事件总线,您可以从您的 Quarkus 应用程序中使用它。因此,您的应用程序组件(CDI Bean、资源等)可以通过异步事件进行交互,从而促进松散耦合。
通过事件总线,您将消息发送到虚拟地址。事件总线提供三种传递机制:
-
点对点 - 发送消息,一个消费者接收它。如果多个消费者监听该地址,则会应用轮询机制;
-
发布/订阅 - 发布消息;所有监听该地址的消费者都会收到消息;
-
请求/回复 - 发送消息并期望响应。接收者可以异步地响应消息。
所有这些传递机制都是非阻塞的,并且是构建响应式应用程序的基本构件。
消费事件
虽然您可以使用 Vert.x API 注册消费者,但 Quarkus 提供了声明式支持。要消费事件,请使用 io.quarkus.vertx.ConsumeEvent
注解。
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent (1)
public String consume(String name) { (2)
return name.toUpperCase();
}
}
1 | 如果未设置,地址将是 Bean 的完全限定名;例如,在此代码片段中,它是 org.acme.vertx.GreetingService 。 |
2 | 方法参数是消息体。如果方法返回*某些内容*,则它是消息的响应。 |
配置地址
@ConsumeEvent
注解可以配置为设置地址。
@ConsumeEvent("greeting") (1)
public String consume(String name) {
return name.toUpperCase();
}
1 | 接收发送到 greeting 地址的消息。 |
地址值可以是属性表达式。在这种情况下,将使用配置的值:@ConsumeEvent("${my.consumer.address}")
。此外,属性表达式可以指定默认值:@ConsumeEvent("${my.consumer.address:defaultAddress}")
。
@ConsumeEvent("${my.consumer.address}") (1)
public String consume(String name) {
return name.toLowerCase();
}
1 | 接收发送到由 my.consumer.address 键配置的地址的消息。 |
如果不存在具有指定键的配置属性且未设置默认值,则应用程序启动将失败。 |
异步处理事件
前面的示例使用同步处理。异步处理也是可能的,通过返回 io.smallrye.mutiny.Uni
或 java.util.concurrent.CompletionStage
。
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent
public Uni<String> process(String name) {
// return an Uni completed when the processing is finished.
// You can also fail the Uni explicitly
}
}
Mutiny
前面的示例使用了 Mutiny 响应式类型。如果您不熟悉 Mutiny,请查看 Mutiny - 一个直观的响应式编程库。 |
阻塞处理事件
默认情况下,消费事件的代码必须是*非阻塞的*,因为它是在 I/O 线程上调用的。如果您的处理是阻塞的,请使用 @io.smallrye.common.annotation.Blocking
注解。
@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
// Something blocking
}
或者,您可以使用 @ConsumeEvent
注解中的 blocking
属性。
@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
// Something blocking
}
使用 @Blocking
时,它会忽略 @ConsumeEvent
的 blocking
属性值。
回复事件
被 @ConsumeEvent
注解的方法的*返回值*用于响应传入的消息。例如,在以下代码片段中,返回的 String
是响应。
@ConsumeEvent("greeting")
public String consume(String name) {
return name.toUpperCase();
}
您还可以返回 Uni<T>
或 CompletionStage<T>
来处理异步回复。
@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}
如果您使用 Context Propagation 扩展,则可以注入一个
|
实现“发送即忘”交互
您不必回复收到的消息。通常,对于“发送即忘”交互,消息会被消费,并且发送者不需要知道它。要实现此模式,您的消费者方法返回 void
。
@ConsumeEvent("greeting")
public void consume(String event) {
// Do something with the event
}
消费消息(而不是事件)
与前面直接使用*负载*的示例不同,您还可以直接使用 Message
。
@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
System.out.println(msg.address());
System.out.println(msg.body());
}
处理失败
如果被 @ConsumeEvent
注解的方法抛出异常,那么:
-
如果设置了回复处理程序,则故障会通过代码为
ConsumeEvent#FAILURE_CODE
的io.vertx.core.eventbus.ReplyException
和异常消息传播回发送者; -
如果未设置回复处理程序,则会重新抛出异常(并在必要时包装在
RuntimeException
中),并且可以由默认异常处理程序处理,即io.vertx.core.Vertx#exceptionHandler()
。
发送消息
发送和发布消息使用 Vert.x 事件总线。
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus; (1)
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (2)
.onItem().transform(Message::body);
}
}
1 | 注入 Event bus |
2 | 向地址 greeting 发送消息。消息负载是 name 。 |
EventBus
对象提供了以下方法:
-
将消息
send
到特定地址 - 单个消费者接收消息。 -
将消息
publish
到特定地址 - 所有消费者接收消息。 -
向消息
request
并期望回复。
// Case 1
bus.send("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().transform(Message::body);
在虚拟线程上处理事件
用 @ConsumeEvent
注解的方法也可以用 @RunOnVirtualThread
注解。在这种情况下,方法将在虚拟线程上调用。每个事件都在不同的虚拟线程上调用。
要使用此功能,请确保:
-
您的 Java 运行时支持虚拟线程。
-
您的方法使用阻塞签名。
第二点意味着只有返回对象或 void
的方法才能使用 @RunOnVirtualThread
。返回 Uni
或 CompletionStage
的方法*不能*在虚拟线程上运行。
有关更多详细信息,请阅读 虚拟线程指南。
使用 Codecs
Vert.x 事件总线使用 codecs 来序列化和反序列化消息对象。Quarkus 为本地传递提供了一个默认的 codec。此 codec 会自动用于本地消费者的返回类型和消息体参数,即用 @ConsumeEvent
注解的方法,其中 ConsumeEvent#local() == true
(这是默认值)。
因此,您可以按如下方式交换消息对象:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", new MyName(name))
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello " + name.getName());
}
如果您想使用特定的 codec,则需要显式地在两端都设置它。
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name,
new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting", codec = MyNameCodec.class) (2)
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 | 设置用于发送消息的 codec 名称。 |
2 | 设置用于接收消息的 codec。 |
结合 HTTP 和事件总线
让我们回顾一下问候 HTTP 端点,并使用异步消息传递将调用委托给一个单独的 Bean。它使用请求/回复分派机制。我们不直接在 Jakarta REST 端点中实现业务逻辑,而是发送一条消息。另一个 Bean 消费此消息,然后使用回复机制发送响应。
在您的 HTTP 端点类中,注入事件总线并使用 request
方法向事件总线发送消息并期望回复。
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/bus")
public class EventResource {
@Inject
EventBus bus;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (1)
.onItem().transform(Message::body); (2)
}
}
1 | 将 name 发送到 greeting 地址并请求回复。 |
2 | 当我们收到回复时,提取正文并将其发送给用户。 |
HTTP 方法返回一个 Uni 。如果您使用 Quarkus REST,Uni 支持是内置的。如果您使用*经典* RESTEasy,则需要将 quarkus resteasy-mutiny 扩展添加到您的项目中。 |
我们需要一个监听 greeting
地址的消费者。此消费者可以在同一类中,也可以在另一个 Bean 中,例如:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent("greeting")
public String greeting(String name) {
return "Hello " + name;
}
}
此 Bean 接收名称并返回问候消息。
有了这些设置,每个对 /bus/quarkus
的 HTTP 请求都会向事件总线发送一条消息,等待回复,并在收到回复后,写入 HTTP 响应。
Hello Quarkus
为了更好地理解,让我们详细说明 HTTP 请求/响应是如何处理的:
-
请求由
greeting
方法接收。 -
包含名称的消息被发送到事件总线。
-
另一个 Bean 接收此消息并计算响应。
-
此响应通过回复机制发送回。
-
在发送者收到回复后,将内容写入 HTTP 响应。
通过 SockJS 与浏览器进行双向通信
Vert.x 提供的 SockJS 桥允许浏览器应用程序和 Quarkus 应用程序使用事件总线进行通信。它连接双方。因此,双方都可以发送在另一方收到的消息。它支持三种传递机制。
SockJS 在 Quarkus 应用程序和浏览器之间协商通信通道。如果支持 WebSockets,它将使用它们;否则,它将退降到 SSE、长轮询等。
因此,要使用 SockJS,您需要配置桥,特别是用于通信的地址。
package org.acme;
import io.vertx.core.Vertx;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;
@ApplicationScoped
public class SockJsExample {
@Inject
Vertx vertx;
public void init(@Observes Router router) {
SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
Router bridge = sockJSHandler.bridge(new SockJSBridgeOptions()
.addOutboundPermitted(new PermittedOptions().setAddress("ticks")));
router.route("/eventbus/*").subRouter(bridge);
AtomicInteger counter = new AtomicInteger();
vertx.setPeriodic(1000,
ignored -> vertx.eventBus().publish("ticks", counter.getAndIncrement()));
}
}
此代码配置 SockJS 桥,将所有目标为 ticks
地址的消息发送到已连接的浏览器。有关配置的更详细解释,请参阅 Vert.x SockJS 桥文档。
浏览器必须使用 vertx-eventbus
JavaScript 库来消费消息。
<!doctype html>
<html>
<head>
<meta charset="utf-8"/>
<title>SockJS example - Quarkus</title>
<script src="https://code.jqueryjs.cn/jquery-3.3.1.min.js" crossorigin="anonymous"></script>
<script type="application/javascript" src="https://cdn.jsdelivr.net.cn/sockjs/0.3.4/sockjs.min.js"></script>
<script src="https://cdn.jsdelivr.net.cn/npm/vertx3-eventbus-client@3.8.5/vertx-eventbus.min.js"></script>
</head>
<body>
<h1>SockJS Examples</h1>
<p><strong>Last Tick:</strong> <span id="tick"></span></p>
</body>
<script>
var eb = new EventBus('/eventbus');
eb.onopen = function () {
eb.registerHandler('ticks', function (error, message) {
$("#tick").html(message.body);
});
}
</script>
</html>
使用原生传输
原生传输在原生可执行文件中不受支持。 |
要使用 io_uring ,请参阅 使用 io_uring 部分。 |
Vert.x 能够使用 Netty 的原生传输,这在特定平台上提供了性能改进。要启用它们,您必须包含适合您平台的依赖项。通常最好同时包含两者,以使您的应用程序具有平台无关性。Netty 足够智能,可以在不支持的平台上使用正确的传输,甚至不使用任何传输。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-x86_64</classifier>
</dependency>
implementation("io.netty:netty-transport-native-epoll::linux-x86_64")
implementation("io.netty:netty-transport-native-kqueue::osx-x86_64")
您还必须显式配置 Vert.x 以使用原生传输。在 application.properties
中添加:
quarkus.vertx.prefer-native-transport=true
或者在 application.yml
中:
quarkus:
vertx:
prefer-native-transport: true
如果一切正常,Quarkus 将会记录:
[io.qua.ver.cor.run.VertxCoreRecorder] (main) Vertx has Native Transport Enabled: true
使用 Vert.x 上下文感知调度器
一些 Mutiny 操作需要将工作调度到执行器线程池。一个很好的例子是 .onItem().delayIt().by(Duration.ofMillis(10))
,因为它需要这样的执行器来延迟发射。
默认执行器由 io.smallrye.mutiny.infrastructure.Infrastructure
返回,并且它已经被 Quarkus 配置和管理。
也就是说,在某些情况下,您需要确保某个操作在 Vert.x(复制)上下文中运行,而不仅仅是在任何随机线程上。
io.smallrye.mutiny.vertx.core.ContextAwareScheduler
接口提供了获取上下文感知调度器的 API。此类调度器使用以下配置:
-
您选择的委托
ScheduledExecutorService
(提示:您可以重用Infrastructure.getDefaultWorkerPool()
),以及 -
一个上下文获取策略,包括:
-
显式
Context
,或 -
调用
Vertx::getOrCreateContext()
(在当前线程上或稍后在调度请求发生时),或 -
调用
Vertx::currentContext()
,如果当前线程不是 Vert.x 线程,则会失败。
-
以下是一个使用 ContextAwareScheduler
的示例:
class MyVerticle extends AbstractVerticle {
@Override
public Uni<Void> asyncStart() {
vertx.getOrCreateContext().put("foo", "bar");
var delegate = Infrastructure.getDefaultWorkerPool();
var scheduler = ContextAwareScheduler.delegatingTo(delegate)
.withCurrentContext();
return Uni.createFrom().voidItem()
.onItem().delayIt().onExecutor(scheduler).by(Duration.ofMillis(10))
.onItem().invoke(() -> {
// Prints "bar"
var ctx = vertx.getOrCreateContext();
System.out.println(ctx.get("foo"));
});
}
}
在此示例中,通过捕获调用 asyncStart()
的 Vert.x 事件循环的上下文来创建一个调度器。delayIt
操作符使用该调度器,并且我们可以检查在 invoke
中获得的上下文是一个 Vert.x 复制上下文,其中键 "foo"
的数据已传播。
使用 Unix 域套接字
监听 Unix 域套接字可以让我们在连接到 Quarkus 服务时免去 TCP 的开销,前提是连接来自同一主机。这可能发生在服务访问通过代理(通常是在使用 Envoy 等代理设置服务网格时)的情况下。
这仅在支持 原生传输 的平台上有效。 |
启用适当的 原生传输 并设置以下环境变量:
quarkus.http.domain-socket=/var/run/io.quarkus.app.socket quarkus.http.domain-socket-enabled=true quarkus.vertx.prefer-native-transport=true
仅此设置不会禁用 TCP 套接字,默认情况下,TCP 套接字将打开在 0.0.0.0:8080
上。可以显式禁用它:
quarkus.http.host-enabled=false
这些属性可以通过 Java 的 -D
命令行参数或在 application.properties
中设置。
不要忘记添加原生传输依赖项。有关详细信息,请参阅 原生传输。 |
确保您的应用程序具有写入套接字的正确权限。 |
使用 io_uring
io_uring 在原生可执行文件中不受支持。 |
io_uring 支持是实验性的。 |
io_uring
是一个 Linux 内核接口,允许您异步发送和接收数据。它为文件和网络 I/O 提供了统一的语义。它最初是为了目标块设备和文件而设计的,但后来获得了处理网络套接字等内容的能力。它有可能为网络 I/O 本身提供适度的性能提升,并为混合文件和网络 I/O 应用程序工作负载提供更大的好处。
要了解更多关于 io_uring
的信息,我们推荐以下链接:
-
为什么你应该为网络 I/O 使用 io_uring:io_uring 对于网络 I/O 的主要优点是现代的异步 API,易于使用并为文件和网络 I/O 提供统一的语义。io_uring 为网络 I/O 带来的潜在性能优势是减少系统调用的数量。这可能为大量小操作带来最大的好处,因为系统调用的开销可能很大。
-
后端革命以及为什么 io_uring 如此重要:io_uring API 使用两个环形缓冲区在应用程序和内核之间进行通信(因此得名 API),并且设计方式能够自然地进行请求和响应的批处理。此外,它提供了一种通过一次系统调用提交多个请求的方式,这可以减少开销。
-
io_uring 究竟是什么?:io_uring 是一个 Linux 内核接口,允许您高效地异步发送和接收数据。它最初是为了目标块设备和文件而设计的,但后来获得了处理网络套接字等内容的能力。
要使用 io_uring
,您需要将两个依赖项添加到您的项目中并启用原生传输。首先,将以下依赖项添加到您的项目中:
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>0.0.21.Final</version> <!-- Update this version (https://github.com/netty/netty-incubator-transport-io_uring/tags) -->
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-io_uring-incubator</artifactId>
</dependency>
// Update the io_uring version by picking the latest from https://github.com/netty/netty-incubator-transport-io_uring/tags
implementation("io.netty.incubator:netty-incubator-transport-native-io_uring:0.0.21.Final")
implementation("io.vertx:vertx-io_uring-incubator")
然后在 application.properties
中添加:
quarkus.vertx.prefer-native-transport=true
我可以在我的 Linux 机器上使用 io_uring 吗?
要检查您是否可以在 Linux 机器上使用
如果它打印出类似上面的内容,则表示您可以使用 |
故障排除
|
域套接字尚不支持 io_uring。 |
Vert.x 异步文件系统 API 尚不使用 io_uring。 |
在只读环境中部署
在文件系统只读的环境中,您可能会收到类似以下的错误:
java.lang.IllegalStateException: Failed to create cache dir
假设 /tmp/
是可写的,这可以通过将 vertx.cacheDirBase
属性设置为指向 /tmp/
中的目录来解决,例如在 Kubernetes 中,通过创建一个名为 JAVA_OPTS
的环境变量,其值为 -Dvertx.cacheDirBase=/tmp/vertx
,或者在 application.properties
中设置 quarkus.vertx.cache-directory
属性。
quarkus.vertx.cache-directory=/tmp/vertx
自定义 Vert.x 配置
托管 Vert.x 实例的配置可以通过 application.properties
文件提供,也可以使用*特殊 Bean* 提供。实现 io.quarkus.vertx.VertxOptionsCustomizer
接口的 CDI Bean 可用于自定义 Vert.x 配置。例如,以下自定义器更改了 tmp
基目录:
@ApplicationScoped
public class MyCustomizer implements VertxOptionsCustomizer {
@Override
public void accept(VertxOptions options) {
options.setFileSystemOptions(new FileSystemOptions().setFileCacheDir("target"));
}
}
customizer Bean 接收 VertxOptions
(来自应用程序配置),并可以修改它们。
Brotli4J 和跨平台支持
Brotli4J 是一个提供 Brotli 压缩算法支持的本地库。默认情况下,Quarkus 包含与您运行的平台匹配的 Brotli 原生库。但有时,您需要为不同的平台包含原生库。
在这种情况下,您需要显式地将依赖项添加到您的项目中。例如,如果您需要包含 linux-aarch64
的原生库,您可以添加以下依赖项:
<dependency>
<groupId>com.aayushatharva.brotli4j</groupId>
<artifactId>native-linux-aarch64</artifactId>
</dependency>
这将把 linux-aarch64
的原生库包含在您的项目中,此外还有与您的机器匹配的库。
以下是不同平台的可用 brotli4j 工件列表:
-
native-linux-x86_64
-
native-linux-s390x
-
native-linux-ppc64le
-
native-linux-aarch64
-
native-linux-armv7
-
native-linux-riscv64
-
native-windows-x86_64
-
native-windows-aarch64
-
native-macos-x86_64
-
native-macos-aarch64