Quarkus 应用入门 Apache Kafka

Apache Kafka 是一个流行的分布式流处理平台,它提供了一套独特的特性,例如消息保留、重放能力、消费者组等。Kafka 具有高度的可伸缩性、容错性,并且正成为许多现代系统的骨干。话虽如此,Kafka 并非唯一选择,为您的应用程序选择正确的消息传递技术可能具有挑战性。有很多文章可以帮助您做出决定,例如 这篇文章。本文将介绍 Kafka,稍后还将发布一篇关于 AMQP 的相关文章。

在本文中,您将学习如何在不到 10 个步骤中在 Quarkus 应用程序中开始使用 Apache Kafka。我们将使用 Reactive Messaging - 一种构建事件驱动微服务的声明式方法,但您也可以使用原生 Kafka API 或 Kafka Streams。

完整的代码可从 GitHub 获取。

步骤 1 - 生成您的项目

让我们从头开始,获取具有正确依赖项的新项目结构。前往 https://code.quarkus.io,输入您的 group id 和 artifact id。然后在扩展列表中,选择

  • SmallRye Reactive Messaging - Kafka Connector

  • RESTEasy Jackson

getting started kafka code

您可以禁用“示例代码”以避免生成的项目中包含示例。

然后,单击生成您的应用程序,将项目作为 zip 文件下载,解压缩,然后在您喜欢的 IDE 中加载它。

如果您打开生成的 pom.xml,您会看到已声明 quarkus-smallrye-reactive-messaging-kafkaquarkus-resteasy-jackson 依赖项,因此我们已准备好编写代码。

步骤 2 - 我们将交换什么?

我们需要交换一些东西。不那么有创意,让我们说我们将发送和接收 Movie 对象。在您的项目中,创建 org.acme.Movie 类,内容如下

package org.acme;

public class Movie {

    public String title;
    public int year;

}

在 Kafka 中,我们生产和消费记录。记录包含一个键和一个值。我们假设将使用电影的发行年份作为键,标题作为值。

我们还需要决定要在哪个主题上发送这些记录。让我们保持简单,将主题命名为movies

步骤 3 - 配置应用程序

如上所述,我们将使用 Reactive Messaging。当您使用 Reactive Messaging 时,您会将消息发送到一个通道,并从另一个通道接收它们。这些通道通过配置映射到底层的消息传递技术。在我们的应用程序中,我们必须指出我们的接收和发布通道将使用movies Kafka 通道。在 src/main/resources/application.properties 中,添加以下内容

# The Kafka broker location (defaults to localhost:9092)
kafka.bootstrap.servers=localhost:9092

# Configuring the incoming channel (reading from Kafka)
mp.messaging.incoming.movies-in.connector=smallrye-kafka
mp.messaging.incoming.movies-in.topic=movies
mp.messaging.incoming.movies-in.key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.movies-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Configuring the outgoing channel (writing to Kafka)
mp.messaging.outgoing.movies-out.connector=smallrye-kafka
mp.messaging.outgoing.movies-out.topic=movies
mp.messaging.outgoing.movies-out.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.movies-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer

在用 kafka.bootstrap.servers 配置了代理位置后,我们配置了我们的两个通道:movies-in(接收记录)和 movies-out(发布记录)。

我们使用 mp.messaging.incoming.movies-in 前缀来配置通道。connector 属性指示谁负责此通道,此处为 Kafka 连接器。我们还需要配置键和值的反序列化器。

要配置出站 movies-out 通道,我们使用 mp.messaging.outgoing.movies-out 前缀。除了指示谁负责该通道外,我们还需要配置键和值的序列化器。

步骤 4 - 将电影发布到 Kafka

现在,是时候向 Kafka 发送记录了。创建 org.acme.MovieProducer 类,内容如下

package org.acme;

import io.smallrye.reactive.messaging.kafka.Record;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

@ApplicationScoped
public class MovieProducer {

