如何处理 Mutiny 中的故障

上周,我收到了几封关于使用 Mutiny 处理错误的提问。所以,也许它值得更详细地解释一下。

错误是事件

首先,Mutiny 是一个事件驱动的反应式编程库。使用 Mutiny,您会处理事件。上游的 UniMulti 会传播这些事件,并为您提供处理它们的机会。这些事件可以是完成取消,以及…错误

Multi.createFrom().range(0, 10)
    .onItem().invoke(i -> System.out.println("Received item " + i))
    .onCompletion().invoke(() -> System.out.println("We are done!"))
    .onCancellation().invoke(() -> System.out.println(
        "The downstream does not want our items anymore!")
    )

    .onFailure().invoke(t -> System.out.println(
        "Oh no! We received a failure: " + t.getMessage())
    )

收到错误时您可以做什么?

除了调用一个动作(如前一个片段所示)之外,收到错误时您还可以做很多事情。

最常见的做法是恢复。您可以通过提供一个特定的项或另一个 Uni 来恢复。

upstream
    .onFailure().recoverWithItem(failure -> "hello (fallback)")
    .subscribe().with(i -> System.out.println("Received: " + i));

upstream
    .onFailure().recoverWithUni(failure -> getAnotherUni(failure))
    .subscribe().with(i -> System.out.println("Received: " + i));

对于 Multi,您还可以通过提供另一个 Multi 或完成流来恢复。

upstream
  .onFailure().recoverWithCompletion();

如果您对您的系统有信心,您也可以重试。请注意,您首先需要确保可以安全地重试该操作!

upstream
      .onFailure().retry()
        .withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(10)).atMost(10)
      .subscribe().with(i -> System.out.println("Received: " + i));

您还可以转换错误。例如,您可以将低级别错误映射为更友好的业务错误。它会将第二个错误传播到下游,隐藏低级别错误。

Uni.createFrom().failure(new IOException("boom"))
      .onFailure().transform(t -> new BusinessException(t))

错误是终止性的

错误是终止性事件。如果您的上游传播了一个错误,这意味着它无法正常运行。对于 Uni,这不成问题,因为您只能得到一个项或一个错误。但对于 multi,情况要复杂一些。

即使您通过处理错误进行了恢复,您也无法获得流的其余部分。您的上游已经… 坏了

让我们看下面的代码

List<String> list = Multi.createFrom().range(0, 10)
      .onItem().invoke(v -> {
              if (v == 7) {
                throw new IllegalArgumentException("We don't like seven!");
              }
      })
      .onFailure().recoverWithItem(7)
      .map(integer -> integer.toString())
      .onItem().invoke(s -> System.out.println(s))
      .collectItems().asList()
      .await().indefinitely();

它会产生 [1, 2, 3, 4, 5, 6, 7],而不是流的其余部分。当 onItem().invoke() 阶段使用 7 调用时,它会产生一个错误。这会停止流。它不会处理来自上游的更多项。

那么我们能做什么?隔离!

当一个阶段发送一个错误时,它会发送一个终止流的错误,并取消对上游的订阅(告知它不需要更多项,因为它无法正常运行)。因此,如果我们想继续处理来自上游的其他项;我们只需要隔离该错误,并确保我们不会取消对上游的订阅。

最常用的方法如下:

List<String> list = Multi.createFrom().range(0, 10)
    .onItem().transformToUniAndConcatenate(i ->
            // Isolate the failure in this block
            Uni.createFrom().item(i)
                    .onItem().invoke(v -> {
                        if (v == 7) {
                            throw new IllegalArgumentException("We don't like seven!");
                        }
                    })
                    .onFailure().recoverWithItem(7)
    )
    .map(integer -> integer.toString())
    .onItem().invoke(s -> System.out.println(s))
    .collectItems().asList()
    .await().indefinitely();

基本上,我们隔离了可能出错的操作。如果它失败了,我们进行恢复。但是取消操作只取消该项,而不是整个流,这意味着我们将接收下一项,依此类推。这段代码会产生预期的列表。

总结

好了,现在您可以优雅地处理错误并继续流了。

如果您想了解更多关于 Mutiny 的信息,请查看以下视频: