Mutiny 和 Reactiverse

我被问过很多次:如何在 Quarkus 中使用 Eclipse Vert.x?是的,你可以在 Quarkus 中使用 Vert.x。你可以部署 verticle,使用 event bus 进行通信,或者使用 Vert.x 生态系统中的任何东西。但是,你也可以在 Quarkus 中使用 Vert.x 的 Mutiny 变体,并与其他 Quarkus 提供的响应式 API 获得无缝体验。虽然有几篇文章已经提到了这一点,但它值得一篇专门的博文。那么,我们开始吧。

Eclipse Vert.x

Vert.x 是一个用于构建响应式应用的工具包。Vert.x 的生态系统非常庞大。从 HTTP 和数据访问能力,到通过微服务进行消息传递的客户端和安全设施,Vert.x 的生态系统极其多样化和通用。要了解其多样性,只需查看 Vert.x 文档。这使得 Vert.x 在许多领域都很受欢迎,例如 Web 应用、IoT 网关、银行应用等。

正如你可能知道的,Quarkus 基于 Vert.x。在底层,有一个托管的 Vert.x 实例为 Quarkus 的其余部分提供支持。

architecture

当 Quarkus 提供 HTTP 端点时,在底层,有一个 Vert.x HTTP 服务器处理请求和响应。对于消息传递、gRPC 和几乎任何 I/O 操作也是如此。

Vert.x “原生” API 及相关

Vert.x 提供了多种 API。现在我们先关注“原生”API。

遵循 Vert.x 的响应式特性,该 API 主要包含异步方法。这些方法遵循一种语法约定

public void doSomething(param1, param2, Handler<AsyncResult<T>> handler) {
    // ...
}

有趣的部分是最后一个参数。它是一个函数,更准确地说是一个回调,当操作完成或失败时会被调用。是的,Vert.x 的异步特性不允许使用 try/catch 块。所以你需要传递一个 continuation 回调,该回调会根据操作结果被调用。

AsyncResult 是一个捕获此结果的结构。它包含操作产生的 `<T>` 类型的结果,或者如果操作失败则包含失败信息。

让我们看一个例子

vertx.fileSystem()
    .readFile("my-file.txt", ar -> {
        if (ar.failed()) {
            System.out.println("D'oh! Cannot read the file: " + ar.cause());
        } else {
            System.out.println("File content is: " + ar.result());
        }
    });

这段代码读取一个文件,由于这是一个异步操作,它会在文件读取完成后调用回调。readFile 方法读取文件的全部内容并将其累积在一个 buffer 中。回调接收异步结果,其中包含文件的内容(ar.result())或失败信息。当操作完成或失败时,Vert.x 会调用此回调。

Vert.x 还通过 ReadStreamWriteStream 类支持流。ReadStream 代表你可以读取的数据流。因此,你可以附加一个回调,在流中的每个项遍历时调用。WriteStream 是一个数据源。你可以将项推送到 WriteStream。这些项将被 ReadStream 消费。

vertx.fileSystem()
    .open("my-file.txt", new OpenOptions().setRead(true), ar -> {
        if (ar.failed()) {
            System.out.println(
                    "D'oh! Cannot read the file: " + ar.cause()
            );
        } else {
            AsyncFile file = ar.result();
            // AsyncFile is a read stream, we can read from it:
            file
                    .exceptionHandler(t ->
                        System.out.println("Failure while reading the file: " + t)
                    )
                    // Reads the file chunk by chunk
                    .handler(buffer ->
                        System.out.println("Received buffer: " + buffer)
                    );
        }
    });
Vert.x 流不实现 Reactive Streams。Vert.x 提供了一种不同的背压协议。

为什么这些 API 成型规则很重要?Vert.x 不仅提供一个 API。上面介绍的“原生”API 只是提供的 API 之一。它还提供 Kotlin API、RX Java API 等。

