编辑此页面

Apache Pulsar 参考指南

本参考指南演示了您的 Quarkus 应用程序如何利用 Quarkus Messaging 与 Apache Pulsar 进行交互。

1. 简介

Apache Pulsar 是一个为云构建的开源分布式消息传递和流处理平台。它提供了一个多租户、高性能的解决方案来服务消息传递,并具有分层存储功能。

Pulsar 实现了发布-订阅模式

  • 生产者将消息发布到主题

  • 消费者创建这些主题的订阅以接收和处理传入的消息,并在处理完成后向 broker 发送确认

  • 创建订阅后,Pulsar 会保留所有消息,即使消费者已断开连接。仅当消费者确认已成功处理所有这些消息时,才会丢弃保留的消息。

Pulsar 集群包含

  • 一个或多个broker,它们是无状态组件。

  • 用于维护主题元数据、模式、协调和集群配置的元数据存储

  • 一组用于消息持久化存储的 bookies

2. Apache Pulsar 的 Quarkus 扩展

Quarkus 通过 SmallRye Reactive Messaging 框架为 Apache Pulsar 提供支持。它基于 Eclipse MicroProfile Reactive Messaging 规范 3.0,提出了一个灵活的编程模型,将 CDI 和事件驱动桥接起来。

本指南深入探讨了 Apache Pulsar 和 SmallRye Reactive Messaging 框架。要快速入门,请查看 开始使用 Apache Pulsar 的 Quarkus Messaging

您可以通过在项目基本目录中运行以下命令将 messaging-pulsar 扩展添加到您的项目

CLI
quarkus extension add messaging-pulsar
Maven
./mvnw quarkus:add-extension -Dextensions='messaging-pulsar'
Gradle
./gradlew addExtension --extensions='messaging-pulsar'

这会将以下内容添加到您的构建文件中

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-pulsar</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-pulsar")

该扩展包括 pulsar-clients-original 版本 3.0.0 作为传递依赖项,并且与 Pulsar broker 版本 2.10.x 兼容。

3. 配置 SmallRye Pulsar 连接器

由于 SmallRye Reactive Messaging 框架支持不同的消息传递后端,如 Apache Kafka、Apache Pulsar、AMQP、Apache Camel、JMS、MQTT 等,因此它采用通用词汇表

  • 应用程序发送和接收消息Message 包装一个有效负载,并且可以使用一些元数据进行扩展。这不应与 Pulsar Message 混淆,后者由值、键组成。使用 Pulsar 连接器,Reactive Messaging 消息对应于 Pulsar 消息

  • 消息在通道上传输。应用程序组件连接到通道以发布和消费消息。Pulsar 连接器将通道映射到 Pulsar 主题

  • 通道使用连接器连接到消息传递后端。配置连接器以将传入消息映射到特定通道(由应用程序消费)并收集发送到特定通道的传出消息。每个连接器都专用于特定的消息传递技术。例如,处理 Pulsar 的连接器名为 smallrye-pulsar

Pulsar 连接器的最小配置,带有一个传入通道,如下所示

%prod.pulsar.client.serviceUrl=pulsar:6650 (1)
mp.messaging.incoming.prices.connector=smallrye-pulsar (2)
1 配置生产环境配置文件的 Pulsar broker 服务 URL。您可以使用 mp.messaging.incoming.$channel.serviceUrl 属性全局配置它,也可以按通道进行配置。在开发模式和运行测试时,Pulsar 的 Dev Services 会自动启动 Pulsar broker。
2 配置连接器以管理价格通道。默认情况下,主题名称与通道名称相同。

您可以配置主题属性以覆盖它。

%prod 前缀表示该属性仅在应用程序以生产模式运行时使用(因此不在开发或测试模式下)。有关更多详细信息,请参阅 配置文件文档
连接器自动附加

如果您的类路径上只有一个连接器,则可以省略 connector 属性配置。Quarkus 会自动将孤立通道关联到类路径上找到的(唯一)连接器。孤立通道是没有下游消费者的传出通道或没有上游生产者的传入通道。

可以使用以下方式禁用此自动附加功能

quarkus.messaging.auto-connector-attachment=false

有关更多配置选项,请参见 配置 Pulsar 客户端

4. 从 Pulsar 接收消息

Pulsar 连接器使用 Pulsar 客户端连接到 Pulsar broker,并创建消费者以从 Pulsar broker 接收消息,并将每个 Pulsar Message 映射到 Reactive Messaging Message

4.1. 示例

假设您有一个正在运行的 Pulsar broker,可以使用 pulsar:6650 地址访问。配置您的应用程序以接收 prices 通道上的 Pulsar 消息,如下所示

mp.messaging.incoming.prices.serviceUrl=pulsar://pulsar:6650 (1)
mp.messaging.incoming.prices.subscriptionInitialPosition=Earliest (2)
  1. 配置 Pulsar broker 服务 URL。

  2. 确保消费者订阅从 Earliest 位置开始接收消息。

您无需设置 Pulsar 主题或消费者名称。默认情况下,连接器使用通道名称 (prices)。您可以配置 topicconsumerName 属性以覆盖它们。

在 Pulsar 中,消费者需要为主题订阅提供 subscriptionName。如果未提供,连接器将生成唯一的订阅名称

然后,您的应用程序可以直接接收 double 有效负载

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

或者,您可以检索 Reactive Messaging 类型 Message<Double>

@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
    // access record metadata
    var metadata = msg.getMetadata(PulsarIncomingMessageMetadata.class).orElseThrow();
    // process the message payload.
    double price = msg.getPayload();
    // Acknowledge the incoming message (acknowledge the Pulsar message back to the broker)
    return msg.ack();
}

Reactive Messaging Message 类型允许消费方法访问传入消息元数据并手动处理确认。

如果您想直接访问 Pulsar 消息对象,请使用

@Incoming("prices")
public void consume(org.apache.pulsar.client.api.Message<Double> msg) {
    String key = msg.getKey();
    String value = msg.getValue();
    String topic = msg.topicName();
    // ...
}

org.apache.pulsar.client.api.Message 由底层 Pulsar 客户端提供,可以直接与消费方法一起使用。

或者,您的应用程序可以将 Multi 注入到您的 bean 中,使用通道名称标识并订阅其事件,如以下示例所示

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }
}

使用 @Channel 消费消息时,应用程序代码负责订阅。在上面的示例中,Quarkus REST(以前的 RESTEasy Reactive)端点为您处理此问题。

以下类型可以作为通道注入

@Inject @Channel("prices") Multi<Double> streamOfPayloads;

@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;

@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;

@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;

与之前的 Message 示例一样,如果您的注入通道接收有效负载 (Multi<T>),它会自动确认消息,并支持多个订阅者。如果您的注入通道接收 Message (Multi<Message<T>>),您将负责确认和广播。

4.2. 阻塞处理

Reactive Messaging 在 I/O 线程上调用您的方法。有关此主题的更多详细信息,请参见 Quarkus Reactive Architecture 文档。但是,您通常需要将 Reactive Messaging 与阻塞处理(如数据库交互)结合使用。为此,您需要使用 @Blocking 注释,指示处理是阻塞的,不应在调用者线程上运行。

例如,以下代码说明了如何使用带有 Panache 的 Hibernate 将传入的有效负载存储到数据库中

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

有 2 个 @Blocking 注释

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

它们具有相同的效果。因此,您可以同时使用两者。第一个提供了更细粒度的调整,例如要使用的工作池以及是否保留顺序。第二个也用于 Quarkus 的其他 Reactive 功能,它使用默认工作池并保留顺序。

有关 @Blocking 注释的使用的详细信息,请参见 SmallRye Reactive Messaging – 处理阻塞执行

@RunOnVirtualThread

有关在 Java 虚拟线程上运行阻塞处理的信息,请参见 Quarkus 虚拟线程支持与响应式消息传递文档

@Transactional

如果您的方法使用 @Transactional 注释,则即使该方法未使用 @Blocking 注释,也会自动将其视为阻塞方法。

4.3. Pulsar 订阅类型

