Mutiny - 凡人的异步
Mutiny 是一个直观的响应式编程库。它是使用 Quarkus 编写响应式应用程序的主要模型。
事件驱动的响应式编程 API
Mutiny 与其他响应式编程库有很大不同。它采用不同的方法来设计您的程序。使用 Mutiny,一切都是事件驱动的:您接收事件,并对它们做出反应。这种事件驱动的特性拥抱了分布式系统的异步本质,并提供了一种优雅而精确的方式来表达延续性。
Mutiny 提供两种类型,它们都是事件驱动和延迟的
-
Uni
发出一个事件(一个 item 或一个 failure)。Unis 方便表示返回 0 或 1 个结果的异步操作。一个很好的例子是向消息代理队列发送消息的结果。 -
Multi
发出多个事件(n 个 item、1 个 failure 或 1 个 completion)。Multis 可以表示 item 流,可能是无界的。一个很好的例子是从消息代理队列接收消息。
这两种类型允许表示任何类型的交互。它们是事件源。您观察它们(subscription),并在它们发出 item、failure 或在有界 Multi 的情况下,发出 completion 事件时收到通知。当您(订阅者)收到事件时,您可以处理它(例如,转换它、过滤它)。使用 Mutiny,您将编写类似 onX().action() 的代码,可以理解为 “在 item X 上执行 action”。
如果您想了解更多关于 Mutiny 及其背后概念的信息,请查看 Mutiny 参考文档。
Quarkus 中的 Mutiny
在处理 Quarkus 的响应式特性时,Mutiny 是主要的 API。这意味着大多数扩展都支持 Mutiny,要么通过公开返回 Uni 和 Multi 的 API(例如响应式数据源或 rest 客户端),要么理解您的方法何时返回 Uni 或 Multi(例如 Quarkus REST (以前的 RESTEasy Reactive) 或 Reactive Messaging)。
这些集成使 Mutiny 成为使用 Quarkus 开发的每个响应式应用程序的突出且有凝聚力的模型。此外,Mutiny 架构允许细粒度的死代码消除,从而在使用本机编译时(例如使用 Quarkus 本机模式或 GraalVM 本机镜像编译器)提高内存使用率。
为什么需要另一个响应式编程 API?
经验丰富的响应式开发人员可能会想知道,为什么 Quarkus 引入了另一个响应式编程 API,而现有的 API 已经存在。Mutiny 采用了非常不同的角度
事件驱动 - Mutiny 将事件置于其设计的核心。使用 Mutiny,您可以观察事件、对它们做出反应,并创建优雅且可读的处理管道。不需要函数式编程博士学位。
可导航 - 即使有智能代码完成,具有数百种方法的类也会令人困惑。Mutiny 提供了一个可导航且显式的 API,引导您找到所需的运算符。
非阻塞 I/O - Mutiny 是驯服具有非阻塞 I/O 的应用程序异步特性的完美伴侣(它为 Quarkus 提供动力)。声明式地组合操作、转换数据、强制执行进度、从失败中恢复等等。
为异步世界而生 - Mutiny 可用于任何异步应用程序,例如事件驱动的微服务、基于消息的应用程序、网络实用程序、数据流处理,当然还有…… 响应式应用程序!
内置响应式流和转换器 - Mutiny 基于 响应式流规范,因此可以与其他任何响应式编程库集成。此外,它还提出了转换器以与其他流行的库进行交互。
Mutiny 和 I/O 线程
Quarkus 由一个 响应式引擎 提供动力,并且在开发响应式应用程序时,您的代码将在少数 I/O 线程之一上执行。请记住,您绝不能阻塞这些线程,如果这样做,模型将崩溃。因此,您不能使用阻塞 I/O。相反,您需要调度 I/O 操作并传递延续。

