编辑此页面

Apache Kafka 参考指南

本参考指南演示了您的 Quarkus 应用程序如何利用 Quarkus Messaging 与 Apache Kafka 进行交互。

1. 简介

Apache Kafka 是一个流行的开源分布式事件流平台。它常用于高性能数据管道、流分析、数据集成和关键任务应用程序。类似于消息队列或企业消息平台,它允许您

  • 发布(写入)和订阅(读取)事件流,称为记录

  • 持久且可靠地存储记录流在主题中。

  • 处理发生的事件流或回顾性事件流。

所有这些功能都以分布式、高度可伸缩、弹性、容错和安全的方式提供。

2. Quarkus Apache Kafka 扩展

Quarkus 通过 SmallRye Reactive Messaging 框架为 Apache Kafka 提供支持。它基于 Eclipse MicroProfile Reactive Messaging 规范 2.0,提供了一个灵活的编程模型,连接 CDI 和事件驱动。

本指南将深入介绍 Apache Kafka 和 SmallRye Reactive Messaging 框架。如需快速入门,请参阅 Quarkus Messaging 与 Apache Kafka 入门

您可以通过在项目根目录下运行以下命令,将 messaging-kafka 扩展添加到您的项目中

CLI
quarkus extension add messaging-kafka
Maven
./mvnw quarkus:add-extension -Dextensions='messaging-kafka'
Gradle
./gradlew addExtension --extensions='messaging-kafka'

这会将以下内容添加到您的构建文件中

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-kafka")

该扩展包含 kafka-clients 版本 3.2.1 作为传递依赖,并与 Kafka 代理版本 2.x 兼容。

3. 配置 SmallRye Kafka 连接器

由于 SmallRye Reactive Messaging 框架支持 Apache Kafka、AMQP、Apache Camel、JMS、MQTT 等不同的消息后端,因此它采用通用词汇。

  • 应用程序发送和接收消息。消息包含一个载荷,并且可以添加元数据。对于 Kafka 连接器,消息对应于 Kafka记录

  • 消息在通道上传输。应用程序组件连接到通道以发布和消耗消息。Kafka 连接器将通道映射到 Kafka主题

  • 通道使用连接器连接到消息后端。连接器配置为将传入消息映射到特定通道(由应用程序消耗),并收集发送到特定通道的传出消息。每个连接器都致力于一种特定的消息传递技术。例如,处理 Kafka 的连接器名为 smallrye-kafka

以下是带有传入通道的 Kafka 连接器的最小配置

%prod.kafka.bootstrap.servers=kafka:9092 (1)
mp.messaging.incoming.prices.connector=smallrye-kafka (2)
1 为生产环境配置代理位置。您可以在全局配置或使用 mp.messaging.incoming.$channel.bootstrap.servers 属性按通道配置。在开发模式和测试运行时,Kafka 的开发服务会自动启动一个 Kafka 代理。如果未提供,此属性默认为 localhost:9092
2 将连接器配置为管理价格通道。默认情况下,主题名称与通道名称相同。您可以配置 topic 属性来覆盖它。
%prod 前缀表示该属性仅在应用程序以生产模式运行时使用(因此在开发或测试模式下不使用)。有关更多详细信息,请参阅配置文件文档
连接器自动附加

如果类路径中只有一个连接器,您可以省略 connector 属性配置。Quarkus 会自动将孤立通道关联到类路径上找到的(唯一)连接器。孤立通道是没有下游使用者或没有上游生产者的传入通道的传出通道。

可以使用以下方式禁用此自动附加功能

quarkus.messaging.auto-connector-attachment=false

4. 从 Kafka 接收消息

从前面的最小配置继续,您的 Quarkus 应用程序可以直接接收消息载荷

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

您的应用程序还可以通过其他几种方式消耗传入消息

Message
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
    // access record metadata
    var metadata = msg.getMetadata(IncomingKafkaRecordMetadata.class).orElseThrow();
    // process the message payload.
    double price = msg.getPayload();
    // Acknowledge the incoming message (commit the offset)
    return msg.ack();
}

Message 类型允许消费方法访问传入消息的元数据并手动处理确认。我们将在提交策略中探讨不同的确认策略。

如果您想直接访问 Kafka 记录对象,请使用

ConsumerRecord
@Incoming("prices")
public void consume(ConsumerRecord<String, Double> record) {
    String key = record.key(); // Can be `null` if the incoming record has no key
    String value = record.value(); // Can be `null` if the incoming record has no value
    String topic = record.topic();
    int partition = record.partition();
    // ...
}

ConsumerRecord 由底层的 Kafka 客户端提供,可以直接注入到消费方法中。另一种更简单的方法是使用 Record

Record
@Incoming("prices")
public void consume(Record<String, Double> record) {
    String key = record.key(); // Can be `null` if the incoming record has no key
    String value = record.value(); // Can be `null` if the incoming record has no value
}

Record 是传入 Kafka 记录的键和载荷的简单包装器。

@Channel

或者,您的应用程序可以将 Multi 注入到您的 bean 中,并订阅其事件,如下面的示例所示

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }
}

这是将 Kafka 消费者与另一个下游集成的好例子,在本例中将其公开为服务器发送事件 (Server-Sent Events) 端点。

使用 @Channel 消费消息时,应用程序代码负责订阅。在上面的示例中,Quarkus REST(以前称为 RESTEasy Reactive)端点为您处理了这些。

以下类型可以作为通道注入

@Inject @Channel("prices") Multi<Double> streamOfPayloads;

@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;

@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;

@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;

与之前的 Message 示例一样,如果注入的通道接收载荷 (Multi<T>),它会自动确认消息,并支持多个订阅者。如果您注入的通道接收 Message (Multi<Message<T>>),您将负责确认和广播。我们将在将消息广播到多个消费者中探讨发送广播消息。

注入 @Channel("prices") 或具有 @Incoming("prices") 不会自动配置应用程序从 Kafka 消费消息。您需要配置一个具有 mp.messaging.incoming.prices... 的入站连接器,或者在应用程序中的某个地方有一个 @Outgoing("prices") 方法(在这种情况下,prices 将是一个内存通道)。

4.1. 阻塞处理

Reactive Messaging 在 I/O 线程上调用您的方法。有关此主题的更多详细信息,请参阅Quarkus 反应式架构文档。但是,您通常需要将 Reactive Messaging 与阻塞处理(如数据库交互)结合使用。为此,您需要使用 @Blocking 注释,指示处理是阻塞的,并且不应在调用线程上运行。

例如,以下代码说明了如何使用带有 Panache 的 Hibernate 将传入的有效负载存储到数据库中

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

完整的示例可在 kafka-panache-quickstart 目录中找到。

有 2 个 @Blocking 注释

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

它们具有相同的效果。因此,您可以同时使用它们。第一个提供了更细粒度的调优,例如要使用的工作池以及是否保持顺序。第二个也与 Quarkus 的其他反应式功能一起使用,使用默认工作池并保持顺序。

有关 @Blocking 注释用法的详细信息,请参阅SmallRye Reactive Messaging – 处理阻塞执行

@RunOnVirtualThread

有关在 Java 虚拟线程上运行阻塞处理的信息,请参见 Quarkus 虚拟线程支持与响应式消息传递文档

@Transactional

如果您的方法使用 @Transactional 注释,则即使该方法未使用 @Blocking 注释,也会自动将其视为阻塞方法。

4.2. 确认策略

消费者接收的所有消息都必须被确认。如果没有确认,则认为处理有错误。如果消费方法接收到 Record 或载荷,消息将在方法返回时被确认,也称为 Strategy.POST_PROCESSING。如果消费方法返回另一个反应式流或 CompletionStage,消息将在下游消息被确认时被确认。您可以覆盖默认行为,在到达时(Strategy.PRE_PROCESSING)确认消息,或者在消费方法中根本不确认消息(Strategy.NONE),如下面的示例所示。

@Incoming("prices")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public void process(double price) {
    // process price
}

如果消费方法接收到 Message,则确认策略为 Strategy.MANUAL,消费方法负责确认/拒绝消息。

@Incoming("prices")
public CompletionStage<Void> process(Message<Double> msg) {
    // process price
    return msg.ack();
}

如上所述,方法还可以将确认策略覆盖为 PRE_PROCESSINGNONE

4.3. 提交策略

当从 Kafka 记录生成的消息被确认时,连接器会调用提交策略。这些策略决定何时提交特定主题/分区的消费者偏移量。提交偏移量表示所有之前的记录都已处理。它也是应用程序在崩溃恢复或重新启动后重新开始处理的位置。

提交每个偏移量都会带来性能损失,因为 Kafka 偏移量管理可能很慢。但是,如果不经常提交偏移量,如果应用程序在两次提交之间崩溃,可能会导致消息重复。

Kafka 连接器支持三种策略:

  • throttled 跟踪接收的消息,并按顺序提交最新已确认消息的偏移量(意味着,所有之前的消息也已确认)。此策略保证至少一次传递,即使通道执行异步处理。连接器跟踪接收到的记录,并定期(由 auto.commit.interval.ms 指定的周期,默认为 5000 毫秒)提交最高连续偏移量。如果与记录关联的消息未在 throttled.unprocessed-record-max-age.ms(默认为 60000 毫秒)内得到确认,则连接器将被标记为不健康。事实上,此策略无法在单个记录处理失败后立即提交偏移量。如果 throttled.unprocessed-record-max-age.ms 设置为小于或等于 0,则它不执行任何健康检查。 such setting might lead to running out of memory if there are "poison pill" messages (that are never acked).此策略是默认策略,除非 enable.auto.commit 被显式设置为 true。

  • checkpoint 允许将消费者偏移量持久化到状态存储,而不是提交回 Kafka 代理。使用 CheckpointMetadata API,消费代码可以持久化处理状态与记录偏移量一起,以标记消费者的进度。当处理从先前持久化的偏移量继续时,它会将 Kafka 消费者定位到该偏移量,并恢复持久化的状态,从而从上次中断的地方继续有状态的处理。检查点策略在本地保存与最新偏移量关联的处理状态,并定期将其持久化到状态存储(由 auto.commit.interval.ms 指定的周期(默认为 5000))。如果未在 checkpoint.unsynced-state-max-age.ms(默认为 10000)内将任何处理状态持久化到状态存储,则连接器将被标记为不健康。如果 checkpoint.unsynced-state-max-age.ms 设置为小于或等于 0,则它不执行任何健康检查。有关更多信息,请参阅使用检查点进行有状态处理

  • latest 在消息被确认后立即提交 Kafka 消费者接收到的记录偏移量(如果偏移量高于先前提交的偏移量)。此策略提供至少一次传递,前提是通道在不执行任何异步处理的情况下处理消息。具体来说,即使旧消息尚未处理完成,也会始终提交最新已确认消息的偏移量。如果发生事故(如崩溃),处理将从上次提交后重新开始,导致旧消息永远无法成功且完全处理,这会表现为消息丢失。此策略不应在高负载环境中使用,因为偏移量提交成本很高。但是,它减少了重复的可能性。

  • ignore 不执行任何提交。当消费者显式配置为 enable.auto.commit 为 true 时,此策略是默认策略。它将偏移量提交委托给底层的 Kafka 客户端。当 enable.auto.committrue 时,此策略保证至少一次传递。SmallRye Reactive Messaging 异步处理记录,因此可能会提交已轮询但尚未处理的记录的偏移量。发生故障时,只会重新处理尚未提交的记录。

当未显式启用 Kafka 自动提交时,Kafka 连接器会禁用 Kafka 自动提交。此行为与传统的 Kafka 消费者不同。如果高吞吐量对您很重要,并且您不受下游的限制,我们建议要么

  • 使用 throttled 策略,

  • 或者将 enable.auto.commit 设置为 true 并使用 @Acknowledgment(Acknowledgment.Strategy.NONE) 注释消费方法。

SmallRye Reactive Messaging 支持实现自定义提交策略。有关更多信息,请参阅SmallRye Reactive Messaging 文档

4.4. 错误处理策略

如果从 Kafka 记录生成的消息被拒绝,则会应用失败策略。Kafka 连接器支持三种策略:

  • fail:使应用程序失败,不再处理记录(默认策略)。未正确处理的记录的偏移量不会被提交。

  • ignore:记录失败,但处理继续。未正确处理的记录的偏移量将被提交。

  • dead-letter-queue:未正确处理的记录的偏移量将被提交,但记录将被写入 Kafka 死信队列主题。

该策略使用 failure-strategy 属性选择。

对于 dead-letter-queue,您可以配置以下属性:

  • dead-letter-queue.topic:用于写入未正确处理的记录的主题,默认为 dead-letter-topic-$channel,其中 $channel 是通道的名称。

  • dead-letter-queue.key.serializer:用于将记录键写入死信队列的序列化器。默认情况下,它从反序列化器推断序列化器。

  • dead-letter-queue.value.serializer:用于将记录值写入死信队列的序列化器。默认情况下,它从反序列化器推断序列化器。

写入死信队列的记录包含关于原始记录的一组附加标头:

  • dead-letter-reason:失败的原因

  • dead-letter-cause:失败的原因(如果有)

  • dead-letter-topic:记录的原始主题

  • dead-letter-partition:记录的原始分区(整数映射到字符串)

  • dead-letter-offset:记录的原始偏移量(长整型映射到字符串)

SmallRye Reactive Messaging 支持实现自定义失败策略。有关更多信息,请参阅SmallRye Reactive Messaging 文档

4.4.1. 重试处理

您可以将 Reactive Messaging 与 SmallRye Fault Tolerance 结合使用,并在失败后重试处理。

@Incoming("kafka")
@Retry(delay = 10, maxRetries = 5)
public void consume(String v) {
   // ... retry if this method throws an exception
}

您可以配置延迟、重试次数、抖动等。

如果您的方法返回 UniCompletionStage,您需要添加 @NonBlocking 注释。

@Incoming("kafka")
@Retry(delay = 10, maxRetries = 5)
@NonBlocking
public Uni<String> consume(String v) {
   // ... retry if this method throws an exception or the returned Uni produce a failure
}
@NonBlocking 注释仅在 SmallRye Fault Tolerance 5.1.0 及更早版本中需要。从 SmallRye Fault Tolerance 5.2.0(自 Quarkus 2.1.0.Final 起可用)开始,它不再是必需的。有关更多信息,请参阅SmallRye Fault Tolerance 文档

入站消息仅在处理成功完成后才会被确认。因此,它会在成功处理后提交偏移量。如果处理仍然失败,即使在所有重试之后,消息也会被拒绝,并应用失败策略。

4.4.2. 处理反序列化失败

当发生反序列化失败时,您可以拦截它并提供失败策略。要实现这一点,您需要创建一个实现 DeserializationFailureHandler<T> 接口的 bean。

@ApplicationScoped
@Identifier("failure-retry") // Set the name of the failure handler
public class MyDeserializationFailureHandler
    implements DeserializationFailureHandler<JsonObject> { // Specify the expected type

    @Override
    public JsonObject decorateDeserialization(Uni<JsonObject> deserialization, String topic, boolean isKey,
            String deserializer, byte[] data, Headers headers) {
        return deserialization
                    .onFailure().retry().atMost(3)
                    .await().atMost(Duration.ofMillis(200));
    }
}

要使用此失败处理程序,必须使用 @Identifier 限定符暴露 bean,并且连接器配置必须指定 mp.messaging.incoming.$channel.[key|value]-deserialization-failure-handler 属性(用于键或值反序列化器)。

该处理程序会接收反序列化详细信息,包括表示为 Uni<T> 的操作。对于反序列化 Uni,可以实现重试、提供备用值或应用超时等失败策略。

如果您不配置反序列化失败处理程序并且发生反序列化失败,应用程序将被标记为不健康。您也可以忽略失败,这将记录异常并生成 null 值。要启用此行为,请将 mp.messaging.incoming.$channel.fail-on-deserialization-failure 属性设置为 false

如果 fail-on-deserialization-failure 属性设置为 false 并且 failure-strategy 属性为 dead-letter-queue,则失败的记录将被发送到相应的死信队列主题。

4.5. 消费者组

在 Kafka 中,消费者组是一组协调消费主题数据的消费者。主题被划分为一组分区。主题的分区在组内的消费者之间分配,从而有效地提高了消费吞吐量。请注意,每个分区只分配给组中的一个消费者。但是,如果分区的数量大于组中的消费者数量,一个消费者可以被分配多个分区。

让我们简要探讨不同的生产者/消费者模式以及如何使用 Quarkus 实现它们。

  1. 消费者组内的单个消费者线程

    这是订阅 Kafka 主题的应用程序的默认行为:每个 Kafka 连接器将创建一个单个消费者线程并将其置于单个消费者组中。消费者组 ID 默认为 quarkus.application.name 配置属性设置的应用程序名称。它也可以使用 kafka.group.id 属性设置。

    Architecture
  2. 消费者组内的多个消费者线程

    对于给定的应用程序实例,可以使用 mp.messaging.incoming.$channel.concurrency 属性配置消费者组内的消费者数量。订阅主题的分区将在消费者线程之间划分。请注意,如果 concurrency 值超过主题的分区数,某些消费者线程将不会被分配任何分区。

    Architecture
    弃用

    concurrency 属性提供了一种与连接器无关的方式来实现非阻塞并发通道,并取代了 Kafka 连接器特定的 partitions 属性。因此,partitions 属性已被弃用,将在未来的版本中删除。

  3. 消费者组内的多个消费者应用程序

    与前面的示例类似,应用程序的多个实例可以订阅单个消费者组,通过 mp.messaging.incoming.$channel.group.id 属性配置,或者保留为应用程序名称的默认值。这反过来会将主题的分区分配给应用程序实例。

    Architecture
  4. 发布/订阅:多个消费者组订阅主题

    最后,不同的应用程序可以使用不同的消费者组 ID 独立订阅相同的主题。例如,发布到名为orders的主题的消息可以被两个消费者应用程序独立消费,一个使用 mp.messaging.incoming.orders.group.id=invoicing,第二个使用 mp.messaging.incoming.orders.group.id=shipping。因此,不同的消费者组可以根据消息消费需求独立扩展。

    Architecture

一个常见的业务需求是按顺序消费和处理 Kafka 记录。Kafka 代理保留分区内的记录顺序,而不是主题内的记录顺序。因此,考虑记录如何在主题内分区很重要。默认分区器使用记录键的哈希值来计算记录的分区,或者在未定义键时,为每个批次或记录随机选择一个分区。

在正常操作期间,Kafka 消费者会保持其分配的每个分区内记录的顺序。SmallRye Reactive Messaging 会保留此顺序进行处理,除非使用了 @Blocking(ordered = false)(请参阅阻塞处理)。

请注意,由于消费者重新平衡,Kafka 消费者仅保证单个记录的至少一次处理,这意味着未提交的记录可能被消费者再次处理。

4.5.1. 消费者重平衡监听器

在消费者组内,随着新成员的加入和旧成员的离开,分区会被重新分配,以便每个成员获得与其成比例的份额。这称为组的重新平衡。要自行处理偏移量提交和分配的分区,您可以提供一个消费者重平衡监听器。要实现这一点,请实现 io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener 接口,并使用 @Idenfier 限定符将其暴露为 CDI bean。一个常见的用例是将偏移量存储在单独的数据存储中,以实现精确一次语义,或从特定偏移量开始处理。

每次消费者主题/分区分配更改时都会调用监听器。例如,应用程序启动时,它会调用 partitionsAssigned 回调,其中包含与消费者关联的主题/分区的初始集。如果之后此集发生更改,它会再次调用 partitionsRevokedpartitionsAssigned 回调,以便您可以实现自定义逻辑。

请注意,重平衡监听器方法是从 Kafka 轮询线程调用的,并且阻塞调用线程直到完成。这是因为重平衡协议具有同步屏障,而在重平衡监听器中使用异步代码可能会在同步屏障之后执行。

当主题/分区分配或撤销给消费者时,它会暂停消息传递,并在重平衡完成后恢复。

如果重平衡监听器代表用户处理偏移量提交(使用 NONE 提交策略),则重平衡监听器必须在 partitionsRevoked 回调中同步提交偏移量。我们也建议在应用程序停止时应用相同的逻辑。

与 Apache Kafka 的 ConsumerRebalanceListener 不同,io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener 方法传递 Kafka Consumer 和主题/分区的集合。

在下面的示例中,我们设置了一个消费者,该消费者始终从最多 10 分钟前(或偏移量 0)的消息开始。首先,我们需要提供一个实现 io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener 并使用 io.smallrye.common.annotation.Identifier 注释的 bean。然后,我们必须配置我们的入站连接器以使用此 bean。

package inbound;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.TopicPartition;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

@ApplicationScoped
@Identifier("rebalanced-example.rebalancer")
public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRebalanceListener {

    private static final Logger LOGGER = Logger.getLogger(KafkaRebalancedConsumerRebalanceListener.class.getName());

    /**
     * When receiving a list of partitions, will search for the earliest offset within 10 minutes
     * and seek the consumer to it.
     *
     * @param consumer   underlying consumer
     * @param partitions set of assigned topic partitions
     */
    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        long now = System.currentTimeMillis();
        long shouldStartAt = now - 600_000L; //10 minute ago

        Map<TopicPartition, Long> request = new HashMap<>();
        for (TopicPartition partition : partitions) {
            LOGGER.info("Assigned " + partition);
            request.put(partition, shouldStartAt);
        }
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(request);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> position : offsets.entrySet()) {
            long target = position.getValue() == null ? 0L : position.getValue().offset();
            LOGGER.info("Seeking position " + target + " for " + position.getKey());
            consumer.seek(position.getKey(), target);
        }
    }

}
package inbound;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class KafkaRebalancedConsumer {

    @Incoming("rebalanced-example")
    @Acknowledgment(Acknowledgment.Strategy.NONE)
    public CompletionStage<Void> consume(Message<ConsumerRecord<Integer, String>> message) {
        // We don't need to ACK messages because in this example,
        // we set offset during consumer rebalance
        return CompletableFuture.completedFuture(null);
    }

}

要配置入站连接器以使用提供的监听器,我们可以设置消费者重平衡监听器的标识符:mp.messaging.incoming.rebalanced-example.consumer-rebalance-listener.name=rebalanced-example.rebalancer

或者让监听器的名称与组 ID 相同:

mp.messaging.incoming.rebalanced-example.group.id=rebalanced-example.rebalancer

设置消费者重平衡监听器的名称优先于使用组 ID。

4.5.2. 使用唯一消费者组

如果您想处理主题中的所有记录(从头开始),您需要

  1. 设置 auto.offset.reset = earliest

  2. 将您的消费者分配给任何其他应用程序未使用过的消费者组。

Quarkus 会生成一个 UUID,该 UUID 在两次执行之间会发生变化(包括在开发模式下)。因此,您确信没有其他消费者使用它,并且每次应用程序启动时都会获得一个新的唯一组 ID。

您可以使用生成的 UUID 作为消费者组,如下所示:

mp.messaging.incoming.your-channel.auto.offset.reset=earliest
mp.messaging.incoming.your-channel.group.id=${quarkus.uuid}
如果未设置 group.id 属性,它将默认为 quarkus.application.name 配置属性。

4.5.3. 手动主题-分区分配

assign-seek 通道属性允许手动将主题-分区分配给 Kafka 入站通道,并可以选择性地定位到指定偏移量的分区以开始消费记录。如果使用 assign-seek,消费者将不会动态订阅主题,而是静态分配描述的分区。在手动主题-分区重新平衡时不会发生,因此永远不会调用重平衡监听器。

该属性接受一个由逗号分隔的三元组列表:<topic>:<partition>:<offset>

例如,配置:

mp.messaging.incoming.data.assign-seek=topic1:0:10, topic2:1:20

将消费者分配给:

  • 主题 'topic1' 的分区 0,将初始位置设置为偏移量 10。

  • 主题 'topic2' 的分区 1,将初始位置设置为偏移量 20。

每个三元组中的主题、分区和偏移量可以有以下变化:

  • 如果省略主题,则使用配置的主题。

  • 如果省略偏移量,分区将被分配给消费者,但不会定位到偏移量。

  • 如果偏移量为 0,则定位到主题-分区的开头。

  • 如果偏移量为 -1,则定位到主题-分区的末尾。

4.6. 批量接收 Kafka 记录

默认情况下,入站方法单独接收每个 Kafka 记录。在底层,Kafka 消费者客户端会不断轮询代理,并以批次形式接收记录,这些记录包含在 ConsumerRecords 容器中。

批处理模式下,您的应用程序可以一次性接收消费者轮询返回的所有记录。

要实现这一点,您需要指定一个兼容的容器类型来接收所有数据。

@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price
    }
}

入站方法还可以接收 Message<List<Payload>>Message<ConsumerRecords<Key, Payload>>ConsumerRecords<Key, Payload> 类型。它们可以访问记录的详细信息,如偏移量或时间戳。

@Incoming("prices")
public CompletionStage<Void> consumeMessage(Message<ConsumerRecords<String, Double>> records) {
    for (ConsumerRecord<String, Double> record : records.getPayload()) {
        String payload = record.getPayload();
        String topic = record.getTopic();
        // process messages
    }
    // ack will commit the latest offsets (per partition) of the batch.
    return records.ack();
}

请注意,成功处理入站记录批次将提交批次中接收到的每个分区的最新偏移量。配置的提交策略将仅适用于这些记录。

相反,如果处理引发异常,所有消息都将被拒绝,并对批次中的所有记录应用失败策略。

Quarkus 会自动检测入站通道的批次类型并自动设置批次配置。您可以使用 mp.messaging.incoming.$channel.batch 属性显式配置批处理模式。

4.7. 使用检查点进行有状态处理

checkpoint 提交策略是一项实验性功能,未来可能会发生变化。

SmallRye Reactive Messaging checkpoint 提交策略允许消费者应用程序以有状态的方式处理消息,同时尊重 Kafka 消费者的可伸缩性。具有 checkpoint 提交策略的入站通道将消费者偏移量持久化到外部状态存储,例如关系数据库或键值存储。在处理消费记录后,消费者应用程序可以为分配给 Kafka 消费者的每个主题-分区累积内部状态。此本地状态将定期持久化到状态存储,并与产生它的记录的偏移量相关联。

此策略不会将任何偏移量提交到 Kafka 代理,因此当新分区分配给消费者时(例如,消费者重启或消费者组实例扩展),消费者将从最新的检查点偏移量及其保存的状态恢复处理。

@Incoming 通道消费代码可以通过 CheckpointMetadata API 操作处理状态。例如,计算 Kafka 主题上接收的价格移动平均值的消费者如下所示:

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;

@ApplicationScoped
public class MeanCheckpointConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> record) {
        // Get the `CheckpointMetadata` from the incoming message
        CheckpointMetadata<AveragePrice> checkpoint = CheckpointMetadata.fromMessage(record);

        // `CheckpointMetadata` allows transforming the processing state
        // Applies the given function, starting from the value `0.0` when no previous state exists
        checkpoint.transform(new AveragePrice(), average -> average.update(record.getPayload()), /* persistOnAck */ true);

        // `persistOnAck` flag set to true, ack will persist the processing state
        // associated with the latest offset (per partition).
        return record.ack();
    }

    static class AveragePrice {
        long count;
        double mean;

        AveragePrice update(double newPrice) {
            mean += ((newPrice - mean) / ++count);
            return this;
        }
    }
}

transform 方法将转换函数应用于当前状态,生成已更改的状态并本地注册以进行检查点。默认情况下,本地状态会定期持久化到状态存储,周期由 auto.commit.interval.ms 指定(默认为 5000)。如果提供了 persistOnAck 标志,则最新状态会在消息确认时主动持久化到状态存储。setNext 方法类似地直接设置最新状态。

检查点提交策略跟踪每个主题-分区的处理状态最后一次持久化的时间。如果在 checkpoint.unsynced-state-max-age.ms(默认为 10000)内无法持久化挂起的更改,则通道将被标记为不健康。

4.7.1. 状态存储

状态存储实现决定了处理状态的持久化位置和方式。这由 mp.messaging.incoming.[channel-name].checkpoint.state-store 属性配置。状态对象的序列化取决于状态存储实现。为了指示状态存储进行序列化,可能需要使用 mp.messaging.incoming.[channel-name].checkpoint.state-type 属性配置状态对象的类名。

Quarkus 提供以下状态存储实现:

  • quarkus-redis:使用 quarkus-redis-client 扩展持久化处理状态。Jackson 用于将处理状态序列化为 Json。对于复杂对象,需要使用 checkpoint.state-type 属性配置对象的类名。默认情况下,状态存储使用默认的 redis 客户端,但如果使用命名客户端,可以使用 mp.messaging.incoming.[channel-name].checkpoint.quarkus-redis.client-name 属性指定客户端名称。处理状态将使用键命名方案 [consumer-group-id]:[topic]:[partition] 存储在 Redis 中。

例如,前面代码的配置如下:

mp.messaging.incoming.prices.group.id=prices-checkpoint
# ...
mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-redis
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.MeanCheckpointConsumer.AveragePrice
# ...
# if using a named redis client
mp.messaging.incoming.prices.checkpoint.quarkus-redis.client-name=my-redis
quarkus.redis.my-redis.hosts=redis://:7000
quarkus.redis.my-redis.password=<redis-pwd>
  • quarkus-hibernate-reactive:使用 quarkus-hibernate-reactive 扩展持久化处理状态。处理状态对象必须是 Jakarta Persistence 实体并扩展 CheckpointEntity 类,该类处理由消费者组 ID、主题和分区组成的复合对象标识符。因此,需要使用 checkpoint.state-type 属性配置实体的类名。

例如,前面代码的配置如下:

mp.messaging.incoming.prices.group.id=prices-checkpoint
# ...
mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-hibernate-reactive
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.AveragePriceEntity

其中 AveragePriceEntity 是扩展了 CheckpointEntity 的 Jakarta Persistence 实体。

package org.acme;

import jakarta.persistence.Entity;

import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;

@Entity
public class AveragePriceEntity extends CheckpointEntity {
    public long count;
    public double mean;

    public AveragePriceEntity update(double newPrice) {
        mean += ((newPrice - mean) / ++count);
        return this;
    }
}
  • quarkus-hibernate-orm:使用 quarkus-hibernate-orm 扩展持久化处理状态。它与前面的状态存储类似,但它使用 Hibernate ORM 而不是 Hibernate Reactive。

配置后,它可以为检查点状态存储使用命名的 persistence-unit

mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-hibernate-orm
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.AveragePriceEntity
mp.messaging.incoming.prices.checkpoint.quarkus-hibernate-orm.persistence-unit=prices
# ... Setup "prices" persistence unit
quarkus.datasource."prices".db-kind=postgresql
quarkus.datasource."prices".username=<your username>
quarkus.datasource."prices".password=<your password>
quarkus.datasource."prices".jdbc.url=jdbc:postgresql://:5432/hibernate_orm_test
quarkus.hibernate-orm."prices".datasource=prices
quarkus.hibernate-orm."prices".packages=org.acme

有关如何实现自定义状态存储的说明,请参阅实现状态存储

5. 发送消息到 Kafka

Kafka 连接器出站通道的配置与入站通道类似。

%prod.kafka.bootstrap.servers=kafka:9092 (1)
mp.messaging.outgoing.prices-out.connector=smallrye-kafka (2)
mp.messaging.outgoing.prices-out.topic=prices (3)
1 为生产环境配置代理位置。您可以在全局配置或使用 mp.messaging.outgoing.$channel.bootstrap.servers 属性按通道配置。在开发模式和测试运行时,Kafka 的开发服务会自动启动一个 Kafka 代理。如果未提供,此属性默认为 localhost:9092
2 将连接器配置为管理 prices-out 通道。
3 默认情况下,主题名称与通道名称相同。您可以配置 topic 属性来覆盖它。

在应用程序配置中,通道名称是唯一的。因此,如果您想在同一个主题上配置入站和出站通道,您需要为通道命名不同的名称(如本指南示例中的 mp.messaging.incoming.pricesmp.messaging.outgoing.prices-out)。

然后,您的应用程序可以生成消息并将它们发布到 prices-out 通道。它可以使用 double 载荷,如下面的代码片段所示。

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class KafkaPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

您不应从代码中直接调用用 @Incoming 和/或 @Outgoing 注释的方法。它们由框架调用。由用户代码调用它们不会产生预期的结果。

请注意,generate 方法返回一个 Multi<Double>,它实现了 Reactive Streams Publisher 接口。此发布者将由框架用于生成消息并将其发送到配置的 Kafka 主题。

