编辑此页面

Quarkus Messaging 扩展

事件驱动的消息传递系统已成为大多数现代应用程序的支柱,能够构建消息驱动的微服务或复杂的数据流管道。

Quarkus 提供了一套全面的消息传递扩展,旨在轻松地与领先的消息传递技术同步。这使开发人员能够专注于构建核心应用程序逻辑,从而无需深入研究各个 API 和消息传递基础设施的复杂性。

Quarkus Messaging

本页面重点介绍所有消息传递扩展的常见功能和开发模型。

其中一些扩展在核心 Quarkus 存储库中维护

一些扩展由社区贡献和维护

其他连接器,例如 JMS 连接器Google PubSub 连接器,没有得到相同级别的集成,需要更多手动配置才能设置。

另一方面,一些与消息传递相关的扩展提出了低级别的提供商特定集成。 本页涵盖的支持级别不涉及这些低级别扩展。 此类扩展的一个非详尽列表如下

Quarkus 消息传递开发模型

Quarkus 通过建立一个统一的模型来发布、消费和处理消息,从而简化了消息驱动的应用程序开发,无论底层代理技术是使用消息队列还是事件流。 构建在 MicroProfile Reactive Messaging 规范之上,Quarkus 消息传递扩展确保与这些技术的无缝集成。 重要的是,熟练掌握反应式编程不是利用这些能力的先决条件。

Reactive Messaging 规范定义了一个基于 CDI 的编程模型,用于实现事件驱动和消息驱动的应用程序。 使用一小组注解,CDI Bean 成为实现与消息代理交互的构建块。 这些交互通过通道发生,应用程序组件在这些通道中读取和写入消息。

通道由唯一名称标识,并使用一组注解声明。

@Incoming@Outgoing 注解

@Incoming@Outgoing 方法注解定义通道,允许从消息代理消费消息以及向消息代理生产消息

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MessageProcessingBean {

   @Incoming("source")
   @Outgoing("sink")
   public String process(String consumedPayload) {
       // Process the incoming message payload and return an updated payload
       return consumedPayload.toUpperCase();
   }

}

@Outgoing 可以单独在方法中使用以生成消息

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MessageGeneratorBean {

   @Outgoing("sink")
   public Multi<String> generate() {
       return Multi.createFrom().items("a", "b", "c");
   }

}

@Incoming 可以单独使用以消费消息

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MessageProcessingBean {

   @Incoming("source")
   public void process(String consumedPayload) {
       // process the payload
       consumedPayload.toUpperCase();
   }

}

请注意,不应直接从代码中调用使用 @Incoming 和/或 @Outgoing 注解的方法。 它们由框架调用。 让用户代码调用它们不会产生预期的结果。

您可以在 SmallRye Reactive Messaging – 支持的签名 中阅读有关支持的方法签名的更多信息。

Emitters 和 @Channel 注解

应用程序通常需要将消息传递与应用程序的其他部分结合起来,例如,从 HTTP 端点生成消息,或将消费的消息作为响应流式传输。

要将消息从命令式代码发送到特定通道,您需要注入由 @Channel 注解标识的 Emitter 对象

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@ApplicationScoped
@Path("/")
public class MyImperativeBean {

   @Channel("prices")
   Emitter<Double> emitter;

   @GET
   @Path("/send")
   public CompletionStage<Void> send(double d) {
       return emitter.send(d);
   }
}

@Channel 注解使您可以指示将有效负载或消息发送到哪个通道。 Emitter 允许缓冲发送到通道的消息。

为了获得更多控制,使用 Mutiny API,您可以使用 MutinyEmitter 发射器接口

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.MutinyEmitter;

import org.eclipse.microprofile.reactive.messaging.Channel;

@ApplicationScoped
@Path("/")
public class MyImperativeBean {

   @Channel("prices")
   MutinyEmitter<Double> emitter;

   @GET
   @Path("/send")
   public void send(double d) {
       emitter.sendAndAwait(d);
   }

}

