编辑此页面

使用 Apache Kafka Streams

本指南演示了您的 Quarkus 应用程序如何利用 Apache Kafka Streams API 来实现基于 Apache Kafka 的流处理应用程序。

先决条件

要完成本指南,您需要

  • 大约 30 分钟

  • 一个 IDE

  • 已安装 JDK 17+ 并正确配置了 JAVA_HOME

  • Apache Maven 3.9.9

  • Docker 和 Docker Compose 或 Podman,以及 Docker Compose

  • 如果您想使用它,可以选择 Quarkus CLI

  • 如果您想构建本机可执行文件(或者如果您使用本机容器构建,则为 Docker),可以选择安装 Mandrel 或 GraalVM 并进行适当的配置

建议您先阅读 Kafka 快速入门

Kafka Streams 的 Quarkus 扩展通过支持 Quarkus Dev Mode(例如,通过 ./mvnw quarkus:dev)来实现开发期间的快速周转时间。 在更改 Kafka Streams 拓扑的代码后,应用程序将在下一个输入消息到达时自动重新加载。

推荐的开发设置是拥有一些生产者,它们以固定的间隔(例如每秒)在已处理的主题上创建测试消息,并使用诸如 kafkacat 之类的工具观察流应用程序的输出主题。 使用开发模式,您将在保存时立即在输出主题上看到流应用程序最新版本生成的消息。

为了获得最佳的开发体验,我们建议将以下配置设置应用于您的 Kafka 代理

group.min.session.timeout.ms=250

还在您的 Quarkus application.properties 中指定以下设置

kafka-streams.consumer.session.timeout.ms=250
kafka-streams.consumer.heartbeat.interval.ms=200

这些设置共同确保应用程序在开发模式下重新启动后可以非常快速地重新连接到代理。

架构

在本指南中,我们将在一个组件(名为 generator)中生成(随机)温度值。 这些值与给定的气象站相关联,并写入 Kafka 主题 (temperature-values)。 另一个主题 (weather-stations) 仅包含有关气象站本身的主要数据(ID 和名称)。

第二个组件 (aggregator) 从两个 Kafka 主题读取数据,并在流式管道中处理它们

  • 两个主题在气象站 ID 上连接

  • 确定每个气象站的最低、最高和平均温度

  • 此聚合数据将写入第三个主题 (temperatures-aggregated)

可以通过检查输出主题来检查数据。 通过公开 Kafka Streams 交互式查询,也可以通过简单的 REST 查询获得每个气象站的最新结果。

总体架构如下所示

Architecture

解决方案

我们建议您按照以下章节中的说明,逐步创建应用程序。但是,您可以直接转到完整的示例。

克隆 Git 存储库:git clone https://github.com/quarkusio/quarkus-quickstarts.git,或下载 归档

解决方案位于 kafka-streams-quickstart 目录中。

创建生产者 Maven 项目

首先,我们需要一个带有温度值生产者的新项目。 使用以下命令创建一个新项目

CLI
quarkus create app org.acme:kafka-streams-quickstart-producer \
    --extension='kafka' \
    --no-code
mv kafka-streams-quickstart-producer producer

要创建 Gradle 项目,请添加 --gradle--gradle-kotlin-dsl 选项。

有关如何安装和使用 Quarkus CLI 的更多信息,请参阅 Quarkus CLI 指南。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.24.4:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-streams-quickstart-producer \
    -Dextensions='kafka' \
    -DnoCode
mv kafka-streams-quickstart-producer producer

要创建 Gradle 项目,请添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

对于 Windows 用户

  • 如果使用 cmd,(不要使用反斜杠 \ 并将所有内容放在同一行上)

  • 如果使用 Powershell,请将 -D 参数括在双引号中,例如 "-DprojectArtifactId=kafka-streams-quickstart-producer"

此命令生成一个 Maven 项目,导入 Reactive Messaging 和 Kafka 连接器扩展。

如果您已经配置了 Quarkus 项目,则可以通过在项目基本目录中运行以下命令将 messaging-kafka 扩展添加到您的项目中

CLI
quarkus extension add quarkus-messaging-kafka
Maven
./mvnw quarkus:add-extension -Dextensions='quarkus-messaging-kafka'
Gradle
./gradlew addExtension --extensions='quarkus-messaging-kafka'

这会将以下内容添加到您的构建文件中

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-kafka")

温度值生产者

创建 producer/src/main/java/org/acme/kafka/streams/producer/generator/ValuesGenerator.java 文件,内容如下

package org.acme.kafka.streams.producer.generator;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.Record;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;

/**
 * A bean producing random temperature data every second.
 * The values are written to a Kafka topic (temperature-values).
 * Another topic contains the name of weather stations (weather-stations).
 * The Kafka configuration is specified in the application configuration.
 */
@ApplicationScoped
public class ValuesGenerator {

    private static final Logger LOG = Logger.getLogger(ValuesGenerator.class);

    private Random random = new Random();

    private List<WeatherStation> stations = List.of(
                        new WeatherStation(1, "Hamburg", 13),
                        new WeatherStation(2, "Snowdonia", 5),
                        new WeatherStation(3, "Boston", 11),
                        new WeatherStation(4, "Tokio", 16),
                        new WeatherStation(5, "Cusco", 12),
                        new WeatherStation(6, "Svalbard", -7),
                        new WeatherStation(7, "Porthsmouth", 11),
                        new WeatherStation(8, "Oslo", 7),
                        new WeatherStation(9, "Marrakesh", 20));