Pulsar subscriptionType 消费者配置可以灵活地用于实现不同的消息传递场景,例如发布-订阅或排队。

  • Exclusive 订阅类型允许为“扇出式发布-订阅消息传递”指定唯一订阅名称。这是默认订阅类型。

  • SharedKey_SharedFailover 订阅类型允许多个消费者共享相同的订阅名称,以实现消费者之间的“消息排队”。

如果未提供订阅名称,Quarkus 将生成唯一 ID。

4.4. 反序列化和 Pulsar 模式

Pulsar 连接器允许为底层 Pulsar 消费者配置模式配置。有关更多信息,请参见 Pulsar 模式配置 & 自动模式发现

4.5. 确认策略

当从 Pulsar 消息生成的消息被确认时,连接器会向 Pulsar broker 发送 确认请求。大多数情况下,所有 Reactive Messaging 消息都需要确认,这会自动处理。可以使用以下两种策略将确认请求发送到 Pulsar broker

  • Individual acknowledgement 是默认策略,每个消息都会向 broker 发送确认请求。

  • Cumulative acknowledgement,使用 ack-strategy=cumulative 配置,消费者仅确认其收到的最后一条消息。流中直到(包括)提供的消息的所有消息都不会重新传递给该消费者。

默认情况下,Pulsar 消费者不会等待 broker 的确认确认来验证确认。您可以使用 ackReceiptEnabled=true 启用此功能。

4.6. 故障处理策略

如果从 Pulsar 消息生成的消息被 nacked,则会应用失败策略。Quarkus Pulsar 扩展支持 4 种策略

  • nack (default)否定确认 发送到 broker,触发 broker 将此消息重新传递给消费者。可以使用 negativeAckRedeliveryDelayMicrosnegativeAck.redeliveryBackoff 属性进一步配置否定确认。

  • fail 使应用程序失败,不再处理消息。

  • ignore 将记录失败,但将应用确认策略并继续处理。

  • continue 将记录失败,但继续处理而不应用确认或否定确认。此策略可以与 确认超时 配置一起使用。

  • reconsume-later 使用 reconsumeLater API 将消息发送到 重试消息主题,以便延迟重新消费。可以使用 reconsumeLater.delay 属性配置延迟,默认为 3 秒。可以通过将 io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata 实例添加到失败元数据来配置每个消息的自定义延迟或属性。

4.6.1. 确认超时

类似于否定确认,使用 确认超时 机制,Pulsar 客户端会跟踪未确认的消息,在给定的 ackTimeout 周期内,并向 broker 发送 redeliver unacknowledged messages request,因此 broker 会将未确认的消息重新发送给消费者。

要配置超时和重新传递退避机制,您可以设置 ackTimeoutMillisackTimeout.redeliveryBackoff 属性。ackTimeout.redeliveryBackoff 值接受以逗号分隔的值,分别为最小延迟(以毫秒为单位)、最大延迟(以毫秒为单位)和乘数

mp.messaging.incoming.out.failure-strategy=continue
mp.messaging.incoming.out.ackTimeoutMillis=10000
mp.messaging.incoming.out.ackTimeout.redeliveryBackoff=1000,60000,2

4.6.2. 稍后重新消费和重试消息主题

重试消息主题 将未成功消费的消息推送到死信主题,并继续消息消费。请注意,死信主题可以用于不同的消息重新传递方法,例如确认超时、否定确认或重试消息主题。

mp.messaging.incoming.data.failure-strategy=reconsume-later
mp.messaging.incoming.data.reconsumeLater.delay=5000
mp.messaging.incoming.data.retryEnable=true
mp.messaging.incoming.data.negativeAck.redeliveryBackoff=1000,60000,2

4.6.3. 死信主题

死信主题 将未成功消费的消息推送到死信主题,并继续消息消费。请注意,死信主题可以用于不同的消息重新传递方法,例如确认超时、否定确认或重试消息主题。

mp.messaging.incoming.data.failure-strategy=nack
mp.messaging.incoming.data.deadLetterPolicy.maxRedeliverCount=2
mp.messaging.incoming.data.deadLetterPolicy.deadLetterTopic=my-dead-letter-topic
mp.messaging.incoming.data.deadLetterPolicy.initialSubscriptionName=my-dlq-subscription
mp.messaging.incoming.data.subscriptionType=Shared

用于重新传递的否定确认确认超时方法将重新传递至少包含一条未处理消息的整个消息批次。有关更多信息,请参见 生产者批处理

4.7. 批量接收 Pulsar 消息

默认情况下,传入方法单独接收每条 Pulsar 消息。您可以使用 batchReceive=true 属性启用批处理模式,或者在消费者配置中设置 batchReceivePolicy

@Incoming("prices")
public CompletionStage<Void> consumeMessage(Message<org.apache.pulsar.client.api.Messages<Double>> messages) {
    for (org.apache.pulsar.client.api.Message<Double> msg : messages.getPayload()) {
        String key = msg.getKey();
        String topic = msg.getTopicName();
        long timestamp = msg.getEventTime();
        //... process messages
    }
    // ack will commit the latest offsets (per partition) of the batch.
    return messages.ack();
}

@Incoming("prices")
public void consumeRecords(org.apache.pulsar.client.api.Messages<Double> messages) {
    for (org.apache.pulsar.client.api.Message<Double> msg : messages) {
        //... process messages
    }
}

或者,您可以直接将有效负载列表接收到消费方法

@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price
    }
}

Quarkus 会自动检测传入通道的批处理类型,并自动设置批处理配置。您可以使用 mp.messaging.incoming.$channel.batchReceive 属性显式配置批处理模式。

5. 向 Pulsar 发送消息

Pulsar 连接器可以将 Reactive Messaging `Message`s 作为 Pulsar Message 写入。

5.1. 示例

假设您有一个正在运行的 Pulsar broker,可以使用 pulsar:6650 地址访问。配置您的应用程序以将来自 prices 通道的消息写入 Pulsar Messages,如下所示

mp.messaging.outgoing.prices.serviceUrl=pulsar://pulsar:6650 (1)
  1. 配置 Pulsar broker 服务 URL。

您无需设置 Pulsar 主题或生产者名称。默认情况下,连接器使用通道名称 (prices)。您可以配置 topicproducerName 属性以覆盖它们。

然后,您的应用程序必须将 Message<Double> 发送到 prices 通道。它可以像以下代码段中那样使用 double 有效负载

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class PulsarPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

请注意,generate 方法返回一个 Multi<Double>,它实现了 Flow.Publisher 接口。框架将使用此发布者生成消息并将其发送到配置的 Pulsar 主题。

您可以返回 io.smallrye.reactive.messaging.pulsar.OutgoingMessage 而不是返回有效负载来发送 Pulsar 消息

@Outgoing("out")
public Multi<OutgoingMessage<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(x -> OutgoingMessage.of("my-key", random.nextDouble()));
}

有效负载可以包装在 org.eclipse.microprofile.reactive.messaging.Message 中,以更好地控制写入的记录

@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble())
                    .addMetadata(PulsarOutgoingMessageMetadata.builder()
                            .withKey("my-key")
                            .withProperties(Map.of("property-key", "value"))
                            .build()));
}

发送 Messages 时,您可以添加 io.smallrye.reactive.messaging.pulsar.PulsarOutgoingMessageMetadata 实例以影响消息的写入方式到 Pulsar。

除了返回 Flow.Publisher 的方法签名外,传出方法还可以返回单个消息。在这种情况下,生产者将使用此方法作为生成器来创建无限流。

@Outgoing("prices-out") T generate(); // T excluding void

@Outgoing("prices-out") Message<T> generate();

@Outgoing("prices-out") Uni<T> generate();

@Outgoing("prices-out") Uni<Message<T>> generate();

@Outgoing("prices-out") CompletionStage<T> generate();

@Outgoing("prices-out") CompletionStage<Message<T>> generate();

5.2. 序列化和 Pulsar 模式

Pulsar 连接器允许为底层 Pulsar 生产者配置模式配置。有关更多信息,请参见 Pulsar 模式配置 & 自动模式发现

5.3. 发送键/值对

