如何处理 Mutiny 中的故障
上周,我收到了几封关于使用 Mutiny 处理错误的提问。所以,也许它值得更详细地解释一下。
错误是事件
首先,Mutiny 是一个事件驱动的反应式编程库。使用 Mutiny,您会处理事件。上游的 Uni
或 Multi
会传播这些事件,并为您提供处理它们的机会。这些事件可以是项、完成、取消,以及…错误。
Multi.createFrom().range(0, 10)
.onItem().invoke(i -> System.out.println("Received item " + i))
.onCompletion().invoke(() -> System.out.println("We are done!"))
.onCancellation().invoke(() -> System.out.println(
"The downstream does not want our items anymore!")
)
.onFailure().invoke(t -> System.out.println(
"Oh no! We received a failure: " + t.getMessage())
)
收到错误时您可以做什么?
除了调用一个动作(如前一个片段所示)之外,收到错误时您还可以做很多事情。
最常见的做法是恢复。您可以通过提供一个特定的项或另一个 Uni
来恢复。
upstream
.onFailure().recoverWithItem(failure -> "hello (fallback)")
.subscribe().with(i -> System.out.println("Received: " + i));
upstream
.onFailure().recoverWithUni(failure -> getAnotherUni(failure))
.subscribe().with(i -> System.out.println("Received: " + i));
对于 Multi,您还可以通过提供另一个 Multi
或完成流来恢复。
upstream
.onFailure().recoverWithCompletion();
如果您对您的系统有信心,您也可以重试。请注意,您首先需要确保可以安全地重试该操作!
upstream
.onFailure().retry()
.withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(10)).atMost(10)
.subscribe().with(i -> System.out.println("Received: " + i));
您还可以转换错误。例如,您可以将低级别错误映射为更友好的业务错误。它会将第二个错误传播到下游,隐藏低级别错误。
Uni.createFrom().failure(new IOException("boom"))
.onFailure().transform(t -> new BusinessException(t))
错误是终止性的
错误是终止性事件。如果您的上游传播了一个错误,这意味着它无法正常运行。对于 Uni,这不成问题,因为您只能得到一个项或一个错误。但对于 multi,情况要复杂一些。
即使您通过处理错误进行了恢复,您也无法获得流的其余部分。您的上游已经… 坏了。
让我们看下面的代码
List<String> list = Multi.createFrom().range(0, 10)
.onItem().invoke(v -> {
if (v == 7) {
throw new IllegalArgumentException("We don't like seven!");
}
})
.onFailure().recoverWithItem(7)
.map(integer -> integer.toString())
.onItem().invoke(s -> System.out.println(s))
.collectItems().asList()
.await().indefinitely();
它会产生 [1, 2, 3, 4, 5, 6, 7],而不是流的其余部分。当 onItem().invoke()
阶段使用 7
调用时,它会产生一个错误。这会停止流。它不会处理来自上游的更多项。
那么我们能做什么?隔离!
当一个阶段发送一个错误时,它会发送一个终止流的错误,并取消对上游的订阅(告知它不需要更多项,因为它无法正常运行)。因此,如果我们想继续处理来自上游的其他项;我们只需要隔离该错误,并确保我们不会取消对上游的订阅。
最常用的方法如下:
List<String> list = Multi.createFrom().range(0, 10)
.onItem().transformToUniAndConcatenate(i ->
// Isolate the failure in this block
Uni.createFrom().item(i)
.onItem().invoke(v -> {
if (v == 7) {
throw new IllegalArgumentException("We don't like seven!");
}
})
.onFailure().recoverWithItem(7)
)
.map(integer -> integer.toString())
.onItem().invoke(s -> System.out.println(s))
.collectItems().asList()
.await().indefinitely();
基本上,我们隔离了可能出错的操作。如果它失败了,我们进行恢复。但是取消操作只取消该项,而不是整个流,这意味着我们将接收下一项,依此类推。这段代码会产生预期的列表。