编辑此页面

Reactive Messaging AMQP 1.0 连接器参考文档

本指南是 AMQP 1.0 入门的配套文档。它更详细地解释了响应式消息传递的 AMQP 连接器的配置和使用。

本文档未涵盖连接器的所有细节。有关更多详细信息,请参阅 SmallRye 响应式消息传递网站

AMQP 连接器允许 Quarkus 应用程序使用 AMQP 1.0 协议发送和接收消息。有关该协议的更多详细信息,请参见 AMQP 1.0 规范。请务必注意,AMQP 1.0 和 AMQP 0.9.1(由 RabbitMQ 实现)不兼容。查看 使用 RabbitMQ 获取更多详情。

AMQP 连接器扩展

要使用连接器,您需要添加 quarkus-messaging-amqp 扩展。

您可以使用以下方式将扩展添加到您的项目中

CLI
quarkus extension add quarkus-messaging-amqp
Maven
./mvnw quarkus:add-extension -Dextensions='quarkus-messaging-amqp'
Gradle
./gradlew addExtension --extensions='quarkus-messaging-amqp'

或者直接将以下依赖项添加到您的项目中

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-amqp</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-amqp")

添加到您的项目后,您可以通过配置 connector 属性将通道映射到 AMQP 地址

# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-amqp

# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-amqp
连接器自动附加

如果您的类路径上只有一个连接器,您可以省略 connector 属性配置。Quarkus 会自动将孤立通道与类路径上找到的(唯一)连接器关联。孤立通道是没有下游使用者的传出通道,或没有上游生产者的传入通道。

可以使用以下方式禁用此自动附加功能

quarkus.messaging.auto-connector-attachment=false

配置 AMQP 代理访问

AMQP 连接器连接到 AMQP 1.0 代理,例如 Apache ActiveMQ 或 Artemis。要配置代理的位置和凭据,请在 application.properties 中添加以下属性

amqp-host=amqp (1)
amqp-port=5672 (2)
amqp-username=my-username (3)
amqp-password=my-password (4)

mp.messaging.incoming.prices.connector=smallrye-amqp (5)
1 配置代理/路由器主机名。您可以按通道(使用 host 属性)或全局(使用 amqp-host)进行配置
2 配置代理/路由器端口。您可以按通道(使用 port 属性)或全局(使用 amqp-port)进行配置。默认值为 5672
3 配置代理/路由器用户名(如果需要)。您可以按通道(使用 username 属性)或全局(使用 amqp-username)进行配置。
4 配置代理/路由器密码(如果需要)。您可以按通道(使用 password 属性)或全局(使用 amqp-password)进行配置。
5 指示 prices 通道由 AMQP 连接器管理

在开发模式下以及运行测试时,AMQP 的开发服务会自动启动 AMQP 代理。

接收 AMQP 消息

假设您的应用程序接收 Message<Double>。您可以直接使用有效负载

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class AmqpPriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

或者,您可以检索 Message<Double>

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class AmqpPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> price) {
        // process your price.

        // Acknowledge the incoming message, marking the AMQP message as `accepted`.
        return price.ack();
    }

}

入站元数据

来自 AMQP 的消息在元数据中包含 IncomingAmqpMetadata 的实例。

Optional<IncomingAmqpMetadata> metadata = incoming.getMetadata(IncomingAmqpMetadata.class);
metadata.ifPresent(meta -> {
    String address = meta.getAddress();
    String subject = meta.getSubject();
    boolean durable = meta.isDurable();
    // Use io.vertx.core.json.JsonObject
    JsonObject properties = meta.getProperties();
    // ...
});

反序列化

连接器将传入的 AMQP 消息转换为响应式消息传递 Message<T> 实例。T 取决于接收到的 AMQP 消息的主体

AMQP 类型系统定义了支持的类型。

AMQP 主体类型 <T>

AMQP 值,包含 AMQP 原始类型

相应的 Java 类型

使用 Binary 类型的 AMQP 值

byte[]

AMQP 序列

List

AMQP 数据(带有二进制内容),并且 content-type 设置为 application/json

JsonObject

具有不同 content-type 的 AMQP 数据

byte[]

如果您使用此 AMQP 连接器(出站连接器)发送对象,则它会被编码为 JSON 并作为二进制文件发送。content-type 设置为 application/json。因此,您可以按如下方式重建对象

import io.vertx.core.json.JsonObject;
//
@ApplicationScoped
public static class Consumer {

