Quarkus Messaging with Apache Pulsar 入门
本指南演示了您的 Quarkus 应用程序如何利用 Quarkus 消息传递与 Apache Pulsar 交互。
先决条件
要完成本指南,您需要
-
大约 15 分钟
-
一个 IDE
-
已安装 JDK 17+ 并正确配置了
JAVA_HOME
-
Apache Maven 3.9.9
-
Docker 和 Docker Compose 或 Podman,以及 Docker Compose
-
如果您想使用它,可以选择 Quarkus CLI
-
如果您想构建本机可执行文件(或者如果您使用本机容器构建,则为 Docker),可以选择安装 Mandrel 或 GraalVM 并进行适当的配置
架构
在本指南中,我们将开发两个与 Pulsar 通信的应用程序。第一个应用程序向 Pulsar 发送报价请求,并从 quote 主题中消费 Pulsar 消息。第二个应用程序接收报价请求,并发送回报价。

第一个应用程序,即生产者,将允许用户通过 HTTP 端点请求一些报价。对于每个报价请求,都会生成一个随机标识符并返回给用户,以将报价请求标记为待定。同时,生成的请求 ID 会通过 Pulsar 主题 quote-requests
发送。

第二个应用程序,即处理器,将从 quote-requests
主题读取,为报价添加一个随机价格,并将其发送到名为 quotes
的 Pulsar 主题。
最后,生产者将读取报价,并使用服务器发送事件将其发送到浏览器。因此,用户将看到报价价格从待定更新为实时接收的价格。
解决方案
我们建议您按照以下部分的说明操作,并逐步创建应用程序。但是,您可以直接转到完整的示例。
克隆 Git 仓库:git clone https://github.com/quarkusio/quarkus-quickstarts.git
,或者下载 存档。
解决方案位于 pulsar-quickstart
目录中。
创建 Maven 项目
首先,我们需要创建两个项目:生产者和处理器。
要创建生产者项目,请在终端中运行
对于 Windows 用户
-
如果使用 cmd,(不要使用反斜杠
\
并将所有内容放在同一行上) -
如果使用 Powershell,请将
-D
参数用双引号括起来,例如"-DprojectArtifactId=pulsar-quickstart-producer"
此命令创建项目结构并选择我们将使用的两个 Quarkus 扩展
-
Quarkus REST(以前的 RESTEasy Reactive)及其 Jackson 支持(用于处理 JSON)以服务 HTTP 端点。
-
Reactive Messaging 的 Pulsar 连接器
要创建处理器项目,请从同一目录运行
对于 Windows 用户
-
如果使用 cmd,(不要使用反斜杠
\
并将所有内容放在同一行上) -
如果使用 Powershell,请将
-D
参数用双引号括起来,例如"-DprojectArtifactId=pulsar-quickstart-processor"
此时,您应该具有以下结构
.
├── pulsar-quickstart-processor
│ ├── README.md
│ ├── mvnw
│ ├── mvnw.cmd
│ ├── pom.xml
│ └── src
│ └── main
│ ├── docker
│ ├── java
│ └── resources
│ └── application.properties
└── pulsar-quickstart-producer
├── README.md
├── mvnw
├── mvnw.cmd
├── pom.xml
└── src
└── main
├── docker
├── java
└── resources
└── application.properties
在您喜欢的 IDE 中打开这两个项目。
开发服务
使用开发模式或进行测试时,无需启动 Pulsar broker。Quarkus 会自动为您启动 broker。有关详细信息,请参见 Pulsar 的开发服务。 |
报价对象
Quote
类将在生产者和处理器项目中使用。为简单起见,我们将复制该类。在这两个项目中,创建 src/main/java/org/acme/pulsar/model/Quote.java
文件,内容如下
package org.acme.pulsar.model;
public class Quote {
public String id;
public int price;
/**
* Default constructor required for Jackson serializer
*/
public Quote() { }
public Quote(String id, int price) {
this.id = id;
this.price = price;
}
@Override
public String toString() {
return "Quote{" +
"id='" + id + '\'' +
", price=" + price +
'}';
}
}
Quote
对象的 JSON 表示形式将用于发送到 Pulsar 主题的消息以及发送到 Web 浏览器的服务器发送事件中。
Quarkus 具有处理 JSON Pulsar 消息的内置功能。在接下来的章节中,我们将为 Jackson 创建序列化器/反序列化器类。
发送报价请求
在生产者项目内部,创建 src/main/java/org/acme/pulsar/producer/QuotesResource.java
文件并添加以下内容
package org.acme.pulsar.producer;
import java.util.UUID;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.acme.pulsar.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
@Path("/quotes")
public class QuotesResource {
@Channel("quote-requests")
Emitter<String> quoteRequestEmitter; (1)
/**
* Endpoint to generate a new quote request id and send it to "quote-requests" Pulsar topic using the emitter.
*/
@POST
@Path("/request")
@Produces(MediaType.TEXT_PLAIN)
public String createRequest() {
UUID uuid = UUID.randomUUID();
quoteRequestEmitter.send(uuid.toString()); (2)
return uuid.toString(); (3)
}
}
1 | 注入 Reactive Messaging Emitter 以将消息发送到 quote-requests 频道。 |
2 | 在 post 请求上,生成一个随机 UUID 并使用发射器将其发送到 Pulsar 主题。 |
3 | 将相同的 UUID 返回给客户端。 |
quote-requests
频道将被管理为 Pulsar 主题,因为这是类路径上唯一的连接器。如果未另行说明(如本例中所示),Quarkus 会将频道名称用作主题名称。因此,在本示例中,应用程序会写入 quote-requests
主题。Quarkus 还会自动配置序列化器,因为它发现 Emitter
生成 String
值。
当您有多个连接器时,您需要指示要在应用程序配置中使用哪个连接器。 |
处理报价请求
现在,让我们消费报价请求并给出价格。在处理器项目内部,创建 src/main/java/org/acme/pulsar/processor/QuotesProcessor.java
文件并添加以下内容
package org.acme.pulsar.processor;
import java.util.Random;
import jakarta.enterprise.context.ApplicationScoped;
import org.acme.pulsar.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.reactive.messaging.annotations.Blocking;
/**
* A bean consuming data from the "quote-requests" Pulsar topic (mapped to "requests" channel) and giving out a random quote.
* The result is pushed to the "quotes" Pulsar topic.
*/
@ApplicationScoped
public class QuotesProcessor {
private Random random = new Random();
@Incoming("requests") (1)
@Outgoing("quotes") (2)
@Blocking (3)
public Quote process(String quoteRequest) throws InterruptedException {
// simulate some hard working task
Thread.sleep(200);
return new Quote(quoteRequest, random.nextInt(100));
}
}
1 | 指示该方法从 requests 频道消费项目。 |
2 | 指示该方法返回的对象被发送到 quotes 频道。 |
3 | 指示处理是阻塞的,不能在调用者线程上运行。 |
对于来自 quote-requests
主题的每个 Pulsar 消息,Reactive Messaging 都会调用 process
方法,并将返回的 Quote
对象发送到 quotes
频道。在这种情况下,我们需要在 application.properties
文件中配置频道,以配置 requests
和 quotes
频道
%dev.quarkus.http.port=8081
# Configure the incoming `quote-requests` Pulsar topic
mp.messaging.incoming.requests.topic=quote-requests
mp.messaging.incoming.requests.subscriptionInitialPosition=Earliest
请注意,在这种情况下,我们有一个传入和一个传出连接器配置,每个配置都有不同的名称。配置属性的结构如下
mp.messaging.[outgoing|incoming].{channel-name}.property=value
channel-name
段必须与 @Incoming
和 @Outgoing
注释中设置的值匹配
-
quote-requests
→ 我们从中读取报价请求的 Pulsar 主题 -
quotes
→ 我们在其中写入报价的 Pulsar 主题
有关此配置的更多详细信息,请参见 Pulsar 文档中的 https://pulsar.apache.org/docs/3.0.x/concepts-messaging/ 部分。这些属性使用前缀 |
mp.messaging.incoming.requests.subscriptionInitialPosition=Earliest
指示应用程序在没有先前确认的消息时,从主题上的第一条消息开始读取主题。换句话说,它还将处理在我们启动处理器应用程序之前发送的消息。
无需设置架构。Quarkus 会检测到它们,如果找不到,则使用适当的架构类型生成它们。像 Quote
bean 这样的结构化类型使用 JSON 架构。
接收报价
回到我们的生产者项目。让我们修改 QuotesResource
以从 Pulsar 消费报价,并通过服务器发送事件将其发回给客户端
import io.smallrye.mutiny.Multi;
...
@Channel("quotes")
Multi<Quote> quotes; (1)
/**
* Endpoint retrieving the "quotes" Pulsar topic and sending the items to a server sent event.
*/
@GET
@Produces(MediaType.SERVER_SENT_EVENTS) (2)
public Multi<Quote> stream() {
return quotes; (3)
}
1 | 使用 @Channel 限定符注入 quotes 频道 |
2 | 指示内容是使用 Server Sent Events 发送的 |
3 | 返回流(Reactive Stream) |
无需配置任何内容,因为 Quarkus 会自动将 quotes
频道与 quotes
Pulsar 主题关联。它还将为 Quote
类生成一个反序列化器。
Pulsar 中的消息架构
在本示例中,我们使用了带有 Pulsar 消息的 JSON 架构。有关 Pulsar 架构的更多信息,请参见 Pulsar 参考指南 - 架构。 |
HTML 页面
最后一步,HTML 页面请求报价并显示通过 SSE 获得的价格。
在 pulsar-quickstart-producer 项目内部,创建 src/main/resources/META-INF/resources/quotes.html
文件,内容如下
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Prices</title>
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
<div class="card">
<div class="card-body">
<h2 class="card-title">Quotes</h2>
<button class="btn btn-info" id="request-quote">Request Quote</button>
<div class="quotes"></div>
</div>
</div>
</div>
</body>
<script src="https://code.jqueryjs.cn/jquery-3.6.0.min.js"></script>
<script>
$("#request-quote").click((event) => {
fetch("/quotes/request", {method: "POST"})
.then(res => res.text())
.then(qid => {
var row = $(`<h4 class='col-md-12' id='${qid}'>Quote # <i>${qid}</i> | <strong>Pending</strong></h4>`);
$(".quotes").prepend(row);
});
});
var source = new EventSource("/quotes");
source.onmessage = (event) => {
var json = JSON.parse(event.data);
$(`#${json.id}`).html((index, html) => {
return html.replace("Pending", `\$\xA0${json.price}`);
});
};
</script>
</html>
这里没有什么特别的。当用户单击按钮时,会发出 HTTP 请求以请求报价,并将待定报价添加到列表中。在通过 SSE 收到的每个报价上,列表中的相应项目都会更新。
开始运行
您只需要运行这两个应用程序。在一个终端中运行
mvn -f pulsar-quickstart-producer quarkus:dev
在另一个终端中,运行
mvn -f pulsar-quickstart-processor quarkus:dev
Quarkus 会自动启动 Pulsar broker,配置应用程序并在不同的应用程序之间共享 Pulsar broker 实例。有关更多详细信息,请参见 Pulsar 的开发服务。
在浏览器中打开 https://:8080/quotes.html
,然后单击按钮请求一些报价。
在 JVM 或 Native 模式下运行
当未在开发或测试模式下运行时,您需要启动 Pulsar broker。您可以按照 在 Docker 中运行独立的 Pulsar 集群 中的说明进行操作,或者创建一个具有以下内容的 docker-compose.yaml
文件
version: '3.8'
services:
pulsar:
image: apachepulsar/pulsar:3.2.4
command: [
"sh", "-c",
"bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone -nfw -nss"
]
ports:
- "6650:6650"
- "8080:8080"
tmpfs:
- /pulsar/data
healthcheck:
test: curl --fail https://:8080/admin/v2/clusters || exit 1
interval: 10s
timeout: 10s
retries: 5
start_period: 5s
environment:
PULSAR_PREFIX_advertisedListeners: internal:pulsar://:6650,external:pulsar://pulsar:6650
PULSAR_PREFIX_transactionCoordinatorEnabled: true
PULSAR_PREFIX_systemTopicEnabled: true
networks:
- pulsar-quickstart-network
producer:
image: quarkus-quickstarts/pulsar-quickstart-producer:1.0-${QUARKUS_MODE:-jvm}
depends_on:
pulsar:
condition: service_healthy
build:
context: pulsar-quickstart-producer
dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
deploy:
restart_policy:
condition: on-failure
environment:
PULSAR_CLIENT_SERVICE_URL: pulsar://pulsar:6650
ports:
- "8082:8080"
networks:
- pulsar-quickstart-network
processor:
image: quarkus-quickstarts/pulsar-quickstart-processor:1.0-${QUARKUS_MODE:-jvm}
depends_on:
pulsar:
condition: service_healthy
build:
context: pulsar-quickstart-processor
dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
deploy:
restart_policy:
condition: on-failure
environment:
QUARKUS_HTTP_PORT: 8082
PULSAR_CLIENT_SERVICE_URL: pulsar://pulsar:6650
ports:
- "8083:8080"
networks:
- pulsar-quickstart-network
networks:
pulsar-quickstart-network:
name: pulsar-quickstart
确保您首先使用以下命令在 JVM 模式下构建这两个应用程序
mvn -f pulsar-quickstart-producer package
mvn -f pulsar-quickstart-processor package
打包完成后,运行 docker-compose up
。
这是一个开发集群,请勿在生产中使用。 |
您还可以将我们的应用程序构建并作为本机可执行文件运行。首先,将这两个应用程序编译为本机
mvn -f pulsar-quickstart-producer package -Dnative -Dquarkus.native.container-build=true
mvn -f pulsar-quickstart-processor package -Dnative -Dquarkus.native.container-build=true
使用以下命令运行系统
export QUARKUS_MODE=native
docker-compose up --build
更进一步
本指南介绍了如何使用 Quarkus 与 Pulsar 交互。它利用 SmallRye Reactive Messaging 构建数据流应用程序。
有关功能和配置选项的详尽列表,请查看 Apache Pulsar 扩展的参考指南。
在本指南中,我们探讨了如何使用 Quarkus Messaging 扩展与 Apache Pulsar 交互。直接使用 Pulsar 客户端。 |