与其返回载荷,不如返回 io.smallrye.reactive.messaging.kafka.Record 以发送键/值对。

@Outgoing("out")
public Multi<Record<String, Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(x -> Record.of("my-key", random.nextDouble()));
}

载荷可以封装在 org.eclipse.microprofile.reactive.messaging.Message 中,以便更好地控制写入的记录。

@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble())
                    .addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
                            .withKey("my-key")
                            .withTopic("my-key-prices")
                            .withHeaders(new RecordHeaders().add("my-header", "value".getBytes()))
                            .build()));
}

OutgoingKafkaRecordMetadata 允许设置 Kafka 记录的元数据属性,例如 keytopicpartitiontimestamp。一种用例是动态选择消息的目标主题。在这种情况下,您不需要在应用程序配置文件中配置主题,而是需要使用出站元数据来设置主题名称。

除了返回 Reactive Stream Publisher 的方法签名(MultiPublisher 的实现)之外,出站方法还可以返回单个消息。在这种情况下,生产者将使用此方法作为生成器来创建无限流。

@Outgoing("prices-out") T generate(); // T excluding void

@Outgoing("prices-out") Message<T> generate();

@Outgoing("prices-out") Uni<T> generate();

@Outgoing("prices-out") Uni<Message<T>> generate();

@Outgoing("prices-out") CompletionStage<T> generate();

@Outgoing("prices-out") CompletionStage<Message<T>> generate();

5.1. 使用 Emitter 发送消息

有时,您需要一种命令式发送消息的方式。

例如,如果您需要在接收到 REST 端点中的 POST 请求时将消息发送到流。在这种情况下,您不能使用 @Outgoing,因为您的方法有参数。

为此,您可以使用 Emitter

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("price-create")
    Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        CompletionStage<Void> ack = priceEmitter.send(price);
    }
}

发送载荷会返回一个 CompletionStage,在消息被确认时完成。如果消息传输失败,CompletionStage 将以 nack 的原因异常完成。

Emitter 的配置方式与 @Incoming@Outgoing 使用的其他流配置相同。

使用 Emitter,您可以从命令式代码将消息发送到反应式消息。这些消息存储在队列中,直到它们被发送。如果 Kafka 生产者客户端跟不上要发送到 Kafka 的消息,这个队列可能会占用大量内存,您甚至可能耗尽内存。您可以使用 @OnOverflow 配置背压策略。它允许您配置队列大小(默认为 256)以及缓冲区大小达到时应用的策略。可用策略包括 DROPLATESTFAILBUFFERUNBOUNDED_BUFFERNONE

通过 Emitter API,您还可以将出站载荷封装在 Message<T> 中。与之前的示例一样,Message 允许您以不同的方式处理 ack/nack 情况。

import java.util.concurrent.CompletableFuture;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject @Channel("price-create") Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        priceEmitter.send(Message.of(price)
            .withAck(() -> {
                // Called when the message is acked
                return CompletableFuture.completedFuture(null);
            })
            .withNack(throwable -> {
                // Called when the message is nacked
                return CompletableFuture.completedFuture(null);
            }));
    }
}

如果您更喜欢使用 Reactive Stream API,可以使用 MutinyEmitter,它将从 send 方法返回 Uni<Void>。因此,您可以使用 Mutiny API 来处理下游消息和错误。

import org.eclipse.microprofile.reactive.messaging.Channel;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;

import io.smallrye.reactive.messaging.MutinyEmitter;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("price-create")
    MutinyEmitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public Uni<String> addPrice(Double price) {
        return quoteRequestEmitter.send(price)
                .map(x -> "ok")
                .onFailure().recoverWithItem("ko");
    }
}

您也可以使用 sendAndAwait 方法阻塞发送事件到 emitter。它将等到事件被接收方确认或拒绝后才从方法返回。

弃用

io.smallrye.reactive.messaging.annotations.Emitterio.smallrye.reactive.messaging.annotations.Channelio.smallrye.reactive.messaging.annotations.OnOverflow 类现已弃用,并被以下类取代:

  • org.eclipse.microprofile.reactive.messaging.Emitter

  • org.eclipse.microprofile.reactive.messaging.Channel

  • org.eclipse.microprofile.reactive.messaging.OnOverflow

新的 Emitter.send 方法返回一个 CompletionStage,在生成的消息被确认时完成。

弃用

MutinyEmitter#send(Message msg) 方法已弃用,建议使用以下接收 Message 进行发送的方法:

  • <M extends Message<? extends T>> Uni<Void> sendMessage(M msg)

  • <M extends Message<? extends T>> void sendMessageAndAwait(M msg)

  • <M extends Message<? extends T>> Cancellable sendMessageAndForget(M msg)

有关如何使用 Emitter 的更多信息,请参阅SmallRye Reactive Messaging – Emitters and Channels

5.2. 写入确认

当 Kafka 代理收到记录时,其确认可能需要一些时间,具体取决于配置。此外,它还会将无法写入的记录存储在内存中。

默认情况下,连接器会等待 Kafka 确认记录后才继续处理(确认接收到的 Message)。您可以通过将 waitForWriteCompletion 属性设置为 false 来禁用此行为。

请注意,acks 属性对记录确认有巨大影响。

如果记录无法写入,则该消息将被拒绝。

5.3. 背压

Kafka 出站连接器处理背压,监控等待写入 Kafka 代理的已发送消息的数量。已发送消息的数量通过 max-inflight-messages 属性配置,默认为 1024。

连接器只并发发送该数量的消息。在至少一条已发送消息被代理确认之前,不会发送其他消息。然后,当代理的已发送消息之一被确认时,连接器将新消息写入 Kafka。请确保相应地配置 Kafka 的 batch.sizelinger.ms

您也可以通过将 max-inflight-messages 设置为 0 来删除已发送消息的限制。但是,请注意,如果请求数量达到 max.in.flight.requests.per.connection,Kafka 生产者可能会被阻塞。

5.4. 重试消息分派

当 Kafka 生产者从服务器接收到错误时,如果该错误是暂时的、可恢复的错误,客户端将重试发送消息批次。此行为受 retriesretry.backoff.ms 参数控制。此外,SmallRye Reactive Messaging 将根据 retriesdelivery.timeout.ms 参数重试可恢复错误下的单个消息。

请注意,虽然在可靠系统中进行重试是最佳实践,但 max.in.flight.requests.per.connection 参数默认为 5,这意味着消息顺序不保证。如果消息顺序对您的用例是必需的,将 max.in.flight.requests.per.connection 设置为 1 将确保一次只发送一个消息批次,但这会以牺牲生产者吞吐量为代价。

有关处理错误时应用重试机制,请参阅重试处理部分。

5.5. 处理序列化失败

对于 Kafka 生产者客户端,序列化失败是不可恢复的,因此消息分派不会重试。在这些情况下,您可能需要为序列化器应用失败策略。要实现这一点,您需要创建一个实现 SerializationFailureHandler<T> 接口的 bean。

@ApplicationScoped
@Identifier("failure-fallback") // Set the name of the failure handler
public class MySerializationFailureHandler
    implements SerializationFailureHandler<JsonObject> { // Specify the expected type

    @Override
    public byte[] decorateSerialization(Uni<byte[]> serialization, String topic, boolean isKey,
        String serializer, Object data, Headers headers) {
        return serialization
                    .onFailure().retry().atMost(3)
                    .await().indefinitely();
    }
}

要使用此失败处理程序,必须使用 @Identifier 限定符暴露 bean,并且连接器配置必须指定 mp.messaging.outgoing.$channel.[key|value]-serialization-failure-handler 属性(用于键或值序列化器)。

处理程序会接收序列化详细信息,包括表示为 Uni<byte[]> 的操作。请注意,方法必须等待结果并返回序列化后的字节数组。

5.6. 内存通道

在某些用例中,使用消息模式在同一应用程序内部传输消息非常方便。当您不将通道连接到 Kafka 等消息后端时,所有操作都在内存中进行,并通过链接方法创建流。每个链仍然是一个反应式流,并强制执行背压协议。

框架会验证生产者/消费者链是否完整,这意味着如果应用程序使用内存通道写入消息(使用仅具有 @Outgoing 的方法,或使用 Emitter),它还必须在应用程序内部从内存通道中消耗消息(使用仅具有 @Incoming 的方法或使用非托管流)。

5.7. 将消息广播到多个消费者

默认情况下,一个通道可以链接到一个使用者,使用 @Incoming 方法或 @Channel 反应式流。在应用程序启动时,通道会进行验证,以形成由单个使用者和生产者组成的消费者和生产者链。您可以通过在通道上设置 mp.messaging.$channel.broadcast=true 来覆盖此行为。

对于内存通道,可以在 @Outgoing 方法上使用 @Broadcast 注释。例如:

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Broadcast;

@ApplicationScoped
public class MultipleConsumer {

    private final Random random = new Random();

    @Outgoing("in-memory-channel")
    @Broadcast
    double generate() {
        return random.nextDouble();
    }

    @Incoming("in-memory-channel")
    void consumeAndLog(double price) {
        System.out.println(price);
    }

    @Incoming("in-memory-channel")
    @Outgoing("prices2")
    double consumeAndSend(double price) {
        return price;
    }
}

反之,可以通过设置 mp.messaging.incoming.$channel.merge=true 将同一通道上的多个生产者合并。在 @Incoming 方法上,您可以使用 @Merge 注释来控制多个通道的合并方式。

在出站或处理方法上重复使用 @Outgoing 注释是分派消息到多个出站通道的另一种方式。

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MultipleProducers {

    private final Random random = new Random();

    @Outgoing("generated")
    @Outgoing("generated-2")
    double priceBroadcast() {
        return random.nextDouble();
    }

}

在前面的示例中,生成的价格将被广播到两个出站通道。下面的示例使用 Targeted 容器对象选择性地将消息发送到多个出站通道,其中键是通道名称,值是消息载荷。

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.Targeted;

@ApplicationScoped
public class TargetedProducers {

    @Incoming("in")
    @Outgoing("out1")
    @Outgoing("out2")
    @Outgoing("out3")
    public Targeted process(double price) {
        Targeted targeted = Targeted.of("out1", "Price: " + price,
                "out2", "Quote: " + price);
        if (price > 90.0) {
            return targeted.with("out3", price);
        }
        return targeted;
    }

}

请注意,Kafka 序列化器的自动检测不适用于使用 Targeted 的签名。

有关使用多个出站通道的更多详细信息,请参阅SmallRye Reactive Messaging 文档

5.8. Kafka 事务

Kafka 事务支持对多个 Kafka 主题和分区进行原子写入。Kafka 连接器提供了 KafkaTransactions 自定义 emitter,用于在事务中写入 Kafka 记录。它可以作为常规 emitter @Channel 注入。

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@ApplicationScoped
public class KafkaTransactionalProducer {

    @Channel("tx-out-example")
    KafkaTransactions<String> txProducer;

    public Uni<Void> emitInTransaction() {
        return txProducer.withTransaction(emitter -> {
            emitter.send(KafkaRecord.of(1, "a"));
            emitter.send(KafkaRecord.of(2, "b"));
            emitter.send(KafkaRecord.of(3, "c"));
            return Uni.createFrom().voidItem();
        });
    }

}

提供给 withTransaction 方法的函数接收一个 TransactionalEmitter 用于生成记录,并返回一个提供事务结果的 Uni

  • 如果处理成功完成,则会刷新生产者并提交事务。

  • 如果处理引发异常、返回失败的 Uni 或将 TransactionalEmitter 标记为中止,则会中止事务。

Kafka 事务性生产者需要配置 acks=all 客户端属性,以及 transactional.id 的唯一 ID,这意味着 enable.idempotence=true。当 Quarkus 检测到出站通道使用了 KafkaTransactions 时,它会在通道上配置这些属性,为 transactional.id 属性提供默认值 "${quarkus.application.name}-${channelName}"

请注意,对于生产使用,transactional.id 在所有应用程序实例之间必须是唯一的。

虽然普通消息 emitter 支持并发调用 send 方法,并因此将出站消息排队写入 Kafka,但 KafkaTransactions emitter 一次只能支持一个事务。事务从调用 withTransaction 开始,直到返回的 Uni 成功或失败。在事务进行中,后续调用 withTransaction(包括在给定函数内的嵌套调用)将引发 IllegalStateException

请注意,在 Reactive Messaging 中,处理方法的执行已经是串行化的,除非使用了 @Blocking(ordered = false)。如果 withTransaction 可以并发调用,例如从 REST 端点调用,则建议限制执行的并发性。这可以通过 Microprofile Fault Tolerance 中的 @Bulkhead 注释来完成。

可以在将 Kafka 事务与 Hibernate Reactive 事务链接中找到使用示例。

5.8.1. 事务感知消费者

如果您只想消费在 Kafka 事务内写入和提交的记录,您需要将入站通道上的 isolation.level 属性配置为:

mp.messaging.incoming.prices-in.isolation.level=read_committed

6. Kafka 请求-回复

Kafka 请求-回复模式允许将请求记录发布到 Kafka 主题,然后等待响应初始请求的回复记录。Kafka 连接器提供了 KafkaRequestReply 自定义 emitter,它实现了 Kafka 出站通道的请求-回复模式的请求方(或客户端)。

它可以作为常规 emitter @Channel 注入。

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply;

@ApplicationScoped
@Path("/kafka")
public class KafkaRequestReplyEmitter {

    @Channel("request-reply")
    KafkaRequestReply<Integer, String> requestReply;

    @POST
    @Path("/req-rep")
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<String> post(Integer request) {
        return requestReply.request(request);
    }

}

请求方法将记录发布到出站通道配置的目标主题,并轮询回复主题(默认情况下,目标主题加上 -replies 后缀)以获取回复记录。收到回复后,返回的 Uni 将以记录值完成。请求发送操作会生成一个相关 ID 并设置一个标头(默认为 REPLY_CORRELATION_ID),它期望该标头在回复记录中发回。

回复方可以使用反应式消息处理器来实现(请参阅处理消息)。

有关 Kafka 请求回复功能和高级配置选项的更多信息,请参阅SmallRye Reactive Messaging 文档

7. 处理消息

流式数据应用程序通常需要从主题消费某些事件,处理它们,并将结果发布到另一个主题。处理器方法可以使用 @Incoming@Outgoing 注释简单地实现。

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("price-in")
    @Outgoing("price-out")
    public double process(double price) {
        return price * CONVERSION_RATE;
    }

}

process 方法的参数是入站消息载荷,而返回值将用作出站消息载荷。先前提到的参数和返回类型的签名也受支持,例如 Message<T>Record<K, V> 等。

您可以通过消费和返回反应式流 Multi<T> 类型来应用异步流处理。

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class PriceProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("price-in")
    @Outgoing("price-out")
    public Multi<Double> process(Multi<Integer> prices) {
        return prices.filter(p -> p > 100).map(p -> p * CONVERSION_RATE);
    }

}

7.1. 传播记录键

在处理消息时,您可以将入站记录键传播到出站记录。

通过 mp.messaging.outgoing.$channel.propagate-record-key=true 配置启用,记录键传播会使用与入站记录相同的生成出站记录。

如果出站记录已包含,则它不会被覆盖入站记录键。如果入站记录确实有一个null 键,则使用 mp.messaging.outgoing.$channel.key 属性。

7.2. 精确一次处理

Kafka 事务允许在事务中管理消费者偏移量以及生成的 P 消息。这使得在消费-转换-生产模式(也称为精确一次处理)中将消费者与事务性生产者耦合成为可能。

KafkaTransactions 自定义 emitter 提供了一种在事务中将精确一次处理应用于入站 Kafka 消息的方法。

以下示例在事务中包含了一批 Kafka 记录。

import jakarta.enterprise.context.ApplicationScoped;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@ApplicationScoped
public class KafkaExactlyOnceProcessor {