    List<Price> prices = new CopyOnWriteArrayList<>();

    @Incoming("from-amqp") (1)
    public void consume(JsonObject p) {   (2)
        Price price = p.mapTo(Price.class);  (3)
        prices.add(price);
    }

    public List<Price> list() {
        return prices;
    }
}
1 Price 实例由连接器自动编码为 JSON
2 您可以使用 JsonObject 接收它
3 然后,您可以使用 mapTo 方法重建实例
mapTo 方法使用 Quarkus Jackson 映射器。查看 本指南以了解有关映射器配置的更多信息。

确认

当与 AMQP 消息关联的响应式消息传递消息被确认时,它会通知代理该消息已被接受

失败管理

如果由 AMQP 消息生成的消息被否定确认,则会应用失败策略。AMQP 连接器支持六种策略

  • fail - 使应用程序失败;将不再处理 AMQP 消息(默认)。AMQP 消息被标记为拒绝。

  • accept - 此策略将 AMQP 消息标记为已接受。处理继续,忽略失败。请参阅 接受的传递状态文档

  • release - 此策略将 AMQP 消息标记为已释放。处理继续,处理下一条消息。代理可以重新传递该消息。请参阅 已释放的传递状态文档

  • reject - 此策略将 AMQP 消息标记为拒绝。处理继续,处理下一条消息。请参阅 拒绝的传递状态文档

  • modified-failed - 此策略将 AMQP 消息标记为已修改,并指示它失败(带有 delivery-failed 属性)。处理继续,处理下一条消息,但代理可能会尝试重新传递该消息。请参阅 已修改的传递状态文档

  • modified-failed-undeliverable-here - 此策略将 AMQP 消息标记为已修改,并指示它失败(带有 delivery-failed 属性)。它还指示应用程序无法处理该消息,这意味着代理不会尝试将该消息重新传递到此节点。处理继续,处理下一条消息。请参阅 已修改的传递状态文档

发送 AMQP 消息

序列化

发送 Message<T> 时,连接器将消息转换为 AMQP 消息。有效负载将转换为 AMQP 消息主体

T AMQP 消息主体

原始类型或 String

带有有效负载的 AMQP 值

InstantUUID

使用相应 AMQP 类型的 AMQP 值

JsonObjectJsonArray

使用二进制内容的 AMQP 数据。content-type 设置为 application/json

io.vertx.mutiny.core.buffer.Buffer

使用二进制内容的 AMQP 数据。未设置 content-type

任何其他类

有效负载将转换为 JSON(使用 Json 映射器)。结果包装到使用二进制内容的 AMQP 数据中。content-type 设置为 application/json

如果消息有效负载无法序列化为 JSON,则消息会被否定确认

出站元数据

发送 Messages 时,您可以添加 OutgoingAmqpMetadata 的实例以影响消息发送到 AMQP 的方式。例如,您可以配置主题、属性

 OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
    .withDurable(true)
    .withSubject("my-subject")
    .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);

动态地址名称

有时需要动态选择消息的目标地址。在这种情况下,您不应在应用程序配置文件中配置地址,而应使用出站元数据来设置地址。

例如,您可以根据传入消息发送到动态地址

String addressName = selectAddressFromIncommingMessage(incoming);
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
    .withAddress(addressName)
    .withDurable(true)
    .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);
为了能够按消息设置地址,连接器正在使用匿名发送者

确认

默认情况下,当代理确认消息时,响应式消息传递 Message 会被确认。当使用路由器时,此确认可能不会启用。在这种情况下,请配置 auto-acknowledgement 属性,以便在消息发送到路由器后立即确认消息。

如果 AMQP 消息被代理拒绝/释放/修改(或无法成功发送),则消息会被否定确认。

背压和信用额度

背压由 AMQP 信用额度处理。出站连接器仅请求允许的信用额度。当信用额度达到 0 时,它会等待(以非阻塞方式)直到代理授予 AMQP 发送者更多信用额度。

配置 AMQP 地址

您可以使用 address 属性配置 AMQP 地址

mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.address=my-queue

mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.outgoing.orders.address=my-order-queue

如果未设置 address 属性,则连接器使用通道名称。

要使用现有队列,您需要配置 addresscontainer-id 以及可选的 link-name 属性。例如,如果您配置了具有以下内容的 Apache Artemis 代理

<queues>
    <queue name="people">
        <address>people</address>
        <durable>true</durable>
        <user>artemis</user>
    </queue>