这些 API 是**生成**的。Vert.x 提供了一个代码生成器,可以 _转换_ Vert.x“原生”API 为其他 API。由于所有方法都格式良好,生成器可以理解它们应该如何被适配。

generation

生成的代码暴露了不同的 API;每个方法都委托给“原生”API。异步方法和流可以遵循不同的转换,因此生成的 API 使用了正确的惯用法。

Vert.x Mutiny API

Mutiny 是一个事件驱动的响应式编程库。它与 Vert.x 无关。但是,我们编写了一个代码生成器,可以为 Vert.x API 生成 Mutiny 变体。

mutiny

转换是直接的

  • io.vertx 包 ⇒ io.vertx.mutiny

  • 异步方法 ⇒ 返回 Uni<T> 的方法

  • ReadStreams<T> ⇒ 可以作为 Multi<T> 消费

  • WriteStreams<T> ⇒ 可以作为 Reactive Streams Subscriber<T> 消费

它还支持 Vert.x 的背压协议到 Reactive Streams,因为 Mutiny 实现了 Reactive Streams。

例如,上面的第一个例子变成了

Uni<Buffer> uni = vertx.fileSystem().readFile("my-file.txt");
uni.subscribe()
  .with(it -> System.out.println("File content is: " + it));
两个 API 之间的一个区别与惰性有关。Vert.x“原生”API 在调用方法后立即触发操作。Mutiny 变体期望订阅来触发操作。

上面的流示例变成了

Uni<AsyncFile> uni = vertx.fileSystem()
        .open("my-file.txt", new OpenOptions().setRead(true));
uni
    // Gets a Multi to read the file:
    .onItem().transformToMulti(asyncFile -> asyncFile.toMulti())
    // Gets the buffers one by one:
    .subscribe().with(
       buffer -> System.out.println("Received buffer: " + buffer)
);

不止于此

Mutiny 变体不仅应用了上一节中公开的规则。对于异步方法,它还提供了

  • xAndAwait() 方法 - 阻塞调用线程直到收到结果。如果发生失败,则抛出 RuntimeException

  • xAndForget() 方法 - 触发操作,丢弃结果

// Read the content of the file in a blocking manner:
Buffer content   = vertx.fileSystem().readFileAndAwait("my-file.txt");

// Open and close the file
// Closing the file is an asynchronous operation (returning a Uni).
// We trigger the operation and discard the outcome
vertx.fileSystem().open("my-file.txt", new OpenOptions().setRead(true))
    .subscribe().with(file -> file.closeAndForget());

在哪里可以找到这个 API?

在撰写本文时,我们仅提供 Vert.x Core 和 Vert.x 客户端(MongoDB、Redis、Web 客户端、Mqtt 等)。我们正在扩展支持以覆盖整个 Vert.x 堆栈。

要使用 Mutiny 客户端,你需要为你的项目添加正确的依赖。浏览 依赖列表 来选择你需要的那个。

例如,要使用 Vert.x Web 客户端的 Mutiny 变体,请添加以下依赖

<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-mutiny-vertx-web-client</artifactId>
  <version>...</version>
</dependency>

拥有依赖后,只需创建 Web 客户端实例

@Inject Vertx vertx; // Inject the managed io.vertx.mutiny.core.Vertx instance

private WebClient client;

@PostConstruct
public void init() {
  client = WebClient.create(vertx, new WebClientOptions()
    .setDefaultHost("localhost")
    .setDefaultPort(8082)
  );
}

private Uni<String> call(String path) {
  return client
    .get(path).send()
    .onItem().transform(HttpResponse::bodyAsString);
}
缺少什么?在 SmallRye Reactive Utils 上打开一个 issue。
Javadoc 可在此 获取。

未来展望:Vert.x 4!

Vert.x 4 即将到来!我们已经在 Quarkus 和各种卫星项目中进行迁移工作。随着 Vert.x 4,已经实现了一个新的生成器(遵循相同的代码生成方法),为顺利升级铺平了道路。