为了将 Kev/Value 对发送到 Pulsar,您可以使用 KeyValue 模式配置 Pulsar 生产者模式。

package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class PulsarKeyValueExample {

    @Identifier("out")
    @Produces
    Schema<KeyValue<String, Long>> schema = Schema.KeyValue(Schema.STRING, Schema.INT64);

    @Incoming("in")
    @Outgoing("out")
    public KeyValue<String, Long> process(long in) {
        return new KeyValue<>("my-key", in);
    }

}

如果您需要更好地控制写入的记录,请使用 PulsarOutgoingMessageMetadata

5.4. 确认

从生产者接收消息后,Pulsar broker 会为该消息分配一个 MessageId 并将其发回给生产者,以确认消息已发布。

默认情况下,连接器会等待 Pulsar 确认记录以继续处理(确认收到的 Message)。您可以通过将 waitForWriteCompletion 属性设置为 false 来禁用此功能。

如果无法写入记录,则消息将被 nacked

如果发生故障,Pulsar 客户端会自动重试发送消息,直到达到 发送超时。可以使用 sendTimeoutMs 属性配置 发送超时,默认值为 30 秒。

5.5. 背压和飞行中记录

Pulsar 出站连接器处理背压,监控等待写入 Pulsar broker 的挂起消息数量。挂起消息的数量使用 maxPendingMessages 属性配置,默认值为 1000。

连接器一次仅发送该数量的消息。在至少一条挂起消息被 broker 确认之前,不会发送其他消息。然后,当 broker 的一条挂起消息被确认时,连接器会向 Pulsar 写入一条新消息。

您还可以通过将 maxPendingMessages 设置为 0 来删除挂起消息的限制。请注意,Pulsar 还允许使用 maxPendingMessagesAcrossPartitions 配置每个分区的挂起消息数量。

5.6. 生产者批处理

默认情况下,Pulsar 生产者将各个消息批量处理在一起以发布到 broker。您可以使用 batchingMaxPublishDelayMicrosbatchingPartitionSwitchFrequencyByPublishDelaybatchingMaxMessagesbatchingMaxBytes 配置属性配置批处理参数,或者使用 batchingEnabled=false 完全禁用它。

使用 Key_Shared 消费者订阅时,可以将 batcherBuilder 配置为 BatcherBuilder.KEY_BASED

6. Pulsar 事务和精确一次处理

Pulsar 事务 使事件流应用程序能够在一个原子操作中消费、处理和生成消息。

事务允许一个或多个生产者将一批消息发送到多个主题,其中批处理中的所有消息最终对任何消费者可见,或者没有消息对消费者可见。

为了使用,需要在 broker 配置中激活事务支持,使用 transactionCoordinatorEnabled=truesystemTopicEnabled=true broker 配置。

在客户端,还需要在 PulsarClient 配置中启用事务支持

mp.messaging.outgoing.tx-producer.enableTransaction=true

Pulsar 连接器提供 PulsarTransactions 自定义发射器,用于在事务中写入记录。

它可以用作常规发射器 @Channel

package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.OutgoingMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;

@ApplicationScoped
public class PulsarTransactionalProducer {

    @Inject
    @Channel("tx-out-example")
    PulsarTransactions<OutgoingMessage<Integer>> txProducer;

    @Inject
    @Channel("other-producer")
    PulsarTransactions<String> producer;

    @Incoming("in")
    public Uni<Void> emitInTransaction(Message<Integer> in) {
        return txProducer.withTransaction(emitter -> {
            emitter.send(OutgoingMessage.of("a", 1));
            emitter.send(OutgoingMessage.of("b", 2));
            emitter.send(OutgoingMessage.of("c", 3));
            producer.send(emitter, "4");
            producer.send(emitter, "5");
            producer.send(emitter, "6");
            return Uni.createFrom().completionStage(in::ack);
        });
    }

}

给定给 withTransaction 方法的函数接收一个 TransactionalEmitter 用于生成记录,并返回一个 Uni,它提供事务的结果。如果处理成功完成,则刷新生产者并提交事务。如果处理抛出异常,返回失败的 Uni,或将 TransactionalEmitter 标记为中止,则事务将被中止。

多个事务生产者可以参与单个事务。这确保所有消息都使用启动的事务发送,并且在事务提交之前,所有参与的生产者都会被刷新。

如果在 Vert.x 上下文中调用此方法,则处理函数也会在该上下文中调用。否则,它将在生产者的发送线程上调用。

6.1. 精确一次处理

Pulsar 事务 API 还允许在事务中管理消费者偏移量,以及生成的消息。这反过来使消费者能够与事务生产者以消费-转换-生产模式耦合,也称为精确一次处理。这意味着应用程序消费消息,处理它们,将结果发布到主题,并在事务中提交已消费消息的偏移量。

PulsarTransactions 发射器还提供了一种在事务中将精确一次处理应用于传入 Pulsar 消息的方法。

以下示例在事务中包含一批 Pulsar 消息。

mp.messaging.outgoing.tx-out-example.enableTransaction=true
# ...
mp.messaging.incoming.in-channel.enableTransaction=true
mp.messaging.incoming.in-channel.batchReceive=true
package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.apache.pulsar.client.api.Messages;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;

    @ApplicationScoped
    public class PulsarExactlyOnceProcessor {

        @Inject
        @Channel("tx-out-example")
        PulsarTransactions<Integer> txProducer;

        @Incoming("in-channel")
        public Uni<Void> emitInTransaction(Message<Messages<Integer>> batch) {
            return txProducer.withTransactionAndAck(batch, emitter -> {
                for (org.apache.pulsar.client.api.Message<Integer> record : batch.getPayload()) {
                    emitter.send(PulsarMessage.of(record.getValue() + 1, record.getKey()));
                }
                return Uni.createFrom().voidItem();
            });
        }

    }

如果处理成功完成,则在事务中确认消息并提交事务。

使用精确一次处理时,消息只能单独确认,而不能累积确认。

如果处理需要中止,则消息将被 nack。可以使用其中一种失败策略来重试处理或仅进行故障停止。请注意,如果事务失败并中止,则从 withTransaction 返回的 Uni 将产生失败。

应用程序可以选择处理错误情况,但为了消息消费继续,从 @Incoming 方法返回的 Uni 不能导致失败。PulsarTransactions#withTransactionAndAck 方法将 ack 和 nack 消息,但不会停止 Reactive 流。忽略失败只会将消费者重置为上次提交的偏移量,并从那里恢复处理。

为了避免在失败情况下出现重复,建议在 broker 端启用消息重复数据删除和批处理索引级别确认

quarkus.pulsar.devservices.broker-config.brokerDeduplicationEnabled=true
quarkus.pulsar.devservices.broker-config.brokerDeduplicationEntriesInterval=1000
quarkus.pulsar.devservices.broker-config.brokerDeduplicationSnapshotIntervalSeconds=3000
quarkus.pulsar.devservices.broker-config.acknowledgmentAtBatchIndexLevelEnabled=3000

mp.messaging.incoming.data.batchIndexAckEnabled=true

7. Pulsar 模式配置 & 自动模式发现

Pulsar 消息以有效负载作为非结构化字节数组存储。Pulsar 模式 定义了如何将结构化数据序列化为原始消息字节。模式 在生产者和消费者中应用,以使用强制数据结构进行写入和读取。它在数据发布到主题之前将数据序列化为原始字节,并在数据传递给消费者之前将原始字节反序列化。

Pulsar 使用模式注册表作为中央存储库来存储注册的模式信息,这使生产者/消费者可以通过 broker 协调主题消息的模式。默认情况下,Apache BookKeeper 用于存储模式。

Pulsar API 为许多 基本类型复杂类型(如键/值、Avro 和 Protobuf)提供了内置模式信息。

Pulsar 连接器允许使用 schema 属性将模式指定为基本类型

mp.messaging.incoming.prices.connector=smallrye-pulsar
mp.messaging.incoming.prices.schema=INT32

mp.messaging.outgoing.prices-out.connector=smallrye-pulsar
mp.messaging.outgoing.prices-out.schema=DOUBLE

如果 schema 属性的值与 Schema Type 匹配,则将使用该类型创建一个简单模式,并将其用于该通道。

