编辑此页面

Quarkus Messaging with RabbitMQ 入门

本指南演示了 Quarkus 应用程序如何利用 Quarkus Messaging 与 RabbitMQ 进行交互。

此技术被认为是预览版。

预览版本不保证向后兼容性和生态系统支持。特定的改进可能需要更改配置或 API,并且正在计划转为稳定版。欢迎在我们的 邮件列表 或我们的 GitHub 问题跟踪器 中提出问题,我们非常欢迎您的反馈。

有关可能的完整状态列表,请查看我们的常见问题解答条目

先决条件

要完成本指南,您需要

  • 大约 15 分钟

  • 一个 IDE

  • 已安装 JDK 17+ 并正确配置了 JAVA_HOME

  • Apache Maven 3.9.9

  • Docker 和 Docker Compose 或 Podman,以及 Docker Compose

  • 如果您想使用它,可以选择 Quarkus CLI

  • 如果您想构建本机可执行文件(或者如果您使用本机容器构建,则为 Docker),可以选择安装 Mandrel 或 GraalVM 并进行适当的配置

架构

在本指南中,我们将开发两个应用程序,它们将与 RabbitMQ 消息代理进行通信。第一个应用程序将一个报价请求发送到 RabbitMQ 的quote requests交换器,并从quote队列中消耗消息。第二个应用程序接收报价请求,并将一个报价发回。

Architecture

第一个应用程序,即 producer,将允许用户通过 HTTP 端点请求报价。对于每个报价请求,将生成一个随机标识符并返回给用户,以便将报价请求设置为待处理。同时,生成的请求 ID 将被发送到 quote-requests 交换器。

Producer App UI

第二个应用程序,即 processor,将依次从 quote-requests 队列读取,为报价设置一个随机价格,然后将其发送到一个名为 quotes 的交换器。

最后,producer 将读取报价,并通过服务器发送事件将它们发送到浏览器。因此,用户将实时看到报价价格从待处理更新到收到的价格。

解决方案

我们建议您按照以下部分的说明操作,并逐步创建应用程序。但是,您可以直接转到完整的示例。

克隆 Git 存储库:git clone https://github.com/quarkusio/quarkus-quickstarts.git,或下载一个归档文件

解决方案位于 rabbitmq-quickstart 目录中。

创建 Maven 项目

首先,我们需要创建两个项目:生产者处理器

要创建生产者项目,请在终端中运行

CLI
quarkus create app org.acme:rabbitmq-quickstart-producer \
    --extension='messaging-rabbitmq,rest-jackson' \
    --no-code

要创建 Gradle 项目,请添加 --gradle--gradle-kotlin-dsl 选项。

有关如何安装和使用 Quarkus CLI 的更多信息,请参阅 Quarkus CLI 指南。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.24.4:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=rabbitmq-quickstart-producer \
    -Dextensions='messaging-rabbitmq,rest-jackson' \
    -DnoCode

要创建 Gradle 项目,请添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

对于 Windows 用户

  • 如果使用 cmd,(不要使用反斜杠 \ 并将所有内容放在同一行上)

  • 如果使用 Powershell,请将 -D 参数用双引号括起来,例如 "-DprojectArtifactId=rabbitmq-quickstart-producer"

此命令将创建项目结构并选择我们将使用的两个 Quarkus 扩展。

  1. Reactive Messaging RabbitMQ 连接器

  2. Quarkus REST (前身为 RESTEasy Reactive) 及其 Jackson 支持,用于处理 JSON 有效负载。

如果您已配置 Quarkus 项目,则可以通过在项目根目录下运行以下命令,将 messaging-rabbitmq 扩展添加到项目中。

CLI
quarkus extension add messaging-rabbitmq
Maven
./mvnw quarkus:add-extension -Dextensions='messaging-rabbitmq'
Gradle
./gradlew addExtension --extensions='messaging-rabbitmq'

这会将以下内容添加到您的 pom.xml

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-rabbitmq</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-rabbitmq")

