编辑此页面

Quarkus 中的上下文传播

传统的阻塞代码使用 ThreadLocal 变量来存储上下文对象,以避免在各处将它们作为参数传递。许多 Quarkus 扩展需要这些上下文对象才能正常运行:例如 Quarkus REST (以前称为 RESTEasy Reactive)ArCTransaction

如果您编写响应式/异步代码,则必须将您的工作分解为代码块的流水线,这些代码块将在“稍后”执行,实际上是在您定义它们的函数返回之后。因此,try/finally 块以及 ThreadLocal 变量将停止工作,因为您的响应式代码在调用方运行其 finally 块之后在另一个线程中执行。

SmallRye Context PropagationMicroProfile Context Propagation 的实现,旨在使这些 Quarkus 扩展在响应式/异步设置中正常工作。它的工作原理是捕获曾经位于线程局部变量中的那些上下文值,并在调用您的代码时恢复它们。

解决方案

我们建议您按照以下章节中的说明,逐步创建应用程序。但是,您可以直接转到完整的示例。

克隆 Git 存储库:git clone https://github.com/quarkusio/quarkus-quickstarts.git,或下载 归档文件

该解决方案位于 context-propagation-quickstart 目录中。

设置

如果您正在使用 Mutiny (quarkus-mutiny 扩展),您只需要添加 quarkus-smallrye-context-propagation 扩展即可启用上下文传播。

换句话说,将以下依赖项添加到您的构建文件中

pom.xml
<!-- Quarkus REST extension if not already included -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-rest</artifactId>
</dependency>
<!-- Context Propagation extension -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-context-propagation</artifactId>
</dependency>
build.gradle
// Quarkus REST extension if not already included
implementation("io.quarkus:quarkus-rest")
// Context Propagation extension
implementation("io.quarkus:quarkus-smallrye-context-propagation")

这样,如果您使用 ArC、Quarkus REST 和事务,您将获得它们的上下文传播。

Mutiny 的使用示例

Mutiny

本节使用 Mutiny 响应式类型。如果您不熟悉 Mutiny,请查看 Mutiny - 一个直观的响应式编程库

让我们编写一个 REST 端点,该端点从 Kafka 主题读取接下来的 3 个项目,使用 Hibernate ORM with Panache 将它们存储在数据库中(全部在同一事务中),然后再将它们返回给客户端,您可以这样做

    // Get the prices stream
    @Inject
    @Channel("prices") Publisher<Double> prices;

    @Transactional
    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Publisher<Double> prices() {
        // get the next three prices from the price stream
        return Multi.createFrom().publisher(prices)
                .select().first(3)
                // The items are received from the event loop, so cannot use Hibernate ORM (classic)
                // Switch to a worker thread, the transaction will be propagated
                .emitOn(Infrastructure.getDefaultExecutor())
                .map(price -> {
                    // store each price before we send them
                    Price priceEntity = new Price();
                    priceEntity.value = price;
                    // here we are all in the same transaction
                    // thanks to context propagation
                    priceEntity.persist();
                    return price;
                    // the transaction is committed once the stream completes
                });
    }

请注意,由于 Mutiny 对上下文传播的支持,这开箱即用。这 3 个项目使用相同的事务持久化,并且该事务在流完成时提交。

CompletionStage 的使用示例

如果您正在使用 CompletionStage,则需要手动上下文传播。您可以通过注入一个将传播每个上下文的 ThreadContextManagedExecutor 来实现。例如,这里我们使用 Vert.x Web Client 获取星球大战人员的列表,然后使用 Hibernate ORM with Panache 将它们存储在数据库中(全部在同一事务中),然后再使用 Jackson 或 JSON-B 将它们作为 JSON 返回给客户端

    @Inject ThreadContext threadContext;
    @Inject ManagedExecutor managedExecutor;
    @Inject Vertx vertx;

    @Transactional
    @GET
    @Path("/people")
    public CompletionStage<List<Person>> people() throws SystemException {
        // Create a REST client to the Star Wars API
        WebClient client = WebClient.create(vertx,
                         new WebClientOptions()
                          .setDefaultHost("swapi.tech")
                          .setDefaultPort(443)
                          .setSsl(true));
        // get the list of Star Wars people, with context capture
        return threadContext.withContextCapture(client.get("/api/people/").send())
                .thenApplyAsync(response -> {
                    JsonObject json = response.bodyAsJsonObject();
                    List<Person> persons = new ArrayList<>(json.getInteger("count"));
                    // Store them in the DB
                    // Note that we're still in the same transaction as the outer method
                    for (Object element : json.getJsonArray("results")) {
                        Person person = new Person();
                        person.name = ((JsonObject) element).getString("name");
                        person.persist();
                        persons.add(person);
                    }
                    return persons;
                }, managedExecutor);
    }

使用 ThreadContextManagedExecutor,您可以包装大多数有用的函数类型和 CompletionStage,以便获得传播的上下文。

注入的 ManagedExecutor 使用 Quarkus 线程池。

覆盖要传播的上下文

默认情况下,将传播所有可用的上下文。但是,您可以通过几种方式覆盖此行为。

使用配置

以下配置属性允许您指定传播上下文的默认集合

配置属性 描述 默认值

mp.context.ThreadContext.propagated

以逗号分隔的传播上下文集

