将 Apache Kafka 与 Schema Registry 和 Avro 一起使用
本指南介绍您的 Quarkus 应用程序如何使用 Apache Kafka、Avro 序列化记录,并连接到模式注册表(例如 Confluent Schema Registry 或 Apicurio Registry)。
如果您不熟悉 Kafka,特别是 Quarkus 中的 Kafka,请先查阅使用 Apache Kafka 和 Reactive Messaging指南。
先决条件
要完成本指南,您需要
-
大约 30 分钟
-
一个 IDE
-
已安装 JDK 17+ 并正确配置了
JAVA_HOME
-
Apache Maven 3.9.9
-
Docker 和 Docker Compose 或 Podman,以及 Docker Compose
-
如果您想使用它,可以选择 Quarkus CLI
-
如果您想构建本机可执行文件(或者如果您使用本机容器构建,则为 Docker),可以选择安装 Mandrel 或 GraalVM 并进行适当的配置
架构
在本指南中,我们将实现一个 REST 资源,即 MovieResource
,它将消耗电影 DTO 并将它们放入 Kafka 主题。
然后,我们将实现一个消费者,它将从同一个主题消费并收集消息。收集到的消息将通过另一个资源 ConsumedMovieResource
,使用 Server-Sent Events 进行公开。
电影将使用 Avro 进行序列化和反序列化。描述电影的模式存储在 Apicurio Registry 中。如果您使用的是 Confluent Avro serde 和 Confluent Schema Registry,概念也是一样的。
解决方案
我们建议您按照以下章节中的说明,逐步创建应用程序。但是,您可以直接转到完整的示例。
克隆 Git 存储库:git clone https://github.com/quarkusio/quarkus-quickstarts.git
,或下载一个存档。
解决方案位于 kafka-avro-schema-quickstart
目录中。
创建 Maven 项目
首先,我们需要一个新项目。使用以下命令创建一个新项目
对于 Windows 用户
-
如果使用 cmd,(不要使用反斜杠
\
并将所有内容放在同一行上) -
如果使用 Powershell,请将
-D
参数用双引号括起来,例如"-DprojectArtifactId=kafka-avro-schema-quickstart"
如果您使用 Confluent Schema Registry,则不需要 |
Avro 模式
Apache Avro 是一个数据序列化系统。数据结构使用模式进行描述。我们需要做的第一件事是创建描述 Movie
结构的模式。创建一个名为 src/main/avro/movie.avsc
的文件,其中包含我们记录(Kafka 消息)的模式。
{
"namespace": "org.acme.kafka.quarkus",
"type": "record",
"name": "Movie",
"fields": [
{
"name": "title",
"type": "string"
},
{
"name": "year",
"type": "int"
}
]
}
如果您使用以下方式构建项目
quarkus build
./mvnw install
./gradlew build
movies.avsc
将被编译成一个 Movie.java
文件,位于 target/generated-sources/avsc
目录中。
请查看 Avro 规范,了解更多关于 Avro 语法和支持类型的信息。
使用 Quarkus,无需使用特定的 Maven 插件来处理 Avro 模式,这一切都由 quarkus-avro 扩展为您完成! |
如果您使用以下方式运行项目
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
您对模式文件的更改将自动应用于生成的 Java 文件。
Movie
生产者
在定义了模式之后,我们现在可以开始实现 MovieResource
。
让我们打开 MovieResource
,注入一个 Movie
DTO 的Emitter
,并实现一个 @POST
方法,该方法消耗 Movie
并通过 Emitter
发送它。
package org.acme.kafka;
import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
@Path("/movies")
public class MovieResource {
private static final Logger LOGGER = Logger.getLogger(MovieResource.class);
@Channel("movies")
Emitter<Movie> emitter;
@POST
public Response enqueueMovie(Movie movie) {
LOGGER.infof("Sending movie %s to Kafka", movie.getTitle());
emitter.send(movie);
return Response.accepted().build();
}
}
现在,我们需要将 movies
通道(Emitter
发送到此通道)映射到一个 Kafka 主题。为了实现这一点,请编辑 application.properties
文件,并添加以下内容:
# set the connector for the outgoing channel to `smallrye-kafka`
mp.messaging.outgoing.movies.connector=smallrye-kafka
# set the topic name for the channel to `movies`
mp.messaging.outgoing.movies.topic=movies
# automatically register the schema with the registry, if not present
mp.messaging.outgoing.movies.apicurio.registry.auto-register=true
您可能已经注意到我们没有定义 如果您使用 Confluent Schema Registry,也不需要配置 |
Movie
消费者
因此,我们可以将包含我们 Movie
数据的记录写入 Kafka。这些数据使用 Avro 进行序列化。现在,是时候为它们实现一个消费者了。
让我们创建 ConsumedMovieResource
,它将从 movies-from-kafka
通道消费 Movie
消息,并通过 Server-Sent Events 将其公开。
package org.acme.kafka;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;
import io.smallrye.mutiny.Multi;
@ApplicationScoped
@Path("/consumed-movies")
public class ConsumedMovieResource {
@Channel("movies-from-kafka")
Multi<Movie> movies;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<String> stream() {
return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
}
}
应用程序代码的最后一部分是在 application.properties
中配置 movies-from-kafka
通道。
# set the connector for the incoming channel to `smallrye-kafka`
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka
# set the topic name for the channel to `movies`
mp.messaging.incoming.movies-from-kafka.topic=movies
# disable auto-commit, Reactive Messaging handles it itself
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false
mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest
您可能已经注意到我们没有定义 如果您使用 Confluent Schema Registry,也不需要配置 |
运行应用程序
在开发模式下启动应用程序
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
由于开发服务,Kafka 代理和 Apicurio Registry 实例会自动启动。有关更多详细信息,请参阅Kafka 的开发服务和Apicurio Registry 的开发服务。
您可能已经注意到我们没有在任何地方配置模式注册表 URL。这是因为 Apicurio Registry 的开发服务会自动配置 Quarkus Messaging 中的所有 Kafka 通道以使用自动启动的注册表实例。 Apicurio Registry 除了其原生 API 之外,还公开了一个与 Confluent Schema Registry API 兼容的端点。因此,此自动配置同时适用于 Apicurio Registry serde 和 Confluent Schema Registry serde。 但是,请注意,没有为运行 Confluent Schema Registry 本身提供开发服务支持。如果您想使用正在运行的 Confluent Schema Registry 实例,请配置其 URL 以及 Kafka 代理的 URL。
|
在第二个终端中使用 curl
查询 ConsumedMovieResource
资源。
curl -N https://:8080/consumed-movies
在第三个终端中,发布一些电影。
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Shawshank Redemption","year":1994}' \
https://:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Godfather","year":1972}' \
https://:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Dark Knight","year":2008}' \
https://:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"12 Angry Men","year":1957}' \
https://:8080/movies
观察第二个终端中打印的内容。您应该看到类似以下内容:
data:'The Shawshank Redemption' from 1994
data:'The Godfather' from 1972
data:'The Dark Knight' from 2008
data:'12 Angry Men' from 1957
在 JVM 或 Native 模式下运行
当不在开发或测试模式下运行时,您需要启动自己的 Kafka 代理和 Apicurio Registry。让它们运行起来最简单的方法是使用 docker-compose
来启动相应的容器。
如果您使用 Confluent Schema Registry,您已经有一个正在运行且已配置的 Kafka 代理和 Confluent Schema Registry 实例。您可以忽略这里的 docker-compose 说明以及 Apicurio Registry 配置。 |
在项目根目录中创建一个 docker-compose.yaml
文件,其中包含以下内容:
version: '2'
services:
zookeeper:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
kafka:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
schema-registry:
image: apicurio/apicurio-registry-mem:2.4.2.Final
ports:
- 8081:8080
depends_on:
- kafka
environment:
QUARKUS_PROFILE: prod
在启动应用程序之前,让我们先启动 Kafka 代理和 Apicurio Registry。
docker-compose up
要停止容器,请使用 docker-compose down 。您也可以使用 docker-compose rm 清理容器。 |
您可以使用以下方式构建应用程序:
quarkus build
./mvnw install
./gradlew build
并使用以下方式以 JVM 模式运行:
java -Dmp.messaging.connector.smallrye-kafka.apicurio.registry.url=https://:8081/apis/registry/v2 -jar target/quarkus-app/quarkus-run.jar
默认情况下,应用程序尝试连接到监听在 localhost:9092 的 Kafka 代理。您可以使用以下方式配置引导服务器: java -Dkafka.bootstrap.servers=... -jar target/quarkus-app/quarkus-run.jar |
在命令行中指定注册表 URL 不是非常方便,因此您可以为 prod
配置文件添加一个配置属性。
%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=https://:8081/apis/registry/v2
您可以使用以下方式构建原生可执行文件:
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
并使用以下方式运行:
./target/kafka-avro-schema-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092
测试应用程序
如上所述,Kafka 和 Apicurio Registry 的开发服务会在开发模式和测试模式下自动启动和配置 Kafka 代理和 Apicurio Registry 实例。因此,我们无需自行设置 Kafka 和 Apicurio Registry。我们只需专注于编写测试。
首先,让我们在构建文件中添加 REST Client 和 Awaitility 的测试依赖项。
<!-- we'll use Jakarta REST Client for talking to the SSE endpoint -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("org.awaitility:awaitility")
在测试中,我们将循环发送电影,并检查 ConsumedMovieResource
是否返回了我们发送的内容。
package org.acme.kafka;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.sse.SseEventSource;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
@QuarkusTest
public class MovieResourceTest {
@TestHTTPResource("/consumed-movies")
URI consumedMovies;
@Test
public void testHelloEndpoint() throws InterruptedException {
// create a client for `ConsumedMovieResource` and collect the consumed resources in a list
Client client = ClientBuilder.newClient();
WebTarget target = client.target(consumedMovies);
List<String> received = new CopyOnWriteArrayList<>();
SseEventSource source = SseEventSource.target(target).build();
source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));
// in a separate thread, feed the `MovieResource`
ExecutorService movieSender = startSendingMovies();
source.open();
// check if, after at most 5 seconds, we have at least 2 items collected, and they are what we expect
await().atMost(5, SECONDS).until(() -> received.size() >= 2);
assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994",
"'12 Angry Men' from 1957"));
source.close();
// shutdown the executor that is feeding the `MovieResource`
movieSender.shutdownNow();
movieSender.awaitTermination(5, SECONDS);
}
private ExecutorService startSendingMovies() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
while (true) {
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}")
.when()
.post("/movies")
.then()
.statusCode(202);
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"12 Angry Men\",\"year\":1957}")
.when()
.post("/movies")
.then()
.statusCode(202);
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
break;
}
}
});
return executorService;
}
}
我们修改了与项目一起生成的 MovieResourceTest 。此测试类有一个子类 NativeMovieResourceIT ,它针对原生可执行文件运行相同的测试。要运行它,请执行: |
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
手动设置
如果我们不能使用开发服务,并且想手动启动 Kafka 代理和 Apicurio Registry 实例,我们会定义一个 QuarkusTestResourceLifecycleManager。
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<version>0.105.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
testImplementation("io.strimzi:strimzi-test-container:0.105.0") {
exclude group: "org.apache.logging.log4j", module: "log4j-core"
}
package org.acme.kafka;
import java.util.HashMap;
import java.util.Map;
import org.testcontainers.containers.GenericContainer;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.strimzi.StrimziKafkaContainer;
public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {
private final StrimziKafkaContainer kafka = new StrimziKafkaContainer();
private GenericContainer<?> registry;
@Override
public Map<String, String> start() {
kafka.start();
registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.4.2.Final")
.withExposedPorts(8080)
.withEnv("QUARKUS_PROFILE", "prod");
registry.start();
Map<String, String> properties = new HashMap<>();
properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
"http://" + registry.getHost() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2");
properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
return properties;
}
@Override
public void stop() {
registry.stop();
kafka.stop();
}
}
@QuarkusTest
@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
public class MovieResourceTest {
...
}
使用兼容版本的 Apicurio Registry
quarkus-apicurio-registry-avro
扩展依赖于 Apicurio Registry 客户端的最新版本,并且大多数 Apicurio Registry 服务器和客户端版本是向后兼容的。对于某些版本,您需要确保 Serdes 使用的客户端与服务器兼容。
例如,使用 Apicurio 开发服务,如果您将镜像名称设置为使用版本 2.1.5.Final
。
quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.1.5.Final
您需要确保 apicurio-registry-serdes-avro-serde
依赖项和 REST 客户端 apicurio-common-rest-client-vertx
依赖项设置为兼容的版本。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-avro</artifactId>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-vertx</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
<version>2.1.5.Final</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-common</artifactId>
<version>2.1.5.Final</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
<version>2.1.5.Final</version>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-jdk</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-vertx</artifactId>
<version>0.1.5.Final</version>
</dependency>
dependencies {
implementation(platform("io.quarkus.platform:quarkus-bom:2.12.3.Final"))
...
implementation("io.quarkus:quarkus-apicurio-registry-avro")
implementation("io.apicurio:apicurio-registry-serdes-avro-serde") {
exclude group: "io.apicurio", module: "apicurio-common-rest-client-jdk"
exclude group: "io.apicurio", module: "apicurio-registry-client"
exclude group: "io.apicurio", module: "apicurio-registry-common"
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-registry-client") {
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-registry-common") {
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-common-rest-client-vertx") {
version {
strictly "0.1.5.Final"
}
}
}
apicurio-registry-client
和 apicurio-common-rest-client-vertx
的已知先前兼容版本如下:
-
apicurio-registry-client
2.1.5.Final 与apicurio-common-rest-client-vertx
0.1.5.Final -
apicurio-registry-client
2.3.1.Final 与apicurio-common-rest-client-vertx
0.1.13.Final
使用 Confluent Schema Registry
如果您想使用 Confluent Schema Registry,则需要 quarkus-confluent-registry-avro
扩展,而不是 quarkus-apicurio-registry-avro
扩展。此外,您还需要在 pom.xml
/ build.gradle
文件中添加一些依赖项和一个自定义 Maven 存储库。
<dependencies>
...
<!-- the extension -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro</artifactId>
</dependency>
<!-- Confluent registry libraries use Jakarta REST client -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.2.0</version>
<exclusions>
<exclusion>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
<!-- io.confluent:kafka-avro-serializer is only available from this repository: -->
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
repositories {
...
maven {
url "https://packages.confluent.io/maven/"
}
}
dependencies {
...
implementation("io.quarkus:quarkus-confluent-registry-avro")
// Confluent registry libraries use Jakarta REST client
implementation("io.quarkus:quarkus-rest-client")
implementation("io.confluent:kafka-avro-serializer:7.2.0") {
exclude group: "jakarta.ws.rs", module: "jakarta.ws.rs-api"
}
}
在 JVM 模式下,可以使用任何版本的 io.confluent:kafka-avro-serializer
。在原生模式下,Quarkus 支持以下版本:6.2.x
、7.0.x
、7.1.x
、7.2.x
、7.3.x
。
对于版本 7.4.x
及更高版本,由于 Confluent Schema Serializer 的问题,您需要添加另一个依赖项。
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
</dependency>
dependencies {
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-csv")
}
对于任何其他版本,可能需要调整原生配置。
Avro 代码生成详情
在本指南中,我们使用了 Quarkus 代码生成机制从 Avro 模式生成 Java 文件。
在后台,该机制使用 org.apache.avro:avro-compiler
。
您可以使用以下配置属性来改变它的工作方式:
-
avro.codegen.[avsc|avdl|avpr].imports
- 应首先编译的文件或目录列表,从而可以被后续编译的模式导入。请注意,导入的文件不应相互引用。所有路径都应相对于src/[main|test]/avro
目录,或构建系统配置的任何源目录中的avro
子目录。作为逗号分隔的列表传递。 -
avro.codegen.stringType
- 用于 Avro 字符串的 Java 类型。可以是CharSequence
、String
或Utf8
之一。默认为String
。 -
avro.codegen.createOptionalGetters
- 启用生成返回请求类型 Optional 的getOptional…
方法。默认为false
。 -
avro.codegen.enableDecimalLogicalType
- 确定是否为 decimal 类型使用 Java 类,默认为false
。 -
avro.codegen.createSetters
- 确定是否为记录字段创建 setter。默认为false
。 -
avro.codegen.gettersReturnOptional
- 启用生成返回请求类型 Optional 的get…
方法。默认为false
。 -
avro.codegen.optionalGettersForNullableFieldsOnly
,与gettersReturnOptional
选项结合使用。如果设置了此选项,则仅为可为空的字段生成 Optional getter。如果字段是强制性的,则会生成常规 getter。默认为false
。
进一步阅读
-
如何使用 Quarkus 进行 Kafka、Schema Registry 和 Avro - 本指南基于一篇博客文章。它对 Avro 和模式注册表的概念进行了很好的介绍。