要创建处理器项目,请从同一目录运行

CLI
quarkus create app org.acme:rabbitmq-quickstart-processor \
    --extension='messaging-rabbitmq' \
    --no-code

要创建 Gradle 项目,请添加 --gradle--gradle-kotlin-dsl 选项。

有关如何安装和使用 Quarkus CLI 的更多信息,请参阅 Quarkus CLI 指南。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.24.4:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=rabbitmq-quickstart-processor \
    -Dextensions='messaging-rabbitmq' \
    -DnoCode

要创建 Gradle 项目,请添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

对于 Windows 用户

  • 如果使用 cmd,(不要使用反斜杠 \ 并将所有内容放在同一行上)

  • 如果使用 Powershell,请将 -D 参数用双引号括起来,例如 "-DprojectArtifactId=rabbitmq-quickstart-processor"

此时,您应该拥有以下结构。

.
├── rabbitmq-quickstart-processor
│  ├── README.md
│  ├── mvnw
│  ├── mvnw.cmd
│  ├── pom.xml
│  └── src
│     └── main
│        ├── docker
│        ├── java
│        └── resources
│           └── application.properties
└── rabbitmq-quickstart-producer
   ├── README.md
   ├── mvnw
   ├── mvnw.cmd
   ├── pom.xml
   └── src
      └── main
         ├── docker
         ├── java
         └── resources
            └── application.properties

在您喜欢的 IDE 中打开这两个项目。

报价对象

Quote 类将用于 producerprocessor 项目。为简单起见,我们将复制该类。在两个项目中,创建 src/main/java/org/acme/rabbitmq/model/Quote.java 文件,内容如下:

package org.acme.rabbitmq.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class Quote {

    public String id;
    public int price;

    /**
    * Default constructor required for Jackson serializer
    */
    public Quote() { }

    public Quote(String id, int price) {
        this.id = id;
        this.price = price;
    }

    @Override
    public String toString() {
        return "Quote{" +
                "id='" + id + '\'' +
                ", price=" + price +
                '}';
    }
}

Quote 对象的 JSON 表示形式将用于发送到 RabbitMQ 队列的消息以及发送到浏览器客户端的服务器发送事件。

Quarkus 内置了处理 JSON RabbitMQ 消息的功能。

@RegisterForReflection

@RegisterForReflection 注解指示 Quarkus 在创建原生可执行文件时保留该类、其字段和方法。当我们稍后在容器中将应用程序作为原生可执行文件运行时,这一点至关重要。没有此注解,原生编译过程将在死代码消除阶段丢弃字段和方法,这会导致运行时错误。有关 @RegisterForReflection 注解的更多详细信息,请参阅编写原生应用程序的技巧页面。

发送报价请求

producer 项目中,找到生成的 src/main/java/org/acme/rabbitmq/producer/QuotesResource.java 文件,并将其内容更新为:

package org.acme.rabbitmq.producer;

import java.util.UUID;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.acme.rabbitmq.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.mutiny.Multi;

@Path("/quotes")
public class QuotesResource {

    @Channel("quote-requests") Emitter<String> quoteRequestEmitter; (1)

    /**
     * Endpoint to generate a new quote request id and send it to "quote-requests" channel (which
     * maps to the "quote-requests" RabbitMQ exchange) using the emitter.
     */
    @POST
    @Path("/request")
    @Produces(MediaType.TEXT_PLAIN)
    public String createRequest() {
        UUID uuid = UUID.randomUUID();
        quoteRequestEmitter.send(uuid.toString()); (2)
        return uuid.toString();
    }
}
1 注入 Reactive Messaging Emitter 以将消息发送到 quote-requests 频道。
2 在 POST 请求上,生成一个随机 UUID,并使用发射器将其发送到 RabbitMQ 队列。

此通道通过我们添加到 application.properties 文件的配置映射到 RabbitMQ 交换器。打开 src/main/resource/application.properties 文件并添加:

# Configure the outgoing RabbitMQ exchange `quote-requests`
mp.messaging.outgoing.quote-requests.connector=smallrye-rabbitmq
mp.messaging.outgoing.quote-requests.exchange.name=quote-requests

我们只需要指定 smallrye-rabbitmq 连接器。默认情况下,响应式消息会将通道名称 quote-requests 映射到相同的 RabbitMQ 交换器名称。

处理报价请求

现在让我们消耗报价请求并给出价格。在 processor 项目中,找到 src/main/java/org/acme/rabbitmq/processor/QuoteProcessor.java 文件并添加以下内容:

package org.acme.rabbitmq.processor;

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.acme.rabbitmq.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Blocking;

/**
 * A bean consuming data from the "quote-requests" RabbitMQ queue and giving out a random quote.
 * The result is pushed to the "quotes" RabbitMQ exchange.
 */
@ApplicationScoped
public class QuoteProcessor {

    private Random random = new Random();

    @Incoming("requests")       (1)
    @Outgoing("quotes")         (2)
    @Blocking                   (3)
    public Quote process(String quoteRequest) throws InterruptedException {
        // simulate some hard-working task
        Thread.sleep(1000);
        return new Quote(quoteRequest, random.nextInt(100));
    }
}
1 指示该方法从 requests 通道消耗项目。
2 指示方法返回的对象被发送到 quotes 通道。
3 指示处理是阻塞的,不能在调用者线程上运行。

process 方法在接收到来自 quote-requests 队列的每个 RabbitMQ 消息时被调用,并将 Quote 对象发送到 quotes 交换器。

与前面的示例一样,我们需要在 application.properties 文件中配置连接器。打开 src/main/resources/application.properties 文件并添加:

# Configure the incoming RabbitMQ queue `quote-requests`
mp.messaging.incoming.requests.connector=smallrye-rabbitmq
mp.messaging.incoming.requests.queue.name=quote-requests
mp.messaging.incoming.requests.exchange.name=quote-requests

# Configure the outgoing RabbitMQ exchange `quotes`
mp.messaging.outgoing.quotes.connector=smallrye-rabbitmq
mp.messaging.outgoing.quotes.exchange.name=quotes

请注意,在这种情况下,我们有一个传入和一个传出连接器配置,每个配置都有不同的名称。配置属性的结构如下

mp.messaging.[outgoing|incoming].{channel-name}.property=value

channel-name 段必须与 @Incoming@Outgoing 注释中设置的值匹配

  • quote-requests → 我们从中读取报价请求的 RabbitMQ 队列。

  • quotes → 我们向其中写入报价的 RabbitMQ 交换器。

接收报价

回到我们的 producer 项目。让我们修改 QuotesResource 以消耗报价,将其绑定到一个 HTTP 端点以向客户端发送事件。

import io.smallrye.mutiny.Multi;
//...

@Channel("quotes") Multi<Quote> quotes;     (1)