Pulsar 连接器允许通过 CDI 提供 Schema bean 来配置复杂模式类型,并使用 @Identifier 限定符标识。

例如,以下 bean 提供了一个 JSON 模式和一个键/值模式

package pulsar.configuration;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;

import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class PulsarSchemaProvider {

    @Produces
    @Identifier("user-schema")
    Schema<User> userSchema = Schema.JSON(User.class);

    @Produces
    @Identifier("a-channel")
    Schema<KeyValue<Integer, User>> keyValueSchema() {
        return Schema.KeyValue(Schema.INT32, Schema.JSON(User.class), KeyValueEncodingType.SEPARATED);
    }

    public static class User {
        String name;
        int age;

    }
}

要使用定义的模式配置传入通道 users,您需要将 schema 属性设置为模式 user-schema 的标识符

mp.messaging.incoming.users.connector=smallrye-pulsar
mp.messaging.incoming.users.schema=user-schema

如果未找到 schema 属性,则连接器会查找使用通道名称标识的 Schema bean。例如,传出通道 a-channel 将使用键/值模式。

mp.messaging.outgoing.a-channel.connector=smallrye-pulsar

如果未提供模式信息,传入通道将使用 Schema.AUTO_CONSUME(),而传出通道将使用 Schema.AUTO_PRODUCE_BYTES() 模式。

7.1. 自动模式发现

当使用 Quarkus Messaging Pulsar (io.quarkus:quarkus-messaging-pulsar) 时,Quarkus 通常可以自动检测要配置的正确 Pulsar 模式。此自动检测基于 @Incoming@Outgoing 方法的声明,以及注入的 @Channels。

例如,如果您声明

@Outgoing("generated-price")
public Multi<Integer> generate() {
    ...
}

并且您的配置表明 generated-price 通道使用 smallrye-pulsar 连接器,那么 Quarkus 将自动将 generated-price 通道的 schema 属性设置为 Pulsar Schema INT32

类似地,如果您声明

@Incoming("my-pulsar-consumer")
public void consume(org.apache.pulsar.api.client.Message<byte[]> record) {
    ...
}

并且您的配置表明 my-pulsar-consumer 通道使用 smallrye-pulsar 连接器,那么 Quarkus 将自动将 schema 属性设置为 Pulsar BYTES Schema。

最后,如果您声明

@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;

并且您的配置表明 price-create 通道使用 smallrye-pulsar 连接器,那么 Quarkus 将自动将 schema 设置为 Pulsar INT64 Schema。

Pulsar Schema 自动检测支持的完整类型集为

  • shortjava.lang.Short

  • intjava.lang.Integer

  • longjava.lang.Long

  • floatjava.lang.Float

  • doublejava.lang.Double

  • byte[]

  • java.time.Instant

  • java.sql.Timestamp

  • java.time.LocalDate

  • java.time.LocalTime

  • java.time.LocalDateTime

  • java.nio.ByteBuffer

  • 从 Avro 模式生成的类,以及 Avro GenericRecord,将使用 AVRO 模式类型配置

  • 从 Protobuf 模式生成的类,将使用 PROTOBUF 模式类型配置

  • 其他类将自动使用 JSON 模式类型配置

请注意,JSON 模式类型强制执行模式验证。

除了这些 Pulsar 提供的模式外,Quarkus 还提供以下不强制执行验证的模式实现

  • io.vertx.core.buffer.Buffer 将使用 io.quarkus.pulsar.schema.BufferSchema 模式配置

  • io.vertx.core.json.JsonObject 将使用 io.quarkus.pulsar.schema.JsonObjectSchema 模式配置

  • io.vertx.core.json.JsonArray 将使用 io.quarkus.pulsar.schema.JsonArraySchema 模式配置

  • 对于无模式 Json 序列化,如果 schema 配置设置为 ObjectMapper<fully_qualified_name_of_the_bean>,则将使用 Jackson ObjectMapper 生成模式,而不强制执行 Pulsar Schema 验证。可以使用 io.quarkus.pulsar.schema.ObjectMapperSchema 显式配置 JSON 模式而不进行验证。

如果通过配置设置了 schema,则不会被自动检测替换。

如果您在序列化器自动检测方面遇到任何问题,您可以通过设置 quarkus.messaging.pulsar.serializer-autodetection.enabled=false 完全关闭它。如果您发现需要这样做,请在 Quarkus 问题跟踪器 中提交一个错误,以便我们可以解决您遇到的任何问题。

8. Pulsar 的 Dev Services

使用 Quarkus Messaging Pulsar 扩展 (quarkus-messaging-pulsar) 时,Pulsar 的 Dev Services 会在开发模式和运行测试时自动启动 Pulsar broker。因此,您不必手动启动 broker。应用程序会自动配置。

8.1. 启用/禁用 Pulsar 的 Dev Services

Pulsar 的 Dev Services 会自动启用,除非

  • quarkus.pulsar.devservices.enabled 设置为 false

  • 配置了 pulsar.client.serviceUrl

  • 所有 Reactive Messaging Pulsar 通道都设置了 serviceUrl 属性

Pulsar 的 Dev Services 依赖于 Docker 来启动 broker。如果您的环境不支持 Docker,您需要手动启动 broker,或连接到已运行的 broker。您可以使用 pulsar.client. 配置 broker 地址。

8.2. 共享 broker

大多数情况下,您需要在应用程序之间共享代理。Pulsar 的 Dev Services 为运行在开发模式下的多个 Quarkus 应用程序实现了一个服务发现机制,以共享单个代理。

Pulsar 的 Dev Services 使用 quarkus-dev-service-pulsar 标签启动容器,该标签用于标识容器。

如果您需要多个(共享)代理,您可以配置 quarkus.pulsar.devservices.service-name 属性并指定代理名称。它会查找具有相同值的容器,如果找不到,则启动一个新的容器。默认服务名称为 pulsar

默认情况下,共享在开发模式下启用,但在测试模式下禁用。您可以使用 quarkus.pulsar.devservices.shared=false 禁用共享。

8.3. 设置端口

默认情况下,Pulsar 的 Dev Services 会选择一个随机端口并配置应用程序。您可以通过配置 quarkus.pulsar.devservices.port 属性来设置端口。

请注意,Pulsar 发布的地址会自动配置为所选端口。

8.4. 配置镜像

Pulsar 的 Dev Services 支持 官方 Apache Pulsar 镜像

可以这样配置自定义镜像名称

quarkus.pulsar.devservices.image-name=datastax/lunastreaming-all:2.10_4.7

8.5. 配置 Pulsar 代理

您可以使用自定义代理配置来配置 Pulsar 的 Dev Services。

以下示例启用了事务支持

quarkus.pulsar.devservices.broker-config.transaction-coordinator-enabled=true
quarkus.pulsar.devservices.broker-config.system-topic-enabled=true

8.6. 配置参考

构建时固定的配置属性 - 所有其他配置属性都可以在运行时覆盖

配置属性

类型

默认

如果已显式启用或禁用 Pulsar 的 Dev Services。Dev Services 通常默认启用,除非存在现有配置。对于 Pulsar,Dev Services 会启动一个代理,除非设置了 pulsar.client.serviceUrl 或所有 Reactive Messaging Pulsar 通道都配置了 serviceUrl

环境变量:QUARKUS_PULSAR_DEVSERVICES_ENABLED

显示更多

布尔值

开发服务将侦听的可选固定端口。

如果未定义,将随机选择端口。

环境变量:QUARKUS_PULSAR_DEVSERVICES_PORT

显示更多

整数

要使用的镜像。请注意,仅支持 Apache Pulsar 镜像。具体来说,镜像存储库必须以 apachepulsar/pulsar 结尾。查看 https://hub.docker.com/r/apachepulsar/pulsar 以查找可用版本。

环境变量:QUARKUS_PULSAR_DEVSERVICES_IMAGE_NAME

显示更多

字符串

apachepulsar/pulsar:3.2.4