    @Outgoing("temperature-values")                                        (1)
    public Multi<Record<Integer, String>> generate() {
        return Multi.createFrom().ticks().every(Duration.ofMillis(500))    (2)
                .onOverflow().drop()
                .map(tick -> {
                    WeatherStation station = stations.get(random.nextInt(stations.size()));
                    double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
                            .setScale(1, RoundingMode.HALF_UP)
                            .doubleValue();

                    LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
                    return Record.of(station.id, Instant.now() + ";" + temperature);
                });
    }

    @Outgoing("weather-stations")                                          (3)
    public Multi<Record<Integer, String>> weatherStations() {
        return Multi.createFrom().items(stations.stream()
            .map(s -> Record.of(
                    s.id,
                    "{ \"id\" : " + s.id +
                    ", \"name\" : \"" + s.name + "\" }"))
        );
    }

    private static class WeatherStation {

        int id;
        String name;
        int averageTemperature;

        public WeatherStation(int id, String name, int averageTemperature) {
            this.id = id;
            this.name = name;
            this.averageTemperature = averageTemperature;
        }
    }
}
1 指示 Reactive Messaging 将返回的 Multi 中的项目分派到 temperature-values
2 该方法返回一个 Mutiny *stream* (Multi),每 0.5 秒发出一个随机温度值。
3 指示 Reactive Messaging 将返回的 Multi(气象站的静态列表)中的项目分派到 weather-stations

这两种方法都返回一个*reactive stream*,其项目分别发送到名为 temperature-valuesweather-stations 的流。

主题配置

两个通道都使用 Quarkus 配置文件 application.properties 映射到 Kafka 主题。 为此,将以下内容添加到文件 producer/src/main/resources/application.properties

# Configure the Kafka broker location
kafka.bootstrap.servers=localhost:9092

mp.messaging.outgoing.temperature-values.connector=smallrye-kafka
mp.messaging.outgoing.temperature-values.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.temperature-values.value.serializer=org.apache.kafka.common.serialization.StringSerializer

mp.messaging.outgoing.weather-stations.connector=smallrye-kafka
mp.messaging.outgoing.weather-stations.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.weather-stations.value.serializer=org.apache.kafka.common.serialization.StringSerializer

这配置了 Kafka 引导服务器、两个主题和相应的(反)序列化程序。 有关不同配置选项的更多详细信息,请参见 Kafka 文档中的 生产者配置消费者配置 部分。

创建聚合器 Maven 项目

完成生产者应用程序后,就该实现实际的聚合器应用程序了,该应用程序将运行 Kafka Streams 管道。 像这样创建另一个项目

CLI
quarkus create app org.acme:kafka-streams-quickstart-aggregator \
    --extension='kafka-streams,rest-jackson' \
    --no-code
mv kafka-streams-quickstart-aggregator aggregator

要创建 Gradle 项目,请添加 --gradle--gradle-kotlin-dsl 选项。

有关如何安装和使用 Quarkus CLI 的更多信息,请参阅 Quarkus CLI 指南。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.24.4:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-streams-quickstart-aggregator \
    -Dextensions='kafka-streams,rest-jackson' \
    -DnoCode
mv kafka-streams-quickstart-aggregator aggregator

要创建 Gradle 项目,请添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

对于 Windows 用户

  • 如果使用 cmd,(不要使用反斜杠 \ 并将所有内容放在同一行上)

  • 如果使用 Powershell,请将 -D 参数括在双引号中,例如 "-DprojectArtifactId=kafka-streams-quickstart-aggregator"

这将创建带有 Kafka Streams 的 Quarkus 扩展和 Quarkus REST(以前称为 RESTEasy Reactive)的 Jackson 支持的 aggregator 项目。

如果您已经配置了 Quarkus 项目,则可以通过在项目基本目录中运行以下命令将 kafka-streams 扩展添加到您的项目中

CLI
quarkus extension add kafka-streams
Maven
./mvnw quarkus:add-extension -Dextensions='kafka-streams'
Gradle
./gradlew addExtension --extensions='kafka-streams'

这会将以下内容添加到您的 pom.xml

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-kafka-streams</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-kafka-streams")

管道实现

让我们开始实现流处理应用程序,方法是创建一些值对象,用于表示温度测量值、气象站和跟踪聚合值。

首先,创建文件 aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/WeatherStation.java,表示气象站,内容如下

package org.acme.kafka.streams.aggregator.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection (1)
public class WeatherStation {

    public int id;
    public String name;
}
1 @RegisterForReflection 注释指示 Quarkus 在本机编译期间保留类及其成员。 有关 @RegisterForReflection 注释的更多详细信息,请参见 本机应用程序提示 页面。

然后创建文件 aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/TemperatureMeasurement.java,表示给定站的温度测量值

package org.acme.kafka.streams.aggregator.model;

import java.time.Instant;

public class TemperatureMeasurement {

    public int stationId;
    public String stationName;
    public Instant timestamp;
    public double value;

    public TemperatureMeasurement(int stationId, String stationName, Instant timestamp,
            double value) {
        this.stationId = stationId;
        this.stationName = stationName;
        this.timestamp = timestamp;
        this.value = value;
    }
}

最后创建 aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/Aggregation.java,它将用于在事件在流式管道中处理时跟踪聚合值

package org.acme.kafka.streams.aggregator.model;

import java.math.BigDecimal;
import java.math.RoundingMode;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class Aggregation {

    public int stationId;
    public String stationName;
    public double min = Double.MAX_VALUE;
    public double max = Double.MIN_VALUE;
    public int count;
    public double sum;
    public double avg;

    public Aggregation updateFrom(TemperatureMeasurement measurement) {
        stationId = measurement.stationId;
        stationName = measurement.stationName;

        count++;
        sum += measurement.value;
        avg = BigDecimal.valueOf(sum / count)
                .setScale(1, RoundingMode.HALF_UP).doubleValue();

        min = Math.min(min, measurement.value);
        max = Math.max(max, measurement.value);

        return this;
    }
}

