在 Quarkus 应用程序中开始使用 AMQP

AMQP 1.0 是应用程序或组织之间传递消息的标准。它连接系统,为业务流程提供所需信息,并可靠地处理系统间的通信。AMQP 是一种健壮且成熟的协议,广泛用于事件驱动型应用。

本文相当于 Kafka 入门文章,但侧重于 AMQP 的使用。你将学习如何在十个步骤内开始在 Quarkus 应用中使用 AMQP。我们将使用 SmallRye Reactive Messaging——一种声明式构建事件驱动型微服务的方法。

完整的代码可在 GitHub 上找到。

第一步 - 生成你的项目

让我们从头开始,获取具有正确依赖关系的新项目结构。访问 https://code.quarkus.io,输入你的 group id 和 artifact id。然后在扩展列表中选择

  • SmallRye Reactive Messaging - AMQP Connector

  • RESTEasy Jackson

getting started amqp code

你可以禁用“示例代码”,以避免生成的项目中包含示例。

然后,点击生成你的应用程序,将项目下载为 zip 文件,解压,然后在你喜欢的 IDE 中加载它。

如果你打开生成的 pom.xml,你会看到声明了 quarkus-smallrye-reactive-messaging-amqpquarkus-resteasy-jackson 依赖关系,所以我们已经准备好编写一些代码了。

第二步 - 我们将要交换什么?

我们需要一些东西来交换。不加太多创意,让我们说我们将发送和接收 Movie 对象。在你的项目中,创建 org.acme.Movie 类,内容如下:

package org.acme;

public class Movie {

    public String title;
    public int year;

}

通过 AMQP,我们交换 消息,消息可以有多个数据部分(或多个 AMQP 序列,或单个 AMQP 值部分)。在我们的应用程序中,由于我们交换的是 Movie 对象,它将实例编码为 JSON,并在单个数据部分中传输。content-type 标头设置为 application/json

AMQP 消息被发送到一个目标。为了保持简单,我们称之为*movies*。

第三步 - 配置应用程序

如上所述,我们将使用 Reactive Messaging。当你使用 Reactive Messaging 时,你向一个通道发送消息,并从另一个通道接收消息。这些通道通过配置映射到底层的消息传递技术。我们必须在应用程序中指示我们的接收和发布通道将使用*movies*地址。在 src/main/resources/application.properties 中,添加以下内容:

# The AMQP broker location and credentials
amqp-host=localhost
amqp-port=5672
amqp-username=quarkus
amqp-password=quarkus

# Configuring the incoming channel (reading from AMQP)
mp.messaging.incoming.movies-in.connector=smallrye-amqp
mp.messaging.incoming.movies-in.address=movies

# Configuring the outgoing channel (writing to AMQP)
mp.messaging.outgoing.movies-out.connector=smallrye-amqp
mp.messaging.outgoing.movies-out.address=movies

在配置了代理位置和凭据(amqp- 属性)之后,我们配置了两个通道:movies-in(接收记录)和 movies-out(发布记录)。

我们使用 mp.messaging.incoming.movies-in 前缀来配置通道。connector 属性表明谁负责该通道,这里是 AMQP 连接器。我们还需要使用 address 属性指定要消耗的目标。

要配置出站 movies-out 通道,我们使用 mp.messaging.outgoing.movies-out 前缀。除了指明谁负责该通道外,我们还需要配置地址。

第四步 - 将电影发布到 AMQP

现在,是时候发送消息了。创建 org.acme.MovieProducer 类,内容如下:

package org.acme;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

@ApplicationScoped
public class MovieProducer {

    @Inject
    @Channel("movies-out")
    Emitter<Movie> emitter;

    public void send(Movie movie) {
        emitter.send(movie);
    }
}

在这个类中,我们注入了一个 Emitter,它是一个负责向通道发送消息的对象。这个发射器附加到 movies-out 通道(并将消息发送到 AMQP)。连接器自动将内容编码为 JSON 并设置了 content-type 标头。

你需要确保你的负载可以编码为 JSON。

因此,我们应用程序的其余部分可以使用 send 方法将电影发送到我们的 AMQP 目标。

第五步 - 消费电影

现在,让我们来看另一方面,从 AMQP 中检索电影。

package org.acme;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MovieConsumer {

    private final Logger logger = Logger.getLogger(MovieConsumer.class);

    @Incoming("movies-in")
    public void receive(Movie movie) {
        logger.infof("Got a movie: %d - %s", movie.year, movie.title);
    }
}

