如何使用 Redis 实现任务队列
在如何使用 Redis 进行缓存中,我们实现了一个简单的、由 Redis 支持的缓存。
这只是 Redis 的一种用例。Redis 也用作消息服务器,用于实现后台作业或其他消息任务的处理。本文将探讨如何使用 Quarkus 和新的 Redis 数据源 API 来实现此模式。
任务队列和超级英雄!
任务队列是一种存储执行请求的数据结构。任务分发器将它们想要执行的任务提交到该数据结构中。另一方面,任务消费者轮询请求并执行它们。

这种模式有许多变体,因此让我们专注于以下应用程序。我们有一个管理英雄和反派的应用程序。该应用程序提供了模拟随机英雄和随机反派之间战斗的可能性。战斗模拟委托给战斗模拟器,这是专门用于此任务的应用程序。

在此上下文中,主应用程序将战斗请求提交到任务队列。然后,战斗模拟器轮询已提交的战斗请求并执行它们。
战斗结果使用另一项 Redis 功能进行通信:pub/sub 通信。模拟器将结果发送到应用程序使用的频道。然后,应用程序将这些结果广播到网页。
本文仅讨论与 Redis 的交互。应用程序的其余部分很简单,仅使用 RESTEasy Reactive 和 Hibernate ORM with Panache。您可以在 https://github.com/cescoffier/quarkus-redis-job-queue-demo 上找到该应用程序的完整代码。
提交任务
第一个任务是模拟任务队列。我们使用Redis 列表来存储FightRequest。
package me.escoffier.quarkus.redis.fight;
public record FightRequest(String id, Hero hero, Villain villain) {
}
Redis 列表区分列表的左侧和右侧。这种区分允许实现 FIFO 队列,我们向左侧写入,从右侧消耗。
要操作 Redis 列表,我们需要与该数据结构关联的命令组。在SupesService 类中,我们注入 RedisDataSource
并检索命令组。
public SupesService(RedisDataSource dataSource, ...) {
commands = dataSource.list(FightRequest.class);
// ...
}
现在让我们看一下 submitAFight
方法。
public FightRequest submitAFight() {
var hero = Hero.getRandomHero();
var villain = Villain.getRandomVillain();
var id = UUID.randomUUID().toString();
var request = new FightRequest(id, hero, villain);
commands.lpush("fight-requests", request);
return request;
}
submitAFight
方法检索随机战斗者,计算 ID,构建 FightRequest
实例,并执行 LPUSH
命令。LPUSH
命令将给定项写入存储在给定键(fight-requests
)处的列表的左侧。
接收任务请求
现在让我们看一下另一端:战斗模拟器。模拟器从代表我们任务队列的 Redis 列表中轮询 FightRequests
并模拟战斗。
模拟器实现在me.escoffier.quarkus.redis.fight.FightSimulator
。构造函数接收一个配置好的名称(以区分多个模拟器)和 Redis 数据源。它创建用于发出 Redis 命令以从 Redis 列表读取的对象。
public FightSimulator(@ConfigProperty(name = "simulator-name") String name, RedisDataSource ds) {
this.name = name;
this.queue = ds.list(FightRequest.class);
// ...
}
模拟器轮询战斗请求,并为每个请求模拟战斗。实现是一个无限循环(直到应用程序关闭才会停止)。在每次迭代中,它使用 BRPOP
命令从队列的右侧读取待处理的 FightRequest
。如果没有待处理的请求,它会从循环开始重新开始。如果有请求,它会模拟战斗。
@Override
public void run() {
logger.infof("Simulator %s starting", name);
while ((!stopped)) {
KeyValue<String, FightRequest> item =
queue.brpop(Duration.ofSeconds(1), "fight-requests");
if (item != null) {
var request = item.value();
var result = simulate(request);
//...
}
}
}
BRPOP
命令检索并删除列表的最后一个(右侧)元素。与 RPOP
不同,如果列表中没有元素,它会等待指定的时长(上面代码中的 1 秒)。因此,如果列表包含元素,它会获取它。否则,它会等待最多一秒钟后放弃。在这种情况下,它返回 null
。BRPOP
命令返回一个由列表键和 FightRequest
组成的 KeyValue
。它使用该结构是因为您可以传递多个键,这在您拥有具有优先级的列表时很方便。
BRPOP
命令还避免了列表为空时无限旋转,因为它在每次迭代中等待 1 秒。最后,BRPOP
命令是原子的。这意味着如果您有多个模拟器,它们无法检索相同的项。它会分派每个项一次。
发送战斗结果
池循环从队列中检索 FightRequests
并模拟战斗,但如何通信结果?为此,我们使用另一项 Redis 功能:pub/sub 通信。
简单来说,我们将把 FightResult
发送到一个频道。订阅该频道的应用程序将收到发出的 FightResult
。
FightResult
包含请求 ID、两个战斗者以及获胜者的姓名。
package me.escoffier.quarkus.redis.fight;
public record FightResult(String id, Hero hero, Villain villain, String winner) {
}
要使用 Redis 的pub/sub 命令,我们需要与该组关联的对象。在 FightSimulator
中,我们还使用 pubsub
方法来获取该对象。
public FightSimulator(@ConfigProperty(name = "simulator-name") String name, Logger logger, RedisDataSource ds) {
this.name = name;
this.logger = logger;
this.queue = ds.list(FightRequest.class);
this.publisher = ds.pubsub(FightResult.class); // <--- this is it!
}
现在,我们可以使用此 publisher
发送 FightResults
。每次战斗后,我们调用 publisher.publish
将 FightResult
实例发送到 fight-results
频道。
@Override
public void run() {
logger.infof("Simulator %s starting", name);
while ((!stopped)) {
KeyValue<String, FightRequest> item = queue.brpop(Duration.ofSeconds(1), "fight-requests");
if (item != null) {
var request = item.value();
var result = simulate(request);
publisher.publish("fight-results", result); // Send the outcome
}
}
}
接收战斗结果
此时,
-
我们将战斗请求提交到任务队列,
-
我们消耗该队列并模拟战斗,
-
我们将结果发送到
fight-results
频道。
因此,唯一缺失的部分是该频道的消耗。让我们回到 me.escoffier.quarkus.redis.supes.SupesService
类。在构造函数中,我们还注入了 ReactiveRedisDataSource
,这是 Redis 数据源的响应式变体。然后,在构造函数代码中,我们订阅 fight-results
。
public SupesService(RedisDataSource dataSource, ReactiveRedisDataSource reactiveRedisDataSource) {
commands = dataSource.list(FightRequest.class);
stream = reactiveRedisDataSource.pubsub(FightResult.class).subscribe("fight-results")
.broadcast().toAllSubscribers();
}
由于我们使用的是响应式数据源,因此此订阅返回一个 Multi<FightResult>
,已准备好由 Quarkus 和 SSE 提供服务(请参阅 SupesResource.java)。
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<FightResult> fights() {
return supes.getFightResults();
}
.broadcast().toAllSubscribers() 指示 Quarkus 将所有收到的 FightResult 广播给所有连接的 SSE。因此,浏览器会过滤掉未请求的结果。 |
运行系统
一切就绪!完整源代码可在 https://github.com/cescoffier/quarkus-redis-job-queue-demo 获取。要运行系统,请打开三个终端。
首先,我们启动 supes-application
。在第一个终端中,导航到 supes-application
目录并运行 mvn quarkus:dev
。Quarkus 会自动启动 PostgreSQL 和 Redis 实例(如果您的计算机可以运行容器)。在控制台中,按 h
然后按 c
。它会显示正在运行的开发服务。找到 Redis 服务,然后复制注入的 quarkus.redis.hosts
配置。
redis-client - Up About a minute
Container: 348edec50f80/trusting_jennings docker.io/redis:7-alpine
Network: bridge - 0.0.0.0:53853->6379/tcp
Exec command: docker exec -it 348edec50f80 /bin/bash
Injected Config: quarkus.redis.hosts=redis://:53853
在上面的代码片段中,复制:quarkus.redis.hosts=redis://:53853
。这是 Redis 服务器的地址。我们需要将模拟器配置为使用此地址。
如果您访问 https://:8080,网页将被提供。您可以点击 fights!
按钮几次。