接下来,让我们在 aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/TopologyProducer.java 文件中创建实际的流式查询实现本身。 我们需要做的就是声明一个 CDI 生产者方法,该方法返回 Kafka Streams Topology; Quarkus 扩展将负责配置、启动和停止实际的 Kafka Streams 引擎。

package org.acme.kafka.streams.aggregator.streams;

import java.time.Instant;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.acme.kafka.streams.aggregator.model.Aggregation;
import org.acme.kafka.streams.aggregator.model.TemperatureMeasurement;
import org.acme.kafka.streams.aggregator.model.WeatherStation;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;

import io.quarkus.kafka.client.serialization.ObjectMapperSerde;

@ApplicationScoped
public class TopologyProducer {

    static final String WEATHER_STATIONS_STORE = "weather-stations-store";

    private static final String WEATHER_STATIONS_TOPIC = "weather-stations";
    private static final String TEMPERATURE_VALUES_TOPIC = "temperature-values";
    private static final String TEMPERATURES_AGGREGATED_TOPIC = "temperatures-aggregated";

    @Produces
    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        ObjectMapperSerde<WeatherStation> weatherStationSerde = new ObjectMapperSerde<>(
                WeatherStation.class);
        ObjectMapperSerde<Aggregation> aggregationSerde = new ObjectMapperSerde<>(Aggregation.class);

        KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
                WEATHER_STATIONS_STORE);

        GlobalKTable<Integer, WeatherStation> stations = builder.globalTable( (1)
                WEATHER_STATIONS_TOPIC,
                Consumed.with(Serdes.Integer(), weatherStationSerde));

        builder.stream(                                                       (2)
                        TEMPERATURE_VALUES_TOPIC,
                        Consumed.with(Serdes.Integer(), Serdes.String())
                )
                .join(                                                        (3)
                        stations,
                        (stationId, timestampAndValue) -> stationId,
                        (timestampAndValue, station) -> {
                            String[] parts = timestampAndValue.split(";");
                            return new TemperatureMeasurement(station.id, station.name,
                                    Instant.parse(parts[0]), Double.valueOf(parts[1]));
                        }
                )
                .groupByKey()                                                 (4)
                .aggregate(                                                   (5)
                        Aggregation::new,
                        (stationId, value, aggregation) -> aggregation.updateFrom(value),
                        Materialized.<Integer, Aggregation> as(storeSupplier)
                            .withKeySerde(Serdes.Integer())
                            .withValueSerde(aggregationSerde)
                )
                .toStream()
                .to(                                                          (6)
                        TEMPERATURES_AGGREGATED_TOPIC,
                        Produced.with(Serdes.Integer(), aggregationSerde)
                );

        return builder.build();
    }
}
1 weather-stations 表被读入 GlobalKTable,表示每个气象站的当前状态
2 temperature-values 主题被读入 KStream; 每当新消息到达该主题时,将为此测量处理管道
3 来自 temperature-values 主题的消息使用主题的键(气象站 ID)与相应的气象站连接; 连接结果包含来自测量值和关联的气象站消息的数据
4 这些值按消息键(气象站 ID)分组
5 在每个组中,通过跟踪最小值和最大值并计算该站所有测量值的平均值(请参见 Aggregation 类型),对该站的所有测量值进行聚合
6 管道的结果将写入 temperatures-aggregated 主题

Kafka Streams 扩展通过 Quarkus 配置文件 application.properties 进行配置。 创建带有以下内容的文件 aggregator/src/main/resources/application.properties

quarkus.kafka-streams.bootstrap-servers=localhost:9092
quarkus.kafka-streams.application-server=${hostname}:8080
quarkus.kafka-streams.topics=weather-stations,temperature-values

# pass-through options
kafka-streams.cache.max.bytes.buffering=10240
kafka-streams.commit.interval.ms=1000
kafka-streams.metadata.max.age.ms=500
kafka-streams.auto.offset.reset=earliest
kafka-streams.metrics.recording.level=DEBUG

带有 quarkus.kafka-streams 前缀的选项可以在应用程序启动时动态更改,例如,通过环境变量或系统属性。 bootstrap-serversapplication-server 分别映射到 Kafka Streams 属性 bootstrap.serversapplication.servertopics 是 Quarkus 特有的:应用程序将等待所有给定的主题存在,然后启动 Kafka Streams 引擎。 这样做是为了优雅地等待在应用程序启动时尚不存在的主题的创建。

或者,您可以使用 kafka.bootstrap.servers 而不是 quarkus.kafka-streams.bootstrap-servers,就像在上面的 *generator* 项目中所做的那样。

准备好将应用程序推广到生产环境后,请考虑更改上述配置值。 虽然 cache.max.bytes.buffering=10240 将使您的记录更快地通过拓扑,但 10485760 的默认值更适合吞吐量。 还要考虑将 metadata.max.age.ms500(这将快速更新集群元数据,但会生成大量冗余请求)增加到更接近默认值 300000 的值。 1000commit.interval.ms 适用于精确一次处理,但对于具有默认值 30000 的默认至少一次处理,可能会产生过多的负载。

kafka-streams 命名空间中的所有属性都按原样传递到 Kafka Streams 引擎。 更改它们的值需要重建应用程序。

构建和运行应用程序

我们现在可以构建 produceraggregator 应用程序

./mvnw clean package -f producer/pom.xml
./mvnw clean package -f aggregator/pom.xml

我们将它们打包到容器镜像中并通过 Docker Compose 启动它们,而不是使用 Quarkus 开发模式直接在主机上运行它们。 这样做是为了演示稍后将 aggregator 聚合扩展到多个节点。

