编辑此页面

Mutiny - 凡人的异步

Mutiny 是一个直观的响应式编程库。它是使用 Quarkus 编写响应式应用程序的主要模型。

事件驱动的响应式编程 API

Mutiny 与其他响应式编程库有很大不同。它采用不同的方法来设计您的程序。使用 Mutiny,一切都是事件驱动的:您接收事件,并对它们做出反应。这种事件驱动的特性拥抱了分布式系统的异步本质,并提供了一种优雅而精确的方式来表达延续性。

Mutiny 提供两种类型,它们都是事件驱动和延迟的

  • Uni 发出一个事件(一个 item 或一个 failure)。Unis 方便表示返回 0 或 1 个结果的异步操作。一个很好的例子是向消息代理队列发送消息的结果。

  • Multi 发出多个事件(n 个 item、1 个 failure 或 1 个 completion)。Multis 可以表示 item 流,可能是无界的。一个很好的例子是从消息代理队列接收消息。

这两种类型允许表示任何类型的交互。它们是事件源。您观察它们(subscription),并在它们发出 item、failure 或在有界 Multi 的情况下,发出 completion 事件时收到通知。当您(订阅者)收到事件时,您可以处理它(例如,转换它、过滤它)。使用 Mutiny,您将编写类似 onX().action() 的代码,可以理解为 “在 item X 上执行 action”。

如果您想了解更多关于 Mutiny 及其背后概念的信息,请查看 Mutiny 参考文档

Quarkus 中的 Mutiny

在处理 Quarkus 的响应式特性时,Mutiny 是主要的 API。这意味着大多数扩展都支持 Mutiny,要么通过公开返回 Uni 和 Multi 的 API(例如响应式数据源或 rest 客户端),要么理解您的方法何时返回 Uni 或 Multi(例如 Quarkus REST (以前的 RESTEasy Reactive) 或 Reactive Messaging)。

这些集成使 Mutiny 成为使用 Quarkus 开发的每个响应式应用程序的突出且有凝聚力的模型。此外,Mutiny 架构允许细粒度的死代码消除,从而在使用本机编译时(例如使用 Quarkus 本机模式或 GraalVM 本机镜像编译器)提高内存使用率。

为什么需要另一个响应式编程 API?

经验丰富的响应式开发人员可能会想知道,为什么 Quarkus 引入了另一个响应式编程 API,而现有的 API 已经存在。Mutiny 采用了非常不同的角度

事件驱动 - Mutiny 将事件置于其设计的核心。使用 Mutiny,您可以观察事件、对它们做出反应,并创建优雅且可读的处理管道。不需要函数式编程博士学位。

可导航 - 即使有智能代码完成,具有数百种方法的类也会令人困惑。Mutiny 提供了一个可导航且显式的 API,引导您找到所需的运算符。

非阻塞 I/O - Mutiny 是驯服具有非阻塞 I/O 的应用程序异步特性的完美伴侣(它为 Quarkus 提供动力)。声明式地组合操作、转换数据、强制执行进度、从失败中恢复等等。

为异步世界而生 - Mutiny 可用于任何异步应用程序,例如事件驱动的微服务、基于消息的应用程序、网络实用程序、数据流处理,当然还有……​ 响应式应用程序!

内置响应式流和转换器 - Mutiny 基于 响应式流规范,因此可以与其他任何响应式编程库集成。此外,它还提出了转换器以与其他流行的库进行交互。

Mutiny 和 I/O 线程

Quarkus 由一个 响应式引擎 提供动力,并且在开发响应式应用程序时,您的代码将在少数 I/O 线程之一上执行。请记住,您绝不能阻塞这些线程,如果这样做,模型将崩溃。因此,您不能使用阻塞 I/O。相反,您需要调度 I/O 操作并传递延续。

Reactive Execution Model and I/O Threads

Mutiny 事件驱动范例是为此量身定制的。当 I/O 操作成功完成时,代表它的 Uni 会发出 item 事件。当它失败时,它会发出 failure 事件。延续可以使用事件驱动的 API 简单自然地表达。

通过示例学习 Mutiny

许多 Quarkus 扩展都公开了 Mutiny API。在本节中,我们使用 MongoDB 扩展来演示如何使用 Mutiny。

让我们想象一个简单的结构,表示来自 元素周期表 的元素

public class Element {

   public String name;
   public String symbol;
   public int position;

   public Element(String name, String symbol, int position) {
       this.name = name;
       this.symbol = symbol;
       this.position = position;
   }

   public Element() {
        // Use by JSON mappers
   }
}

