编辑此页面

使用事件总线

Quarkus允许不同的Bean使用异步事件进行交互,从而促进松耦合。消息被发送到虚拟地址。它提供3种类型的传递机制

  • 点对点 - 发送消息,一个消费者接收它。如果多个消费者监听该地址,则应用轮询;

  • 发布/订阅 - 发布消息,所有监听该地址的消费者都接收消息;

  • 请求/回复 - 发送消息并期望得到响应。接收者可以以异步方式响应消息

所有这些传递机制都是非阻塞的,并且为构建反应式应用程序提供了基本构建块之一。

异步消息传递功能允许回复消息,这是反应式消息传递不支持的。但是,它仅限于单事件行为(没有流)和本地消息。

安装

此机制使用 Vert.x EventBus,因此您需要启用 vertx 扩展才能使用此功能。如果创建新项目,请按如下方式设置 extensions 参数

CLI
quarkus create app org.acme:vertx-quickstart \
    --extension='vertx,rest' \
    --no-code
cd vertx-quickstart

要创建 Gradle 项目,请添加 --gradle--gradle-kotlin-dsl 选项。

有关如何安装和使用 Quarkus CLI 的更多信息,请参阅 Quarkus CLI 指南。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.24.4:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=vertx-quickstart \
    -Dextensions='vertx,rest' \
    -DnoCode
cd vertx-quickstart

要创建 Gradle 项目,请添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

对于 Windows 用户

  • 如果使用 cmd,(不要使用反斜杠 \ 并将所有内容放在同一行上)

  • 如果使用 Powershell,请将 -D 参数用双引号括起来,例如 "-DprojectArtifactId=vertx-quickstart"

如果您已经创建了一个项目,则可以使用 add-extension 命令将 vertx 扩展添加到现有的Quarkus项目中

CLI
quarkus extension add vertx
Maven
./mvnw quarkus:add-extension -Dextensions='vertx'
Gradle
./gradlew addExtension --extensions='vertx'

否则,您可以手动将其添加到构建文件的依赖项部分

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-vertx</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-vertx")

消费事件

要消费事件,请使用 io.quarkus.vertx.ConsumeEvent 注解

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent                           (1)
    public String consume(String name) {    (2)
        return name.toUpperCase();
    }
}
1 如果未设置,则地址是Bean的完全限定名称,例如,在此代码片段中是 org.acme.vertx.GreetingService
2 方法参数是消息体。如果方法返回某些内容,则是消息响应。

默认情况下,消费事件的代码必须是非阻塞的,因为它是在Vert.x事件循环中调用的。如果您的处理是阻塞的,请使用 blocking 属性

@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
    // Something blocking
}

或者,您可以使用 @io.smallrye.common.annotation.Blocking 注解您的方法

@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
    // Something blocking
}

当使用 @Blocking 时,它会忽略 @ConsumeEventblocking 属性的值。有关此主题的更多详细信息,请参见 Quarkus反应式架构文档

通过返回 io.smallrye.mutiny.Unijava.util.concurrent.CompletionStage 也可以进行异步处理

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent
    public CompletionStage<String> consume(String name) {
        // return a CompletionStage completed when the processing is finished.
        // You can also fail the CompletionStage explicitly
    }

    @ConsumeEvent
    public Uni<String> process(String name) {
        // return an Uni completed when the processing is finished.
        // You can also fail the Uni explicitly
    }
}
Mutiny

前面的示例使用了Mutiny反应式类型。如果您不熟悉 Mutiny,请查看 Mutiny - 一个直观的反应式编程库

配置地址

可以配置 @ConsumeEvent 注解以设置地址

@ConsumeEvent("greeting")               (1)
public String consume(String name) {
    return name.toUpperCase();
}
1 接收发送到 greeting 地址的消息

回复

@ConsumeEvent 注解的方法的返回值用作对传入消息的响应。例如,在以下代码片段中,返回的 String 是响应。

@ConsumeEvent("greeting")
public String consume(String name) {
    return name.toUpperCase();
}

您还可以返回 Uni<T>CompletionStage<T> 以处理异步回复

@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
    return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}

如果您使用 Context Propagation 扩展,则可以注入 executor

@Inject ManagedExecutor executor;

或者,您可以使用默认的Quarkus工作池,如下所示

Executor executor = Infrastructure.getDefaultWorkerPool();

实现即发即弃交互

您不必回复收到的消息。通常,对于即发即弃交互,消息被消费,并且发送方不需要知道它。要实现这一点,您的消费者方法只需返回 void

@ConsumeEvent("greeting")
public void consume(String event) {
    // Do something with the event
}

处理消息