Quarkus 默认创建的 Dockerfile 需要对 aggregator 应用程序进行一项调整才能运行 Kafka Streams 管道。 为此,请编辑文件 aggregator/src/main/docker/Dockerfile.jvm 并将行 FROM fabric8/java-alpine-openjdk8-jre 替换为 FROM fabric8/java-centos-openjdk8-jdk

接下来,创建一个 Docker Compose 文件 (docker-compose.yaml) 以启动两个应用程序以及 Apache Kafka 和 ZooKeeper,如下所示

version: '3.5'

services:
  zookeeper:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs
    networks:
      - kafkastreams-network
  kafka:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT} --override num.partitions=$${KAFKA_NUM_PARTITIONS}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_NUM_PARTITIONS: 3
    networks:
      - kafkastreams-network

  producer:
    image: quarkus-quickstarts/kafka-streams-producer:1.0
    build:
      context: producer
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    networks:
      - kafkastreams-network

  aggregator:
    image: quarkus-quickstarts/kafka-streams-aggregator:1.0
    build:
      context: aggregator
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS: kafka:9092
    networks:
      - kafkastreams-network

networks:
  kafkastreams-network:
    name: ks

要启动所有容器,构建 produceraggregator 容器镜像,请运行 docker-compose up --build

您可以改为使用 KAFKA_BOOTSTRAP_SERVERS 而不是 QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS

您应该会看到来自 producer 应用程序的有关消息已发送到 "temperature-values" 主题的日志语句。

现在运行一个 *debezium/tooling* 镜像实例,将其附加到所有其他容器运行所在的同一网络。 此镜像提供了几个有用的工具,例如 *kafkacat* 和 *httpie*

docker run --tty --rm -i --network ks debezium/tooling:1.1

在 tooling 容器中,运行 *kafkacat* 以检查流式管道的结果

kafkacat -b kafka:9092 -C -o beginning -q -t temperatures-aggregated

{"avg":34.7,"count":4,"max":49.4,"min":16.8,"stationId":9,"stationName":"Marrakesh","sum":138.8}
{"avg":15.7,"count":1,"max":15.7,"min":15.7,"stationId":2,"stationName":"Snowdonia","sum":15.7}
{"avg":12.8,"count":7,"max":25.5,"min":-13.8,"stationId":7,"stationName":"Porthsmouth","sum":89.7}
...

您应该会看到新值到达,因为生产者会继续发出温度测量值,出站主题上的每个值都显示了代表的气象站的最低、最高和平均温度值。

交互式查询

订阅 temperatures-aggregated 主题是响应任何新温度值的好方法。 但是,如果您只对给定气象站的最新聚合值感兴趣,则这有点浪费。 这正是 Kafka Streams 交互式查询的闪光点:它们允许您直接查询管道的底层状态存储,以获取与给定键关联的值。 通过公开一个查询状态存储的简单 REST 端点,可以检索最新聚合结果,而无需订阅任何 Kafka 主题。

让我们首先在文件 aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/InteractiveQueries.java 中创建一个新类 InteractiveQueries

将一个方法添加到 KafkaStreamsPipeline 类中,该方法获取给定键的当前状态

package org.acme.kafka.streams.aggregator.streams;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.acme.kafka.streams.aggregator.model.Aggregation;
import org.acme.kafka.streams.aggregator.model.WeatherStationData;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

@ApplicationScoped
public class InteractiveQueries {

    @Inject
    KafkaStreams streams;

    public GetWeatherStationDataResult getWeatherStationData(int id) {
        Aggregation result = getWeatherStationStore().get(id);

        if (result != null) {
            return GetWeatherStationDataResult.found(WeatherStationData.from(result)); (1)
        }
        else {
            return GetWeatherStationDataResult.notFound();                             (2)
        }
    }

    private ReadOnlyKeyValueStore<Integer, Aggregation> getWeatherStationStore() {
        while (true) {
            try {
                return streams.store(TopologyProducer.WEATHER_STATIONS_STORE, QueryableStoreTypes.keyValueStore());
            } catch (InvalidStateStoreException e) {
                // ignore, store not ready yet
            }
        }
    }
}
1 找到了给定站 ID 的值,因此将返回该值
2 未找到任何值,要么是因为查询了一个不存在的站,要么是因为给定站还没有测量值

还在文件 aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/GetWeatherStationDataResult.java 中创建该方法的返回类型

package org.acme.kafka.streams.aggregator.streams;

import java.util.Optional;
import java.util.OptionalInt;

import org.acme.kafka.streams.aggregator.model.WeatherStationData;

public class GetWeatherStationDataResult {

    private static GetWeatherStationDataResult NOT_FOUND =
            new GetWeatherStationDataResult(null);

    private final WeatherStationData result;

    private GetWeatherStationDataResult(WeatherStationData result) {
        this.result = result;
    }

    public static GetWeatherStationDataResult found(WeatherStationData data) {
        return new GetWeatherStationDataResult(data);
    }

    public static GetWeatherStationDataResult notFound() {
        return NOT_FOUND;
    }

    public Optional<WeatherStationData> getResult() {
        return Optional.ofNullable(result);
    }
}

还创建 aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/WeatherStationData.java,它表示气象站的实际聚合结果

package org.acme.kafka.streams.aggregator.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class WeatherStationData {

    public int stationId;
    public String stationName;
    public double min = Double.MAX_VALUE;
    public double max = Double.MIN_VALUE;
    public int count;
    public double avg;

    private WeatherStationData(int stationId, String stationName, double min, double max,
            int count, double avg) {
        this.stationId = stationId;
        this.stationName = stationName;
        this.min = min;
        this.max = max;
        this.count = count;
        this.avg = avg;
    }

    public static WeatherStationData from(Aggregation aggregation) {
        return new WeatherStationData(
                aggregation.stationId,
                aggregation.stationName,
                aggregation.min,
                aggregation.max,
                aggregation.count,
                aggregation.avg);
    }
}

