使用事件总线
Quarkus允许不同的Bean使用异步事件进行交互,从而促进松耦合。消息被发送到虚拟地址。它提供3种类型的传递机制
-
点对点 - 发送消息,一个消费者接收它。如果多个消费者监听该地址,则应用轮询;
-
发布/订阅 - 发布消息,所有监听该地址的消费者都接收消息;
-
请求/回复 - 发送消息并期望得到响应。接收者可以以异步方式响应消息
所有这些传递机制都是非阻塞的,并且为构建反应式应用程序提供了基本构建块之一。
异步消息传递功能允许回复消息,这是反应式消息传递不支持的。但是,它仅限于单事件行为(没有流)和本地消息。 |
安装
此机制使用 Vert.x EventBus,因此您需要启用 vertx
扩展才能使用此功能。如果创建新项目,请按如下方式设置 extensions
参数
对于 Windows 用户
-
如果使用 cmd,(不要使用反斜杠
\
并将所有内容放在同一行上) -
如果使用 Powershell,请将
-D
参数用双引号括起来,例如"-DprojectArtifactId=vertx-quickstart"
如果您已经创建了一个项目,则可以使用 add-extension
命令将 vertx
扩展添加到现有的Quarkus项目中
quarkus extension add vertx
./mvnw quarkus:add-extension -Dextensions='vertx'
./gradlew addExtension --extensions='vertx'
否则,您可以手动将其添加到构建文件的依赖项部分
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx</artifactId>
</dependency>
implementation("io.quarkus:quarkus-vertx")
消费事件
要消费事件,请使用 io.quarkus.vertx.ConsumeEvent
注解
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent (1)
public String consume(String name) { (2)
return name.toUpperCase();
}
}
1 | 如果未设置,则地址是Bean的完全限定名称,例如,在此代码片段中是 org.acme.vertx.GreetingService 。 |
2 | 方法参数是消息体。如果方法返回某些内容,则是消息响应。 |
默认情况下,消费事件的代码必须是非阻塞的,因为它是在Vert.x事件循环中调用的。如果您的处理是阻塞的,请使用
或者,您可以使用
当使用 |
通过返回 io.smallrye.mutiny.Uni
或 java.util.concurrent.CompletionStage
也可以进行异步处理
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent
public CompletionStage<String> consume(String name) {
// return a CompletionStage completed when the processing is finished.
// You can also fail the CompletionStage explicitly
}
@ConsumeEvent
public Uni<String> process(String name) {
// return an Uni completed when the processing is finished.
// You can also fail the Uni explicitly
}
}
Mutiny
前面的示例使用了Mutiny反应式类型。如果您不熟悉 Mutiny,请查看 Mutiny - 一个直观的反应式编程库。 |
配置地址
可以配置 @ConsumeEvent
注解以设置地址
@ConsumeEvent("greeting") (1)
public String consume(String name) {
return name.toUpperCase();
}
1 | 接收发送到 greeting 地址的消息 |
回复
用 @ConsumeEvent
注解的方法的返回值用作对传入消息的响应。例如,在以下代码片段中,返回的 String
是响应。
@ConsumeEvent("greeting")
public String consume(String name) {
return name.toUpperCase();
}
您还可以返回 Uni<T>
或 CompletionStage<T>
以处理异步回复
@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}
如果您使用 Context Propagation 扩展,则可以注入
或者,您可以使用默认的Quarkus工作池,如下所示
|
实现即发即弃交互
您不必回复收到的消息。通常,对于即发即弃交互,消息被消费,并且发送方不需要知道它。要实现这一点,您的消费者方法只需返回 void
@ConsumeEvent("greeting")
public void consume(String event) {
// Do something with the event
}
发送消息
好的,我们已经看到了如何接收消息,现在让我们切换到另一侧:发送方。发送和发布消息使用 Vert.x 事件总线
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus; (1)
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (2)
.onItem().transform(Message::body);
}
}
1 | 注入事件总线 |
2 | 向地址 greeting 发送消息。消息有效负载为 name |
EventBus
对象提供以下方法
-
send
将消息发送到特定地址 - 单个消费者接收消息。 -
publish
将消息发布到特定地址 - 所有消费者都接收消息。 -
send
发送消息并异步等待回复 -
send
发送消息并以阻塞方式等待回复
// Case 1
bus.<String>requestAndForget("greeting", name);
// Case 2
bus.publish("greeting", name);
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().transform(Message::body);
// Case 4
String response = bus.<String>requestAndAwait("greeting", name).body();
将事物放在一起 - 桥接 HTTP 和消息
让我们回顾一下问候 HTTP 端点,并使用异步消息传递将调用委托给一个单独的 Bean。它使用请求/回复分发机制。我们没有在 Jakarta REST 端点内实现业务逻辑,而是发送一条消息。此消息由另一个 Bean 消费,并且使用回复机制发送响应。
首先使用以下命令创建一个新项目
对于 Windows 用户
-
如果使用 cmd,(不要使用反斜杠
\
并将所有内容放在同一行上) -
如果使用 Powershell,请将
-D
参数用双引号括起来,例如"-DprojectArtifactId=vertx-http-quickstart"
您已经可以使用以下命令在开发模式下启动应用程序
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
然后,创建一个具有以下内容的新 Jakarta REST 资源
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (1)
.onItem().transform(Message::body); (2)
}
}
1 | 将 name 发送到 greeting 地址并请求响应 |
2 | 当我们获得响应时,提取正文并将其发送给用户 |
如果您调用此端点,您将等待并获得超时。实际上,没有人监听。因此,我们需要一个监听 greeting
地址的消费者。创建一个具有以下内容的 GreetingService
Bean
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent("greeting")
public String greeting(String name) {
return "Hello " + name;
}
}
此Bean接收名称,并返回问候消息。
现在,打开您的浏览器到 https://:8080/async/Quarkus,您应该看到
Hello Quarkus
为了更好地理解,让我们详细说明一下 HTTP 请求/响应是如何处理的
-
请求由
hello
方法接收 -
包含名称的消息被发送到事件总线
-
另一个 Bean 接收此消息并计算响应
-
此响应使用回复机制发回
-
一旦发送方收到回复,内容将被写入 HTTP 响应
可以使用以下命令打包此应用程序
quarkus build
./mvnw install
./gradlew build
您还可以使用以下命令将其编译为本机可执行文件
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
使用编解码器
Vert.x 事件总线使用编解码器来序列化和反序列化对象。Quarkus为本地传递提供了默认编解码器。因此,您可以按如下方式交换对象
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", new MyName(name))
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello " + name.getName());
}
如果您想使用特定的编解码器,则需要在两端都显式设置它
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name,
new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting", codec = MyNameCodec.class) (2)
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 | 设置用于发送消息的编解码器的名称 |
2 | 设置用于接收消息的编解码器 |