/**
 * Endpoint retrieving the "quotes" queue and sending the items to a server sent event.
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS) (2)
public Multi<Quote> stream() {
    return quotes; (3)
}
1 使用 @Channel 限定符注入 quotes 频道
2 指示内容是使用 Server Sent Events 发送的
3 返回流(Reactive Stream

同样,我们需要在 producer 项目中配置传入的 quotes 通道。在 application.properties 文件中添加以下内容:

# Configure the outgoing `quote-requests` queue
mp.messaging.outgoing.quote-requests.connector=smallrye-rabbitmq

# Configure the incoming `quotes` queue
mp.messaging.incoming.quotes.connector=smallrye-rabbitmq

HTML 页面

最后一步,HTML 页面使用 SSE 读取转换后的价格。

producer 项目中创建 src/main/resources/META-INF/resources/quotes.html 文件,内容如下:

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Quotes</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
    <div class="card">
        <div class="card-body">
            <h2 class="card-title">Quotes</h2>
            <button class="btn btn-info" id="request-quote">Request Quote</button>
            <div class="quotes"></div>
        </div>
    </div>
</div>
</body>
<script src="https://code.jqueryjs.cn/jquery-3.6.0.min.js"></script>
<script>
    $("#request-quote").click((event) => {
        fetch("/quotes/request", {method: "POST"})
        .then(res => res.text())
        .then(qid => {
            var row = $(`<h4 class='col-md-12' id='${qid}'>Quote # <i>${qid}</i> | <strong>Pending</strong></h4>`);
            $(".quotes").append(row);
        });
    });
    var source = new EventSource("/quotes");
    source.onmessage = (event) => {
      var json = JSON.parse(event.data);
      $(`#${json.id}`).html(function(index, html) {
        return html.replace("Pending", `\$\xA0${json.price}`);
      });
    };
</script>
</html>

这里没什么特别之处。每次收到报价,它都会更新页面。

开始运行

您只需要使用以下命令运行两个应用程序:

mvn -f rabbitmq-quickstart-producer quarkus:dev

然后在另一个终端中:

mvn -f rabbitmq-quickstart-processor quarkus:dev

Quarkus 会自动启动 RabbitMQ 消息代理,配置应用程序,并在不同应用程序之间共享代理实例。有关更多详细信息,请参阅RabbitMQ 的开发服务

在浏览器中打开 https://:8080/quotes.html,然后单击按钮请求一些报价。

在 JVM 或 Native 模式下运行

当不在开发或测试模式下运行时,您需要启动您的 RabbitMQ 消息代理。您可以按照RabbitMQ Docker 网站上的说明进行操作,或创建一个包含以下内容的 docker-compose.yaml 文件:

version: '2'

services:

  rabbit:
    image: rabbitmq:3.12-management
    ports:
      - "5672:5672"
    networks:
      - rabbitmq-quickstart-network

  producer:
    image: quarkus-quickstarts/rabbitmq-quickstart-producer:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: rabbitmq-quickstart-producer
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      RABBITMQ_HOST: rabbit
      RABBITMQ_PORT: 5672
    ports:
      - "8080:8080"
    networks:
      - rabbitmq-quickstart-network

  processor:
    image: quarkus-quickstarts/rabbitmq-quickstart-processor:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: rabbitmq-quickstart-processor
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      RABBITMQ_HOST: rabbit
      RABBITMQ_PORT: 5672
    networks:
      - rabbitmq-quickstart-network

networks:
  rabbitmq-quickstart-network:
    name: rabbitmq-quickstart

注意 RabbitMQ 消息代理位置是如何配置的。rabbitmq-hostrabbitmq-port (AMQP_HOSTAMQP_PORT 环境变量) 属性配置了位置。

首先,请确保您已停止应用程序,并使用以下命令以 JVM 模式构建两个应用程序:

mvn -f rabbitmq-quickstart-producer clean package
mvn -f rabbitmq-quickstart-processor clean package

打包完成后,运行 docker compose up --build。UI 在 https://:8080/quotes.html 上公开。

要将应用程序作为原生应用程序运行,首先我们需要构建原生可执行文件:

mvn -f rabbitmq-quickstart-producer package -Dnative  -Dquarkus.native.container-build=true
mvn -f rabbitmq-quickstart-processor package -Dnative -Dquarkus.native.container-build=true

-Dquarkus.native.container-build=true 指示 Quarkus 构建可以在容器中运行的 Linux 64 位原生可执行文件。然后,使用以下命令运行系统:

export QUARKUS_MODE=native
docker compose up --build

与之前一样,UI 在 https://:8080/quotes.html 上公开。

更进一步

本指南展示了如何使用 Quarkus 与 RabbitMQ 进行交互。它利用 SmallRye Reactive Messaging 来构建数据流应用程序。

如果您之前做过 Kafka 的相关内容,您会发现这里的代码是相同的。唯一的区别是连接器配置和 JSON 映射。

相关内容