我们现在可以添加一个简单的 REST 端点 (aggregator/src/main/java/org/acme/kafka/streams/aggregator/rest/WeatherStationEndpoint.java),该端点调用 getWeatherStationData() 并将数据返回给客户端

package org.acme.kafka.streams.aggregator.rest;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;

import org.acme.kafka.streams.aggregator.streams.GetWeatherStationDataResult;
import org.acme.kafka.streams.aggregator.streams.KafkaStreamsPipeline;

@ApplicationScoped
@Path("/weather-stations")
public class WeatherStationEndpoint {

    @Inject
    InteractiveQueries interactiveQueries;

    @GET
    @Path("/data/{id}")
    public Response getWeatherStationData(int id) {
        GetWeatherStationDataResult result = interactiveQueries.getWeatherStationData(id);

        if (result.getResult().isPresent()) {  (1)
            return Response.ok(result.getResult().get()).build();
        }
        else {
            return Response.status(Status.NOT_FOUND.getStatusCode(),
                    "No data found for weather station " + id).build();
        }
    }
}
1 根据是否获得值,返回该值或 404 响应

完成此代码后,就可以在 Docker Compose 中重建应用程序和 aggregator 服务

./mvnw clean package -f aggregator/pom.xml
docker-compose stop aggregator
docker-compose up --build -d

这将重建 aggregator 容器并重新启动其服务。 完成后,您可以调用该服务的 REST API 以获取现有站之一的温度数据。 为此,您可以使用之前启动的 tooling 容器中的 httpie

http aggregator:8080/weather-stations/data/1

HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 85
Content-Type: application/json
Date: Tue, 18 Jun 2019 19:29:16 GMT

{
    "avg": 12.9,
    "count": 146,
    "max": 41.0,
    "min": -25.6,
    "stationId": 1,
    "stationName": "Hamburg"
}

横向扩展

Kafka Streams 应用程序的一个非常有趣的特性是它们可以横向扩展,也就是说,负载和状态可以在运行相同管道的多个应用程序实例之间分配。 然后,每个节点将包含聚合结果的子集,但是 Kafka Streams 为您提供 一个 API,用于获取哪个节点托管给定键的信息。 然后,应用程序可以直接从另一个实例获取数据,或者只是将客户端指向该另一个节点的位置。

启动 aggregator 应用程序的多个实例将使整体架构看起来像这样

Architecture with multiple aggregator nodes

InteractiveQueries 类必须针对此分布式架构进行稍微调整

public GetWeatherStationDataResult getWeatherStationData(int id) {
    StreamsMetadata metadata = streams.metadataForKey(                  (1)
            TopologyProducer.WEATHER_STATIONS_STORE,
            id,
            Serdes.Integer().serializer()
    );

    if (metadata == null || metadata == StreamsMetadata.NOT_AVAILABLE) {
        LOG.warn("Found no metadata for key {}", id);
        return GetWeatherStationDataResult.notFound();
    }
    else if (metadata.host().equals(host)) {                            (2)
        LOG.info("Found data for key {} locally", id);
        Aggregation result = getWeatherStationStore().get(id);

        if (result != null) {
            return GetWeatherStationDataResult.found(WeatherStationData.from(result));
        }
        else {
            return GetWeatherStationDataResult.notFound();
        }
    }
    else {                                                              (3)
        LOG.info(
            "Found data for key {} on remote host {}:{}",
            id,
            metadata.host(),
            metadata.port()
        );
        return GetWeatherStationDataResult.foundRemotely(metadata.host(), metadata.port());
    }
}

public List<PipelineMetadata> getMetaData() {                           (4)
    return streams.allMetadataForStore(TopologyProducer.WEATHER_STATIONS_STORE)
            .stream()
            .map(m -> new PipelineMetadata(
                    m.hostInfo().host() + ":" + m.hostInfo().port(),
                    m.topicPartitions()
                        .stream()
                        .map(TopicPartition::toString)
                        .collect(Collectors.toSet()))
            )
            .collect(Collectors.toList());
}
1 获取给定气象站 ID 的流元数据
2 给定的键(气象站 ID)由本地应用程序节点维护,也就是说,它可以自己回答查询
3 给定的键由另一个应用程序节点维护; 在这种情况下,将返回有关该节点的信息(主机和端口)
4 getMetaData() 方法已添加,以向调用者提供应用程序集群中所有节点的列表。

必须相应地调整 GetWeatherStationDataResult 类型

package org.acme.kafka.streams.aggregator.streams;

import java.util.Optional;
import java.util.OptionalInt;

import org.acme.kafka.streams.aggregator.model.WeatherStationData;

public class GetWeatherStationDataResult {

    private static GetWeatherStationDataResult NOT_FOUND =
            new GetWeatherStationDataResult(null, null, null);

    private final WeatherStationData result;
    private final String host;
    private final Integer port;

    private GetWeatherStationDataResult(WeatherStationData result, String host,
            Integer port) {
        this.result = result;
        this.host = host;
        this.port = port;
    }

    public static GetWeatherStationDataResult found(WeatherStationData data) {
        return new GetWeatherStationDataResult(data, null, null);
    }

    public static GetWeatherStationDataResult foundRemotely(String host, int port) {
        return new GetWeatherStationDataResult(null, host, port);
    }

    public static GetWeatherStationDataResult notFound() {
        return NOT_FOUND;
    }

    public Optional<WeatherStationData> getResult() {
        return Optional.ofNullable(result);
    }

    public Optional<String> getHost() {
        return Optional.ofNullable(host);
    }