此结构包含元素的名称、符号和位置。要检索元素并将元素存储到 Mongo 集合中,您可以使用以下代码

@ApplicationScoped
public class ElementService {

   final ReactiveMongoCollection<Element> collection;

   @Inject
   ElementService(ReactiveMongoClient client) {
       collection = client.getDatabase("quarkus")
               .getCollection("elements", Element.class);
   }

   public void add(Element element) {
       Uni<InsertOneResult> insertion = collection.insertOne(element);
       insertion
           .onItem().transform(r -> r.getInsertedId().asString())
           .subscribe().with(
               result -> System.out.println("inserted " + result),
               failure -> System.out.println("D'oh" + failure));
   }

   public void getAll() {
       collection.find()
           .subscribe().with(
              element -> System.out.println("Element: " + element),
             failure -> System.out.println("D'oh! " + failure),
             () -> System.out.println("No more elements")
       );
   }

}

首先,注入 Mongo 客户端。请注意,它使用响应式变体 (io.quarkus.mongodb.reactive.ReactiveMongoClient)。在 initialize 方法中,我们检索并存储将在其中插入元素的集合。

add 方法在集合中插入一个元素。它接收元素作为参数,并使用集合的响应式 API。与数据库交互涉及 I/O。响应式原则禁止在等待交互完成时阻塞。相反,我们调度操作并传递延续。insertOne 方法返回一个 Uni,即异步操作。那是调度的 I/O。我们现在需要表达延续,这是使用 .onItem() 方法完成的。.onItem() 允许配置当观察到的 Uni 发出 item 时需要发生的事情,在本例中是当调度的 I/O 完成时。在此示例中,我们提取插入的文档 id。最后一步是订阅。没有它,什么也不会发生。订阅会触发该操作。订阅方法还可以定义处理程序:成功时的 id 值,或插入失败时的 failure。

现在让我们看看第二个方法。它检索所有存储的元素。在这种情况下,它返回多个 item(每个存储的元素一个),因此我们使用 Multi。对于插入,获取存储的元素涉及 I/O。find 是我们的操作。对于 Uni,您需要订阅以触发该操作。订阅者接收 item 事件、failure 事件或收到所有元素时的 completion 事件。

订阅 Uni 或 Multi 至关重要,因为没有它,该操作永远不会执行。在 Quarkus 中,某些扩展会为您处理订阅。例如,在 Quarkus REST 中,您的 HTTP 方法可以返回 Uni 或 Multi,并且 Quarkus REST 处理订阅。

Mutiny 模式

上一节的示例是故意简化的。让我们看看一些常见的模式。

观察事件

您可以使用以下方法观察各种事件

on{event}().invoke(ev → System.out.println(ev));

例如,对于 item 使用:onItem().invoke(item → …​);

对于 failure,使用:onFailure().invoke(failure → …​)

invoke 方法是同步的。有时您需要执行异步操作。在这种情况下,使用 call,如:onItem().call(item → someAsyncAction(item))。请注意,call 不会更改 item,它只是调用一个异步操作,当该操作完成时,它会向下游发出原始 item。

转换 item

第一个有用的模式是转换您收到的 item 事件。正如我们在上一节中看到的,它可能表示成功插入,或者存储在数据库中的元素。

转换 item 是使用:onItem().transform(item → …​.) 完成的。

有关转换的更多详细信息,请参见 Mutiny 文档

顺序组合

顺序组合允许链接依赖的异步操作。这是使用 onItem().transformToUni(item → …​) 实现的。与 transform 不同,传递给 transformToUni 的函数返回一个 Uni。

Uni<String> uni1 = …
uni1
.onItem().transformToUni(item -> anotherAsynchronousAction(item));

有关异步转换的更多详细信息,请参见 Mutiny 文档

Failure 处理

到目前为止,我们只处理 item 事件,但是处理 failure 至关重要。您可以使用 onFailure() 处理 failure。

例如,您可以使用 onFailure().recoverWithItem(fallback) 使用回退 item 恢复

Uni<String> uni1 = …
uni1
.onFailure().recoverWithItem(“my fallback value”);

您也可以重试该操作,例如

Uni<String> uni1 = …
uni1
.onFailure().retry().atMost(5);

有关 failure 恢复的更多信息,请参见 failure 处理文档failure 时重试文档

事件和操作

下表列出了您可以为 Uni 和 Multi 接收的事件。它们中的每一个都与其自己的组 (onX) 相关联。第二个表列出了您可以在事件上执行的经典操作。请注意,某些组提供更多可能性。

