编辑此页面

将 Apache Kafka 与 Schema Registry 和 JSON Schema 一起使用

本指南展示了您的 Quarkus 应用程序如何使用 Apache Kafka、JSON Schema 序列化记录,并连接到模式注册表(例如 Confluent Schema RegistryApicurio Registry)。

如果您不熟悉 Kafka,尤其是 Quarkus 中的 Kafka,请先阅读 使用 Reactive Messaging 的 Apache Kafka 指南。

先决条件

要完成本指南,您需要

  • 大约 30 分钟

  • 一个 IDE

  • 已安装 JDK 17+ 并正确配置了 JAVA_HOME

  • Apache Maven 3.9.9

  • Docker 和 Docker Compose 或 Podman,以及 Docker Compose

  • 如果您想使用它,可以选择 Quarkus CLI

  • 如果您想构建本机可执行文件(或者如果您使用本机容器构建,则为 Docker),可以选择安装 Mandrel 或 GraalVM 并进行适当的配置

架构

在本指南中,我们将实现一个 REST 资源,即 MovieResource,它将消费电影 DTO 并将它们放入 Kafka 主题中。

然后,我们将实现一个消费者,它将从同一主题消费和收集消息。收集的消息将通过另一个资源 ConsumedMovieResource,通过 Server-Sent Events 公开。

电影 将使用 JSON Schema 进行序列化和反序列化。描述 Movie 的模式存储在 Apicurio Registry 中。如果您使用 Confluent JSON Schema serde 和 Confluent Schema Registry,则适用相同的概念。

解决方案

我们建议您按照以下章节中的说明,逐步创建应用程序。但是,您可以直接转到完整的示例。

克隆 Git 仓库:git clone https://github.com/quarkusio/quarkus-quickstarts.git,或者下载 归档文件

解决方案位于 kafka-json-schema-quickstart 目录中。

创建 Maven 项目

首先,我们需要一个新项目。使用以下命令创建一个新项目

CLI
quarkus create app org.acme:kafka-json-schema-quickstart \
    --extension='rest-jackson,messaging-kafka,apicurio-registry-json-schema' \
    --no-code
cd kafka-json-schema-quickstart

要创建 Gradle 项目,请添加 --gradle--gradle-kotlin-dsl 选项。

有关如何安装和使用 Quarkus CLI 的更多信息,请参阅 Quarkus CLI 指南。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.24.4:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-json-schema-quickstart \
    -Dextensions='rest-jackson,messaging-kafka,apicurio-registry-json-schema' \
    -DnoCode
cd kafka-json-schema-quickstart

要创建 Gradle 项目,请添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

对于 Windows 用户

  • 如果使用 cmd,(不要使用反斜杠 \ 并将所有内容放在同一行上)

  • 如果使用 Powershell,请将 -D 参数用双引号括起来,例如 "-DprojectArtifactId=kafka-json-schema-quickstart"

如果您使用 Confluent Schema Registry,则不需要 quarkus-apicurio-registry-json-schema 扩展。相反,您需要 quarkus-confluent-registry-json-schema 扩展和一些其他依赖项。有关详细信息,请参阅 使用 Confluent Schema Registry

Json Schema

Json Schema 是一种数据序列化系统。数据结构使用模式描述。我们需要做的第一件事是创建一个描述 Movie 结构的模式。创建一个名为 src/main/resources/json-schema.json 的文件,其中包含我们记录(Kafka 消息)的模式

{
  "$id": "https://example.com/person.schema.json",
  "$schema": "https://json-schema.fullstack.org.cn/draft-07/schema#",
  "title": "Movie",
  "type": "object",
  "properties": {
    "title": {
      "type": "string",
      "description": "The movie's title."
    },
    "year": {
      "type": "integer",
      "description": "The movie's year."
    }
  }
}

请注意,无法从 JSON Schema 定义自动生成 Java 类。因此,您必须按如下方式定义 Java 类,以便序列化过程可以使用它