@Channel 注解也可以用于从传入通道注入消息流

import org.eclipse.microprofile.reactive.messaging.Channel;

@ApplicationScoped
@Path("/")
public class SseResource {

    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }

}

使用 @Channel 消费消息时,应用程序代码负责订阅流。 在上面的示例中,Quarkus REST (以前的 RESTEasy Reactive) 端点为您处理。

您可以在 SmallRye Reactive Messaging – 发射器和通道 文档中阅读有关发射器和通道的更多信息。

消息和元数据

Message 是有效负载周围的信封。 在上面的示例中,仅使用了有效负载,但每个有效负载在 Quarkus 消息传递中都在内部包装在 Message 周围。

Message<T> 接口将 <T> 类型的有效负载与 Metadata(一组任意对象和用于确认 (ack) 和否定确认 (nack) 的异步操作)相关联。

import org.eclipse.microprofile.reactive.messaging.Message;

@Incoming("source")
@Outgoing("sink")
public Message<String> process(Message<String> consumed) {
    // Access the metadata
    MyMetadata my = consumed.getMetadata(MyMetadata.class).get();
    // Process the incoming message and return an updated message
    return consumed.withPayload(consumed.getPayload().toUpperCase());
}

当消息的处理或接收成功时,消息会被确认回代理。 消息之间的确认是链接的,这意味着在处理消息时,传出消息的确认会触发传入消息的确认。 在大多数情况下,ack 和 nack 由您管理,并且连接器允许您为每个通道配置不同的策略。 因此,通常不需要直接与 Message 接口交互。 只有高级用例才需要直接处理 Message。

另一方面,访问 Metadata 在许多情况下可能很实用。 连接器将特定的元数据对象添加到消息中,以允许访问消息头、属性和其他特定于连接器的信息。 您无需与 Message 接口交互即可访问特定于连接器的元数据。 您只需在有效负载参数之后将元数据对象作为方法参数注入即可

import org.eclipse.microprofile.reactive.messaging.Metadata;
@Incoming("source")
@Outgoing("sink")
public String process(String payload, MyMetadata my) {
    // Access the metadata
    Map<String, Object> props = my.getProperties();
    // Process the payload and return an updated payload
    return payload.toUpperCase();
}

根据连接器的不同,可用于在处理方法中消费的有效负载类型也不同。 您可以实现自定义 MessageConverter 以将有效负载转换为应用程序接受的类型。

通道配置

可以使用 mp.messaging.incoming.<channel-name>mp.messaging.outgoing.<channel-name> 配置属性配置通道属性。

例如,配置 Kafka 连接器以使用自定义反序列化器从 my-topic 主题消费消息

mp.messaging.incoming.source.connector=smallrye-kafka
mp.messaging.incoming.source.topic=my-topic
mp.messaging.incoming.source.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.source.auto.offset.reset=earliest

所有通道都需要 connector 属性,并指定要使用的连接器。 如果您的类路径上只有一个连接器,则可以省略此配置,因为 Quarkus 将自动选择连接器。

可以使用连接器名称配置全局通道属性

mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092

连接器特定属性在连接器文档中列出。

通道连接和消息传递模式

在启动时,Quarkus 会分析声明的通道以将它们连接在一起,并验证所有通道是否已连接。 具体来说,每个通道都会创建一个消息反应流,该流连接到另一个通道的消息反应流。 遵循反应流协议,在通道之间强制执行反压机制,从而可以控制应用程序资源的使用,而不会过度提交和使系统的部分过载。

另一方面,不可能在运行时以编程方式创建新通道。 但是,有很多模式可以让您实现大多数(如果不是全部)消息传递和集成用例

一些消息传递技术允许消费者订阅一组主题或队列,并允许生产者在消息基础上将消息发送到特定主题。 如果您确定需要在运行时动态配置和创建客户端,则应考虑直接使用低级别客户端。

内部通道

