使用 Redis 和 Mutiny - 组合异步操作
我收到了一位用户关于 Redis 和 Mutiny 的一个有趣问题。虽然这个问题并非专门针对 Redis,可以应用于许多其他 API,但我觉得这个情境很有趣。
用户 Enrico 想要做类似这样的事情
1. get all keys from Redis
2. for each key -> retrieve the associated object
3. add this object to a JsonArray
4. produce the JsonArray with all the objects
Enrico 正在使用 Vert.x Redis 客户端的 Mutiny 版本。
该客户端提供了一些方法来帮助我们解决问题。
-
RedisClient.keys(pattern)
方法返回Uni<JsonArray>
。这个数组包含与传递给keys
方法的模式匹配的键的列表。为了简化本文,我们使用:keys("*")
返回所有键。 -
RedisClient.hgetall(key)
方法返回一个Uni<JsonObject>
。此方法检索与传递的键关联的对象。
这两种方法都是异步的(它们返回 Uni
),我们需要为检索到的每个键调用第二个方法。换句话说,我们需要迭代键集,并为每个键调用一个异步操作。最后,我们希望将这些异步操作的结果收集到一个 JsonArray
中。
让我们从头开始,我们需要 Redis 客户端实例。
RedisClient redis = RedisClient.create(vertx, new JsonObject()
.put("port", 6379)
.put("host", "localhost"));
请注意,在 Quarkus 中,您应该直接使用 Redis 扩展,它公开了一个类似的 API。Enrico 想要直接使用 Vert.x Redis 客户端。
现在我们有了客户端,让我们检索键列表。
Uni<JsonArray> keys = redis.keys("*")
这会生成 JsonArray
,但我们想要一个键流。同样,这是一个异步方法。返回的 Uni
在数组可用时接收该数组。一旦收到(onItem
),我们就可以从此数组创建一个流。
Multi<String> keys = redis.keys("*")
.onItem().transformToMulti(array -> Multi.createFrom().iterable(array))
.onItem().castTo(String.class);
这段代码片段
-
检索包含键的
JsonArray
。 -
创建了一个
Multi
流式传输这些键,它是一个Multi<Object>
,因为JsonArray
继承自Iterable<Object>
。 -
将此
Multi
中的项映射为String
。
此时,我们有了一个(字符串)键流。因此,第一步就完成了。
现在,第二步:对于每个键,我们希望检索关联的对象。
所以,我们使用 hgetall
方法。
Multi<JsonObject> objects = keys
.onItem().transformToUniAndMerge(key -> redis.hgetall(key));
这段代码片段需要一些解释。
对于流 keys
的每个项,我们调用 hgetall
,它会生成一个 Uni<JsonObject>
。
所以,我们想将我们的键转换为 Uni(transformToUni)。
当您有一个项目流并需要为每个项目调用异步操作时,您必须选择如何合并结果。Mutiny 提供了两种策略:
-
merge - 一旦收到
Uni
生成的项目,我们立即将其发送下游。 -
concatenate - 我们保留输入流的顺序,以确保项目以相同的顺序发送到下游。
我们来举个例子。假设我们有键 1
、2
、3
,流为 {1, 2, 3}
。另外,假设在我们的 Redis 数据库中,键 1
关联到 A
,2
关联到 B
,3
关联到 C
。
如果您使用 merge 策略,我们将以不确定的顺序检索关联的对象。我们可能会得到 {A, C, B}
或 {B, A, C}
。这取决于许多因素,例如延迟、调度、负载等。但是,这也意味着我们可以并发地检索所有关联的对象,并生成结果流,而不必关心顺序。
如果您使用 concatenate 策略,它会保留输入流的顺序。因此,它总是会生成 {A, B, C}
。虽然这可能很理想,但它可能会降低并发检索对象的可能性,因为 Mutiny 必须等待所有先前对象的检索完成。例如,如果 Mutiny 先接收到 C
,它需要等待 A
和 B
之后才能将 C
发送到下游。
在我们的场景中,我们不保留顺序,而是使用 merge 策略。所以我们使用 transformToUniAndMerge
。
如果您多次运行代码,可能会看到结果数组中的顺序发生变化。
好的,第二步完成。让我们专注于最后几步:将对象累积到一个 JsonArray
中,并生成一个包含所有对象的 Uni<JsonArray>
。Mutiny 提供了将流中的项收集到列表、映射、集合中的方法,但没有内置的 JsonArray
支持。幸运的是,Mutiny 提供了一个方法,您可以使用它来收集任意结构中的项。
Uni<JsonArray> result = objects
.collectItems().in(() -> new JsonArray(), (arr, obj) -> arr.add(obj));
collectItems().in
允许将项累积到您自己的结构中。它接受两个参数:一个仅调用一次的结构供应商,以及一个接收结构和要添加的项的 bi-consumer,后者针对每个项调用。
好了,我们已经具备了解决 Enrico 问题所需的一切。
一站式代码如下:
Uni<JsonArray> result =
// Step 1 - retrieve the keys
redis.keys("*")
.onItem().transformToMulti(keys -> Multi.createFrom().iterable(keys))
.onItem().castTo(String.class)
// Step 2 - retrieve the associated object for each key
.onItem().transformToUniAndMerge(key -> redis.hgetall(key))
// Step 3 and 4 - accumulate the retrieved object in a JsonArray
.collectItems().in(() -> new JsonArray(), (arr, obj) -> arr.add(obj));
在这段代码片段中,有几个有趣的模式:
-
当您有一个集合,并想用 Mutiny 遍历它时,将其转换为
Multi
。 -
当您为流的每个项执行异步操作时,请考虑 merge 与 concatenate。使用适合您的那种。
-
要将项累积到结构中,请使用
collectItems
,它提供了许多方法来生成您选择的结构。
如果您想看到这段代码的实际运行效果,请查看这个 gist。您甚至可以使用 JBang 直接运行它。
jbang https://gist.github.com/cescoffier/e8c8a18897f9e5ca15f1378876a1bd93
您可以将 merge 替换为 concatenate 来查看差异。
享受!