</queues>

您需要以下配置

mp.messaging.outgoing.people.connector=smallrye-amqp
mp.messaging.outgoing.people.durable=true
mp.messaging.outgoing.people.address=people
mp.messaging.outgoing.people.container-id=people

如果队列名称不是通道名称,您可能需要配置 link-name 属性

mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=people
mp.messaging.outgoing.people-out.container-id=people
mp.messaging.outgoing.people-out.link-name=people

要使用 MULTICAST 队列,您需要提供 FQQN(完全限定队列名称),而不仅仅是队列的名称

mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=foo
mp.messaging.outgoing.people-out.container-id=foo

mp.messaging.incoming.people-out.connector=smallrye-amqp
mp.messaging.incoming.people-out.durable=true
mp.messaging.incoming.people-out.address=foo::bar # Note the syntax: address-name::queue-name
mp.messaging.incoming.people-out.container-id=bar
mp.messaging.incoming.people-out.link-name=people

有关 AMQP 地址模型的更多详细信息,请参见 Artemis 文档

执行模型和阻塞处理

响应式消息传递在 I/O 线程上调用您的方法。有关此主题的更多详细信息,请参见 Quarkus 响应式架构文档。但是,您经常需要将响应式消息传递与阻塞处理(例如数据库交互)相结合。为此,您需要使用 @Blocking 注释,指示处理是阻塞的,不应在调用者线程上运行。

例如,以下代码说明了如何使用带有 Panache 的 Hibernate 将传入的有效负载存储到数据库中

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

有 2 个 @Blocking 注释

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

它们具有相同的效果。因此,您可以同时使用两者。第一个提供了更细粒度的调整,例如要使用的工作池以及是否保留顺序。第二个也与 Quarkus 的其他响应式功能一起使用,它使用默认的工作池并保留顺序。

@RunOnVirtualThread

有关在 Java 虚拟线程上运行阻塞处理的信息,请参见 Quarkus 虚拟线程支持与响应式消息传递文档

@Transactional

如果您的方法使用 @Transactional 注释,则即使该方法未使用 @Blocking 注释,也会自动将其视为阻塞方法。

自定义底层 AMQP 客户端

连接器在底层使用 Vert.x AMQP 客户端。有关此客户端的更多详细信息,请参见 Vert.x 网站

您可以通过按如下方式生成 AmqpClientOptions 的实例来自定义底层客户端配置

@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
  // You can use the produced options to configure the TLS connection
  PemKeyCertOptions keycert = new PemKeyCertOptions()
    .addCertPath("./tls/tls.crt")
    .addKeyPath("./tls/tls.key");
  PemTrustOptions trust = new PemTrustOptions().addCertPath("./tlc/ca.crt");
  return new AmqpClientOptions()
        .setSsl(true)
        .setPemKeyCertOptions(keycert)
        .setPemTrustOptions(trust)
        .addEnabledSaslMechanism("EXTERNAL")
        .setHostnameVerificationAlgorithm("") // Disables the hostname verification. Defaults is "HTTPS"
        .setConnectTimeout(30000)
        .setReconnectInterval(5000)
        .setContainerId("my-container");
}

此实例将被检索并用于配置连接器使用的客户端。您需要使用 client-options-name 属性指示客户端的名称

mp.messaging.incoming.prices.client-options-name=my-named-options

如果您遇到与代理频繁断开连接的情况,则 AmqpClientOptions 也可以用于设置心跳,如果您需要永久保持 AMQP 连接。某些代理可能会在一定的空闲超时后终止 AMQP 连接。您可以提供一个心跳值,Vert.x proton 客户端将使用该值在打开到远程对等方的传输时通知空闲超时。

@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
  // set a heartbeat of 30s (in milliseconds)
  return new AmqpClientOptions()
        .setHeartbeat(30000);
}

TLS 配置

AMQP 1.0 消息传递扩展与 Quarkus TLS 注册表集成以配置 Vert.x AMQP 客户端。

要为 AMQP 1.0 通道配置 TLS,您需要在 application.properties 中提供命名的 TLS 配置

quarkus.tls.your-tls-config.trust-store.pem.certs=ca.crt,ca2.pem
# ...
mp.messaging.incoming.prices.tls-configuration-name=your-tls-config

运行状况报告

如果您将 AMQP 连接器与 quarkus-smallrye-health 扩展一起使用,它将有助于就绪性和活跃性探测。AMQP 连接器报告由连接器管理的每个通道的就绪性和活跃性。目前,AMQP 连接器对就绪性检查和活跃性检查使用相同的逻辑。

