Kafka - 何时提交?

上一篇博客文章中,我们探讨了 Reactive Messaging Kafka 连接器提供的故障策略。但是,假设今天是个幸运日,一切都正常工作了。我们应该通知 Kafka 处理已成功。在 Kafka 术语中,这称为:偏移提交。本文将介绍使用 Reactive Messaging Kafka 连接器提交偏移量的不同策略。

Kafka 消费者组和偏移量

Kafka 将记录(消息)围绕主题进行组织。每个主题都有一个名称,应用程序将记录发送到主题,并从主题轮询记录。到目前为止,没有什么不寻常的。

主题被划分为分区。每个分区都是有序的、不可变的记录序列。发送到主题的消息会将其追加到选定的分区。分区中的每条消息都有一个称为偏移量的顺序 ID 号。它在分区内唯一标识每条消息。因此,在 Kafka 中,您可以使用 <主题, 分区, 偏移量> 元组来标识单个记录。

topics partitions

当应用程序从 Kafka 消费消息时,它会使用 Kafka 消费者。使用此消费者,它可以从特定主题(例如 moviesactors)轮询消息批次。检索到的消息属于分配给该消费者的分区。这一点至关重要。

消费者属于一个消费者组,由一个名称标识(上图中的 AB)。一个组包含一个或多个消费者。通常,当您扩展应用程序时,它会创建一个加入同一组的消费者。

consumer groups

每个消费者组只接收一个主题的每条记录一次。为了实现这一点,它会将组中的每个消费者分配给一组分区。例如,在上图中,来自应用程序 A1 的消费者接收分区 0 和 1 的记录。A2 接收分区 2 的记录。App B 是其消费者组中的唯一消费者。因此,它会接收所有三个分区的记录。因此(暂时忽略再平衡或其他细节),主题中的每条记录每个消费者组只会被该组中的特定消费者接收一次。

为了协调每个消费者组的进度,每个消费者会定期向代理通知其当前位置——最后处理的偏移量。它会提交偏移量,表明该分区之前的所有记录都已处理完毕。因此,如果一个消费者停止然后再次返回,它将从最后提交的位置重新开始(如果再次分配给该分区)。请注意,此行为是可配置的。

需要注意的是提交的周期性。偏移量提交成本很高,为了提高性能,我们不应在处理完每条记录后都提交偏移量。在这方面,Kafka 与传统的消息传递解决方案(如 JMS)不同,JMS 会单独确认每条消息。另一个重要特性是提交的位置性。您提交的位置表明该位置之前的所有记录都已处理完毕。但事实真的如此吗?

Kafka 默认行为

Apache Kafka 消费者默认使用自动提交方法。使用此类消费者的应用程序围绕轮询循环构建

while(true) {
    ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
    processRetrievedRecords(records);
}

这样的程序会轮询一批记录,处理它们,然后轮询下一批。在调用 poll 方法时,消费者会定期自动提交前一批的最后一个偏移量。

相当不错,对吧?如果应用程序未能处理消息,它会抛出异常,该异常会中断 while 循环或被优雅地处理(在 processRetrievedRecords 方法中)。在第一种情况下,这意味着它将不再提交(因为发生在 poll 方法中,不再调用)。如果应用程序重新启动,它将从最后提交的偏移量恢复(或者,如果该组还没有偏移量,则应用 auto.offset.reset 策略,默认为 latest)。它可能会重新处理一批消息(处理重复项是应用程序的责任),但至少不会丢失任何东西。

那么,有什么问题吗?看起来很棒……直到你加入一点异步。

如果消息处理是异步的怎么办

如果消息处理是异步的(卸载到另一个线程,使用非阻塞 I/O……),则故障可能不会中断上面的 while 循环。故障是异步发生的,在轮询线程之外。如果尽管处理失败,poll 方法仍然被调用,并且自动提交仍然启用,我们可能会在发生错误时提交偏移量。如果之前检索到的某些记录的处理尚未完成,而此时发生了自动提交,它可能会认为记录已成功处理,但此时结果未知。

