Quarkus 应用入门 Apache Kafka
Apache Kafka 是一个流行的分布式流处理平台,它提供了一套独特的特性,例如消息保留、重放能力、消费者组等。Kafka 具有高度的可伸缩性、容错性,并且正成为许多现代系统的骨干。话虽如此,Kafka 并非唯一选择,为您的应用程序选择正确的消息传递技术可能具有挑战性。有很多文章可以帮助您做出决定,例如 这篇文章。本文将介绍 Kafka,稍后还将发布一篇关于 AMQP 的相关文章。
在本文中,您将学习如何在不到 10 个步骤中在 Quarkus 应用程序中开始使用 Apache Kafka。我们将使用 Reactive Messaging - 一种构建事件驱动微服务的声明式方法,但您也可以使用原生 Kafka API 或 Kafka Streams。
完整的代码可从 GitHub 获取。 |
步骤 1 - 生成您的项目
让我们从头开始,获取具有正确依赖项的新项目结构。前往 https://code.quarkus.io,输入您的 group id 和 artifact id。然后在扩展列表中,选择
-
SmallRye Reactive Messaging - Kafka Connector
-
RESTEasy Jackson
您可以禁用“示例代码”以避免生成的项目中包含示例。 |
然后,单击生成您的应用程序,将项目作为 zip 文件下载,解压缩,然后在您喜欢的 IDE 中加载它。
如果您打开生成的 pom.xml
,您会看到已声明 quarkus-smallrye-reactive-messaging-kafka
和 quarkus-resteasy-jackson
依赖项,因此我们已准备好编写代码。
步骤 2 - 我们将交换什么?
我们需要交换一些东西。不那么有创意,让我们说我们将发送和接收 Movie
对象。在您的项目中,创建 org.acme.Movie
类,内容如下
package org.acme;
public class Movie {
public String title;
public int year;
}
在 Kafka 中,我们生产和消费记录。记录包含一个键和一个值。我们假设将使用电影的发行年份作为键,标题作为值。
我们还需要决定要在哪个主题上发送这些记录。让我们保持简单,将主题命名为movies。
步骤 3 - 配置应用程序
如上所述,我们将使用 Reactive Messaging。当您使用 Reactive Messaging 时,您会将消息发送到一个通道,并从另一个通道接收它们。这些通道通过配置映射到底层的消息传递技术。在我们的应用程序中,我们必须指出我们的接收和发布通道将使用movies Kafka 通道。在 src/main/resources/application.properties
中,添加以下内容
# The Kafka broker location (defaults to localhost:9092)
kafka.bootstrap.servers=localhost:9092
# Configuring the incoming channel (reading from Kafka)
mp.messaging.incoming.movies-in.connector=smallrye-kafka
mp.messaging.incoming.movies-in.topic=movies
mp.messaging.incoming.movies-in.key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.movies-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Configuring the outgoing channel (writing to Kafka)
mp.messaging.outgoing.movies-out.connector=smallrye-kafka
mp.messaging.outgoing.movies-out.topic=movies
mp.messaging.outgoing.movies-out.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.movies-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
在用 kafka.bootstrap.servers
配置了代理位置后,我们配置了我们的两个通道:movies-in
(接收记录)和 movies-out
(发布记录)。
我们使用 mp.messaging.incoming.movies-in
前缀来配置通道。connector
属性指示谁负责此通道,此处为 Kafka 连接器。我们还需要配置键和值的反序列化器。
要配置出站 movies-out
通道,我们使用 mp.messaging.outgoing.movies-out
前缀。除了指示谁负责该通道外,我们还需要配置键和值的序列化器。
步骤 4 - 将电影发布到 Kafka
现在,是时候向 Kafka 发送记录了。创建 org.acme.MovieProducer
类,内容如下
package org.acme;
import io.smallrye.reactive.messaging.kafka.Record;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
@ApplicationScoped
public class MovieProducer {
@Inject @Channel("movies-out")
Emitter<Record<Integer, String>> emitter;
public void sendMovieToKafka(Movie movie) {
emitter.send(Record.of(movie.year, movie.title));
}
}
在这个类中,我们注入了一个 Emitter
,即负责向通道发送消息的对象。此发射器附加到 movies-out
通道(因此将消息发送到 Kafka)。我们发送 Record
对象,其中包含电影的发行年份作为键,其标题作为值。
因此,我们应用程序的其余部分可以简单地使用 sendMovieToKafka
方法将电影发送到我们的 Kafka 主题。
步骤 5 - 消费电影
现在,让我们看看另一边,从 Kafka 中检索电影。
package org.acme;
import io.smallrye.reactive.messaging.kafka.Record;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MovieConsumer {
private final Logger logger = Logger.getLogger(MovieConsumer.class);
@Incoming("movies-in")
public void receive(Record<Integer, String> record) {
logger.infof("Got a movie: %d - %s", record.key(), record.value());
}
}
在这里,我们使用 @Incoming
注释来指示 Quarkus 在收到每条记录时调用 receive
方法。
步骤 6 - 从 REST 端点发送电影
从 REST 端点向 Kafka 发送消息是很常见的。为此,请创建 org.acme.MovieResource
类,内容如下
package org.acme;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@Path("/")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class MovieResource {
@Inject
MovieProducer producer;
@POST
public Response send(Movie movie) {
producer.sendMovieToKafka(movie);
// Return an 202 - Accepted response.
return Response.accepted().build();
}
}
步骤 7 - 运行它!
首先,我们需要一个 Kafka 代理。您可以遵循 Apache Kafka 快速入门,或使用以下 docker-compose.yaml
文件
version: '2'
services:
zookeeper:
image: strimzi/kafka:0.20.1-kafka-2.6.0
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
kafka:
image: strimzi/kafka:0.20.1-kafka-2.6.0
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
将 docker-compose.yaml
文件复制到您的项目中,然后在终端中运行以下命令来启动您的代理: docker-compose up -d
然后,使用以下命令运行应用程序
./mvnw quarkus:dev
应用程序以开发模式运行,这意味着您仍然可以更新代码。它将自动重新加载。
在另一个终端中,发出几个 HTTP POST 请求,例如
curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1994, "title":"The Shawshank Redemption"}' \
https://:8080/
curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1972, "title":"The Godfather"}' \
https://:8080/
curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":2008, "title":"The Dark Knight"}' \
https://:8080/
curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1994, "title":"Pulp Fiction"}' \
https://:8080/
curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":2010, "title":"Inception"}' \
https://:8080/
在运行应用程序的终端中,您将看到
...
2021-01-13 09:29:41,087 INFO [org.acm.MovieConsumer] (vert.x-eventloop-thread-9) Got a movie: 1994 - Pulp Fiction
2021-01-13 09:29:41,114 INFO [org.acm.MovieConsumer] (vert.x-eventloop-thread-9) Got a movie: 2010 - Inception
...
成功了!
步骤 8 - 本地打包
如果您已正确安装和配置 GraalVM,您可以将此应用程序打包为本地可执行文件
./mvnw package -Pnative
然后,使用以下命令执行您的本地可执行文件: ./target/getting-started-kafka-1.0.0-SNAPSHOT-runner
,您将获得一个使用 Kafka 的 Quarkus 应用程序,该应用程序在几毫秒内启动,并且消耗的内存非常少:在摄入 100 条记录后不到 30Mb!
$ rss getting-started-kafka-1.0.0-SNAPSHOT-runner
PID 0M COMMAND
49321 30M ./target/getting-started-kafka-1.0.0-SNAPSHOT-runner
总结
在不到 10 分钟的时间里,我们就拥有了一个使用 Apache Kafka 的新 Quarkus 应用程序。如果您想进一步了解,请查看 Kafka 指南。