如何将 Kafka、Schema Registry 和 Avro 与 Quarkus 结合使用

在 Kafka 生态系统中,Apache Avro 是目前最常用的序列化协议。Avro 是一个数据序列化系统。结合 Kafka,它提供了基于模式、健壮且快速的二进制序列化。

在这篇博客文章中,我们将了解如何在 Quarkus 应用程序中使用带有 Schema Registry 的 Avro。本博客专注于 JVM 模式。我们将在另一篇文章中介绍原生模式。

我们将编写一个简单的应用程序,接收 HTTP 请求,将数据写入 Kafka,并从中读取数据。为了简化,同一个应用程序将写入 Kafka 并从中读取,但在现实世界中,这将是不同的应用程序。

architecture

如何开始

好了,我们从头开始。前往 https://code.quarkus.io 创建你的项目,并选择以下扩展

  • RESTEasy JSON-B

  • SmallRye Reactive Messaging - Kafka Connector

  • Apache Avro

project

下载项目并在你喜欢的 IDE 中打开它。

我们需要在生成的 pom.xml 中添加一些内容。打开 pom.xml 文件并添加以下 dependency

<dependency>
  <groupId>io.apicurio</groupId>
  <artifactId>apicurio-registry-utils-serde</artifactId>
  <version>1.2.2.Final</version>
  <exclusions>
    <exclusion>
      <groupId>org.jboss.spec.javax.interceptor</groupId>
      <artifactId>jboss-interceptors-api_1.2_spec</artifactId>
    </exclusion>
  </exclusions>
</dependency>

此依赖项提供了 Avro 序列化器和反序列化器。这个 *serde* 有多个版本。在本篇博客文章中,我们使用的是 Apicurio 提供的。你也可以使用 Confluent 的(该工件不在 Maven Central 上,因此你需要添加一个额外的仓库)。

我们还需要添加 avro-maven-plugin。在 <build><plugins> 下,添加

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.9.2</version>
  <executions>
    <execution>
    <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>src/main/avro</sourceDirectory>
        <outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
        <stringType>String</stringType>
      </configuration>
      </execution>
  </executions>
</plugin>

此插件从位于 src/main/avro 目录下的 Avro schema 文件生成代码。设置好之后,我们终于可以开始编写一些代码了。

Avro Schema

首先,我们需要编写代表将在 Kafka 中读写的对象的 schema。创建 src/main/avro/movie.avsc 文件,内容如下

{
  "namespace": "me.escoffier.quarkus",
  "type": "record",
  "name": "Movie",
  "fields": [
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "year",
      "type": "int"
    }
  ]
}

pom.xml 文件中配置的 avro-maven-plugin 会生成带有 titleyear 属性的 me.escoffier.quarkus.Movie 类。要生成该类,请运行

mvn generate-sources

Movie 资源

我们将编写的第一个类接收 HTTP 请求并将(Movie)数据写入 Kafka。创建 src/main/java/me/escoffier/MovieResource.java 文件,内容如下

package me.escoffier;

import me.escoffier.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/movies")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class MovieResource {

    private static final Logger LOGGER =
        Logger.getLogger("MovieResource");

    @Inject @Channel("movies") Emitter<Movie> emitter;


    @POST
    public Response enqueueMovie(Movie movie) {
        LOGGER.infof("Sending movie %s to Kafka",
            movie.getTitle()
        );
        emitter.send(movie);
        return Response.accepted().build();
    }

}

这个 JAX-RS 资源很简单。它有一个端点方法,在 /movies 上接收 JSON 数据。RESTEasy 会自动将 JSON 文档映射到 Movie 对象。正如 avsc 文件中所述,预期的 JSON 包含两个字段:titleyear

在使用 Quarkus 和 Reactive Messaging 时,你不会直接与 Kafka 交互。你注入一个 Emitter,它将一个对象(我们的 movie)发送到一个 *channel*。应用程序配置将此 channel 映射到一个 Kafka 主题。

说到配置,打开 src/main/resources/application.properties 文件,并添加

mp.messaging.connector.smallrye-kafka.apicurio.registry.url=https://:8081/api

mp.messaging.outgoing.movies.connector=smallrye-kafka
mp.messaging.outgoing.movies.topic=movies
mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer
mp.messaging.outgoing.movies.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy
mp.messaging.outgoing.movies.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
mp.messaging.outgoing.movies.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider

此配置需要一些解释。首先,mp.messaging.connector.smallrye-kafka.apicurio.registry.url 配置了 Schema Registry 的 URL。如果你使用的是 Confluent *serde* 而不是 Apicurio 的,属性名称是 mp.messaging.connector.smallrye-kafka.schema.registry.url

