编辑此页面

重复的上下文、上下文局部变量、异步处理和传播

在使用传统的、阻塞的和同步的框架时,每个请求的处理都在一个专用的线程中执行。因此,整个处理过程都使用同一个线程。 您知道在处理完成之前,该线程不会被用于执行任何其他操作。 当您需要在处理过程中传播数据时,例如安全主体或跟踪 ID,您可以使用 ThreadLocals。 传播的数据在处理完成后会被清除。

当使用反应式和异步执行模型时,您不能使用相同的机制。 为了避免使用大量的进程线程,减少资源使用(并提高应用程序的并发性),同一个线程可以被用来处理多个并发处理。 因此,您不能使用 ThreadLocals,因为这些值会在各种并发处理之间泄漏。

Vert.x 复制上下文 是一种构造,它为异步处理提供相同的传播方式。 它也可以用于同步代码。

本文档解释了复制上下文,如何检索、使用它,以及它如何在(异步)处理过程中传播。

反应式模型

本节不是对反应式模型的解释。 有关更多详细信息,请参阅Quarkus 反应式架构

在底层,Quarkus 使用反应式引擎。 此引擎提供了效率和并发性,以应对现代的、容器化的和云原生应用程序。

例如,当您使用 Quarkus REST(以前称为 RESTEasy Reactive)或 gRPC 时,Quarkus 可以在 I/O 线程上调用您的业务逻辑。 这些线程被称为事件循环,并实现多 Reactor 模式

当使用命令式模型时,Quarkus 将一个 worker 线程与每个处理单元(如 HTTP 请求或 gRPC 调用)关联。 在处理完成之前,该线程专用于此特定处理。 因此,您可以使用 *Thread Locals* 在处理过程中传播数据,并且在当前处理完成之前,没有其他处理单元会使用该线程。

使用反应式模型,代码在事件循环线程上运行。 这些事件循环执行多个并发处理单元。 例如,同一个事件循环可以处理多个并发 HTTP 请求。 下图说明了这种反应式执行模型

Continuation in the reactive execution model

绝对不能阻塞这些事件循环。 如果您这样做,整个模型将会崩溃。 因此,当 HTTP 请求的处理需要执行 I/O 操作(例如调用外部服务)时,它会

  1. 安排操作,

  2. 传递一个延续(I/O 完成时要调用的代码),

  3. 释放线程。

然后,该线程可以处理另一个并发请求。 当安排的操作完成时,它会在同一个事件循环上执行传递的延续。

该模型特别高效(线程数少)且性能良好(避免上下文切换)。 但是,它需要不同的开发模型,并且您不能使用 *Thread Locals*,因为并发请求会看到相同的值。 实际上,它们都由同一个线程处理:事件循环。

MicroProfile Context Propagation 规范解决了这个问题。 每当我们切换到另一个处理单元时,它会保存和恢复存储在线程局部变量中的值。 但是,该模型很昂贵。 上下文局部变量(也称为复制上下文)是另一种方法,并且需要更少的机制。

上下文和复制上下文

在 Quarkus 中,当您执行反应式代码时,您会在一个 *Context* 中运行,该上下文表示执行线程(事件循环或 worker 线程)。

@GET
@NonBlocking // Force the usage of the event loop
@Path("/hello1")
public String hello1() {
   Context context = Vertx.currentContext();
   return "Hello, you are running on context: %s and on thread %s".formatted(context, Thread.currentThread());  (1)
}

@GET
@Path("/hello2")
public String hello2() { // Called on a worker thread (because it has a blocking signature)
   Context context = Vertx.currentContext();
   return "Hello, you are running on context: %s and on thread %s".formatted(context, Thread.currentThread()); (2)
}
1 产生:Hello 1, you are running on context: io.vertx.core.impl.DuplicatedContext@5dc42d4f and on thread Thread[vert.x-eventloop-thread-1,5,main] - 因此在事件循环上调用。
2 产生:Hello 2, you are running on context: io.vertx.core.impl.DuplicatedContext@41781ccb and on thread Thread[executor-thread-1,5,main] - 因此在 worker 线程上调用。

使用此 *Context* 对象,您可以在同一个上下文中安排操作。 如上图所述,上下文对于在同一个事件循环上执行延续很有用(因为上下文附加到事件循环)。 例如,当您需要调用一些异步的东西时,您可以捕获当前上下文,当响应到达时,它会在该上下文中调用延续

public Uni<String> invoke() {
   Context context = Vertx.currentContext();
   return invokeRemoteService()
       .emitOn(runnable -> context.runOnContext(runnable)); (1)
}
1 在同一个上下文中发出结果。
大多数 Quarkus 客户端都会自动执行此操作,从而在适当的上下文中调用延续。

上下文有两个级别

  • 根上下文,表示事件循环,因此不能用于传播数据,否则会在并发处理之间泄漏数据

  • 复制的上下文,它基于根上下文,但不共享,并且表示处理单元

因此,复制的上下文与每个处理单元相关联。 复制的上下文仍然与根上下文相关联,并且使用复制的上下文安排操作会在关联的根上下文中运行它们。 但是,与根上下文不同,它们不在处理单元之间共享。 但是,处理单元的延续使用相同的复制上下文。 因此,在前面的代码片段中,延续不仅在同一个事件循环上调用,而且在同一个复制上下文上调用(假设捕获的上下文是一个复制的上下文,稍后会详细介绍)。

