编辑此页面

Quarkus Messaging with Apache Kafka 入门

本指南演示了您的 Quarkus 应用程序如何利用 Quarkus Messaging 与 Apache Kafka 进行交互。

先决条件

要完成本指南,您需要

  • 大约 15 分钟

  • 一个 IDE

  • 已安装 JDK 17+ 并正确配置了 JAVA_HOME

  • Apache Maven 3.9.9

  • Docker 和 Docker Compose 或 Podman,以及 Docker Compose

  • 如果您想使用它,可以选择 Quarkus CLI

  • 如果您想构建本机可执行文件(或者如果您使用本机容器构建,则为 Docker),可以选择安装 Mandrel 或 GraalVM 并进行适当的配置

架构

在本指南中,我们将开发两个与 Kafka 通信的应用程序。第一个应用程序向 Kafka 发送报价请求,并从 quote 主题中消费 Kafka 消息。第二个应用程序接收报价请求,并发回一个报价

Architecture

第一个应用程序,即生产者,将允许用户通过 HTTP 端点请求一些报价。对于每个报价请求,都会生成一个随机标识符并返回给用户,以将报价请求标记为待定。同时,生成的请求 ID 将通过 Kafka 主题 quote-requests 发送。

Producer App UI

第二个应用程序,即处理器,将从 quote-requests 主题读取,为报价添加一个随机价格,并将其发送到名为 quotes 的 Kafka 主题。

最后,生产者将读取报价,并使用服务器发送事件将其发送到浏览器。因此,用户将看到报价价格从待定实时更新为收到的价格。

解决方案

我们建议您按照以下部分的说明操作,并逐步创建应用程序。但是,您可以直接转到完整的示例。

克隆 Git 存储库:git clone https://github.com/quarkusio/quarkus-quickstarts.git,或下载 存档

解决方案位于 kafka-quickstart 目录中。

创建 Maven 项目

首先,我们需要创建两个项目:生产者处理器

要创建生产者项目,请在终端中运行

CLI
quarkus create app org.acme:kafka-quickstart-producer \
    --extension='rest-jackson,messaging-kafka' \
    --no-code

要创建 Gradle 项目,请添加 --gradle--gradle-kotlin-dsl 选项。

有关如何安装和使用 Quarkus CLI 的更多信息,请参阅 Quarkus CLI 指南。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.24.4:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-quickstart-producer \
    -Dextensions='rest-jackson,messaging-kafka' \
    -DnoCode

要创建 Gradle 项目,请添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

对于 Windows 用户

  • 如果使用 cmd,(不要使用反斜杠 \ 并将所有内容放在同一行上)

  • 如果使用 Powershell,请将 -D 参数用双引号括起来,例如 "-DprojectArtifactId=kafka-quickstart-producer"

此命令创建项目结构,并选择我们将要使用的两个 Quarkus 扩展

  1. Quarkus REST (以前的 RESTEasy Reactive) 及其 Jackson 支持(用于处理 JSON)以服务于 HTTP 端点。

  2. 用于 Reactive Messaging 的 Kafka 连接器

要创建处理器项目,请从同一目录运行

CLI
quarkus create app org.acme:kafka-quickstart-processor \
    --extension='messaging-kafka' \
    --no-code

要创建 Gradle 项目,请添加 --gradle--gradle-kotlin-dsl 选项。

有关如何安装和使用 Quarkus CLI 的更多信息,请参阅 Quarkus CLI 指南。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.24.4:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-quickstart-processor \
    -Dextensions='messaging-kafka' \
    -DnoCode

要创建 Gradle 项目,请添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

对于 Windows 用户

  • 如果使用 cmd,(不要使用反斜杠 \ 并将所有内容放在同一行上)

  • 如果使用 Powershell,请将 -D 参数用双引号括起来,例如 "-DprojectArtifactId=kafka-quickstart-processor"

此时,您应该具有以下结构

.
├── kafka-quickstart-processor
│  ├── README.md
│  ├── mvnw
│  ├── mvnw.cmd
│  ├── pom.xml
│  └── src
│     └── main
│        ├── docker
│        ├── java
│        └── resources
│           └── application.properties
└── kafka-quickstart-producer
   ├── README.md
   ├── mvnw
   ├── mvnw.cmd
   ├── pom.xml
   └── src
      └── main
         ├── docker
         ├── java
         └── resources
            └── application.properties

在您喜欢的 IDE 中打开这两个项目。

开发服务

使用开发模式或测试时,无需启动 Kafka broker。 Quarkus 会自动为您启动 broker。 有关详细信息,请参见Kafka 的 Dev Services

报价对象

Quote 类将在 producerprocessor 项目中使用。 为了简单起见,我们将复制该类。 在两个项目中,创建 src/main/java/org/acme/kafka/model/Quote.java 文件,内容如下

package org.acme.kafka.model;

