如何使用 Redis 实现任务队列

如何使用 Redis 进行缓存中,我们实现了一个简单的、由 Redis 支持的缓存。
这只是 Redis 的一种用例。Redis 也用作消息服务器,用于实现后台作业或其他消息任务的处理。本文将探讨如何使用 Quarkus 和新的 Redis 数据源 API 来实现此模式。

任务队列和超级英雄!

任务队列是一种存储执行请求的数据结构。任务分发器将它们想要执行的任务提交到该数据结构中。另一方面,任务消费者轮询请求并执行它们。

pattern

这种模式有许多变体,因此让我们专注于以下应用程序。我们有一个管理英雄和反派的应用程序。该应用程序提供了模拟随机英雄和随机反派之间战斗的可能性。战斗模拟委托给战斗模拟器,这是专门用于此任务的应用程序。

application

在此上下文中,主应用程序将战斗请求提交到任务队列。然后,战斗模拟器轮询已提交的战斗请求并执行它们。

战斗结果使用另一项 Redis 功能进行通信:pub/sub 通信。模拟器将结果发送到应用程序使用的频道。然后,应用程序将这些结果广播到网页。

本文仅讨论与 Redis 的交互。应用程序的其余部分很简单,仅使用 RESTEasy Reactive 和 Hibernate ORM with Panache。您可以在 https://github.com/cescoffier/quarkus-redis-job-queue-demo 上找到该应用程序的完整代码。

提交任务

第一个任务是模拟任务队列。我们使用Redis 列表来存储FightRequest

package me.escoffier.quarkus.redis.fight;

public record FightRequest(String id, Hero hero, Villain villain) {

}

Redis 列表区分列表的左侧和右侧。这种区分允许实现 FIFO 队列,我们向左侧写入,从右侧消耗。

要操作 Redis 列表,我们需要与该数据结构关联的命令组。在SupesService 类中,我们注入 RedisDataSource 并检索命令组。

public SupesService(RedisDataSource dataSource, ...) {
    commands = dataSource.list(FightRequest.class);
  // ...
}

现在让我们看一下 submitAFight 方法。

public FightRequest submitAFight() {
    var hero = Hero.getRandomHero();
    var villain = Villain.getRandomVillain();
    var id = UUID.randomUUID().toString();
    var request = new FightRequest(id, hero, villain);
    commands.lpush("fight-requests", request);
    return request;
}

submitAFight 方法检索随机战斗者,计算 ID,构建 FightRequest 实例,并执行 LPUSH 命令。LPUSH 命令将给定项写入存储在给定键(fight-requests)处的列表的左侧。

接收任务请求

现在让我们看一下另一端:战斗模拟器。模拟器从代表我们任务队列的 Redis 列表中轮询 FightRequests 并模拟战斗。

模拟器实现在me.escoffier.quarkus.redis.fight.FightSimulator。构造函数接收一个配置好的名称(以区分多个模拟器)和 Redis 数据源。它创建用于发出 Redis 命令以从 Redis 列表读取的对象。

public FightSimulator(@ConfigProperty(name = "simulator-name") String name, RedisDataSource ds) {
    this.name = name;
    this.queue = ds.list(FightRequest.class);
    // ...
}

模拟器轮询战斗请求,并为每个请求模拟战斗。实现是一个无限循环(直到应用程序关闭才会停止)。在每次迭代中,它使用 BRPOP 命令从队列的右侧读取待处理的 FightRequest。如果没有待处理的请求,它会从循环开始重新开始。如果有请求,它会模拟战斗。

@Override
public void run() {
    logger.infof("Simulator %s starting", name);
    while ((!stopped)) {
        KeyValue<String, FightRequest> item =
            queue.brpop(Duration.ofSeconds(1), "fight-requests");
        if (item != null) {
            var request = item.value();
            var result = simulate(request);
            //...
        }
    }
}

BRPOP 命令检索并删除列表的最后一个(右侧)元素。与 RPOP 不同,如果列表中没有元素,它会等待指定的时长(上面代码中的 1 秒)。因此,如果列表包含元素,它会获取它。否则,它会等待最多一秒钟后放弃。在这种情况下,它返回 nullBRPOP 命令返回一个由列表键和 FightRequest 组成的 KeyValue。它使用该结构是因为您可以传递多个键,这在您拥有具有优先级的列表时很方便。

BRPOP 命令还避免了列表为空时无限旋转,因为它在每次迭代中等待 1 秒。最后,BRPOP 命令是原子的。这意味着如果您有多个模拟器,它们无法检索相同的项。它会分派每个项一次。

发送战斗结果

池循环从队列中检索 FightRequests 并模拟战斗,但如何通信结果?为此,我们使用另一项 Redis 功能:pub/sub 通信。

简单来说,我们将把 FightResult 发送到一个频道。订阅该频道的应用程序将收到发出的 FightResult

FightResult 包含请求 ID、两个战斗者以及获胜者的姓名。

package me.escoffier.quarkus.redis.fight;

