Kafka - 如何优雅地失败

失败是不可避免的。我们对此无能为力,这同样适用于 Kafka 应用程序。你的应用程序可能包含一个偶尔出现故障或无法处理特定 Kafka 记录的 faulty component。在这篇文章中,我们将看看如何管理这些失败。

ACK 和 NACK

但是,首先,我们需要解释它是如何工作的。在使用响应式消息传递时,你的应用程序会收到 Messages。即使你的方法处理的是 payloads,底层仍然是 Messages,并且它会在调用你的方法之前解开 payload。

messages

消息可以被 ackednacked。如果消息处理成功完成,则消息将被确认。你可以手动触发确认(通过调用 ack() 方法)或让框架自动处理。通常,一旦成功将出站消息发送到代理,出站连接器就会确认消息。如果未另行配置,确认消息会确认 消息,并一直追溯到源头,最有可能由入站连接器创建。

Acknowledgement chain

当入站连接器收到确认时,它可以根据此进行操作,例如,向代理指示消息处理已成功。在 Kafka 的上下文中,有各种提交策略。我们将在未来的帖子中介绍这些。

但正如前面所说,失败是不可避免的。例如,你的 faulty component 可能会抛出异常,或者出站连接器无法发送消息,因为远程代理不可用。在这些情况下,消息会被 nacked,表示处理失败。与成功确认类似,否定确认可以手动触发(使用 nack 方法)或自动处理。例如,如果你的方法抛出异常,则消息会被 nacked。与 ack 一样,nacking 消息会 nack 其源,并且 nack 会一直传播到入站连接器。

Negative acknowledgment chain

连接器有责任决定在这种情况下如何反应。Kafka 连接器提供了三种故障处理策略,这正是我们将要详细介绍的内容。

“快速失败”策略

第一种策略是最简单的,但我不确定我们是否可以称之为“平稳”。这是默认策略。一旦消息被 nacked,连接器就会报告失败,应用程序将停止。如果使用健康检查扩展,应用程序将被标记为不健康,你的编排器可能会重启应用程序。

Fail-Fast

但是,是时候展示一下了。我创建了一个简单的应用程序,它从 Kafka 接收电影标题,如果标题包含 ',,则会失败(抛出异常)。你可以在此 Gist 上查看代码,或使用以下命令运行它

jbang https://gist.github.com/cescoffier/23ca7b2bcc8c49cee3db29b3f2b59e4a/raw/9b0a114b2d5825543f2890d2071b9387441e008b/KafkaFailFast.java
启动应用程序会使用 Docker 启动一个 Kafka 代理。第一次启动可能需要下载相应的容器镜像。

如果你运行了该应用程序并检查了日志,你将看到

ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-0) SRMSG00200: The method foo.KafkaFailFast$MovieProcessor#consume has thrown an exception: java.lang.IllegalArgumentException: I don't like movies with ' in their title: Schindler's List
    at foo.KafkaFailFast$MovieProcessor.consume(KafkaFailFast.java:47)

现在,如果你在浏览器中打开 https://:8080/health,你将看到该故障已被捕获,并且应用程序不健康。

{
    "status": "DOWN",
    "checks": [
        {
            "name": "SmallRye Reactive Messaging - liveness check",
            "status": "DOWN",
            "data": {
                "movies": "[KO] - I don't like movies with ' in their title: Schindler's List",
                "movies-out": "[OK]"
            }
        },
        {
            "name": "SmallRye Reactive Messaging - readiness check",
            "status": "DOWN",
            "data": {
                "movies": "[OK]",
                "movies-out": "[OK]"
            }
        }
    ]
}

这种方法适用于偶发性的、与网络相关的故障。但是,如果故障的来源是你的应用程序代码,你可能会进入重启循环。事实上,当应用程序重启时,它可能会再次接收到导致相同故障的消息(上一张图片中的红色消息),一次又一次。

“忽略”策略

第二种策略也很简单:睁一只眼闭一只眼。当消息被 nacked 时,它会忽略故障并继续处理。

Ignore strategy

日志会指示故障,但它会继续处理下一个。只有当你不需要管理所有消息,或者你的应用程序正在内部处理故障时,才能使用此策略。

要启用此策略,请使用以下配置通道

mp.messaging.incoming.movies.failure-strategy=ignore

你可以尝试使用 Gist 来尝试此策略,或使用以下命令运行它

jbang https://gist.github.com/cescoffier/23ca7b2bcc8c49cee3db29b3f2b59e4a/raw/0a1a8cd9a0cbed69d8025004cd5feab8c044d097/KafkaIgnoreFailure.java