    @Channel("prices-out")
    @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 500) (3)
    KafkaTransactions<Integer> txProducer;

    @Incoming("prices-in")
    public Uni<Void> emitInTransaction(Message<ConsumerRecords<String, Integer>> batch) { (1)
        return txProducer.withTransactionAndAck(batch, emitter -> { (2)
            for (ConsumerRecord<String, Integer> record : batch.getPayload()) {
                emitter.send(KafkaRecord.of(record.key(), record.value() + 1)); (3)
            }
            return Uni.createFrom().voidItem();
        });
    }

}
1 建议将精确一次处理与批处理模式一起使用。虽然可以与单个 Kafka 消息一起使用,但性能会受到很大影响。
2 消费的消息被传递到 KafkaTransactions#withTransactionAndAck 以处理偏移量提交和消息确认。
3 send 方法在事务中将记录写入 Kafka,而无需等待代理的发送回执。待写入 Kafka 的消息将被缓冲,并在提交事务之前刷新。因此,建议配置 @OnOverflow bufferSize 以适应足够的消息,例如 max.poll.records,即批次中返回的最大记录数。
  • 如果处理成功完成,在提交事务之前,给定批次消息的主题分区偏移量将被提交到事务中。

  • 如果处理需要中止,在中止事务后,消费者的位置将重置为最后提交的偏移量,从而有效地从该偏移量恢复消费。如果尚未向主题分区提交消费者偏移量,则消费者的位置将重置为主题分区的开头,即使偏移量重置策略为 latest

使用精确一次处理时,消费的消息偏移量提交由事务处理,因此应用程序不应通过其他方式提交偏移量。消费者应将 enable.auto.commit 设置为 false(默认值),并显式设置 commit-strategy=ignore

mp.messaging.incoming.prices-in.commit-strategy=ignore
mp.messaging.incoming.prices-in.failure-strategy=ignore

7.2.1. 精确一次处理的错误处理

KafkaTransactions#withTransaction 返回的 Uni 如果事务失败并被中止,将会产生一个失败。应用程序可以选择处理错误情况,但如果从 @Incoming 方法返回一个失败的 Uni,入站通道将有效地失败并停止反应式流。

KafkaTransactions#withTransactionAndAck 方法会确认和拒绝消息,但不会返回失败的 Uni。拒绝的消息将由入站通道的失败策略处理(请参阅错误处理策略)。将 failure-strategy=ignore 配置为简单地将 Kafka 消费者重置为最后提交的偏移量并从中恢复消费。

8. 直接访问 Kafka 客户端

在极少数情况下,您可能需要访问底层的 Kafka 客户端。KafkaClientService 提供对 ProducerConsumer 的线程安全访问。

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;

import org.apache.kafka.clients.producer.ProducerRecord;

import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.kafka.KafkaClientService;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;

@ApplicationScoped
public class PriceSender {

    @Inject
    KafkaClientService clientService;

    void onStartup(@Observes StartupEvent startupEvent) {
        KafkaProducer<String, Double> producer = clientService.getProducer("generated-price");
        producer.runOnSendingThread(client -> client.send(new ProducerRecord<>("prices", 2.4)))
            .await().indefinitely();
    }
}

KafkaClientService 是一个实验性 API,未来可能会发生变化。

您还可以获取注入到应用程序中的 Kafka 配置,并直接创建 Kafka 生产者、消费者和管理客户端。

import io.smallrye.common.annotation.Identifier;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import java.util.HashMap;
import java.util.Map;

@ApplicationScoped
public class KafkaClients {

    @Inject
    @Identifier("default-kafka-broker")
    Map<String, Object> config;

    @Produces
    AdminClient getAdmin() {
        Map<String, Object> copy = new HashMap<>();
        for (Map.Entry<String, Object> entry : config.entrySet()) {
            if (AdminClientConfig.configNames().contains(entry.getKey())) {
                copy.put(entry.getKey(), entry.getValue());
            }
        }
        return KafkaAdminClient.create(copy);
    }

}

default-kafka-broker 配置映射包含所有以 kafka.KAFKA_ 为前缀的应用程序属性。有关更多配置选项,请查看Kafka 配置解析

9. JSON 序列化

Quarkus 具有处理 JSON Kafka 消息的内置功能。

假设我们有一个 Fruit 数据类如下:

public class Fruit {

    public String name;
    public int price;

    public Fruit() {
    }

    public Fruit(String name, int price) {
        this.name = name;
        this.price = price;
    }
}

我们想用它来从 Kafka 接收消息,进行一些价格转换,然后将消息发送回 Kafka。

import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;

/**
* A bean consuming data from the "fruit-in" channel and applying some price conversion.
* The result is pushed to the "fruit-out" channel.
*/
@ApplicationScoped
public class FruitProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("fruit-in")
    @Outgoing("fruit-out")
    @Broadcast
    public Fruit process(Fruit fruit) {
        fruit.price = fruit.price * CONVERSION_RATE;
        return fruit;
    }

}

要做到这一点,我们需要设置带有 Jackson 或 JSON-B 的 JSON 序列化。

配置好 JSON 序列化后,您还可以使用 Publisher<Fruit>Emitter<Fruit>

9.1. 通过 Jackson 序列化

Quarkus 具有基于 Jackson 的 JSON 序列化和反序列化内置支持。它还将生成序列化器和反序列化器,因此您无需配置任何内容。当禁用生成时,您可以按如下方式使用提供的 ObjectMapperSerializerObjectMapperDeserializer

存在一个现有的 ObjectMapperSerializer,可用于通过 Jackson 序列化所有数据对象。如果您想使用序列化器/反序列化器自动检测,您可以创建一个空的子类。

默认情况下,ObjectMapperSerializer 将 null 序列化为 "null" 字符串,可以通过设置 Kafka 配置属性 json.serialize.null-as-null=true 来自定义,这将把 null 序列化为 null。这在使用压缩主题时非常有用,因为 null 用作墓碑,用于在压缩阶段知道要删除哪些消息。

相应的反序列化器类需要被子类化。因此,让我们创建一个扩展 ObjectMapperDeserializerFruitDeserializer

package com.acme.fruit.jackson;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

最后,配置您的通道以使用 Jackson 序列化器和反序列化器。

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jackson.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

现在,您的 Kafka 消息将包含 Fruit 数据对象的 Jackson 序列化表示。在这种情况下,不需要 deserializer 配置,因为序列化器/反序列化器自动检测默认已启用。

如果您想反序列化水果列表,您需要创建一个带有 Jackson TypeReference 的反序列化器来表示使用的泛型集合。

package com.acme.fruit.jackson;

import java.util.List;
import com.fasterxml.jackson.core.type.TypeReference;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class ListOfFruitDeserializer extends ObjectMapperDeserializer<List<Fruit>> {
    public ListOfFruitDeserializer() {
        super(new TypeReference<List<Fruit>>() {});
    }
}

9.2. 通过 JSON-B 序列化

首先,您需要包含 quarkus-jsonb 扩展。

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-jsonb</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-jsonb")

存在一个现有的 JsonbSerializer,可用于通过 JSON-B 序列化所有数据对象。如果您想使用序列化器/反序列化器自动检测,您可以创建一个空的子类。

默认情况下,JsonbSerializer 将 null 序列化为 "null" 字符串,可以通过设置 Kafka 配置属性 json.serialize.null-as-null=true 来自定义,这将把 null 序列化为 null。这在使用压缩主题时非常有用,因为 null 用作墓碑,用于在压缩阶段知道要删除哪些消息。

相应的反序列化器类需要被子类化。因此,让我们创建一个扩展通用 JsonbDeserializerFruitDeserializer

package com.acme.fruit.jsonb;

import io.quarkus.kafka.client.serialization.JsonbDeserializer;

public class FruitDeserializer extends JsonbDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

最后,配置您的通道以使用 JSON-B 序列化器和反序列化器。

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.connector=smallrye-kafka
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jsonb.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

现在,您的 Kafka 消息将包含 Fruit 数据对象的 JSON-B 序列化表示。

如果您想反序列化水果列表,您需要创建一个带有 Type 的反序列化器来表示使用的泛型集合。

package com.acme.fruit.jsonb;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import io.quarkus.kafka.client.serialization.JsonbDeserializer;

public class ListOfFruitDeserializer extends JsonbDeserializer<List<Fruit>> {
    public ListOfFruitDeserializer() {
        super(new ArrayList<MyEntity>() {}.getClass().getGenericSuperclass());
    }
}
如果您不想为每个数据对象创建反序列化器,您可以使用通用的 io.vertx.kafka.client.serialization.JsonObjectDeserializer,它将反序列化为 io.vertx.core.json.JsonObject。也可以使用相应的序列化器:io.vertx.kafka.client.serialization.JsonObjectSerializer

10. Avro 序列化

这在专门的指南中有描述:将 Apache Kafka 与 Schema Registry 和 Avro 结合使用

11. JSON Schema 序列化

12. 序列化器/反序列化器自动检测

当使用 Quarkus Messaging 与 Kafka(io.quarkus:quarkus-messaging-kafka)时,Quarkus 通常可以自动检测正确的序列化器和反序列化器类。此自动检测基于 @Incoming@Outgoing 方法的声明,以及注入的 @Channel

例如,如果您声明:

@Outgoing("generated-price")
public Multi<Integer> generate() {
    ...
}

并且您的配置表明 generated-price 通道使用 smallrye-kafka 连接器,那么 Quarkus 将自动将 value.serializer 设置为 Kafka 内置的 IntegerSerializer

同样,如果您声明:

@Incoming("my-kafka-records")
public void consume(Record<Long, byte[]> record) {
    ...
}

并且您的配置表明 my-kafka-records 通道使用 smallrye-kafka 连接器,那么 Quarkus 将自动将 key.deserializer 设置为 Kafka 内置的 LongDeserializer,并将 value.deserializer 设置为 ByteArrayDeserializer

最后,如果您声明:

@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;

并且您的配置表明 price-create 通道使用 smallrye-kafka 连接器,那么 Quarkus 将自动将 value.serializer 设置为 Kafka 内置的 DoubleSerializer

序列化器/反序列化器自动检测支持的完整类型集是:

  • shortjava.lang.Short

  • intjava.lang.Integer

  • longjava.lang.Long

  • floatjava.lang.Float

  • doublejava.lang.Double

  • byte[]

  • java.lang.String

  • java.util.UUID

  • java.nio.ByteBuffer

  • org.apache.kafka.common.utils.Bytes

  • io.vertx.core.buffer.Buffer

  • io.vertx.core.json.JsonObject

  • io.vertx.core.json.JsonArray

  • 存在 org.apache.kafka.common.serialization.Serializer<T> / org.apache.kafka.common.serialization.Deserializer<T> 直接实现的类。

    • 实现需要指定类型参数 T 作为(反)序列化类型。

  • 从 Avro Schema 生成的类,以及 Avro GenericRecord,如果存在 Confluent 或 Apicurio Registry serde

  • 存在 ObjectMapperSerializer / ObjectMapperDeserializer 的子类的类,如通过 Jackson 序列化中所述。

    • 技术上不需要子类化 ObjectMapperSerializer,但在这种情况下,无法自动检测。

  • 存在 JsonbSerializer / JsonbDeserializer 的子类的类,如通过 JSON-B 序列化中所述。

    • 技术上不需要子类化 JsonbSerializer,但在这种情况下,无法自动检测。

如果通过配置设置了序列化器/反序列化器,则自动检测不会替换它。

如果您在序列化器自动检测方面遇到任何问题,可以通过设置 quarkus.messaging.kafka.serializer-autodetection.enabled=false 来完全关闭它。如果您发现需要这样做,请在 Quarkus 问题跟踪器 中提交一个 bug,以便我们能解决您遇到的任何问题。

13. JSON 序列化器/反序列化器生成

Quarkus 会自动为以下情况生成序列化器和反序列化器:

  1. 未配置序列化器/反序列化器

  2. 自动检测未找到匹配的序列化器/反序列化器

它底层使用 Jackson。

可以使用以下方式禁用此生成:

quarkus.messaging.kafka.serializer-generation.enabled=false
生成不支持集合,例如 List<Fruit>。请参阅 通过 Jackson 进行序列化 来为这种情况编写您自己的序列化器/反序列化器。

14. 使用 Schema Registry

这在 Avro 的专用指南中进行了介绍:将 Apache Kafka 与 Schema Registry 和 Avro 结合使用。以及 JSON Schema 的另一份指南:将 Apache Kafka 与 Schema Registry 和 JSON Schema 结合使用

15. 健康检查

Quarkus 为 Kafka 提供了一些健康检查。这些检查与 quarkus-smallrye-health 扩展结合使用。

15.1. Kafka Broker 就绪检查

使用 quarkus-kafka-client 扩展时,您可以通过在 application.properties 中将 quarkus.kafka.health.enabled 属性设置为 true 来启用*就绪*健康检查。此检查报告与*默认* Kafka Broker(使用 kafka.bootstrap.servers 配置)的交互状态。它需要与 Kafka Broker 的*管理连接*,并且默认情况下处于禁用状态。如果启用,当您访问应用程序的 /q/health/ready 端点时,您将获得关于连接验证状态的信息。

15.2. Kafka Reactive Messaging 健康检查

使用 Reactive Messaging 和 Kafka 连接器时,每个配置的通道(入站或出站)都提供*启动*、*活跃性*和*就绪*检查。

  • 启动检查会验证与 Kafka 集群的通信是否已建立。

  • 活跃性检查会捕获在与 Kafka 通信期间发生的任何不可恢复的故障。

  • 就绪检查会验证 Kafka 连接器是否已准备好向配置的 Kafka 主题进行消息的消费/生产。

对于每个通道,您可以使用以下方式禁用检查:

# Disable both liveness and readiness checks with `health-enabled=false`:

# Incoming channel (receiving records form Kafka)
mp.messaging.incoming.your-channel.health-enabled=false
# Outgoing channel (writing records to Kafka)
mp.messaging.outgoing.your-channel.health-enabled=false

# Disable only the readiness check with `health-readiness-enabled=false`:

mp.messaging.incoming.your-channel.health-readiness-enabled=false
mp.messaging.outgoing.your-channel.health-readiness-enabled=false
您可以使用 mp.messaging.incoming|outgoing.$channel.bootstrap.servers 属性为每个通道配置 bootstrap.servers。默认值为 kafka.bootstrap.servers

Reactive Messaging 的*启动*和*就绪*检查提供了两种策略。默认策略会验证与 Broker 是否已建立活动连接。这种方法不具侵扰性,因为它基于内置的 Kafka 客户端指标。

通过使用 health-topic-verification-enabled=true 属性,*启动*探测器会使用*管理客户端*检查主题列表。而*就绪*探测器对于入站通道会检查是否至少分配了一个分区用于消费,对于出站通道会检查生产者使用的主题是否存在于 Broker 中。

请注意,要实现这一点,需要*管理连接*。您可以使用 health-topic-verification-timeout 配置来调整与 Broker 的主题验证调用的超时时间。

16. 可观察性

如果存在 OpenTelemetry 扩展,则 Kafka 连接器通道可以开箱即用地与 OpenTelemetry Tracing 集成。写入 Kafka 主题的消息会传播当前的跟踪 span。对于入站通道,如果消费的 Kafka 记录包含跟踪信息,则消息处理会继承消息 span 作为父级。

可以为每个通道显式禁用跟踪:

mp.messaging.incoming.data.tracing-enabled=false

如果存在 Micrometer 扩展,则 Kafka 生产者和消费者客户端的指标会作为 Micrometer 指标暴露。

16.1. 通道指标

也可以收集每个通道的指标并将其作为 Micrometer 指标暴露。可以收集以下指标,每个通道由*通道*标签标识:

  • quarkus.messaging.message.count:生产或接收的消息数量

  • quarkus.messaging.message.acks:成功处理的消息数量

  • quarkus.messaging.message.failures:处理失败的消息数量

  • quarkus.messaging.message.duration:消息处理的持续时间。

出于向后兼容的原因,通道指标默认不启用,可以通过以下方式启用:

消息*观察*依赖于拦截消息,因此不支持消费具有自定义消息类型(如 IncomingKafkaRecordKafkaRecordIncomingKafkaRecordBatchKafkaRecordBatch)的通道。

消息拦截和观察仍然支持消费通用 Message 类型或通过 转换器启用的自定义有效载荷的通道。

smallrye.messaging.observation.enabled=true

17. Kafka Streams

这在专用指南中进行了介绍:将 Apache Kafka Streams 结合使用

18. 使用 Snappy 进行消息压缩

在*出站*通道上,可以通过将 compression.type 属性设置为 snappy 来启用 Snappy 压缩。

mp.messaging.outgoing.fruit-out.compression.type=snappy

在 JVM 模式下,它可以开箱即用。但是,要将应用程序编译为本地可执行文件,您需要在 application.properties 中添加 quarkus.kafka.snappy.enabled=true

在原生模式下,Snappy 默认是禁用的,因为使用 Snappy 需要嵌入本地库并在应用程序启动时解包。

