Emitter - 连接命令式和响应式世界

在之前一篇关于 Kafka 和 Avro 的博客文章中,我们使用了 emitter 来发送 Kafka 消息。

architecture

在这篇文章中,我们将更仔细地研究这个 emitter 构造。

注入 Emitter

注入 emitter 很简单。您需要指定目标通道,即消息发送的目的地。

@Inject @Channel("movies") Emitter<Movie> emitter;

请记住,Reactive Messaging 使用 channels 作为主要抽象。它们可以是内存中的通道,也可以映射到远程代理。

在前面的代码片段中,我们注入了一个 Emitter<Movie>。这意味着您将发送包含电影作为有效载荷的消息。因此,指定的类型是有效载荷的类型。这允许您直接发送有效载荷(自动包装在消息中)或发送更详细的消息,其中包含电影作为有效载荷。

Movie movie = ...

// Send payloads directly
emitter.send(movie);

// Send messages
emitter.send(Message.of(movie));

发送有效载荷

发送有效载荷是发送数据的最简单方式。您只需将有效载荷(例如 Movie 实例)传递给 send 方法。底层会自动创建一个简单的 Message 来包装有效载荷。

当与有效载荷一起使用时,send 方法会返回一个 CompletionStage,指示消息处理是否成功或失败。

emitter.send(movie)
    .whenComplete((success, failure) -> {
        if (failure != null) {
            System.out.println("D'oh! " + failure.getMessage());
        } else {
            System.out.println("Message processed successfully");
        }
    });

处理(稍后我们将看到实际的发出操作)是异步发生的。因此,返回的 CompletionStage 会告知您消息何时被处理。当消息被确认时,CompletionStage 会成功完成。大多数情况下,这意味着处理已顺利完成,或者消息已成功发送到代理。如果出现问题,CompletionStage 会以异常方式完成。传递的异常会告知您原因。

发送消息

虽然发送有效载荷更直接,但有时您需要为消息附加元数据,例如配置消息在 Kafka 中的写入方式、追踪信息等。Emitter 也允许发送消息,从而附加您想要的元数据。在下面的示例中,我们配置了出站 Kafka 记录。我们设置了键、主题等。这样,您可以将消息分派到不同的主题,甚至动态决定。

OutgoingKafkaRecordMetadata<?> metadata = OutgoingKafkaRecordMetadata.builder()
        .withTopic("movies")
        .withKey(movie.getYear())
        .build();
emitter.send(Message.of(movie).addMetadata(metadata));

发出是异步的

Emitter 在命令式和响应式世界之间架起了一座桥梁。当您发出一条消息时,这条消息不会立即被处理。消费消息的下游组件属于 Reactive Streams。立即传递消息会违反 Reactive Streams 协议。我们必须确保下游组件已准备好接收该消息。因此,Emitter 不会直接推送消息,而是将其放入一个缓冲区,用于处理下游的容量(在 Reactive Streams 的术语中称为请求)。

buffer

下游组件会根据其发出的请求接收消息,确保其容量永远不会被超出。

溢出管理

但是,缓冲区也带来了溢出问题。如果您发出过多消息而下游无法跟上,消息将存储在缓冲区中,直到达到最大容量。此时,您将无法再发出消息,任何尝试发出的操作都会抛出异常。但是在这种情况下,我们能做什么呢?在注入 emitter 时,您可以配置一个溢出策略。例如,您可以设置缓冲区大小、使用无界缓冲区、丢弃消息、失败,或者只是忽略背压并让下游处理。默认情况下,它使用一个缓冲区,但根据您的用例,您可能希望进行不同的配置。

@Inject
@Channel("movies")
@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 1000)
Emitter<Movie> emitter1;

@Inject
@Channel("movies")
@OnOverflow(value = OnOverflow.Strategy.NONE)
Emitter<Movie> emitter2;

@Inject
@Channel("movies")
@OnOverflow(value = OnOverflow.Strategy.UNBOUNDED_BUFFER)
Emitter<Movie> emitter3;

结论

本文简要介绍了 Reactive Messaging 中的 Emitter 构造。更多信息可在 SmallRye Reactive Messaging 文档 中找到。

在下一个 Quarkus 版本(1.9)中,此功能将通过两个非常好的增强功能得到改进。首先,它将提供一个 Mutiny 变体,简化与 Mutiny API 的集成。然后,对于 Kafka 用例,将可以直接发出键/值对,而无需使用元数据。

敬请期待!我们将在后续文章中介绍这些内容!