Apache Pulsar 参考指南
本参考指南演示了您的 Quarkus 应用程序如何利用 Quarkus Messaging 与 Apache Pulsar 进行交互。
1. 简介
Apache Pulsar 是一个为云构建的开源分布式消息传递和流处理平台。它提供了一个多租户、高性能的解决方案来服务消息传递,并具有分层存储功能。
Pulsar 实现了发布-订阅模式
-
生产者将消息发布到主题。
-
消费者创建这些主题的订阅以接收和处理传入的消息,并在处理完成后向 broker 发送确认。
-
创建订阅后,Pulsar 会保留所有消息,即使消费者已断开连接。仅当消费者确认已成功处理所有这些消息时,才会丢弃保留的消息。
Pulsar 集群包含
-
一个或多个broker,它们是无状态组件。
-
用于维护主题元数据、模式、协调和集群配置的元数据存储。
-
一组用于消息持久化存储的 bookies。
2. Apache Pulsar 的 Quarkus 扩展
Quarkus 通过 SmallRye Reactive Messaging 框架为 Apache Pulsar 提供支持。它基于 Eclipse MicroProfile Reactive Messaging 规范 3.0,提出了一个灵活的编程模型,将 CDI 和事件驱动桥接起来。
本指南深入探讨了 Apache Pulsar 和 SmallRye Reactive Messaging 框架。要快速入门,请查看 开始使用 Apache Pulsar 的 Quarkus Messaging。 |
您可以通过在项目基本目录中运行以下命令将 messaging-pulsar
扩展添加到您的项目
quarkus extension add messaging-pulsar
./mvnw quarkus:add-extension -Dextensions='messaging-pulsar'
./gradlew addExtension --extensions='messaging-pulsar'
这会将以下内容添加到您的构建文件中
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-pulsar</artifactId>
</dependency>
implementation("io.quarkus:quarkus-messaging-pulsar")
该扩展包括 |
3. 配置 SmallRye Pulsar 连接器
由于 SmallRye Reactive Messaging 框架支持不同的消息传递后端,如 Apache Kafka、Apache Pulsar、AMQP、Apache Camel、JMS、MQTT 等,因此它采用通用词汇表
Pulsar 连接器的最小配置,带有一个传入通道,如下所示
%prod.pulsar.client.serviceUrl=pulsar:6650 (1)
mp.messaging.incoming.prices.connector=smallrye-pulsar (2)
1 | 配置生产环境配置文件的 Pulsar broker 服务 URL。您可以使用 mp.messaging.incoming.$channel.serviceUrl 属性全局配置它,也可以按通道进行配置。在开发模式和运行测试时,Pulsar 的 Dev Services 会自动启动 Pulsar broker。 |
2 | 配置连接器以管理价格通道。默认情况下,主题名称与通道名称相同。 |
您可以配置主题属性以覆盖它。
%prod 前缀表示该属性仅在应用程序以生产模式运行时使用(因此不在开发或测试模式下)。有关更多详细信息,请参阅 配置文件文档。 |
连接器自动附加
如果您的类路径上只有一个连接器,则可以省略 可以使用以下方式禁用此自动附加功能
|
有关更多配置选项,请参见 配置 Pulsar 客户端。
4. 从 Pulsar 接收消息
Pulsar 连接器使用 Pulsar 客户端连接到 Pulsar broker,并创建消费者以从 Pulsar broker 接收消息,并将每个 Pulsar Message
映射到 Reactive Messaging Message
。
4.1. 示例
假设您有一个正在运行的 Pulsar broker,可以使用 pulsar:6650
地址访问。配置您的应用程序以接收 prices
通道上的 Pulsar 消息,如下所示
mp.messaging.incoming.prices.serviceUrl=pulsar://pulsar:6650 (1)
mp.messaging.incoming.prices.subscriptionInitialPosition=Earliest (2)
-
配置 Pulsar broker 服务 URL。
-
确保消费者订阅从
Earliest
位置开始接收消息。
您无需设置 Pulsar 主题或消费者名称。默认情况下,连接器使用通道名称 ( |
在 Pulsar 中,消费者需要为主题订阅提供 |
然后,您的应用程序可以直接接收 double
有效负载
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class PriceConsumer {
@Incoming("prices")
public void consume(double price) {
// process your price.
}
}
或者,您可以检索 Reactive Messaging 类型 Message<Double>
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
// access record metadata
var metadata = msg.getMetadata(PulsarIncomingMessageMetadata.class).orElseThrow();
// process the message payload.
double price = msg.getPayload();
// Acknowledge the incoming message (acknowledge the Pulsar message back to the broker)
return msg.ack();
}
Reactive Messaging Message
类型允许消费方法访问传入消息元数据并手动处理确认。
如果您想直接访问 Pulsar 消息对象,请使用
@Incoming("prices")
public void consume(org.apache.pulsar.client.api.Message<Double> msg) {
String key = msg.getKey();
String value = msg.getValue();
String topic = msg.topicName();
// ...
}
org.apache.pulsar.client.api.Message
由底层 Pulsar 客户端提供,可以直接与消费方法一起使用。
或者,您的应用程序可以将 Multi
注入到您的 bean 中,使用通道名称标识并订阅其事件,如以下示例所示
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;
@Path("/prices")
public class PriceResource {
@Inject
@Channel("prices")
Multi<Double> prices;
@GET
@Path("/prices")
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<Double> stream() {
return prices;
}
}
使用 |
以下类型可以作为通道注入
@Inject @Channel("prices") Multi<Double> streamOfPayloads;
@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;
@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;
@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;
与之前的 Message
示例一样,如果您的注入通道接收有效负载 (Multi<T>
),它会自动确认消息,并支持多个订阅者。如果您的注入通道接收 Message (Multi<Message<T>>
),您将负责确认和广播。
4.2. 阻塞处理
Reactive Messaging 在 I/O 线程上调用您的方法。有关此主题的更多详细信息,请参见 Quarkus Reactive Architecture 文档。但是,您通常需要将 Reactive Messaging 与阻塞处理(如数据库交互)结合使用。为此,您需要使用 @Blocking
注释,指示处理是阻塞的,不应在调用者线程上运行。
例如,以下代码说明了如何使用带有 Panache 的 Hibernate 将传入的有效负载存储到数据库中
import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;
@ApplicationScoped
public class PriceStorage {
@Incoming("prices")
@Transactional
public void store(int priceInUsd) {
Price price = new Price();
price.value = priceInUsd;
price.persist();
}
}
有 2 个
它们具有相同的效果。因此,您可以同时使用两者。第一个提供了更细粒度的调整,例如要使用的工作池以及是否保留顺序。第二个也用于 Quarkus 的其他 Reactive 功能,它使用默认工作池并保留顺序。 有关 |
@RunOnVirtualThread
有关在 Java 虚拟线程上运行阻塞处理的信息,请参见 Quarkus 虚拟线程支持与响应式消息传递文档。 |
@Transactional
如果您的方法使用 |
4.3. Pulsar 订阅类型
Pulsar subscriptionType 消费者配置可以灵活地用于实现不同的消息传递场景,例如发布-订阅或排队。
-
Exclusive 订阅类型允许为“扇出式发布-订阅消息传递”指定唯一订阅名称。这是默认订阅类型。
-
Shared、Key_Shared 或 Failover 订阅类型允许多个消费者共享相同的订阅名称,以实现消费者之间的“消息排队”。
如果未提供订阅名称,Quarkus 将生成唯一 ID。
4.4. 反序列化和 Pulsar 模式
Pulsar 连接器允许为底层 Pulsar 消费者配置模式配置。有关更多信息,请参见 Pulsar 模式配置 & 自动模式发现。
4.5. 确认策略
当从 Pulsar 消息生成的消息被确认时,连接器会向 Pulsar broker 发送 确认请求。大多数情况下,所有 Reactive Messaging 消息都需要确认,这会自动处理。可以使用以下两种策略将确认请求发送到 Pulsar broker
-
Individual acknowledgement 是默认策略,每个消息都会向 broker 发送确认请求。
-
Cumulative acknowledgement,使用
ack-strategy=cumulative
配置,消费者仅确认其收到的最后一条消息。流中直到(包括)提供的消息的所有消息都不会重新传递给该消费者。
默认情况下,Pulsar 消费者不会等待 broker 的确认确认来验证确认。您可以使用 |
4.6. 故障处理策略
如果从 Pulsar 消息生成的消息被 nacked,则会应用失败策略。Quarkus Pulsar 扩展支持 4 种策略
-
nack
(default) 将 否定确认 发送到 broker,触发 broker 将此消息重新传递给消费者。可以使用negativeAckRedeliveryDelayMicros
和negativeAck.redeliveryBackoff
属性进一步配置否定确认。 -
fail
使应用程序失败,不再处理消息。 -
ignore
将记录失败,但将应用确认策略并继续处理。 -
continue
将记录失败,但继续处理而不应用确认或否定确认。此策略可以与 确认超时 配置一起使用。 -
reconsume-later
使用reconsumeLater
API 将消息发送到 重试消息主题,以便延迟重新消费。可以使用reconsumeLater.delay
属性配置延迟,默认为 3 秒。可以通过将io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata
实例添加到失败元数据来配置每个消息的自定义延迟或属性。
4.6.1. 确认超时
类似于否定确认,使用 确认超时 机制,Pulsar 客户端会跟踪未确认的消息,在给定的 ackTimeout 周期内,并向 broker 发送 redeliver unacknowledged messages request,因此 broker 会将未确认的消息重新发送给消费者。
要配置超时和重新传递退避机制,您可以设置 ackTimeoutMillis
和 ackTimeout.redeliveryBackoff
属性。ackTimeout.redeliveryBackoff
值接受以逗号分隔的值,分别为最小延迟(以毫秒为单位)、最大延迟(以毫秒为单位)和乘数
mp.messaging.incoming.out.failure-strategy=continue
mp.messaging.incoming.out.ackTimeoutMillis=10000
mp.messaging.incoming.out.ackTimeout.redeliveryBackoff=1000,60000,2
4.6.2. 稍后重新消费和重试消息主题
重试消息主题 将未成功消费的消息推送到死信主题,并继续消息消费。请注意,死信主题可以用于不同的消息重新传递方法,例如确认超时、否定确认或重试消息主题。
mp.messaging.incoming.data.failure-strategy=reconsume-later
mp.messaging.incoming.data.reconsumeLater.delay=5000
mp.messaging.incoming.data.retryEnable=true
mp.messaging.incoming.data.negativeAck.redeliveryBackoff=1000,60000,2
4.6.3. 死信主题
死信主题 将未成功消费的消息推送到死信主题,并继续消息消费。请注意,死信主题可以用于不同的消息重新传递方法,例如确认超时、否定确认或重试消息主题。
mp.messaging.incoming.data.failure-strategy=nack
mp.messaging.incoming.data.deadLetterPolicy.maxRedeliverCount=2
mp.messaging.incoming.data.deadLetterPolicy.deadLetterTopic=my-dead-letter-topic
mp.messaging.incoming.data.deadLetterPolicy.initialSubscriptionName=my-dlq-subscription
mp.messaging.incoming.data.subscriptionType=Shared
用于重新传递的否定确认或确认超时方法将重新传递至少包含一条未处理消息的整个消息批次。有关更多信息,请参见 生产者批处理。 |
4.7. 批量接收 Pulsar 消息
默认情况下,传入方法单独接收每条 Pulsar 消息。您可以使用 batchReceive=true
属性启用批处理模式,或者在消费者配置中设置 batchReceivePolicy
。
@Incoming("prices")
public CompletionStage<Void> consumeMessage(Message<org.apache.pulsar.client.api.Messages<Double>> messages) {
for (org.apache.pulsar.client.api.Message<Double> msg : messages.getPayload()) {
String key = msg.getKey();
String topic = msg.getTopicName();
long timestamp = msg.getEventTime();
//... process messages
}
// ack will commit the latest offsets (per partition) of the batch.
return messages.ack();
}
@Incoming("prices")
public void consumeRecords(org.apache.pulsar.client.api.Messages<Double> messages) {
for (org.apache.pulsar.client.api.Message<Double> msg : messages) {
//... process messages
}
}
或者,您可以直接将有效负载列表接收到消费方法
@Incoming("prices")
public void consume(List<Double> prices) {
for (double price : prices) {
// process price
}
}
Quarkus 会自动检测传入通道的批处理类型,并自动设置批处理配置。您可以使用 |
5. 向 Pulsar 发送消息
Pulsar 连接器可以将 Reactive Messaging `Message`s 作为 Pulsar Message 写入。
5.1. 示例
假设您有一个正在运行的 Pulsar broker,可以使用 pulsar:6650
地址访问。配置您的应用程序以将来自 prices
通道的消息写入 Pulsar Messages,如下所示
mp.messaging.outgoing.prices.serviceUrl=pulsar://pulsar:6650 (1)
-
配置 Pulsar broker 服务 URL。
您无需设置 Pulsar 主题或生产者名称。默认情况下,连接器使用通道名称 ( |
然后,您的应用程序必须将 Message<Double>
发送到 prices
通道。它可以像以下代码段中那样使用 double
有效负载
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;
@ApplicationScoped
public class PulsarPriceProducer {
private final Random random = new Random();
@Outgoing("prices-out")
public Multi<Double> generate() {
// Build an infinite stream of random prices
// It emits a price every second
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> random.nextDouble());
}
}
请注意,generate 方法返回一个 Multi<Double>
,它实现了 Flow.Publisher
接口。框架将使用此发布者生成消息并将其发送到配置的 Pulsar 主题。
您可以返回 io.smallrye.reactive.messaging.pulsar.OutgoingMessage
而不是返回有效负载来发送 Pulsar 消息
@Outgoing("out")
public Multi<OutgoingMessage<Double>> generate() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> OutgoingMessage.of("my-key", random.nextDouble()));
}
有效负载可以包装在 org.eclipse.microprofile.reactive.messaging.Message
中,以更好地控制写入的记录
@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> Message.of(random.nextDouble())
.addMetadata(PulsarOutgoingMessageMetadata.builder()
.withKey("my-key")
.withProperties(Map.of("property-key", "value"))
.build()));
}
发送 Messages
时,您可以添加 io.smallrye.reactive.messaging.pulsar.PulsarOutgoingMessageMetadata
实例以影响消息的写入方式到 Pulsar。
除了返回 Flow.Publisher
的方法签名外,传出方法还可以返回单个消息。在这种情况下,生产者将使用此方法作为生成器来创建无限流。
@Outgoing("prices-out") T generate(); // T excluding void
@Outgoing("prices-out") Message<T> generate();
@Outgoing("prices-out") Uni<T> generate();
@Outgoing("prices-out") Uni<Message<T>> generate();
@Outgoing("prices-out") CompletionStage<T> generate();
@Outgoing("prices-out") CompletionStage<Message<T>> generate();
5.2. 序列化和 Pulsar 模式
Pulsar 连接器允许为底层 Pulsar 生产者配置模式配置。有关更多信息,请参见 Pulsar 模式配置 & 自动模式发现。
5.3. 发送键/值对
为了将 Kev/Value 对发送到 Pulsar,您可以使用 KeyValue 模式配置 Pulsar 生产者模式。
package pulsar.outbound;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.common.annotation.Identifier;
@ApplicationScoped
public class PulsarKeyValueExample {
@Identifier("out")
@Produces
Schema<KeyValue<String, Long>> schema = Schema.KeyValue(Schema.STRING, Schema.INT64);
@Incoming("in")
@Outgoing("out")
public KeyValue<String, Long> process(long in) {
return new KeyValue<>("my-key", in);
}
}
如果您需要更好地控制写入的记录,请使用 PulsarOutgoingMessageMetadata
。
5.4. 确认
从生产者接收消息后,Pulsar broker 会为该消息分配一个 MessageId
并将其发回给生产者,以确认消息已发布。
默认情况下,连接器会等待 Pulsar 确认记录以继续处理(确认收到的 Message
)。您可以通过将 waitForWriteCompletion
属性设置为 false
来禁用此功能。
如果无法写入记录,则消息将被 nacked
。
如果发生故障,Pulsar 客户端会自动重试发送消息,直到达到 发送超时。可以使用 |
5.5. 背压和飞行中记录
Pulsar 出站连接器处理背压,监控等待写入 Pulsar broker 的挂起消息数量。挂起消息的数量使用 maxPendingMessages
属性配置,默认值为 1000。
连接器一次仅发送该数量的消息。在至少一条挂起消息被 broker 确认之前,不会发送其他消息。然后,当 broker 的一条挂起消息被确认时,连接器会向 Pulsar 写入一条新消息。
您还可以通过将 maxPendingMessages
设置为 0
来删除挂起消息的限制。请注意,Pulsar 还允许使用 maxPendingMessagesAcrossPartitions
配置每个分区的挂起消息数量。
6. Pulsar 事务和精确一次处理
Pulsar 事务 使事件流应用程序能够在一个原子操作中消费、处理和生成消息。
事务允许一个或多个生产者将一批消息发送到多个主题,其中批处理中的所有消息最终对任何消费者可见,或者没有消息对消费者可见。
为了使用,需要在 broker 配置中激活事务支持,使用 |
在客户端,还需要在 PulsarClient
配置中启用事务支持
mp.messaging.outgoing.tx-producer.enableTransaction=true
Pulsar 连接器提供 PulsarTransactions
自定义发射器,用于在事务中写入记录。
它可以用作常规发射器 @Channel
package pulsar.outbound;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.OutgoingMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;
@ApplicationScoped
public class PulsarTransactionalProducer {
@Inject
@Channel("tx-out-example")
PulsarTransactions<OutgoingMessage<Integer>> txProducer;
@Inject
@Channel("other-producer")
PulsarTransactions<String> producer;
@Incoming("in")
public Uni<Void> emitInTransaction(Message<Integer> in) {
return txProducer.withTransaction(emitter -> {
emitter.send(OutgoingMessage.of("a", 1));
emitter.send(OutgoingMessage.of("b", 2));
emitter.send(OutgoingMessage.of("c", 3));
producer.send(emitter, "4");
producer.send(emitter, "5");
producer.send(emitter, "6");
return Uni.createFrom().completionStage(in::ack);
});
}
}
给定给 withTransaction
方法的函数接收一个 TransactionalEmitter
用于生成记录,并返回一个 Uni
,它提供事务的结果。如果处理成功完成,则刷新生产者并提交事务。如果处理抛出异常,返回失败的 Uni
,或将 TransactionalEmitter
标记为中止,则事务将被中止。
多个事务生产者可以参与单个事务。这确保所有消息都使用启动的事务发送,并且在事务提交之前,所有参与的生产者都会被刷新。 |
如果在 Vert.x 上下文中调用此方法,则处理函数也会在该上下文中调用。否则,它将在生产者的发送线程上调用。
6.1. 精确一次处理
Pulsar 事务 API 还允许在事务中管理消费者偏移量,以及生成的消息。这反过来使消费者能够与事务生产者以消费-转换-生产模式耦合,也称为精确一次处理。这意味着应用程序消费消息,处理它们,将结果发布到主题,并在事务中提交已消费消息的偏移量。
PulsarTransactions
发射器还提供了一种在事务中将精确一次处理应用于传入 Pulsar 消息的方法。
以下示例在事务中包含一批 Pulsar 消息。
mp.messaging.outgoing.tx-out-example.enableTransaction=true
# ...
mp.messaging.incoming.in-channel.enableTransaction=true
mp.messaging.incoming.in-channel.batchReceive=true
package pulsar.outbound;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.pulsar.client.api.Messages;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;
@ApplicationScoped
public class PulsarExactlyOnceProcessor {
@Inject
@Channel("tx-out-example")
PulsarTransactions<Integer> txProducer;
@Incoming("in-channel")
public Uni<Void> emitInTransaction(Message<Messages<Integer>> batch) {
return txProducer.withTransactionAndAck(batch, emitter -> {
for (org.apache.pulsar.client.api.Message<Integer> record : batch.getPayload()) {
emitter.send(PulsarMessage.of(record.getValue() + 1, record.getKey()));
}
return Uni.createFrom().voidItem();
});
}
}
如果处理成功完成,则在事务中确认消息并提交事务。
使用精确一次处理时,消息只能单独确认,而不能累积确认。 |
如果处理需要中止,则消息将被 nack。可以使用其中一种失败策略来重试处理或仅进行故障停止。请注意,如果事务失败并中止,则从 withTransaction
返回的 Uni
将产生失败。
应用程序可以选择处理错误情况,但为了消息消费继续,从 @Incoming
方法返回的 Uni
不能导致失败。PulsarTransactions#withTransactionAndAck
方法将 ack 和 nack 消息,但不会停止 Reactive 流。忽略失败只会将消费者重置为上次提交的偏移量,并从那里恢复处理。
为了避免在失败情况下出现重复,建议在 broker 端启用消息重复数据删除和批处理索引级别确认
|
7. Pulsar 模式配置 & 自动模式发现
Pulsar 消息以有效负载作为非结构化字节数组存储。Pulsar 模式 定义了如何将结构化数据序列化为原始消息字节。模式 在生产者和消费者中应用,以使用强制数据结构进行写入和读取。它在数据发布到主题之前将数据序列化为原始字节,并在数据传递给消费者之前将原始字节反序列化。
Pulsar 使用模式注册表作为中央存储库来存储注册的模式信息,这使生产者/消费者可以通过 broker 协调主题消息的模式。默认情况下,Apache BookKeeper 用于存储模式。
Pulsar 连接器允许使用 schema
属性将模式指定为基本类型
mp.messaging.incoming.prices.connector=smallrye-pulsar
mp.messaging.incoming.prices.schema=INT32
mp.messaging.outgoing.prices-out.connector=smallrye-pulsar
mp.messaging.outgoing.prices-out.schema=DOUBLE
如果 schema
属性的值与 Schema Type 匹配,则将使用该类型创建一个简单模式,并将其用于该通道。
Pulsar 连接器允许通过 CDI 提供 Schema
bean 来配置复杂模式类型,并使用 @Identifier
限定符标识。
例如,以下 bean 提供了一个 JSON 模式和一个键/值模式
package pulsar.configuration;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import io.smallrye.common.annotation.Identifier;
@ApplicationScoped
public class PulsarSchemaProvider {
@Produces
@Identifier("user-schema")
Schema<User> userSchema = Schema.JSON(User.class);
@Produces
@Identifier("a-channel")
Schema<KeyValue<Integer, User>> keyValueSchema() {
return Schema.KeyValue(Schema.INT32, Schema.JSON(User.class), KeyValueEncodingType.SEPARATED);
}
public static class User {
String name;
int age;
}
}
要使用定义的模式配置传入通道 users
,您需要将 schema
属性设置为模式 user-schema
的标识符
mp.messaging.incoming.users.connector=smallrye-pulsar
mp.messaging.incoming.users.schema=user-schema
如果未找到 schema
属性,则连接器会查找使用通道名称标识的 Schema
bean。例如,传出通道 a-channel
将使用键/值模式。
mp.messaging.outgoing.a-channel.connector=smallrye-pulsar
如果未提供模式信息,传入通道将使用 Schema.AUTO_CONSUME()
,而传出通道将使用 Schema.AUTO_PRODUCE_BYTES()
模式。
7.1. 自动模式发现
当使用 Quarkus Messaging Pulsar (io.quarkus:quarkus-messaging-pulsar
) 时,Quarkus 通常可以自动检测要配置的正确 Pulsar 模式。此自动检测基于 @Incoming
和 @Outgoing
方法的声明,以及注入的 @Channel
s。
例如,如果您声明
@Outgoing("generated-price")
public Multi<Integer> generate() {
...
}
并且您的配置表明 generated-price
通道使用 smallrye-pulsar
连接器,那么 Quarkus 将自动将 generated-price
通道的 schema
属性设置为 Pulsar Schema INT32
。
类似地,如果您声明
@Incoming("my-pulsar-consumer")
public void consume(org.apache.pulsar.api.client.Message<byte[]> record) {
...
}
并且您的配置表明 my-pulsar-consumer
通道使用 smallrye-pulsar
连接器,那么 Quarkus 将自动将 schema
属性设置为 Pulsar BYTES
Schema。
最后,如果您声明
@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;
并且您的配置表明 price-create
通道使用 smallrye-pulsar
连接器,那么 Quarkus 将自动将 schema
设置为 Pulsar INT64
Schema。
Pulsar Schema 自动检测支持的完整类型集为
-
short
和java.lang.Short
-
int
和java.lang.Integer
-
long
和java.lang.Long
-
float
和java.lang.Float
-
double
和java.lang.Double
-
byte[]
-
java.time.Instant
-
java.sql.Timestamp
-
java.time.LocalDate
-
java.time.LocalTime
-
java.time.LocalDateTime
-
java.nio.ByteBuffer
-
从 Avro 模式生成的类,以及 Avro
GenericRecord
,将使用AVRO
模式类型配置 -
从 Protobuf 模式生成的类,将使用
PROTOBUF
模式类型配置 -
其他类将自动使用
JSON
模式类型配置
请注意, |
除了这些 Pulsar 提供的模式外,Quarkus 还提供以下不强制执行验证的模式实现
-
io.vertx.core.buffer.Buffer
将使用io.quarkus.pulsar.schema.BufferSchema
模式配置 -
io.vertx.core.json.JsonObject
将使用io.quarkus.pulsar.schema.JsonObjectSchema
模式配置 -
io.vertx.core.json.JsonArray
将使用io.quarkus.pulsar.schema.JsonArraySchema
模式配置 -
对于无模式 Json 序列化,如果
schema
配置设置为ObjectMapper<fully_qualified_name_of_the_bean>
,则将使用 JacksonObjectMapper
生成模式,而不强制执行 Pulsar Schema 验证。可以使用io.quarkus.pulsar.schema.ObjectMapperSchema
显式配置 JSON 模式而不进行验证。
如果通过配置设置了 schema
,则不会被自动检测替换。
如果您在序列化器自动检测方面遇到任何问题,您可以通过设置 quarkus.messaging.pulsar.serializer-autodetection.enabled=false
完全关闭它。如果您发现需要这样做,请在 Quarkus 问题跟踪器 中提交一个错误,以便我们可以解决您遇到的任何问题。
8. Pulsar 的 Dev Services
使用 Quarkus Messaging Pulsar 扩展 (quarkus-messaging-pulsar
) 时,Pulsar 的 Dev Services 会在开发模式和运行测试时自动启动 Pulsar broker。因此,您不必手动启动 broker。应用程序会自动配置。
8.1. 启用/禁用 Pulsar 的 Dev Services
Pulsar 的 Dev Services 会自动启用,除非
-
quarkus.pulsar.devservices.enabled
设置为false
-
配置了
pulsar.client.serviceUrl
-
所有 Reactive Messaging Pulsar 通道都设置了
serviceUrl
属性
Pulsar 的 Dev Services 依赖于 Docker 来启动 broker。如果您的环境不支持 Docker,您需要手动启动 broker,或连接到已运行的 broker。您可以使用 pulsar.client.
配置 broker 地址。
8.2. 共享 broker
大多数情况下,您需要在应用程序之间共享代理。Pulsar 的 Dev Services 为运行在开发模式下的多个 Quarkus 应用程序实现了一个服务发现机制,以共享单个代理。
Pulsar 的 Dev Services 使用 quarkus-dev-service-pulsar 标签启动容器,该标签用于标识容器。 |
如果您需要多个(共享)代理,您可以配置 quarkus.pulsar.devservices.service-name
属性并指定代理名称。它会查找具有相同值的容器,如果找不到,则启动一个新的容器。默认服务名称为 pulsar
。
默认情况下,共享在开发模式下启用,但在测试模式下禁用。您可以使用 quarkus.pulsar.devservices.shared=false
禁用共享。
8.3. 设置端口
默认情况下,Pulsar 的 Dev Services 会选择一个随机端口并配置应用程序。您可以通过配置 quarkus.pulsar.devservices.port
属性来设置端口。
请注意,Pulsar 发布的地址会自动配置为所选端口。
8.4. 配置镜像
Pulsar 的 Dev Services 支持 官方 Apache Pulsar 镜像。
可以这样配置自定义镜像名称
quarkus.pulsar.devservices.image-name=datastax/lunastreaming-all:2.10_4.7
8.5. 配置 Pulsar 代理
您可以使用自定义代理配置来配置 Pulsar 的 Dev Services。
以下示例启用了事务支持
quarkus.pulsar.devservices.broker-config.transaction-coordinator-enabled=true
quarkus.pulsar.devservices.broker-config.system-topic-enabled=true
8.6. 配置参考
构建时固定的配置属性 - 所有其他配置属性都可以在运行时覆盖
配置属性 |
类型 |
默认 |
---|---|---|
如果已显式启用或禁用 Pulsar 的 Dev Services。Dev Services 通常默认启用,除非存在现有配置。对于 Pulsar,Dev Services 会启动一个代理,除非设置了 环境变量: 显示更多 |
布尔值 |
|
开发服务将侦听的可选固定端口。 如果未定义,将随机选择端口。 环境变量: 显示更多 |
整数 |
|
要使用的镜像。请注意,仅支持 Apache Pulsar 镜像。具体来说,镜像存储库必须以 环境变量: 显示更多 |
字符串 |
|
指示 Quarkus Dev Services 管理的 Pulsar 代理是否共享。共享时,Quarkus 会使用基于标签的服务发现来查找正在运行的容器。如果找到匹配的容器,则会使用该容器,因此不会启动第二个容器。否则,Pulsar 的 Dev Services 会启动一个新的容器。 该发现使用 容器共享仅在开发模式下使用。 环境变量: 显示更多 |
布尔值 |
|
附加到启动的容器的 当您需要多个共享 Pulsar 代理时,将使用此属性。 环境变量: 显示更多 |
字符串 |
|
要在 Pulsar 实例上设置的代理配置 环境变量: 显示更多 |
Map<String,String> |
9. 配置 Pulsar 客户端
Pulsar 客户端、消费者和生产者非常可定制,可以配置 Pulsar 客户端应用程序的行为方式。
Pulsar 连接器为每个通道创建一个 Pulsar 客户端,以及一个消费者或生产者,每个通道都有合理的默认设置,以简化它们的配置。尽管创建过程是自动处理的,但所有可用的配置选项仍然可以通过 Pulsar 通道进行配置。
虽然创建 PulsarClient
、PulsarConsumer
或 PulsarProducer
的惯用方式是通过构建器 API,但本质上这些 API 每次都会构建一个配置对象,以便传递给实现。这些是 ClientConfigurationData、ConsumerConfigurationData 和 ProducerConfigurationData。
Pulsar 连接器允许直接接收这些配置对象的属性。例如,PulsarClient
的代理身份验证信息使用 authPluginClassName
和 authParams
属性接收。为了配置传入通道 data
的身份验证
mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.serviceUrl=pulsar://:6650
mp.messaging.incoming.data.topic=topic
mp.messaging.incoming.data.subscriptionInitialPosition=Earliest
mp.messaging.incoming.data.schema=INT32
mp.messaging.incoming.data.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
mp.messaging.incoming.data.authParams={"userId":"superuser","password":"admin"}
请注意,Pulsar 消费者属性 subscriptionInitialPosition
也配置了 Earliest
值,该值表示枚举值 SubscriptionInitialPosition.Earliest
。
此方法涵盖了大多数配置情况。但是,无法以这种方式配置非可序列化对象,例如 CryptoKeyReader
、ServiceUrlProvider
等。Pulsar 连接器允许考虑 Pulsar 配置数据对象的实例 – ClientConfigurationData
、ConsumerConfigurationData
、ProducerConfigurationData
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
class PulsarConfig {
@Produces
@Identifier("my-consumer-options")
public ConsumerConfigurationData<String> getConsumerConfig() {
ConsumerConfigurationData<String> data = new ConsumerConfigurationData<>();
data.setAckReceiptEnabled(true);
data.setCryptoKeyReader(DefaultCryptoKeyReader.builder()
//...
.build());
return data;
}
}
检索并使用此实例来配置连接器使用的客户端。您需要使用 client-configuration
、consumer-configuration
或 producer-configuration
属性来指示客户端的名称
mp.messaging.incoming.prices.consumer-configuration=my-consumer-options
如果未配置 [client|consumer|producer]-configuration
,则连接器将查找使用通道名称标识的实例
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.AutoClusterFailover;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
class PulsarConfig {
@Produces
@Identifier("prices")
public ClientConfigurationData getClientConfig() {
ClientConfigurationData data = new ClientConfigurationData();
data.setEnableTransaction(true);
data.setServiceUrlProvider(AutoClusterFailover.builder()
// ...
.build());
return data;
}
}
您还可以提供一个 Map<String, Object>
,其中包含按键的配置值
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
import java.util.Map;
class PulsarConfig {
@Produces
@Identifier("prices")
public Map<String, Object> getProducerConfig() {
return Map.of(
"batcherBuilder", BatcherBuilder.KEY_BASED,
"sendTimeoutMs", 3000,
"customMessageRouter", new PartialRoundRobinMessageRouterImpl(4));
}
}
不同的配置源按以下优先级顺序加载,从最不重要到最重要
-
使用默认配置标识符
default-pulsar-client
、default-pulsar-consumer
、default-pulsar-producer
生成的Map<String, Object>
配置映射。 -
使用配置或通道名称中的标识符生成的
Map<String, Object>
配置映射 -
使用通道配置或通道名称中的标识符生成的
[Client|Producer|Consuemr]ConfigurationData
对象 -
以
[Client|Producer|Consuemr]ConfigurationData
字段名称命名的通道配置属性。
有关配置选项的详尽列表,请参阅 配置参考。
9.1. 配置 Pulsar 身份验证
Pulsar 提供了一个可插拔的身份验证框架,Pulsar 代理/代理使用此机制来验证客户端的身份。
可以使用 authPluginClassName
和 authParams
属性在 application.properties
文件中配置客户端
pulsar.client.serviceUrl=pulsar://pulsar:6650
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
pulsar.client.authParams={"userId":"superuser","password":"admin"}
或者以编程方式
import java.util.Map;
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
class PulsarConfig {
@Produces
@Identifier("prices")
public ClientConfigurationData config() {
var data = new ClientConfigurationData();
var auth = new AuthenticationBasic();
auth.configure(Map.of("userId", "superuser", "password", "admin"));
data.setAuthentication(auth);
return data;
}
}
9.1.1. 使用 mTLS 配置到 Pulsar 的身份验证
Pulsar Messaging 扩展与 Quarkus TLS 注册表集成,以使用 mTLS 验证客户端身份。
要为 Pulsar 通道配置 mTLS,您需要在 application.properties
中提供一个命名的 TLS 配置
quarkus.tls.my-tls-config.trust-store.p12.path=target/certs/pulsar-client-truststore.p12
quarkus.tls.my-tls-config.trust-store.p12.password=secret
quarkus.tls.my-tls-config.key-store.p12.path=target/certs/pulsar-client-keystore.p12
quarkus.tls.my-tls-config.key-store.p12.password=secret
mp.messaging.incoming.prices.tls-configuration-name=my-tls-config
9.1.2. 配置对 Datastax Luna Streaming 的访问
Luna Streaming 是 Apache Pulsar 的生产就绪版本,具有 DataStax 的工具和支持。创建 DataStax Luna Pulsar 租户后,请记下自动生成的令牌,并配置令牌身份验证
pulsar.client.serviceUrl=pulsar+ssl://pulsar-aws-eucentral1.streaming.datastax.com:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
pulsar.client.authParams=token:eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE2ODY4MTc4MzQsImlzcyI6ImRhdGFzdGF4Iiwic3ViIjoiY2xpZW50OzA3NGZhOTI4LThiODktNDBhNC04MDEzLWNlNjVkN2JmZWIwZTtjSEpwWTJWejsyMDI5ODdlOGUyIiwidG9rZW5pZCI6IjIwMjk4N2U4ZTIifQ....
确保提前创建主题,或在命名空间配置中启用自动主题创建。
请注意,主题配置需要引用主题的完整名称
mp.messaging.incoming.prices.topic=persistent://my-tenant/default/prices
9.1.3. 配置对 StreamNative Cloud 的访问
StreamNative Cloud 是一种完全托管的 Pulsar 即服务,可在不同的部署选项中使用,无论是完全托管、在公共云上但由 StreamNative 管理还是在 Kubernetes 上自行管理。
StreamNative Pulsar 集群使用 Oauth2 身份验证,因此您需要确保存在一个具有所需 Pulsar 命名空间/主题权限的 服务帐户您的应用程序正在使用。
接下来,您需要下载服务帐户的 密钥文件(用作 私钥),并记下群集的 颁发者 URL(通常为 https://auth.streamnative.cloud/
)和 受众(例如 urn:sn:pulsar:o-rf3ol:redhat
)。StreamNative Cloud 控制台中 管理 部分中的 Pulsar 客户端 页面可帮助您完成此过程。
要使用 Pulsar Oauth2 身份验证配置您的应用程序
pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
pulsar.client.authParams={"type":"client_credentials","privateKey":"data:application/json;base64,<base64-encoded value>","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:o-rfwel:redhat"}
请注意,pulsar.client.authParams
配置包含一个 Json 字符串,其中 issuerUrl
、audience
和 privateKey
的格式为 data:application/json;base64,<base64-encoded-key-file>
。
或者,您可以以编程方式配置身份验证
package org.acme.pulsar;
import java.net.MalformedURLException;
import java.net.URL;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
@ApplicationScoped
public class PulsarAuth {
@ConfigProperty(name = "pulsar.issuerUrl")
String issuerUrl;
@ConfigProperty(name = "pulsar.credentials")
String credentials;
@ConfigProperty(name = "pulsar.audience")
String audience;
@Produces
@Identifier("pulsar-auth")
public ClientConfigurationData pulsarClientConfig() throws MalformedURLException {
var data = new ClientConfigurationData();
data.setAuthentication(AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), PulsarAuth.class.getResource(credentials), audience));
return data;
}
}
这假设密钥文件作为资源包含在应用程序类路径中,然后配置将如下所示
mp.messaging.incoming.prices.client-configuration=pulsar-auth
pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.issuerUrl=https://auth.streamnative.cloud/
pulsar.audience=urn:sn:pulsar:o-rfwel:redhat
pulsar.credentials=/o-rfwel-quarkus-app.json
请注意,使用以 pulsar-auth
标识的客户端配置的通道需要设置 client-configuration
属性。
10. 健康检查
Quarkus 扩展报告由 Pulsar 连接器管理的每个通道的启动、就绪和活动状态。健康检查依赖于 Pulsar 客户端来验证是否已与代理建立连接。
当与代理建立连接时,入站和出站通道的 启动 和 就绪 探针都会报告 OK。
当与代理建立连接 并且 没有捕获到任何故障时,入站和出站通道的 活动 探针都会报告 OK。
请注意,消息处理失败会 nack 消息,然后由失败策略处理。失败策略负责报告故障并影响活动检查的结果。fail
失败策略会报告故障,因此活动检查会报告故障。
11. 配置参考
以下是 Pulsar 连接器通道、消费者、生产者和客户端的配置属性列表。有关如何配置 Pulsar 客户端的更多信息,请参阅 Pulsar 客户端配置。
11.1. 入站通道配置(从 Pulsar 接收)
以下属性使用以下方式配置
mp.messaging.incoming.your-channel-name.attribute=value
属性 (别名) | 描述 | 类型 | 强制 | 默认 |
---|---|---|---|---|
ack-strategy |
指定在确认从记录生成的消息时要应用的提交策略。值可以是 |
字符串 |
false |
|
ackTimeout.redeliveryBackoff |
用于配置 ack 超时 MultiplierRedeliveryBackoff 的逗号分隔值,最小延迟、最大延迟、乘数。 |
字符串 |
false |
|
batchReceive |
是否使用批量接收来消费消息 |
布尔值 |
false |
|
client-configuration |
提供此通道的默认 Pulsar 客户端配置的 CDI bean 的标识符。通道配置仍然可以覆盖任何属性。bean 必须具有 Map<String, Object> 类型,并且必须使用 @io.smallrye.common.annotation.Identifier 限定符来设置标识符。 |
字符串 |
false |
|
consumer-configuration |
提供此通道的默认 Pulsar 消费者配置的 CDI bean 的标识符。通道配置仍然可以覆盖任何属性。bean 必须具有 Map<String, Object> 类型,并且必须使用 @io.smallrye.common.annotation.Identifier 限定符来设置标识符。 |
字符串 |
false |
|
deadLetterPolicy.deadLetterTopic |
将发送失败消息的死信主题的名称 |
字符串 |
false |
|
deadLetterPolicy.initialSubscriptionName |
死信主题的初始订阅名称的名称 |
字符串 |
false |
|
deadLetterPolicy.maxRedeliverCount |
在发送到死信主题之前,消息将被重新传递的最大次数 |
整数 |
false |
|
deadLetterPolicy.retryLetterTopic |
将发送失败消息的重试主题的名称 |
字符串 |
false |
|
失败策略 |
指定在对从记录生成的消息进行负面确认 (nack) 时要应用的失败策略。值可以是 |
字符串 |
false |
|
health-enabled |
是否启用(默认)或禁用健康报告 |
布尔值 |
false |
|
negativeAck.redeliveryBackoff |
用于配置负面 ack MultiplierRedeliveryBackoff 的逗号分隔值,最小延迟、最大延迟、乘数。 |
字符串 |
false |
|
reconsumeLater.delay |
reconsume 失败策略的默认延迟(以秒为单位) |
long |
false |
|
schema |
此通道的 Pulsar 模式类型。配置后,将使用给定的 SchemaType 构建模式并用于通道。如果不存在,则模式解析为搜索使用 |
字符串 |
false |
|
serviceUrl |
Pulsar 服务的服务 URL |
字符串 |
false |
|
topic |
消费/填充的 Pulsar 主题。如果未设置,则使用通道名称 |
字符串 |
false |
|
已启用跟踪 |
是否启用(默认)或禁用跟踪 |
布尔值 |
false |
|
您还可以配置底层 Pulsar 消费者支持的属性。
这些属性也可以使用 pulsar.consumer
前缀全局配置
pulsar.consumer.subscriptionInitialPosition=Earliest
属性 | 描述 | 类型 | 配置文件 | 默认 |
---|---|---|---|---|
topicNames |
主题名称 |
Set |
true |
[] |
topicsPattern |
主题模式 |
Pattern |
true |
|
subscriptionName |
订阅名称 |
String |
true |
|
subscriptionType |
订阅类型。 |
SubscriptionType |
true |
Exclusive |
subscriptionProperties |
Map |
true |
||
subscriptionMode |
SubscriptionMode |
true |
Durable |
|
messageListener |
MessageListener |
false |
||
consumerEventListener |
ConsumerEventListener |
false |
||
negativeAckRedeliveryBackoff |
自定义消息的接口是 negativeAcked 策略。您可以为消费者指定 |
RedeliveryBackoff |
false |
|
ackTimeoutRedeliveryBackoff |
自定义消息的接口是 ackTimeout 策略。您可以为消费者指定 |
RedeliveryBackoff |
false |
|
receiverQueueSize |
消费者接收队列的大小。 |
整数 |
true |
1000 |
acknowledgementsGroupTimeMicros |
将消费者确认分组一段时间。 |
long |
true |
100000 |
maxAcknowledgmentGroupSize |
按消息数量对消费者确认进行分组。 |
整数 |
true |
1000 |
negativeAckRedeliveryDelayMicros |
在重新传递处理失败的消息之前等待的延迟。 |
long |
true |
60000000 |
maxTotalReceiverQueueSizeAcrossPartitions |
跨分区的最大总接收队列大小。 |
整数 |
true |
50000 |
consumerName |
消费者名称 |
String |
true |
|
ackTimeoutMillis |
未确认消息的超时 |
long |
true |
0 |
tickDurationMillis |
ack-timeout 重新传递的粒度。 |
long |
true |
1000 |
priorityLevel |
消费者在共享订阅类型中,代理在分派消息时为其提供更高优先级的优先级级别。 代理将消息分派给使用者的顺序为:C1、C2、C3、C1、C4、C5、C4。 |
整数 |
true |
0 |
maxPendingChunkedMessage |
保持挂起分块消息的队列的最大大小。当达到阈值时,使用者会删除挂起消息以优化内存利用率。 |
整数 |
true |
10 |
autoAckOldestChunkedMessageOnQueueFull |
当达到 |
布尔值 |
true |
false |
expireTimeOfIncompleteChunkedMessageMillis |
如果使用者未能在指定的时间段内收到所有块,则使不完整块过期的时间间隔。默认值为 1 分钟。 |
long |
true |
60000 |
cryptoKeyReader |
CryptoKeyReader |
false |
||
messageCrypto |
MessageCrypto |
false |
||
cryptoFailureAction |
当消费者收到无法解密的消息时应采取的措施。 消息的解压缩失败。 如果消息包含批处理消息,则客户端无法检索批处理中的单个消息。 传递的加密消息包含 |
ConsumerCryptoFailureAction |
true |
FAIL |
properties |
此消费者的名称或值属性。
在获取主题统计信息时,将此元数据与消费者统计信息相关联,以便于识别。 |
SortedMap |
true |
{} |
readCompacted |
如果启用 消费者仅在压缩主题中看到每个键的最新值,直到达到压缩积压时的主题消息中的点。超出该点后,像往常一样发送消息。 仅在订阅持久主题时启用 尝试在非持久主题的订阅或共享订阅上启用它会导致订阅调用抛出 |
布尔值 |
true |
false |
subscriptionInitialPosition |
首次订阅主题时,设置光标的初始位置。 |
SubscriptionInitialPosition |
true |
Latest |
patternAutoDiscoveryPeriod |
使用模式获取主题消费者时的主题自动发现周期。 默认值和最小值是 1 分钟。 |
整数 |
true |
60 |
regexSubscriptionMode |
使用正则表达式订阅主题时,您可以选择特定类型的主题。 * PersistentOnly:仅订阅持久主题。 |
RegexSubscriptionMode |
true |
PersistentOnly |
deadLetterPolicy |
消费者的死信策略。 默认情况下,某些消息可能会重新传递多次,甚至可能永远不会停止。 通过使用死信机制,消息具有最大重新传递计数。当超出最大重新传递次数时,消息会自动发送到死信主题并进行确认。 您可以通过设置 在指定死信策略,但不指定 |
DeadLetterPolicy |
true |
|
retryEnable |
布尔值 |
true |
false |
|
batchReceivePolicy |
BatchReceivePolicy |
false |
||
autoUpdatePartitions |
如果启用了 注意:这仅适用于分区消费者。 |
布尔值 |
true |
true |
autoUpdatePartitionsIntervalSeconds |
long |
true |
60 |
|
replicateSubscriptionState |
如果启用了 |
布尔值 |
true |
false |
resetIncludeHead |
布尔值 |
true |
false |
|
keySharedPolicy |
KeySharedPolicy |
false |
||
batchIndexAckEnabled |
布尔值 |
true |
false |
|
ackReceiptEnabled |
布尔值 |
true |
false |
|
poolMessages |
布尔值 |
true |
false |
|
payloadProcessor |
MessagePayloadProcessor |
false |
||
startPaused |
布尔值 |
true |
false |
|
autoScaledReceiverQueueSizeEnabled |
布尔值 |
true |
false |
|
topicConfigurations |
List |
true |
[] |
11.2. 出站通道配置(发布到 Pulsar)
属性 (别名) | 描述 | 类型 | 强制 | 默认 |
---|---|---|---|---|
client-configuration |
提供此通道的默认 Pulsar 客户端配置的 CDI bean 的标识符。通道配置仍然可以覆盖任何属性。bean 必须具有 Map<String, Object> 类型,并且必须使用 @io.smallrye.common.annotation.Identifier 限定符来设置标识符。 |
字符串 |
false |
|
health-enabled |
是否启用(默认)或禁用健康报告 |
布尔值 |
false |
|
maxPendingMessages |
保持挂起消息的队列的最大大小,即等待接收来自代理的确认的消息 |
整数 |
false |
|
producer-configuration |
提供此通道的默认 Pulsar 生产者配置的 CDI bean 的标识符。通道配置仍然可以覆盖任何属性。bean 必须具有 Map<String, Object> 类型,并且必须使用 @io.smallrye.common.annotation.Identifier 限定符来设置标识符。 |
字符串 |
false |
|
schema |
此通道的 Pulsar 模式类型。配置后,将使用给定的 SchemaType 构建模式并用于通道。如果不存在,则模式解析为搜索使用 |
字符串 |
false |
|
serviceUrl |
Pulsar 服务的服务 URL |
字符串 |
false |
|
topic |
消费/填充的 Pulsar 主题。如果未设置,则使用通道名称 |
字符串 |
false |
|
已启用跟踪 |
是否启用(默认)或禁用跟踪 |
布尔值 |
false |
|
waitForWriteCompletion |
客户端是否在确认消息之前等待代理确认写入的记录 |
布尔值 |
false |
|
您还可以配置底层 Pulsar 生产者支持的属性。
这些属性也可以使用 pulsar.producer
前缀全局配置
pulsar.producer.batchingEnabled=false
属性 | 描述 | 类型 | 配置文件 | 默认 |
---|---|---|---|---|
topicName |
主题名称 |
String |
true |
|
producerName |
生产者名称 |
String |
true |
|
sendTimeoutMs |
消息发送超时(毫秒)。 |
long |
true |
30000 |
blockIfQueueFull |
如果设置为
|
布尔值 |
true |
false |
maxPendingMessages |
挂起消息队列的最大大小。 例如,等待接收来自 代理 的确认的消息。 默认情况下,当队列已满时,所有对 |
整数 |
true |
0 |
maxPendingMessagesAcrossPartitions |
跨分区的最大挂起消息数。 如果总数超过配置的值,请使用此设置来降低每个分区的最大挂起消息数 ( |
整数 |
true |
0 |
messageRoutingMode |
在 分区主题 上,生产者的消息路由逻辑。 |
MessageRoutingMode |
true |
|
hashingScheme |
确定在其中发布特定消息的分区(仅限分区主题)的哈希函数。 |
HashingScheme |
true |
JavaStringHash |
cryptoFailureAction |
当加密失败时,生产者应采取的操作。 |
ProducerCryptoFailureAction |
true |
FAIL |
customMessageRouter |
MessageRouter |
false |
||
batchingMaxPublishDelayMicros |
发送消息的批处理时间段。 |
long |
true |
1000 |
batchingPartitionSwitchFrequencyByPublishDelay |
整数 |
true |
10 |
|
batchingMaxMessages |
批处理中允许的最大消息数。 |
整数 |
true |
1000 |
batchingMaxBytes |
整数 |
true |
131072 |
|
batchingEnabled |
启用消息批处理。 |
布尔值 |
true |
true |
batcherBuilder |
BatcherBuilder |
false |
||
chunkingEnabled |
启用消息分块。 |
布尔值 |
true |
false |
chunkMaxMessageSize |
整数 |
true |
-1 |
|
cryptoKeyReader |
CryptoKeyReader |
false |
||
messageCrypto |
MessageCrypto |
false |
||
encryptionKeys |
Set |
true |
[] |
|
compressionType |
CompressionType |
true |
NONE |
|
initialSequenceId |
Long |
true |
||
autoUpdatePartitions |
布尔值 |
true |
true |
|
autoUpdatePartitionsIntervalSeconds |
long |
true |
60 |
|
multiSchema |
布尔值 |
true |
true |
|
accessMode |
ProducerAccessMode |
true |
共享 |
|
lazyStartPartitionedProducers |
布尔值 |
true |
false |
|
properties |
SortedMap |
true |
{} |
|
initialSubscriptionName |
使用此配置在创建主题时自动创建初始订阅。如果未设置此字段,则不会创建初始订阅。 |
String |
true |
11.3. Pulsar 客户端配置
以下是底层 PulsarClient
的配置参考。可以使用通道属性配置这些选项
mp.messaging.incoming.your-channel-name.numIoThreads=4
或使用 pulsar.client
前缀全局配置
pulsar.client.serviceUrl=pulsar://pulsar:6650
属性 | 描述 | 类型 | 配置文件 | 默认 |
---|---|---|---|---|
serviceUrl |
Pulsar 集群 HTTP URL,用于连接到 Broker。 |
String |
true |
|
serviceUrlProvider |
ServiceUrlProvider 的实现类,用于生成 ServiceUrl。 |
ServiceUrlProvider |
false |
|
authentication |
客户端的身份验证设置。 |
Authentication |
false |
|
authPluginClassName |
客户端身份验证插件的类名。 |
String |
true |
|
authParams |
客户端的身份验证参数。 |
String |
true |
|
authParamMap |
客户端的身份验证映射。 |
Map |
true |
|
operationTimeoutMs |
客户端操作超时时间(毫秒)。 |
long |
true |
30000 |
lookupTimeoutMs |
客户端查找超时时间(毫秒)。 |
long |
true |
-1 |
statsIntervalSeconds |
打印客户端统计信息的间隔(秒)。 |
long |
true |
60 |
numIoThreads |
IO 线程数。 |
整数 |
true |
10 |
numListenerThreads |
消费者监听器线程数。 |
整数 |
true |
10 |
connectionsPerBroker |
客户端和每个 Broker 之间建立的连接数。值为 0 表示禁用连接池。 |
整数 |
true |
1 |
connectionMaxIdleSeconds |
如果连接超过 [connectionMaxIdleSeconds] 秒未使用,则释放连接。如果 [connectionMaxIdleSeconds] < 0,则禁用自动释放空闲连接的功能 |
整数 |
true |
180 |
useTcpNoDelay |
是否使用 TCP NoDelay 选项。 |
布尔值 |
true |
true |
useTls |
是否使用 TLS。 |
布尔值 |
true |
false |
tlsKeyFilePath |
TLS 密钥文件的路径。 |
String |
true |
|
tlsCertificateFilePath |
TLS 证书文件的路径。 |
String |
true |
|
tlsTrustCertsFilePath |
受信任的 TLS 证书文件的路径。 |
String |
true |
|
tlsAllowInsecureConnection |
客户端是否接受来自 Broker 的不受信任的 TLS 证书。 |
布尔值 |
true |
false |
tlsHostnameVerificationEnable |
当客户端创建与 Broker 的 TLS 连接时,是否验证主机名。 |
布尔值 |
true |
false |
concurrentLookupRequest |
可以在每个 Broker 连接上发送的并发查找请求数。设置最大值可防止 Broker 过载。 |
整数 |
true |
5000 |
maxLookupRequest |
每个 Broker 连接上允许的最大查找请求数,以防止 Broker 过载。 |
整数 |
true |
50000 |
maxLookupRedirects |
重定向查找请求的最大次数。 |
整数 |
true |
20 |
maxNumberOfRejectedRequestPerConnection |
在当前连接关闭后,Broker 在某个时间段(60 秒)内拒绝的最大请求数,客户端创建一个新连接以连接到不同的 Broker。 |
整数 |
true |
50 |
keepAliveIntervalSeconds |
每个客户端 Broker 连接的保持活动间隔秒数。 |
整数 |
true |
30 |
connectionTimeoutMs |
等待与 Broker 建立连接的持续时间。如果持续时间过去而 Broker 没有响应,则会放弃连接尝试。 |
整数 |
true |
10000 |
requestTimeoutMs |
完成请求的最长持续时间。 |
整数 |
true |
60000 |
readTimeoutMs |
请求的最大读取时间。 |
整数 |
true |
60000 |
autoCertRefreshSeconds |
自动刷新证书的秒数。 |
整数 |
true |
300 |
initialBackoffIntervalNanos |
初始退避间隔(纳秒)。 |
long |
true |
100000000 |
maxBackoffIntervalNanos |
最大退避间隔(纳秒)。 |
long |
true |
60000000000 |
enableBusyWait |
是否为 EpollEventLoopGroup 启用 BusyWait。 |
布尔值 |
true |
false |
listenerName |
查找的监听器名称。只要网络可访问,客户端就可以使用 listenerName 选择其中一个监听器作为服务 URL 来创建与 Broker 的连接。“advertisedListeners”必须在 Broker 端启用。 |
String |
true |
|
useKeyStoreTls |
使用 KeyStore 方式设置 TLS。 |
布尔值 |
true |
false |
sslProvider |
内部客户端用于与其他 Pulsar Broker 进行身份验证的 TLS 提供程序。 |
String |
true |
|
tlsKeyStoreType |
TLS KeyStore 类型配置。 |
String |
true |
JKS |
tlsKeyStorePath |
TLS KeyStore 的路径。 |
String |
true |
|
tlsKeyStorePassword |
TLS KeyStore 的密码。 |
String |
true |
|
tlsTrustStoreType |
TLS TrustStore 类型配置。当需要客户端身份验证时,您需要设置此配置。 |
String |
true |
JKS |
tlsTrustStorePath |
TLS TrustStore 的路径。 |
String |
true |
|
tlsTrustStorePassword |
TLS TrustStore 的密码。 |
String |
true |
|
tlsCiphers |
TLS 密码套件集。 |
Set |
true |
[] |
tlsProtocols |
TLS 协议。 |
Set |
true |
[] |
memoryLimitBytes |
客户端内存使用限制(字节)。 64M 默认值可以保证高的生产者吞吐量。 |
long |
true |
67108864 |
proxyServiceUrl |
代理服务的 URL。 proxyServiceUrl 和 proxyProtocol 必须互为包含。 |
String |
true |
|
proxyProtocol |
代理服务的协议。 proxyServiceUrl 和 proxyProtocol 必须互为包含。 |
ProxyProtocol |
true |
|
enableTransaction |
是否启用事务。 |
布尔值 |
true |
false |
clock |
Clock |
false |
||
dnsLookupBindAddress |
Pulsar 客户端 DNS 查找绑定地址,默认行为是绑定到 0.0.0.0 |
String |
true |
|
dnsLookupBindPort |
Pulsar 客户端 DNS 查找绑定端口,在配置 dnsLookupBindAddress 时生效,默认值为 0。 |
整数 |
true |
0 |
socks5ProxyAddress |
SOCKS5 代理的地址。 |
InetSocketAddress |
true |
|
socks5ProxyUsername |
SOCKS5 代理的用户名。 |
String |
true |
|
socks5ProxyPassword |
SOCKS5 代理的密码。 |
String |
true |
|
description |
客户端版本的额外描述。长度不能超过 64。 |
String |
true |
配置文件中不可配置的配置属性(不可序列化)在 |
12. 深入了解
本指南介绍了如何使用 Quarkus 与 Pulsar 进行交互。它利用 Quarkus Messaging 来构建数据流应用程序。
如果您想进一步了解,请查看 SmallRye Reactive Messaging 的文档,这是 Quarkus 中使用的实现。