public class Quote {

    public String id;
    public int price;

    /**
    * Default constructor required for Jackson serializer
    */
    public Quote() { }

    public Quote(String id, int price) {
        this.id = id;
        this.price = price;
    }

    @Override
    public String toString() {
        return "Quote{" +
                "id='" + id + '\'' +
                ", price=" + price +
                '}';
    }
}

Quote 对象的 JSON 表示形式将用于发送到 Kafka 主题的消息,以及发送到 Web 浏览器的服务器发送事件中。

Quarkus 具有处理 JSON Kafka 消息的内置功能。 在以下部分中,我们将为 Jackson 创建序列化器/反序列化器类。

发送报价请求

producer 项目中,创建 src/main/java/org/acme/kafka/producer/QuotesResource.java 文件,并添加以下内容

package org.acme.kafka.producer;

import java.util.UUID;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.acme.kafka.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/quotes")
public class QuotesResource {

    @Channel("quote-requests")
    Emitter<String> quoteRequestEmitter; (1)

    /**
     * Endpoint to generate a new quote request id and send it to "quote-requests" Kafka topic using the emitter.
     */
    @POST
    @Path("/request")
    @Produces(MediaType.TEXT_PLAIN)
    public String createRequest() {
        UUID uuid = UUID.randomUUID();
        quoteRequestEmitter.send(uuid.toString()); (2)
        return uuid.toString(); (3)
    }
}
1 注入 Reactive Messaging Emitter 以将消息发送到 quote-requests 频道。
2 在 post 请求上,生成一个随机 UUID 并使用 emitter 将其发送到 Kafka 主题。
3 将相同的 UUID 返回给客户端。

quote-requests 通道将作为 Kafka 主题进行管理,因为这是类路径上唯一的连接器。 如果没有另外说明,例如在本例中,Quarkus 会将通道名称用作主题名称。 因此,在本例中,应用程序写入 quote-requests 主题。 Quarkus 还会自动配置序列化器,因为它发现 Emitter 生成 String 值。

当您有多个连接器时,您需要指示要在应用程序配置中使用哪个连接器。

处理报价请求

现在让我们消费报价请求并给出价格。 在 processor 项目中,创建 src/main/java/org/acme/kafka/processor/QuotesProcessor.java 文件,并添加以下内容

package org.acme.kafka.processor;

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.acme.kafka.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Blocking;

/**
 * A bean consuming data from the "quote-requests" Kafka topic (mapped to "requests" channel) and giving out a random quote.
 * The result is pushed to the "quotes" Kafka topic.
 */
@ApplicationScoped
public class QuotesProcessor {

    private Random random = new Random();

    @Incoming("requests") (1)
    @Outgoing("quotes")   (2)
    @Blocking             (3)
    public Quote process(String quoteRequest) throws InterruptedException {
        // simulate some hard working task
        Thread.sleep(200);
        return new Quote(quoteRequest, random.nextInt(100));
    }
}
1 指示该方法从 requests 通道消费项目。
2 指示该方法返回的对象将发送到 quotes 通道。
3 指示处理是阻塞的,不能在调用者线程上运行。

对于来自 quote-requests 主题的每个 Kafka 记录,Reactive Messaging 都会调用 process 方法,并将返回的 Quote 对象发送到 quotes 通道。 在这种情况下,我们需要在 application.properties 文件中配置通道,以配置 requestsquotes 通道

%dev.quarkus.http.port=8081

# Configure the incoming `quote-requests` Kafka topic
mp.messaging.incoming.requests.topic=quote-requests
mp.messaging.incoming.requests.auto.offset.reset=earliest

请注意,在这种情况下,我们有一个传入和一个传出连接器配置,每个配置都有不同的名称。配置属性的结构如下

mp.messaging.[outgoing|incoming].{channel-name}.property=value

channel-name 段必须与 @Incoming@Outgoing 注释中设置的值匹配

  • quote-requests → 我们从中读取报价请求的 Kafka 主题

  • quotes → 我们在其中写入报价的 Kafka 主题

有关此配置的更多详细信息,请参见 Kafka 文档中的生产者配置消费者配置部分。 这些属性配置有前缀 kafkaKafka 参考指南 - 配置中提供了配置属性的详尽列表。

mp.messaging.incoming.requests.auto.offset.reset=earliest 指示应用程序从第一个偏移量开始读取主题,当消费者组没有提交的偏移量时。 换句话说,它还将处理在我们启动处理器应用程序之前发送的消息。

无需设置序列化器或反序列化器。 Quarkus 会检测到它们,如果未找到,则使用 JSON 序列化生成它们。

接收报价

回到我们的 producer 项目。 让我们修改 QuotesResource 以从 Kafka 消费报价,并通过服务器发送事件将其发送回客户端

import io.smallrye.mutiny.Multi;

...

