Kafka - 如何优雅地失败
失败是不可避免的。我们对此无能为力,这同样适用于 Kafka 应用程序。你的应用程序可能包含一个偶尔出现故障或无法处理特定 Kafka 记录的 faulty component。在这篇文章中,我们将看看如何管理这些失败。
ACK 和 NACK
但是,首先,我们需要解释它是如何工作的。在使用响应式消息传递时,你的应用程序会收到 Messages。即使你的方法处理的是 payloads,底层仍然是 Messages,并且它会在调用你的方法之前解开 payload。
消息可以被 acked 或 nacked。如果消息处理成功完成,则消息将被确认。你可以手动触发确认(通过调用 ack()
方法)或让框架自动处理。通常,一旦成功将出站消息发送到代理,出站连接器就会确认消息。如果未另行配置,确认消息会确认 源 消息,并一直追溯到源头,最有可能由入站连接器创建。
当入站连接器收到确认时,它可以根据此进行操作,例如,向代理指示消息处理已成功。在 Kafka 的上下文中,有各种提交策略。我们将在未来的帖子中介绍这些。
但正如前面所说,失败是不可避免的。例如,你的 faulty component 可能会抛出异常,或者出站连接器无法发送消息,因为远程代理不可用。在这些情况下,消息会被 nacked,表示处理失败。与成功确认类似,否定确认可以手动触发(使用 nack
方法)或自动处理。例如,如果你的方法抛出异常,则消息会被 nacked。与 ack 一样,nacking 消息会 nack 其源,并且 nack 会一直传播到入站连接器。
连接器有责任决定在这种情况下如何反应。Kafka 连接器提供了三种故障处理策略,这正是我们将要详细介绍的内容。
“快速失败”策略
第一种策略是最简单的,但我不确定我们是否可以称之为“平稳”。这是默认策略。一旦消息被 nacked,连接器就会报告失败,应用程序将停止。如果使用健康检查扩展,应用程序将被标记为不健康,你的编排器可能会重启应用程序。
但是,是时候展示一下了。我创建了一个简单的应用程序,它从 Kafka 接收电影标题,如果标题包含 '
或 ,
,则会失败(抛出异常)。你可以在此 Gist 上查看代码,或使用以下命令运行它
jbang https://gist.github.com/cescoffier/23ca7b2bcc8c49cee3db29b3f2b59e4a/raw/9b0a114b2d5825543f2890d2071b9387441e008b/KafkaFailFast.java
启动应用程序会使用 Docker 启动一个 Kafka 代理。第一次启动可能需要下载相应的容器镜像。 |
如果你运行了该应用程序并检查了日志,你将看到
ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-0) SRMSG00200: The method foo.KafkaFailFast$MovieProcessor#consume has thrown an exception: java.lang.IllegalArgumentException: I don't like movies with ' in their title: Schindler's List
at foo.KafkaFailFast$MovieProcessor.consume(KafkaFailFast.java:47)
现在,如果你在浏览器中打开 https://:8080/health,你将看到该故障已被捕获,并且应用程序不健康。
{
"status": "DOWN",
"checks": [
{
"name": "SmallRye Reactive Messaging - liveness check",
"status": "DOWN",
"data": {
"movies": "[KO] - I don't like movies with ' in their title: Schindler's List",
"movies-out": "[OK]"
}
},
{
"name": "SmallRye Reactive Messaging - readiness check",
"status": "DOWN",
"data": {
"movies": "[OK]",
"movies-out": "[OK]"
}
}
]
}
这种方法适用于偶发性的、与网络相关的故障。但是,如果故障的来源是你的应用程序代码,你可能会进入重启循环。事实上,当应用程序重启时,它可能会再次接收到导致相同故障的消息(上一张图片中的红色消息),一次又一次。
“忽略”策略
第二种策略也很简单:睁一只眼闭一只眼。当消息被 nacked 时,它会忽略故障并继续处理。
日志会指示故障,但它会继续处理下一个。只有当你不需要管理所有消息,或者你的应用程序正在内部处理故障时,才能使用此策略。
要启用此策略,请使用以下配置通道
mp.messaging.incoming.movies.failure-strategy=ignore
你可以尝试使用 Gist 来尝试此策略,或使用以下命令运行它
jbang https://gist.github.com/cescoffier/23ca7b2bcc8c49cee3db29b3f2b59e4a/raw/0a1a8cd9a0cbed69d8025004cd5feab8c044d097/KafkaIgnoreFailure.java
如果你运行了应用程序并检查了日志,你将看到两个异常。
ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-0) SRMSG00200: The method foo.KafkaFailFast$MovieProcessor#consume has thrown an exception: java.lang.IllegalArgumentException: I don't like movies with ' in their title: Schindler's List
at foo.KafkaFailFast$MovieProcessor.consume(KafkaFailFast.java:47)
...
ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-0) SRMSG00200: The method foo.KafkaIgnoreFailure$MovieProcessor#consume has thrown an exception: java.lang.IllegalArgumentException: I don't like movies with , in their title: The Good, the Bad and the Ugly
at foo.KafkaIgnoreFailure$MovieProcessor.consume(KafkaIgnoreFailure.java:51)
...
WARN [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-0) SRMSG18204: A message sent to channel `movies` has been nacked, ignored failure is: I don't like movies with , in their title: The Good, the Bad and the Ugly.
INFO [Kafka-Ignore] (vert.x-eventloop-thread-0) Receiving movie The Lord of the Rings: The Fellowship of the Ring
看最后一行。如前所述,它会继续处理下一条消息。
如果你检查应用程序的健康状况(使用 https://:8080/health),一切正常。
{
"status": "UP",
"checks": [
{
"name": "SmallRye Reactive Messaging - liveness check",
"status": "UP",
"data": {
"movies": "[OK]",
"movies-out": "[OK]"
}
},
{
"name": "SmallRye Reactive Messaging - readiness check",
"status": "UP",
"data": {
"movies": "[OK]",
"movies-out": "[OK]"
}
}
]
}
“死信队列”策略
死信队列是一个处理消息处理失败的著名模式。它不是快速失败或忽略并继续处理,而是将失败的消息存储到一个特定的目的地:死信队列。管理员(人工或软件)可以审查失败的消息并决定如何处理(重试、跳过等)。请注意,只有当应用程序不需要顺序时,才能应用此策略。
之后,你可以审查失败的消息。
要启用此策略,你需要在配置中添加以下属性。
mp.messaging.incoming.movies.failure-strategy=dead-letter-queue
默认情况下,它会写入 dead-letter-topic-$topic-name
主题。在我们的演示中,它是 dead-letter-topic-movies
。但你也可以通过设置 dead-letter-queue.topic
属性来配置主题。
根据你的 Kafka 配置,你可能需要提前创建主题并配置复制因子。 |
让我们来试试!KafkaDeadLetterTopic.java
文件扩展了我们之前的应用程序。它使用死信队列故障策略,并包含一个读取死信队列(dead-letter-topic-movies
)的组件。
你可以使用以下命令运行应用程序。
jbang https://gist.github.com/cescoffier/23ca7b2bcc8c49cee3db29b3f2b59e4a/raw/f33365cbb42f6a514777b7527ef5e35b62740f5b/KafkaDeadLetterTopic.java
如果你查看日志,你将看到两个预期的异常,并且所有标题都已处理。你还会注意到
INFO [Kafka-Dead-Letter-Topic] (vert.x-eventloop-thread-0) The message 'The Good, the Bad and the Ugly' has been rejected and sent to the DLT. The reason is: 'I don't like movies with , in their title: The Good, the Bad and the Ugly'.
此日志由读取死信队列的组件写入。
@ApplicationScoped
public static class DeadLetterTopicReader {
@Incoming("dead-letter-topic-movies")
public CompletionStage<Void> dead(Message<String> rejected) {
IncomingKafkaRecordMetadata<String, String> metadata = rejected.getMetadata(IncomingKafkaRecordMetadata.class)
.orElseThrow(() -> new IllegalArgumentException("Expected a message coming from Kafka"));
String reason = new String(metadata.getHeaders().lastHeader("dead-letter-reason").value());
LOGGER.infof("The message '%s' has been rejected and sent to the DLT. The reason is: '%s'.", rejected.getPayload(), reason);
return rejected.ack();
}
}
从死信队列读取消息时,可以通过读取 dead-letter-reason
标头来检索失败原因。