战斗不会发生,因为我们没有模拟器。但是,战斗请求已被提交并存储在列表中。所以它们不会丢失。
现在,在第二个终端中,导航到 fight-simulator
目录,然后运行:
mvn package
java -Dsimulator-name=A -Dquarkus.redis.hosts=redis://:53853 -jar target/quarkus-app/quarkus-run.jar
重要提示:将 quarkus.redis-hosts
更新为上面复制的地址。
一旦启动,它就会处理待处理的战斗请求。
2022-09-11 15:31:58,914 INFO [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Pakku and Tulon Voidgazer
2022-09-11 15:31:59,786 INFO [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Comet Zuko and Arishem The Judge (Knullified)
2022-09-11 15:32:01,809 INFO [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Ms. America and Kazumi (Devil Form)
如果您回到网页,获胜者将获得一个光环。

现在,在第三个终端中,导航到 fight-simulator
目录,然后运行:
java -Dsimulator-name=B -Dquarkus.redis.hosts=redis://:53853 -jar target/quarkus-app/quarkus-run.jar
重要提示:与上一个命令一样,将 quarkus.redis-hosts
更新为上面复制的地址。
回到网页,点击 fight!
按钮几次。查看两个模拟器的日志,了解战斗请求如何在两个模拟器之间分派。
总结
本文介绍了如何使用 Redis 和 Quarkus Redis 数据源 API 实现任务队列。
从Quarkus 文档中了解有关 Redis 数据源 API 的更多信息。我们将发布更多关于 Redis 模式的内容,敬请关注!