指示 Quarkus Dev Services 管理的 Pulsar 代理是否共享。共享时,Quarkus 会使用基于标签的服务发现来查找正在运行的容器。如果找到匹配的容器,则会使用该容器,因此不会启动第二个容器。否则,Pulsar 的 Dev Services 会启动一个新的容器。

该发现使用 quarkus-dev-service-pulsar 标签。该值使用 service-name 属性配置。

容器共享仅在开发模式下使用。

环境变量:QUARKUS_PULSAR_DEVSERVICES_SHARED

显示更多

布尔值

true

附加到启动的容器的 quarkus-dev-service-pulsar 标签的值。当 shared 设置为 true 时,将使用此属性。在这种情况下,在启动容器之前,Pulsar 的 Dev Services 会查找 quarkus-dev-service-pulsar 标签设置为配置值的容器。如果找到,它将使用此容器而不是启动新的容器。否则,它会启动一个将 quarkus-dev-service-pulsar 标签设置为指定值的新容器。

当您需要多个共享 Pulsar 代理时,将使用此属性。

环境变量:QUARKUS_PULSAR_DEVSERVICES_SERVICE_NAME

显示更多

字符串

pulsar

要在 Pulsar 实例上设置的代理配置

环境变量:QUARKUS_PULSAR_DEVSERVICES_BROKER_CONFIG__ENVIRONMENT_VARIABLE_NAME_

显示更多

Map<String,String>

9. 配置 Pulsar 客户端

Pulsar 客户端、消费者和生产者非常可定制,可以配置 Pulsar 客户端应用程序的行为方式。

Pulsar 连接器为每个通道创建一个 Pulsar 客户端,以及一个消费者或生产者,每个通道都有合理的默认设置,以简化它们的配置。尽管创建过程是自动处理的,但所有可用的配置选项仍然可以通过 Pulsar 通道进行配置。

虽然创建 PulsarClientPulsarConsumerPulsarProducer 的惯用方式是通过构建器 API,但本质上这些 API 每次都会构建一个配置对象,以便传递给实现。这些是 ClientConfigurationDataConsumerConfigurationDataProducerConfigurationData

Pulsar 连接器允许直接接收这些配置对象的属性。例如,PulsarClient 的代理身份验证信息使用 authPluginClassNameauthParams 属性接收。为了配置传入通道 data 的身份验证

mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.serviceUrl=pulsar://:6650
mp.messaging.incoming.data.topic=topic
mp.messaging.incoming.data.subscriptionInitialPosition=Earliest
mp.messaging.incoming.data.schema=INT32
mp.messaging.incoming.data.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
mp.messaging.incoming.data.authParams={"userId":"superuser","password":"admin"}

请注意,Pulsar 消费者属性 subscriptionInitialPosition 也配置了 Earliest 值,该值表示枚举值 SubscriptionInitialPosition.Earliest

此方法涵盖了大多数配置情况。但是,无法以这种方式配置非可序列化对象,例如 CryptoKeyReaderServiceUrlProvider 等。Pulsar 连接器允许考虑 Pulsar 配置数据对象的实例 – ClientConfigurationDataConsumerConfigurationDataProducerConfigurationData

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;

class PulsarConfig {

    @Produces
    @Identifier("my-consumer-options")
    public ConsumerConfigurationData<String> getConsumerConfig() {
        ConsumerConfigurationData<String> data = new ConsumerConfigurationData<>();
        data.setAckReceiptEnabled(true);
        data.setCryptoKeyReader(DefaultCryptoKeyReader.builder()
                //...
                .build());
        return data;
    }
}

检索并使用此实例来配置连接器使用的客户端。您需要使用 client-configurationconsumer-configurationproducer-configuration 属性来指示客户端的名称

mp.messaging.incoming.prices.consumer-configuration=my-consumer-options

如果未配置 [client|consumer|producer]-configuration,则连接器将查找使用通道名称标识的实例

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.AutoClusterFailover;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public ClientConfigurationData getClientConfig() {
        ClientConfigurationData data = new ClientConfigurationData();
        data.setEnableTransaction(true);
        data.setServiceUrlProvider(AutoClusterFailover.builder()
                // ...
                .build());
        return data;
    }
}

您还可以提供一个 Map<String, Object>,其中包含按键的配置值

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
import java.util.Map;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public Map<String, Object> getProducerConfig() {
        return Map.of(
                "batcherBuilder", BatcherBuilder.KEY_BASED,
                "sendTimeoutMs", 3000,
                "customMessageRouter", new PartialRoundRobinMessageRouterImpl(4));
    }
}

不同的配置源按以下优先级顺序加载,从最不重要到最重要

  1. 使用默认配置标识符 default-pulsar-clientdefault-pulsar-consumerdefault-pulsar-producer 生成的 Map<String, Object> 配置映射。

  2. 使用配置或通道名称中的标识符生成的 Map<String, Object> 配置映射

  3. 使用通道配置或通道名称中的标识符生成的 [Client|Producer|Consuemr]ConfigurationData 对象

  4. [Client|Producer|Consuemr]ConfigurationData 字段名称命名的通道配置属性。

有关配置选项的详尽列表,请参阅 配置参考

9.1. 配置 Pulsar 身份验证

Pulsar 提供了一个可插拔的身份验证框架,Pulsar 代理/代理使用此机制来验证客户端的身份。

可以使用 authPluginClassNameauthParams 属性在 application.properties 文件中配置客户端

pulsar.client.serviceUrl=pulsar://pulsar:6650
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
pulsar.client.authParams={"userId":"superuser","password":"admin"}

或者以编程方式

import java.util.Map;

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public ClientConfigurationData config() {
        var data = new ClientConfigurationData();
        var auth = new AuthenticationBasic();
        auth.configure(Map.of("userId", "superuser", "password", "admin"));
        data.setAuthentication(auth);
        return data;
    }
}

9.1.1. 使用 mTLS 配置到 Pulsar 的身份验证

Pulsar Messaging 扩展与 Quarkus TLS 注册表集成,以使用 mTLS 验证客户端身份。

要为 Pulsar 通道配置 mTLS,您需要在 application.properties 中提供一个命名的 TLS 配置

quarkus.tls.my-tls-config.trust-store.p12.path=target/certs/pulsar-client-truststore.p12
quarkus.tls.my-tls-config.trust-store.p12.password=secret
quarkus.tls.my-tls-config.key-store.p12.path=target/certs/pulsar-client-keystore.p12
quarkus.tls.my-tls-config.key-store.p12.password=secret

mp.messaging.incoming.prices.tls-configuration-name=my-tls-config

9.1.2. 配置对 Datastax Luna Streaming 的访问

Luna Streaming 是 Apache Pulsar 的生产就绪版本,具有 DataStax 的工具和支持。创建 DataStax Luna Pulsar 租户后,请记下自动生成的令牌,并配置令牌身份验证

pulsar.client.serviceUrl=pulsar+ssl://pulsar-aws-eucentral1.streaming.datastax.com:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
pulsar.client.authParams=token:eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE2ODY4MTc4MzQsImlzcyI6ImRhdGFzdGF4Iiwic3ViIjoiY2xpZW50OzA3NGZhOTI4LThiODktNDBhNC04MDEzLWNlNjVkN2JmZWIwZTtjSEpwWTJWejsyMDI5ODdlOGUyIiwidG9rZW5pZCI6IjIwMjk4N2U4ZTIifQ....

确保提前创建主题,或在命名空间配置中启用自动主题创建

请注意,主题配置需要引用主题的完整名称

mp.messaging.incoming.prices.topic=persistent://my-tenant/default/prices

9.1.3. 配置对 StreamNative Cloud 的访问

StreamNative Cloud 是一种完全托管的 Pulsar 即服务,可在不同的部署选项中使用,无论是完全托管、在公共云上但由 StreamNative 管理还是在 Kubernetes 上自行管理。

StreamNative Pulsar 集群使用 Oauth2 身份验证,因此您需要确保存在一个具有所需 Pulsar 命名空间/主题权限服务帐户您的应用程序正在使用。