要禁用运行状况报告,请将通道的 health-enabled 属性设置为 false。在入站端(从 AMQP 接收消息),该检查会验证接收者是否已连接到代理。在出站端(向 AMQP 发送记录),该检查会验证发送者是否已连接到代理。

请注意,消息处理失败会否定确认消息,然后由 failure-strategy 处理。failure-strategy 负责报告失败并影响检查结果。fail 失败策略报告失败,因此检查将报告故障。

使用 RabbitMQ

此连接器适用于 AMQP 1.0。RabbitMQ 实现 AMQP 0.9.1。默认情况下,RabbitMQ 不提供 AMQP 1.0,但有一个插件可以实现。要将 RabbitMQ 与此连接器一起使用,请启用并配置 AMQP 1.0 插件。

尽管存在该插件,但一些 AMQP 1.0 功能无法与 RabbitMQ 一起使用。因此,我们建议以下配置。

要从 RabbitMQ 接收消息

  • 将 durable 设置为 false

mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.durable=false

要将消息发送到 RabbitMQ

  • 设置目标地址(不支持匿名发送者)

  • use-anonymous-sender 设置为 false

mp.messaging.outgoing.generated-price.connector=smallrye-amqp
mp.messaging.outgoing.generated-price.address=prices
mp.messaging.outgoing.generated-price.use-anonymous-sender=false

因此,在使用 RabbitMQ 时,无法动态更改目标地址(使用消息元数据)。

接收云事件

AMQP 连接器支持 云事件。当连接器检测到结构化二进制云事件时,它会将 IncomingCloudEventMetadata<T> 添加到 Message 的元数据中。IncomingCloudEventMetadata 包含对强制和可选云事件属性的访问器。

如果连接器无法提取云事件元数据,它将发送不带元数据的 Message。

有关接收云事件的更多信息,请参见 SmallRye 响应式消息传递文档中的 接收云事件

发送云事件

AMQP 连接器支持 云事件。如果满足以下条件,连接器会将出站记录作为云事件发送

  • 消息元数据包含 io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata 实例,

  • 通道配置定义了 cloud-events-typecloud-events-source 属性。

有关发送云事件的更多信息,请参见 SmallRye 响应式消息传递文档中的 发送云事件

AMQP 连接器配置参考

Quarkus 特定配置

构建时固定的配置属性 - 所有其他配置属性都可以在运行时覆盖

配置属性

类型

默认

开发服务

类型

默认

是否已显式启用或禁用 AMQP 的开发服务。除非存在现有配置,否则开发服务通常默认启用。对于 AMQP,开发服务会启动代理,除非设置了 amqp-hostamqp-port,或者所有响应式消息传递 AMQP 通道都配置了 hostport

环境变量:QUARKUS_AMQP_DEVSERVICES_ENABLED

显示更多

布尔值

开发服务将侦听的可选固定端口。

如果未定义,将随机选择端口。

环境变量:QUARKUS_AMQP_DEVSERVICES_PORT

显示更多

整数

要使用的镜像。请注意,仅支持 ActiveMQ Artemis 镜像。具体来说,镜像存储库必须以 artemiscloud/activemq-artemis-broker 结尾。

请查看 Quay 页面上的 activemq-artemis-broker 以查找可用版本。

环境变量:QUARKUS_AMQP_DEVSERVICES_IMAGE_NAME

显示更多

字符串

quay.io/artemiscloud/activemq-artemis-broker:1.0.25

要传递给容器的 AMQ_EXTRA_ARGS 环境变量的值。对于 ActiveMQ Artemis Broker ⇐ 1.0.21,请将此属性设置为 --no-autotune --mapped --no-fsync --relax-jolokia --http-host 0.0.0.0

环境变量:QUARKUS_AMQP_DEVSERVICES_EXTRA_ARGS

显示更多

字符串

--no-autotune --mapped --no-fsync --relax-jolokia

指示由 Quarkus 开发服务管理的 AMQP 代理是否共享。共享时,Quarkus 会使用基于标签的服务发现查找正在运行的容器。如果找到匹配的容器,则会使用该容器,因此不会启动第二个容器。否则,AMQP 的开发服务将启动一个新的容器。

发现使用 quarkus-dev-service-amqp 标签。该值使用 service-name 属性配置。