public record FightResult(String id, Hero hero, Villain villain, String winner) {

}

要使用 Redis 的pub/sub 命令,我们需要与该组关联的对象。在 FightSimulator 中,我们还使用 pubsub 方法来获取该对象。

public FightSimulator(@ConfigProperty(name = "simulator-name") String name, Logger logger, RedisDataSource ds) {
    this.name = name;
    this.logger = logger;
    this.queue = ds.list(FightRequest.class);
    this.publisher = ds.pubsub(FightResult.class);  // <--- this is it!
}

现在,我们可以使用此 publisher 发送 FightResults。每次战斗后,我们调用 publisher.publishFightResult 实例发送到 fight-results 频道。

@Override
public void run() {
    logger.infof("Simulator %s starting", name);
    while ((!stopped)) {
        KeyValue<String, FightRequest> item = queue.brpop(Duration.ofSeconds(1), "fight-requests");
        if (item != null) {
            var request = item.value();
            var result = simulate(request);
            publisher.publish("fight-results", result);  // Send the outcome
           }
    }
}

接收战斗结果

此时,

  • 我们将战斗请求提交到任务队列,

  • 我们消耗该队列并模拟战斗,

  • 我们将结果发送到 fight-results 频道。

因此,唯一缺失的部分是该频道的消耗。让我们回到 me.escoffier.quarkus.redis.supes.SupesService 类。在构造函数中,我们还注入了 ReactiveRedisDataSource,这是 Redis 数据源的响应式变体。然后,在构造函数代码中,我们订阅 fight-results

public SupesService(RedisDataSource dataSource, ReactiveRedisDataSource reactiveRedisDataSource) {
    commands = dataSource.list(FightRequest.class);
    stream = reactiveRedisDataSource.pubsub(FightResult.class).subscribe("fight-results")
            .broadcast().toAllSubscribers();
}

由于我们使用的是响应式数据源,因此此订阅返回一个 Multi<FightResult>,已准备好由 Quarkus 和 SSE 提供服务(请参阅 SupesResource.java)。

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<FightResult> fights() {
    return supes.getFightResults();
}
.broadcast().toAllSubscribers() 指示 Quarkus 将所有收到的 FightResult 广播给所有连接的 SSE。因此,浏览器会过滤掉未请求的结果。

运行系统

一切就绪!完整源代码可在 https://github.com/cescoffier/quarkus-redis-job-queue-demo 获取。要运行系统,请打开三个终端。

首先,我们启动 supes-application。在第一个终端中,导航到 supes-application 目录并运行 mvn quarkus:dev。Quarkus 会自动启动 PostgreSQL 和 Redis 实例(如果您的计算机可以运行容器)。在控制台中,按 h 然后按 c。它会显示正在运行的开发服务。找到 Redis 服务,然后复制注入的 quarkus.redis.hosts 配置。

redis-client - Up About a minute
  Container:        348edec50f80/trusting_jennings  docker.io/redis:7-alpine
  Network:          bridge - 0.0.0.0:53853->6379/tcp
  Exec command:     docker exec -it 348edec50f80 /bin/bash
  Injected Config:  quarkus.redis.hosts=redis://:53853

在上面的代码片段中,复制:quarkus.redis.hosts=redis://:53853。这是 Redis 服务器的地址。我们需要将模拟器配置为使用此地址。

如果您访问 https://:8080,网页将被提供。您可以点击 fights! 按钮几次。

screenshot

战斗不会发生,因为我们没有模拟器。但是,战斗请求已被提交并存储在列表中。所以它们不会丢失。

现在,在第二个终端中,导航到 fight-simulator 目录,然后运行:

mvn package
java -Dsimulator-name=A -Dquarkus.redis.hosts=redis://:53853 -jar target/quarkus-app/quarkus-run.jar

重要提示:将 quarkus.redis-hosts 更新为上面复制的地址。

一旦启动,它就会处理待处理的战斗请求。

2022-09-11 15:31:58,914 INFO  [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Pakku and Tulon Voidgazer
2022-09-11 15:31:59,786 INFO  [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Comet Zuko and Arishem The Judge (Knullified)
2022-09-11 15:32:01,809 INFO  [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Ms. America and Kazumi (Devil Form)

如果您回到网页,获胜者将获得一个光环

screenshot winner

现在,在第三个终端中,导航到 fight-simulator 目录,然后运行:

java -Dsimulator-name=B -Dquarkus.redis.hosts=redis://:53853 -jar target/quarkus-app/quarkus-run.jar

重要提示:与上一个命令一样,将 quarkus.redis-hosts 更新为上面复制的地址。

回到网页,点击 fight! 按钮几次。查看两个模拟器的日志,了解战斗请求如何在两个模拟器之间分派。

总结

本文介绍了如何使用 Redis 和 Quarkus Redis 数据源 API 实现任务队列。

Quarkus 文档中了解有关 Redis 数据源 API 的更多信息。我们将发布更多关于 Redis 模式的内容,敬请关注!