RESTEasy Reactive - 阻塞还是不阻塞
2021 年 1 月,Quarkus 团队宣布了 REEsteasy Reactive,这是一种在 Quarkus 中提供 HTTP API 的新方法。自推出以来,REEsteasy Reactive 的采用率一直相当不错,我们计划很快将其设为实现 HTTP API 的默认方法。
但是,等等,这对我的命令式 API 意味着什么?我现在需要学习响应式编程才能使用 Quarkus 吗?让我们说清楚:不。这篇博文将探讨我们在 REEsteasy Reactive 中进行的一些更改,以使过渡平稳透明。
Quarkus 中 HTTP API 的简史
Quarkus 自问世以来就能够提供 HTTP API。引入 REEsteasy 是 Quarkus 首个 Beta 版的一个重要里程碑。使用 REEsteasy classic,您可以使用众所周知的 JAX-RS 注释(如 @GET
、@Path
、@POST
等)来开发 HTTP API。以下代码片段展示了一个简短的“hello world”示例。
package org.acme;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@Path("/hello")
public class GreetingResource {
@GET
public String hello() {
return "Hello";
}
}
REEsteasy classic 在与 HTTP 请求关联的工作线程上调用 HTTP 端点(先前代码片段中的 hello
方法)。这是一个易于理解的模型。但是,依赖工作线程会引入并发限制:线程数。
即使 Quarkus 核心融入了响应式,REEsteasy classic 也保留了这种分发策略。它正在分裂 Quarkus 生态系统。一方面,我们有使用 REEsteasy classic、Hibernate ORM 等的命令式阵营。另一方面,我们有使用 Reactive Routes、Vert.x API 和其他响应式扩展的响应式阵营。两者在底层都使用了 Quarkus 的响应式引擎,但响应式阵营使用它的方式更有效率。
在 Quarkus 1.11 中,我们遵循命令式和响应式思想的统一,引入了 REEsteasy reactive,这是基于 Quarkus 响应式架构的 JAX-RS 模型的一个新实现。它提供了类似的开发模型和更好的吞吐量。我不会详细介绍 REEsteasy reactive 的架构和优点。Georgios 在两篇博文中已经涵盖了它们:《REEsteasy Reactive 简介》和《无痛获得海量性能》。
从用户的角度来看,REEsteasy classic 和 reactive 的主要区别在于它们调用 HTTP 端点方法的方式:
-
classic - 始终在工作线程上,
-
reactive - 在 I/O 线程或工作线程上(由您作为开发人员选择)。
您可能想知道为什么这如此重要。线程成本很高,尤其是在容器或云等资源有限的环境中。使用 I/O 线程可以避免创建额外的线程(改善内存消耗)并避免上下文切换(提高响应时间)。Emmanuel 在《一个 I/O 线程和一个工作线程走进酒吧:一个微基准测试故事》博文中解释了这些优点。
阻塞还是不阻塞,这就是问题所在。
当我们引入 REEsteasy reactive 时,我们决定默认使用非阻塞方法:如果没有另行说明,它会在 I/O 线程上调用 HTTP 端点方法。这个模型由于使用了 @Blocking
注释,因此取得了出色的性能,并且足够简单。
在过去的几个月里,REEsteasy reactive 的采用令人难以置信!我们收到了许多问题和明显的错误报告。核心问题是 Hibernate ORM 的使用。
由于 Hibernate ORM classic(我们也有 Hibernate reactive)是阻塞的,您无法在不使用 @Blocking
注释的情况下将其与 REEsteasy reactive 一起使用。此注释会更改分发策略,使其使用工作线程(而不是 I/O 线程)。
虽然对我们来说,由此产生的模型看起来高效且直接,但不知情的用户却看到了很多
You have attempted to perform a blocking operation on a IO thread. This is not allowed, as blocking the IO thread will cause major performance issues with your application. If you want to perform blocking EntityManager operations make sure you are doing it from a worker thread.: java.lang.IllegalStateException: You have attempted to perform a blocking operation on a IO thread. This is not allowed, as blocking the IO thread will cause major performance issues with your application. If you want to perform blocking EntityManager operations make sure you are doing it from a worker thread.
错误消息很明确。但当我们的终端中打印出如此多的文本时,我们很少会感到高兴。
您可能会说……“好吧,那就默认阻塞吧。”没那么简单。在工作线程上调用期望在 I/O 线程上调用的响应式 API,与在 I/O 线程上调用阻塞 API 一样危险。
新世界,新规则!
在 Quarkus 2.2.0 中,我们引入了一种基于方法签名的新的分发策略。Quarkus 的构建时方法使我们能够明智地推断出应在构建时在 I/O 线程或工作线程上调用哪个方法,从而减少运行时开销。
下表总结了新的规则集:
方法签名 | 分发策略 |
---|---|
|
工作线程 |
|
I/O 线程 |
|
I/O 线程 |
|
I/O 线程 |
|
I/O 线程 |
|
工作线程 |
基本上:同步方法默认在工作线程上,异步方法默认在 I/O 线程上,除非另有明确说明。当然,您可以使用 @Blocking
和 @NonBlocking
注释覆盖行为。@Transactional
注释是默认规则的一个例外,因为它通常意味着您正在访问阻塞资源(例如实体管理器)。
这对您有什么改变?
让我们通过一些示例来讨论这一新策略如何改进用户体验,同时又不限制效率和灵活性。
Hello REEsteasy Reactive
使用 REEsteasy reactive 不会改变上面的“hello”示例。
package org.acme;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@Path("/hello")
public class GreetingResource {
@GET
public String hello() {
return "Hello";
}
}
由于该方法具有同步签名,因此它在工作线程上调用。之前(在 Quarkus 2.2 之前),使用 REEsteasy reactive 时,它会在 I/O 线程上调用。要切换回该行为,请添加 @NonBlocking
。
package org.acme;
import io.smallrye.common.annotation.NonBlocking;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@Path("/hello")
public class GreetingResource {
@GET
@NonBlocking
public String hello() {
return "Hello";
}
}
或者,您可以返回一个 Uni
。
package org.acme;
import io.smallrye.mutiny.Uni;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@Path("/hello")
public class GreetingResource {
@GET
public Uni<String> hello() {
return Uni.createFrom().item("Hello");
}
}
与 Hibernate ORM 集成
根据用户反馈,让我们设想您想将 Hibernate classic 与 REEsteasy reactive 一起使用:
package org.acme;
import org.jboss.resteasy.reactive.RestQuery;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@Path("/fruit")
public class FruitResource {
@GET
public Fruit getFruit(@RestQuery String name) {
return Fruit.find("name", name).firstResult();
}
}
由于签名是同步的,因此您无需使用 @Blocking
。不再有大段文字!
与 Hibernate Reactive 集成
如果您使用 Hibernate reactive,您将使用 Mutiny API,因此生成的代码将是:
package org.acme;
import io.smallrye.mutiny.Uni;
import org.jboss.resteasy.reactive.RestQuery;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@Path("/fruit")
public class FruitResource {
@GET
public Uni<Fruit> getFruit(@RestQuery String name) {
return Fruit.find("name", name).firstResult();
}
}
此方法在 I/O 线程上运行,这正是 Hibernate reactive 所期望的。
与 Kafka 集成
如果您组合使用 HTTP 和 Kafka(使用响应式消息),您将使用发射器。根据发射器类型(Emitter
或 MutinyEmitter
),send
方法返回 CompletionStage
或 Uni
。因此,以下 HTTP 方法在 I/O 线程上运行:
package org.acme;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;
import org.eclipse.microprofile.reactive.messaging.Channel;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@Path("/fruit")
public class FruitResource {
@Channel("kafka")
MutinyEmitter<Fruit> emitter;
@POST
public Uni<Void> writeToKafka(Fruit fruit) {
return emitter.send(fruit);
}
}
如果您将其更改为同步签名,它将在工作线程上运行:
package org.acme;
import io.smallrye.reactive.messaging.MutinyEmitter;
import org.eclipse.microprofile.reactive.messaging.Channel;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import java.time.Duration;
@Path("/fruit")
public class FruitResource {
@Channel("kafka")
MutinyEmitter<Fruit> emitter;
@POST
public void writeToKafka(Fruit fruit) {
System.out.println(Thread.currentThread().getName());
emitter.send(fruit).await().atMost(Duration.ofSeconds(5));
}
}
组合 REEsteasy Reactive、Hibernate ORM 和 Kafka
现在让我们组合 Resteasy reactive、Hibernate ORM classic 和 Kafka 来持久化一个实体并将其写入 Kafka 主题:
package org.acme;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;
import org.eclipse.microprofile.reactive.messaging.Channel;
import javax.transaction.Transactional;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@Path("/fruit")
public class FruitResource {
@Channel("kafka")
MutinyEmitter<Fruit> emitter;
@POST
@Transactional
public Uni<Void> persistAndWriteToKafka(Fruit fruit) {
System.out.println(Thread.currentThread().getName());
fruit.persist();
return emitter.send(fruit);
}
}
尽管有签名,此方法仍在工作线程上运行。@Transactional
注释会配置分发策略以使用工作线程。