编辑此页面

将 Apache Kafka 与 Schema Registry 和 Avro 一起使用

本指南介绍您的 Quarkus 应用程序如何使用 Apache Kafka、Avro 序列化记录,并连接到模式注册表(例如 Confluent Schema RegistryApicurio Registry)。

如果您不熟悉 Kafka,特别是 Quarkus 中的 Kafka,请先查阅使用 Apache Kafka 和 Reactive Messaging指南。

先决条件

要完成本指南,您需要

  • 大约 30 分钟

  • 一个 IDE

  • 已安装 JDK 17+ 并正确配置了 JAVA_HOME

  • Apache Maven 3.9.9

  • Docker 和 Docker Compose 或 Podman,以及 Docker Compose

  • 如果您想使用它,可以选择 Quarkus CLI

  • 如果您想构建本机可执行文件(或者如果您使用本机容器构建,则为 Docker),可以选择安装 Mandrel 或 GraalVM 并进行适当的配置

架构

在本指南中,我们将实现一个 REST 资源,即 MovieResource,它将消耗电影 DTO 并将它们放入 Kafka 主题。

然后,我们将实现一个消费者,它将从同一个主题消费并收集消息。收集到的消息将通过另一个资源 ConsumedMovieResource,使用 Server-Sent Events 进行公开。

电影将使用 Avro 进行序列化和反序列化。描述电影的模式存储在 Apicurio Registry 中。如果您使用的是 Confluent Avro serde 和 Confluent Schema Registry,概念也是一样的。

解决方案

我们建议您按照以下章节中的说明,逐步创建应用程序。但是,您可以直接转到完整的示例。

克隆 Git 存储库:git clone https://github.com/quarkusio/quarkus-quickstarts.git,或下载一个存档

解决方案位于 kafka-avro-schema-quickstart 目录中。

创建 Maven 项目

首先,我们需要一个新项目。使用以下命令创建一个新项目

CLI
quarkus create app org.acme:kafka-avro-schema-quickstart \
    --extension='rest-jackson,messaging-kafka,apicurio-registry-avro' \
    --no-code
cd kafka-avro-schema-quickstart

要创建 Gradle 项目,请添加 --gradle--gradle-kotlin-dsl 选项。

有关如何安装和使用 Quarkus CLI 的更多信息,请参阅 Quarkus CLI 指南。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.24.4:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-avro-schema-quickstart \
    -Dextensions='rest-jackson,messaging-kafka,apicurio-registry-avro' \
    -DnoCode
cd kafka-avro-schema-quickstart

要创建 Gradle 项目,请添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

对于 Windows 用户

  • 如果使用 cmd,(不要使用反斜杠 \ 并将所有内容放在同一行上)

  • 如果使用 Powershell,请将 -D 参数用双引号括起来,例如 "-DprojectArtifactId=kafka-avro-schema-quickstart"

如果您使用 Confluent Schema Registry,则不需要 quarkus-apicurio-registry-avro 扩展。相反,您需要 quarkus-confluent-registry-avro 扩展和更多依赖项。有关详细信息,请参阅使用 Confluent Schema Registry

Avro 模式

Apache Avro 是一个数据序列化系统。数据结构使用模式进行描述。我们需要做的第一件事是创建描述 Movie 结构的模式。创建一个名为 src/main/avro/movie.avsc 的文件,其中包含我们记录(Kafka 消息)的模式。

{
  "namespace": "org.acme.kafka.quarkus",
  "type": "record",
  "name": "Movie",
  "fields": [
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "year",
      "type": "int"
    }
  ]
}

如果您使用以下方式构建项目

CLI
quarkus build
Maven
./mvnw install
Gradle
./gradlew build

movies.avsc 将被编译成一个 Movie.java 文件,位于 target/generated-sources/avsc 目录中。

请查看 Avro 规范,了解更多关于 Avro 语法和支持类型的信息。

使用 Quarkus,无需使用特定的 Maven 插件来处理 Avro 模式,这一切都由 quarkus-avro 扩展为您完成!

如果您使用以下方式运行项目

CLI
quarkus dev
Maven
./mvnw quarkus:dev
Gradle
./gradlew --console=plain quarkusDev