在某些用例中,使用消息传递模式在同一应用程序内传输消息很方便。 当您未将通道连接到消息传递后端(即连接器)时,所有操作都在应用程序内部进行,并且流通过将方法链接在一起创建。 每个链仍然是一个反应流并强制执行反压协议。

该框架会验证生产者/消费者链是否完整,这意味着如果应用程序将消息写入内部通道(使用仅带有 @Outgoing 的方法或 Emitter),则还必须从应用程序内部消费消息(使用仅带有 @Incoming 的方法或使用非托管流)。

启用/禁用通道

默认情况下,所有定义的通道都已启用,但是可以使用配置禁用通道

mp.messaging.incoming.my-channel.enabled=false

这可以与 Quarkus 构建配置文件一起使用,以基于某些构建时条件(例如目标环境)启用/禁用通道。 禁用通道时,您需要确保两件事

  • 已禁用通道的使用位于可以在构建时过滤掉的 Bean 中,

  • 如果没有通道,其余通道仍然可以正常工作。

@ApplicationScoped
@IfBuildProfile("my-profile")
public class MyProfileBean {

    @Outgoing("my-channel")
    public Multi<String> generate() {
        return Multi.createFrom().items("a", "b", "c");
    }

}

可暂停通道

默认情况下,注入的 @Channel 流未订阅,因此消息流由应用程序代码使用反应流和 Mutiny API 控制。 但是对于 @Incoming 方法,消息流由运行时控制。

可暂停通道提供了一种以编程方式控制消息流的机制。 这在生产者或消费者由于管理生命周期或执行维护操作而需要暂时停止的情况下很有用。

要使用可暂停通道,您需要使用配置属性 pausable 设置为 true 来激活它。

mp.messaging.incoming.my-channel.pausable=true
# optional, by default the channel is NOT paused initially
mp.messaging.outgoing.my-channel.initially-paused=true

如果将通道配置为可暂停,则可以通过通道名称从 ChannelRegistry 以编程方式获取 PausableChannel,并根据需要暂停或恢复通道

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.PausableChannel;

@ApplicationScoped
public class PausableController {

    @Inject
    ChannelRegistry registry;

    @PostConstruct
    public void resume() {
        // Wait for the application to be ready
        // Retrieve the pausable channel
        PausableChannel pausable = registry.getPausable("my-channel");
        // Pause the processing of the messages
        pausable.resume();
    }

    public void pause() {
        // Retrieve the pausable channel
        PausableChannel pausable = registry.getPausable("my-channel");
        // Pause the processing of the messages
        pausable.pause();
    }

    @Incoming("my-channel")
    void process(String message) {
        // Process the message
    }

}

此功能独立于连接器,并且在理论上可以与任何连接器支持的通道一起使用。 请注意,暂停消息消费会对从远程代理接收消息的底层消费者施加反压。

Kafka 消费者提供了一个类似的功能来暂停和恢复从主题分区消费消息。 Quarkus Kafka 连接器允许访问底层客户端以暂停/恢复消费。

但是,默认情况下,使用 pause-if-no-requests=true 配置,连接器通过基于下游请求暂停和恢复 Kafka 消费者来自动处理反压。 因此,建议将可暂停通道与默认的 pause-if-no-requests=true 配置一起使用。

多个 Outgoings 和 @Broadcast

默认情况下,在通道中传输的消息仅分派给单个消费者。 拥有多个消费者被认为是错误的,并在部署时报告。

@Broadcast 注解更改了此行为,并指示在通道中传输的消息已分派给所有消费者。 @Broadcast 必须与 @Outgoing 注解一起使用

import org.eclipse.microprofile.reactive.messaging.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@Incoming("in")
@Outgoing("out")
@Broadcast
public int increment(int i) {
    return i + 1;
}

@Incoming("out")
public void consume1(int i) {
    //...
}

@Incoming("out")
public void consume2(int i) {
    //...
}

