使用 Mutiny 进行并发异步操作

本周,有人向我询问了一个关于并发的常见用例。该用户希望“并行”调用两个微服务,并在收到两个结果后将它们合并,然后继续处理。基本上是以下模式

pattern

在非响应式方法中,这两个调用都会阻塞调用线程,并且,除非你使用工作线程池,否则这些调用不是并发的。即使你使用工作线程池,这些线程很可能被阻塞,白白消耗资源。

但是不用担心,Quarkus 的响应式特性和 Mutiny 能够处理这种情况。

调用两个服务

在本文中,我将使用 Vert.x Web Client,这是一个响应式 HTTP 客户端。它利用非阻塞 I/O,性能非常高且真正非阻塞。它不依赖于隐藏的线程池。你也可以使用 Quarkus Rest Client,但目前它仍然使用工作线程。

无论我们使用哪个客户端,都需要一些远程服务来调用。我们来使用

首先,让我们看看检索引言所需的代码:虽然这两个服务很相似,但响应的结构略有不同。所以我们最终得到

private static Uni<String> getProgrammingQuote(WebClient client) {
    return client.getAbs(PROGRAMMING_QUOTE)
            .as(BodyCodec.jsonObject())
            .send()
            .onItem().transform(r -> r.body().getString("en") + " (" + r.body().getString("author") + ")");
}

private static Uni<String> getChuckNorrisQuote(WebClient client) {
    return client.getAbs(CHUCK_NORRIS_QUOTE)
            .as(BodyCodec.jsonObject())
            .send()
            .onItem().transform(r -> r.body().getString("value"));
}

这两个方法接收一个 WebClient,调用服务,检索 JSON 响应,并提取它们。它们都返回一个 Uni。所以它们是异步的。结果(引言)在可用后“稍后”提供。此外,返回 Uni 意味着只有当有人订阅返回的 Uni 时,服务才会被调用。如果你订阅多次,服务也会被调用多次。

组合 Uni

到目前为止,我们有两个调用服务的方法。但我们希望像上面描绘的那样并发调用它们。

Mutiny 提供了一种“组合”由 Unis 生成的项目的方式

Uni<Tuple2<String, String>> tuple = Uni.combine().all()
    .unis(getProgrammingQuote(client), getChuckNorrisQuote(client))
    .asTuple();

当有人订阅 Uni tuple 时,它会订阅 getProgrammingQuote(client)getChuckNorrisQuote(client) Unis,从而调用服务。这样请求就会被发出,服务也会被并发调用。

当两个响应都可用时,它将它们组合成一个 Tuple,这是一个包含多个项目的简单结构。

换句话说,并发调用我们的服务非常简单。只需创建代表服务或你想实现的异步操作的 Unis,然后使用 Uni.combine().all() 将它们组合起来。你可以选择使用元组来组合结果,或者使用组合器函数。

将所有内容放在一起

// Create a Web Client
WebClient client = WebClient.create(vertx);

// Combine the result of our 2 Unis in a tuple
Uni.combine().all()
        .unis(getProgrammingQuote(client), getChuckNorrisQuote(client))
        .asTuple()

        // Subscribe (which will trigger the calls)
        .subscribe().with(tuple -> {
    System.out.println("Programming Quote: " + tuple.getItem1());
    System.out.println("Chuck Norris Quote: " + tuple.getItem2());
});

就是这样!如果你想看到这段代码的实际运行效果,请查看这个 gist。你甚至可以直接用 JBang 运行它

jbang https://gist.github.com/cescoffier/1ed68bef12b798529e10350f77686e9a

享受!