如上所述,此机制基于 Vert.x 事件总线。因此,您也可以直接使用 Message

@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
    System.out.println(msg.address());
    System.out.println(msg.body());
}

处理失败

如果用 @ConsumeEvent 注解的方法抛出异常,则

  • 如果设置了回复处理程序,则失败将通过代码为 ConsumeEvent#FAILURE_CODEio.vertx.core.eventbus.ReplyException 和异常消息传播回发送方,

  • 如果未设置回复处理程序,则异常将被重新抛出(如有必要,包装在 RuntimeException 中),并且可以由默认的异常处理程序处理,即 io.vertx.core.Vertx#exceptionHandler()

发送消息

好的,我们已经看到了如何接收消息,现在让我们切换到另一侧:发送方。发送和发布消息使用 Vert.x 事件总线

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;                                       (1)

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)        (2)
                .onItem().transform(Message::body);
    }
}
1 注入事件总线
2 向地址 greeting 发送消息。消息有效负载为 name

EventBus 对象提供以下方法

  1. send 将消息发送到特定地址 - 单个消费者接收消息。

  2. publish 将消息发布到特定地址 - 所有消费者都接收消息。

  3. send 发送消息并异步等待回复

  4. send 发送消息并以阻塞方式等待回复

// Case 1
bus.<String>requestAndForget("greeting", name);
// Case 2
bus.publish("greeting", name);
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
        .onItem().transform(Message::body);
// Case 4
String response = bus.<String>requestAndAwait("greeting", name).body();

将事物放在一起 - 桥接 HTTP 和消息

让我们回顾一下问候 HTTP 端点,并使用异步消息传递将调用委托给一个单独的 Bean。它使用请求/回复分发机制。我们没有在 Jakarta REST 端点内实现业务逻辑,而是发送一条消息。此消息由另一个 Bean 消费,并且使用回复机制发送响应。

首先使用以下命令创建一个新项目

CLI
quarkus create app org.acme:vertx-http-quickstart \
    --extension='vertx,rest' \
    --no-code
cd vertx-http-quickstart

要创建 Gradle 项目,请添加 --gradle--gradle-kotlin-dsl 选项。

有关如何安装和使用 Quarkus CLI 的更多信息,请参阅 Quarkus CLI 指南。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.24.4:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=vertx-http-quickstart \
    -Dextensions='vertx,rest' \
    -DnoCode
cd vertx-http-quickstart

要创建 Gradle 项目,请添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

对于 Windows 用户

  • 如果使用 cmd,(不要使用反斜杠 \ 并将所有内容放在同一行上)

  • 如果使用 Powershell,请将 -D 参数用双引号括起来,例如 "-DprojectArtifactId=vertx-http-quickstart"

您已经可以使用以下命令在开发模式下启动应用程序

CLI
quarkus dev
Maven
./mvnw quarkus:dev
Gradle
./gradlew --console=plain quarkusDev

然后,创建一个具有以下内容的新 Jakarta REST 资源

src/main/java/org/acme/vertx/EventResource.java
package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)            (1)
                .onItem().transform(Message::body);            (2)
    }
}
1 name 发送到 greeting 地址并请求响应
2 当我们获得响应时,提取正文并将其发送给用户

如果您调用此端点,您将等待并获得超时。实际上,没有人监听。因此,我们需要一个监听 greeting 地址的消费者。创建一个具有以下内容的 GreetingService Bean

src/main/java/org/acme/vertx/GreetingService.java
package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent("greeting")
    public String greeting(String name) {
        return "Hello " + name;
    }

}

此Bean接收名称,并返回问候消息。

现在,打开您的浏览器到 https://:8080/async/Quarkus,您应该看到

Hello Quarkus

为了更好地理解,让我们详细说明一下 HTTP 请求/响应是如何处理的

  1. 请求由 hello 方法接收

  2. 包含名称的消息被发送到事件总线

  3. 另一个 Bean 接收此消息并计算响应

  4. 此响应使用回复机制发回

  5. 一旦发送方收到回复,内容将被写入 HTTP 响应

可以使用以下命令打包此应用程序

CLI
quarkus build
Maven
./mvnw install
Gradle
./gradlew build

您还可以使用以下命令将其编译为本机可执行文件

CLI
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.native.enabled=true

使用编解码器

Vert.x 事件总线使用编解码器来序列化反序列化对象。Quarkus为本地传递提供了默认编解码器。因此,您可以按如下方式交换对象

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name))
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}

如果您想使用特定的编解码器,则需要在两端都显式设置它

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", name,
        new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting", codec = MyNameCodec.class)            (2)
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 设置用于发送消息的编解码器的名称
2 设置用于接收消息的编解码器

相关内容