package org.acme.kafka;

public class Movie {

    private String title;
    private Integer year;

    public Movie() {
    }

    public Movie(String title, Integer year) {
        this.title = title;
        this.year = year;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public Integer getYear() {
        return year;
    }

    public void setYear(Integer year) {
        this.year = year;
    }
}

Movie 生产者

定义了模式后,我们现在可以开始实现 MovieResource

让我们打开 MovieResource,注入一个 EmitterMovie DTO,并实现一个 @POST 方法,该方法消费 Movie 并通过 Emitter 发送它

package org.acme.kafka;

import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;

@Path("/movies")
public class MovieResource {
    private static final Logger LOGGER = Logger.getLogger(MovieResource.class);

    @Channel("movies")
    Emitter<Movie> emitter;

    @POST
    public Response enqueueMovie(Movie movie) {
        LOGGER.infof("Sending movie %s to Kafka", movie.getTitle());
        emitter.send(movie);
        return Response.accepted().build();
    }

}

现在,我们需要将 movies 通道(Emitter 发送到此通道)映射 到 Kafka 主题,并将模式映射 到此通道上使用。为此,请编辑 application.properties 文件,并添加以下内容

# set the connector for the outgoing channel to `smallrye-kafka`
mp.messaging.outgoing.movies.connector=smallrye-kafka

# disable automatic detection of the serializers
quarkus.messaging.kafka.serializer-autodetection.enabled=false

# Set the value serializer for the channel `movies`
mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer

# set the topic name for the channel to `movies`
mp.messaging.outgoing.movies.topic=movies

# set the schema to be used for the channel `movies`. Note that this property accepts just a name or a path and the serializer will look for the resource on the classpath.
mp.messaging.outgoing.movies.apicurio.registry.artifact.schema.location=json-schema.json

# automatically register the schema with the registry, if not present
mp.messaging.outgoing.movies.apicurio.registry.auto-register=true

请注意,与 avro 序列化不同,自动检测 不能与 JSON Schema 一起使用,因此我们必须定义 value.serializer。与 avro 一样,我们仍然必须定义 apicurio.registry.auto-register 属性。

如果您使用 Confluent Schema Registry,在这种情况下,您还必须将 value.serializer 定义为 io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer。它也会被自动检测到。与 apicurio.registry.auto-register 对应的 Confluent Schema Registry 称为 auto.register.schemas。它默认为 true,因此在此示例中不必配置。如果要禁用自动模式注册,可以显式设置为 false

Movie 消费者

因此,我们可以将包含 Movie 数据的记录写入 Kafka。该数据使用 JSON Schema 进行序列化。现在,是时候为它们实现一个消费者了。

让我们创建 ConsumedMovieResource,它将从 movies-from-kafka 通道消费 Movie 消息,并通过 Server-Sent Events 公开它

package org.acme.kafka;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
@Path("/consumed-movies")
public class ConsumedMovieResource {

