使用 Kafka 发送和接收 Cloud Events

Cloud Events 是一种事件描述规范。它的目标是促进互操作性。随着事件驱动架构的兴起,Cloud Events 的流行也就不足为奇了。

本文将介绍如何使用 Quarkus、Kafka 和 Reactive Messaging 生成和消费 Cloud Events。

什么是 Cloud Event?

首先,让我们看看为什么。事件无处不在。许多现代系统或多或少都在使用事件。事件可用于实现事件溯源、沟通事实、触发带外处理或发送通知。事件已成为任何系统的关键部分。

然而,事件发布者倾向于以不同的方式描述事件。我不是指内容不同,而是指信封和事件格式存在异构性,即使这些事件在同一个事件网格上传输。有些应用程序选择 JSON 并将所有内容编码在事件的有效载荷中;另一些系统则偏爱二进制格式,如 Avro 或 Protobuf,并利用协议功能(如标头或属性)来传输有关封装有效载荷的元数据。尽管事件驱动架构声称可以简化与外部系统的集成,但这种差异却适得其反。经常需要一个专门的事件转换器,其唯一目的是将事件从一种格式适配到另一种格式。

那么,什么是 Cloud Event?Cloud Event 提出了一种通用的事件描述方式。其目标显然是互操作性和减轻集成负担。Cloud Event 1.0 大约在一年前发布。在过去的一年中,许多云提供商(如 Azure 和 Oracle)都采用了这种格式。一些中间件(如 Knative、Kogito、Debezium 和 Quarkus)也增加了对 Cloud Events 的支持。

给我看一些例子!

好的,那么它是什么样的呢?理解 Cloud Event 最简单的方法就是看一个例子。

{
    "specversion" : "1.0",
    "id" : "O234-345-890",
    "source" : "https://reactive-coffee-shop.io/1234/order",
    "type" : "me.escoffier.coffee.Order",
    "subject" : "order",
    "time" : "2020-11-25T09:05:00Z",
    "datacontenttype" : "application/json",
    "data" : "{\"name\": \"clement\", \"order\":\"espresso\"}",
    "custom-attribute" : "some custom value"
}

这个事件是用 JSON 描述的,但这只是可能性之一。让我们看看这些字段。

首先,specversion 指明了它使用的是哪个 Cloud Event 版本(1.0)。id 字段为该特定事件提供了 ID。source 属性是一个 URI,用于标识事件源,即事件发生时的上下文或发出该特定事件的应用程序。组合 idsource 可提供唯一标识符。这种唯一性对于实现幂等性和处理潜在重复项至关重要。type 是最后一个强制属性。它指示事件的 *类型*。这里,我使用了完全限定的类名,但您可以想象任何内容。它应该引用您在系统中定义的事件种类。

其他属性是可选的。datacontenttype 定义了 data 属性的内容类型。subject 允许传递有关事件的额外详细信息,例如有关上下文或事件类型的附加提示。time 是一个时间戳,通常表示创建时间。还有一个可选属性在我的示例中未使用。dataschema 属性允许您传递事件数据的模式。

data 属性包含封装的业务事件。它是关键部分,而其他属性仅提供有关该特定业务事件的详细信息。

您还可以定义 *扩展*。当建议的属性集不足以满足您的用例时,可以使用这些扩展作为一组自定义属性。

就是这样!因此,我们可以将 Cloud Events 总结为 *理解事件所需的元数据*——其来源、ID、类型和业务数据。

传输中的 Cloud Events - 绑定

但是,这些事件将如何编码呢?上面使用 JSON 的示例很好,但有些协议可能希望利用自身的功能来传输这些元数据。

因此,Cloud Events 还提出了绑定。绑定是特定于某个协议的一组建议。它解释了每个协议应如何编码 Cloud Events。例如,有一个 HTTP 绑定,一个 Kafka 绑定,还有一个 AMQP 绑定。

这些绑定中的大多数都提出了两种方法:

  • 结构化

  • 二进制