@Broadcast 类似,您可以在同一方法上多次使用 @Outgoing 注解,以指示该方法生成到多个通道的消息

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
public String process(String s) {
    // send messages from channel in to both channels out1 and out2
    return s.toUpperCase();
}

使用多个 Outgoings 对于实现扇出模式很有用,在这种模式中,单个消息由多个目标通道处理。

您可以通过从处理方法返回 Targeted 来有选择地将消息分派到多个 outgoings

@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
@Outgoing("out3")
public Targeted process(double price) {
    // send messages from channel-in to both channel-out1 and channel-out2
    Targeted targeted = Targeted.of("out1", "Price: " + price, "out2", "Quote: " + price);
    if (price > 90.0) {
        return targeted.with("out3", price);
    }
    return targeted;
}

多个 Incomings 和 @Merge

默认情况下,单个生产者可以在通道中传输消息。 拥有多个生产者被认为是错误的,并在部署时报告。 @Merge 注解更改了此行为,并指示通道可以有多个生产者。 @Merge 必须与 @Incoming 注解一起使用

@Incoming("in1")
@Outgoing("out")
public int increment(int i) {
    return i + 1;
}

@Incoming("in2")
@Outgoing("out")
public int multiply(int i) {
    return i * 2;
}

@Incoming("out")
@Merge
public void getAll(int i) {
    //...
}

@Merge 类似,您可以在同一方法上多次使用 @Incoming 注解,以指示该方法从多个通道消费消息

@Incoming("in1")
@Incoming("in2")
public String process(String s) {
    // get messages from channel-1 and channel-2
    return s.toUpperCase();
}

流处理

在某些高级场景中,您可以直接操作消息流,而不是每个单独的消息。

在传入和传出签名中使用 Mutiny API 允许您处理消息流

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class StreamProcessor {

    @Incoming("source")
    @Outgoing("sink")
    public Multi<String> process(Multi<String> in) {
        return in.map(String::toUpperCase);
    }

}

执行模型

Quarkus Messaging 位于 Quarkus 的 反应引擎之上,并利用 Eclipse Vert.x 来分派消息以进行处理。 它支持三种执行模式

  • 事件循环,其中消息在 Vert.x I/O 线程上分派。 请记住,您不应在事件循环上执行阻塞操作。

  • 工作线程,其中消息在工作线程池上分派。

  • 虚拟线程,其中消息在虚拟线程上分派(需要 Java 21+)。 由于虚拟线程未池化,因此为每条消息创建一个新的虚拟线程。 有关更多信息,请参阅专门的 Quarkus 虚拟线程支持 指南。

Quarkus 根据方法签名选择默认执行模式。 如果方法签名是同步的,则消息在工作线程上分派,否则默认为事件循环

方法签名 默认执行模式

@Incoming("source") void process(String payload)

工作线程

@Incoming("source") Uni<Void> process(String payload)

事件循环

@Incoming("source") CompletionStage<Void> process(Message<String> message)

事件循环

@Incoming("source") @Outgoing("sink") Multi<R> process(Multi<T> in)

流处理方法在启动时执行,然后每条消息都在事件循环上分派。

可以使用注解对执行模型进行细粒度控制

  • @Blocking 将强制该方法在工作线程池上执行。 默认的工作线程池在所有通道之间共享。 使用 @Blocking("my-custom-pool"),您可以使用自定义线程池配置通道。 配置属性 smallrye.messaging.worker.my-custom-pool.max-concurrency 指定池中的最大线程数。 您可以在 SmallRye Reactive Messaging 文档中阅读有关阻塞处理的更多信息。

  • @NonBlocking 将强制该方法在事件循环线程上执行。

  • @RunOnVirtualThread 将强制该方法在虚拟线程上执行。 为了利用虚拟线程的轻量级特性,使用 @RunOnVirtualThread 注解的方法的默认最大并发数为 1024。 可以通过设置 smallrye.messaging.worker.<virtual-thread>.max-concurrency 配置属性或与 @Blocking("my-custom-pool") 注解一起使用来更改此值。

