窥探流
Mutiny 是一个事件驱动的响应式编程库。与其他响应式编程库一样,它使用流作为主要构造。这些流传递事件,您的代码处理这些事件。大多数时候,您的代码只对项目和失败事件感兴趣。但还有其他类型的事件,例如取消、请求、完成等等。
为了更好地理解正在发生的事情或实现特定的副作用,查看这些各种事件是很常见的。例如,您可能需要在完成事件后关闭资源,或在发生故障或取消时记录一条消息。
对于每种类型的事件,都有一个相关的组,它提供了处理该特定事件的方法:onItem()
、onFailure()
、onCompletion()
等等。这些组提供了两种方法来窥探各种事件而不影响其分发:invoke
和 call
。它不会转换接收到的事件;它会通知您发生了什么,并允许您做出响应。
invoke 方法
invoke
方法是同步的,不返回任何内容。当观察到的流分派事件时,Mutiny 会调用配置的回调。
Uni<Integer> u = uni.onItem()
.invoke(i -> System.out.println("Received item: " + i));
Multi<Integer> m = multi.onItem()
.invoke(i -> System.out.println("Received item: " + i));
如上所述,invoke
是同步的。Mutiny 调用回调并在回调返回后将事件传播到下游。它会阻塞分派。
当然,我们强烈建议您不要阻塞。
以下代码片段展示了如何记录不同类型的事件。
multi
.onSubscribe().invoke(() -> System.out.println("⬇️ Subscribed"))
.onItem().invoke(i -> System.out.println("⬇️ Received item: " + i))
.onFailure().invoke(f -> System.out.println("⬇️ Failed with " + f))
.onCompletion().invoke(() -> System.out.println("⬇️ Completed"))
.onCancellation().invoke(() -> System.out.println("⬆️ Cancelled"))
.onRequest().invoke(l -> System.out.println("⬆️ Requested: " + l))
前一个代码片段中的箭头指示事件来自上游(源)还是下游(消费者)。 |
invoke
方法不会更改事件,只有一个例外。如果 invoke
回调抛出异常,下游将不会收到实际事件,而是收到一个失败事件。
在观察失败事件时,如果回调抛出异常,Mutiny 会传播一个 CompositeException ,该异常聚合了原始失败和回调失败。 |
call 方法
与 invoke
不同,call
是异步的,并且回调返回一个 Uni<?>
对象。
当您需要实现异步副作用(例如关闭资源)时,通常会使用 call
。
在回调返回的 Uni 发出项目之前,Mutiny 不会将原始事件分派到下游。
multi
.onItem().call(i ->
Uni.createFrom().nullItem()
.onItem().delayIt().by(Duration.ofSeconds(1))
)
如上一个代码片段所示,您可以使用此方法来延迟项目。但主要用例是完成异步操作。
multi
.onCompletion().call(() -> resource.close())
在底层,Mutiny 获取 Uni
(通过调用回调)并订阅它。它会观察该 Uni
中的项目或失败事件。它会丢弃项目值,因为在这种情况下只有发出才有意义。
如果回调抛出异常或生成的 Uni
生成失败,Mutiny 会将该失败(或 CompositeException
)传播到下游,替换原始事件。