接下来,您需要下载服务帐户的 密钥文件(用作 私钥),并记下群集的 颁发者 URL(通常为 https://auth.streamnative.cloud/)和 受众(例如 urn:sn:pulsar:o-rf3ol:redhat)。StreamNative Cloud 控制台中 管理 部分中的 Pulsar 客户端 页面可帮助您完成此过程。

要使用 Pulsar Oauth2 身份验证配置您的应用程序

pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
pulsar.client.authParams={"type":"client_credentials","privateKey":"data:application/json;base64,<base64-encoded value>","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:o-rfwel:redhat"}

请注意,pulsar.client.authParams 配置包含一个 Json 字符串,其中 issuerUrlaudienceprivateKey 的格式为 data:application/json;base64,<base64-encoded-key-file>

或者,您可以以编程方式配置身份验证

package org.acme.pulsar;

import java.net.MalformedURLException;
import java.net.URL;

import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

@ApplicationScoped
public class PulsarAuth {

    @ConfigProperty(name = "pulsar.issuerUrl")
    String issuerUrl;

    @ConfigProperty(name = "pulsar.credentials")
    String credentials;

    @ConfigProperty(name = "pulsar.audience")
    String audience;

    @Produces
    @Identifier("pulsar-auth")
    public ClientConfigurationData pulsarClientConfig() throws MalformedURLException {
        var data = new ClientConfigurationData();
        data.setAuthentication(AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), PulsarAuth.class.getResource(credentials), audience));
        return data;
    }
}

这假设密钥文件作为资源包含在应用程序类路径中,然后配置将如下所示

mp.messaging.incoming.prices.client-configuration=pulsar-auth

pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.issuerUrl=https://auth.streamnative.cloud/
pulsar.audience=urn:sn:pulsar:o-rfwel:redhat
pulsar.credentials=/o-rfwel-quarkus-app.json

请注意,使用以 pulsar-auth 标识的客户端配置的通道需要设置 client-configuration 属性。

10. 健康检查

Quarkus 扩展报告由 Pulsar 连接器管理的每个通道的启动、就绪和活动状态。健康检查依赖于 Pulsar 客户端来验证是否已与代理建立连接。

当与代理建立连接时,入站和出站通道的 启动就绪 探针都会报告 OK

当与代理建立连接 并且 没有捕获到任何故障时,入站和出站通道的 活动 探针都会报告 OK

请注意,消息处理失败会 nack 消息,然后由失败策略处理。失败策略负责报告故障并影响活动检查的结果。fail 失败策略会报告故障,因此活动检查会报告故障。

11. 配置参考

以下是 Pulsar 连接器通道、消费者、生产者和客户端的配置属性列表。有关如何配置 Pulsar 客户端的更多信息,请参阅 Pulsar 客户端配置

11.1. 入站通道配置(从 Pulsar 接收)

以下属性使用以下方式配置

mp.messaging.incoming.your-channel-name.attribute=value
表 1. 'smallrye-pulsar' 连接器的入站属性
属性 (别名) 描述 类型 强制 默认

ack-strategy

指定在确认从记录生成的消息时要应用的提交策略。值可以是 ackcumulative

字符串

false

ack

ackTimeout.redeliveryBackoff

用于配置 ack 超时 MultiplierRedeliveryBackoff 的逗号分隔值,最小延迟、最大延迟、乘数。

字符串

false

batchReceive

是否使用批量接收来消费消息

布尔值

false

false

client-configuration

提供此通道的默认 Pulsar 客户端配置的 CDI bean 的标识符。通道配置仍然可以覆盖任何属性。bean 必须具有 Map<String, Object> 类型,并且必须使用 @io.smallrye.common.annotation.Identifier 限定符来设置标识符。

字符串

false

consumer-configuration

提供此通道的默认 Pulsar 消费者配置的 CDI bean 的标识符。通道配置仍然可以覆盖任何属性。bean 必须具有 Map<String, Object> 类型,并且必须使用 @io.smallrye.common.annotation.Identifier 限定符来设置标识符。

字符串

false

deadLetterPolicy.deadLetterTopic

将发送失败消息的死信主题的名称

字符串

false

deadLetterPolicy.initialSubscriptionName

死信主题的初始订阅名称的名称

字符串

false

deadLetterPolicy.maxRedeliverCount

在发送到死信主题之前,消息将被重新传递的最大次数

整数

false

deadLetterPolicy.retryLetterTopic

将发送失败消息的重试主题的名称

字符串

false

失败策略