19. 使用 OAuth 进行身份验证

如果您的 Kafka Broker 使用 OAuth 作为身份验证机制,您需要配置 Kafka 消费者来启用此身份验证过程。首先,将以下依赖项添加到您的应用程序中:

pom.xml
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-client</artifactId>
</dependency>
<!-- if compiling to native you'd need also the following dependency -->
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-common</artifactId>
</dependency>
build.gradle
implementation("io.strimzi:kafka-oauth-client")
// if compiling to native you'd need also the following dependency
implementation("io.strimzi:kafka-oauth-common")

此依赖项提供了处理 OAuth 工作流程所需的 callback handler。然后,在 application.properties 中添加:

mp.messaging.connector.smallrye-kafka.security.protocol=SASL_PLAINTEXT
mp.messaging.connector.smallrye-kafka.sasl.mechanism=OAUTHBEARER
mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id="team-a-client" \
  oauth.client.secret="team-a-client-secret" \
  oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token" ;
mp.messaging.connector.smallrye-kafka.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

quarkus.ssl.native=true

更新 oauth.client.idoauth.client.secretoauth.token.endpoint.uri 的值。

OAuth 身份验证支持 JVM 和原生模式。由于原生模式下 SSL 默认不启用,因此必须添加 quarkus.ssl.native=true 来支持 JaasClientOauthLoginCallbackHandler,因为它使用了 SSL。(有关更多详细信息,请参阅 在原生可执行文件中使用 SSL 指南。)

20. TLS 配置

Kafka 客户端扩展与 Quarkus TLS 注册表 集成以配置客户端。

要为默认 Kafka 配置 TLS,您需要在 application.properties 中提供一个命名的 TLS 配置:

quarkus.tls.your-tls-config.trust-store.pem.certs=target/certs/kafka.crt,target/certs/kafka-ca.crt
# ...
kafka.tls-configuration-name=your-tls-config
# enable ssl security protocol
kafka.security.protocol=ssl

这将为 Kafka 客户端提供一个 ssl.engine.factory.class 实现。

还要确保使用 security.protocol 属性启用 SSL 通道安全协议,并将其配置为 SSLSASL_SSL

Quarkus Messaging 通道可以单独配置以使用特定的 TLS 配置:

mp.messaging.incoming.your-channel.tls-configuration-name=your-tls-config
mp.messaging.incoming.your-channel.security.protocol=ssl

21. 测试 Kafka 应用程序

21.1. 无 Broker 测试

在不启动 Kafka Broker 的情况下测试应用程序可能很有用。要实现这一点,您可以将 Kafka 连接器管理的通道*切换*为*内存*模式。

此方法仅适用于 JVM 测试。不能用于原生测试(因为它们不支持注入)。

假设我们要测试以下处理器应用程序:

@ApplicationScoped
public class BeverageProcessor {

    @Incoming("orders")
    @Outgoing("beverages")
    Beverage process(Order order) {
        System.out.println("Order received " + order.getProduct());
        Beverage beverage = new Beverage();
        beverage.setBeverage(order.getProduct());
        beverage.setCustomer(order.getCustomer());
        beverage.setOrderId(order.getOrderId());
        beverage.setPreparationState("RECEIVED");
        return beverage;
    }

}

首先,将以下测试依赖项添加到您的应用程序中:

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-reactive-messaging-in-memory</artifactId>
    <scope>test</scope>
</dependency>
build.gradle
testImplementation("io.smallrye.reactive:smallrye-reactive-messaging-in-memory")

然后,创建如下的 Quarkus 测试资源:

public class KafkaTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {

    @Override
    public Map<String, String> start() {
        Map<String, String> env = new HashMap<>();
        Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("orders");     (1)
        Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("beverages");  (2)
        env.putAll(props1);
        env.putAll(props2);
        return env;  (3)
    }

    @Override
    public void stop() {
        InMemoryConnector.clear();  (4)
    }
}
1 将入站通道 orders(预期来自 Kafka 的消息)切换为内存模式。
2 将出站通道 beverages(向 Kafka 写入消息)切换为内存模式。
3 构建并返回一个 Map,其中包含配置应用程序使用内存通道所需的所有属性。
4 测试停止时,清除 InMemoryConnector(丢弃所有接收和发送的消息)。

使用上面创建的测试资源创建 Quarkus 测试:

import static org.awaitility.Awaitility.await;

@QuarkusTest
@QuarkusTestResource(KafkaTestResourceLifecycleManager.class)
class BaristaTest {

    @Inject
    @Connector("smallrye-in-memory")
    InMemoryConnector connector; (1)

    @Test
    void testProcessOrder() {
        InMemorySource<Order> ordersIn = connector.source("orders");     (2)
        InMemorySink<Beverage> beveragesOut = connector.sink("beverages");  (3)

        Order order = new Order();
        order.setProduct("coffee");
        order.setName("Coffee lover");
        order.setOrderId("1234");

        ordersIn.send(order);  (4)

        await().<List<? extends Message<Beverage>>>until(beveragesOut::received, t -> t.size() == 1); (5)

        Beverage queuedBeverage = beveragesOut.received().get(0).getPayload();
        Assertions.assertEquals(Beverage.State.READY, queuedBeverage.getPreparationState());
        Assertions.assertEquals("coffee", queuedBeverage.getBeverage());
        Assertions.assertEquals("Coffee lover", queuedBeverage.getCustomer());
        Assertions.assertEquals("1234", queuedBeverage.getOrderId());
    }

}
1 将内存连接器注入您的测试类:
2 检索入站通道(orders)- 该通道必须已在测试资源中切换为内存模式。
3 检索出站通道(beverages)- 该通道必须已在测试资源中切换为内存模式。
4 使用 send 方法向 orders 通道发送消息。应用程序将处理此消息并将消息发送到 beverages 通道。
5 使用 received 方法在 beverages 通道上检查应用程序产生的消息。

如果您的 Kafka 消费者是基于批处理的,您需要发送一批消息到通道,方法是手动创建它们。

例如

@ApplicationScoped
public class BeverageProcessor {

    @Incoming("orders")
    CompletionStage<Void> process(KafkaRecordBatch<String, Order> orders) {
        System.out.println("Order received " + orders.getPayload().size());
        return orders.ack();
    }
}
import static org.awaitility.Awaitility.await;

@QuarkusTest
@QuarkusTestResource(KafkaTestResourceLifecycleManager.class)
class BaristaTest {

    @Inject
    @Connector("smallrye-in-memory")

    InMemoryConnector connector;

    @Test
    void testProcessOrder() {
        InMemorySource<IncomingKafkaRecordBatch<String, Order>> ordersIn = connector.source("orders");
        var committed = new AtomicBoolean(false);  (1)
        var commitHandler = new KafkaCommitHandler() {
            @Override
            public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record) {
                committed.set(true);  (2)
                return null;
            }
        };
        var failureHandler = new KafkaFailureHandler() {
            @Override
            public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reason, Metadata metadata) {
                return null;
            }
        };

        Order order = new Order();
        order.setProduct("coffee");
        order.setName("Coffee lover");
        order.setOrderId("1234");
        var record = new ConsumerRecord<>("topic", 0, 0, "key", order);
        var records = new ConsumerRecords<>(Map.of(new TopicPartition("topic", 1), List.of(record)));
        var batch = new IncomingKafkaRecordBatch<>(
            records, "kafka", 0, commitHandler, failureHandler, false, false);  (3)

        ordersIn.send(batch);

        await().until(committed::get);  (4)
    }
}
1 创建一个 AtomicBoolean 来跟踪批次是否已提交。
2 当批次提交时更新 committed
3 创建一个包含单个记录的 IncomingKafkaRecordBatch
4 等待批次提交。

通过内存通道,我们能够在不启动 Kafka Broker 的情况下测试处理消息的应用程序代码。请注意,不同的内存通道是独立的,将通道连接器切换到内存模式并不会模拟配置为同一 Kafka 主题的通道之间的消息传递。

21.1.1. 使用 InMemoryConnector 进行上下文传播

默认情况下,内存通道在调用者线程(在单元测试中是主线程)上分发消息。

quarkus-test-vertx 依赖项提供了 @io.quarkus.test.vertx.RunOnVertxContext 注释,当在测试方法上使用时,它会在 Vert.x 上下文中执行测试。

然而,大多数其他连接器都会处理上下文传播,在单独的 Vert.x 上下文副本上分发消息。

如果您的测试依赖于上下文传播,您可以通过使用 run-on-vertx-context 属性配置内存连接器通道,在 Vert.x 上下文中分发事件,包括消息和确认。或者,您可以使用 InMemorySource#runOnVertxContext 方法切换此行为。

21.2. 使用 Kafka Broker 进行测试

如果您正在使用 Kafka 开发服务,那么在测试期间 Kafka Broker 会启动并可用,除非它在 %test 配置文件中被禁用。虽然可以通过 Kafka Clients API 连接到此 Broker,但 Kafka Companion Library 提供了一种更简单的方式来与 Kafka Broker 交互,并在测试中创建消费者、生产者和管理操作。

要在测试中使用 KafkaCompanion API,请首先添加以下依赖项:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-test-kafka-companion</artifactId>
    <scope>test</scope>
</dependency>

它提供了 io.quarkus.test.kafka.KafkaCompanionResource - io.quarkus.test.common.QuarkusTestResourceLifecycleManager 的一个实现。

然后使用 @QuarkusTestResource 在测试中配置 Kafka Companion,例如:

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.kafka.InjectKafkaCompanion;
import io.quarkus.test.kafka.KafkaCompanionResource;
import io.smallrye.reactive.messaging.kafka.companion.ConsumerTask;
import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;

@QuarkusTest
@QuarkusTestResource(KafkaCompanionResource.class)
public class OrderProcessorTest {

    @InjectKafkaCompanion (1)
    KafkaCompanion companion;

    @Test
    void testProcessor() {
        companion.produceStrings().usingGenerator(i -> new ProducerRecord<>("orders", UUID.randomUUID().toString())); (2)

        // Expect that the tested application processes orders from 'orders' topic and write to 'orders-processed' topic

        ConsumerTask<String, String> orders = companion.consumeStrings().fromTopics("orders-processed", 10); (3)
        orders.awaitCompletion(); (4)
        assertEquals(10, orders.count());
    }
}
1 @InjectKafkaCompanion 注入 KafkaCompanion 实例,该实例已配置为访问为测试创建的 Kafka Broker。
2 使用 KafkaCompanion 创建生产者任务,将 10 条记录写入 'orders' 主题。
3 创建消费者任务,订阅 'orders-processed' 主题并消费 10 条记录。
4 等待消费者任务完成。

您需要配置:

mp.messaging.incoming.orders.connector=smallrye-kafka
mp.messaging.incoming.orders.auto.offset.reset=earliest

否则,您将在 <4> 中收到 java.lang.AssertionError: No completion (or failure) event received in the last 10000 ms 错误。

如果 Kafka 开发服务在测试期间可用,KafkaCompanionResource 将使用创建的 Kafka Broker,否则它将使用 Strimzi Test Container 创建一个 Kafka Broker。

可以使用 @ResourceArg 自定义创建的 Kafka Broker 的配置,例如:

@QuarkusTestResource(value = KafkaCompanionResource.class, initArgs = {
        @ResourceArg(name = "strimzi.kafka.image", value = "quay.io/strimzi-test-container/test-container:0.106.0-kafka-3.7.0"), // Image name
        @ResourceArg(name = "kafka.port", value = "9092"), // Fixed port for kafka, by default it will be exposed on a random port
        @ResourceArg(name = "kraft", value = "true"), // Enable Kraft mode
        @ResourceArg(name = "num.partitions", value = "3"), // Other custom broker configurations
})
public class OrderProcessorTest {
    // ...
}

21.2.1. 自定义测试资源

或者,您可以在测试资源中启动一个 Kafka Broker。以下代码片段展示了一个使用 Testcontainers 启动 Kafka Broker 的测试资源:

public class KafkaResource implements QuarkusTestResourceLifecycleManager {

    private final KafkaContainer kafka = new KafkaContainer();

    @Override
    public Map<String, String> start() {
        kafka.start();
        return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());  (1)
    }

    @Override
    public void stop() {
        kafka.close();
    }
}
1 配置 Kafka 的 bootstrap 位置,以便应用程序连接到此 Broker。

22. Kafka 开发服务

如果存在任何 Kafka 相关扩展(例如 quarkus-messaging-kafka),Kafka 开发服务会在开发模式和测试运行时自动启动一个 Kafka Broker。因此,您无需手动启动 Broker。应用程序会自动配置。

由于启动 Kafka Broker 可能需要较长时间,Kafka 开发服务使用 Redpanda,一个与 Kafka 兼容的 Broker,它可以在约 1 秒内启动。

22.1. 启用/禁用 Kafka 开发服务

Kafka 开发服务自动启用,除非:

  • quarkus.kafka.devservices.enabled 设置为 false

  • 已配置 kafka.bootstrap.servers

  • 所有 Reactive Messaging Kafka 通道都已配置了 bootstrap.servers 属性

Kafka 开发服务依赖 Docker 来启动 Broker。如果您的环境不支持 Docker,您需要手动启动 Broker,或连接到已运行的 Broker。您可以使用 kafka.bootstrap.servers 配置 Broker 地址。

22.2. 共享 Broker

大多数情况下,您需要共享 Broker。Kafka 开发服务实现了*服务发现*机制,供多个 Quarkus 应用程序在*开发*模式下共享单个 Broker。

Kafka 开发服务使用 quarkus-dev-service-kafka 标签启动容器,该标签用于标识容器。

如果您需要多个(共享)Broker,您可以配置 quarkus.kafka.devservices.service-name 属性并指定 Broker 名称。它会查找具有相同值的容器,如果找不到则启动一个新的。默认服务名称是 kafka

共享在开发模式下默认启用,但在测试模式下禁用。您可以使用 quarkus.kafka.devservices.shared=false 来禁用共享。

22.3. 设置端口

默认情况下,Kafka 开发服务会选择一个随机端口并配置应用程序。您可以通过配置 quarkus.kafka.devservices.port 属性来设置端口。

请注意,Kafka 的广告地址会自动配置为所选端口。

22.4. 配置镜像

Kafka 开发服务支持 Redpandakafka-nativeStrimzi(在 Kraft 模式下)镜像。

Redpanda 是一个与 Kafka 兼容的事件流平台。由于它提供快速的启动时间,开发服务默认使用 redpandadata/redpanda 的 Redpanda 镜像。您可以从 https://hub.docker.com/r/redpandadata/redpanda 中选择任何版本。

kafka-native 提供标准 Apache Kafka 发行版的镜像,使用 Quarkus 和 GraalVM 编译为原生二进制文件。尽管仍处于*实验*阶段,但它提供了非常快的启动时间和较小的占用空间。

镜像类型可以通过以下方式配置:

quarkus.kafka.devservices.provider=kafka-native

Strimzi 为在 Kubernetes 上运行 Apache Kafka 提供容器镜像和 Operator。虽然 Strimzi 针对 Kubernetes 进行了优化,但这些镜像在经典的容器环境中也能完美运行。Strimzi 容器镜像在 JVM 上运行*真正的* Kafka Broker,启动速度较慢。

quarkus.kafka.devservices.provider=strimzi

对于 Strimzi,您可以从 https://quay.io/repository/strimzi-test-container/test-container?tab=tags 中选择支持 Kraft 的任何 Kafka 版本镜像(2.8.1 及更高版本)。

quarkus.kafka.devservices.image-name=quay.io/strimzi-test-container/test-container:0.106.0-kafka-3.7.0

22.5. 配置 Kafka 主题

您可以配置 Kafka 开发服务在 Broker 启动后创建主题。主题将以给定的分区数和 1 个副本创建。

以下示例创建一个名为 test 的主题,包含 3 个分区,以及第二个名为 messages 的主题,包含 2 个分区。

quarkus.kafka.devservices.topic-partitions.test=3
quarkus.kafka.devservices.topic-partitions.messages=2

如果具有给定名称的主题已存在,则会跳过创建,而不会尝试将现有主题重新分区为不同数量的分区。

您可以使用 quarkus.kafka.devservices.topic-partitions-timeout 配置 Kafka 管理客户端调用主题创建的超时时间,默认为 2 秒。

22.6. 事务性和幂等生产者支持

默认情况下,Redpanda Broker 配置为启用事务和幂等性功能。您可以使用以下方式禁用它们:

quarkus.kafka.devservices.redpanda.transaction-enabled=false
Redpanda 事务不支持精确一次处理。

22.7. Compose

Kafka 开发服务支持 Compose 开发服务。它依赖于 compose-devservices.yml,例如:

name: <application name>
services:
  kafka:
    image: apache/kafka-native:3.9.0
    restart: "no"
    ports:
      - '9092'
    labels:
      io.quarkus.devservices.compose.exposed_ports: /etc/kafka/docker/ports
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    command: "/kafka.sh"
    volumes:
      - './kafka.sh:/kafka.sh'

为了让 Broker 向客户端通告其外部可访问地址,它需要一个额外的文件 kafka.sh,如 将端口映射暴露给正在运行的容器 中所述。

22.8. 配置参考

构建时固定的配置属性 - 所有其他配置属性都可以在运行时覆盖

配置属性

类型

默认

如果 Kafka 开发服务已被明确启用或禁用。开发服务通常默认启用,除非存在现有配置。对于 Kafka,开发服务会启动一个 Broker,除非设置了 kafka.bootstrap.servers,或者所有 Reactive Messaging Kafka 通道都配置了 bootstrap.servers

环境变量:QUARKUS_KAFKA_DEVSERVICES_ENABLED

显示更多

布尔值

开发服务将侦听的可选固定端口。

如果未定义,将随机选择端口。

环境变量:QUARKUS_KAFKA_DEVSERVICES_PORT

显示更多

整数

Kafka 开发服务容器类型。

支持 Redpanda、Strimzi 和 kafka-native 容器提供程序。默认为 redpanda。

请注意,Strimzi 和 Kafka Native 镜像以 Kraft 模式启动。

环境变量:QUARKUS_KAFKA_DEVSERVICES_PROVIDER

显示更多

redpandastrimzikafka-native

redpanda

要使用的 Kafka 容器镜像。

取决于提供程序。

环境变量:QUARKUS_KAFKA_DEVSERVICES_IMAGE_NAME

显示更多

字符串

指示 Quarkus 开发服务管理的 Kafka Broker 是否共享。当共享时,Quarkus 会使用基于标签的服务发现来查找正在运行的容器。如果找到匹配的容器,则会使用它,因此不会启动第二个容器。否则,开发服务会启动一个新容器。

发现使用 quarkus-dev-service-kafka 标签。该值使用 service-name 属性配置。

容器共享仅在开发模式下使用。

环境变量:QUARKUS_KAFKA_DEVSERVICES_SHARED

显示更多

布尔值

true

附加到已启动容器的 quarkus-dev-service-kafka 标签的值。当 shared 设置为 true 时,此属性用于此目的。在这种情况下,在启动容器之前,Kafka 开发服务会查找具有设置为指定值的 quarkus-dev-service-kafka 标签的容器。如果找到,它将使用该容器而不是启动一个新容器。否则,它会启动一个新容器,并将 quarkus-dev-service-kafka 标签设置为指定的值。

当您需要多个共享 Kafka 代理时,将使用此属性。

环境变量:QUARKUS_KAFKA_DEVSERVICES_SERVICE_NAME

显示更多

字符串

kafka

在开发服务 Kafka Broker 中创建的主题-分区对。Broker 启动后,将创建给定主题和分区,跳过已存在的主题。例如,quarkus.kafka.devservices.topic-partitions.test=2 将创建一个名为 test 的主题,包含 2 个分区。

主题创建不会尝试重新分区具有不同分区数的现有主题。

环境变量:QUARKUS_KAFKA_DEVSERVICES_TOPIC_PARTITIONS__TOPIC_NAME_

显示更多

Map<String,Integer>

用于主题创建的管理客户端调用的超时。

默认为 2 秒。

环境变量:QUARKUS_KAFKA_DEVSERVICES_TOPIC_PARTITIONS_TIMEOUT

显示更多

Duration 

2S

传递给容器的环境变量。

环境变量:QUARKUS_KAFKA_DEVSERVICES_CONTAINER_ENV__ENVIRONMENT_VARIABLE_NAME_

显示更多

Map<String,String>

启用事务支持。同时也启用生产者幂等性。在 https://vectorized.io/blog/fast-transactions/ 上查找有关 Redpanda 事务支持的更多信息。请注意,KIP-447(生产者可扩展性以实现精确一次语义)KIP-360(提高幂等/事务性生产者的可靠性)*不支持*。

环境变量:QUARKUS_KAFKA_DEVSERVICES_REDPANDA_TRANSACTION_ENABLED

显示更多

布尔值

true

用于访问 Redpanda HTTP 代理 (pandaproxy) 的端口。

如果未定义,将随机选择端口。

环境变量:QUARKUS_KAFKA_DEVSERVICES_REDPANDA_PROXY_PORT

显示更多

整数

关于 Duration 格式

要写入持续时间值,请使用标准的 java.time.Duration 格式。有关更多信息,请参阅 Duration#parse() Java API 文档

您还可以使用简化的格式,以数字开头

  • 如果该值仅为一个数字,则表示以秒为单位的时间。

  • 如果该值是一个数字后跟 ms,则表示以毫秒为单位的时间。

在其他情况下,简化格式将被转换为 java.time.Duration 格式以进行解析

  • 如果该值是一个数字后跟 hms,则在其前面加上 PT

  • 如果该值是一个数字后跟 d,则在其前面加上 P

23. Kafka 开发 UI

如果存在任何 Kafka 相关扩展(例如 quarkus-messaging-kafka),Quarkus 开发 UI 将会扩展一个 Kafka Broker 管理 UI。它会自动连接到为应用程序配置的 Kafka Broker。

Kafka Dev UI link

使用 **Kafka 开发 UI**,您可以直接管理您的 Kafka 集群并执行任务,例如:

  • 列出和创建主题

  • 可视化记录

  • 发布新记录

  • 检查消费者组列表及其消费滞后情况

Kafka Dev UI records
Kafka 开发 UI 是 Quarkus 开发 UI 的一部分,仅在开发模式下可用。

24. Kubernetes 服务绑定

Quarkus Kafka 扩展支持 Kubernetes 服务绑定规范。您可以通过将 quarkus-kubernetes-service-binding 扩展添加到您的应用程序来启用此功能。

在适当配置的 Kubernetes 集群中运行时,Kafka 扩展将从集群内的可用服务绑定中拉取其 Kafka Broker 连接配置,而无需用户进行配置。

25. 执行模型

Reactive Messaging 在 I/O 线程上调用用户的方法。因此,默认情况下,方法不能阻塞。如 阻塞处理 中所述,如果该方法会阻塞调用线程,您需要为该方法添加 @Blocking 注释。

有关此主题的更多详细信息,请参阅 Quarkus Reactive Architecture 文档

26. 通道装饰器

SmallRye Reactive Messaging 支持装饰入站和出站通道,用于实现跨领域关注点,如监控、跟踪或消息拦截。有关实现装饰器和消息拦截器的更多信息,请参阅 SmallRye Reactive Messaging 文档

27. 配置参考

有关 SmallRye Reactive Messaging 配置的更多详细信息,请参阅 SmallRye Reactive Messaging - Kafka 连接器文档

每个通道都可以通过配置禁用,使用:

mp.messaging.[incoming|outgoing].[channel].enabled=false

最重要的属性列在下表中:

27.1. 入站通道配置(轮询 Kafka)

以下属性使用以下方式配置:

mp.messaging.incoming.your-channel-name.attribute=value

某些属性具有可以全局配置的别名。

kafka.bootstrap.servers=...

您还可以传递底层 Kafka 消费者 支持的任何属性。

例如,要配置 max.poll.records 属性,请使用:

mp.messaging.incoming.[channel].max.poll.records=1000

一些消费者客户端属性已配置为合理的默认值:

如果未设置,reconnect.backoff.max.ms 设置为 10000 以避免断开连接时的高负载。

如果未设置,key.deserializer 设置为 org.apache.kafka.common.serialization.StringDeserializer

消费者 client.id 根据要创建的客户端数量使用 mp.messaging.incoming.[channel].partitions 属性进行配置。

  • 如果提供了 client.id,则按原样使用,或者根据 partitions 属性添加客户端索引后缀。

  • 如果未提供 client.id,则生成为 [client-id-prefix][channel-name][-index]

表 1. 'smallrye-kafka' 连接器的入站属性
属性 (别名) 描述 强制 默认

bootstrap.servers

(kafka.bootstrap.servers)

用于建立与 Kafka 集群的初始连接的主机:端口的逗号分隔列表。

类型:字符串

false

localhost:9092

topic

消耗/填充的 Kafka 主题。如果未设置此属性或 topics 属性,则使用通道名称。

类型:字符串

false

health-enabled

是否启用(默认)或禁用健康报告。

类型:布尔值

false

true

health-readiness-enabled

是否启用(默认)或禁用就绪健康报告。

类型:布尔值

false

true

health-readiness-topic-verification

*已弃用* - 就绪检查是否应验证主题是否存在于 Broker 中。默认为 false。启用它需要管理连接。已弃用:请改用 'health-topic-verification-enabled'。

类型:布尔值

false

health-readiness-timeout

*已弃用* - 在就绪健康检查期间,连接器连接到 Broker 并检索主题列表。此属性指定检索的最大持续时间(以毫秒为单位)。如果超过该时间,则通道被视为未就绪。已弃用:请改用 'health-topic-verification-timeout'。

类型:长整型

false

health-topic-verification-enabled

启动和就绪检查是否应验证主题是否存在于 Broker 中。默认为 false。启用它需要管理客户端连接。

类型:布尔值

false

false

health-topic-verification-timeout

在启动和就绪健康检查期间,连接器连接到 Broker 并检索主题列表。此属性指定检索的最大持续时间(以毫秒为单位)。如果超过该时间,则通道被视为未就绪。

类型:长整型

false

2000

已启用跟踪

是否启用(默认)或禁用跟踪

类型:布尔值

false

true

client-id-prefix

Kafka 客户端 client.id 属性的前缀。如果配置或生成的 client.id 被定义,它将以给定值作为前缀,否则 kafka-consumer- 是前缀。

类型:字符串

false

checkpoint.state-store

在使用 checkpoint 提交策略时,在实现 io.smallrye.reactive.messaging.kafka.StateStore.Factory 的 Bean 中设置的名称,用于指定状态存储实现。

类型:字符串

false

checkpoint.state-type

在使用 checkpoint 提交策略时,要持久化到状态存储中的状态对象的完全限定类型名称。如果提供,状态存储实现可以使用它来帮助持久化处理状态对象。

类型:字符串

false

checkpoint.unsynced-state-max-age.ms

在使用 checkpoint 提交策略时,指定处理状态在连接器被标记为不健康之前必须持久化的最大年龄(以毫秒为单位)。将此属性设置为 0 会禁用此监控。

类型:整数

false

10000

cloud-events

启用(默认)或禁用 Cloud Event 支持。如果在*入站*通道上启用,连接器会分析入站记录并尝试创建 Cloud Event 元数据。如果在*出站*通道上启用,如果消息包含 Cloud Event 元数据,连接器会将出站消息作为 Cloud Event 发送。

类型:布尔值

false

true

kafka-configuration

为该通道提供默认 Kafka 消费者/生产者配置的 CDI Bean 的标识符。通道配置仍然可以覆盖任何属性。Bean 的类型必须是 Map<String, Object>,并且必须使用 @io.smallrye.common.annotation.Identifier 限定符来设置标识符。

类型:字符串

false

topics

要消耗的主题的逗号分隔列表。不能与 topicpattern 属性一起使用。

类型:字符串

false

pattern

指示 topic 属性是正则表达式。必须与 topic 属性一起使用。不能与 topics 属性一起使用。

类型:布尔值

false

false

key.deserializer

用于反序列化记录键的反序列化器类名。

类型:字符串

false

org.apache.kafka.common.serialization.StringDeserializer

lazy-client

Kafka 客户端是延迟创建还是立即创建。

类型:布尔值

false

false

value.deserializer

用于反序列化记录值的反序列化器类名。

类型:字符串

true

fetch.min.bytes

服务器应为 fetch 请求返回的最小数据量。默认设置 1 字节意味着 fetch 请求在收到一个字节数据或 fetch 请求等待数据到达超时后即可得到响应。

类型:整数

false

1

group.id

一个唯一字符串,用于标识应用程序所属的消费者组。

如果未设置,则默认为由 quarkus.application.name 配置属性设置的应用程序名称。

如果该属性也未设置,则使用一个唯一生成的 ID。

建议始终定义 group.id,自动生成仅用于开发的便利功能。您可以通过将此属性设置为 ${quarkus.uuid} 来显式请求自动生成的唯一 ID。

类型:字符串

false

enable.auto.commit

如果启用,则基础 Kafka 客户端会在后台定期提交消费者的 offset,而忽略记录的实际处理结果。建议*不要*启用此设置,让 Reactive Messaging 处理提交。

类型:布尔值

false

false

retry

是否在发生故障时重新尝试连接到 Broker。

类型:布尔值

false

true

retry-attempts

重新连接前的最大次数。-1 表示无限重试。

类型:整数

false

-1

retry-max-wait

两次重连之间的最大延迟(以秒为单位)。

类型:整数

false

30

广播

Kafka 记录是否应分发给多个消费者。

类型:布尔值

false

false

auto.offset.reset

在 Kafka 中没有初始 offset 时该做什么。可接受的值是 earliest、latest 和 none。

类型:字符串

false

latest

失败策略

指定当从记录生成的已生成消息被否定确认(nack)时要应用的失败策略。值可以是 fail(默认)、ignoredead-letter-queue

类型:字符串

false

失败

commit-strategy

指定当从记录生成的已生成消息被确认时要应用的提交策略。值可以是 latestignorethrottled。如果 enable.auto.commit 为 true,则默认为 ignore,否则为 throttled

类型:字符串

false

throttled.unprocessed-record-max-age.ms

在使用 throttled 提交策略时,指定未处理消息在连接器被标记为不健康之前可以存在的最长时间(以毫秒为单位)。将此属性设置为 0 会禁用此监控。

类型:整数

false

60000

dead-letter-queue.topic

failure-strategy 设置为 dead-letter-queue 时,指示记录发送到的主题。默认为 dead-letter-topic-$channel

类型:字符串

false

dead-letter-queue.key.serializer

failure-strategy 设置为 dead-letter-queue 时,指示要使用的键序列化器。如果未设置,则使用与键反序列化器关联的序列化器。

类型:字符串

false

dead-letter-queue.value.serializer

failure-strategy 设置为 dead-letter-queue 时,指示要使用的值序列化器。如果未设置,则使用与值反序列化器关联的序列化器。

类型:字符串

false

partitions

并发消费的分区数。连接器创建指定数量的 Kafka 消费者。它应该与目标主题的分区数匹配。

类型:整数

false

1

requests

partitions 大于 1 时,此属性允许配置每个消费者每次请求的记录数。

类型:整数

false

128

consumer-rebalance-listener.name

在实现 io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener 的 Bean 中设置的 @Identifier 名称。如果设置,此 rebalance 侦听器将应用于消费者。

类型:字符串

false

key-deserialization-failure-handler

在实现 io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler 的 Bean 中设置的 @Identifier 名称。如果设置,在反序列化键时发生的反序列化失败将委托给此处理程序,该处理程序可以重试或提供备用值。

类型:字符串

false

value-deserialization-failure-handler

在实现 io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler 的 Bean 中设置的 @Identifier 名称。如果设置,在反序列化值时发生的反序列化失败将委托给此处理程序,该处理程序可以重试或提供备用值。

类型:字符串

false

fail-on-deserialization-failure

如果没有设置反序列化失败处理程序,并且发生反序列化失败,则报告失败并将应用程序标记为不健康。如果设置为 false 且发生反序列化失败,则会转发一个 null 值。

类型:布尔值

false

true

graceful-shutdown

应用程序终止时是否应尝试优雅关机。

类型:布尔值

false

true

poll-timeout

轮询超时时间(以毫秒为单位)。轮询记录时,轮询最多等待该持续时间后才会返回记录。默认为 1000ms。

类型:整数

false

1000

pause-if-no-requests

当应用程序不请求项目时是否应暂停轮询,并在请求时恢复。这允许实现基于应用程序容量的反压。请注意,轮询不会停止,但在暂停时不会检索任何记录。

类型:布尔值

false

true