@Transactional 注解的存在意味着阻塞执行。

在消息传递应用程序中,生产和消费的消息构成一个有序的事件流,要么由代理强制执行(在主题或队列中),要么由应用程序中的接收和发送顺序强制执行。 为了保持此顺序,Quarkus Messaging 默认按顺序分派消息。 您可以使用 @Blocking(ordered = false)@RunOnVirtualThread 注解覆盖此行为。

传入通道并发

某些连接器支持配置传入通道的并发级别。

mp.messaging.incoming.my-channel.concurrency=4

这会在后台创建传入通道的四个副本,并将它们连接到相同的处理方法。 根据代理技术,这可能有助于通过并发处理多个消息来提高应用程序的吞吐量,同时仍然保留在不同副本中接收的消息的部分顺序。 例如,Kafka 就是这种情况,其中多个消费者可以消费不同的主题分区。

上下文传播

在 Quarkus Messaging 中,在不同处理阶段之间传播上下文的默认机制是 消息上下文。 这提供了一种一致的方式来传递上下文信息以及通过不同阶段的消息。

当与其他扩展集成时,特别是使用 Emitters 时,它依赖于 Mutiny 上下文传播

与 Mutiny 和 MicroProfile 上下文传播的交互

Mutiny 是 Quarkus 中反应式编程的基础,它与 MicroProfile 上下文传播集成。 此集成可以自动捕获和恢复异步边界上的上下文。 要了解有关 Quarkus 和 Mutiny 中上下文传播的更多信息,请参阅 上下文传播 指南。

为了确保一致的行为,Quarkus Messaging 会禁用通过入站或出站连接器进行消息分派期间的任何上下文传播。 这意味着通过 Emitters 捕获的上下文不会传播到传出通道,并且传入通道不会通过激活上下文(例如,请求上下文)来分派消息。 可以使用 quarkus.messaging.connector-context-propagation 配置属性配置此行为,方法是列出要传播的上下文类型。 例如 quarkus.messaging.connector-context-propagation=CDI 将仅传播 CDI 上下文。

使用 Emitters 进行上下文传播

使用消息传递发射器时,默认情况下不传播上下文。

在某些情况下,您可能希望使用 内部通道将调用方上下文传播到消息处理阶段。

Quarkus 提供了 ContextualEmitter,它是 MutinyEmitterEmitter 的直接替代品,允许您在发送消息时传播上下文。 您可以使用上下文传播注解 @CurrentThreadContext 来配置将从发射器方法传播的上下文。 该注解配置将从该方法捕获和传播的上下文,并且需要在传播器方法(即发射器的调用方,而不是处理方法)上显示。

RequestScopedBean 成为请求范围的 Bean,ContextualEmitter 可以用于通过内部通道 app 在本地分派消息

import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.quarkus.logging.Log;
import io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitter;

@Path("/")
public class Resource {

    @Channel("app")
    ContextualEmitter<String> emitter;

    @Inject
    RequestScopedBean requestScopedBean;

    @POST
    @Path("/send")
    public void send(String message) {
        requestScopedBean.setValue("Hello");
        emitter.sendAndAwait(message);
    }

}

然后可以在消息处理阶段访问请求范围的 Bean,而与 执行模型无关

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.quarkus.logging.Log;
import io.smallrye.reactive.messaging.annotations.Blocking;


@ApplicationScoped
public class Processor {

    @Inject
    RequestScopedBean requestScopedBean;

    @Incoming("app")
    @Blocking
    public void process(String message) {
        Log.infof("Message %s from request %s", message, requestScopedBean.getValue());
    }

}

您还可以使用 @CurrentThreadContext 注解来控制传播哪些上下文。 以下示例显示了如何避免将任何上下文传播到消息处理阶段

import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.quarkus.logging.Log;
import io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitter;

import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

