Reactive Messaging AMQP 1.0 连接器参考文档
本指南是 AMQP 1.0 入门的配套文档。它更详细地解释了响应式消息传递的 AMQP 连接器的配置和使用。
本文档未涵盖连接器的所有细节。有关更多详细信息,请参阅 SmallRye 响应式消息传递网站。 |
AMQP 连接器允许 Quarkus 应用程序使用 AMQP 1.0 协议发送和接收消息。有关该协议的更多详细信息,请参见 AMQP 1.0 规范。请务必注意,AMQP 1.0 和 AMQP 0.9.1(由 RabbitMQ 实现)不兼容。查看 使用 RabbitMQ 获取更多详情。
AMQP 连接器扩展
要使用连接器,您需要添加 quarkus-messaging-amqp
扩展。
您可以使用以下方式将扩展添加到您的项目中
quarkus extension add quarkus-messaging-amqp
./mvnw quarkus:add-extension -Dextensions='quarkus-messaging-amqp'
./gradlew addExtension --extensions='quarkus-messaging-amqp'
或者直接将以下依赖项添加到您的项目中
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-amqp</artifactId>
</dependency>
implementation("io.quarkus:quarkus-messaging-amqp")
添加到您的项目后,您可以通过配置 connector
属性将通道映射到 AMQP 地址
# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-amqp
# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-amqp
连接器自动附加
如果您的类路径上只有一个连接器,您可以省略 可以使用以下方式禁用此自动附加功能
|
配置 AMQP 代理访问
AMQP 连接器连接到 AMQP 1.0 代理,例如 Apache ActiveMQ 或 Artemis。要配置代理的位置和凭据,请在 application.properties
中添加以下属性
amqp-host=amqp (1)
amqp-port=5672 (2)
amqp-username=my-username (3)
amqp-password=my-password (4)
mp.messaging.incoming.prices.connector=smallrye-amqp (5)
1 | 配置代理/路由器主机名。您可以按通道(使用 host 属性)或全局(使用 amqp-host )进行配置 |
2 | 配置代理/路由器端口。您可以按通道(使用 port 属性)或全局(使用 amqp-port )进行配置。默认值为 5672 。 |
3 | 配置代理/路由器用户名(如果需要)。您可以按通道(使用 username 属性)或全局(使用 amqp-username )进行配置。 |
4 | 配置代理/路由器密码(如果需要)。您可以按通道(使用 password 属性)或全局(使用 amqp-password )进行配置。 |
5 | 指示 prices 通道由 AMQP 连接器管理 |
在开发模式下以及运行测试时,AMQP 的开发服务会自动启动 AMQP 代理。
接收 AMQP 消息
假设您的应用程序接收 Message<Double>
。您可以直接使用有效负载
package inbound;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class AmqpPriceConsumer {
@Incoming("prices")
public void consume(double price) {
// process your price.
}
}
或者,您可以检索 Message<Double>
package inbound;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;
@ApplicationScoped
public class AmqpPriceMessageConsumer {
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> price) {
// process your price.
// Acknowledge the incoming message, marking the AMQP message as `accepted`.
return price.ack();
}
}
入站元数据
来自 AMQP 的消息在元数据中包含 IncomingAmqpMetadata
的实例。
Optional<IncomingAmqpMetadata> metadata = incoming.getMetadata(IncomingAmqpMetadata.class);
metadata.ifPresent(meta -> {
String address = meta.getAddress();
String subject = meta.getSubject();
boolean durable = meta.isDurable();
// Use io.vertx.core.json.JsonObject
JsonObject properties = meta.getProperties();
// ...
});
反序列化
连接器将传入的 AMQP 消息转换为响应式消息传递 Message<T>
实例。T
取决于接收到的 AMQP 消息的主体。
AMQP 类型系统定义了支持的类型。
AMQP 主体类型 | <T> |
---|---|
AMQP 值,包含 AMQP 原始类型 |
相应的 Java 类型 |
使用 |
|
AMQP 序列 |
|
AMQP 数据(带有二进制内容),并且 |
|
具有不同 |
|
如果您使用此 AMQP 连接器(出站连接器)发送对象,则它会被编码为 JSON 并作为二进制文件发送。content-type
设置为 application/json
。因此,您可以按如下方式重建对象
import io.vertx.core.json.JsonObject;
//
@ApplicationScoped
public static class Consumer {
List<Price> prices = new CopyOnWriteArrayList<>();
@Incoming("from-amqp") (1)
public void consume(JsonObject p) { (2)
Price price = p.mapTo(Price.class); (3)
prices.add(price);
}
public List<Price> list() {
return prices;
}
}
1 | Price 实例由连接器自动编码为 JSON |
2 | 您可以使用 JsonObject 接收它 |
3 | 然后,您可以使用 mapTo 方法重建实例 |
mapTo 方法使用 Quarkus Jackson 映射器。查看 本指南以了解有关映射器配置的更多信息。 |
确认
当与 AMQP 消息关联的响应式消息传递消息被确认时,它会通知代理该消息已被接受。
失败管理
如果由 AMQP 消息生成的消息被否定确认,则会应用失败策略。AMQP 连接器支持六种策略
-
fail
- 使应用程序失败;将不再处理 AMQP 消息(默认)。AMQP 消息被标记为拒绝。 -
accept
- 此策略将 AMQP 消息标记为已接受。处理继续,忽略失败。请参阅 接受的传递状态文档。 -
release
- 此策略将 AMQP 消息标记为已释放。处理继续,处理下一条消息。代理可以重新传递该消息。请参阅 已释放的传递状态文档。 -
reject
- 此策略将 AMQP 消息标记为拒绝。处理继续,处理下一条消息。请参阅 拒绝的传递状态文档。 -
modified-failed
- 此策略将 AMQP 消息标记为已修改,并指示它失败(带有delivery-failed
属性)。处理继续,处理下一条消息,但代理可能会尝试重新传递该消息。请参阅 已修改的传递状态文档。 -
modified-failed-undeliverable-here
- 此策略将 AMQP 消息标记为已修改,并指示它失败(带有delivery-failed
属性)。它还指示应用程序无法处理该消息,这意味着代理不会尝试将该消息重新传递到此节点。处理继续,处理下一条消息。请参阅 已修改的传递状态文档。
发送 AMQP 消息
序列化
发送 Message<T>
时,连接器将消息转换为 AMQP 消息。有效负载将转换为 AMQP 消息主体。
T |
AMQP 消息主体 |
---|---|
原始类型或 |
带有有效负载的 AMQP 值 |
|
使用相应 AMQP 类型的 AMQP 值 |
使用二进制内容的 AMQP 数据。 |
|
|
使用二进制内容的 AMQP 数据。未设置 |
任何其他类 |
有效负载将转换为 JSON(使用 Json 映射器)。结果包装到使用二进制内容的 AMQP 数据中。 |
如果消息有效负载无法序列化为 JSON,则消息会被否定确认。
出站元数据
发送 Messages
时,您可以添加 OutgoingAmqpMetadata
的实例以影响消息发送到 AMQP 的方式。例如,您可以配置主题、属性
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
.withDurable(true)
.withSubject("my-subject")
.build();
// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);
动态地址名称
有时需要动态选择消息的目标地址。在这种情况下,您不应在应用程序配置文件中配置地址,而应使用出站元数据来设置地址。
例如,您可以根据传入消息发送到动态地址
String addressName = selectAddressFromIncommingMessage(incoming);
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
.withAddress(addressName)
.withDurable(true)
.build();
// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);
为了能够按消息设置地址,连接器正在使用匿名发送者。 |
配置 AMQP 地址
您可以使用 address
属性配置 AMQP 地址
mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.address=my-queue
mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.outgoing.orders.address=my-order-queue
如果未设置 address
属性,则连接器使用通道名称。
要使用现有队列,您需要配置 address
、container-id
以及可选的 link-name
属性。例如,如果您配置了具有以下内容的 Apache Artemis 代理
<queues>
<queue name="people">
<address>people</address>
<durable>true</durable>
<user>artemis</user>
</queue>
</queues>
您需要以下配置
mp.messaging.outgoing.people.connector=smallrye-amqp
mp.messaging.outgoing.people.durable=true
mp.messaging.outgoing.people.address=people
mp.messaging.outgoing.people.container-id=people
如果队列名称不是通道名称,您可能需要配置 link-name
属性
mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=people
mp.messaging.outgoing.people-out.container-id=people
mp.messaging.outgoing.people-out.link-name=people
要使用 MULTICAST
队列,您需要提供 FQQN(完全限定队列名称),而不仅仅是队列的名称
mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=foo
mp.messaging.outgoing.people-out.container-id=foo
mp.messaging.incoming.people-out.connector=smallrye-amqp
mp.messaging.incoming.people-out.durable=true
mp.messaging.incoming.people-out.address=foo::bar # Note the syntax: address-name::queue-name
mp.messaging.incoming.people-out.container-id=bar
mp.messaging.incoming.people-out.link-name=people
有关 AMQP 地址模型的更多详细信息,请参见 Artemis 文档。
执行模型和阻塞处理
响应式消息传递在 I/O 线程上调用您的方法。有关此主题的更多详细信息,请参见 Quarkus 响应式架构文档。但是,您经常需要将响应式消息传递与阻塞处理(例如数据库交互)相结合。为此,您需要使用 @Blocking
注释,指示处理是阻塞的,不应在调用者线程上运行。
例如,以下代码说明了如何使用带有 Panache 的 Hibernate 将传入的有效负载存储到数据库中
import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;
@ApplicationScoped
public class PriceStorage {
@Incoming("prices")
@Transactional
public void store(int priceInUsd) {
Price price = new Price();
price.value = priceInUsd;
price.persist();
}
}
有 2 个
它们具有相同的效果。因此,您可以同时使用两者。第一个提供了更细粒度的调整,例如要使用的工作池以及是否保留顺序。第二个也与 Quarkus 的其他响应式功能一起使用,它使用默认的工作池并保留顺序。 |
@RunOnVirtualThread
有关在 Java 虚拟线程上运行阻塞处理的信息,请参见 Quarkus 虚拟线程支持与响应式消息传递文档。 |
@Transactional
如果您的方法使用 |
自定义底层 AMQP 客户端
连接器在底层使用 Vert.x AMQP 客户端。有关此客户端的更多详细信息,请参见 Vert.x 网站。
您可以通过按如下方式生成 AmqpClientOptions
的实例来自定义底层客户端配置
@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
// You can use the produced options to configure the TLS connection
PemKeyCertOptions keycert = new PemKeyCertOptions()
.addCertPath("./tls/tls.crt")
.addKeyPath("./tls/tls.key");
PemTrustOptions trust = new PemTrustOptions().addCertPath("./tlc/ca.crt");
return new AmqpClientOptions()
.setSsl(true)
.setPemKeyCertOptions(keycert)
.setPemTrustOptions(trust)
.addEnabledSaslMechanism("EXTERNAL")
.setHostnameVerificationAlgorithm("") // Disables the hostname verification. Defaults is "HTTPS"
.setConnectTimeout(30000)
.setReconnectInterval(5000)
.setContainerId("my-container");
}
此实例将被检索并用于配置连接器使用的客户端。您需要使用 client-options-name
属性指示客户端的名称
mp.messaging.incoming.prices.client-options-name=my-named-options
如果您遇到与代理频繁断开连接的情况,则 AmqpClientOptions
也可以用于设置心跳,如果您需要永久保持 AMQP 连接。某些代理可能会在一定的空闲超时后终止 AMQP 连接。您可以提供一个心跳值,Vert.x proton 客户端将使用该值在打开到远程对等方的传输时通知空闲超时。
@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
// set a heartbeat of 30s (in milliseconds)
return new AmqpClientOptions()
.setHeartbeat(30000);
}
TLS 配置
AMQP 1.0 消息传递扩展与 Quarkus TLS 注册表集成以配置 Vert.x AMQP 客户端。
要为 AMQP 1.0 通道配置 TLS,您需要在 application.properties
中提供命名的 TLS 配置
quarkus.tls.your-tls-config.trust-store.pem.certs=ca.crt,ca2.pem
# ...
mp.messaging.incoming.prices.tls-configuration-name=your-tls-config
运行状况报告
如果您将 AMQP 连接器与 quarkus-smallrye-health
扩展一起使用,它将有助于就绪性和活跃性探测。AMQP 连接器报告由连接器管理的每个通道的就绪性和活跃性。目前,AMQP 连接器对就绪性检查和活跃性检查使用相同的逻辑。
要禁用运行状况报告,请将通道的 health-enabled
属性设置为 false。在入站端(从 AMQP 接收消息),该检查会验证接收者是否已连接到代理。在出站端(向 AMQP 发送记录),该检查会验证发送者是否已连接到代理。
请注意,消息处理失败会否定确认消息,然后由 failure-strategy
处理。failure-strategy
负责报告失败并影响检查结果。fail
失败策略报告失败,因此检查将报告故障。
使用 RabbitMQ
此连接器适用于 AMQP 1.0。RabbitMQ 实现 AMQP 0.9.1。默认情况下,RabbitMQ 不提供 AMQP 1.0,但有一个插件可以实现。要将 RabbitMQ 与此连接器一起使用,请启用并配置 AMQP 1.0 插件。
尽管存在该插件,但一些 AMQP 1.0 功能无法与 RabbitMQ 一起使用。因此,我们建议以下配置。
要从 RabbitMQ 接收消息
-
将 durable 设置为 false
mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.durable=false
要将消息发送到 RabbitMQ
-
设置目标地址(不支持匿名发送者)
-
将
use-anonymous-sender
设置为 false
mp.messaging.outgoing.generated-price.connector=smallrye-amqp
mp.messaging.outgoing.generated-price.address=prices
mp.messaging.outgoing.generated-price.use-anonymous-sender=false
因此,在使用 RabbitMQ 时,无法动态更改目标地址(使用消息元数据)。
接收云事件
AMQP 连接器配置参考
Quarkus 特定配置
构建时固定的配置属性 - 所有其他配置属性都可以在运行时覆盖
配置属性 |
类型 |
默认 |
---|---|---|
类型 |
默认 |
|
是否已显式启用或禁用 AMQP 的开发服务。除非存在现有配置,否则开发服务通常默认启用。对于 AMQP,开发服务会启动代理,除非设置了 环境变量: 显示更多 |
布尔值 |
|
开发服务将侦听的可选固定端口。 如果未定义,将随机选择端口。 环境变量: 显示更多 |
整数 |
|
要使用的镜像。请注意,仅支持 ActiveMQ Artemis 镜像。具体来说,镜像存储库必须以 请查看 Quay 页面上的 activemq-artemis-broker 以查找可用版本。 环境变量: 显示更多 |
字符串 |
|
要传递给容器的 环境变量: 显示更多 |
字符串 |
|
指示由 Quarkus 开发服务管理的 AMQP 代理是否共享。共享时,Quarkus 会使用基于标签的服务发现查找正在运行的容器。如果找到匹配的容器,则会使用该容器,因此不会启动第二个容器。否则,AMQP 的开发服务将启动一个新的容器。 发现使用 容器共享仅在开发模式下使用。 环境变量: 显示更多 |
布尔值 |
|
附加到启动的容器的 当您需要多个共享 AMQP 代理时,会使用此属性。 环境变量: 显示更多 |
字符串 |
|
传递给容器的环境变量。 环境变量: 显示更多 |
Map<String,String> |
传入通道配置
属性 (别名) | 描述 | 强制 | 默认 |
---|---|---|---|
地址 |
AMQP 地址。如果未设置,则使用通道名称 类型:字符串 |
false |
|
自动确认 |
是否必须在收到 AMQP 消息时进行确认 类型:布尔值 |
false |
|
广播 |
是否必须将收到的 AMQP 消息分派给多个订阅者 类型:布尔值 |
false |
|
功能 |
发送者或接收者客户端提出的功能的逗号分隔列表。 类型:字符串 |
false |
|
客户端选项名称 (amqp-client-options-name) |
用于自定义 AMQP 客户端配置的 AMQP 客户端选项 bean 的名称 类型:字符串 |
false |
|
云事件 |
启用(默认)或禁用云事件支持。如果在传入通道上启用,连接器会分析传入记录并尝试创建云事件元数据。如果在传出通道上启用,如果消息包含云事件元数据,连接器会将传出消息作为云事件发送。 类型:布尔值 |
false |
|
连接超时 (amqp-connect-timeout) |
连接超时时间(以毫秒为单位) 类型:整数 |
false |
|
容器 ID |
AMQP 容器 ID 类型:字符串 |
false |
|
持久 |
AMQP 订阅是否持久 类型:布尔值 |
false |
|
失败策略 |
指定由 AMQP 消息生成的消息被否定确认时要应用的失败策略。可接受的值为 类型:字符串 |
false |
|
运行状况超时 |
等待以确定与代理的连接是否仍为就绪性检查建立的最大秒数。超过该阈值后,检查将被视为失败。 类型:整数 |
false |
|
主机 (amqp-host) |
代理主机名 类型:字符串 |
false |
|
链接名称 |
链接的名称。如果未设置,则使用通道名称。 类型:字符串 |
false |
|
密码 (amqp-password) |
用于向代理进行身份验证的密码 类型:字符串 |
false |
|
端口 (amqp-port) |
代理端口 类型:整数 |
false |
|
重新连接尝试次数 (amqp-reconnect-attempts) |
重新连接尝试次数 类型:整数 |
false |
|
重新连接间隔 (amqp-reconnect-interval) |
两次重新连接尝试之间的间隔(以秒为单位) 类型:整数 |
false |
|
SNI 服务器名称 (amqp-sni-server-name) |
如果设置,则显式覆盖用于 TLS SNI 服务器名称的主机名 类型:字符串 |
false |
|
选择器 |
设置消息选择器。此属性用于在源端点上定义 类型:字符串 |
false |
|
已启用跟踪 |
是否启用(默认)或禁用跟踪 类型:布尔值 |
false |
|
使用 SSL (amqp-use-ssl) |
AMQP 连接是否使用 SSL/TLS 类型:布尔值 |
false |
|
用户名 (amqp-username) |
用于向代理进行身份验证的用户名 类型:字符串 |
false |
|
虚拟主机 (amqp-virtual-host) |
如果设置,配置用于连接 AMQP 打开帧和 TLS SNI 服务器名称(如果正在使用 TLS)的主机名值 类型:字符串 |
false |
传出通道配置
属性 (别名) | 描述 | 强制 | 默认 |
---|---|---|---|
地址 |
AMQP 地址。如果未设置,则使用通道名称 类型:字符串 |
false |
|
功能 |
发送者或接收者客户端提出的功能的逗号分隔列表。 类型:字符串 |
false |
|
客户端选项名称 (amqp-client-options-name) |
用于自定义 AMQP 客户端配置的 AMQP 客户端选项 bean 的名称 类型:字符串 |
false |
|
云事件 |
启用(默认)或禁用云事件支持。如果在传入通道上启用,连接器会分析传入记录并尝试创建云事件元数据。如果在传出通道上启用,如果消息包含云事件元数据,连接器会将传出消息作为云事件发送。 类型:布尔值 |
false |
|
云事件数据内容类型 (cloud-events-default-data-content-type) |
配置传出云事件的默认 类型:字符串 |
false |
|
云事件数据模式 (cloud-events-default-data-schema) |
配置传出云事件的默认 类型:字符串 |
false |
|
云事件插入时间戳 (cloud-events-default-timestamp) |
连接器是否应自动将 类型:布尔值 |
false |
|
云事件模式 |
云事件模式( 类型:字符串 |
false |
|
云事件源 (cloud-events-default-source) |
配置传出云事件的默认 类型:字符串 |
false |
|
云事件主题 (cloud-events-default-subject) |
配置传出云事件的默认 类型:字符串 |
false |
|
云事件类型 (cloud-events-default-type) |
配置传出云事件的默认 类型:字符串 |
false |
|
连接超时 (amqp-connect-timeout) |
连接超时时间(以毫秒为单位) 类型:整数 |
false |
|
容器 ID |
AMQP 容器 ID 类型:字符串 |
false |
|
信用额度检索周期 |
两次尝试检索代理授予的信用额度之间的周期(以毫秒为单位)。当发送者用完信用额度时,使用此时间。 类型:整数 |
false |
|
持久 |
是否将发送的 AMQP 消息标记为持久 类型:布尔值 |
false |
|
运行状况超时 |
等待以确定与代理的连接是否仍为就绪性检查建立的最大秒数。超过该阈值后,检查将被视为失败。 类型:整数 |
false |
|
主机 (amqp-host) |
代理主机名 类型:字符串 |
false |
|
链接名称 |
链接的名称。如果未设置,则使用通道名称。 类型:字符串 |
false |
|
合并 |
连接器是否应允许多个上游 类型:布尔值 |
false |
|
密码 (amqp-password) |
用于向代理进行身份验证的密码 类型:字符串 |
false |
|
端口 (amqp-port) |
代理端口 类型:整数 |
false |
|
重新连接尝试次数 (amqp-reconnect-attempts) |
重新连接尝试次数 类型:整数 |
false |
|
重新连接间隔 (amqp-reconnect-interval) |
两次重新连接尝试之间的间隔(以秒为单位) 类型:整数 |
false |
|
SNI 服务器名称 (amqp-sni-server-name) |
如果设置,则显式覆盖用于 TLS SNI 服务器名称的主机名 类型:字符串 |
false |
|
已启用跟踪 |
是否启用(默认)或禁用跟踪 类型:布尔值 |
false |
|
TTL |
发送的 AMQP 消息的生存时间。0 禁用 TTL 类型:长整型 |
false |
|
使用匿名发送者 |
连接器是否应使用匿名发送者。如果代理支持,则默认值为 类型:布尔值 |
false |
|
使用 SSL (amqp-use-ssl) |
AMQP 连接是否使用 SSL/TLS 类型:布尔值 |
false |
|
用户名 (amqp-username) |
用于向代理进行身份验证的用户名 类型:字符串 |
false |
|
虚拟主机 (amqp-virtual-host) |
如果设置,配置用于连接 AMQP 打开帧和 TLS SNI 服务器名称(如果正在使用 TLS)的主机名值 类型:字符串 |
false |
有条件地配置通道
您可以使用特定配置文件配置通道。因此,仅当启用指定的配置文件时,才会配置通道(并将其添加到应用程序)。
要实现此目的,您需要
-
使用
%my-profile
前缀mp.messaging.[incoming|outgoing].$channel
条目,例如%my-profile.mp.messaging.[incoming|outgoing].$channel.key=value
-
在包含
@Incoming(channel)
和@Outgoing(channel)
注释的 CDI bean 上使用@IfBuildProfile("my-profile")
,这些注释仅需要在启用配置文件时启用。
请注意,响应式消息传递会验证图形是否完整。因此,当使用此类条件配置时,请确保应用程序在启用和禁用配置文件的情况下都能工作。
请注意,此方法也可用于基于配置文件更改通道配置。