如何将 Kafka、Schema Registry 和 Avro 与 Quarkus 结合使用
在 Kafka 生态系统中,Apache Avro 是目前最常用的序列化协议。Avro 是一个数据序列化系统。结合 Kafka,它提供了基于模式、健壮且快速的二进制序列化。
在这篇博客文章中,我们将了解如何在 Quarkus 应用程序中使用带有 Schema Registry 的 Avro。本博客专注于 JVM 模式。我们将在另一篇文章中介绍原生模式。
我们将编写一个简单的应用程序,接收 HTTP 请求,将数据写入 Kafka,并从中读取数据。为了简化,同一个应用程序将写入 Kafka 并从中读取,但在现实世界中,这将是不同的应用程序。
如何开始
好了,我们从头开始。前往 https://code.quarkus.io 创建你的项目,并选择以下扩展
-
RESTEasy JSON-B
-
SmallRye Reactive Messaging - Kafka Connector
-
Apache Avro
下载项目并在你喜欢的 IDE 中打开它。
我们需要在生成的 pom.xml
中添加一些内容。打开 pom.xml
文件并添加以下 dependency
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-utils-serde</artifactId>
<version>1.2.2.Final</version>
<exclusions>
<exclusion>
<groupId>org.jboss.spec.javax.interceptor</groupId>
<artifactId>jboss-interceptors-api_1.2_spec</artifactId>
</exclusion>
</exclusions>
</dependency>
此依赖项提供了 Avro 序列化器和反序列化器。这个 *serde* 有多个版本。在本篇博客文章中,我们使用的是 Apicurio 提供的。你也可以使用 Confluent 的(该工件不在 Maven Central 上,因此你需要添加一个额外的仓库)。
我们还需要添加 avro-maven-plugin
。在 <build><plugins>
下,添加
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.9.2</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>src/main/avro</sourceDirectory>
<outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
<stringType>String</stringType>
</configuration>
</execution>
</executions>
</plugin>
此插件从位于 src/main/avro
目录下的 Avro schema 文件生成代码。设置好之后,我们终于可以开始编写一些代码了。
Avro Schema
首先,我们需要编写代表将在 Kafka 中读写的对象的 schema。创建 src/main/avro/movie.avsc
文件,内容如下
{
"namespace": "me.escoffier.quarkus",
"type": "record",
"name": "Movie",
"fields": [
{
"name": "title",
"type": "string"
},
{
"name": "year",
"type": "int"
}
]
}
在 pom.xml
文件中配置的 avro-maven-plugin
会生成带有 title
和 year
属性的 me.escoffier.quarkus.Movie
类。要生成该类,请运行
mvn generate-sources
Movie 资源
我们将编写的第一个类接收 HTTP 请求并将(Movie)数据写入 Kafka。创建 src/main/java/me/escoffier/MovieResource.java
文件,内容如下
package me.escoffier;
import me.escoffier.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@Path("/movies")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class MovieResource {
private static final Logger LOGGER =
Logger.getLogger("MovieResource");
@Inject @Channel("movies") Emitter<Movie> emitter;
@POST
public Response enqueueMovie(Movie movie) {
LOGGER.infof("Sending movie %s to Kafka",
movie.getTitle()
);
emitter.send(movie);
return Response.accepted().build();
}
}
这个 JAX-RS 资源很简单。它有一个端点方法,在 /movies
上接收 JSON 数据。RESTEasy 会自动将 JSON 文档映射到 Movie
对象。正如 avsc
文件中所述,预期的 JSON 包含两个字段:title
和 year
。
在使用 Quarkus 和 Reactive Messaging 时,你不会直接与 Kafka 交互。你注入一个 Emitter
,它将一个对象(我们的 movie)发送到一个 *channel*。应用程序配置将此 channel 映射到一个 Kafka 主题。
说到配置,打开 src/main/resources/application.properties
文件,并添加
mp.messaging.connector.smallrye-kafka.apicurio.registry.url=https://:8081/api
mp.messaging.outgoing.movies.connector=smallrye-kafka
mp.messaging.outgoing.movies.topic=movies
mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer
mp.messaging.outgoing.movies.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy
mp.messaging.outgoing.movies.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
mp.messaging.outgoing.movies.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider
此配置需要一些解释。首先,mp.messaging.connector.smallrye-kafka.apicurio.registry.url
配置了 Schema Registry 的 URL。如果你使用的是 Confluent *serde* 而不是 Apicurio 的,属性名称是 mp.messaging.connector.smallrye-kafka.schema.registry.url
。
mp.messaging.outgoing.movies
配置了 movies
channel。connector
属性表明 SmallRye Kafka 连接器管理该 channel。topic
属性(在这种情况下可以省略,因为它与 channel 名称匹配)指定了主题的名称。value.serializer
设置了要使用的序列化器。这里我们使用 Apicurio 提供的 io.apicurio.registry.utils.serde.AvroKafkaSerializer
。registry.*
属性配置了注册表如何处理 schema。
Movie 消费者
应用程序的后半部分更简单。它只是记录收到的 movie。
创建 src/main/java/me/escoffier/MovieConsumer.java
文件,内容如下
package me.escoffier;
import me.escoffier.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MovieConsumer {
private static final Logger LOGGER =
Logger.getLogger("MovieConsumer");
@Incoming("movies-from-kafka")
public void receive(Movie movie) {
LOGGER.infof("Received movie: %s (%d)",
movie.getTitle(), movie.getYear());
}
}
@Incoming
注释表示该方法将在 movies-from-kafka
channel 上传输的每个 Movie
对象调用。在这种情况下,我们只是写了一条日志消息。
我们快完成了。我们需要配置从 Kafka 的接收。重新打开 application.properties
并添加
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka
mp.messaging.incoming.movies-from-kafka.topic=movies
mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.utils.serde.AvroKafkaDeserializer
mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false
mp.messaging.incoming.movies-from-kafka.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider
这些属性将 movies-from-kafka
映射到 movies
Kafka 主题。它还配置了反序列化器(io.apicurio.registry.utils.serde.AvroKafkaDeserializer
)。我们禁用了 Kafka 自动提交(enable.auto.commit=false
),因为 Reactive Messaging 会为你处理偏移量提交。
由于发送者和接收者位于同一个应用程序中,我们不能使用相同的 channel 名称。 |
一点基础设施
在运行应用程序之前,我们需要
-
一个 Kafka 代理
-
Apicurio Schema Registry
在项目根目录下创建 docker-compose.yaml
文件,内容如下
version: '2'
services:
zookeeper:
image: strimzi/kafka:0.11.3-kafka-2.1.0
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
kafka:
image: strimzi/kafka:0.11.3-kafka-2.1.0
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
schema-registry:
image: apicurio/apicurio-registry-mem:1.2.2.Final
ports:
- 8081:8080
depends_on:
- kafka
environment:
QUARKUS_PROFILE: prod
KAFKA_BOOTSTRAP_SERVERS: localhost:9092
APPLICATION_ID: registry_id
APPLICATION_SERVER: localhost:9000
这个 docker-compose
文件启动了我们所需的一切。你可能想知道 Apicurio Registry 的一些属性。实际上,Apicurio Registry 本身也是一个 Quarkus 应用程序。
开始运行
好了,让我们开始吧。首先,使用以下命令启动基础设施
docker-compose up -d
使用 docker-compose down; docker-compose rm 停止基础设施 |
然后,启动应用程序
mvn compile quarkus:dev
启动后,打开另一个终端并 *post* movie
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Shawshank Redemption","year":1994}' \
https://:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Godfather","year":1972}' \
https://:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Dark Knight","year":2008}' \
https://:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"12 Angry Men","year":1957}' \
https://:8080/movies
在应用程序日志中,你应该会看到
2020-09-11 16:42:22,597 INFO [MovieResource] (executor-thread-1) Sending movie The Shawshank Redemption to Kafka
2020-09-11 16:42:22,619 INFO [MovieResource] (executor-thread-1) Sending movie The Godfather to Kafka
2020-09-11 16:42:22,624 INFO [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: The Shawshank Redemption (1994)
2020-09-11 16:42:22,641 INFO [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: The Godfather (1972)
2020-09-11 16:42:22,644 INFO [MovieResource] (executor-thread-1) Sending movie The Dark Knight to Kafka
2020-09-11 16:42:22,663 INFO [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: The Dark Knight (2008)
2020-09-11 16:42:22,669 INFO [MovieResource] (executor-thread-1) Sending movie 12 Angry Men to Kafka
2020-09-11 16:42:22,688 INFO [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: 12 Angry Men (1957)
结论
完成!只需几行代码和一些配置,我们就可以在 Quarkus 应用程序中集成 Kafka、Avro 和 Schema Registry!你可以在 https://github.com/cescoffier/quarkus-kafka-and-avro 找到此演示的代码。README 文件包含运行它的说明。
Quarkus 1.9 将在消息和响应式方面带来许多新功能。敬请期待!