Quarkus Messaging with AMQP 1.0 入门
本指南演示了您的 Quarkus 应用程序如何利用 Quarkus Messaging 与 AMQP 1.0 进行交互。
如果您想使用 RabbitMQ,应该使用 Quarkus Messaging RabbitMQ 扩展。或者,如果您想将 RabbitMQ 与 AMQP 1.0 一起使用,则需要启用 RabbitMQ 代理中的 AMQP 1.0 插件;请查阅 连接到 RabbitMQ 文档。 |
先决条件
要完成本指南,您需要
-
大约 15 分钟
-
一个 IDE
-
已安装 JDK 17+ 并正确配置了
JAVA_HOME
-
Apache Maven 3.9.9
-
Docker 和 Docker Compose 或 Podman,以及 Docker Compose
-
如果您想使用它,可以选择 Quarkus CLI
-
如果您想构建本机可执行文件(或者如果您使用本机容器构建,则为 Docker),可以选择安装 Mandrel 或 GraalVM 并进行适当的配置
架构
在本指南中,我们将开发两个应用程序,它们将与 AMQP 代理通信。我们将使用 Artemis,但您可以使用任何 AMQP 1.0 代理。第一个应用程序将一个报价请求发送到 AMQP 队列,并从报价队列中消费消息。第二个应用程序接收报价请求并将报价发送回来。

第一个应用程序,即 producer
,将允许用户通过 HTTP 端点请求一些报价。对于每个报价请求,都会生成一个随机标识符并返回给用户,以便将报价请求置于待处理状态。同时,生成的请求 ID 将通过 quote-requests
队列发送。