您对模式文件的更改将自动应用于生成的 Java 文件。

Movie 生产者

在定义了模式之后,我们现在可以开始实现 MovieResource

让我们打开 MovieResource,注入一个 Movie DTO 的Emitter,并实现一个 @POST 方法,该方法消耗 Movie 并通过 Emitter 发送它。

package org.acme.kafka;

import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;

@Path("/movies")
public class MovieResource {
    private static final Logger LOGGER = Logger.getLogger(MovieResource.class);

    @Channel("movies")
    Emitter<Movie> emitter;

    @POST
    public Response enqueueMovie(Movie movie) {
        LOGGER.infof("Sending movie %s to Kafka", movie.getTitle());
        emitter.send(movie);
        return Response.accepted().build();
    }

}

现在,我们需要将 movies 通道(Emitter 发送到此通道)映射到一个 Kafka 主题。为了实现这一点,请编辑 application.properties 文件,并添加以下内容:

# set the connector for the outgoing channel to `smallrye-kafka`
mp.messaging.outgoing.movies.connector=smallrye-kafka

# set the topic name for the channel to `movies`
mp.messaging.outgoing.movies.topic=movies

# automatically register the schema with the registry, if not present
mp.messaging.outgoing.movies.apicurio.registry.auto-register=true

您可能已经注意到我们没有定义 value.serializer。这是因为 Quarkus 可以自动检测io.apicurio.registry.serde.avro.AvroKafkaSerializer 是适合这里的,基于 @Channel 声明、Movie 类型的结构以及 Apicurio Registry 库的存在。我们仍然需要定义 apicurio.registry.auto-register 属性。

如果您使用 Confluent Schema Registry,也不需要配置 value.serializer。它也会被自动检测。Confluent Schema Registry 中 apicurio.registry.auto-register 的对应项称为 auto.register.schemas。它默认为 true,因此在此示例中不必配置。如果您想禁用自动模式注册,可以将其显式设置为 false

Movie 消费者

因此,我们可以将包含我们 Movie 数据的记录写入 Kafka。这些数据使用 Avro 进行序列化。现在,是时候为它们实现一个消费者了。

让我们创建 ConsumedMovieResource,它将从 movies-from-kafka 通道消费 Movie 消息,并通过 Server-Sent Events 将其公开。

package org.acme.kafka;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
@Path("/consumed-movies")
public class ConsumedMovieResource {

    @Channel("movies-from-kafka")
    Multi<Movie> movies;

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<String> stream() {
        return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
    }
}

应用程序代码的最后一部分是在 application.properties 中配置 movies-from-kafka 通道。

# set the connector for the incoming channel to `smallrye-kafka`
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka

# set the topic name for the channel to `movies`
mp.messaging.incoming.movies-from-kafka.topic=movies

# disable auto-commit, Reactive Messaging handles it itself
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false

mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest

您可能已经注意到我们没有定义 value.deserializer。这是因为 Quarkus 可以自动检测io.apicurio.registry.serde.avro.AvroKafkaDeserializer 是适合这里的,基于 @Channel 声明、Movie 类型的结构以及 Apicurio Registry 库的存在。我们也无需定义 apicurio.registry.use-specific-avro-reader 属性,它也是自动配置的。

如果您使用 Confluent Schema Registry,也不需要配置 value.deserializerspecific.avro.reader。它们都会被自动检测。

运行应用程序

在开发模式下启动应用程序

CLI
quarkus dev
Maven
./mvnw quarkus:dev
Gradle
./gradlew --console=plain quarkusDev

由于开发服务,Kafka 代理和 Apicurio Registry 实例会自动启动。有关更多详细信息,请参阅Kafka 的开发服务Apicurio Registry 的开发服务

您可能已经注意到我们没有在任何地方配置模式注册表 URL。这是因为 Apicurio Registry 的开发服务会自动配置 Quarkus Messaging 中的所有 Kafka 通道以使用自动启动的注册表实例。

Apicurio Registry 除了其原生 API 之外,还公开了一个与 Confluent Schema Registry API 兼容的端点。因此,此自动配置同时适用于 Apicurio Registry serde 和 Confluent Schema Registry serde。