容器共享仅在开发模式下使用。

环境变量:QUARKUS_AMQP_DEVSERVICES_SHARED

显示更多

布尔值

true

附加到启动的容器的 quarkus-dev-service-aqmp 标签的值。当 shared 设置为 true 时,会使用此属性。在这种情况下,在启动容器之前,AMQP 的开发服务会查找 quarkus-dev-service-amqp 标签设置为配置值的容器。如果找到,它将使用此容器,而不是启动新容器。否则,它会启动一个新容器,并将 quarkus-dev-service-amqp 标签设置为指定的值。

当您需要多个共享 AMQP 代理时,会使用此属性。

环境变量:QUARKUS_AMQP_DEVSERVICES_SERVICE_NAME

显示更多

字符串

amqp

传递给容器的环境变量。

环境变量:QUARKUS_AMQP_DEVSERVICES_CONTAINER_ENV__ENVIRONMENT_VARIABLE_NAME_

显示更多

Map<String,String>

传入通道配置

属性 (别名) 描述 强制 默认

地址

AMQP 地址。如果未设置,则使用通道名称

类型:字符串

false

自动确认

是否必须在收到 AMQP 消息时进行确认

类型:布尔值

false

false

广播

是否必须将收到的 AMQP 消息分派给多个订阅者

类型:布尔值

false

false

功能

发送者或接收者客户端提出的功能的逗号分隔列表。

类型:字符串

false

客户端选项名称

(amqp-client-options-name)

用于自定义 AMQP 客户端配置的 AMQP 客户端选项 bean 的名称

类型:字符串

false

云事件

启用(默认)或禁用云事件支持。如果在传入通道上启用,连接器会分析传入记录并尝试创建云事件元数据。如果在传出通道上启用,如果消息包含云事件元数据,连接器会将传出消息作为云事件发送。

类型:布尔值

false

true

连接超时

(amqp-connect-timeout)

连接超时时间(以毫秒为单位)

类型:整数

false

1000

容器 ID

AMQP 容器 ID

类型:字符串

false

持久

AMQP 订阅是否持久

类型:布尔值

false

false

失败策略

指定由 AMQP 消息生成的消息被否定确认时要应用的失败策略。可接受的值为 fail(默认)、acceptreleaserejectmodified-failedmodified-failed-undeliverable-here

类型:字符串

false

失败

运行状况超时

等待以确定与代理的连接是否仍为就绪性检查建立的最大秒数。超过该阈值后,检查将被视为失败。

类型:整数

false

3

主机

(amqp-host)

代理主机名

类型:字符串

false

localhost

链接名称

链接的名称。如果未设置,则使用通道名称。

类型:字符串

false

密码

(amqp-password)

用于向代理进行身份验证的密码

类型:字符串

false

端口

(amqp-port)

代理端口

类型:整数

false

5672

重新连接尝试次数

(amqp-reconnect-attempts)

重新连接尝试次数

类型:整数

false

100

重新连接间隔

(amqp-reconnect-interval)

两次重新连接尝试之间的间隔(以秒为单位)

类型:整数

false

10

SNI 服务器名称

(amqp-sni-server-name)

如果设置,则显式覆盖用于 TLS SNI 服务器名称的主机名

类型:字符串

false

选择器

设置消息选择器。此属性用于在源端点上定义 apache.org:selector-filter:string 过滤器,使用基于 SQL 的语法请求服务器筛选器,以确定哪些消息传递给接收者(如果服务器支持)。具体支持的功能和所需的语法可能会因服务器而异。

类型:字符串

false

已启用跟踪

是否启用(默认)或禁用跟踪

类型:布尔值

false

true

使用 SSL

(amqp-use-ssl)

AMQP 连接是否使用 SSL/TLS

类型:布尔值

false

false

用户名

(amqp-username)

用于向代理进行身份验证的用户名

类型:字符串

false

虚拟主机

(amqp-virtual-host)

如果设置,配置用于连接 AMQP 打开帧和 TLS SNI 服务器名称(如果正在使用 TLS)的主机名值

类型:字符串

false

传出通道配置

属性 (别名) 描述 强制 默认

地址

AMQP 地址。如果未设置,则使用通道名称

类型:字符串

false

功能

发送者或接收者客户端提出的功能的逗号分隔列表。

类型:字符串

false

客户端选项名称

(amqp-client-options-name)

用于自定义 AMQP 客户端配置的 AMQP 客户端选项 bean 的名称

类型:字符串