@Path("/")
public class Resource {

    @Channel("app")
    ContextualEmitter<String> emitter;

    @Inject
    RequestScopedBean requestScopedBean;

    @POST
    @Path("/send")
    @CurrentThreadContext(propagated = {})
    public void send(String message) {
        requestScopedBean.setValue("Hello");
        emitter.sendAndAwait(message);
    }

}

在前面的示例中,请求范围上下文绑定到的执行上下文(REST 调用)控制上下文的生命周期。 这意味着当 REST 调用完成时,RequestScoped 上下文将被销毁。 因此,您需要确保在 REST 调用完成之前完成处理或消息分派。

有关更多信息,请查看 上下文传播 指南。

请求上下文激活

在某些情况下,您可能需要在处理从代理消费的消息时激活请求上下文。 虽然在 @Incoming 方法上使用 @ActivateRequestContext 是一种选择,但它的生命周期不遵循 Quarkus 消息传递消息的生命周期。 对于传入通道,您可以使用构建时属性 quarkus.messaging.request-scoped.enabled=true 启用请求范围激活。 这将为传入通道处理的每条消息激活请求上下文,并在消息处理完成后关闭上下文。

运行状况检查

Quarkus Messaging 扩展与 SmallRye Health 扩展一起提供每个通道的运行状况检查支持。 启动就绪活动检查的实现取决于连接器。 某些连接器允许配置运行状况检查行为,或完全或按通道禁用它们。

可以使用 quarkus.messaging.health.<channel-name>.enabled 或按运行状况检查类型禁用通道运行状况检查,例如 quarkus.messaging.health.<channel-name>.liveness.enabled

quarkus.messaging.health.enabled 配置属性设置为 false 将完全禁用消息传递运行状况检查。

可观察性

Micrometer 指标

Quarkus Messaging 扩展提供了简单但有用的指标来监视消息传递系统的运行状况。 Micrometer 扩展 公开了这些指标。

可以收集以下每个通道的指标,并使用 channel 标记标识

  • quarkus.messaging.message.count:生成或接收的消息数

  • quarkus.messaging.message.acks:成功处理的消息数

  • quarkus.messaging.message.failures:处理失败的消息数

  • quarkus.messaging.message.duration:消息处理的持续时间

出于向后兼容的原因,默认情况下不启用通道指标,可以使用以下方法启用:smallrye.messaging.observation.enabled=true

OpenTelemetry 跟踪

某些 Quarkus Messaging 连接器与 OpenTelemetry Tracing 开箱即用地集成。 当存在 OpenTelemetry 扩展时,传出消息会传播当前跟踪范围。 在传入通道上,如果收到的消息包含跟踪信息,则消息处理会将消息范围继承为父范围。

您可以使用以下配置为特定通道禁用跟踪

mp.messaging.incoming.data.tracing-enabled=false

TLS 配置

某些消息传递扩展与 Quarkus TLS 注册表 集成以配置底层客户端。 要在通道上配置 TLS,您需要将命名的 TLS 配置提供给 tls-configuration-name 属性

quarkus.tls.my-tls-config.trust-store=truststore.jks
quarkus.tls.my-tls-config.trust-store-password=secret
mp.messaging.incoming.my-channel.tls-configuration-name=my-tls-config

或者您可以在连接器的所有通道上全局配置它

mp.messaging.connector.smallrye-pulsar.tls-configuration-name=my-tls-config

目前,以下消息传递扩展支持通过 Quarkus TLS 注册表进行配置

  • Kafka:为 Kafka 客户端提供 ssl.engine.factory.class 属性。

  • Pulsar:仅支持 mTLS 身份验证。

  • RabbitMQ

  • AMQP 1.0

  • MQTT

测试

使用开发服务进行测试

大多数 Quarkus Messaging 扩展都提供开发服务,以简化应用程序的开发和测试。 开发服务创建一个代理实例,该实例配置为与 Quarkus Messaging 扩展开箱即用地工作。