但是,请注意,没有为运行 Confluent Schema Registry 本身提供开发服务支持。如果您想使用正在运行的 Confluent Schema Registry 实例,请配置其 URL 以及 Kafka 代理的 URL。

kafka.bootstrap.servers=PLAINTEXT://:9092
mp.messaging.connector.smallrye-kafka.schema.registry.url=https://:8081

在第二个终端中使用 curl 查询 ConsumedMovieResource 资源。

curl -N https://:8080/consumed-movies

在第三个终端中,发布一些电影。

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Shawshank Redemption","year":1994}' \
  https://:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Godfather","year":1972}' \
  https://:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Dark Knight","year":2008}' \
  https://:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"12 Angry Men","year":1957}' \
  https://:8080/movies

观察第二个终端中打印的内容。您应该看到类似以下内容:

data:'The Shawshank Redemption' from 1994

data:'The Godfather' from 1972

data:'The Dark Knight' from 2008

data:'12 Angry Men' from 1957

在 JVM 或 Native 模式下运行

当不在开发或测试模式下运行时,您需要启动自己的 Kafka 代理和 Apicurio Registry。让它们运行起来最简单的方法是使用 docker-compose 来启动相应的容器。

如果您使用 Confluent Schema Registry,您已经有一个正在运行且已配置的 Kafka 代理和 Confluent Schema Registry 实例。您可以忽略这里的 docker-compose 说明以及 Apicurio Registry 配置。

在项目根目录中创建一个 docker-compose.yaml 文件,其中包含以下内容:

version: '2'

services:

  zookeeper:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
        "sh", "-c",
        "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
        "sh", "-c",
        "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  schema-registry:
    image: apicurio/apicurio-registry-mem:2.4.2.Final
    ports:
      - 8081:8080
    depends_on:
      - kafka
    environment:
      QUARKUS_PROFILE: prod

在启动应用程序之前,让我们先启动 Kafka 代理和 Apicurio Registry。

docker-compose up
要停止容器,请使用 docker-compose down。您也可以使用 docker-compose rm 清理容器。

您可以使用以下方式构建应用程序:

CLI
quarkus build
Maven
./mvnw install
Gradle
./gradlew build

并使用以下方式以 JVM 模式运行:

java -Dmp.messaging.connector.smallrye-kafka.apicurio.registry.url=https://:8081/apis/registry/v2 -jar target/quarkus-app/quarkus-run.jar
默认情况下,应用程序尝试连接到监听在 localhost:9092 的 Kafka 代理。您可以使用以下方式配置引导服务器: java -Dkafka.bootstrap.servers=... -jar target/quarkus-app/quarkus-run.jar

在命令行中指定注册表 URL 不是非常方便,因此您可以为 prod 配置文件添加一个配置属性。

%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=https://:8081/apis/registry/v2

您可以使用以下方式构建原生可执行文件:

CLI
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.native.enabled=true

并使用以下方式运行:

./target/kafka-avro-schema-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092

测试应用程序

如上所述,Kafka 和 Apicurio Registry 的开发服务会在开发模式和测试模式下自动启动和配置 Kafka 代理和 Apicurio Registry 实例。因此,我们无需自行设置 Kafka 和 Apicurio Registry。我们只需专注于编写测试。

首先,让我们在构建文件中添加 REST Client 和 Awaitility 的测试依赖项。

pom.xml
<!-- we'll use Jakarta REST Client for talking to the SSE endpoint -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-rest-client</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
build.gradle
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("org.awaitility:awaitility")

在测试中,我们将循环发送电影,并检查 ConsumedMovieResource 是否返回了我们发送的内容。

package org.acme.kafka;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.sse.SseEventSource;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;

@QuarkusTest
public class MovieResourceTest {

    @TestHTTPResource("/consumed-movies")
    URI consumedMovies;

