结合 Apache Kafka 和 Rest 客户端

又过了一周,又有一个有趣的问题。本周,有人问我关于结合 Kafka 和 Rest Client 的问题。这是一个经常出现的主题,大多数时候,目标是实现以下过程

kafka rest architecture

换句话说,我们想在我们收到的每个 Kafka 消息上调用一个远程服务。因此,我们有一个包含我们正在消费的数据的第一个主题(“in”),例如,“transactions”。然后,我们有架构的核心部分:处理组件。它消费传入的交易,并为每笔交易调用一个远程服务。它还将响应(由远程服务生成)写入另一个 Kafka 主题“out”。

使用 Quarkus 实现这一点非常简单,这正是我们将在本文中介绍的内容。借助 Reactive Messaging 和 Rest Client,这应该只需要 20 行代码!

远程服务

让我们从远程服务开始。Quarkus 提供了多种调用远程 HTTP 服务的方法,但让我们使用 Rest Client,因为它提供了一种无需处理低级 HTTP 细节即可与 HTTP 服务进行交互的出色方式。

您可以使用任何 HTTP API,但为简化起见,让我们考虑一个简单的远程服务,例如

@RegisterRestClient(configKey = "transaction-service")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public interface TransactionService {

    @Path("/transactions")
    @POST
    TransactionResult postSync(Transaction transaction);

    @Path("/transactions")
    @POST
    Uni<TransactionResult> postAsync(Transaction transaction);

}

此服务包含两个调用同一 HTTP 端点的方法。第一个是同步的,因此在接收到响应之前会阻塞调用线程。第二个是异步的,返回的 Uni 在接收到响应时会获取响应。在这种情况下,调用线程不会被阻塞,可以做其他事情。我们将在稍后介绍如何使用这些方法,但首先,进行一些配置。在 application.properties 中添加

# Configure the transaction-service (rest client)
transaction-service/mp-rest/url=https://:8080

当然,更新 URL。 https://quarkus.net.cn/guides/rest-client 指南提供了有关 Rest Client 的用法和配置的更多详细信息。

为每笔传入交易调用服务

好的,我们可以调用我们的服务,但请记住,我们想为每笔传入交易调用它,而这些交易来自 Kafka 主题。使用 Reactive Messaging,目前无需处理 Kafka。我们可以专注于逻辑。假设我们有一个通道(数据流),用于传输我们的交易。我们将第一个通道命名为 in

我们还想将来自远程服务的响应写入另一个 Kafka 主题。同样,目前无需处理 Kafka。假设我们将响应写入一个名为 out 的通道。

因此,我们有以下(不完整的)代码

@ApplicationScoped
public class TransactionProcessor {

    @Incoming("in") // The first channel - we read from it
    @Outgoing("out") // The second channel - we write to it
    public TransactionResult sendToTransactionService(Transaction transaction) {
       // Need to call our service here
    }

}

@Incoming 配置读取通道。 @Outgoing 配置写入通道。但是,缺少一些东西……我们需要调用我们的远程服务

@ApplicationScoped
public class TransactionProcessor {

    private static final Logger LOGGER = Logger.getLogger("TransactionProcessor");

    @Inject @RestClient TransactionService service;

    @Incoming("in")
    @Outgoing("out")
    @Blocking
    public TransactionResult sendToTransactionService(Transaction transaction) {
        LOGGER.infof("Sending %s transaction service", transaction);
        return service.postSync(transaction);
    }

}

首先,我们注入 Rest Client。然后,我们只需在我们的方法中调用它。

您可能想知道 @Blocking。使用 Reactive Messaging,您需要指明何时使用阻塞代码,因为默认情况下,它使用事件循环架构。虽然方便,但不应滥用 @Blocking,因为它依赖于限制并发的线程池。但它使您的逻辑保持同步。

使用异步操作

我们可以通过使用 TransactionService 提供的第二个方法:postAsync: 来摆脱 @Blocking 注释:

@ApplicationScoped
public class TransactionProcessor {

    private static final Logger LOGGER = Logger.getLogger("TransactionProcessor");

    @Inject @RestClient TransactionService service;

    @Incoming("in")
    @Outgoing("out")
    public Uni<TransactionResult> sendToTransactionService(Transaction transaction) {
        LOGGER.infof("Sending %s transaction service", transaction);
        return service.postAsync(transaction);
    }

}

使用 post 方法的异步变体允许我们删除 @Blocking。我们直接返回 Uni。当该 Uni 收到远程服务的响应时,它会将值写入 out 通道。

将通道映射到 Kafka

到目前为止,一切顺利。是时候将我们的代码连接到 Kafka 了。使用 Reactive Messaging,我们将通道映射到连接器,这里是 Kafka。因此,我们只需要配置应用程序,指示 inout 通道是 Kafka 主题。再次编辑 application.properties 文件,并添加

mp.messaging.incoming.in.connector=smallrye-kafka
mp.messaging.incoming.in.topic=transactions
mp.messaging.incoming.in.value.deserializer=org.acme.model.TransactionDeserializer
mp.messaging.incoming.in.auto.offset.reset=earliest
mp.messaging.incoming.in.enable.auto.commit=false

mp.messaging.outgoing.out.connector=smallrye-kafka
mp.messaging.outgoing.out.topic=output
mp.messaging.outgoing.out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

第一个块是关于 in 通道的。它连接到 transactions Kafka 主题。数据使用自定义反序列化器进行反序列化。最后一个其他属性禁用自动提交(Reactive Messaging 正在为您处理提交)并从上次提交的偏移量读取数据。

第二个块配置 out 通道。我们将其与 output Kafka 主题连接并配置值序列化器。对于这个简单的示例,我们将数据作为 JSON 写入。

因此,当一个交易被写入 Kafka transaction 主题时,它会被我们的处理组件接收,发送到远程服务,并将结果写入 output Kafka 主题

2020-08-27 10:04:44,141 INFO  [TransactionProcessor] (vert.x-eventloop-thread-0) Sending Transaction{name='MacroHard', amount=20} transaction service
2020-08-27 10:04:44,196 INFO  [TransactionResource] (executor-thread-2) Handling transaction MacroHard / 20
2020-08-27 10:04:44,240 INFO  [TransactionProcessor] (vert.x-eventloop-thread-0) Sending Transaction{name='BlueHat', amount=10} transaction service
2020-08-27 10:04:44,245 INFO  [TransactionResource] (executor-thread-2) Handling transaction BlueHat / 10

如果您查看 output 主题,您将看到 TransactionResult 流动

output

我们完成了!

只需几行代码和一点配置,我们就可以从 Kafka 主题读取数据,调用远程服务,并将结果写入另一个 Kafka 主题。非常简单。

想自己尝试吗?查看此GitHub 存储库中的代码,并遵循自述文件中的说明。

Reactive Messaging 和 Rest Client 包含其他亮点,请查看相关指南和文档以了解更多信息