    @Channel("movies-from-kafka")
    Multi<Movie> movies;

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<String> stream() {
        return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
    }
}

应用程序代码的最后一部分是在 application.properties 中配置 movies-from-kafka 通道

# set the connector for the incoming channel to `smallrye-kafka`
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka

# set the topic name for the channel to `movies`
mp.messaging.incoming.movies-from-kafka.topic=movies

# set the deserializer for the incoming channel
mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer

# disable auto-commit, Reactive Messaging handles it itself
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false

mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest

同样,与 Avro 不同,我们必须定义 value.deserializer

如果您使用 Confluent Schema Registry,您还必须将 value.deserializer 配置为 io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer。它们都会被自动检测到。

运行应用程序

在开发模式下启动应用程序

CLI
quarkus dev
Maven
./mvnw quarkus:dev
Gradle
./gradlew --console=plain quarkusDev

感谢 Dev Services,Kafka broker 和 Apicurio Registry 实例会自动启动。有关详细信息,请参阅 Kafka 的 Dev ServicesApicurio Registry 的 Dev Services

您可能已经注意到,我们没有在任何地方配置模式注册表 URL。这是因为 Apicurio Registry 的 Dev Services 配置了 Quarkus Messaging 中的所有 Kafka 通道,以使用自动启动的注册表实例。

Apicurio Registry 除了其原生 API 外,还公开了一个与 Confluent Schema Registry API 兼容的端点。因此,这种自动配置适用于 Apicurio Registry serde 和 Confluent Schema Registry serde。

但是请注意,没有 Dev Services 支持运行 Confluent Schema Registry 本身。如果您想使用正在运行的 Confluent Schema Registry 实例,请配置其 URL 以及 Kafka broker 的 URL

kafka.bootstrap.servers=PLAINTEXT://:9092
mp.messaging.connector.smallrye-kafka.schema.registry.url=https://:8081

在第二个终端中,使用 curl 查询 ConsumedMovieResource 资源

curl -N https://:8080/consumed-movies

在第三个终端中,发布几部电影

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Shawshank Redemption","year":1994}' \
  https://:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Godfather","year":1972}' \
  https://:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Dark Knight","year":2008}' \
  https://:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"12 Angry Men","year":1957}' \
  https://:8080/movies

观察在第二个终端中打印的内容。您应该看到类似以下内容

data:'The Shawshank Redemption' from 1994

data:'The Godfather' from 1972

data:'The Dark Knight' from 2008

data:'12 Angry Men' from 1957

在 JVM 或 Native 模式下运行

不在开发或测试模式下运行时,您需要启动自己的 Kafka broker 和 Apicurio Registry。运行它们的最佳方法是使用 docker-compose 启动相应的容器。

如果您使用 Confluent Schema Registry,您已经运行并配置了 Kafka broker 和 Confluent Schema Registry 实例。您可以忽略此处的 docker-compose 说明以及 Apicurio Registry 配置。

在项目根目录创建一个 docker-compose.yaml 文件,内容如下

version: '2'

services:

  zookeeper:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
        "sh", "-c",
        "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
        "sh", "-c",
        "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  schema-registry:
    image: apicurio/apicurio-registry-mem:2.4.2.Final
    ports:
      - 8081:8080
    depends_on:
      - kafka
    environment:
      QUARKUS_PROFILE: prod

在启动应用程序之前,让我们首先启动 Kafka broker 和 Apicurio Registry

docker-compose up
要停止容器,请使用 docker-compose down。您还可以使用 docker-compose rm 清理容器

您可以使用以下命令构建应用程序

CLI
quarkus build
Maven
./mvnw install
Gradle
./gradlew build

并使用以下命令在 JVM 模式下运行它

java -Dmp.messaging.connector.smallrye-kafka.apicurio.registry.url=https://:8081/apis/registry/v2 -jar target/quarkus-app/quarkus-run.jar
默认情况下,应用程序尝试连接到侦听 localhost:9092 的 Kafka broker。您可以使用以下命令配置 bootstrap 服务器:java -Dkafka.bootstrap.servers=... -jar target/quarkus-app/quarkus-run.jar

在命令行上指定注册表 URL 不是非常方便,因此您可以仅为 prod 配置文件添加配置属性

%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=https://:8081/apis/registry/v2

您可以使用以下命令构建 native 可执行文件

CLI
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.native.enabled=true

并使用以下命令运行它

./target/kafka-json-schema-schema-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092

测试应用程序

如上所述,Kafka 和 Apicurio Registry 的 Dev Services 会在开发模式和测试中自动启动和配置 Kafka broker 和 Apicurio Registry 实例。因此,我们不必自己设置 Kafka 和 Apicurio Registry。我们可以专注于编写测试。

首先,让我们在构建文件中添加对 REST Client 和 Awaitility 的测试依赖项