false

云事件

启用(默认)或禁用云事件支持。如果在传入通道上启用,连接器会分析传入记录并尝试创建云事件元数据。如果在传出通道上启用,如果消息包含云事件元数据,连接器会将传出消息作为云事件发送。

类型:布尔值

false

true

云事件数据内容类型

(cloud-events-default-data-content-type)

配置传出云事件的默认 datacontenttype 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 datacontenttype 属性,则使用此值

类型:字符串

false

云事件数据模式

(cloud-events-default-data-schema)

配置传出云事件的默认 dataschema 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 dataschema 属性,则使用此值

类型:字符串

false

云事件插入时间戳

(cloud-events-default-timestamp)

连接器是否应自动将 time 属性插入到传出云事件中。需要将 cloud-events 设置为 true。如果消息本身未配置 time 属性,则使用此值

类型:布尔值

false

true

云事件模式

云事件模式(structuredbinary(默认))。指示如何在传出记录中写入云事件

类型:字符串

false

二进制

云事件源

(cloud-events-default-source)

配置传出云事件的默认 source 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 source 属性,则使用此值

类型:字符串

false

云事件主题

(cloud-events-default-subject)

配置传出云事件的默认 subject 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 subject 属性,则使用此值

类型:字符串

false

云事件类型

(cloud-events-default-type)

配置传出云事件的默认 type 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 type 属性,则使用此值

类型:字符串

false

连接超时

(amqp-connect-timeout)

连接超时时间(以毫秒为单位)

类型:整数

false

1000

容器 ID

AMQP 容器 ID

类型:字符串

false

信用额度检索周期

两次尝试检索代理授予的信用额度之间的周期(以毫秒为单位)。当发送者用完信用额度时,使用此时间。

类型:整数

false

2000

持久

是否将发送的 AMQP 消息标记为持久

类型:布尔值

false

false

运行状况超时

等待以确定与代理的连接是否仍为就绪性检查建立的最大秒数。超过该阈值后,检查将被视为失败。

类型:整数

false

3

主机

(amqp-host)

代理主机名

类型:字符串

false

localhost

链接名称

链接的名称。如果未设置,则使用通道名称。

类型:字符串

false

合并

连接器是否应允许多个上游

类型:布尔值

false

false

密码

(amqp-password)

用于向代理进行身份验证的密码

类型:字符串

false

端口

(amqp-port)

代理端口

类型:整数

false

5672

重新连接尝试次数

(amqp-reconnect-attempts)

重新连接尝试次数

类型:整数

false

100

重新连接间隔

(amqp-reconnect-interval)

两次重新连接尝试之间的间隔(以秒为单位)

类型:整数

false

10

SNI 服务器名称

(amqp-sni-server-name)

如果设置,则显式覆盖用于 TLS SNI 服务器名称的主机名

类型:字符串

false

已启用跟踪

是否启用(默认)或禁用跟踪

类型:布尔值

false

true

TTL

发送的 AMQP 消息的生存时间。0 禁用 TTL

类型:长整型

false

0

使用匿名发送者

连接器是否应使用匿名发送者。如果代理支持,则默认值为 true,否则为 false。如果不支持,则无法动态更改目标地址。

类型:布尔值

false

使用 SSL

(amqp-use-ssl)

AMQP 连接是否使用 SSL/TLS

类型:布尔值

false

false

用户名

(amqp-username)

用于向代理进行身份验证的用户名

类型:字符串

false

虚拟主机

(amqp-virtual-host)

如果设置,配置用于连接 AMQP 打开帧和 TLS SNI 服务器名称(如果正在使用 TLS)的主机名值

类型:字符串

false

有条件地配置通道

您可以使用特定配置文件配置通道。因此,仅当启用指定的配置文件时,才会配置通道(并将其添加到应用程序)。

要实现此目的,您需要

  1. 使用 %my-profile 前缀 mp.messaging.[incoming|outgoing].$channel 条目,例如 %my-profile.mp.messaging.[incoming|outgoing].$channel.key=value

  2. 在包含 @Incoming(channel)@Outgoing(channel) 注释的 CDI bean 上使用 @IfBuildProfile("my-profile"),这些注释仅需要在启用配置文件时启用。

请注意,响应式消息传递会验证图形是否完整。因此,当使用此类条件配置时,请确保应用程序在启用和禁用配置文件的情况下都能工作。

请注意,此方法也可用于基于配置文件更改通道配置。

相关内容