Quarkus 中的上下文传播
传统的阻塞代码使用 ThreadLocal
变量来存储上下文对象,以避免在各处将它们作为参数传递。许多 Quarkus 扩展需要这些上下文对象才能正常运行:例如 Quarkus REST (以前称为 RESTEasy Reactive)、ArC 和 Transaction。
如果您编写响应式/异步代码,则必须将您的工作分解为代码块的流水线,这些代码块将在“稍后”执行,实际上是在您定义它们的函数返回之后。因此,try/finally
块以及 ThreadLocal
变量将停止工作,因为您的响应式代码在调用方运行其 finally
块之后在另一个线程中执行。
SmallRye Context Propagation 是 MicroProfile Context Propagation 的实现,旨在使这些 Quarkus 扩展在响应式/异步设置中正常工作。它的工作原理是捕获曾经位于线程局部变量中的那些上下文值,并在调用您的代码时恢复它们。
解决方案
我们建议您按照以下章节中的说明,逐步创建应用程序。但是,您可以直接转到完整的示例。
克隆 Git 存储库:git clone https://github.com/quarkusio/quarkus-quickstarts.git
,或下载 归档文件。
该解决方案位于 context-propagation-quickstart
目录中。
设置
如果您正在使用 Mutiny (quarkus-mutiny
扩展),您只需要添加 quarkus-smallrye-context-propagation
扩展即可启用上下文传播。
换句话说,将以下依赖项添加到您的构建文件中
<!-- Quarkus REST extension if not already included -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest</artifactId>
</dependency>
<!-- Context Propagation extension -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-context-propagation</artifactId>
</dependency>
// Quarkus REST extension if not already included
implementation("io.quarkus:quarkus-rest")
// Context Propagation extension
implementation("io.quarkus:quarkus-smallrye-context-propagation")
这样,如果您使用 ArC、Quarkus REST 和事务,您将获得它们的上下文传播。
Mutiny 的使用示例
Mutiny
本节使用 Mutiny 响应式类型。如果您不熟悉 Mutiny,请查看 Mutiny - 一个直观的响应式编程库。 |
让我们编写一个 REST 端点,该端点从 Kafka 主题读取接下来的 3 个项目,使用 Hibernate ORM with Panache 将它们存储在数据库中(全部在同一事务中),然后再将它们返回给客户端,您可以这样做
// Get the prices stream
@Inject
@Channel("prices") Publisher<Double> prices;
@Transactional
@GET
@Path("/prices")
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Publisher<Double> prices() {
// get the next three prices from the price stream
return Multi.createFrom().publisher(prices)
.select().first(3)
// The items are received from the event loop, so cannot use Hibernate ORM (classic)
// Switch to a worker thread, the transaction will be propagated
.emitOn(Infrastructure.getDefaultExecutor())
.map(price -> {
// store each price before we send them
Price priceEntity = new Price();
priceEntity.value = price;
// here we are all in the same transaction
// thanks to context propagation
priceEntity.persist();
return price;
// the transaction is committed once the stream completes
});
}
请注意,由于 Mutiny 对上下文传播的支持,这开箱即用。这 3 个项目使用相同的事务持久化,并且该事务在流完成时提交。
CompletionStage
的使用示例
如果您正在使用 CompletionStage
,则需要手动上下文传播。您可以通过注入一个将传播每个上下文的 ThreadContext
或 ManagedExecutor
来实现。例如,这里我们使用 Vert.x Web Client 获取星球大战人员的列表,然后使用 Hibernate ORM with Panache 将它们存储在数据库中(全部在同一事务中),然后再使用 Jackson 或 JSON-B 将它们作为 JSON 返回给客户端
@Inject ThreadContext threadContext;
@Inject ManagedExecutor managedExecutor;
@Inject Vertx vertx;
@Transactional
@GET
@Path("/people")
public CompletionStage<List<Person>> people() throws SystemException {
// Create a REST client to the Star Wars API
WebClient client = WebClient.create(vertx,
new WebClientOptions()
.setDefaultHost("swapi.tech")
.setDefaultPort(443)
.setSsl(true));
// get the list of Star Wars people, with context capture
return threadContext.withContextCapture(client.get("/api/people/").send())
.thenApplyAsync(response -> {
JsonObject json = response.bodyAsJsonObject();
List<Person> persons = new ArrayList<>(json.getInteger("count"));
// Store them in the DB
// Note that we're still in the same transaction as the outer method
for (Object element : json.getJsonArray("results")) {
Person person = new Person();
person.name = ((JsonObject) element).getString("name");
person.persist();
persons.add(person);
}
return persons;
}, managedExecutor);
}
使用 ThreadContext
或 ManagedExecutor
,您可以包装大多数有用的函数类型和 CompletionStage
,以便获得传播的上下文。
注入的 |
覆盖要传播的上下文
默认情况下,将传播所有可用的上下文。但是,您可以通过几种方式覆盖此行为。
使用配置
以下配置属性允许您指定传播上下文的默认集合
配置属性 | 描述 | 默认值 |
---|---|---|
|
以逗号分隔的传播上下文集 |
|
|
以逗号分隔的已清除上下文集 |
|
|
以逗号分隔的未更改上下文集 |
|
以下上下文在 Quarkus 中可用,可以是开箱即用的,也可以取决于您是否包含它们的扩展
上下文名称 | 名称常量 | 描述 |
---|---|---|
|
可用于指定一个空的上下文集,但将该值设置为空也可以 |
|
|
所有未在其他集中显式列出的上下文 |
|
|
JTA 事务上下文 |
|
|
CDI (ArC) 上下文 |
|
|
N/A |
Servlet 上下文 |
|
N/A |
Quarkus REST 或 RESTEasy Classic 上下文 |
|
当前的 |
使用注解覆盖传播的上下文
为了在特定方法中覆盖自动上下文传播(例如 Mutiny 使用的传播),您可以使用 @CurrentThreadContext
注解
// Get the prices stream
@Inject
@Channel("prices") Publisher<Double> prices;
@GET
@Path("/prices")
@RestStreamElementType(MediaType.TEXT_PLAIN)
// Get rid of all context propagation, since we don't need it here
@CurrentThreadContext(propagated = {}, unchanged = ThreadContext.ALL_REMAINING)
public Publisher<Double> prices() {
// get the next three prices from the price stream
return Multi.createFrom().publisher(prices)
.select().first(3);
}
使用 CDI 注入覆盖传播的上下文
您还可以使用注入点的 @ThreadContextConfig
注解注入一个自定义构建的 ThreadContext
// Get the prices stream
@Inject
@Channel("prices") Publisher<Double> prices;
// Get a ThreadContext that doesn't propagate context
@Inject
@ThreadContextConfig(unchanged = ThreadContext.ALL_REMAINING)
SmallRyeThreadContext threadContext;
@GET
@Path("/prices")
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Publisher<Double> prices() {
// Get rid of all context propagation, since we don't need it here
try(CleanAutoCloseable ac = SmallRyeThreadContext.withThreadContext(threadContext)){
// get the next three prices from the price stream
return Multi.createFrom().publisher(prices)
.select().first(3);
}
}
同样,有一种类似的方法可以使用 @ManagedExecutorConfig
注解注入一个配置的 ManagedExecutor
实例
// Custom ManagedExecutor with different async limit, queue and no propagation
@Inject
@ManagedExecutorConfig(maxAsync = 2, maxQueued = 3, cleared = ThreadContext.ALL_REMAINING)
ManagedExecutor configuredCustomExecutor;
共享 ManagedExecutor 和 ThreadContext 的配置 CDI 实例
如果您需要将同一个 ManagedExecutor
或 ThreadContext
注入到多个地方并共享其容量,您可以使用 @NamedInstance
注解命名该实例。@NamedInstance
是一个 CDI 限定符,因此相同类型和名称的所有注入将共享同一个底层实例。如果您还需要自定义您的实例,您可以在它的一个注入点上使用 @ManagedExecutorConfig
/ThreadContextConfig
注解来实现
// Custom configured ManagedExecutor with name
@Inject
@ManagedExecutorConfig(maxAsync = 2, maxQueued = 3, cleared = ThreadContext.ALL_REMAINING)
@NamedInstance("myExecutor")
ManagedExecutor sharedConfiguredExecutor;
// Since this executor has the same name, it will be the same instance as above
@Inject
@NamedInstance("myExecutor")
ManagedExecutor sameExecutor;
// Custom ThreadContext with a name
@Inject
@ThreadContextConfig(unchanged = ThreadContext.ALL_REMAINING)
@NamedInstance("myContext")
ThreadContext sharedConfiguredThreadContext;
// Given equal value of @NamedInstance, this ThreadContext will be the same as the above one
@Inject
@NamedInstance("myContext")
ThreadContext sameContext;
CDI 的上下文传播
就 CDI 而言,@RequestScoped
、@ApplicationScoped
和 @Singleton
bean 会被传播,并且在其他线程中可用。@Dependent
bean 以及任何自定义作用域的 bean 无法通过 CDI 上下文传播自动传播。
@ApplicationScoped
和 @Singleton
bean 始终是活动作用域,因此很容易处理 - 只要 CDI 容器正在运行,上下文传播任务就可以使用这些 bean。但是,@RequestScoped
bean 的情况不同。它们仅在短时间内处于活动状态,该时间段可以绑定到 HTTP 请求或其他请求/任务,并在手动激活/停用时绑定。在这种情况下,用户必须知道,一旦原始线程到达请求的末尾,它将终止上下文,调用这些 bean 上的 @PreDestroy
,然后从上下文中清除它们。随后尝试从其他线程访问这些 bean 可能会导致意外行为。因此,建议确保通过上下文传播使用请求作用域 bean 的所有任务都以这样一种方式执行,即它们不会超出原始请求持续时间。
由于上述行为,建议在使用 CDI 中的上下文传播时避免在 |