如果你运行了应用程序并检查了日志,你将看到两个异常。

ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-0) SRMSG00200: The method foo.KafkaFailFast$MovieProcessor#consume has thrown an exception: java.lang.IllegalArgumentException: I don't like movies with ' in their title: Schindler's List
    at foo.KafkaFailFast$MovieProcessor.consume(KafkaFailFast.java:47)
...
ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-0) SRMSG00200: The method foo.KafkaIgnoreFailure$MovieProcessor#consume has thrown an exception: java.lang.IllegalArgumentException: I don't like movies with , in their title: The Good, the Bad and the Ugly
    at foo.KafkaIgnoreFailure$MovieProcessor.consume(KafkaIgnoreFailure.java:51)
...
WARN  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-0) SRMSG18204: A message sent to channel `movies` has been nacked, ignored failure is: I don't like movies with , in their title: The Good, the Bad and the Ugly.
INFO  [Kafka-Ignore] (vert.x-eventloop-thread-0) Receiving movie The Lord of the Rings: The Fellowship of the Ring

看最后一行。如前所述,它会继续处理下一条消息。

如果你检查应用程序的健康状况(使用 https://:8080/health),一切正常。

{
    "status": "UP",
    "checks": [
        {
            "name": "SmallRye Reactive Messaging - liveness check",
            "status": "UP",
            "data": {
                "movies": "[OK]",
                "movies-out": "[OK]"
            }
        },
        {
            "name": "SmallRye Reactive Messaging - readiness check",
            "status": "UP",
            "data": {
                "movies": "[OK]",
                "movies-out": "[OK]"
            }
        }
    ]
}

“死信队列”策略

死信队列是一个处理消息处理失败的著名模式。它不是快速失败或忽略并继续处理,而是将失败的消息存储到一个特定的目的地:死信队列。管理员(人工或软件)可以审查失败的消息并决定如何处理(重试、跳过等)。请注意,只有当应用程序不需要顺序时,才能应用此策略。

之后,你可以审查失败的消息。

Dead-letter topic

要启用此策略,你需要在配置中添加以下属性。

mp.messaging.incoming.movies.failure-strategy=dead-letter-queue

默认情况下,它会写入 dead-letter-topic-$topic-name 主题。在我们的演示中,它是 dead-letter-topic-movies。但你也可以通过设置 dead-letter-queue.topic 属性来配置主题。

根据你的 Kafka 配置,你可能需要提前创建主题并配置复制因子。

让我们来试试!KafkaDeadLetterTopic.java 文件扩展了我们之前的应用程序。它使用死信队列故障策略,并包含一个读取死信队列(dead-letter-topic-movies)的组件。

你可以使用以下命令运行应用程序。

jbang https://gist.github.com/cescoffier/23ca7b2bcc8c49cee3db29b3f2b59e4a/raw/f33365cbb42f6a514777b7527ef5e35b62740f5b/KafkaDeadLetterTopic.java

如果你查看日志,你将看到两个预期的异常,并且所有标题都已处理。你还会注意到

INFO  [Kafka-Dead-Letter-Topic] (vert.x-eventloop-thread-0) The message 'The Good, the Bad and the Ugly' has been rejected and sent to the DLT. The reason is: 'I don't like movies with , in their title: The Good, the Bad and the Ugly'.

此日志由读取死信队列的组件写入。

@ApplicationScoped
public static class DeadLetterTopicReader {
    @Incoming("dead-letter-topic-movies")
    public CompletionStage<Void> dead(Message<String> rejected) {
        IncomingKafkaRecordMetadata<String, String> metadata = rejected.getMetadata(IncomingKafkaRecordMetadata.class)
                .orElseThrow(() -> new IllegalArgumentException("Expected a message coming from Kafka"));
        String reason = new String(metadata.getHeaders().lastHeader("dead-letter-reason").value());
        LOGGER.infof("The message '%s' has been rejected and sent to the DLT. The reason is: '%s'.", rejected.getPayload(), reason);

        return rejected.ack();
    }
}

从死信队列读取消息时,可以通过读取 dead-letter-reason 标头来检索失败原因。

结论

Kafka 连接器提供了三种处理失败的策略。

  • fail-fast(默认)停止应用程序并将其标记为不健康。

  • ignore 即使出现故障,也会继续处理。

  • dead-letter-queue 将失败的消息发送到另一个 Kafka 主题以供进一步调查。

下次我们将讨论提交策略,因为失败是不可避免的,但成功处理有时也会发生!敬请期待!