    @Test
    public void testHelloEndpoint() throws InterruptedException {
        // create a client for `ConsumedMovieResource` and collect the consumed resources in a list
        Client client = ClientBuilder.newClient();
        WebTarget target = client.target(consumedMovies);

        List<String> received = new CopyOnWriteArrayList<>();

        SseEventSource source = SseEventSource.target(target).build();
        source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));

        // in a separate thread, feed the `MovieResource`
        ExecutorService movieSender = startSendingMovies();

        source.open();

        // check if, after at most 5 seconds, we have at least 2 items collected, and they are what we expect
        await().atMost(5, SECONDS).until(() -> received.size() >= 2);
        assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994",
                "'12 Angry Men' from 1957"));
        source.close();

        // shutdown the executor that is feeding the `MovieResource`
        movieSender.shutdownNow();
        movieSender.awaitTermination(5, SECONDS);
    }

    private ExecutorService startSendingMovies() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
            while (true) {
                given()
                        .contentType(ContentType.JSON)
                        .body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}")
                .when()
                        .post("/movies")
                .then()
                        .statusCode(202);

                given()
                        .contentType(ContentType.JSON)
                        .body("{\"title\":\"12 Angry Men\",\"year\":1957}")
                .when()
                        .post("/movies")
                .then()
                        .statusCode(202);

                try {
                    Thread.sleep(200L);
                } catch (InterruptedException e) {
                    break;
                }
            }
        });
        return executorService;
    }

}
我们修改了与项目一起生成的 MovieResourceTest。此测试类有一个子类 NativeMovieResourceIT,它针对原生可执行文件运行相同的测试。要运行它,请执行:
CLI
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.native.enabled=true

手动设置

如果我们不能使用开发服务,并且想手动启动 Kafka 代理和 Apicurio Registry 实例,我们会定义一个 QuarkusTestResourceLifecycleManager

pom.xml
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>strimzi-test-container</artifactId>
    <version>0.105.0</version>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
build.gradle
testImplementation("io.strimzi:strimzi-test-container:0.105.0") {
    exclude group: "org.apache.logging.log4j", module: "log4j-core"
}
package org.acme.kafka;

import java.util.HashMap;
import java.util.Map;

import org.testcontainers.containers.GenericContainer;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.strimzi.StrimziKafkaContainer;

public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {

    private final StrimziKafkaContainer kafka = new StrimziKafkaContainer();

    private GenericContainer<?> registry;

    @Override
    public Map<String, String> start() {
        kafka.start();
        registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.4.2.Final")
                .withExposedPorts(8080)
                .withEnv("QUARKUS_PROFILE", "prod");
        registry.start();
        Map<String, String> properties = new HashMap<>();
        properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
                "http://" + registry.getHost() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2");
        properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
        return properties;
    }

    @Override
    public void stop() {
        registry.stop();
        kafka.stop();
    }
}
@QuarkusTest
@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
public class MovieResourceTest {
    ...
}

使用兼容版本的 Apicurio Registry

quarkus-apicurio-registry-avro 扩展依赖于 Apicurio Registry 客户端的最新版本,并且大多数 Apicurio Registry 服务器和客户端版本是向后兼容的。对于某些版本,您需要确保 Serdes 使用的客户端与服务器兼容。

例如,使用 Apicurio 开发服务,如果您将镜像名称设置为使用版本 2.1.5.Final

quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.1.5.Final

您需要确保 apicurio-registry-serdes-avro-serde 依赖项和 REST 客户端 apicurio-common-rest-client-vertx 依赖项设置为兼容的版本。

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-apicurio-registry-avro</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-common-rest-client-vertx</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-serdes-avro-serde</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-client</artifactId>
    <version>2.1.5.Final</version>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-common</artifactId>
    <version>2.1.5.Final</version>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-serdes-avro-serde</artifactId>
    <version>2.1.5.Final</version>
    <exclusions>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-common-rest-client-jdk</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-common</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-common-rest-client-vertx</artifactId>
    <version>0.1.5.Final</version>
