使用 Redis 和 Mutiny - 组合异步操作

我收到了一位用户关于 Redis 和 Mutiny 的一个有趣问题。虽然这个问题并非专门针对 Redis,可以应用于许多其他 API,但我觉得这个情境很有趣。

用户 Enrico 想要做类似这样的事情

1. get all keys from Redis
2. for each key -> retrieve the associated object
3. add this object to a JsonArray
4. produce the JsonArray with all the objects

Enrico 正在使用 Vert.x Redis 客户端的 Mutiny 版本。

该客户端提供了一些方法来帮助我们解决问题。

  • RedisClient.keys(pattern) 方法返回 Uni<JsonArray>。这个数组包含与传递给 keys 方法的模式匹配的键的列表。为了简化本文,我们使用:keys("*") 返回所有键。

  • RedisClient.hgetall(key) 方法返回一个 Uni<JsonObject>。此方法检索与传递的键关联的对象。

这两种方法都是异步的(它们返回 Uni),我们需要为检索到的每个键调用第二个方法。换句话说,我们需要迭代键集,并为每个键调用一个异步操作。最后,我们希望将这些异步操作的结果收集到一个 JsonArray 中。

让我们从头开始,我们需要 Redis 客户端实例。

RedisClient redis = RedisClient.create(vertx, new JsonObject()
     .put("port", 6379)
     .put("host", "localhost"));

请注意,在 Quarkus 中,您应该直接使用 Redis 扩展,它公开了一个类似的 API。Enrico 想要直接使用 Vert.x Redis 客户端。

现在我们有了客户端,让我们检索键列表。

Uni<JsonArray> keys = redis.keys("*")

这会生成 JsonArray,但我们想要一个键流。同样,这是一个异步方法。返回的 Uni 在数组可用时接收该数组。一旦收到(onItem),我们就可以从此数组创建一个流。

Multi<String> keys = redis.keys("*")
     .onItem().transformToMulti(array -> Multi.createFrom().iterable(array))
     .onItem().castTo(String.class);

这段代码片段

  1. 检索包含键的 JsonArray

  2. 创建了一个 Multi 流式传输这些键,它是一个 Multi<Object>,因为 JsonArray 继承自 Iterable<Object>

  3. 将此 Multi 中的项映射为 String

此时,我们有了一个(字符串)键流。因此,第一步就完成了。

现在,第二步:对于每个键,我们希望检索关联的对象。

所以,我们使用 hgetall 方法。

Multi<JsonObject> objects = keys
  .onItem().transformToUniAndMerge(key -> redis.hgetall(key));

这段代码片段需要一些解释。

对于流 keys 的每个项,我们调用 hgetall,它会生成一个 Uni<JsonObject>

所以,我们想将我们的键转换为 Uni(transformToUni)。

当您有一个项目流并需要为每个项目调用异步操作时,您必须选择如何合并结果。Mutiny 提供了两种策略:

  • merge - 一旦收到 Uni 生成的项目,我们立即将其发送下游。

  • concatenate - 我们保留输入流的顺序,以确保项目以相同的顺序发送到下游。

我们来举个例子。假设我们有键 123,流为 {1, 2, 3}。另外,假设在我们的 Redis 数据库中,键 1 关联到 A2 关联到 B3 关联到 C

如果您使用 merge 策略,我们将以不确定的顺序检索关联的对象。我们可能会得到 {A, C, B}{B, A, C}。这取决于许多因素,例如延迟、调度、负载等。但是,这也意味着我们可以并发地检索所有关联的对象,并生成结果流,而不必关心顺序。

如果您使用 concatenate 策略,它会保留输入流的顺序。因此,它总是会生成 {A, B, C}。虽然这可能很理想,但它可能会降低并发检索对象的可能性,因为 Mutiny 必须等待所有先前对象的检索完成。例如,如果 Mutiny 先接收到 C,它需要等待 AB 之后才能将 C 发送到下游。

在我们的场景中,我们不保留顺序,而是使用 merge 策略。所以我们使用 transformToUniAndMerge

如果您多次运行代码,可能会看到结果数组中的顺序发生变化。

好的,第二步完成。让我们专注于最后几步:将对象累积到一个 JsonArray 中,并生成一个包含所有对象的 Uni<JsonArray>。Mutiny 提供了将流中的项收集到列表、映射、集合中的方法,但没有内置的 JsonArray 支持。幸运的是,Mutiny 提供了一个方法,您可以使用它来收集任意结构中的项。

Uni<JsonArray> result = objects
   .collectItems().in(() -> new JsonArray(), (arr, obj) -> arr.add(obj));

collectItems().in 允许将项累积到您自己的结构中。它接受两个参数:一个仅调用一次的结构供应商,以及一个接收结构和要添加的项的 bi-consumer,后者针对每个项调用。

好了,我们已经具备了解决 Enrico 问题所需的一切。

一站式代码如下:

Uni<JsonArray> result =
  // Step 1 - retrieve the keys
  redis.keys("*")
    .onItem().transformToMulti(keys -> Multi.createFrom().iterable(keys))
    .onItem().castTo(String.class)
  // Step 2 - retrieve the associated object for each key
    .onItem().transformToUniAndMerge(key -> redis.hgetall(key))
  // Step 3 and 4 - accumulate the retrieved object in a JsonArray
    .collectItems().in(() -> new JsonArray(), (arr, obj) -> arr.add(obj));

在这段代码片段中,有几个有趣的模式:

  • 当您有一个集合,并想用 Mutiny 遍历它时,将其转换为 Multi

  • 当您为流的每个项执行异步操作时,请考虑 mergeconcatenate。使用适合您的那种。

  • 要将项累积到结构中,请使用 collectItems,它提供了许多方法来生成您选择的结构。

如果您想看到这段代码的实际运行效果,请查看这个 gist。您甚至可以使用 JBang 直接运行它。

jbang https://gist.github.com/cescoffier/e8c8a18897f9e5ca15f1378876a1bd93

您可以将 merge 替换为 concatenate 来查看差异。

享受!