结构化方法将事件元数据和数据一起保留在消息或请求的有效载荷中。它通常使用 JSON 来编码这些数据。如果您将 Cloud Event 示例(来自上面)传递到 HTTP 请求中,它将使用结构化模式。当您将该 JSON 片段写入 Kafka 记录的值时,它也将使用结构化模式。

结构化方法允许跨多种协议进行简单转发。但是,它可能效率不高,并且可能会限制业务数据的类型。

另一种方法依赖于协议功能,并支持高效传输和编码。如果我们与 Kafka 一起使用二进制模式,我们将把 data 属性的值存储在 Kafka 记录的值中,并通过记录的标头传递其他属性。因此,业务数据可以使用 Avro 等二进制协议进行编码,从而提高效率。

本文的其余部分将介绍如何使用 Quarkus、Kafka 和 Reactive Messaging 发送和接收 Cloud Events。

在 Kafka 上发送 Cloud Events

Quarkus 使用的 Kafka 连接器对 Cloud Events 具有内置支持。它可以使用结构化模式(将所有内容编码到 JSON 有效载荷中)或二进制模式(使用 Kafka 标头)来发送和消费 Cloud Events。

要将出站消息写入 Cloud Event,您只需在通道上指定 cloud-events-typecloud-events-source 属性。

mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.generated-price.cloud-events-source=price-generators
mp.messaging.outgoing.generated-price.cloud-events-type=price
mp.messaging.outgoing.generated-price.cloud-events-subject=generated-prices

默认情况下,连接器使用二进制模式写入 Cloud Events。连接器为每条消息生成一个随机的 id。您还可以使用 cloud-events-$attribute(如 cloud-events-subject)来自定义其他 Cloud Event 属性。

上面显示的配置应用于所有出站消息。有时,您希望单独自定义每条消息的值。为此,您还可以将 io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata 附加到您的消息中,以自定义每条消息的 id、source、type 和 subject。

@Outgoing("cloud-events")
public Message<String> toCloudEvents(Message<String> in) {
    return in.addMetadata(OutgoingCloudEventMetadata.builder()
      .withId("id-" + in.getPayload())
      .withType("greetings")
      .withSource(URI.create("http://example.com"))
      .withSubject("greeting-message") .build());
}

连接器还支持 *结构化* 模式。您可以通过将 cloud-events-mode 属性设置为 structured 来写入结构化 Cloud Events。目前仅支持 JSON。写入的记录会将其 content-type 标头设置为 application/cloudevents+json; charset=UTF-8,这使得接收者能够理解它是一个结构化的 Cloud Event。

从 Kafka 消费 Cloud Event

当然,连接器也可以消费 Cloud Events。连接器通过检查记录的标头自动检测 Cloud Events。它还会确定模式。

当连接器收到 Cloud Event 时,它会将 IncomingKafkaCloudEventMetadata 附加到消息元数据中。因此,您可以检索各种属性以及扩展。

public Message<Double> process(Message<Integer> priceInUsd) {
  IncomingCloudEventMetadata<Integer> cloudEventMetadata = priceInUsd.getMetadata(IncomingCloudEventMetadata.class)
    .orElseThrow(() -> new IllegalArgumentException("Expected a Cloud Event"));

  LOGGER.infof("Received Cloud Events (spec-version: %s): source:  '%s', type: '%s', subject: '%s' ",
    cloudEventMetadata.getSpecVersion(),
    cloudEventMetadata.getSource(),
    cloudEventMetadata.getType(),
    cloudEventMetadata.getSubject().orElse("no subject"));

  return priceInUsd.withPayload(Integer.valueOf(priceInUsd.getPayload()) * CONVERSION_RATE);
}

总结

随着事件驱动架构的兴起,Cloud Events 越来越受欢迎。自 Quarkus 1.9 起,Quarkus 中使用的 Kafka 连接器已内置支持 Cloud Events。本文介绍了 Cloud Events,并展示了如何轻松写入和读取 Cloud Events。

还有更多选项可供 选择,而且 Kafka 并不是 Quarkus 中唯一支持 Cloud Events 的部分。例如,Funqy[https://quarkus.net.cn/guides/funqy#context-injection] 也开箱即用地支持 Cloud Event。