    public OptionalInt getPort() {
        return port != null ? OptionalInt.of(port) : OptionalInt.empty();
    }
}

此外,必须定义 getMetaData() 的返回类型 (aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/PipelineMetadata.java)

package org.acme.kafka.streams.aggregator.streams;

import java.util.Set;

public class PipelineMetadata {

    public String host;
    public Set<String> partitions;

    public PipelineMetadata(String host, Set<String> partitions) {
        this.host = host;
        this.partitions = partitions;
    }
}

最后,必须更新 REST 端点类

package org.acme.kafka.streams.aggregator.rest;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;

import org.acme.kafka.streams.aggregator.streams.GetWeatherStationDataResult;
import org.acme.kafka.streams.aggregator.streams.KafkaStreamsPipeline;
import org.acme.kafka.streams.aggregator.streams.PipelineMetadata;

@ApplicationScoped
@Path("/weather-stations")
public class WeatherStationEndpoint {

    @Inject
    InteractiveQueries interactiveQueries;

    @GET
    @Path("/data/{id}")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response getWeatherStationData(int id) {
        GetWeatherStationDataResult result = interactiveQueries.getWeatherStationData(id);

        if (result.getResult().isPresent()) {                     (1)
            return Response.ok(result.getResult().get()).build();
        }
        else if (result.getHost().isPresent()) {                  (2)
            URI otherUri = getOtherUri(result.getHost().get(), result.getPort().getAsInt(),
                    id);
            return Response.seeOther(otherUri).build();
        }
        else {                                                    (3)
            return Response.status(Status.NOT_FOUND.getStatusCode(),
                    "No data found for weather station " + id).build();
        }
    }

    @GET
    @Path("/meta-data")
    @Produces(MediaType.APPLICATION_JSON)
    public List<PipelineMetadata> getMetaData() {                 (4)
        return interactiveQueries.getMetaData();
    }

    private URI getOtherUri(String host, int port, int id) {
        try {
            return new URI("http://" + host + ":" + port + "/weather-stations/data/" + id);
        }
        catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }
}
1 在本地找到了数据,因此将其返回
2 数据由另一个节点维护,因此如果给定键的数据存储在其他节点之一上,则回复重定向(HTTP 状态代码 303)。
3 未找到给定气象站 ID 的数据
4 公开有关形成应用程序集群的所有主机的信息

现在再次停止 aggregator 服务并重建它。 然后让我们启动它的三个实例

./mvnw clean package -f aggregator/pom.xml
docker-compose stop aggregator
docker-compose up --build -d --scale aggregator=3

在三个实例中的任何一个上调用 REST API 时,请求的气象站 ID 的聚合可能存储在接收查询的节点本地,或者可能存储在其他两个节点之一上。

由于 Docker Compose 的负载均衡器将以轮询方式将请求分发到 aggregator 服务,因此我们将直接调用实际节点。 应用程序通过 REST 公开有关所有主机名的信息

http aggregator:8080/weather-stations/meta-data

HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 202
Content-Type: application/json
Date: Tue, 18 Jun 2019 20:00:23 GMT

[
    {
        "host": "2af13fe516a9:8080",
        "partitions": [
            "temperature-values-2"
        ]
    },
    {
        "host": "32cc8309611b:8080",
        "partitions": [
            "temperature-values-1"
        ]
    },
    {
        "host": "1eb39af8d587:8080",
        "partitions": [
            "temperature-values-0"
        ]
    }
]

从响应中显示的三个主机之一检索数据(您的实际主机名将有所不同)

http 2af13fe516a9:8080/weather-stations/data/1

如果该节点保存了键 "1" 的数据,您将收到如下响应

HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 74
Content-Type: application/json
Date: Tue, 11 Jun 2019 19:16:31 GMT

{
  "avg": 11.9,
  "count": 259,
  "max": 50.0,
  "min": -30.1,
  "stationId": 1,
  "stationName": "Hamburg"
}

否则,该服务将发送重定向

HTTP/1.1 303 See Other
Connection: keep-alive
Content-Length: 0
Date: Tue, 18 Jun 2019 20:01:03 GMT
Location: http://1eb39af8d587:8080/weather-stations/data/1

您还可以让 *httpie* 通过传递 --follow option 自动遵循重定向

http --follow 2af13fe516a9:8080/weather-stations/data/1

以本机方式运行

Kafka Streams 的 Quarkus 扩展允许通过 GraalVM 以本机方式执行流处理应用程序,而无需进一步配置。

要以本机模式运行 produceraggregator 应用程序,可以使用 -Dnative 执行 Maven 构建

./mvnw clean package -f producer/pom.xml -Dnative -Dnative-image.container-runtime=docker
./mvnw clean package -f aggregator/pom.xml -Dnative -Dnative-image.container-runtime=docker

现在创建一个名为 QUARKUS_MODE 的环境变量,并将值设置为 "native"

export QUARKUS_MODE=native

Docker Compose 文件使用它来构建 produceraggregator 镜像时使用正确的 Dockerfile。 Kafka Streams 应用程序可以在本机模式下使用少于 50 MB 的 RSS。 为此,请将 Xmx 选项添加到 aggregator/src/main/docker/Dockerfile.native 中的程序调用

CMD ["./application", "-Dquarkus.http.host=0.0.0.0", "-Xmx32m"]

现在按照上述说明启动 Docker Compose(不要忘记重建容器镜像)。

Kafka Streams 健康检查

如果您使用的是 quarkus-smallrye-health 扩展,则 quarkus-kafka-streams 将自动添加

  • 准备就绪健康检查,以验证是否已创建在 quarkus.kafka-streams.topics 属性中声明的所有主题,

  • 基于 Kafka Streams 状态的活跃性健康检查。