</dependency>
build.gradle
dependencies {
    implementation(platform("io.quarkus.platform:quarkus-bom:2.12.3.Final"))

    ...

    implementation("io.quarkus:quarkus-apicurio-registry-avro")
    implementation("io.apicurio:apicurio-registry-serdes-avro-serde") {
        exclude group: "io.apicurio", module: "apicurio-common-rest-client-jdk"
        exclude group: "io.apicurio", module: "apicurio-registry-client"
        exclude group: "io.apicurio", module: "apicurio-registry-common"
        version {
            strictly "2.1.5.Final"
        }
    }
    implementation("io.apicurio:apicurio-registry-client") {
        version {
            strictly "2.1.5.Final"
        }
    }
    implementation("io.apicurio:apicurio-registry-common") {
        version {
            strictly "2.1.5.Final"
        }
    }
    implementation("io.apicurio:apicurio-common-rest-client-vertx") {
        version {
            strictly "0.1.5.Final"
        }
    }
}

apicurio-registry-clientapicurio-common-rest-client-vertx 的已知先前兼容版本如下:

  • apicurio-registry-client 2.1.5.Final 与 apicurio-common-rest-client-vertx 0.1.5.Final

  • apicurio-registry-client 2.3.1.Final 与 apicurio-common-rest-client-vertx 0.1.13.Final

使用 Confluent Schema Registry

如果您想使用 Confluent Schema Registry,则需要 quarkus-confluent-registry-avro 扩展,而不是 quarkus-apicurio-registry-avro 扩展。此外,您还需要在 pom.xml / build.gradle 文件中添加一些依赖项和一个自定义 Maven 存储库。

pom.xml
<dependencies>
    ...
    <!-- the extension -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-confluent-registry-avro</artifactId>
    </dependency>
    <!-- Confluent registry libraries use Jakarta REST client -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-rest-client</artifactId>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>7.2.0</version>
        <exclusions>
            <exclusion>
                <groupId>jakarta.ws.rs</groupId>
                <artifactId>jakarta.ws.rs-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

<repositories>
    <!-- io.confluent:kafka-avro-serializer is only available from this repository: -->
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>
build.gradle
repositories {
    ...

    maven {
        url "https://packages.confluent.io/maven/"
    }
}

dependencies {
    ...

    implementation("io.quarkus:quarkus-confluent-registry-avro")

    // Confluent registry libraries use Jakarta REST client
    implementation("io.quarkus:quarkus-rest-client")

    implementation("io.confluent:kafka-avro-serializer:7.2.0") {
        exclude group: "jakarta.ws.rs", module: "jakarta.ws.rs-api"
    }
}

在 JVM 模式下,可以使用任何版本的 io.confluent:kafka-avro-serializer。在原生模式下,Quarkus 支持以下版本:6.2.x7.0.x7.1.x7.2.x7.3.x

对于版本 7.4.x 及更高版本,由于 Confluent Schema Serializer 的问题,您需要添加另一个依赖项。

pom.xml
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-csv</artifactId>
</dependency>
build.gradle
dependencies {
    implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-csv")
}

对于任何其他版本,可能需要调整原生配置。

Avro 代码生成详情

在本指南中,我们使用了 Quarkus 代码生成机制从 Avro 模式生成 Java 文件。

在后台,该机制使用 org.apache.avro:avro-compiler

您可以使用以下配置属性来改变它的工作方式:

  • avro.codegen.[avsc|avdl|avpr].imports - 应首先编译的文件或目录列表,从而可以被后续编译的模式导入。请注意,导入的文件不应相互引用。所有路径都应相对于 src/[main|test]/avro 目录,或构建系统配置的任何源目录中的 avro 子目录。作为逗号分隔的列表传递。

  • avro.codegen.stringType - 用于 Avro 字符串的 Java 类型。可以是 CharSequenceStringUtf8 之一。默认为 String

  • avro.codegen.createOptionalGetters - 启用生成返回请求类型 Optional 的 getOptional…​ 方法。默认为 false

  • avro.codegen.enableDecimalLogicalType - 确定是否为 decimal 类型使用 Java 类,默认为 false

  • avro.codegen.createSetters - 确定是否为记录字段创建 setter。默认为 false

  • avro.codegen.gettersReturnOptional - 启用生成返回请求类型 Optional 的 get…​ 方法。默认为 false

  • avro.codegen.optionalGettersForNullableFieldsOnly,与 gettersReturnOptional 选项结合使用。如果设置了此选项,则仅为可为空的字段生成 Optional getter。如果字段是强制性的,则会生成常规 getter。默认为 false

进一步阅读

相关内容