如何实现 Kafka 序列化器和反序列化器?

当您的应用程序将一个记录写入 Kafka 主题或从 Kafka 主题读取一个记录时,会发生序列化和反序列化机制。序列化过程将您要发送到 Kafka 的业务对象转换为字节。反序列化过程则相反。它接收来自 Kafka 的字节并重新创建业务对象。

serde

这篇博文探讨了序列化和反序列化的不同方法,并解释了如何实现自定义序列化器和反序列化器。它还重点介绍了 Quarkus 的 Kafka 连接器提供的功能。

为什么我需要自定义序列化器和反序列化器?

Kafka 为常见类型提供了serializers 和 deserializers:StringDoubleIntegerBytes…… 但这对于业务对象来说几乎是不够的,即使是像这样的简单对象

package me.escoffier.quarkus;

public record Hero(String name, String picture) {

}

幸运的是,Kafka 允许我们实现自己的。要实现这一点,您需要实现以下接口:

Serializer 接口
public interface Serializer<T> extends Closeable {

  default void configure(Map<String, ?> configs, boolean isKey) {  }

  byte[] serialize(String topic, T data);

  default byte[] serialize(String topic, Headers headers, T data) {
    return serialize(topic, data);
  }

  @Override
  default void close() {   }
}
Deserializer 接口
public interface Deserializer<T> extends Closeable {

  default void configure(Map<String, ?> configs, boolean isKey) {  }

  T deserialize(String topic, byte[] data);

  default T deserialize(String topic, Headers headers, byte[] data) {
    return deserialize(topic, data);
  }

  @Override
  default void close() {  }

实现后,您需要配置 Kafka 生产者和消费者的 key 和 value 序列化器和反序列化器。如果您使用的是 Quarkus 的 Kafka 连接器,它看起来会是这样:

mp.messaging.incoming.heroes.value.deserializer=me.escoffier.MyHeroDeserializer
mp.messaging.outgoing.heroes.value.serializer=me.escoffier.MyHeroSerializer

但是,不用担心,Quarkus 为您准备了一些神奇的技巧。

在这篇博文的其余部分,我们将使用以下应用程序:

system

代码可以在 https://github.com/cescoffier/quarkus-kafka-serde-demo 找到。我们将开发三个变体:

  • 第一个版本使用 JSON。

  • 第二个版本使用 Avro。

  • 第三个版本使用自定义(并且是愚蠢的)序列化器和反序列化器。

使用 JSON

在 Kafka 中使用 JSON 非常流行。由于大多数 Web 应用程序都使用 JSON 来交换消息,因此将其与 Kafka 一起使用似乎是一个自然的延伸。

在我们的例子中,这意味着将 Hero 的实例转换为 JSON 文档,然后使用 String serializer。对于反序列化过程,我们将进行相反的操作。要通过 Quarkus 完成此操作,您 **无需** 做任何事情:Quarkus 会为您生成自定义 JSON 序列化器和反序列化器。

json-serde 目录 中,您可以找到一个使用 JSON 来序列化和反序列化记录的应用程序版本。它不包含任何自定义代码或配置。Quarkus 会自动检测到您需要写入和消耗 Hero,并为您生成序列化器和反序列化器。它还会为您配置通道。当然,您可以覆盖配置,但这通常是您想要的。

要运行此应用程序,请打开两个终端。在第一个终端中,导航到 json-serde/json-serde-publisher,然后运行 mvn quarkus:dev。在第二个终端中,导航到 json-serde/json-serde-consumer,然后运行 mvn quarkus:dev。然后,在浏览器中打开 https://:8080。每 5 秒钟,就会显示一张新的英雄图片。

heroes screenshot

使用 Avro

第二种方法使用 Avro。Avro 比(纯)JSON 有几个优点:

  • 它是一种二进制的紧凑协议。有效负载会比 JSON 小很多。

  • 序列化和反序列化过程要快得多(避免了反射)。