pom.xml
<!-- we'll use Jakarta REST Client for talking to the SSE endpoint -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-rest-client</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
build.gradle
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("org.awaitility:awaitility")

在测试中,我们将循环发送电影,并检查 ConsumedMovieResource 是否返回我们发送的内容。

package org.acme.kafka;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.sse.SseEventSource;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;

@QuarkusTest
public class MovieResourceTest {

    @TestHTTPResource("/consumed-movies")
    URI consumedMovies;

    @Test
    public void testHelloEndpoint() throws InterruptedException {
        // create a client for `ConsumedMovieResource` and collect the consumed resources in a list
        Client client = ClientBuilder.newClient();
        WebTarget target = client.target(consumedMovies);

        List<String> received = new CopyOnWriteArrayList<>();

        SseEventSource source = SseEventSource.target(target).build();
        source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));

        // in a separate thread, feed the `MovieResource`
        ExecutorService movieSender = startSendingMovies();

        source.open();

        // check if, after at most 5 seconds, we have at least 2 items collected, and they are what we expect
        await().atMost(5, SECONDS).until(() -> received.size() >= 2);
        assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994",
                "'12 Angry Men' from 1957"));
        source.close();

        // shutdown the executor that is feeding the `MovieResource`
        movieSender.shutdownNow();
        movieSender.awaitTermination(5, SECONDS);
    }

    private ExecutorService startSendingMovies() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
            while (true) {
                given()
                        .contentType(ContentType.JSON)
                        .body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}")
                .when()
                        .post("/movies")
                .then()
                        .statusCode(202);

                given()
                        .contentType(ContentType.JSON)
                        .body("{\"title\":\"12 Angry Men\",\"year\":1957}")
                .when()
                        .post("/movies")
                .then()
                        .statusCode(202);

                try {
                    Thread.sleep(200L);
                } catch (InterruptedException e) {
                    break;
                }
            }
        });
        return executorService;
    }

}
我们修改了随项目一起生成的 MovieResourceTest。这个测试类有一个子类 NativeMovieResourceIT,它针对 native 可执行文件运行相同的测试。要运行它,请执行
CLI
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.native.enabled=true

手动设置

如果我们无法使用 Dev Services 并想要手动启动 Kafka broker 和 Apicurio Registry 实例,我们将定义一个 QuarkusTestResourceLifecycleManager

pom.xml
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>strimzi-test-container</artifactId>
    <version>0.105.0</version>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
build.gradle
testImplementation("io.strimzi:strimzi-test-container:0.105.0") {
    exclude group: "org.apache.logging.log4j", module: "log4j-core"
}
package org.acme.kafka;

import java.util.HashMap;
import java.util.Map;

import org.testcontainers.containers.GenericContainer;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.strimzi.StrimziKafkaContainer;

public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {

    private final StrimziKafkaContainer kafka = new StrimziKafkaContainer();

    private GenericContainer<?> registry;

    @Override
    public Map<String, String> start() {
        kafka.start();
        registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.4.2.Final")
                .withExposedPorts(8080)
                .withEnv("QUARKUS_PROFILE", "prod");
        registry.start();
        Map<String, String> properties = new HashMap<>();
        properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
                "http://" + registry.getHost() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2");
        properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
        return properties;
    }

    @Override
    public void stop() {
        registry.stop();
        kafka.stop();
    }
}
@QuarkusTest
@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
public class MovieResourceTest {
    ...
}

使用兼容版本的 Apicurio Registry

quarkus-apicurio-registry-json-schema 扩展依赖于 Apicurio Registry 客户端的最新版本,并且 Apicurio Registry 服务器和客户端的大多数版本都是向后兼容的。对于某些版本,您需要确保 Serdes 使用的客户端与服务器兼容。

例如,对于 Apicurio Dev Service,如果您设置图像名称以使用版本 2.1.5.Final

quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.1.5.Final