指定在对从记录生成的消息进行负面确认 (nack) 时要应用的失败策略。值可以是 nack(默认)、failignore 或 `reconsume-later

字符串

false

nack

health-enabled

是否启用(默认)或禁用健康报告

布尔值

false

true

negativeAck.redeliveryBackoff

用于配置负面 ack MultiplierRedeliveryBackoff 的逗号分隔值,最小延迟、最大延迟、乘数。

字符串

false

reconsumeLater.delay

reconsume 失败策略的默认延迟(以秒为单位)

long

false

3

schema

此通道的 Pulsar 模式类型。配置后,将使用给定的 SchemaType 构建模式并用于通道。如果不存在,则模式解析为搜索使用 @Identifier 和通道名称限定的 Schema 类型的 CDI bean。作为回退,将使用 AUTO_CONSUME 或 AUTO_PRODUCE。

字符串

false

serviceUrl

Pulsar 服务的服务 URL

字符串

false

pulsar://:6650

topic

消费/填充的 Pulsar 主题。如果未设置,则使用通道名称

字符串

false

已启用跟踪

是否启用(默认)或禁用跟踪

布尔值

false

true

您还可以配置底层 Pulsar 消费者支持的属性。

这些属性也可以使用 pulsar.consumer 前缀全局配置

pulsar.consumer.subscriptionInitialPosition=Earliest
表 2. Pulsar 消费者属性
属性 描述 类型 配置文件 默认

topicNames

主题名称

Set

true

[]

topicsPattern

主题模式

Pattern

true

subscriptionName

订阅名称

String

true

subscriptionType

订阅类型。
有四种订阅类型可用
* Exclusive
* Failover
* Shared
* Key_Shared

SubscriptionType

true

Exclusive

subscriptionProperties

Map

true

subscriptionMode

SubscriptionMode

true

Durable

messageListener

MessageListener

false

consumerEventListener

ConsumerEventListener

false

negativeAckRedeliveryBackoff

自定义消息的接口是 negativeAcked 策略。您可以为消费者指定 RedeliveryBackoff

RedeliveryBackoff

false

ackTimeoutRedeliveryBackoff

自定义消息的接口是 ackTimeout 策略。您可以为消费者指定 RedeliveryBackoff

RedeliveryBackoff

false

receiverQueueSize

消费者接收队列的大小。
例如,应用程序调用 Receive 之前消费者累积的消息数。
高于默认值的值会增加消费者吞吐量,但会以更高的内存利用率为代价。

整数

true

1000

acknowledgementsGroupTimeMicros

将消费者确认分组一段时间。
默认情况下,消费者使用 100 毫秒的分组时间向代理发送确认。
将分组时间设置为 0 会立即发送确认。
更长的 ack 分组时间效率更高,但代价是在发生故障后消息重新传递略有增加。

long

true

100000

maxAcknowledgmentGroupSize

按消息数量对消费者确认进行分组。

整数

true

1000

negativeAckRedeliveryDelayMicros

在重新传递处理失败的消息之前等待的延迟。
当应用程序使用 Consumer#negativeAcknowledge(Message) 时,失败的消息会在固定超时后重新传递。

long

true

60000000

maxTotalReceiverQueueSizeAcrossPartitions

跨分区的最大总接收队列大小。
如果总接收队列大小超过此值,此设置会减小各个分区的接收队列大小。

整数

true

50000

consumerName

消费者名称

String

true

ackTimeoutMillis

未确认消息的超时

long

true

0

tickDurationMillis

ack-timeout 重新传递的粒度。
当将 ack-timeout 设置为更大的值(例如,1 小时)时,使用更高的 tickDurationMillis 会减少跟踪消息的内存开销。

long

true

1000

priorityLevel

消费者在共享订阅类型中,代理在分派消息时为其提供更高优先级的优先级级别。
代理遵循降序优先级。例如,0=最高优先级,1、2,…​
在共享订阅类型中,如果使用者 有许可证,代理 首先将消息分派给最高优先级级别的使用者。否则,代理会考虑下一个优先级级别的使用者。
示例 1
如果订阅具有 priorityLevel 为 0 的 consumerA 和 priorityLevel 为 1 的 consumerB,则代理 仅将消息分派给 consumerA,直到其用完许可证,然后开始将消息分派给 consumerB。
示例 2
消费者优先级,级别,许可证
C1, 0, 2
C2, 0, 1
C3, 0, 1
C4, 1, 2
C5, 1, 1

代理将消息分派给使用者的顺序为:C1、C2、C3、C1、C4、C5、C4。

整数

true

0

maxPendingChunkedMessage

保持挂起分块消息的队列的最大大小。当达到阈值时,使用者会删除挂起消息以优化内存利用率。

整数

true

10

autoAckOldestChunkedMessageOnQueueFull

当达到 maxPendingChunkedMessage 的阈值时,是否自动确认挂起分块消息。如果设置为 false,则这些消息将由其代理重新传递。

布尔值

true

false

expireTimeOfIncompleteChunkedMessageMillis

如果使用者未能在指定的时间段内收到所有块,则使不完整块过期的时间间隔。默认值为 1 分钟。

long

true

60000

cryptoKeyReader

CryptoKeyReader

false

messageCrypto

MessageCrypto

false

cryptoFailureAction

当消费者收到无法解密的消息时应采取的措施。
* FAIL:这是默认选项,用于使消息失败,直到加密成功为止。
* DISCARD:静默确认,并且不将消息传递给应用程序。
* CONSUME:将加密的消息传递给应用程序。应用程序有责任解密消息。

消息的解压缩失败。

如果消息包含批处理消息,则客户端无法检索批处理中的单个消息。

传递的加密消息包含 EncryptionContext,其中包含应用程序可用于解密已消费消息有效负载的加密和压缩信息。

ConsumerCryptoFailureAction

true

FAIL

properties

此消费者的名称或值属性。

properties 是附加到消费者的应用程序定义的元数据。

在获取主题统计信息时,将此元数据与消费者统计信息相关联,以便于识别。

SortedMap

true

{}

readCompacted

如果启用 readCompacted,则消费者从压缩的主题读取消息,而不是读取主题的完整消息积压。

消费者仅在压缩主题中看到每个键的最新值,直到达到压缩积压时的主题消息中的点。超出该点后,像往常一样发送消息。

仅在订阅持久主题时启用 readCompacted,该主题具有单个活动消费者(例如故障或独占订阅)。

尝试在非持久主题的订阅或共享订阅上启用它会导致订阅调用抛出 PulsarClientException

布尔值

true

false

subscriptionInitialPosition

首次订阅主题时,设置光标的初始位置。

SubscriptionInitialPosition

true

Latest

patternAutoDiscoveryPeriod

使用模式获取主题消费者时的主题自动发现周期。

默认值和最小值是 1 分钟。

整数

true

60

regexSubscriptionMode

使用正则表达式订阅主题时,您可以选择特定类型的主题。

* PersistentOnly:仅订阅持久主题。
* NonPersistentOnly:仅订阅非持久主题。
* AllTopics:同时订阅持久和非持久主题。

RegexSubscriptionMode

true

PersistentOnly

deadLetterPolicy

消费者的死信策略。

默认情况下,某些消息可能会重新传递多次,甚至可能永远不会停止。

通过使用死信机制,消息具有最大重新传递计数。当超出最大重新传递次数时,消息会自动发送到死信主题并进行确认

您可以通过设置 deadLetterPolicy 来启用死信机制。

在指定死信策略,但不指定 ackTimeoutMillis 时,可以将 ack 超时设置为 30000 毫秒。

DeadLetterPolicy

true

retryEnable

布尔值

true

false

batchReceivePolicy

BatchReceivePolicy

false

autoUpdatePartitions

如果启用了 autoUpdatePartitions,则消费者会自动订阅分区增加。

注意:这仅适用于分区消费者。

布尔值

true

true

autoUpdatePartitionsIntervalSeconds

long

true

60

replicateSubscriptionState

如果启用了 replicateSubscriptionState,则订阅状态将复制到地理复制群集。

布尔值

true

false

resetIncludeHead

布尔值

true

false

keySharedPolicy

KeySharedPolicy

false

batchIndexAckEnabled

布尔值

true

false

ackReceiptEnabled

布尔值

true

false

poolMessages

布尔值

true

false

payloadProcessor

MessagePayloadProcessor

false

startPaused

布尔值

true

false

autoScaledReceiverQueueSizeEnabled

布尔值

true

false

topicConfigurations

List

true

[]

11.2. 出站通道配置(发布到 Pulsar)

表 3. 'smallrye-pulsar' 连接器的出站属性
属性 (别名) 描述 类型 强制 默认

client-configuration

提供此通道的默认 Pulsar 客户端配置的 CDI bean 的标识符。通道配置仍然可以覆盖任何属性。bean 必须具有 Map<String, Object> 类型,并且必须使用 @io.smallrye.common.annotation.Identifier 限定符来设置标识符。

字符串

false

health-enabled

是否启用(默认)或禁用健康报告

布尔值

false

true

maxPendingMessages

保持挂起消息的队列的最大大小,即等待接收来自代理的确认的消息

整数

false

1000

producer-configuration

提供此通道的默认 Pulsar 生产者配置的 CDI bean 的标识符。通道配置仍然可以覆盖任何属性。bean 必须具有 Map<String, Object> 类型,并且必须使用 @io.smallrye.common.annotation.Identifier 限定符来设置标识符。

字符串

false

schema

此通道的 Pulsar 模式类型。配置后,将使用给定的 SchemaType 构建模式并用于通道。如果不存在,则模式解析为搜索使用 @Identifier 和通道名称限定的 Schema 类型的 CDI bean。作为回退,将使用 AUTO_CONSUME 或 AUTO_PRODUCE。

字符串

false

serviceUrl

Pulsar 服务的服务 URL

字符串

false

pulsar://:6650

topic

消费/填充的 Pulsar 主题。如果未设置,则使用通道名称

字符串

false

已启用跟踪

是否启用(默认)或禁用跟踪

布尔值

false

true

waitForWriteCompletion

客户端是否在确认消息之前等待代理确认写入的记录

布尔值

false

true

您还可以配置底层 Pulsar 生产者支持的属性。

这些属性也可以使用 pulsar.producer 前缀全局配置

pulsar.producer.batchingEnabled=false
表 4. Pulsar 生产者属性
属性 描述 类型 配置文件 默认

topicName

主题名称

String

true

producerName

生产者名称

String

true

sendTimeoutMs

消息发送超时(毫秒)。
如果在 sendTimeout 过期之前服务器未确认消息,则会发生错误。

long

true

30000

blockIfQueueFull

如果设置为 true,则当传出消息队列已满时,生产者的 SendSendAsync 方法会阻止,而不是失败并引发错误。
如果设置为 false,则当传出消息队列已满时,生产者的 SendSendAsync 方法会失败,并发生 ProducerQueueIsFullError 异常。

MaxPendingMessages 参数确定传出消息队列的大小。

布尔值

true

false

maxPendingMessages

挂起消息队列的最大大小。

例如,等待接收来自 代理 的确认的消息。

默认情况下,当队列已满时,所有对 SendSendAsync 方法的调用都会失败,除非 您将 BlockIfQueueFull 设置为 true

整数

true

0

maxPendingMessagesAcrossPartitions

跨分区的最大挂起消息数。

如果总数超过配置的值,请使用此设置来降低每个分区的最大挂起消息数 (#setMaxPendingMessages(int))。

整数

true

0

messageRoutingMode

分区主题 上,生产者的消息路由逻辑。
仅当消息上未设置键时才应用该逻辑。
可用选项如下
* pulsar.RoundRobinDistribution:循环
* pulsar.UseSinglePartition:将所有消息发布到单个分区
* pulsar.CustomPartition:自定义分区方案

MessageRoutingMode

true

hashingScheme

确定在其中发布特定消息的分区(仅限分区主题)的哈希函数。
可用选项如下
* pulsar.JavastringHash:等效于 Java 中的 string.hashCode()
* pulsar.Murmur3_32Hash:应用 Murmur3 哈希函数
* pulsar.BoostHash:应用来自 C++ 的 Boost 库的哈希函数

HashingScheme

true

JavaStringHash

cryptoFailureAction

当加密失败时,生产者应采取的操作。
* FAIL:如果加密失败,则未加密消息将无法发送。
* SEND:如果加密失败,则发送未加密消息。

ProducerCryptoFailureAction

true

FAIL

customMessageRouter

MessageRouter

false

batchingMaxPublishDelayMicros

发送消息的批处理时间段。

long

true

1000

batchingPartitionSwitchFrequencyByPublishDelay

整数

true

10

batchingMaxMessages

批处理中允许的最大消息数。

整数

true

1000

batchingMaxBytes

整数

true

131072

batchingEnabled

启用消息批处理。

布尔值

true

true

batcherBuilder

BatcherBuilder

false

chunkingEnabled

启用消息分块。

布尔值

true

false

chunkMaxMessageSize

整数

true

-1

cryptoKeyReader

CryptoKeyReader

false

messageCrypto

MessageCrypto

false

encryptionKeys

Set

true

[]

compressionType

生产者使用的消息数据压缩类型。
可用选项
* LZ4
* ZLIB
* ZSTD
* SNAPPY

CompressionType

true

NONE

initialSequenceId

Long

true

autoUpdatePartitions

布尔值

true

true

autoUpdatePartitionsIntervalSeconds

long

true

60

multiSchema

布尔值

true

true

accessMode

ProducerAccessMode

true

共享

lazyStartPartitionedProducers

布尔值

true

false

properties

SortedMap

true

{}

initialSubscriptionName

使用此配置在创建主题时自动创建初始订阅。如果未设置此字段,则不会创建初始订阅。

String

true

11.3. Pulsar 客户端配置

以下是底层 PulsarClient 的配置参考。可以使用通道属性配置这些选项

mp.messaging.incoming.your-channel-name.numIoThreads=4

或使用 pulsar.client 前缀全局配置

pulsar.client.serviceUrl=pulsar://pulsar:6650
表 5. Pulsar 客户端属性
属性 描述 类型 配置文件 默认

serviceUrl

Pulsar 集群 HTTP URL,用于连接到 Broker。

String

true

serviceUrlProvider

ServiceUrlProvider 的实现类,用于生成 ServiceUrl。

ServiceUrlProvider

false

authentication

客户端的身份验证设置。

Authentication

false

authPluginClassName

客户端身份验证插件的类名。

String

true

authParams

客户端的身份验证参数。

String

true

authParamMap

客户端的身份验证映射。

Map

true

operationTimeoutMs

客户端操作超时时间(毫秒)。

long

true

30000

lookupTimeoutMs

客户端查找超时时间(毫秒)。

long

true

-1

statsIntervalSeconds

打印客户端统计信息的间隔(秒)。

long

true

60

numIoThreads

IO 线程数。

整数

true

10

numListenerThreads

消费者监听器线程数。

整数

true

10

connectionsPerBroker

客户端和每个 Broker 之间建立的连接数。值为 0 表示禁用连接池。

整数

true

1

connectionMaxIdleSeconds

如果连接超过 [connectionMaxIdleSeconds] 秒未使用,则释放连接。如果 [connectionMaxIdleSeconds] < 0,则禁用自动释放空闲连接的功能

整数

true

180

useTcpNoDelay

是否使用 TCP NoDelay 选项。

布尔值

true

true

useTls

是否使用 TLS。

布尔值

true

false

tlsKeyFilePath

TLS 密钥文件的路径。

String

true

tlsCertificateFilePath

TLS 证书文件的路径。

String

true

tlsTrustCertsFilePath

受信任的 TLS 证书文件的路径。

String

true

tlsAllowInsecureConnection

客户端是否接受来自 Broker 的不受信任的 TLS 证书。

布尔值

true

false

tlsHostnameVerificationEnable

当客户端创建与 Broker 的 TLS 连接时,是否验证主机名。

布尔值

true

false

concurrentLookupRequest

可以在每个 Broker 连接上发送的并发查找请求数。设置最大值可防止 Broker 过载。

整数

true

5000

maxLookupRequest

每个 Broker 连接上允许的最大查找请求数,以防止 Broker 过载。

整数

true

50000

maxLookupRedirects

重定向查找请求的最大次数。

整数

true

20

maxNumberOfRejectedRequestPerConnection

在当前连接关闭后,Broker 在某个时间段(60 秒)内拒绝的最大请求数,客户端创建一个新连接以连接到不同的 Broker。

整数

true

50

keepAliveIntervalSeconds

每个客户端 Broker 连接的保持活动间隔秒数。

整数

true

30

connectionTimeoutMs

等待与 Broker 建立连接的持续时间。如果持续时间过去而 Broker 没有响应,则会放弃连接尝试。

整数

true

10000

requestTimeoutMs

完成请求的最长持续时间。

整数

true

60000

readTimeoutMs

请求的最大读取时间。

整数

true

60000

autoCertRefreshSeconds

自动刷新证书的秒数。

整数

true

300

initialBackoffIntervalNanos

初始退避间隔(纳秒)。

long

true

100000000

maxBackoffIntervalNanos

最大退避间隔(纳秒)。

long

true

60000000000

enableBusyWait

是否为 EpollEventLoopGroup 启用 BusyWait。

布尔值

true

false

listenerName

查找的监听器名称。只要网络可访问,客户端就可以使用 listenerName 选择其中一个监听器作为服务 URL 来创建与 Broker 的连接。“advertisedListeners”必须在 Broker 端启用。

String

true

useKeyStoreTls

使用 KeyStore 方式设置 TLS。

布尔值

true

false

sslProvider

内部客户端用于与其他 Pulsar Broker 进行身份验证的 TLS 提供程序。

String

true

tlsKeyStoreType

TLS KeyStore 类型配置。

String

true

JKS

tlsKeyStorePath

TLS KeyStore 的路径。

String

true

tlsKeyStorePassword

TLS KeyStore 的密码。

String

true

tlsTrustStoreType

TLS TrustStore 类型配置。当需要客户端身份验证时,您需要设置此配置。

String

true

JKS

tlsTrustStorePath

TLS TrustStore 的路径。

String

true

tlsTrustStorePassword

TLS TrustStore 的密码。

String

true

tlsCiphers

TLS 密码套件集。

Set

true

[]

tlsProtocols

TLS 协议。

Set

true

[]

memoryLimitBytes

客户端内存使用限制(字节)。 64M 默认值可以保证高的生产者吞吐量。

long

true

67108864

proxyServiceUrl

代理服务的 URL。 proxyServiceUrl 和 proxyProtocol 必须互为包含。

String

true

proxyProtocol

代理服务的协议。 proxyServiceUrl 和 proxyProtocol 必须互为包含。

ProxyProtocol

true

enableTransaction

是否启用事务。

布尔值

true

false

clock

Clock

false

dnsLookupBindAddress

Pulsar 客户端 DNS 查找绑定地址,默认行为是绑定到 0.0.0.0

String

true

dnsLookupBindPort

Pulsar 客户端 DNS 查找绑定端口,在配置 dnsLookupBindAddress 时生效,默认值为 0。

整数

true

0

socks5ProxyAddress

SOCKS5 代理的地址。

InetSocketAddress

true

socks5ProxyUsername

SOCKS5 代理的用户名。

String

true

socks5ProxyPassword

SOCKS5 代理的密码。

String

true

description

客户端版本的额外描述。长度不能超过 64。

String

true

配置文件中不可配置的配置属性(不可序列化)在 配置文件 列中注明。

12. 深入了解

本指南介绍了如何使用 Quarkus 与 Pulsar 进行交互。它利用 Quarkus Messaging 来构建数据流应用程序。

如果您想进一步了解,请查看 SmallRye Reactive Messaging 的文档,这是 Quarkus 中使用的实现。

相关内容