结合 Apache Kafka 和 Rest 客户端
又过了一周,又有一个有趣的问题。本周,有人问我关于结合 Kafka 和 Rest Client 的问题。这是一个经常出现的主题,大多数时候,目标是实现以下过程
换句话说,我们想在我们收到的每个 Kafka 消息上调用一个远程服务。因此,我们有一个包含我们正在消费的数据的第一个主题(“in”),例如,“transactions”。然后,我们有架构的核心部分:处理组件。它消费传入的交易,并为每笔交易调用一个远程服务。它还将响应(由远程服务生成)写入另一个 Kafka 主题“out”。
使用 Quarkus 实现这一点非常简单,这正是我们将在本文中介绍的内容。借助 Reactive Messaging 和 Rest Client,这应该只需要 20 行代码!
远程服务
让我们从远程服务开始。Quarkus 提供了多种调用远程 HTTP 服务的方法,但让我们使用 Rest Client,因为它提供了一种无需处理低级 HTTP 细节即可与 HTTP 服务进行交互的出色方式。
您可以使用任何 HTTP API,但为简化起见,让我们考虑一个简单的远程服务,例如
@RegisterRestClient(configKey = "transaction-service")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public interface TransactionService {
@Path("/transactions")
@POST
TransactionResult postSync(Transaction transaction);
@Path("/transactions")
@POST
Uni<TransactionResult> postAsync(Transaction transaction);
}
此服务包含两个调用同一 HTTP 端点的方法。第一个是同步的,因此在接收到响应之前会阻塞调用线程。第二个是异步的,返回的 Uni
在接收到响应时会获取响应。在这种情况下,调用线程不会被阻塞,可以做其他事情。我们将在稍后介绍如何使用这些方法,但首先,进行一些配置。在 application.properties
中添加
# Configure the transaction-service (rest client)
transaction-service/mp-rest/url=https://:8080
当然,更新 URL。 https://quarkus.net.cn/guides/rest-client 指南提供了有关 Rest Client 的用法和配置的更多详细信息。
为每笔传入交易调用服务
好的,我们可以调用我们的服务,但请记住,我们想为每笔传入交易调用它,而这些交易来自 Kafka 主题。使用 Reactive Messaging,目前无需处理 Kafka。我们可以专注于逻辑。假设我们有一个通道(数据流),用于传输我们的交易。我们将第一个通道命名为 in
。
我们还想将来自远程服务的响应写入另一个 Kafka 主题。同样,目前无需处理 Kafka。假设我们将响应写入一个名为 out
的通道。
因此,我们有以下(不完整的)代码
@ApplicationScoped
public class TransactionProcessor {
@Incoming("in") // The first channel - we read from it
@Outgoing("out") // The second channel - we write to it
public TransactionResult sendToTransactionService(Transaction transaction) {
// Need to call our service here
}
}
@Incoming
配置读取通道。 @Outgoing
配置写入通道。但是,缺少一些东西……我们需要调用我们的远程服务
@ApplicationScoped
public class TransactionProcessor {
private static final Logger LOGGER = Logger.getLogger("TransactionProcessor");
@Inject @RestClient TransactionService service;
@Incoming("in")
@Outgoing("out")
@Blocking
public TransactionResult sendToTransactionService(Transaction transaction) {
LOGGER.infof("Sending %s transaction service", transaction);
return service.postSync(transaction);
}
}
首先,我们注入 Rest Client。然后,我们只需在我们的方法中调用它。
您可能想知道 @Blocking
。使用 Reactive Messaging,您需要指明何时使用阻塞代码,因为默认情况下,它使用事件循环架构。虽然方便,但不应滥用 @Blocking
,因为它依赖于限制并发的线程池。但它使您的逻辑保持同步。
使用异步操作
我们可以通过使用 TransactionService
提供的第二个方法:postAsync:
来摆脱 @Blocking
注释:
@ApplicationScoped
public class TransactionProcessor {
private static final Logger LOGGER = Logger.getLogger("TransactionProcessor");
@Inject @RestClient TransactionService service;
@Incoming("in")
@Outgoing("out")
public Uni<TransactionResult> sendToTransactionService(Transaction transaction) {
LOGGER.infof("Sending %s transaction service", transaction);
return service.postAsync(transaction);
}
}
使用 post
方法的异步变体允许我们删除 @Blocking
。我们直接返回 Uni
。当该 Uni
收到远程服务的响应时,它会将值写入 out
通道。
将通道映射到 Kafka
到目前为止,一切顺利。是时候将我们的代码连接到 Kafka 了。使用 Reactive Messaging,我们将通道映射到连接器,这里是 Kafka。因此,我们只需要配置应用程序,指示 in
和 out
通道是 Kafka 主题。再次编辑 application.properties
文件,并添加
mp.messaging.incoming.in.connector=smallrye-kafka
mp.messaging.incoming.in.topic=transactions
mp.messaging.incoming.in.value.deserializer=org.acme.model.TransactionDeserializer
mp.messaging.incoming.in.auto.offset.reset=earliest
mp.messaging.incoming.in.enable.auto.commit=false
mp.messaging.outgoing.out.connector=smallrye-kafka
mp.messaging.outgoing.out.topic=output
mp.messaging.outgoing.out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
第一个块是关于 in
通道的。它连接到 transactions
Kafka 主题。数据使用自定义反序列化器进行反序列化。最后一个其他属性禁用自动提交(Reactive Messaging 正在为您处理提交)并从上次提交的偏移量读取数据。
第二个块配置 out
通道。我们将其与 output
Kafka 主题连接并配置值序列化器。对于这个简单的示例,我们将数据作为 JSON 写入。
因此,当一个交易被写入 Kafka transaction
主题时,它会被我们的处理组件接收,发送到远程服务,并将结果写入 output
Kafka 主题
2020-08-27 10:04:44,141 INFO [TransactionProcessor] (vert.x-eventloop-thread-0) Sending Transaction{name='MacroHard', amount=20} transaction service
2020-08-27 10:04:44,196 INFO [TransactionResource] (executor-thread-2) Handling transaction MacroHard / 20
2020-08-27 10:04:44,240 INFO [TransactionProcessor] (vert.x-eventloop-thread-0) Sending Transaction{name='BlueHat', amount=10} transaction service
2020-08-27 10:04:44,245 INFO [TransactionResource] (executor-thread-2) Handling transaction BlueHat / 10
如果您查看 output
主题,您将看到 TransactionResult
流动
我们完成了!
只需几行代码和一点配置,我们就可以从 Kafka 主题读取数据,调用远程服务,并将结果写入另一个 Kafka 主题。非常简单。
想自己尝试吗?查看此GitHub 存储库中的代码,并遵循自述文件中的说明。
Reactive Messaging 和 Rest Client 包含其他亮点,请查看相关指南和文档以了解更多信息