将 Apache Kafka 与 Schema Registry 和 Avro 一起使用
本指南介绍您的 Quarkus 应用程序如何使用 Apache Kafka、Avro 序列化记录,并连接到模式注册表(例如 Confluent Schema Registry 或 Apicurio Registry)。
如果您不熟悉 Kafka,特别是 Quarkus 中的 Kafka,请先查阅使用 Apache Kafka 和 Reactive Messaging指南。
先决条件
要完成本指南,您需要
-
大约 30 分钟
-
一个 IDE
-
已安装 JDK 17+ 并正确配置了
JAVA_HOME -
Apache Maven 3.9.9
-
Docker 和 Docker Compose 或 Podman,以及 Docker Compose
-
如果您想使用它,可以选择 Quarkus CLI
-
如果您想构建本机可执行文件(或者如果您使用本机容器构建,则为 Docker),可以选择安装 Mandrel 或 GraalVM 并进行适当的配置
架构
在本指南中,我们将实现一个 REST 资源,即 MovieResource,它将消耗电影 DTO 并将它们放入 Kafka 主题。
然后,我们将实现一个消费者,它将从同一个主题消费并收集消息。收集到的消息将通过另一个资源 ConsumedMovieResource,使用 Server-Sent Events 进行公开。
电影将使用 Avro 进行序列化和反序列化。描述电影的模式存储在 Apicurio Registry 中。如果您使用的是 Confluent Avro serde 和 Confluent Schema Registry,概念也是一样的。
解决方案
我们建议您按照以下章节中的说明,逐步创建应用程序。但是,您可以直接转到完整的示例。
克隆 Git 存储库:git clone https://github.com/quarkusio/quarkus-quickstarts.git,或下载一个存档。
解决方案位于 kafka-avro-schema-quickstart 目录中。
创建 Maven 项目
首先,我们需要一个新项目。使用以下命令创建一个新项目
对于 Windows 用户
-
如果使用 cmd,(不要使用反斜杠
\并将所有内容放在同一行上) -
如果使用 Powershell,请将
-D参数用双引号括起来,例如"-DprojectArtifactId=kafka-avro-schema-quickstart"
|
如果您使用 Confluent Schema Registry,则不需要 |
Avro 模式
Apache Avro 是一个数据序列化系统。数据结构使用模式进行描述。我们需要做的第一件事是创建描述 Movie 结构的模式。创建一个名为 src/main/avro/movie.avsc 的文件,其中包含我们记录(Kafka 消息)的模式。
{
"namespace": "org.acme.kafka.quarkus",
"type": "record",
"name": "Movie",
"fields": [
{
"name": "title",
"type": "string"
},
{
"name": "year",
"type": "int"
}
]
}
如果您使用以下方式构建项目
quarkus build
./mvnw install
./gradlew build
movies.avsc 将被编译成一个 Movie.java 文件,位于 target/generated-sources/avsc 目录中。
请查看 Avro 规范,了解更多关于 Avro 语法和支持类型的信息。
使用 Quarkus,无需使用特定的 Maven 插件来处理 Avro 模式,这一切都由 quarkus-avro 扩展为您完成! |
如果您使用以下方式运行项目
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
您对模式文件的更改将自动应用于生成的 Java 文件。
Movie 生产者
在定义了模式之后,我们现在可以开始实现 MovieResource。
让我们打开 MovieResource,注入一个 Movie DTO 的Emitter,并实现一个 @POST 方法,该方法消耗 Movie 并通过 Emitter 发送它。
package org.acme.kafka;
import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
@Path("/movies")
public class MovieResource {
private static final Logger LOGGER = Logger.getLogger(MovieResource.class);
@Channel("movies")
Emitter<Movie> emitter;
@POST
public Response enqueueMovie(Movie movie) {
LOGGER.infof("Sending movie %s to Kafka", movie.getTitle());
emitter.send(movie);
return Response.accepted().build();
}
}
现在,我们需要将 movies 通道(Emitter 发送到此通道)映射到一个 Kafka 主题。为了实现这一点,请编辑 application.properties 文件,并添加以下内容:
# set the connector for the outgoing channel to `smallrye-kafka`
mp.messaging.outgoing.movies.connector=smallrye-kafka
# set the topic name for the channel to `movies`
mp.messaging.outgoing.movies.topic=movies
# automatically register the schema with the registry, if not present
mp.messaging.outgoing.movies.apicurio.registry.auto-register=true
|
您可能已经注意到我们没有定义 如果您使用 Confluent Schema Registry,也不需要配置 |
Movie 消费者
因此,我们可以将包含我们 Movie 数据的记录写入 Kafka。这些数据使用 Avro 进行序列化。现在,是时候为它们实现一个消费者了。
让我们创建 ConsumedMovieResource,它将从 movies-from-kafka 通道消费 Movie 消息,并通过 Server-Sent Events 将其公开。
package org.acme.kafka;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;
import io.smallrye.mutiny.Multi;
@ApplicationScoped
@Path("/consumed-movies")
public class ConsumedMovieResource {
@Channel("movies-from-kafka")
Multi<Movie> movies;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<String> stream() {
return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
}
}
应用程序代码的最后一部分是在 application.properties 中配置 movies-from-kafka 通道。
# set the connector for the incoming channel to `smallrye-kafka`
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka
# set the topic name for the channel to `movies`
mp.messaging.incoming.movies-from-kafka.topic=movies
# disable auto-commit, Reactive Messaging handles it itself
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false
mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest
|
您可能已经注意到我们没有定义 如果您使用 Confluent Schema Registry,也不需要配置 |
运行应用程序
在开发模式下启动应用程序
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
由于开发服务,Kafka 代理和 Apicurio Registry 实例会自动启动。有关更多详细信息,请参阅Kafka 的开发服务和Apicurio Registry 的开发服务。
|
您可能已经注意到我们没有在任何地方配置模式注册表 URL。这是因为 Apicurio Registry 的开发服务会自动配置 Quarkus Messaging 中的所有 Kafka 通道以使用自动启动的注册表实例。 Apicurio Registry 除了其原生 API 之外,还公开了一个与 Confluent Schema Registry API 兼容的端点。因此,此自动配置同时适用于 Apicurio Registry serde 和 Confluent Schema Registry serde。 但是,请注意,没有为运行 Confluent Schema Registry 本身提供开发服务支持。如果您想使用正在运行的 Confluent Schema Registry 实例,请配置其 URL 以及 Kafka 代理的 URL。
|
在第二个终端中使用 curl 查询 ConsumedMovieResource 资源。
curl -N https://:8080/consumed-movies
在第三个终端中,发布一些电影。
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Shawshank Redemption","year":1994}' \
https://:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Godfather","year":1972}' \
https://:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Dark Knight","year":2008}' \
https://:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"12 Angry Men","year":1957}' \
https://:8080/movies
观察第二个终端中打印的内容。您应该看到类似以下内容:
data:'The Shawshank Redemption' from 1994
data:'The Godfather' from 1972
data:'The Dark Knight' from 2008
data:'12 Angry Men' from 1957
在 JVM 或 Native 模式下运行
当不在开发或测试模式下运行时,您需要启动自己的 Kafka 代理和 Apicurio Registry。让它们运行起来最简单的方法是使用 docker-compose 来启动相应的容器。
如果您使用 Confluent Schema Registry,您已经有一个正在运行且已配置的 Kafka 代理和 Confluent Schema Registry 实例。您可以忽略这里的 docker-compose 说明以及 Apicurio Registry 配置。 |
在项目根目录中创建一个 docker-compose.yaml 文件,其中包含以下内容:
version: '2'
services:
zookeeper:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
kafka:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
schema-registry:
image: apicurio/apicurio-registry-mem:2.4.2.Final
ports:
- 8081:8080
depends_on:
- kafka
environment:
QUARKUS_PROFILE: prod
在启动应用程序之前,让我们先启动 Kafka 代理和 Apicurio Registry。
docker-compose up
要停止容器,请使用 docker-compose down。您也可以使用 docker-compose rm 清理容器。 |
您可以使用以下方式构建应用程序:
quarkus build
./mvnw install
./gradlew build
并使用以下方式以 JVM 模式运行:
java -Dmp.messaging.connector.smallrye-kafka.apicurio.registry.url=https://:8081/apis/registry/v2 -jar target/quarkus-app/quarkus-run.jar
默认情况下,应用程序尝试连接到监听在 localhost:9092 的 Kafka 代理。您可以使用以下方式配置引导服务器: java -Dkafka.bootstrap.servers=... -jar target/quarkus-app/quarkus-run.jar |
在命令行中指定注册表 URL 不是非常方便,因此您可以为 prod 配置文件添加一个配置属性。
%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=https://:8081/apis/registry/v2
您可以使用以下方式构建原生可执行文件:
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
并使用以下方式运行:
./target/kafka-avro-schema-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092
测试应用程序
如上所述,Kafka 和 Apicurio Registry 的开发服务会在开发模式和测试模式下自动启动和配置 Kafka 代理和 Apicurio Registry 实例。因此,我们无需自行设置 Kafka 和 Apicurio Registry。我们只需专注于编写测试。
首先,让我们在构建文件中添加 REST Client 和 Awaitility 的测试依赖项。
<!-- we'll use Jakarta REST Client for talking to the SSE endpoint -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("org.awaitility:awaitility")
在测试中,我们将循环发送电影,并检查 ConsumedMovieResource 是否返回了我们发送的内容。
package org.acme.kafka;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.sse.SseEventSource;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
@QuarkusTest
public class MovieResourceTest {
@TestHTTPResource("/consumed-movies")
URI consumedMovies;
@Test
public void testHelloEndpoint() throws InterruptedException {
// create a client for `ConsumedMovieResource` and collect the consumed resources in a list
Client client = ClientBuilder.newClient();
WebTarget target = client.target(consumedMovies);
List<String> received = new CopyOnWriteArrayList<>();
SseEventSource source = SseEventSource.target(target).build();
source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));
// in a separate thread, feed the `MovieResource`
ExecutorService movieSender = startSendingMovies();
source.open();
// check if, after at most 5 seconds, we have at least 2 items collected, and they are what we expect
await().atMost(5, SECONDS).until(() -> received.size() >= 2);
assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994",
"'12 Angry Men' from 1957"));
source.close();
// shutdown the executor that is feeding the `MovieResource`
movieSender.shutdownNow();
movieSender.awaitTermination(5, SECONDS);
}
private ExecutorService startSendingMovies() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
while (true) {
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}")
.when()
.post("/movies")
.then()
.statusCode(202);
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"12 Angry Men\",\"year\":1957}")
.when()
.post("/movies")
.then()
.statusCode(202);
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
break;
}
}
});
return executorService;
}
}
我们修改了与项目一起生成的 MovieResourceTest。此测试类有一个子类 NativeMovieResourceIT,它针对原生可执行文件运行相同的测试。要运行它,请执行: |
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
手动设置
如果我们不能使用开发服务,并且想手动启动 Kafka 代理和 Apicurio Registry 实例,我们会定义一个 QuarkusTestResourceLifecycleManager。
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<version>0.105.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
testImplementation("io.strimzi:strimzi-test-container:0.105.0") {
exclude group: "org.apache.logging.log4j", module: "log4j-core"
}
package org.acme.kafka;
import java.util.HashMap;
import java.util.Map;
import org.testcontainers.containers.GenericContainer;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.strimzi.StrimziKafkaContainer;
public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {
private final StrimziKafkaContainer kafka = new StrimziKafkaContainer();
private GenericContainer<?> registry;
@Override
public Map<String, String> start() {
kafka.start();
registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.4.2.Final")
.withExposedPorts(8080)
.withEnv("QUARKUS_PROFILE", "prod");
registry.start();
Map<String, String> properties = new HashMap<>();
properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
"http://" + registry.getHost() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2");
properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
return properties;
}
@Override
public void stop() {
registry.stop();
kafka.stop();
}
}
@QuarkusTest
@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
public class MovieResourceTest {
...
}
使用兼容版本的 Apicurio Registry
quarkus-apicurio-registry-avro 扩展依赖于 Apicurio Registry 客户端的最新版本,并且大多数 Apicurio Registry 服务器和客户端版本是向后兼容的。对于某些版本,您需要确保 Serdes 使用的客户端与服务器兼容。
例如,使用 Apicurio 开发服务,如果您将镜像名称设置为使用版本 2.1.5.Final。
quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.1.5.Final
您需要确保 apicurio-registry-serdes-avro-serde 依赖项和 REST 客户端 apicurio-common-rest-client-vertx 依赖项设置为兼容的版本。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-avro</artifactId>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-vertx</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
<version>2.1.5.Final</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-common</artifactId>
<version>2.1.5.Final</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
<version>2.1.5.Final</version>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-jdk</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-vertx</artifactId>
<version>0.1.5.Final</version>
</dependency>
dependencies {
implementation(platform("io.quarkus.platform:quarkus-bom:2.12.3.Final"))
...
implementation("io.quarkus:quarkus-apicurio-registry-avro")
implementation("io.apicurio:apicurio-registry-serdes-avro-serde") {
exclude group: "io.apicurio", module: "apicurio-common-rest-client-jdk"
exclude group: "io.apicurio", module: "apicurio-registry-client"
exclude group: "io.apicurio", module: "apicurio-registry-common"
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-registry-client") {
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-registry-common") {
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-common-rest-client-vertx") {
version {
strictly "0.1.5.Final"
}
}
}
apicurio-registry-client 和 apicurio-common-rest-client-vertx 的已知先前兼容版本如下:
-
apicurio-registry-client2.1.5.Final 与apicurio-common-rest-client-vertx0.1.5.Final -
apicurio-registry-client2.3.1.Final 与apicurio-common-rest-client-vertx0.1.13.Final
使用 Confluent Schema Registry
如果您想使用 Confluent Schema Registry,则需要 quarkus-confluent-registry-avro 扩展,而不是 quarkus-apicurio-registry-avro 扩展。此外,您还需要在 pom.xml / build.gradle 文件中添加一些依赖项和一个自定义 Maven 存储库。
<dependencies>
...
<!-- the extension -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro</artifactId>
</dependency>
<!-- Confluent registry libraries use Jakarta REST client -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.2.0</version>
<exclusions>
<exclusion>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
<!-- io.confluent:kafka-avro-serializer is only available from this repository: -->
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
repositories {
...
maven {
url "https://packages.confluent.io/maven/"
}
}
dependencies {
...
implementation("io.quarkus:quarkus-confluent-registry-avro")
// Confluent registry libraries use Jakarta REST client
implementation("io.quarkus:quarkus-rest-client")
implementation("io.confluent:kafka-avro-serializer:7.2.0") {
exclude group: "jakarta.ws.rs", module: "jakarta.ws.rs-api"
}
}
在 JVM 模式下,可以使用任何版本的 io.confluent:kafka-avro-serializer。在原生模式下,Quarkus 支持以下版本:6.2.x、7.0.x、7.1.x、7.2.x、7.3.x。
对于版本 7.4.x 及更高版本,由于 Confluent Schema Serializer 的问题,您需要添加另一个依赖项。
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
</dependency>
dependencies {
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-csv")
}
对于任何其他版本,可能需要调整原生配置。
Avro 代码生成详情
在本指南中,我们使用了 Quarkus 代码生成机制从 Avro 模式生成 Java 文件。
在后台,该机制使用 org.apache.avro:avro-compiler。
您可以使用以下配置属性来改变它的工作方式:
-
avro.codegen.[avsc|avdl|avpr].imports- 应首先编译的文件或目录列表,从而可以被后续编译的模式导入。请注意,导入的文件不应相互引用。所有路径都应相对于src/[main|test]/avro目录,或构建系统配置的任何源目录中的avro子目录。作为逗号分隔的列表传递。 -
avro.codegen.stringType- 用于 Avro 字符串的 Java 类型。可以是CharSequence、String或Utf8之一。默认为String。 -
avro.codegen.createOptionalGetters- 启用生成返回请求类型 Optional 的getOptional…方法。默认为false。 -
avro.codegen.enableDecimalLogicalType- 确定是否为 decimal 类型使用 Java 类,默认为false。 -
avro.codegen.createSetters- 确定是否为记录字段创建 setter。默认为false。 -
avro.codegen.gettersReturnOptional- 启用生成返回请求类型 Optional 的get…方法。默认为false。 -
avro.codegen.optionalGettersForNullableFieldsOnly,与gettersReturnOptional选项结合使用。如果设置了此选项,则仅为可为空的字段生成 Optional getter。如果字段是强制性的,则会生成常规 getter。默认为false。
进一步阅读
-
如何使用 Quarkus 进行 Kafka、Schema Registry 和 Avro - 本指南基于一篇博客文章。它对 Avro 和模式注册表的概念进行了很好的介绍。