因此,当您访问应用程序的 /q/health 端点时,您将获得有关 Kafka Streams 状态以及可用和/或缺失主题的信息。

这是状态为 DOWN 的一个示例

curl -i http://aggregator:8080/q/health

HTTP/1.1 503 Service Unavailable
content-type: application/json; charset=UTF-8
content-length: 454

{
    "status": "DOWN",
    "checks": [
        {
            "name": "Kafka Streams state health check",  (1)
            "status": "DOWN",
            "data": {
                "state": "CREATED"
            }
        },
        {
            "name": "Kafka Streams topics health check",  (2)
            "status": "DOWN",
            "data": {
                "available_topics": "weather-stations,temperature-values",
                "missing_topics": "hygrometry-values"
            }
        }
    ]
}
1 活跃性健康检查。 也可在 /q/health/live 端点上找到。
2 准备就绪健康检查。 也可在 /q/health/ready 端点上找到。

因此,如您所见,只要缺少 quarkus.kafka-streams.topics 之一或 Kafka Streams state 不是 RUNNING,状态就会变为 DOWN

如果没有可用的主题,则 Kafka Streams topics health checkdata 字段中将不存在 available_topics 键。 同样,如果没有缺失的主题,则 Kafka Streams topics health checkdata 字段中将不存在 missing_topics 键。

您当然可以通过在 application.properties 中将 quarkus.kafka-streams.health.enabled 属性设置为 false 来禁用 quarkus-kafka-streams 扩展的健康检查。

显然,您可以基于相应的端点 /q/health/live/q/health/ready 创建您的活跃性和准备就绪探针。

活跃性健康检查

这是活跃性检查的示例

curl -i http://aggregator:8080/q/health/live

HTTP/1.1 503 Service Unavailable
content-type: application/json; charset=UTF-8
content-length: 225

{
    "status": "DOWN",
    "checks": [
        {
            "name": "Kafka Streams state health check",
            "status": "DOWN",
            "data": {
                "state": "CREATED"
            }
        }
    ]
}

state 来自 KafkaStreams.State 枚举。

准备就绪健康检查

这是准备就绪检查的示例

curl -i http://aggregator:8080/q/health/ready

HTTP/1.1 503 Service Unavailable
content-type: application/json; charset=UTF-8
content-length: 265

{
    "status": "DOWN",
    "checks": [
        {
            "name": "Kafka Streams topics health check",
            "status": "DOWN",
            "data": {
                "missing_topics": "weather-stations,temperature-values"
            }
        }
    ]
}

更进一步

本指南介绍了如何使用 Quarkus 和 Kafka Streams API 在 JVM 和本机模式下构建流处理应用程序。 为了在生产环境中运行您的 KStreams 应用程序,您还可以为数据管道添加健康检查和指标。 请参阅有关 MicrometerSmallRye MetricsSmallRye Health 的 Quarkus 指南以了解更多信息。

配置参考

构建时固定的配置属性 - 所有其他配置属性都可以在运行时覆盖

配置属性

类型

默认

如果存在 smallrye-health 扩展,是否发布健康检查(默认为 true)。

环境变量:QUARKUS_KAFKA_STREAMS_HEALTH_ENABLED

显示更多

布尔值

true

此 Kafka Streams 应用程序的唯一标识符。 如果未设置,则默认为 quarkus.application.name。

环境变量:QUARKUS_KAFKA_STREAMS_APPLICATION_ID

显示更多

字符串

${quarkus.application.name}

标识 Kafka 引导服务器的以逗号分隔的 host:port 对列表。 如果未设置,则回退到 kafka.bootstrap.servers,如果也未设置,则使用 localhost:9092

环境变量:QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS

显示更多

主机:端口 列表

localhost:9092

此应用程序实例的唯一标识符,通常采用 host:port 形式。

环境变量:QUARKUS_KAFKA_STREAMS_APPLICATION_SERVER

显示更多

字符串

以逗号分隔的主题名称列表。 仅当所有这些主题都存在于 Kafka 集群中并且 ignore.topics 设置为 false 时,才会启动管道。

环境变量:QUARKUS_KAFKA_STREAMS_TOPICS

显示更多

字符串列表

以逗号分隔的主题名称模式列表。 仅当所有这些主题都存在于 Kafka 集群中并且 ignore.topics 设置为 false 时,才会启动管道。

环境变量:QUARKUS_KAFKA_STREAMS_TOPIC_PATTERNS

显示更多

字符串列表

等待从管理客户端返回主题名称的超时时间。 如果设置为 0(或负数),则忽略 topics 检查。

环境变量:QUARKUS_KAFKA_STREAMS_TOPICS_TIMEOUT

显示更多

Duration 

10S

架构注册表键。 不同的架构注册表库在不同的配置属性中需要注册表 URL。 对于 Apicurio Registry,请使用 apicurio.registry.url。 对于 Confluent 架构注册表,请使用 schema.registry.url

环境变量:QUARKUS_KAFKA_STREAMS_SCHEMA_REGISTRY_KEY

显示更多

字符串

schema.registry.url

架构注册表 URL。

环境变量:QUARKUS_KAFKA_STREAMS_SCHEMA_REGISTRY_URL

显示更多

字符串

环境变量:QUARKUS_KAFKA_STREAMS_SECURITY_PROTOCOL

显示更多

字符串

用于客户端连接的 SASL 机制

环境变量:QUARKUS_KAFKA_STREAMS_SASL_MECHANISM

显示更多

字符串

SASL 连接的 JAAS 登录上下文参数,采用 JAAS 配置文件使用的格式

环境变量:QUARKUS_KAFKA_STREAMS_SASL_JAAS_CONFIG

显示更多

字符串