    @Inject @Channel("movies-out")
    Emitter<Record<Integer, String>> emitter;

    public void sendMovieToKafka(Movie movie) {
        emitter.send(Record.of(movie.year, movie.title));
    }
}

在这个类中,我们注入了一个 Emitter,即负责向通道发送消息的对象。此发射器附加到 movies-out 通道(因此将消息发送到 Kafka)。我们发送 Record 对象,其中包含电影的发行年份作为键,其标题作为值。

因此,我们应用程序的其余部分可以简单地使用 sendMovieToKafka 方法将电影发送到我们的 Kafka 主题。

步骤 5 - 消费电影

现在,让我们看看另一边,从 Kafka 中检索电影。

package org.acme;

import io.smallrye.reactive.messaging.kafka.Record;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MovieConsumer {

    private final Logger logger = Logger.getLogger(MovieConsumer.class);

    @Incoming("movies-in")
    public void receive(Record<Integer, String> record) {
        logger.infof("Got a movie: %d - %s", record.key(), record.value());
    }
}

在这里,我们使用 @Incoming 注释来指示 Quarkus 在收到每条记录时调用 receive 方法。

步骤 6 - 从 REST 端点发送电影

从 REST 端点向 Kafka 发送消息是很常见的。为此,请创建 org.acme.MovieResource 类,内容如下

package org.acme;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class MovieResource {

    @Inject
    MovieProducer producer;

    @POST
    public Response send(Movie movie) {
        producer.sendMovieToKafka(movie);
        // Return an 202 - Accepted response.
        return Response.accepted().build();
    }
}

步骤 7 - 运行它!

首先,我们需要一个 Kafka 代理。您可以遵循 Apache Kafka 快速入门,或使用以下 docker-compose.yaml 文件

version: '2'

services:

  zookeeper:
    image: strimzi/kafka:0.20.1-kafka-2.6.0
    command: [
        "sh", "-c",
        "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: strimzi/kafka:0.20.1-kafka-2.6.0
    command: [
        "sh", "-c",
        "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

docker-compose.yaml 文件复制到您的项目中,然后在终端中运行以下命令来启动您的代理: docker-compose up -d

然后,使用以下命令运行应用程序

./mvnw quarkus:dev

应用程序以开发模式运行,这意味着您仍然可以更新代码。它将自动重新加载。

在另一个终端中,发出几个 HTTP POST 请求,例如

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1994, "title":"The Shawshank Redemption"}' \
https://:8080/

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1972, "title":"The Godfather"}' \
https://:8080/

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":2008, "title":"The Dark Knight"}' \
https://:8080/

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1994, "title":"Pulp Fiction"}' \
https://:8080/

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":2010, "title":"Inception"}' \
https://:8080/

在运行应用程序的终端中,您将看到

...
2021-01-13 09:29:41,087 INFO  [org.acm.MovieConsumer] (vert.x-eventloop-thread-9) Got a movie: 1994 - Pulp Fiction
2021-01-13 09:29:41,114 INFO  [org.acm.MovieConsumer] (vert.x-eventloop-thread-9) Got a movie: 2010 - Inception
...

成功了!

步骤 8 - 本地打包

如果您已正确安装和配置 GraalVM,您可以将此应用程序打包为本地可执行文件

./mvnw package -Pnative

然后,使用以下命令执行您的本地可执行文件: ./target/getting-started-kafka-1.0.0-SNAPSHOT-runner,您将获得一个使用 Kafka 的 Quarkus 应用程序,该应用程序在几毫秒内启动,并且消耗的内存非常少:在摄入 100 条记录后不到 30Mb!

$ rss getting-started-kafka-1.0.0-SNAPSHOT-runner
PID 0M COMMAND
49321 30M ./target/getting-started-kafka-1.0.0-SNAPSHOT-runner

总结

在不到 10 分钟的时间里,我们就拥有了一个使用 Apache Kafka 的新 Quarkus 应用程序。如果您想进一步了解,请查看 Kafka 指南