在测试期间,Quarkus 会创建一个单独的代理实例来针对它运行测试。

您可以在 开发服务 指南中阅读有关开发服务的更多信息,包括平台扩展提供的开发服务列表。

使用 InMemoryConnector 进行测试

在不启动代理的情况下测试应用程序可能很有用。 为此,您可以将连接器管理的通道切换内存中

此方法仅适用于 JVM 测试。 它不能用于本机测试(因为它们不支持注入)。

假设我们要测试以下示例应用程序

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

@ApplicationScoped
public class MyMessagingApplication {

    @Inject
    @Channel("words-out")
    Emitter<String> emitter;

    public void sendMessage(String out) {
        emitter.send(out);
    }

    @Incoming("words-in")
    @Outgoing("uppercase")
    public Message<String> toUpperCase(Message<String> message) {
        return message.withPayload(message.getPayload().toUpperCase());
    }

}

首先,将以下测试依赖项添加到您的应用程序

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-reactive-messaging-in-memory</artifactId>
    <scope>test</scope>
</dependency>
build.gradle
testImplementation("io.smallrye.reactive:smallrye-reactive-messaging-in-memory")

然后,按如下方式创建 Quarkus 测试资源

public class InMemoryConnectorLifecycleManager implements QuarkusTestResourceLifecycleManager {

    @Override
    public Map<String, String> start() {
        Map<String, String> env = new HashMap<>();
        Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("words-in");   (1)
        Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("uppercase");  (2)
        Map<String, String> props3 = InMemoryConnector.switchOutgoingChannelsToInMemory("words-out");  (3)
        env.putAll(props1);
        env.putAll(props2);
        env.putAll(props3);
        return env;  (4)
    }

    @Override
    public void stop() {
        InMemoryConnector.clear();  (5)
    }
}
1 将传入通道 words-in(消费的消息)切换到内存中。
2 将传出通道 words-out(生成的消息)切换到内存中。
3 将传出通道 uppercase(已处理的消息)切换到内存中。
4 构建并返回一个 Map,其中包含配置应用程序以使用内存通道所需的所有属性。
5 测试停止时,清除 InMemoryConnector(放弃所有已接收和已发送的消息)

使用上面创建的测试资源创建 @QuarkusTest

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;
import io.smallrye.reactive.messaging.memory.InMemorySink;
import io.smallrye.reactive.messaging.memory.InMemorySource;

import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.junit.jupiter.api.Test;

import jakarta.inject.Inject;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.awaitility.Awaitility.await;

@QuarkusTest
@QuarkusTestResource(InMemoryConnectorLifecycleManager.class)
class MyMessagingApplicationTest {

    @Inject
    @Connector("smallrye-in-memory")
    InMemoryConnector connector; (1)

    @Inject
    MyMessagingApplication app;

    @Test
    void test() {
        InMemorySink<String> wordsOut = connector.sink("words-out"); (2)
        InMemorySource<String> wordsIn = connector.source("words-in"); (3)
        InMemorySink<String> uppercaseOut = connector.sink("uppercase"); (4)

        app.sendMessage("Hello"); (5)
        assertEquals("Hello", wordsOut.received().get(0).getPayload()); (6)

        wordsIn.send("Bonjour"); (7)
        await().untilAsserted(() -> assertEquals("BONJOUR", uppercaseOut.received().get(0).getPayload())); (8)
    }
}
1 使用 @Connector@Any 限定符将内存连接器注入到您的测试类中。
2 检索传出通道 (words-out) - 该通道必须已在测试资源中切换到内存中。
3 检索传入通道 (words-in)
4 检索传出通道 (uppercase)
5 使用注入的应用程序 Bean 调用 sendMessage 方法,以使用带有通道 words-out 的发射器发送消息。
6 words-out 内存通道上使用 received 方法来检查应用程序生成的消息。
7 words-in 内存通道上使用 send 方法发送消息。 应用程序将处理此消息并将消息发送到 uppercase 通道。
8 uppercase 通道上使用 received 方法来检查应用程序生成的消息。