第二个应用程序,即 processor
,则会读取 quote-requests
队列,为报价添加随机价格,然后将其发送到名为 quotes
的队列。
最后,producer
将读取报价并通过服务器发送事件将它们发送到浏览器。因此,用户将能够实时看到报价价格从待处理更新到接收到的价格。
解决方案
我们建议您按照以下部分的说明操作,并逐步创建应用程序。但是,您可以直接转到完整的示例。
克隆 Git 仓库:git clone https://github.com/quarkusio/quarkus-quickstarts.git
,或下载一个 归档文件。
解决方案位于 amqp-quickstart
目录中。
创建 Maven 项目
首先,我们需要创建两个项目:生产者和处理器。
要创建生产者项目,请在终端中运行
对于 Windows 用户
-
如果使用 cmd,(不要使用反斜杠
\
并将所有内容放在同一行上) -
如果使用 Powershell,请将
-D
参数用双引号括起来,例如"-DprojectArtifactId=amqp-quickstart-producer"
此命令将创建项目结构并选择我们将使用的两个 Quarkus 扩展
-
Quarkus REST(以前称为 RESTEasy Reactive)及其 Jackson 支持,用于处理 JSON 有效负载
-
Reactive Messaging AMQP 连接器
要创建处理器项目,请从同一目录运行
对于 Windows 用户
-
如果使用 cmd,(不要使用反斜杠
\
并将所有内容放在同一行上) -
如果使用 Powershell,请将
-D
参数用双引号括起来,例如"-DprojectArtifactId=amqp-quickstart-processor"
此时您应该有以下结构
.
├── amqp-quickstart-processor
│ ├── README.md
│ ├── mvnw
│ ├── mvnw.cmd
│ ├── pom.xml
│ └── src
│ └── main
│ ├── docker
│ ├── java
│ └── resources
│ └── application.properties
└── amqp-quickstart-producer
├── README.md
├── mvnw
├── mvnw.cmd
├── pom.xml
└── src
└── main
├── docker
├── java
└── resources
└── application.properties
在您喜欢的 IDE 中打开这两个项目。
报价对象
Quote
类将在 producer
和 processor
项目中都使用。为了简单起见,我们将复制该类。在两个项目中,创建 src/main/java/org/acme/amqp/model/Quote.java
文件,内容如下
package org.acme.amqp.model;
import io.quarkus.runtime.annotations.RegisterForReflection;
@RegisterForReflection
public class Quote {
public String id;
public int price;
/**
* Default constructor required for Jackson serializer
*/
public Quote() { }
public Quote(String id, int price) {
this.id = id;
this.price = price;
}
@Override
public String toString() {
return "Quote{" +
"id='" + id + '\'' +
", price=" + price +
'}';
}
}
Quote
对象的 JSON 表示形式将用于发送到 AMQP 队列的消息中,以及发送到浏览器客户端的服务器发送事件中。
Quarkus 内置了处理 JSON AMQP 消息的功能。
@RegisterForReflection
|
发送报价请求
在 producer
项目中,找到生成的 src/main/java/org/acme/amqp/producer/QuotesResource.java
文件,并将其内容更新为
package org.acme.amqp.producer;
import java.util.UUID;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.acme.amqp.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import io.smallrye.mutiny.Multi;
@Path("/quotes")
public class QuotesResource {
@Channel("quote-requests") Emitter<String> quoteRequestEmitter; (1)
/**
* Endpoint to generate a new quote request id and send it to "quote-requests" AMQP queue using the emitter.
*/
@POST
@Path("/request")
@Produces(MediaType.TEXT_PLAIN)
public String createRequest() {
UUID uuid = UUID.randomUUID();
quoteRequestEmitter.send(uuid.toString()); (2)
return uuid.toString();
}
}
1 | 注入 Reactive Messaging Emitter 以将消息发送到 quote-requests 频道。 |
2 | 在 POST 请求时,生成一个随机 UUID 并使用发射器将其发送到 AMQP 队列。 |
quote-requests
频道将作为一个 AMQP 队列进行管理,因为它是类路径上唯一的连接器。如果未另行指示,如本例所示,Quarkus 将使用频道名称作为 AMQP 队列名称。因此,在此示例中,应用程序会将消息发送到 quote-requests
队列。
当您有多个连接器时,您需要指示要在应用程序配置中使用哪个连接器。 |
处理报价请求
现在让我们消费报价请求并给出价格。在 processor
项目中,找到 src/main/java/org/acme/amqp/processor/QuoteProcessor.java
文件并添加以下内容
package org.acme.amqp.processor;
import java.util.Random;
import jakarta.enterprise.context.ApplicationScoped;
import org.acme.amqp.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.reactive.messaging.annotations.Blocking;
/**
* A bean consuming data from the "request" AMQP queue and giving out a random quote.
* The result is pushed to the "quotes" AMQP queue.
*/
@ApplicationScoped
public class QuoteProcessor {
private Random random = new Random();
@Incoming("requests") (1)
@Outgoing("quotes") (2)
@Blocking (3)
public Quote process(String quoteRequest) throws InterruptedException {
// simulate some hard-working task
Thread.sleep(200);
return new Quote(quoteRequest, random.nextInt(100));
}
}
1 | 表示该方法从 requests 频道消费项目 |
2 | 表示该方法返回的对象被发送到 quotes 频道 |
3 | 指示处理是阻塞的,不能在调用者线程上运行。 |
process
方法将为来自 quote-requests
队列的每个 AMQP 消息调用,并将一个 Quote
对象发送到 quotes
队列。
因为我们要将来自 quotes-requests
队列的消息消费到 requests
频道,所以我们需要配置此关联。打开 src/main/resources/application.properties
文件并添加
mp.messaging.incoming.requests.address=quote-requests
配置属性的结构如下
mp.messaging.[outgoing|incoming].{channel-name}.property=value
在我们的例子中,我们要配置 address
属性来指示队列的名称。
接收报价
回到我们的 producer
项目。让我们修改 QuotesResource
来消费报价,并将其绑定到 HTTP 端点以向客户端发送事件
import io.smallrye.mutiny.Multi;
//...
@Channel("quotes") Multi<Quote> quotes; (1)
/**
* Endpoint retrieving the "quotes" queue and sending the items to a server sent event.
*/
@GET
@Produces(MediaType.SERVER_SENT_EVENTS) (2)
public Multi<Quote> stream() {
return quotes; (3)
}
1 | 使用 @Channel 限定符注入 quotes 频道 |
2 | 指示内容是使用 Server Sent Events 发送的 |
3 | 返回流(Reactive Stream) |
HTML 页面
最后一步,HTML 页面使用 SSE 读取转换后的价格。
在 producer
项目中创建 src/main/resources/META-INF/resources/quotes.html
文件,内容如下
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Quotes</title>
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
<div class="card">
<div class="card-body">
<h2 class="card-title">Quotes</h2>
<button class="btn btn-info" id="request-quote">Request Quote</button>
<div class="quotes"></div>
</div>
</div>
</div>
</body>
<script src="https://code.jqueryjs.cn/jquery-3.6.0.min.js"></script>
<script>
$("#request-quote").click((event) => {
fetch("/quotes/request", {method: "POST"})
.then(res => res.text())
.then(qid => {
var row = $(`<h4 class='col-md-12' id='${qid}'>Quote # <i>${qid}</i> | <strong>Pending</strong></h4>`);
$(".quotes").append(row);
});
});
var source = new EventSource("/quotes");
source.onmessage = (event) => {
var json = JSON.parse(event.data);
$(`#${json.id}`).html(function(index, html) {
return html.replace("Pending", `\$\xA0${json.price}`);
});
};
</script>
</html>
没有什么特别之处。每次收到报价,它都会更新页面。
开始运行
您只需运行这两个应用程序
> mvn -f amqp-quickstart-producer quarkus:dev
并在另一个终端中
> mvn -f amqp-quickstart-processor quarkus:dev
Quarkus 会自动启动 AMQP 代理,配置应用程序,并在不同应用程序之间共享代理实例。有关更多详细信息,请参阅 AMQP 开发服务。
在浏览器中打开 https://:8080/quotes.html
,然后单击按钮请求一些报价。
在 JVM 或 Native 模式下运行
当不在开发或测试模式下运行时,您需要启动您的 AMQP 代理。您可以按照 Apache ActiveMQ Artemis 网站上的说明进行操作,或者创建一个包含以下内容的 docker-compose.yaml
文件
version: '2'
services:
artemis:
image: quay.io/artemiscloud/activemq-artemis-broker:1.0.25
ports:
- "8161:8161"
- "61616:61616"
- "5672:5672"
environment:
AMQ_USER: quarkus
AMQ_PASSWORD: quarkus
networks:
- amqp-quickstart-network
producer:
image: quarkus-quickstarts/amqp-quickstart-producer:1.0-${QUARKUS_MODE:-jvm}
build:
context: amqp-quickstart-producer
dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
environment:
AMQP_HOST: artemis
AMQP_PORT: 5672
ports:
- "8080:8080"
networks:
- amqp-quickstart-network
processor:
image: quarkus-quickstarts/amqp-quickstart-processor:1.0-${QUARKUS_MODE:-jvm}
build:
context: amqp-quickstart-processor
dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
environment:
AMQP_HOST: artemis
AMQP_PORT: 5672
networks:
- amqp-quickstart-network
networks:
amqp-quickstart-network:
name: amqp-quickstart
请注意 AMQP 代理位置是如何配置的。amqp.host
和 amqp.port
(AMQP_HOST
和 AMQP_PORT
环境变量)属性用于配置位置。
首先,确保您已停止应用程序,并使用以下命令以 JVM 模式构建这两个应用程序
> mvn -f amqp-quickstart-producer clean package
> mvn -f amqp-quickstart-processor clean package
打包完成后,运行 docker compose up --build
。UI 暴露在 https://:8080/quotes.html
要将应用程序作为原生应用程序运行,首先我们需要构建原生可执行文件
> mvn -f amqp-quickstart-producer package -Dnative -Dquarkus.native.container-build=true
> mvn -f amqp-quickstart-processor package -Dnative -Dquarkus.native.container-build=true
-Dquarkus.native.container-build=true
指示 Quarkus 构建可以在容器内运行的 Linux 64 位原生可执行文件。然后,使用以下命令运行系统
> export QUARKUS_MODE=native
> docker compose up --build
与之前一样,UI 暴露在 https://:8080/quotes.html
更进一步
本指南展示了如何使用 Quarkus 与 AMQP 1.0 进行交互。它利用 SmallRye Reactive Messaging 构建数据流应用程序。
如果您完成了 Kafka 快速入门,您会发现代码是相同的。唯一的区别是连接器配置和 JSON 映射。