您需要确保 apicurio-registry-serdes-json-schema-serde 依赖项和 REST 客户端 apicurio-common-rest-client-vertx 依赖项设置为兼容版本

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-apicurio-registry-json-schema</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-common-rest-client-vertx</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-serdes-json-schema-serde</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-client</artifactId>
    <version>2.1.5.Final</version>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-common</artifactId>
    <version>2.1.5.Final</version>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-serdes-json-schema-serde</artifactId>
    <version>2.1.5.Final</version>
    <exclusions>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-common-rest-client-jdk</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-common</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-common-rest-client-vertx</artifactId>
    <version>0.1.5.Final</version>
</dependency>
build.gradle
dependencies {
    implementation(platform("io.quarkus.platform:quarkus-bom:2.12.3.Final"))

    ...

    implementation("io.quarkus:quarkus-apicurio-registry-json-schema")
    implementation("io.apicurio:apicurio-registry-serdes-json-schema-serde") {
        exclude group: "io.apicurio", module: "apicurio-common-rest-client-jdk"
        exclude group: "io.apicurio", module: "apicurio-registry-client"
        exclude group: "io.apicurio", module: "apicurio-registry-common"
        version {
            strictly "2.1.5.Final"
        }
    }
    implementation("io.apicurio:apicurio-registry-client") {
        version {
            strictly "2.1.5.Final"
        }
    }
    implementation("io.apicurio:apicurio-registry-common") {
        version {
            strictly "2.1.5.Final"
        }
    }
    implementation("io.apicurio:apicurio-common-rest-client-vertx") {
        version {
            strictly "0.1.5.Final"
        }
    }
}

apicurio-registry-clientapicurio-common-rest-client-vertx 的已知先前兼容版本如下

  • apicurio-registry-client 2.1.5.Final 与 apicurio-common-rest-client-vertx 0.1.5.Final

  • apicurio-registry-client 2.3.1.Final 与 apicurio-common-rest-client-vertx 0.1.13.Final

使用 Confluent Schema Registry

如果要使用 Confluent Schema Registry,则需要 quarkus-confluent-registry-json-schema 扩展,而不是 quarkus-apicurio-registry-json-schema 扩展。此外,您需要向 pom.xml / build.gradle 文件添加一些依赖项和一个自定义 Maven 仓库

pom.xml
<dependencies>
    ...
    <!-- the extension -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-confluent-registry-json-schema</artifactId>
    </dependency>
    <!-- Confluent registry libraries use Jakarta REST client -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-rest-client</artifactId>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-json-schema-serializer</artifactId>
        <version>7.2.0</version>
        <exclusions>
            <exclusion>
                <groupId>jakarta.ws.rs</groupId>
                <artifactId>jakarta.ws.rs-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

<repositories>
    <!-- io.confluent:kafka-json-schema-serializer is only available from this repository: -->
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>
build.gradle
repositories {
    ...

    maven {
        url "https://packages.confluent.io/maven/"
    }
}

dependencies {
    ...

    implementation("io.quarkus:quarkus-confluent-registry-json-schema")

    // Confluent registry libraries use Jakarta REST client
    implementation("io.quarkus:quarkus-rest-client")

    implementation("io.confluent:kafka-json-schema-serializer:7.2.0") {
        exclude group: "jakarta.ws.rs", module: "jakarta.ws.rs-api"
    }
}

在 JVM 模式下,可以使用任何版本的 io.confluent:kafka-json-schema-serializer。在 native 模式下,Quarkus 支持以下版本:6.2.x7.0.x7.1.x7.2.x7.3.x

对于版本 7.4.x7.5.x,由于 Confluent Schema Serializer 的问题,您需要添加另一个依赖项

pom.xml
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-csv</artifactId>
</dependency>
build.gradle
dependencies {
    implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-csv")
}

对于任何其他版本,可能需要调整 native 配置。

进一步阅读

相关内容