这里,我们使用 @Incoming 注解来指示 Quarkus 在收到每条记录时调用 receive 方法。

请记住,电影被编码为 JSON,所以我们需要帮助连接器从接收到的 JSON 中生成一个 Movie 对象。

创建 org.acme.JsonToObjectConverter 类,内容如下:

package org.acme;

import io.smallrye.reactive.messaging.MessageConverter;
import io.smallrye.reactive.messaging.amqp.IncomingAmqpMetadata;
import io.vertx.core.json.JsonObject;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import java.lang.reflect.Type;

@ApplicationScoped
public class JsonToObjectConverter implements MessageConverter {

    @Override
    public boolean canConvert(Message<?> in, Type target) {
        return in.getMetadata(IncomingAmqpMetadata.class)
                .map(meta -> meta.getContentType().equals("application/json")  && target instanceof Class)
                .orElse(false);

    }

    @Override
    public Message<?> convert(Message<?> in, Type target) {
        return in.withPayload(((JsonObject) in.getPayload()).mapTo((Class<?>) target));
    }
}

这个类是一个转换器。它将 Message 的内容映射到另一个类型。在 canConvert 方法中,我们验证传入消息是否来自 AMQP(因此包含 IncomingAmqpMetadata 元数据),并且 content-type 设置为 application/jsonconvert 方法将接收到的 JsonObject 映射到目标类型(在本例中为 Movie)。

有了这个转换器,我们的 consume 方法将接收 Movie 对象。

第六步 - 从 REST 端点发送电影

从 REST 端点发送消息到 AMQP 是很常见的。要做到这一点,创建 org.acme.MovieResource 类,内容如下:

package org.acme;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class MovieResource {

    @Inject
    MovieProducer producer;

    @POST
    public Response send(Movie movie) {
        producer.send(movie);
        // Return an 202 - Accepted response.
        return Response.accepted().build();
    }
}

这个类使用了我们上面实现的 MovieProducer 来发送 movies。你也可以直接使用 Emitter

第七步 - 运行起来!

首先,我们需要一个 AMQP 代理,例如 Apache ActiveMQ Artemis。你可以遵循 Artemis 入门文档,或者使用以下 docker-compose.yaml 文件:

version: '2'

services:

  artemis:
    image: vromero/activemq-artemis:2-alpine-latest
    ports:
      - "5672:5672"
      - "8161:8161"
      - "61616:61616"
    environment:
      ARTEMIS_USERNAME: quarkus
      ARTEMIS_PASSWORD: quarkus

docker-compose.yaml 文件复制到你的项目中,然后在终端中运行你的代理:`docker-compose up -d`

然后,使用以下命令运行应用程序:

./mvnw quarkus:dev

应用程序以开发模式运行,这意味着你仍然可以更新代码。它会自动重新加载。

在另一个终端中,发出几个 HTTP POST 请求,例如:

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1994, "title":"The Shawshank Redemption"}' \
https://:8080/

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1972, "title":"The Godfather"}' \
https://:8080/

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":2008, "title":"The Dark Knight"}' \
https://:8080/

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1994, "title":"Pulp Fiction"}' \
https://:8080/

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":2010, "title":"Inception"}' \
https://:8080/

在运行应用程序的终端中,你将看到:

...
2021-01-27 09:29:41,087 INFO  [org.acm.MovieConsumer] (vert.x-eventloop-thread-9) Got a movie: 1994 - Pulp Fiction
2021-01-27 09:29:41,114 INFO  [org.acm.MovieConsumer] (vert.x-eventloop-thread-9) Got a movie: 2010 - Inception
...

它奏效了!

第八步 - 本地打包

如果你已经 安装并正确配置了 GraalVM,你可以将此应用程序打包为本地可执行文件:

./mvnw package -Pnative

然后,使用 `./target/getting-started-amqp-1.0.0-SNAPSHOT-runner` 执行你的本地可执行文件,你将得到一个使用 AMQP 的 Quarkus 应用程序,在几毫秒内启动,并且内存消耗极低:在摄入了 100 条记录后仅为 33MB!

$ rss getting-started-amqp-1.0.0-SNAPSHOT-runner
PID 0M COMMAND
54986 33M ./target/getting-started-amqp-1.0.0-SNAPSHOT-runner

总结

在不到 10 分钟的时间内,我们就拥有了一个使用 AMQP 的新 Quarkus 应用程序。如果你想更进一步,请查看 AMQP 指南