batch

Kafka 记录是否以批处理方式消费。通道注入点必须消耗兼容的类型,例如 List<Payload>KafkaRecordBatch<Payload>

类型:布尔值

false

false

max-queue-size-factor

用于确定排队处理的记录的最大数量的乘数因子,使用 max.poll.records * max-queue-size-factor。默认为 2。在 batch 模式下,max.poll.records 被视为 1

类型:整数

false

2

27.2. 出站通道配置(写入 Kafka)

以下属性使用以下方式配置:

mp.messaging.outgoing.your-channel-name.attribute=value

某些属性具有可以全局配置的别名。

kafka.bootstrap.servers=...

您还可以传递底层 Kafka 生产者 支持的任何属性。

例如,要配置 max.block.ms 属性,请使用:

mp.messaging.incoming.[channel].max.block.ms=10000

一些生产者客户端属性已配置为合理的默认值:

如果未设置,reconnect.backoff.max.ms 设置为 10000 以避免断开连接时的高负载。

如果未设置,key.serializer 设置为 org.apache.kafka.common.serialization.StringSerializer

如果未设置,生产者 client.id 将生成为 [client-id-prefix][channel-name]

表 2. 'smallrye-kafka' 连接器的出站属性
属性 (别名) 描述 强制 默认

acks

生产者要求 leader 接收之前才能将请求视为完成的确认数。这控制着发送记录的持久性。可接受的值是:0、1、all。

类型:字符串

false

1

bootstrap.servers

(kafka.bootstrap.servers)

用于建立与 Kafka 集群的初始连接的主机:端口的逗号分隔列表。

类型:字符串

false

localhost:9092

client-id-prefix

Kafka 客户端 client.id 属性的前缀。如果配置或生成的 client.id 已定义,它将以给定值作为前缀,否则 kafka-producer- 是前缀。

类型:字符串

false

buffer.memory

生产者可用于缓冲等待发送到服务器的记录的总字节数。

类型:长整型

false

33554432

close-timeout

等待 Kafka 生产者优雅关闭的毫秒数。

类型:整数

false

10000

cloud-events

启用(默认)或禁用 Cloud Event 支持。如果在*入站*通道上启用,连接器会分析入站记录并尝试创建 Cloud Event 元数据。如果在*出站*通道上启用,如果消息包含 Cloud Event 元数据,连接器会将出站消息作为 Cloud Event 发送。

类型:布尔值

false

true

cloud-events-data-content-type

(cloud-events-default-data-content-type)

配置出站 Cloud Event 的默认 datacontenttype 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 datacontenttype 属性,则使用此值。

类型:字符串

false

cloud-events-data-schema

(cloud-events-default-data-schema)

配置出站 Cloud Event 的默认 dataschema 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 dataschema 属性,则使用此值。

类型:字符串

false

cloud-events-insert-timestamp

(cloud-events-default-timestamp)

连接器是否应自动插入出站 Cloud Event 的 time 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 time 属性,则使用此值。

类型:布尔值

false

true

cloud-events-mode

Cloud Event 模式(structuredbinary(默认))。指示 Cloud Event 如何写入出站记录。

类型:字符串

false

binary

cloud-events-source

(cloud-events-default-source)

配置出站 Cloud Event 的默认 source 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 source 属性,则使用此值。

类型:字符串

false

cloud-events-subject

(cloud-events-default-subject)

配置出站 Cloud Event 的默认 subject 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 subject 属性,则使用此值。

类型:字符串

false

cloud-events-type

(cloud-events-default-type)

配置出站 Cloud Event 的默认 type 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 type 属性,则使用此值。

类型:字符串

false

health-enabled

是否启用(默认)或禁用健康报告。

类型:布尔值

false

true

health-readiness-enabled

是否启用(默认)或禁用就绪健康报告。

类型:布尔值

false

true

health-readiness-timeout

*已弃用* - 在就绪健康检查期间,连接器连接到 Broker 并检索主题列表。此属性指定检索的最大持续时间(以毫秒为单位)。如果超过该时间,则通道被视为未就绪。已弃用:请改用 'health-topic-verification-timeout'。

类型:长整型

false

health-readiness-topic-verification

*已弃用* - 就绪检查是否应验证主题是否存在于 Broker 中。默认为 false。启用它需要管理连接。已弃用:请改用 'health-topic-verification-enabled'。

类型:布尔值

false

health-topic-verification-enabled

启动和就绪检查是否应验证主题是否存在于 Broker 中。默认为 false。启用它需要管理客户端连接。

类型:布尔值

false

false

health-topic-verification-timeout

在启动和就绪健康检查期间,连接器连接到 Broker 并检索主题列表。此属性指定检索的最大持续时间(以毫秒为单位)。如果超过该时间,则通道被视为未就绪。

类型:长整型

false

2000

kafka-configuration

为该通道提供默认 Kafka 消费者/生产者配置的 CDI Bean 的标识符。通道配置仍然可以覆盖任何属性。Bean 的类型必须是 Map<String, Object>,并且必须使用 @io.smallrye.common.annotation.Identifier 限定符来设置标识符。

类型:字符串

false

key

写入记录时使用的键。

类型:字符串

false

key-serialization-failure-handler

在实现 io.smallrye.reactive.messaging.kafka.SerializationFailureHandler 的 Bean 中设置的 @Identifier 名称。如果设置,在序列化键时发生的序列化失败将委托给此处理程序,该处理程序可能提供备用值。

类型:字符串

false

key.serializer

用于序列化记录键的序列化器类名。

类型:字符串

false

org.apache.kafka.common.serialization.StringSerializer

lazy-client

Kafka 客户端是延迟创建还是立即创建。

类型:布尔值

false

false

max-inflight-messages

要并发写入 Kafka 的最大消息数。它限制了等待写入并由 Broker 确认的消息数量。您可以将此属性设置为 0 来移除限制。

类型:长整型

false

1024

merge

连接器是否允许多个上游。

类型:布尔值

false

false

partition

目标分区 ID。-1 表示让客户端确定分区。

类型:整数

false

-1

propagate-headers

要传播到出站记录的入站记录头的逗号分隔列表。

类型:字符串

false

propagate-record-key

将入站记录的键传播到出站记录。

类型:布尔值

false

false

retries

如果设置为正数,连接器将尝试重新发送任何未成功传递的消息(可能出现瞬时错误),直到达到重试次数。如果设置为 0,则禁用重试。如果未设置,连接器将尝试在 delivery.timeout.ms 配置的时间内重新发送任何无法成功传递的消息(因为可能发生瞬时错误)。

类型:长整型

false

2147483647

topic

消耗/填充的 Kafka 主题。如果未设置此属性或 topics 属性,则使用通道名称。

类型:字符串

false

已启用跟踪

是否启用(默认)或禁用跟踪

类型:布尔值

false

true

value-serialization-failure-handler

在实现 io.smallrye.reactive.messaging.kafka.SerializationFailureHandler 的 Bean 中设置的 @Identifier 名称。如果设置,在序列化值时发生的序列化失败将委托给此处理程序,该处理程序可能提供备用值。

类型:字符串

false

value.serializer

用于序列化有效载荷的序列化器类名。

类型:字符串

true

waitForWriteCompletion

在确认消息之前,客户端是否等待 Kafka 确认写入的记录。

类型:布尔值

false

true

27.3. Kafka 配置解析

Quarkus 将所有 Kafka 相关的应用程序属性(以 kafka.KAFKA_ 开头的属性)暴露在一个名为 default-kafka-broker 的配置映射中。此配置用于建立与 Kafka Broker 的连接。

除了此默认配置外,您还可以通过 kafka-configuration 属性配置生产者 Map 的名称:

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.kafka-configuration=my-configuration

在这种情况下,连接器会查找与 my-configuration 名称关联的 Map。如果未设置 kafka-configuration,则会查找以通道名称(上例中为 my-channel)公开的 Map

@Produces
@ApplicationScoped
@Identifier("my-configuration")
Map<String, Object> outgoing() {
    return Map.ofEntries(
            Map.entry("value.serializer", ObjectMapperSerializer.class.getName())
    );
}
如果设置了 kafka-configuration 但找不到 Map,则部署将失败。

属性值解析如下:

  1. 该属性直接在通道配置中设置(mp.messaging.incoming.my-channel.attribute=value),

  2. 如果未设置,连接器会查找具有通道名称或已配置的 kafka-configuration(如果已设置)的 Map,并从中检索值。

  3. 如果解析的 Map 不包含该值,则使用默认的 Map(以 default-kafka-broker 名称公开)。

27.4. 条件性配置通道

您可以使用特定的配置文件来配置通道。因此,只有在启用了指定配置文件时,通道才会被配置(并添加到应用程序中)。

要实现这一点,您需要:

  1. mp.messaging.[incoming|outgoing].$channel 条目前加上 %my-profile 前缀,例如 %my-profile.mp.messaging.[incoming|outgoing].$channel.key=value

  2. 在包含 @Incoming(channel)@Outgoing(channel) 注释的 CDI Bean 上使用 @IfBuildProfile("my-profile"),这些 Bean 仅在启用了配置文件时才需要启用。

请注意,Reactive Messaging 会验证图的完整性。因此,在使用这种条件性配置时,请确保应用程序在启用和不启用配置文件时都能正常工作。

请注意,此方法也可用于根据配置文件更改通道配置。

28. 集成 Kafka - 常见模式

28.1. 从 HTTP 端点写入 Kafka

要从 HTTP 端点发送消息到 Kafka,请在您的端点中注入一个 Emitter(或 MutinyEmitter)。

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<String> emitter;          (1)

    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) { (2)
        return emitter.send(payload);                   (3)
    }
}
1 注入一个 Emitter<String>
2 HTTP 方法接收有效载荷并返回一个 CompletionStage,在消息写入 Kafka 时完成。
3 将消息发送到 Kafka,send 方法返回一个 CompletionStage

该端点将传入的有效载荷(来自 POST HTTP 请求)发送到发射器。发射器的通道在 application.properties 文件中映射到 Kafka 主题。

mp.messaging.outgoing.kafka.connector=smallrye-kafka
mp.messaging.outgoing.kafka.topic=my-topic

该端点返回一个 CompletionStage,表示方法的异步性质。emitter.send 方法返回一个 CompletionStage<Void>。当消息写入 Kafka 后,返回的 Future 会完成。如果写入失败,返回的 CompletionStage 会以异常形式完成。

如果端点不返回 CompletionStage,HTTP 响应可能会在消息发送到 Kafka 之前写入,因此失败不会报告给用户。

如果您需要发送 Kafka 记录,请使用:

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.reactive.messaging.kafka.Record;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<Record<String,String>> emitter;  (1)


    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) {
        return emitter.send(Record.of("my-key", payload));    (2)
    }
}
1 注意 Emitter<Record<K, V>> 的使用。
2 使用 Record.of(k, v) 创建记录。

28.2. 使用 Hibernate with Panache 持久化 Kafka 消息

要将从 Kafka 接收的对象持久化到数据库,您可以使用 Hibernate with Panache。

如果您使用 Hibernate Reactive,请参阅 使用 Hibernate Reactive 持久化 Kafka 消息

假设您接收 Fruit 对象。为简单起见,我们的 Fruit 类非常简单:

package org.acme;

import jakarta.persistence.Entity;

import io.quarkus.hibernate.orm.panache.PanacheEntity;

@Entity
public class Fruit extends PanacheEntity {

    public String name;

}

要消费存储在 Kafka 主题上的 Fruit 实例并将它们持久化到数据库,您可以使用以下方法:

package org.acme;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.common.annotation.Blocking;

@ApplicationScoped
public class FruitConsumer {

    @Incoming("fruits")                                     (1)
    @Transactional                                          (2)
    public void persistFruits(Fruit fruit) {                (3)
        fruit.persist();                                    (4)
    }
}
1 配置入站通道。此通道从 Kafka 读取。
2 由于我们正在写入数据库,因此我们必须在事务中进行。此注释会启动一个新事务并在方法返回时提交它。Quarkus 会自动将方法视为*阻塞*。确实,使用经典 Hibernate 写入数据库是阻塞的。因此,Quarkus 会在您可以阻塞的工作线程(而不是 I/O 线程)上调用该方法。
3 该方法接收每个 Fruit。请注意,您需要一个反序列化器来从 Kafka 记录中重建 Fruit 实例。
4 持久化接收到的 fruit 对象。

如 <4> 中所述,您需要一个反序列化器,它可以根据记录创建 Fruit。这可以通过 Jackson 反序列化器来完成:

package org.acme;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

相关的配置将是:

mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer

有关 Jackson 与 Kafka 用法的更多详细信息,请查看 通过 Jackson 进行序列化。您也可以使用 Avro。

28.3. 使用 Hibernate Reactive 持久化 Kafka 消息

要将从 Kafka 接收的对象持久化到数据库,您可以使用 Hibernate Reactive with Panache。

假设您接收 Fruit 对象。为简单起见,我们的 Fruit 类非常简单:

package org.acme;

import jakarta.persistence.Entity;

import io.quarkus.hibernate.reactive.panache.PanacheEntity;  (1)

@Entity
public class Fruit extends PanacheEntity {

    public String name;

}
1 确保使用响应式变体:

要消费存储在 Kafka 主题上的 Fruit 实例并将它们持久化到数据库,您可以使用以下方法:

package org.acme;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class FruitStore {

    @Inject
    Mutiny.Session session;                    (1)

    @Incoming("in")
    @ActivateRequestContext (2)
    public Uni<Void> consume(Fruit entity) {
        return session.withTransaction(t -> {  (3)
            return entity.persistAndFlush()    (4)
                    .replaceWithVoid();        (5)
        }).onTermination().call(() -> session.close()); (6)
    }

}
1 注入 Hibernate Reactive Session
2 Hibernate Reactive SessionPanache API 需要一个活动的 CDI 请求上下文。@ActivateRequestContext 注释会创建一个新的请求上下文,并在返回的方法的 Uni 完成时销毁它。如果未使用 Panache,则可以注入 Mutiny.SessionFactory 并以类似方式使用,而无需激活请求上下文或手动关闭会话。
3 请求一个新事务。事务在传递的操作完成时完成。
4 持久化实体。它返回一个 Uni<Fruit>
5 切换回 Uni<Void>
6 关闭会话 - 这会关闭与数据库的连接。然后可以回收该连接。

与*经典* Hibernate 不同,您不能使用 @Transactional。相反,我们使用 session.withTransaction 并持久化我们的实体。map 用于返回 Uni<Void> 而不是 Uni<Fruit>

您需要一个反序列化器,它可以根据记录创建 Fruit。这可以通过 Jackson 反序列化器来完成:

package org.acme;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

相关的配置将是:

mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer

有关 Jackson 与 Kafka 用法的更多详细信息,请查看 通过 Jackson 进行序列化。您也可以使用 Avro。

28.4. 将由 Hibernate 管理的实体写入 Kafka

假设有以下过程:

  1. 您接收到带有有效载荷的 HTTP 请求,

  2. 您从该有效载荷创建一个 Hibernate 实体实例,

  3. 您将该实体持久化到数据库,

  4. 您将实体发送到 Kafka 主题。

如果您使用 Hibernate Reactive,请参阅 将由 Hibernate Reactive 管理的实体写入 Kafka

由于我们要写入数据库,因此我们必须在事务中运行此方法。但是,将实体发送到 Kafka 是异步发生的。我们可以通过在 MutinyEmitter 上使用 .sendAndAwait().sendAndForget(),或者在 Emitter 上使用 .send().toCompletableFuture().join() 来实现此目的。

要实现此过程,您需要以下方法:

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.transaction.Transactional;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;
import io.smallrye.reactive.messaging.MutinyEmitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") MutinyEmitter<Fruit> emitter;

    @POST
    @Path("/fruits")
    @Transactional                                               (1)
    public void storeAndSendToKafka(Fruit fruit) {               (2)
        fruit.persist();
        emitter.sendAndAwait(new FruitDto(fruit));               (3)
    }
}
1 由于我们正在写入数据库,请确保我们在事务中运行:
2 该方法接收要持久化的 fruit 实例。
3 将托管实体包装在 Data Transfer Object 中并将其发送到 Kafka。这确保了托管实体不会受到 Kafka 序列化的影响。然后等待操作完成再返回。
在使用 @Transactional 时不应返回 CompletionStageUni,因为所有事务提交都将在单个线程上发生,这会影响性能。