Remaining(所有非显式列表上下文)

mp.context.ThreadContext.cleared

以逗号分隔的已清除上下文集

None(没有上下文),除非传播集和清除集都不包含 Remaining,在这种情况下,默认值为 Remaining(所有非显式列出的上下文)

mp.context.ThreadContext.unchanged

以逗号分隔的未更改上下文集

None(没有上下文)

以下上下文在 Quarkus 中可用,可以是开箱即用的,也可以取决于您是否包含它们的扩展

上下文名称 名称常量 描述

ThreadContext.NONE

可用于指定一个空的上下文集,但将该值设置为空也可以

Remaining

ThreadContext.ALL_REMAINING

所有未在其他集中显式列出的上下文

Transaction

ThreadContext.TRANSACTION

JTA 事务上下文

CDI

ThreadContext.CDI

CDI (ArC) 上下文

Servlet

N/A

Servlet 上下文

Jakarta REST

N/A

Quarkus REST 或 RESTEasy Classic 上下文

应用程序

ThreadContext.APPLICATION

当前的 ThreadContextClassLoader

使用注解覆盖传播的上下文

为了在特定方法中覆盖自动上下文传播(例如 Mutiny 使用的传播),您可以使用 @CurrentThreadContext 注解

    // Get the prices stream
    @Inject
    @Channel("prices") Publisher<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    // Get rid of all context propagation, since we don't need it here
    @CurrentThreadContext(propagated = {}, unchanged = ThreadContext.ALL_REMAINING)
    public Publisher<Double> prices() {
        // get the next three prices from the price stream
        return Multi.createFrom().publisher(prices)
                .select().first(3);
    }

使用 CDI 注入覆盖传播的上下文

您还可以使用注入点的 @ThreadContextConfig 注解注入一个自定义构建的 ThreadContext

    // Get the prices stream
    @Inject
    @Channel("prices") Publisher<Double> prices;
    // Get a ThreadContext that doesn't propagate context
    @Inject
    @ThreadContextConfig(unchanged = ThreadContext.ALL_REMAINING)
    SmallRyeThreadContext threadContext;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Publisher<Double> prices() {
        // Get rid of all context propagation, since we don't need it here
        try(CleanAutoCloseable ac = SmallRyeThreadContext.withThreadContext(threadContext)){
            // get the next three prices from the price stream
            return Multi.createFrom().publisher(prices)
                    .select().first(3);
        }
    }

同样,有一种类似的方法可以使用 @ManagedExecutorConfig 注解注入一个配置的 ManagedExecutor 实例

    // Custom ManagedExecutor with different async limit, queue and no propagation
    @Inject
    @ManagedExecutorConfig(maxAsync = 2, maxQueued = 3, cleared = ThreadContext.ALL_REMAINING)
    ManagedExecutor configuredCustomExecutor;

共享 ManagedExecutor 和 ThreadContext 的配置 CDI 实例

如果您需要将同一个 ManagedExecutorThreadContext 注入到多个地方并共享其容量,您可以使用 @NamedInstance 注解命名该实例。@NamedInstance 是一个 CDI 限定符,因此相同类型和名称的所有注入将共享同一个底层实例。如果您还需要自定义您的实例,您可以在它的一个注入点上使用 @ManagedExecutorConfig/ThreadContextConfig 注解来实现

    // Custom configured ManagedExecutor with name
    @Inject
    @ManagedExecutorConfig(maxAsync = 2, maxQueued = 3, cleared = ThreadContext.ALL_REMAINING)
    @NamedInstance("myExecutor")
    ManagedExecutor sharedConfiguredExecutor;

    // Since this executor has the same name, it will be the same instance as above
    @Inject
    @NamedInstance("myExecutor")
    ManagedExecutor sameExecutor;

    // Custom ThreadContext with a name
    @Inject
    @ThreadContextConfig(unchanged = ThreadContext.ALL_REMAINING)
    @NamedInstance("myContext")
    ThreadContext sharedConfiguredThreadContext;

    // Given equal value of @NamedInstance, this ThreadContext will be the same as the above one
    @Inject
    @NamedInstance("myContext")
    ThreadContext sameContext;

CDI 的上下文传播

就 CDI 而言,@RequestScoped@ApplicationScoped@Singleton bean 会被传播,并且在其他线程中可用。@Dependent bean 以及任何自定义作用域的 bean 无法通过 CDI 上下文传播自动传播。

@ApplicationScoped@Singleton bean 始终是活动作用域,因此很容易处理 - 只要 CDI 容器正在运行,上下文传播任务就可以使用这些 bean。但是,@RequestScoped bean 的情况不同。它们仅在短时间内处于活动状态,该时间段可以绑定到 HTTP 请求或其他请求/任务,并在手动激活/停用时绑定。在这种情况下,用户必须知道,一旦原始线程到达请求的末尾,它将终止上下文,调用这些 bean 上的 @PreDestroy,然后从上下文中清除它们。随后尝试从其他线程访问这些 bean 可能会导致意外行为。因此,建议确保通过上下文传播使用请求作用域 bean 的所有任务都以这样一种方式执行,即它们不会超出原始请求持续时间。

由于上述行为,建议在使用 CDI 中的上下文传播时避免在 @RequestScoped bean 上使用 @PreDestroy