Mutiny 事件驱动范例是为此量身定制的。当 I/O 操作成功完成时,代表它的 Uni 会发出 item 事件。当它失败时,它会发出 failure 事件。延续可以使用事件驱动的 API 简单自然地表达。
通过示例学习 Mutiny
许多 Quarkus 扩展都公开了 Mutiny API。在本节中,我们使用 MongoDB 扩展来演示如何使用 Mutiny。
让我们想象一个简单的结构,表示来自 元素周期表 的元素
public class Element {
public String name;
public String symbol;
public int position;
public Element(String name, String symbol, int position) {
this.name = name;
this.symbol = symbol;
this.position = position;
}
public Element() {
// Use by JSON mappers
}
}
此结构包含元素的名称、符号和位置。要检索元素并将元素存储到 Mongo 集合中,您可以使用以下代码
@ApplicationScoped
public class ElementService {
final ReactiveMongoCollection<Element> collection;
@Inject
ElementService(ReactiveMongoClient client) {
collection = client.getDatabase("quarkus")
.getCollection("elements", Element.class);
}
public void add(Element element) {
Uni<InsertOneResult> insertion = collection.insertOne(element);
insertion
.onItem().transform(r -> r.getInsertedId().asString())
.subscribe().with(
result -> System.out.println("inserted " + result),
failure -> System.out.println("D'oh" + failure));
}
public void getAll() {
collection.find()
.subscribe().with(
element -> System.out.println("Element: " + element),
failure -> System.out.println("D'oh! " + failure),
() -> System.out.println("No more elements")
);
}
}
首先,注入 Mongo 客户端。请注意,它使用响应式变体 (io.quarkus.mongodb.reactive.ReactiveMongoClient
)。在 initialize 方法中,我们检索并存储将在其中插入元素的集合。
add
方法在集合中插入一个元素。它接收元素作为参数,并使用集合的响应式 API。与数据库交互涉及 I/O。响应式原则禁止在等待交互完成时阻塞。相反,我们调度操作并传递延续。insertOne
方法返回一个 Uni,即异步操作。那是调度的 I/O。我们现在需要表达延续,这是使用 .onItem()
方法完成的。.onItem()
允许配置当观察到的 Uni 发出 item 时需要发生的事情,在本例中是当调度的 I/O 完成时。在此示例中,我们提取插入的文档 id。最后一步是订阅。没有它,什么也不会发生。订阅会触发该操作。订阅方法还可以定义处理程序:成功时的 id
值,或插入失败时的 failure。
现在让我们看看第二个方法。它检索所有存储的元素。在这种情况下,它返回多个 item(每个存储的元素一个),因此我们使用 Multi
。对于插入,获取存储的元素涉及 I/O。find
是我们的操作。对于 Uni,您需要订阅以触发该操作。订阅者接收 item 事件、failure 事件或收到所有元素时的 completion 事件。
订阅 Uni 或 Multi 至关重要,因为没有它,该操作永远不会执行。在 Quarkus 中,某些扩展会为您处理订阅。例如,在 Quarkus REST 中,您的 HTTP 方法可以返回 Uni 或 Multi,并且 Quarkus REST 处理订阅。
Mutiny 模式
上一节的示例是故意简化的。让我们看看一些常见的模式。
观察事件
您可以使用以下方法观察各种事件
on{event}().invoke(ev → System.out.println(ev));
例如,对于 item 使用:onItem().invoke(item → …);
对于 failure,使用:onFailure().invoke(failure → …)
invoke
方法是同步的。有时您需要执行异步操作。在这种情况下,使用 call
,如:onItem().call(item → someAsyncAction(item))
。请注意,call
不会更改 item,它只是调用一个异步操作,当该操作完成时,它会向下游发出原始 item。
转换 item
第一个有用的模式是转换您收到的 item 事件。正如我们在上一节中看到的,它可能表示成功插入,或者存储在数据库中的元素。
转换 item 是使用:onItem().transform(item → ….)
完成的。
有关转换的更多详细信息,请参见 Mutiny 文档。
顺序组合
顺序组合允许链接依赖的异步操作。这是使用 onItem().transformToUni(item → …)
实现的。与 transform
不同,传递给 transformToUni
的函数返回一个 Uni。
Uni<String> uni1 = …
uni1
.onItem().transformToUni(item -> anotherAsynchronousAction(item));
有关异步转换的更多详细信息,请参见 Mutiny 文档。
Failure 处理
到目前为止,我们只处理 item 事件,但是处理 failure 至关重要。您可以使用 onFailure()
处理 failure。
例如,您可以使用 onFailure().recoverWithItem(fallback)
使用回退 item 恢复
Uni<String> uni1 = …
uni1
.onFailure().recoverWithItem(“my fallback value”);
您也可以重试该操作,例如
Uni<String> uni1 = …
uni1
.onFailure().retry().atMost(5);
有关 failure 恢复的更多信息,请参见 failure 处理文档 和 failure 时重试文档。
事件和操作
下表列出了您可以为 Uni 和 Multi 接收的事件。它们中的每一个都与其自己的组 (onX) 相关联。第二个表列出了您可以在事件上执行的经典操作。请注意,某些组提供更多可能性。
来自上游的事件 | 来自下游的事件 | |
---|---|---|
Uni |
订阅 (1)、Item (0..1)、Failure (0..1) |
取消 |
Multi |
订阅 (1)、Item (0..n)、Failure (0..1)、完成 (0..1) |
取消、请求 |
查看 事件文档 上的完整事件列表。
操作 | API | 注释 |
---|---|---|
转换 |
|
使用同步函数将事件转换为另一个事件。下游接收函数的结果(如果转换失败,则接收 failure)。 |
transformToUni |
|
使用异步函数将事件转换为另一个事件。下游接收由生成的 Uni 发出的 item(如果转换失败,则接收 failure)。如果生成的 Uni 发出 failure,则该 failure 将传递到下游。 |
invoke |
|
调用同步消费者。这对于执行副作用操作特别方便。下游接收原始事件,如果消费者抛出异常,则接收 failure |
call |
|
调用异步函数。这对于执行异步副作用操作特别方便。下游接收原始事件,如果消费者抛出异常或如果生成的 Uni 发出 failure,则接收 failure。 |
失败 |
|
当它收到事件时,发出 failure。 |
完成(仅 Multi) |
|
当它收到事件时,发出 completion 事件。 |
快捷方式
使用 Uni 时,必须编写 onItem()
可能会很麻烦。幸运的是,Mutiny 提供了一组快捷方式来使您的代码更简洁
快捷方式 | 等价方式 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
响应式流
Mutiny 使用 响应式流。Multi
实现了 Publisher
并强制执行背压协议。排放受到来自下游订阅者发出的请求的约束。因此,它不会使订阅者过载。请注意,在某些情况下,您无法遵循此协议(因为 Multi 发出的事件无法控制,例如时间或测量传感器)。在这种情况下,Mutiny 提供了一种使用 onOverflow()
控制溢出的方法。
Uni
不实现响应式流 Publisher
。Uni
只能发出一个事件,因此订阅 Uni
足以表达您使用该结果的意图,并且不需要请求协议仪式。
Mutiny 和 Vert.x
Vert.x 是一个用于构建响应式应用程序和系统的工具包。它提供了遵循响应式原则(即非阻塞和异步)的庞大库生态系统。Vert.x 是 Quarkus 的重要组成部分,因为它提供了它的响应式功能。
此外,整个 Vert.x API 可以与 Quarkus 一起使用。为了提供一致的体验,Vert.x API 也可使用 Mutiny 变体,即返回 Uni 和 Multi。
有关此 API 的更多详细信息,请参见:https://quarkus.net.cn/blog/mutiny-vertx/。
Quarkus 中的 Mutiny 集成
Mutiny 在 Quarkus 中的集成不仅仅是库。Mutiny 公开了一些钩子,允许 Quarkus 和 Mutiny 紧密集成
-
如果您在 I/O 线程上运行,调用
await
或toIterable
将失败,从而防止阻塞 I/O 线程; -
log()
运算符使用 Quarkus logger; -
默认的 Mutiny 线程池是 Quarkus worker 线程池;
-
当使用 Mutiny Uni 和 Multi 时,默认启用上下文传播
有关基础架构集成的更多详细信息,请参见 https://smallrye.io/smallrye-mutiny/latest/guides/framework-integration/。