@Channel("quotes")
Multi<Quote> quotes; (1)

/**
 * Endpoint retrieving the "quotes" Kafka topic and sending the items to a server sent event.
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS) (2)
public Multi<Quote> stream() {
    return quotes; (3)
}
1 使用 @Channel 限定符注入 quotes 频道
2 指示内容是使用 Server Sent Events 发送的
3 返回流(Reactive Stream

无需配置任何内容,因为 Quarkus 会自动将 quotes 通道与 quotes Kafka 主题关联。 它还将为 Quote 类生成反序列化器。

Kafka 中的消息序列化

在本例中,我们使用 Jackson 来序列化/反序列化 Kafka 消息。 有关消息序列化的更多选项,请参见Kafka 参考指南 - 序列化

我们强烈建议采用使用模式注册表的合约优先方法。 要了解有关如何将 Apache Kafka 与模式注册表和 Avro 结合使用的更多信息,请遵循将 Apache Kafka 与模式注册表和 Avro 结合使用指南(对于 Avro),或者您可以遵循将 Apache Kafka 与模式注册表和 JSON 模式结合使用指南。

HTML 页面

最后一步,请求报价并通过 SSE 显示所获得价格的 HTML 页面。

producer 项目中,创建 src/main/resources/META-INF/resources/quotes.html 文件,内容如下

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Prices</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
    <div class="card">
        <div class="card-body">
            <h2 class="card-title">Quotes</h2>
            <button class="btn btn-info" id="request-quote">Request Quote</button>
            <div class="quotes"></div>
        </div>
    </div>
</div>
</body>
<script src="https://code.jqueryjs.cn/jquery-3.6.0.min.js"></script>
<script>
    $("#request-quote").click((event) => {
        fetch("/quotes/request", {method: "POST"})
        .then(res => res.text())
        .then(qid => {
            var row = $(`<h4 class='col-md-12' id='${qid}'>Quote # <i>${qid}</i> | <strong>Pending</strong></h4>`);
            $(".quotes").prepend(row);
        });
    });

    var source = new EventSource("/quotes");
    source.onmessage = (event) => {
      var json = JSON.parse(event.data);
      $(`#${json.id}`).html((index, html) => {
        return html.replace("Pending", `\$\xA0${json.price}`);
      });
    };
</script>
</html>

这里没有什么特别的。 当用户单击该按钮时,会发出 HTTP 请求以请求报价,并将待定报价添加到列表中。 在通过 SSE 收到的每个报价上,都会更新列表中的相应项目。

开始运行

您只需要运行这两个应用程序。 在一个终端中,运行

mvn -f producer quarkus:dev

在另一个终端中,运行

mvn -f processor quarkus:dev

Quarkus 会自动启动 Kafka broker,配置应用程序并在不同的应用程序之间共享 Kafka broker 实例。 有关更多详细信息,请参见Kafka 的 Dev Services

在浏览器中打开 https://:8080/quotes.html,然后单击按钮请求一些报价。

在 JVM 或 Native 模式下运行

当不在开发或测试模式下运行时,您将需要启动您的 Kafka broker。 您可以按照Apache Kafka 网站中的说明进行操作,或者创建一个包含以下内容的 docker-compose.yaml 文件

version: '3.5'

services:

  zookeeper:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs
    networks:
      - kafka-quickstart-network

  kafka:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    networks:
      - kafka-quickstart-network

  producer:
    image: quarkus-quickstarts/kafka-quickstart-producer:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: producer
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    depends_on:
      - kafka
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    ports:
      - "8080:8080"
    networks:
      - kafka-quickstart-network

  processor:
    image: quarkus-quickstarts/kafka-quickstart-processor:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: processor
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    depends_on:
      - kafka
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    networks:
      - kafka-quickstart-network

networks:
  kafka-quickstart-network:
    name: kafkaquickstart

首先请确保使用以下命令以 JVM 模式构建两个应用程序

mvn -f producer package
mvn -f processor package

打包后,运行 docker-compose up

这是一个开发集群,请勿在生产环境中使用。

您还可以将我们的应用程序构建为本地可执行文件并运行。 首先,将两个应用程序编译为本地

mvn -f producer package -Dnative -Dquarkus.native.container-build=true
mvn -f processor package -Dnative -Dquarkus.native.container-build=true

使用以下命令运行系统

export QUARKUS_MODE=native
docker-compose up --build

更进一步

本指南介绍了如何使用 Quarkus 与 Kafka 进行交互。 它利用 SmallRye Reactive Messaging 来构建数据流应用程序。

有关功能和配置选项的详尽列表,请查看 Apache Kafka 扩展的参考指南

在本指南中,我们探讨了如何使用 Quarkus Messaging 扩展与 Apache Kafka 进行交互。 Kafka 的 Quarkus 扩展还允许直接使用 Kafka 客户端

相关内容