Redis 作业队列 - 重载

在《如何使用 Redis 实现任务队列》一文中,我们解释了如何使用 Redis 和 Quarkus 的新 Redis API 实现任务队列机制。该博文中探讨的方法存在一个重大缺陷:如果任务执行失败,请求就会丢失,并且永远不会被重新尝试。

在本文中,我们将介绍如何提高任务队列的可靠性,以处理失败、启用重试并使用“死信队列”来避免“毒丸”。

回顾与问题

上一篇博文中,我们实现了以下系统。

application

应用程序接收战斗请求,并将这些请求写入 Redis 列表。多个模拟器处理该列表。战斗的结果通过 Redis Pub/Sub 进行通信。

该架构可以正常工作,并且通过模拟器代码使用的 brpop 命令确保战斗只能执行一次。此命令以原子方式从队列中弹出元素,并确保其他模拟器也无法处理它。

然而,这种架构有一个缺点。如果弹出的战斗请求处理失败,请求就会丢失。没有其他模拟器能够处理它,并且如果失败的模拟器重新启动,它也不会重新处理相同的请求。

引入更多队列

处理该问题的一种方法是引入更多队列。除了主队列(上图中所示的 Redis 列表)之外,我们为每个模拟器引入一个队列。因此,每个模拟器都有其私有队列。

reloaded

这些私有队列构成了一个安全网。

因此,模拟器不仅使用主队列,还使用其私有队列。

this.queues = ds.list(FightRequest.class);
this.queueName = "queue-" + name; // the name of the private queue

当模拟器从主队列弹出请求时,它不会立即处理它;而是将其写入其私有队列。为了实现这一点,我们不能使用 brpop 然后写入另一个队列,因为如果在两者之间发生任何问题,我们就会遇到相同的问题。相反,我们使用 blmove,它以原子方式将元素从一个列表弹出并推送到另一个列表。因此,我们确保多个模拟器不能消耗相同的请求,并且请求不会丢失。

因此,我们使用以下代码将请求从主队列移动到私有队列。

// pop the item at the right side of the 'fight-requests' queue
// and writes it to the left side of 'queueName'.
// it returns the moved item or `null` in the entry queue, 'fight-requests',
// does not have any item, even after the 1-second delay
var moved = queues.blmove("fight-requests", queueName,
        Position.RIGHT, Position.LEFT, Duration.ofSeconds(1));

现在,模拟器不从主队列模拟请求,而是需要处理添加到其私有队列的请求。

public void processRequestFromPrivateQueue() {
    var request = queues.lindex(queueName, -1);
    while (request != null) {
        runSimulation(request);
        queues.lrem(queueName, 1, request);
        request = queues.lindex("queue-" + name, -1);
    }
}

这段代码与上一篇博文中的代码略有不同。这次我们不弹出。我们获取队列中的最后一个元素(索引 -1 是最后一个),处理它,然后从队列中删除它。我们一直这样做,直到队列为空。

让我们把所有内容放在一起:1. 当模拟器启动时,它应该处理其私有队列中的项目。因此,如果它崩溃,它将重试处理该项目。2. 一旦私有队列为空,它就会从主队列获取新请求。它不会直接处理它们,而是重新触发私有队列的处理,直到队列为空。

@Override
public void run() {
  // First, check if we are recovering, and drain the requests from the
  // simulator's queue
  processRequestFromPrivateQueue();
  while (! stopped) {
    // Simulator's queue drained - poll the main queue
    var moved = queues.blmove("fight-requests", queueName,
        Position.RIGHT, Position.LEFT, Duration.ofSeconds(1)
    );
    if (moved != null) {
      // If an element has been moved, process it
      processRequestFromPrivateQueue();
    }
  }
}

新架构,新问题

该方法解决了最初的问题。如果处理失败,我们会重试(因为请求未从私有队列中删除)。这将很好地处理瞬时故障。

然而,它也有自己的缺点:

  • 重复项:如果处理成功,但由于任何原因(如网络故障)lrem 失败,则请求将被处理另一次,因为它未从队列中删除。

  • 毒丸:如果请求无法成功处理,它将永远重试处理。

去重

处理重复项需要一种方法来唯一标识请求并手动去重。换句话说,如果我们的所有请求都有一个唯一 ID,我们可以检查该 ID 是否已被处理(例如,通过将已处理 ID 存储在另一个列表或哈希表中)。如果项目已被处理,则忽略它(从队列中删除),然后处理下一个项目。

public void processRequestFromPrivateQueue() {
    var request = queues.lindex(queueName, -1);
    while (request != null) {
        if (! isDuplicate(request)) {
            runSimulation(request);
        }
        queues.lrem(queueName, 1, request);
 .      request = queues.lindex("queue-" + name, -1);
    }
}

避免吞咽“毒丸”

处理毒丸更加复杂。毒丸是指总是会导致处理失败的请求。这可能是由于处理代码中的错误或意外情况,它总是会失败。在这种情况下重试将无济于事;我们没有遇到瞬时问题。

那么,我们能做什么?我们需要跟踪该请求的处理尝试次数,如果超过特定次数,我们就面对现实:我们将无法处理该请求。我们通常希望将请求发送到死信队列 (DLQ),即存储不可处理项目的特定队列。

public void processRequestFromPrivateQueue() {
    var request = queues.lindex(queueName, -1);
    while (request != null) {
        if (counter.incr(counterName) > MAX_ATTEMPT) {
            // Give up - it's a poison pill
            queues.lpush(DLQ, request); // Add to DLQ
        } else {
            runSimulation(request);
        }
        request = queues.lindex("queue-" + name, -1);
        queues.lrem(queueName, 1, request);
        counter.set(counterName, 0); // Reset
    }
}

计数器是一个特定的 Redis 整数值,我们在成功或放弃时对其进行递增和重置。

来自 DLQ 的项目并未丢失;它们被保存以供将来处理。这些项目可以重新添加到主队列(以验证这是否不是瞬时问题或错误是否已修复)。另一种方法要求人类管理员在将这些请求重新注入系统之前对其进行查看;也许这只是一个格式问题……

总结

本文探讨了如何改进我们在《如何使用 Redis 实现任务队列》中实现的作业队列。尽管简单,但最初的实现会在处理失败时丢失请求。本文提出了另一种更复杂的架构来处理这种情况,同时也处理重复项和毒丸。但是,天下没有免费的午餐,最终的代码也稍微复杂一些。

请记住:Redis 是一个很棒的工具箱。但是,它是一个工具箱;您需要用它来构建您需要的东西,因为它很少是现成的。话虽如此,Redis 命令的丰富性让您可以做很多事情……(剧透:我们将在未来的文章中看到其中的一些)。