28.5. 将由 Hibernate Reactive 管理的实体写入 Kafka

要将由 Hibernate Reactive 管理的实体发送到 Kafka,我们建议使用:

  • Quarkus REST 用于提供 HTTP 请求,

  • MutinyEmitter 用于将消息发送到通道,因此可以轻松地与 Hibernate Reactive 或 Hibernate Reactive with Panache 暴露的 Mutiny API 集成。

以下示例演示了如何接收有效载荷,使用 Hibernate Reactive with Panache 将其存储在数据库中,然后将持久化的实体发送到 Kafka:

package org.acme;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;

@Path("/")
public class ReactiveGreetingResource {

    @Channel("kafka") MutinyEmitter<Fruit> emitter;     (1)

    @POST
    @Path("/fruits")
    public Uni<Void> sendToKafka(Fruit fruit) {         (2)
        return Panache.withTransaction(() ->            (3)
            fruit.<Fruit>persist()
        )
            .chain(f -> emitter.send(f));               (4)
    }
}
1 注入一个 MutinyEmitter,它暴露了一个 Mutiny API。它简化了与 Hibernate Reactive with Panache 暴露的 Mutiny API 的集成。
2 接收有效载荷的 HTTP 方法返回一个 Uni<Void>。在操作完成时(实体已持久化并写入 Kafka)会写入 HTTP 响应。
3 我们需要在事务中将实体写入数据库。
4 一旦持久化操作完成,我们将实体发送到 Kafka。send 方法返回一个 Uni<Void>

28.6. 将 Kafka 主题作为 Server-Sent Events 流式传输

将 Kafka 主题作为 Server-Sent Events (SSE) 流式传输非常简单:

  1. 您在 HTTP 端点中注入代表 Kafka 主题的通道。

  2. 您将该通道作为 PublisherMulti 从 HTTP 方法返回。

以下代码提供了一个示例:

@Channel("fruits")
Multi<Fruit> fruits;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<Fruit> stream() {
    return fruits;
}

某些环境会在活动不足时断开 SSE 连接。解决方法是定期发送*ping*消息(或空对象)。

@Channel("fruits")
Multi<Fruit> fruits;

@Inject
ObjectMapper mapper;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream() {
    return Multi.createBy().merging()
            .streams(
                    fruits.map(this::toJson),
                    emitAPeriodicPing()
            );
}

Multi<String> emitAPeriodicPing() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(10))
            .onItem().transform(x -> "{}");
}

private String toJson(Fruit f) {
    try {
        return mapper.writeValueAsString(f);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}

解决方法稍微复杂一些,因为除了发送来自 Kafka 的 fruit 之外,还需要定期发送 ping。为实现此目的,我们将来自 Kafka 的流与每 10 秒发出 {} 的周期性流合并。

28.7. 将 Kafka 事务与 Hibernate Reactive 事务链接

通过将 Kafka 事务与 Hibernate Reactive 事务链接,您可以将记录发送到 Kafka 事务,执行数据库更新,并且仅当数据库事务成功时才提交 Kafka 事务。

以下示例演示了:

  • 通过 Quarkus REST 使用 HTTP 请求接收有效载荷,

  • 使用 SmallRye Fault Tolerance 限制该 HTTP 端点的并发性,

  • 启动 Kafka 事务并将有效载荷发送到 Kafka 记录,

  • 使用 Hibernate Reactive with Panache 将有效载荷存储在数据库中,

  • 仅当实体成功持久化时才提交 Kafka 事务。

package org.acme;

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.hibernate.reactive.mutiny.Mutiny;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@Path("/")
public class FruitProducer {

    @Channel("kafka") KafkaTransactions<Fruit> kafkaTx; (1)

    @POST
    @Path("/fruits")
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1) (2)
    public Uni<Void> post(Fruit fruit) { (3)
        return kafkaTx.withTransaction(emitter -> { (4)
            emitter.send(fruit); (5)
            return Panache.withTransaction(() -> { (6)
                return fruit.<Fruit>persist(); (7)
            });
        }).replaceWithVoid();
    }
}
1 注入 KafkaTransactions,它公开了一个 Mutiny API。它允许与 Hibernate Reactive with Panache 暴露的 Mutiny API 集成。
2 将 HTTP 端点的并发限制为 "1",防止同时启动多个事务。
3 接收有效载荷的 HTTP 方法返回一个 Uni<Void>。在操作完成时(实体已持久化且 Kafka 事务已提交)会写入 HTTP 响应。
4 开始一个 Kafka 事务。
5 在 Kafka 事务中将有效载荷发送到 Kafka。
6 在 Hibernate Reactive 事务中将实体持久化到数据库。
7 一旦持久化操作完成且没有错误,Kafka 事务将被提交。结果将被省略并作为 HTTP 响应返回。

在上面的示例中,数据库事务(内部)将提交,然后是 Kafka 事务(外部)。如果您希望先提交 Kafka 事务,然后再提交数据库事务,您需要以相反的顺序嵌套它们。

下一个示例演示了这一点,使用了 Hibernate Reactive API(无 Panache):

import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.hibernate.reactive.mutiny.Mutiny;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

@Path("/")
public class FruitProducer {

    @Channel("kafka") KafkaTransactions<Fruit> kafkaTx;

    @Inject Mutiny.SessionFactory sf; (1)

    @POST
    @Path("/fruits")
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1)
    public Uni<Void> post(Fruit fruit) {
        return sf.withTransaction(session -> (2)
                kafkaTx.withTransaction(emitter -> (3)
                        session.persist(fruit).invoke(() -> emitter.send(fruit)) (4)
                ));
    }
}
1 注入 Hibernate Reactive SessionFactory
2 开始一个 Hibernate Reactive 事务。
3 开始一个 Kafka 事务。
4 持久化有效载荷并将实体发送到 Kafka。

或者,您可以使用 @WithTransaction 注释来启动一个事务并在方法返回时提交它:

import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;

import io.quarkus.hibernate.reactive.panache.common.WithTransaction;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@Path("/")
public class FruitProducer {

    @Channel("kafka") KafkaTransactions<Fruit> kafkaTx;

    @POST
    @Path("/fruits")
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1)
    @WithTransaction (1)
    public Uni<Void> post(Fruit fruit) {
        return kafkaTx.withTransaction(emitter -> (2)
            fruit.persist().invoke(() -> emitter.send(fruit)) (3)
        );
    }
}
1 启动一个 Hibernate Reactive 事务并在方法返回时提交它。
2 开始一个 Kafka 事务。
3 持久化有效载荷并将实体发送到 Kafka。

28.8. 将 Kafka 事务与 Hibernate ORM 事务链接

虽然 KafkaTransactions 在 Mutiny 之上提供了响应式 API 来管理 Kafka 事务,但您仍然可以将 Kafka 事务与阻塞的 Hibernate ORM 事务链接。

import jakarta.transaction.Transactional;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.quarkus.logging.Log;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@Path("/")
public class FruitProducer {

    @Channel("kafka") KafkaTransactions<Pet> emitter;

    @POST
    @Path("/fruits")
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1)
    @Transactional (1)
    public void post(Fruit fruit) {
        emitter.withTransaction(e -> { (2)
            // if id is attributed by the database, will need to flush to get it
            // fruit.persistAndFlush();
            fruit.persist(); (3)
            Log.infov("Persisted fruit {0}", p);
            e.send(p); (4)
            return Uni.createFrom().voidItem();
        }).await().indefinitely(); (5)
    }
}
1 开始一个 Hibernate ORM 事务。事务在方法返回时提交。
2 开始一个 Kafka 事务。
3 持久化有效载荷。
4 在 Kafka 事务中将实体发送到 Kafka。
5 等待返回的 Uni 以便 Kafka 事务完成。

29. 日志记录

为了减少 Kafka 客户端生成的日志量,Quarkus 将以下日志类别的级别设置为 WARNING

  • org.apache.kafka.clients

  • org.apache.kafka.common.utils

  • org.apache.kafka.common.metrics

您可以通过在 application.properties 中添加以下行来覆盖配置:

quarkus.log.category."org.apache.kafka.clients".level=INFO
quarkus.log.category."org.apache.kafka.common.utils".level=INFO
quarkus.log.category."org.apache.kafka.common.metrics".level=INFO

30. 连接到托管 Kafka 集群

本节将介绍如何连接到著名的 Kafka Cloud 服务。

30.1. Azure Event Hub

Azure Event Hub 提供了一个与 Apache Kafka 兼容的端点。

Kafka 的 Azure Event Hubs 在*基本*层不可用。您至少需要*标准*层才能使用 Kafka。请参阅 Azure Event Hubs 定价 以查看其他选项。

要使用 TLS 通过 Kafka 协议连接到 Azure Event Hub,您需要以下配置:

kafka.bootstrap.servers=my-event-hub.servicebus.windows.net:9093 (1)
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ (2)
    username="$ConnectionString" \ (3)
    password="<YOUR.EVENTHUBS.CONNECTION.STRING>"; (4)
1 端口是 9093
2 您需要使用 JAAS PlainLoginModule
3 用户名是 $ConnectionString 字符串。
4 Azure 提供的 Event Hub 连接字符串。

<YOUR.EVENTHUBS.CONNECTION.STRING> 替换为您 Event Hubs 命名空间的连接字符串。有关获取连接字符串的说明,请参阅 获取 Event Hubs 连接字符串。结果将类似:

kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="Endpoint=sb://my-event-hub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

此配置可以是全局的(如上所示),也可以在通道配置中设置:

mp.messaging.incoming.$channel.bootstrap.servers=my-event-hub.servicebus.windows.net:9093
mp.messaging.incoming.$channel.security.protocol=SASL_SSL
mp.messaging.incoming.$channel.sasl.mechanism=PLAIN
mp.messaging.incoming.$channel.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="Endpoint=sb://my-event-hub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...";

30.2. Red Hat OpenShift Streams for Apache Kafka

Red Hat OpenShift Streams for Apache Kafka 提供托管的 Kafka Broker。首先,遵循 使用 rhoas CLI 获取 Red Hat OpenShift Streams for Apache Kafka 的入门指南 来创建您的 Kafka Broker 实例。确保您已复制了与您创建的*服务账户*关联的客户端 ID 和客户端密码。

然后,您可以如下配置 Quarkus 应用程序以连接到 Broker:

kafka.bootstrap.servers=<connection url> (1)
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="${KAFKA_USERNAME}" \ (2)
  password="${KAFKA_PASSWORD}"; (3)
1 连接字符串,在管理控制台上提供,例如 demo-c—​bjsv-ldd-cvavkc-a.bf2.kafka.rhcloud.com:443
2 Kafka 用户名(来自服务账户的客户端 ID)。
3 Kafka 密码(来自服务账户的客户端密码)。
通常,这些属性会使用 %prod 进行前缀,以便仅在生产模式下运行时启用它们。
使用 rhoas CLI 获取 Red Hat OpenShift Streams for Apache Kafka 的入门指南 所述,要使用 Red Hat OpenShift Streams for Apache Kafka,您必须提前创建主题,创建*服务账户*,并授予该服务账户读取和写入您主题的权限。身份验证数据(客户端 ID 和密码)与服务账户相关,这意味着您可以实现精细权限并限制对主题的访问。

在 Kubernetes 中使用时,建议将客户端 ID 和密码存储在 Kubernetes secret 中:

apiVersion: v1
kind: Secret
metadata:
  name: kafka-credentials
stringData:
  KAFKA_USERNAME: "..."
  KAFKA_PASSWORD: "..."

为了让您的 Quarkus 应用程序能够使用该 secret,请将以下行添加到 application.properties 文件中:

%prod.quarkus.openshift.env.secrets=kafka-credentials

30.2.1. Red Hat OpenShift Service Registry

Red Hat OpenShift Service Registry 提供完全托管的服务注册表,用于处理 Kafka 模式。

您可以按照 Red Hat OpenShift Service Registry 入门指南 中的说明进行操作,或使用 rhoas CLI 创建新的服务注册表实例:

rhoas service-registry create --name my-schema-registry

请务必记下实例的*注册表 URL*。对于身份验证,您可以使用之前创建的同一个*服务账户*。您需要确保它具有访问服务注册表的必要权限。

例如,使用 rhoas CLI,您可以为服务账户授予 MANAGER 角色:

rhoas service-registry role add --role manager --service-account [SERVICE_ACCOUNT_CLIENT_ID]

然后,您可以如下配置 Quarkus 应用程序以连接到模式注册表:

mp.messaging.connector.smallrye-kafka.apicurio.registry.url=${RHOAS_SERVICE_REGISTRY_URL} (1)
mp.messaging.connector.smallrye-kafka.apicurio.auth.service.token.endpoint=${RHOAS_OAUTH_TOKEN_ENDPOINT} (2)
mp.messaging.connector.smallrye-kafka.apicurio.auth.client.id=${RHOAS_CLIENT_ID} (3)
mp.messaging.connector.smallrye-kafka.apicurio.auth.client.secret=${RHOAS_CLIENT_ID} (4)
1 服务注册表 URL,在管理控制台上提供,例如 https://bu98.serviceregistry.rhcloud.com/t/0e95af2c-6e11-475e-82ee-f13bd782df24/apis/registry/v2
2 OAuth 令牌端点 URL,例如 https://identity.api.openshift.com/auth/realms/rhoas/protocol/openid-connect/token
3 客户端 ID(来自服务账户)。
4 客户端密码(来自服务账户)。

30.2.2. 使用 Service Binding Operator 将 Red Hat OpenShift 托管服务绑定到 Quarkus 应用程序

如果您的 Quarkus 应用程序部署在已安装 Service Binding OperatorOpenShift Application Services Operator 的 Kubernetes 或 OpenShift 集群中,则访问 Red Hat OpenShift Streams for Apache Kafka 和 Service Registry 所需的配置可以通过 Kubernetes 服务绑定 注入到应用程序中。

为了设置服务绑定,您需要首先将 OpenShift 托管服务连接到您的集群。对于 OpenShift 集群,您可以遵循 将 Kafka 和 Service Registry 实例连接到您的 OpenShift 集群 中的说明。

一旦您将集群与 RHOAS Kafka 和 Service Registry 实例连接起来,请确保您已授予新创建的服务账户必要的权限。

然后,使用 Kubernetes 服务绑定 扩展,您可以配置 Quarkus 应用程序以生成这些服务的 ServiceBinding 资源:

quarkus.kubernetes-service-binding.detect-binding-resources=true

quarkus.kubernetes-service-binding.services.kafka.api-version=rhoas.redhat.com/v1alpha1
quarkus.kubernetes-service-binding.services.kafka.kind=KafkaConnection
quarkus.kubernetes-service-binding.services.kafka.name=my-kafka

quarkus.kubernetes-service-binding.services.serviceregistry.api-version=rhoas.redhat.com/v1alpha1
quarkus.kubernetes-service-binding.services.serviceregistry.kind=ServiceRegistryConnection
quarkus.kubernetes-service-binding.services.serviceregistry.name=my-schema-registry

在此示例中,Quarkus 构建将生成以下 ServiceBinding 资源:

apiVersion: binding.operators.coreos.com/v1alpha1
kind: ServiceBinding
metadata:
  name: my-app-kafka
spec:
  application:
    group: apps.openshift.io
    name: my-app
    version: v1
    kind: DeploymentConfig
  services:
    - group: rhoas.redhat.com
      version: v1alpha1
      kind: KafkaConnection
      name: my-kafka
  detectBindingResources: true
  bindAsFiles: true
---
apiVersion: binding.operators.coreos.com/v1alpha1
kind: ServiceBinding
metadata:
  name: my-app-serviceregistry
spec:
  application:
    group: apps.openshift.io
    name: my-app
    version: v1
    kind: DeploymentConfig
  services:
    - group: rhoas.redhat.com
      version: v1alpha1
      kind: ServiceRegistryConnection
      name: my-schema-registry
  detectBindingResources: true
  bindAsFiles: true

您可以遵循 部署到 OpenShift 来部署您的应用程序,包括生成的 ServiceBinding 资源。访问 Kafka 和 Schema Registry 实例所需的配置属性将在部署时自动注入到应用程序中。

31. 深入了解

本指南展示了如何使用 Quarkus 与 Kafka 进行交互。它利用 Quarkus Messaging 来构建数据流应用程序。

如果您想进一步了解,请查看 Quarkus 中使用的 SmallRye Reactive Messaging 的文档。

相关内容