SASL 客户端回调处理程序类的完全限定名称

环境变量:QUARKUS_KAFKA_STREAMS_SASL_CLIENT_CALLBACK_HANDLER_CLASS

显示更多

字符串

SASL 登录回调处理程序类的完全限定名称

环境变量:QUARKUS_KAFKA_STREAMS_SASL_LOGIN_CALLBACK_HANDLER_CLASS

显示更多

字符串

实现 Login 接口的类的完全限定名称

环境变量:QUARKUS_KAFKA_STREAMS_SASL_LOGIN_CLASS

显示更多

字符串

Kafka 作为 Kerberos 主体名称运行

环境变量:QUARKUS_KAFKA_STREAMS_SASL_KERBEROS_SERVICE_NAME

显示更多

字符串

Kerberos kinit 命令路径

环境变量:QUARKUS_KAFKA_STREAMS_SASL_KERBEROS_KINIT_CMD

显示更多

字符串

登录线程将休眠,直到上次刷新后的指定窗口因子时间

环境变量:QUARKUS_KAFKA_STREAMS_SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR

显示更多

double

添加到续订时间的随机抖动百分比

环境变量:QUARKUS_KAFKA_STREAMS_SASL_KERBEROS_TICKET_RENEW_JITTER

显示更多

double

添加到续订时间的随机抖动百分比

环境变量:QUARKUS_KAFKA_STREAMS_SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN

显示更多

long

登录刷新线程将休眠,直到达到相对于凭据生命周期的指定窗口因子

环境变量:QUARKUS_KAFKA_STREAMS_SASL_LOGIN_REFRESH_WINDOW_FACTOR

显示更多

double

相对于凭据生命周期的最大随机抖动量

环境变量:QUARKUS_KAFKA_STREAMS_SASL_LOGIN_REFRESH_WINDOW_JITTER

显示更多

double

登录刷新线程在刷新凭据之前要等待的所需最短持续时间

环境变量:QUARKUS_KAFKA_STREAMS_SASL_LOGIN_REFRESH_MIN_PERIOD

显示更多

Duration 

刷新凭据时要维护的凭据到期之前的缓冲持续时间

环境变量:QUARKUS_KAFKA_STREAMS_SASL_LOGIN_REFRESH_BUFFER

显示更多

Duration 

用于生成 SSLContext 的 SSL 协议

环境变量:QUARKUS_KAFKA_STREAMS_SSL_PROTOCOL

显示更多

字符串

用于 SSL 连接的安全提供程序的名称

环境变量:QUARKUS_KAFKA_STREAMS_SSL_PROVIDER

显示更多

字符串

密码套件列表

环境变量: QUARKUS_KAFKA_STREAMS_SSL_CIPHER_SUITES

显示更多

字符串

SSL连接启用的协议列表

环境变量: QUARKUS_KAFKA_STREAMS_SSL_ENABLED_PROTOCOLS

显示更多

字符串

信任存储类型

环境变量: QUARKUS_KAFKA_STREAMS_SSL_TRUSTSTORE_TYPE

显示更多

字符串

信任存储位置

环境变量: QUARKUS_KAFKA_STREAMS_SSL_TRUSTSTORE_LOCATION

显示更多

字符串

信任存储密码

环境变量: QUARKUS_KAFKA_STREAMS_SSL_TRUSTSTORE_PASSWORD

显示更多

字符串

信任存储证书

环境变量: QUARKUS_KAFKA_STREAMS_SSL_TRUSTSTORE_CERTIFICATES

显示更多

字符串

密钥存储类型

环境变量: QUARKUS_KAFKA_STREAMS_SSL_KEYSTORE_TYPE

显示更多

字符串

密钥存储位置

环境变量: QUARKUS_KAFKA_STREAMS_SSL_KEYSTORE_LOCATION

显示更多

字符串

密钥存储密码

环境变量: QUARKUS_KAFKA_STREAMS_SSL_KEYSTORE_PASSWORD

显示更多

字符串

密钥存储私钥

环境变量: QUARKUS_KAFKA_STREAMS_SSL_KEYSTORE_KEY

显示更多

字符串

密钥存储证书链

环境变量: QUARKUS_KAFKA_STREAMS_SSL_KEYSTORE_CERTIFICATE_CHAIN

显示更多

字符串

密钥存储中私钥的密码

环境变量: QUARKUS_KAFKA_STREAMS_SSL_KEY_PASSWORD

显示更多

字符串

密钥管理器工厂用于SSL连接的算法

环境变量: QUARKUS_KAFKA_STREAMS_SSL_KEYMANAGER_ALGORITHM

显示更多

字符串

信任管理器工厂用于SSL连接的算法

环境变量: QUARKUS_KAFKA_STREAMS_SSL_TRUSTMANAGER_ALGORITHM

显示更多

字符串

端点识别算法,用于使用服务器证书验证服务器主机名

环境变量: QUARKUS_KAFKA_STREAMS_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM

显示更多

字符串

https

用于SSL加密操作的SecureRandom PRNG实现

环境变量: QUARKUS_KAFKA_STREAMS_SSL_SECURE_RANDOM_IMPLEMENTATION

显示更多

字符串

关于 Duration 格式

要写入持续时间值,请使用标准java.time.Duration格式。有关更多信息,请参阅Duration#parse() Java API文档

您还可以使用简化的格式,以数字开头

  • 如果该值仅为一个数字,则表示以秒为单位的时间。

  • 如果该值是一个数字后跟 ms,则表示以毫秒为单位的时间。

在其他情况下,简化格式将被转换为 java.time.Duration 格式以进行解析

  • 如果该值是一个数字后跟 hms,则在其前面加上 PT

  • 如果该值是一个数字后跟 d,则在其前面加上 P

相关内容