Continuation with duplicated contexts

上下文局部数据

当在复制的上下文中执行时,代码可以存储数据,而无需与其他并发处理共享它。 因此,您可以存储、检索和删除本地数据。 在同一个复制上下文中调用的延续将可以访问该数据

import io.smallrye.common.vertx.ContextLocals;

AtomicInteger counter = new AtomicInteger();

public Uni<String> invoke() {
   Context context = Vertx.currentContext();

   ContextLocals.put("message", "hello");
   ContextLocals.put("id", counter.incrementAndGet());

   return invokeRemoteService()
       .emitOn(runnable -> context.runOnContext(runnable))
       .map(res -> {
           // Can still access the context local data
           // `get(...)` returns an Optional
           String msg = ContextLocals.<String>get("message").orElseThrow();
           Integer id = ContextLocals.<Integer>get("id").orElseThrow();
           return "%s - %s - %d".formatted(res, msg, id);
       });
}
前面的代码片段使用 io.smallrye.common.vertx.ContextLocals,这简化了对本地数据的访问。 您还可以使用 Vertx.currentContext().getLocal("key") 访问它们。

上下文局部数据提供了一种在反应式执行期间传播对象的有效方式。 可以安全地存储和检索跟踪元数据、指标和会话。

上下文局部变量限制

但是,此功能只能与复制的上下文一起使用。 如上所述,对于代码来说是透明的。 复制的上下文是一个上下文,因此它们公开相同的 API。

在 Quarkus 中,对本地数据的访问仅限于复制的上下文。 如果您尝试从根上下文中访问本地数据,它会抛出 UnsupportedOperationException。 它可以防止访问在不同处理单元之间共享的数据。

java.lang.UnsupportedOperationException: Access to Context.putLocal(), Context.getLocal() and Context.removeLocal() are forbidden from a 'root' context  as it can leak data between unrelated processing. Make sure the method runs on a 'duplicated' (local) Context.

安全上下文

您可以将上下文标记为安全。 它的目的是供其他扩展集成,以帮助识别哪些上下文是隔离的,并保证由唯一线程访问。 Hibernate Reactive 使用此功能来检查当前上下文是否安全,可以存储当前打开的会话,以保护用户免于错误地交错多个可能无意中共享同一会话的反应式操作。

Vert.x Web 将为每个 HTTP Web 请求创建一个新的复制上下文; Quarkus REST 会将此类上下文标记为安全。 当它们设置一个新的安全上下文时,其他扩展应遵循类似的模式,该上下文可用于本地上下文,保证顺序使用、非并发访问,并限定为当前反应式链,以便不必显式传递“上下文”对象。

在其他情况下,显式地将当前上下文标记为不安全可能也很有用; 例如,如果需要在多个 worker 之间共享现有上下文以并行处理某些操作:通过适当地标记和取消标记,同一个上下文可以具有安全的跨度,然后是不安全的跨度。

要将上下文标记为安全,您可以

  1. 使用 io.quarkus.vertx.SafeVertxContext 注解

  2. 使用 io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle

通过使用 io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle 类,当前上下文可以显式地标记为 safe,也可以显式地标记为 unsafe; 还有第三种状态,它是任何新上下文的默认状态:unmarked。 默认情况下,任何未标记的上下文都被认为是 unsafe,除非系统属性 io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.UNRESTRICTED_BY_DEFAULT 设置为 true

SafeVertxContext 注解将当前复制的上下文标记为安全,并在上下文为 unmarked 或已标记为 safe 时调用带注解的方法。 如果上下文标记为 unsafe,您可以使用 force=true 参数强制其为 safe。 但是,必须谨慎使用此可能性。

@SafeVertxContext 注解是一个 CDI 拦截器绑定注解。 因此,它仅适用于 CDI bean 和非私有方法。

支持复制上下文的扩展

通常,Quarkus 在复制的上下文中调用反应式代码。 因此,它可以安全地访问本地数据。 以下情况就是如此

  • Quarkus REST

  • gRPC

  • 反应式路由

  • Vert.x Event Bus @ConsumeEvent

  • REST 客户端

  • 反应式消息传递(Kafka、AMQP)

  • Funqy

  • Quarkus 调度程序和 Quartz

  • Redis 客户端(用于 pub/sub 命令)

  • GraphQL

区分根上下文和复制的上下文

您可以使用以下方法区分根上下文和复制的上下文

boolean isDuplicated = VertxContext.isDuplicatedContext(context);

此代码使用 io.smallrye.common.vertx.VertxContext 助手类。

复制上下文和映射诊断上下文 (MDC)

使用记录器时,MDC(添加到日志消息的上下文数据)在可用时存储在复制的上下文中。 有关更多详细信息,请查看日志记录参考指南

CDI 请求范围

在 Quarkus 中,CDI 请求范围存储在复制的上下文中,这意味着它会随着请求的反应式处理自动传播。

反应式消息传递

Kafka 和 AMQP 连接器为每条消息创建一个复制的上下文,实现消息上下文。 此消息上下文用于完整的消息处理,因此可用于传播数据。

有关更多信息,请参阅消息上下文文档。

OpenTelemetry

OpenTelemetry 扩展将跟踪存储在复制的上下文中,确保即使在使用反应式和异步代码时也能进行传播。

相关内容