  • 消息的格式使用存储在模式注册表中的模式来定义,这支持版本控制并强制执行结构。

最后一点至关重要。要使用 Avro,您需要一个模式注册表。在这篇文章中,我们使用的是 Apicurio,但您也可以使用 Confluent Schema RegistryKarapace。Quarkus 为 Apicurio 提供了开发服务,所以您无需做任何事情(只要您可以在机器上运行容器)。

要使用 Avro,我们需要一个模式。在 hero.avsc 中,您可以找到代表我们英雄的模式。

{
  "namespace": "me.escoffier.quarkus.avro",
  "type": "record",
  "name": "Hero",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "picture",
      "type": "string"
    }
  ]
}

Avro 依赖于代码生成。它处理模式以生成具有定义字段和序列化/反序列化方法的 Java 类。

虽然通常使用代码生成是一个额外的步骤,但借助 Quarkus,它是内置的!一旦您在 src/main/avro 中有一个模式,它就会为您生成代码,您就可以开始使用生成的类了。

AvroPublisherAppAvroConsumerResource 中,我们使用了从模式生成的 Hero 类。例如,消费者应用程序看起来像这样:

package me.escoffier.quarkus;

import io.smallrye.mutiny.Multi;
import me.escoffier.quarkus.avro.Hero;   // Generated class
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/heroes")
public class AvroConsumerResource {

    @Channel("heroes")
    Multi<Hero> heroes;  // The hero class is generated from the schema.

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @RestStreamElementType(MediaType.APPLICATION_JSON)
    public Multi<Hero> stream() {
        return heroes;
    }


}

Quarkus 会自动查找序列化器和反序列化器并配置通道,所以同样:**无需配置**。但是,您仍然需要指示 Apicurio 注册模式。通常,这是一个手动操作,但对于开发,您可以使用以下属性:

kafka.apicurio.registry.auto-register=true

要运行此应用程序,请打开两个终端。在第一个终端中,导航到 avro-serde/avro-serde-publisher,然后运行 mvn quarkus:dev。在第二个终端中,导航到 avro-serde/avro-serde-consumer,然后运行 mvn quarkus:dev。然后,在浏览器中打开 https://:8080。与 JSON 变体一样,每 5 秒钟,就会显示一张新的英雄图片。这次 Kafka 记录是使用 Avro 序列化的。

编写自定义序列化器和反序列化器

当然,您仍然可以编写自己的自定义序列化器和反序列化器。如上所述,您需要实现 SerializerDeserializer 接口。

例如,HeroSerializer 类 包含了一种简单(但效率不高)的序列化英雄的方法:

package me.escoffier.quarkus.json.publisher;

import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;

public class HeroSerializer implements Serializer<Hero> {

    @Override
    public byte[] serialize(String topic, Hero data) {
        return (data.name() + "," + data.picture())
                .getBytes(StandardCharsets.UTF_8);
    }
}

HeroDeserializer 类 包含相应的反序列化部分。

和以前一样,Quarkus 会发现这些实现并为您配置通道。所以您无需配置任何内容。

自定义序列化器和反序列化器可以接收配置属性。它们会在 configure 方法中接收生产者/消费者的配置。

自定义序列化器和反序列化器不能是 CDI bean。Kafka 会直接使用反射来实例化它们。

结论

这篇博文探讨了使用 Kafka 序列化和反序列化消息的各种可能性,以及 Quarkus 如何减少您需要使用的样板代码和配置量。

那么,您应该使用什么呢?

  1. JSON 被广泛使用,但默认情况下缺乏结构验证,如果格式快速演变,可能会很快成为一个问题。

  2. Avro 提供了更好的性能,并处理验证和演进。但它需要一个模式注册表。如果您的系统与不断演变的结构交换大量消息,那么 Avro 应该更受青睐。此外,Avro 生成的有效负载更小。

  3. 如果您有 JSON 和 Avro 方法无法满足的严格要求,您可以开发自定义序列化器和反序列化器。

请注意,JSON 可以与 JSON-Schema 结合使用(模式存储在模式注册表中)。如果您更喜欢二进制格式,Protobuf 也是一个可能的替代方案。