来自上游的事件 来自下游的事件

Uni

订阅 (1)、Item (0..1)、Failure (0..1)

取消

Multi

订阅 (1)、Item (0..n)、Failure (0..1)、完成 (0..1)

取消、请求

查看 事件文档 上的完整事件列表。

操作 API 注释

转换

onItem().transform(Function<I, O> function);

使用同步函数将事件转换为另一个事件。下游接收函数的结果(如果转换失败,则接收 failure)。

transformToUni

onItem().transformToUni(Function<I, Uni<O>> function);

使用异步函数将事件转换为另一个事件。下游接收由生成的 Uni 发出的 item(如果转换失败,则接收 failure)。如果生成的 Uni 发出 failure,则该 failure 将传递到下游。

invoke

onItem().invoke(Consumer<I> consumer)

调用同步消费者。这对于执行副作用操作特别方便。下游接收原始事件,如果消费者抛出异常,则接收 failure

call

onItem().call(Function<I, Uni<?>>)

调用异步函数。这对于执行异步副作用操作特别方便。下游接收原始事件,如果消费者抛出异常或如果生成的 Uni 发出 failure,则接收 failure。

失败

onItem().failWith(Function<I, Throwable>)

当它收到事件时,发出 failure。

完成(仅 Multi)

onItem().complete()

当它收到事件时,发出 completion 事件。

其他模式

Mutiny 提供了许多其他功能。请访问 Mutiny 文档 以查看更多示例,包括完整的事件列表以及如何处理它们。

以下是一些常见问题的指南

快捷方式

使用 Uni 时,必须编写 onItem() 可能会很麻烦。幸运的是,Mutiny 提供了一组快捷方式来使您的代码更简洁

快捷方式 等价方式

uni.map(x → y)

uni.onItem().transform(x → y)

uni.flatMap(x → uni2)

uni.onItem().transformToUni(x → uni2)

uni.chain(x → uni2)

uni.onItem().transformToUni(x → uni2)

uni.invoke(x → System.out.println(x))

uni.onItem().invoke(x → System.out.println(x))

uni.call(x → uni2)

uni.onItem().call(x → uni2)

uni.eventually(() → System.out.println("eventually"))

uni.onItemOrFailure().invoke((ignoredItem, ignoredException) → System.out.println("eventually"))

uni.eventually(() → uni2)

uni.onItemOrFailure().call((ignoredItem, ignoredException) → uni2)

uni.replaceWith(x)

uni.onItem().transform(ignored → x)

uni.replaceWith(uni2)

uni.onItem().transformToUni(ignored → uni2)

uni.replaceIfNullWith(x)

uni.onItem().ifNull().continueWith(x)

响应式流

Mutiny 使用 响应式流Multi 实现了 Publisher 并强制执行背压协议。排放受到来自下游订阅者发出的请求的约束。因此,它不会使订阅者过载。请注意,在某些情况下,您无法遵循此协议(因为 Multi 发出的事件无法控制,例如时间或测量传感器)。在这种情况下,Mutiny 提供了一种使用 onOverflow() 控制溢出的方法。

Uni 不实现响应式流 PublisherUni 只能发出一个事件,因此订阅 Uni 足以表达您使用该结果的意图,并且不需要请求协议仪式。

Mutiny 和 Vert.x

Vert.x 是一个用于构建响应式应用程序和系统的工具包。它提供了遵循响应式原则(即非阻塞和异步)的庞大库生态系统。Vert.x 是 Quarkus 的重要组成部分,因为它提供了它的响应式功能。

此外,整个 Vert.x API 可以与 Quarkus 一起使用。为了提供一致的体验,Vert.x API 也可使用 Mutiny 变体,即返回 Uni 和 Multi。

有关此 API 的更多详细信息,请参见:https://quarkus.net.cn/blog/mutiny-vertx/

Quarkus 中的 Mutiny 集成

Mutiny 在 Quarkus 中的集成不仅仅是库。Mutiny 公开了一些钩子,允许 Quarkus 和 Mutiny 紧密集成

  • 如果您在 I/O 线程上运行,调用 awaittoIterable 将失败,从而防止阻塞 I/O 线程;

  • log() 运算符使用 Quarkus logger;

  • 默认的 Mutiny 线程池是 Quarkus worker 线程池;

  • 当使用 Mutiny Uni 和 Multi 时,默认启用上下文传播

有关基础架构集成的更多详细信息,请参见 https://smallrye.io/smallrye-mutiny/latest/guides/framework-integration/

相关内容