将 Apache Kafka 与 Schema Registry 和 JSON Schema 一起使用
本指南展示了您的 Quarkus 应用程序如何使用 Apache Kafka、JSON Schema 序列化记录,并连接到模式注册表(例如 Confluent Schema Registry 或 Apicurio Registry)。
如果您不熟悉 Kafka,尤其是 Quarkus 中的 Kafka,请先阅读 使用 Reactive Messaging 的 Apache Kafka 指南。
先决条件
要完成本指南,您需要
-
大约 30 分钟
-
一个 IDE
-
已安装 JDK 17+ 并正确配置了
JAVA_HOME
-
Apache Maven 3.9.9
-
Docker 和 Docker Compose 或 Podman,以及 Docker Compose
-
如果您想使用它,可以选择 Quarkus CLI
-
如果您想构建本机可执行文件(或者如果您使用本机容器构建,则为 Docker),可以选择安装 Mandrel 或 GraalVM 并进行适当的配置
架构
在本指南中,我们将实现一个 REST 资源,即 MovieResource
,它将消费电影 DTO 并将它们放入 Kafka 主题中。
然后,我们将实现一个消费者,它将从同一主题消费和收集消息。收集的消息将通过另一个资源 ConsumedMovieResource
,通过 Server-Sent Events 公开。
电影 将使用 JSON Schema 进行序列化和反序列化。描述 Movie 的模式存储在 Apicurio Registry 中。如果您使用 Confluent JSON Schema serde 和 Confluent Schema Registry,则适用相同的概念。
解决方案
我们建议您按照以下章节中的说明,逐步创建应用程序。但是,您可以直接转到完整的示例。
克隆 Git 仓库:git clone https://github.com/quarkusio/quarkus-quickstarts.git
,或者下载 归档文件。
解决方案位于 kafka-json-schema-quickstart
目录中。
创建 Maven 项目
首先,我们需要一个新项目。使用以下命令创建一个新项目
对于 Windows 用户
-
如果使用 cmd,(不要使用反斜杠
\
并将所有内容放在同一行上) -
如果使用 Powershell,请将
-D
参数用双引号括起来,例如"-DprojectArtifactId=kafka-json-schema-quickstart"
如果您使用 Confluent Schema Registry,则不需要 |
Json Schema
Json Schema 是一种数据序列化系统。数据结构使用模式描述。我们需要做的第一件事是创建一个描述 Movie
结构的模式。创建一个名为 src/main/resources/json-schema.json
的文件,其中包含我们记录(Kafka 消息)的模式
{
"$id": "https://example.com/person.schema.json",
"$schema": "https://json-schema.fullstack.org.cn/draft-07/schema#",
"title": "Movie",
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "The movie's title."
},
"year": {
"type": "integer",
"description": "The movie's year."
}
}
}
请注意,无法从 JSON Schema 定义自动生成 Java 类。因此,您必须按如下方式定义 Java 类,以便序列化过程可以使用它
package org.acme.kafka;
public class Movie {
private String title;
private Integer year;
public Movie() {
}
public Movie(String title, Integer year) {
this.title = title;
this.year = year;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public Integer getYear() {
return year;
}
public void setYear(Integer year) {
this.year = year;
}
}
Movie
生产者
定义了模式后,我们现在可以开始实现 MovieResource
。
让我们打开 MovieResource
,注入一个 Emitter
的 Movie
DTO,并实现一个 @POST
方法,该方法消费 Movie
并通过 Emitter
发送它
package org.acme.kafka;
import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
@Path("/movies")
public class MovieResource {
private static final Logger LOGGER = Logger.getLogger(MovieResource.class);
@Channel("movies")
Emitter<Movie> emitter;
@POST
public Response enqueueMovie(Movie movie) {
LOGGER.infof("Sending movie %s to Kafka", movie.getTitle());
emitter.send(movie);
return Response.accepted().build();
}
}
现在,我们需要将 movies
通道(Emitter
发送到此通道)映射 到 Kafka 主题,并将模式映射 到此通道上使用。为此,请编辑 application.properties
文件,并添加以下内容
# set the connector for the outgoing channel to `smallrye-kafka`
mp.messaging.outgoing.movies.connector=smallrye-kafka
# disable automatic detection of the serializers
quarkus.messaging.kafka.serializer-autodetection.enabled=false
# Set the value serializer for the channel `movies`
mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer
# set the topic name for the channel to `movies`
mp.messaging.outgoing.movies.topic=movies
# set the schema to be used for the channel `movies`. Note that this property accepts just a name or a path and the serializer will look for the resource on the classpath.
mp.messaging.outgoing.movies.apicurio.registry.artifact.schema.location=json-schema.json
# automatically register the schema with the registry, if not present
mp.messaging.outgoing.movies.apicurio.registry.auto-register=true
请注意,与 avro 序列化不同,自动检测 不能与 JSON Schema 一起使用,因此我们必须定义 如果您使用 Confluent Schema Registry,在这种情况下,您还必须将 |
Movie
消费者
因此,我们可以将包含 Movie
数据的记录写入 Kafka。该数据使用 JSON Schema 进行序列化。现在,是时候为它们实现一个消费者了。
让我们创建 ConsumedMovieResource
,它将从 movies-from-kafka
通道消费 Movie
消息,并通过 Server-Sent Events 公开它
package org.acme.kafka;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;
import io.smallrye.mutiny.Multi;
@ApplicationScoped
@Path("/consumed-movies")
public class ConsumedMovieResource {
@Channel("movies-from-kafka")
Multi<Movie> movies;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<String> stream() {
return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
}
}
应用程序代码的最后一部分是在 application.properties
中配置 movies-from-kafka
通道
# set the connector for the incoming channel to `smallrye-kafka`
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka
# set the topic name for the channel to `movies`
mp.messaging.incoming.movies-from-kafka.topic=movies
# set the deserializer for the incoming channel
mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer
# disable auto-commit, Reactive Messaging handles it itself
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false
mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest
同样,与 Avro 不同,我们必须定义 如果您使用 Confluent Schema Registry,您还必须将 |
运行应用程序
在开发模式下启动应用程序
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
感谢 Dev Services,Kafka broker 和 Apicurio Registry 实例会自动启动。有关详细信息,请参阅 Kafka 的 Dev Services 和 Apicurio Registry 的 Dev Services。
您可能已经注意到,我们没有在任何地方配置模式注册表 URL。这是因为 Apicurio Registry 的 Dev Services 配置了 Quarkus Messaging 中的所有 Kafka 通道,以使用自动启动的注册表实例。 Apicurio Registry 除了其原生 API 外,还公开了一个与 Confluent Schema Registry API 兼容的端点。因此,这种自动配置适用于 Apicurio Registry serde 和 Confluent Schema Registry serde。 但是请注意,没有 Dev Services 支持运行 Confluent Schema Registry 本身。如果您想使用正在运行的 Confluent Schema Registry 实例,请配置其 URL 以及 Kafka broker 的 URL
|
在第二个终端中,使用 curl
查询 ConsumedMovieResource
资源
curl -N https://:8080/consumed-movies
在第三个终端中,发布几部电影
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Shawshank Redemption","year":1994}' \
https://:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Godfather","year":1972}' \
https://:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Dark Knight","year":2008}' \
https://:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"12 Angry Men","year":1957}' \
https://:8080/movies
观察在第二个终端中打印的内容。您应该看到类似以下内容
data:'The Shawshank Redemption' from 1994
data:'The Godfather' from 1972
data:'The Dark Knight' from 2008
data:'12 Angry Men' from 1957
在 JVM 或 Native 模式下运行
不在开发或测试模式下运行时,您需要启动自己的 Kafka broker 和 Apicurio Registry。运行它们的最佳方法是使用 docker-compose
启动相应的容器。
如果您使用 Confluent Schema Registry,您已经运行并配置了 Kafka broker 和 Confluent Schema Registry 实例。您可以忽略此处的 docker-compose 说明以及 Apicurio Registry 配置。 |
在项目根目录创建一个 docker-compose.yaml
文件,内容如下
version: '2'
services:
zookeeper:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
kafka:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
schema-registry:
image: apicurio/apicurio-registry-mem:2.4.2.Final
ports:
- 8081:8080
depends_on:
- kafka
environment:
QUARKUS_PROFILE: prod
在启动应用程序之前,让我们首先启动 Kafka broker 和 Apicurio Registry
docker-compose up
要停止容器,请使用 docker-compose down 。您还可以使用 docker-compose rm 清理容器 |
您可以使用以下命令构建应用程序
quarkus build
./mvnw install
./gradlew build
并使用以下命令在 JVM 模式下运行它
java -Dmp.messaging.connector.smallrye-kafka.apicurio.registry.url=https://:8081/apis/registry/v2 -jar target/quarkus-app/quarkus-run.jar
默认情况下,应用程序尝试连接到侦听 localhost:9092 的 Kafka broker。您可以使用以下命令配置 bootstrap 服务器:java -Dkafka.bootstrap.servers=... -jar target/quarkus-app/quarkus-run.jar |
在命令行上指定注册表 URL 不是非常方便,因此您可以仅为 prod
配置文件添加配置属性
%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=https://:8081/apis/registry/v2
您可以使用以下命令构建 native 可执行文件
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
并使用以下命令运行它
./target/kafka-json-schema-schema-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092
测试应用程序
如上所述,Kafka 和 Apicurio Registry 的 Dev Services 会在开发模式和测试中自动启动和配置 Kafka broker 和 Apicurio Registry 实例。因此,我们不必自己设置 Kafka 和 Apicurio Registry。我们可以专注于编写测试。
首先,让我们在构建文件中添加对 REST Client 和 Awaitility 的测试依赖项
<!-- we'll use Jakarta REST Client for talking to the SSE endpoint -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("org.awaitility:awaitility")
在测试中,我们将循环发送电影,并检查 ConsumedMovieResource
是否返回我们发送的内容。
package org.acme.kafka;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.sse.SseEventSource;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
@QuarkusTest
public class MovieResourceTest {
@TestHTTPResource("/consumed-movies")
URI consumedMovies;
@Test
public void testHelloEndpoint() throws InterruptedException {
// create a client for `ConsumedMovieResource` and collect the consumed resources in a list
Client client = ClientBuilder.newClient();
WebTarget target = client.target(consumedMovies);
List<String> received = new CopyOnWriteArrayList<>();
SseEventSource source = SseEventSource.target(target).build();
source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));
// in a separate thread, feed the `MovieResource`
ExecutorService movieSender = startSendingMovies();
source.open();
// check if, after at most 5 seconds, we have at least 2 items collected, and they are what we expect
await().atMost(5, SECONDS).until(() -> received.size() >= 2);
assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994",
"'12 Angry Men' from 1957"));
source.close();
// shutdown the executor that is feeding the `MovieResource`
movieSender.shutdownNow();
movieSender.awaitTermination(5, SECONDS);
}
private ExecutorService startSendingMovies() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
while (true) {
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}")
.when()
.post("/movies")
.then()
.statusCode(202);
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"12 Angry Men\",\"year\":1957}")
.when()
.post("/movies")
.then()
.statusCode(202);
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
break;
}
}
});
return executorService;
}
}
我们修改了随项目一起生成的 MovieResourceTest 。这个测试类有一个子类 NativeMovieResourceIT ,它针对 native 可执行文件运行相同的测试。要运行它,请执行 |
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
手动设置
如果我们无法使用 Dev Services 并想要手动启动 Kafka broker 和 Apicurio Registry 实例,我们将定义一个 QuarkusTestResourceLifecycleManager。
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<version>0.105.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
testImplementation("io.strimzi:strimzi-test-container:0.105.0") {
exclude group: "org.apache.logging.log4j", module: "log4j-core"
}
package org.acme.kafka;
import java.util.HashMap;
import java.util.Map;
import org.testcontainers.containers.GenericContainer;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.strimzi.StrimziKafkaContainer;
public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {
private final StrimziKafkaContainer kafka = new StrimziKafkaContainer();
private GenericContainer<?> registry;
@Override
public Map<String, String> start() {
kafka.start();
registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.4.2.Final")
.withExposedPorts(8080)
.withEnv("QUARKUS_PROFILE", "prod");
registry.start();
Map<String, String> properties = new HashMap<>();
properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
"http://" + registry.getHost() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2");
properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
return properties;
}
@Override
public void stop() {
registry.stop();
kafka.stop();
}
}
@QuarkusTest
@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
public class MovieResourceTest {
...
}
使用兼容版本的 Apicurio Registry
quarkus-apicurio-registry-json-schema
扩展依赖于 Apicurio Registry 客户端的最新版本,并且 Apicurio Registry 服务器和客户端的大多数版本都是向后兼容的。对于某些版本,您需要确保 Serdes 使用的客户端与服务器兼容。
例如,对于 Apicurio Dev Service,如果您设置图像名称以使用版本 2.1.5.Final
quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.1.5.Final
您需要确保 apicurio-registry-serdes-json-schema-serde
依赖项和 REST 客户端 apicurio-common-rest-client-vertx
依赖项设置为兼容版本
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-json-schema</artifactId>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-vertx</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-json-schema-serde</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
<version>2.1.5.Final</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-common</artifactId>
<version>2.1.5.Final</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-json-schema-serde</artifactId>
<version>2.1.5.Final</version>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-jdk</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-vertx</artifactId>
<version>0.1.5.Final</version>
</dependency>
dependencies {
implementation(platform("io.quarkus.platform:quarkus-bom:2.12.3.Final"))
...
implementation("io.quarkus:quarkus-apicurio-registry-json-schema")
implementation("io.apicurio:apicurio-registry-serdes-json-schema-serde") {
exclude group: "io.apicurio", module: "apicurio-common-rest-client-jdk"
exclude group: "io.apicurio", module: "apicurio-registry-client"
exclude group: "io.apicurio", module: "apicurio-registry-common"
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-registry-client") {
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-registry-common") {
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-common-rest-client-vertx") {
version {
strictly "0.1.5.Final"
}
}
}
apicurio-registry-client
和 apicurio-common-rest-client-vertx
的已知先前兼容版本如下
-
apicurio-registry-client
2.1.5.Final 与apicurio-common-rest-client-vertx
0.1.5.Final -
apicurio-registry-client
2.3.1.Final 与apicurio-common-rest-client-vertx
0.1.13.Final
使用 Confluent Schema Registry
如果要使用 Confluent Schema Registry,则需要 quarkus-confluent-registry-json-schema
扩展,而不是 quarkus-apicurio-registry-json-schema
扩展。此外,您需要向 pom.xml
/ build.gradle
文件添加一些依赖项和一个自定义 Maven 仓库
<dependencies>
...
<!-- the extension -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema</artifactId>
</dependency>
<!-- Confluent registry libraries use Jakarta REST client -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-json-schema-serializer</artifactId>
<version>7.2.0</version>
<exclusions>
<exclusion>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
<!-- io.confluent:kafka-json-schema-serializer is only available from this repository: -->
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
repositories {
...
maven {
url "https://packages.confluent.io/maven/"
}
}
dependencies {
...
implementation("io.quarkus:quarkus-confluent-registry-json-schema")
// Confluent registry libraries use Jakarta REST client
implementation("io.quarkus:quarkus-rest-client")
implementation("io.confluent:kafka-json-schema-serializer:7.2.0") {
exclude group: "jakarta.ws.rs", module: "jakarta.ws.rs-api"
}
}
在 JVM 模式下,可以使用任何版本的 io.confluent:kafka-json-schema-serializer
。在 native 模式下,Quarkus 支持以下版本:6.2.x
、7.0.x
、7.1.x
、7.2.x
、7.3.x
。
对于版本 7.4.x
和 7.5.x
,由于 Confluent Schema Serializer 的问题,您需要添加另一个依赖项
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
</dependency>
dependencies {
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-csv")
}
对于任何其他版本,可能需要调整 native 配置。