因此,为了处理这些情况,我们可以禁用自动提交并切换到手动提交。在这种情况下,应用程序有责任定期提交偏移量。因此,应用程序需要跟踪轮询的记录、它们的处理、故障,并定期提交偏移量。这看起来可能并不太棘手,但实际上可能变得相当困难。同样,在异步场景中,您可能会以各种顺序完成消息的处理。例如,如果您为每条记录调用远程服务,响应可能不会与记录以相同的顺序返回。您需要单独跟踪消息,并且仅在所有先前消息都成功处理后才提交偏移量。否则,您可能会在先前记录的处理仍在进行中甚至处理失败的情况下提交偏移量。

我们该如何解决这个问题?

Kafka 连接器提交策略

使用 Reactive Messaging 和 Kafka 连接器时,您就进入了一个异步的世界。

消息处理可能不会同步和顺序进行。当 Reactive Messaging Message 处理完成时,它会确认消息。在处理失败的情况下,它会发送否定确认。Kafka 连接器会收到这些确认,并可以决定需要做什么,基本上是:提交还是不提交。

您可以在三种策略中选择

  • throttled(Quarkus 1.10 起的默认值)

  • latest(Quarkus 1.10 之前的默认值)

  • ignore(如果设置了 enabled.auto.commit=true,则为默认值)

这是使用 commit-strategy 属性配置的

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.commit-strategy=throttled

Throttled 策略

Throttled 策略可以看作是上面描述的默认“自动提交”行为的异步变体。启用后,连接器会跟踪每条接收到的消息并监控它们的确认。当连接器发现某个位置之前的所有消息都已成功处理时,它会提交该位置。此提交会定期发生,以避免过于频繁地提交。

此策略提供非常好的吞吐量,并且可以处理异步处理。要启用此策略,请通过以下方式配置通道

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.commit-strategy=throttled

有一个细节需要提及。如果一条旧消息既未被确认也未被拒绝,该策略将无法再提交该位置。它将永远排队消息,等待该缺失的确认发生。这可能导致内存不足,因为连接器永远无法提交位置并清空队列。幸运的是,该策略会检测到这种情况并向连接器报告失败,标记应用程序不健康。throttled.unprocessed-record-max-age.ms 属性配置了每条消息在被视为有毒消息(默认为 1 分钟)之前被确认或拒绝的截止日期。

Ignore 策略

如果您明确启用了 Kafka 的自动提交(将 enable.auto.commit 属性设置为 true),连接器将默认使用此策略。在这种情况下,连接器会忽略确认,也不会提交偏移量。Kafka 消费者在轮询批次时会定期提交偏移量,如上所述。如果消息处理是同步的并且故障得到妥善处理,此策略可以正常工作。

您可以通过将 enabled-auto-commit 配置为 true 来启用此策略

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.enable.auto.commit=true
请注意,从 Quarkus 1.9 开始,自动提交默认是禁用的。因此,您需要明确启用它。

如果您不启用自动提交,仍然可以使用此策略,但它永远不会提交偏移量。换句话说,您每次都会从最早存储的记录重新开始。虽然这种情况有一些用例,但请仔细检查这是否是您想要的。在这种情况下,使用以下方式启用此策略

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.commit-strategy=ignore

Latest 策略

此策略在每次确认消息时提交偏移量。此策略倾向于频繁提交,从而降低吞吐量。但是,如果消息是同步处理的,它也降低了重复的风险。

使用以下方式启用此策略

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.commit-strategy=latest

结论

总的来说,请使用 throttled 策略。它提供了高吞吐量并处理异步用例。此策略正在成为 Quarkus 1.10 中的默认策略。如果您认为 Kafka 自动提交是可以接受的,或者您想完全跳过偏移量提交,也可以切换到 ignore 策略。

这篇博客文章到此结束。下一篇将讨论如何使用 Kafka 连接器接收和生成 Cloud Events。