窥探流

Mutiny 是一个事件驱动的响应式编程库。与其他响应式编程库一样,它使用流作为主要构造。这些流传递事件,您的代码处理这些事件。大多数时候,您的代码只对项目和失败事件感兴趣。但还有其他类型的事件,例如取消、请求、完成等等。

为了更好地理解正在发生的事情或实现特定的副作用,查看这些各种事件是很常见的。例如,您可能需要在完成事件后关闭资源,或在发生故障或取消时记录一条消息。

对于每种类型的事件,都有一个相关的,它提供了处理该特定事件的方法:onItem()onFailure()onCompletion() 等等。这些组提供了两种方法来窥探各种事件而不影响其分发:invokecall。它不会转换接收到的事件;它会通知您发生了什么,并允许您做出响应。

invoke 方法

invoke 方法是同步的,不返回任何内容。当观察到的流分派事件时,Mutiny 会调用配置的回调。

Uni<Integer> u = uni.onItem()
    .invoke(i -> System.out.println("Received item: " + i));
Multi<Integer> m = multi.onItem()
    .invoke(i -> System.out.println("Received item: " + i));

如上所述,invoke 是同步的。Mutiny 调用回调并在回调返回后将事件传播到下游。它会阻塞分派。

invoke

当然,我们强烈建议您不要阻塞。

以下代码片段展示了如何记录不同类型的事件。

multi
  .onSubscribe().invoke(() -> System.out.println("⬇️ Subscribed"))
  .onItem().invoke(i -> System.out.println("⬇️ Received item: " + i))
  .onFailure().invoke(f -> System.out.println("⬇️ Failed with " + f))
  .onCompletion().invoke(() -> System.out.println("⬇️ Completed"))
  .onCancellation().invoke(() -> System.out.println("⬆️ Cancelled"))
  .onRequest().invoke(l -> System.out.println("⬆️ Requested: " + l))
前一个代码片段中的箭头指示事件来自上游(源)还是下游(消费者)。

invoke 方法不会更改事件,只有一个例外。如果 invoke 回调抛出异常,下游将不会收到实际事件,而是收到一个失败事件。

在观察失败事件时,如果回调抛出异常,Mutiny 会传播一个 CompositeException,该异常聚合了原始失败和回调失败。

call 方法

invoke 不同,call 是异步的,并且回调返回一个 Uni<?> 对象。

当您需要实现异步副作用(例如关闭资源)时,通常会使用 call

call

在回调返回的 Uni 发出项目之前,Mutiny 不会将原始事件分派到下游。

multi
    .onItem().call(i ->
        Uni.createFrom().nullItem()
            .onItem().delayIt().by(Duration.ofSeconds(1))
    )

如上一个代码片段所示,您可以使用此方法来延迟项目。但主要用例是完成异步操作。

multi
    .onCompletion().call(() -> resource.close())

在底层,Mutiny 获取 Uni(通过调用回调)并订阅它。它会观察该 Uni 中的项目或失败事件。它会丢弃项目值,因为在这种情况下只有发出才有意义。

如果回调抛出异常或生成的 Uni 生成失败,Mutiny 会将该失败(或 CompositeException)传播到下游,替换原始事件。

总结

当您想观察流而不更改传输的事件时,invokecall 方法非常方便。

使用 invoke 来实现同步副作用或记录事件。call 的异步特性使其非常适合实现异步副作用,例如关闭资源、刷新数据、延迟项目等。

下表重点介绍了主要区别

方法

invoke

call

性质

同步

异步

返回类型

void

Uni<?>

主要用例

日志记录

关闭资源,刷新数据

这些方法可用于相关组中的各种事件。