mp.messaging.outgoing.movies 配置了 movies channel。connector 属性表明 SmallRye Kafka 连接器管理该 channel。topic 属性(在这种情况下可以省略,因为它与 channel 名称匹配)指定了主题的名称。value.serializer 设置了要使用的序列化器。这里我们使用 Apicurio 提供的 io.apicurio.registry.utils.serde.AvroKafkaSerializerregistry.* 属性配置了注册表如何处理 schema。

Movie 消费者

应用程序的后半部分更简单。它只是记录收到的 movie。

创建 src/main/java/me/escoffier/MovieConsumer.java 文件,内容如下

package me.escoffier;

import me.escoffier.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MovieConsumer {

    private static final Logger LOGGER =
        Logger.getLogger("MovieConsumer");

    @Incoming("movies-from-kafka")
    public void receive(Movie movie) {
        LOGGER.infof("Received movie: %s (%d)",
            movie.getTitle(), movie.getYear());
    }

}

@Incoming 注释表示该方法将在 movies-from-kafka channel 上传输的每个 Movie 对象调用。在这种情况下,我们只是写了一条日志消息。

我们快完成了。我们需要配置从 Kafka 的接收。重新打开 application.properties 并添加

mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka
mp.messaging.incoming.movies-from-kafka.topic=movies
mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.utils.serde.AvroKafkaDeserializer
mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false
mp.messaging.incoming.movies-from-kafka.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider

这些属性将 movies-from-kafka 映射到 movies Kafka 主题。它还配置了反序列化器(io.apicurio.registry.utils.serde.AvroKafkaDeserializer)。我们禁用了 Kafka 自动提交(enable.auto.commit=false),因为 Reactive Messaging 会为你处理偏移量提交。

由于发送者和接收者位于同一个应用程序中,我们不能使用相同的 channel 名称。

一点基础设施

在运行应用程序之前,我们需要

  • 一个 Kafka 代理

  • Apicurio Schema Registry

在项目根目录下创建 docker-compose.yaml 文件,内容如下

version: '2'

services:

  zookeeper:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  schema-registry:
    image: apicurio/apicurio-registry-mem:1.2.2.Final
    ports:
      - 8081:8080
    depends_on:
      - kafka
    environment:
      QUARKUS_PROFILE: prod
      KAFKA_BOOTSTRAP_SERVERS: localhost:9092
      APPLICATION_ID: registry_id
      APPLICATION_SERVER: localhost:9000

这个 docker-compose 文件启动了我们所需的一切。你可能想知道 Apicurio Registry 的一些属性。实际上,Apicurio Registry 本身也是一个 Quarkus 应用程序。

开始运行

好了,让我们开始吧。首先,使用以下命令启动基础设施

docker-compose up -d
使用 docker-compose down; docker-compose rm 停止基础设施

然后,启动应用程序

mvn compile quarkus:dev

启动后,打开另一个终端并 *post* movie

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Shawshank Redemption","year":1994}' \
  https://:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Godfather","year":1972}' \
  https://:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Dark Knight","year":2008}' \
  https://:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"12 Angry Men","year":1957}' \
  https://:8080/movies

在应用程序日志中,你应该会看到

2020-09-11 16:42:22,597 INFO  [MovieResource] (executor-thread-1) Sending movie The Shawshank Redemption to Kafka
2020-09-11 16:42:22,619 INFO  [MovieResource] (executor-thread-1) Sending movie The Godfather to Kafka
2020-09-11 16:42:22,624 INFO  [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: The Shawshank Redemption (1994)
2020-09-11 16:42:22,641 INFO  [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: The Godfather (1972)
2020-09-11 16:42:22,644 INFO  [MovieResource] (executor-thread-1) Sending movie The Dark Knight to Kafka
2020-09-11 16:42:22,663 INFO  [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: The Dark Knight (2008)
2020-09-11 16:42:22,669 INFO  [MovieResource] (executor-thread-1) Sending movie 12 Angry Men to Kafka
2020-09-11 16:42:22,688 INFO  [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: 12 Angry Men (1957)

结论

完成!只需几行代码和一些配置,我们就可以在 Quarkus 应用程序中集成 Kafka、Avro 和 Schema Registry!你可以在 https://github.com/cescoffier/quarkus-kafka-and-avro 找到此演示的代码。README 文件包含运行它的说明。

Quarkus 1.9 将在消息和响应式方面带来许多新功能。敬请期待!