内存连接器仅用于测试目的。 使用内存连接器时需要考虑一些注意事项

  • 内存连接器仅传输使用 InMemorySource#send 方法发送的对象(有效负载或配置的消息)。 应用程序方法接收的消息不会包含特定于连接器的元数据。

  • 默认情况下,内存通道在 InMemorySource#send 方法的调用方线程上分派消息,该线程将是单元测试中的主线程。 但是,大多数其他连接器处理在单独的重复 Vert.x 上下文上分派消息的上下文传播。

quarkus-test-vertx 依赖项提供了 @io.quarkus.test.vertx.RunOnVertxContext 注解,当在测试方法上使用时,会在 Vert.x 上下文中执行测试。

如果您的测试依赖于上下文传播,您可以使用 run-on-vertx-context 属性配置内存连接器通道以在 Vert.x 上下文中分派事件,包括消息和确认。 或者,您可以使用 InMemorySource#runOnVertxContext 方法切换此行为。

通道装饰器

通道装饰器 是一种拦截和装饰与消息传递通道对应的反应流的方法。 这对于向通道添加自定义行为(例如日志记录、指标或错误处理)非常有用。

因此,可以为传入通道实现实现 PublisherDecorator 的 Bean,为传出通道实现 SubscriberDecorator。 由于两个 API 是对称的,因此您可以在同一 Bean 中实现两个接口。 这些 Bean 由 Quarkus 自动发现并按优先级应用(从最小值到最大值)。

默认情况下,Quarkus 扩展包含一些装饰器。

传入通道 (PublisherDecorator) 按优先级顺序

  • io.quarkus.smallrye.reactivemessaging.runtime.ConnectorContextPropagationDecorator (-100):清除传入通道的上下文传播

  • io.smallrye.reactive.messaging.providers.locals.ContextDecorator (0):确保消息在消息上下文中分派

  • io.quarkus.smallrye.reactivemessaging.runtime.RequestScopedDecorator (100):处理可暂停通道

  • io.smallrye.reactive.messaging.providers.IncomingInterceptorDecorator (500):处理 IncomingInterceptor Bean

  • io.smallrye.reactive.messaging.providers.metrics.MetricDecorator (1000):MicroProfile 指标支持,使用 quarkus-smallrye-metrics 扩展启用

  • io.smallrye.reactive.messaging.providers.metrics.MicrometerDecorator (1000):Micrometer 指标支持,使用 quarkus-micrometer 扩展启用

  • io.smallrye.reactive.messaging.providers.extension.ObservationDecorator (1000):传入通道的消息观察支持

  • io.smallrye.reactive.messaging.providers.extension.PausableChannelDecorator (1000):处理可暂停通道

  • io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging.ReactiveMessagingTracingIncomingDecorator (1000):包含在 quarkus-opentelemetry 扩展中,传播跟踪信息

传出通道 (SubscriberDecorator)

  • io.quarkus.smallrye.reactivemessaging.runtime.ConnectorContextPropagationDecorator (-100):清除传出通道的上下文传播

  • io.smallrye.reactive.messaging.providers.extension.OutgoingObservationDecorator (1000):传出通道的消息观察支持

  • io.smallrye.reactive.messaging.providers.extension.PausableChannelDecorator (1000):处理可暂停通道

  • io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging.ReactiveMessagingTracingOutgoingDecorator (1000):包含在 quarkus-opentelemetry 扩展中,传播跟踪信息

  • io.smallrye.reactive.messaging.providers.OutgoingInterceptorDecorator (2000):处理 OutgoingInterceptor Bean

更进一步

本指南展示了 Quarkus 消息传递扩展的一般原则。

如果您想进一步了解,可以查看 SmallRye Reactive Messaging 文档,其中包含有关这些概念和更多内容的深入文档。

相关内容