Quarkus 虚拟线程支持 gRPC 服务
本指南将介绍如何在实现 gRPC 服务时利用 Java 虚拟线程。
本指南着重于在 gRPC 扩展中使用虚拟线程。请参阅 使用 Quarkus 虚拟线程支持编写更简单的响应式 REST 服务,以了解更多关于 Java 虚拟线程和 Quarkus 虚拟线程支持的通用信息。 |
默认情况下,Quarkus gRPC 扩展在事件循环线程上调用服务方法。有关此主题的更多详细信息,请参阅 Quarkus 响应式架构文档。但是,您也可以使用 @Blocking 注释来指示服务是阻塞的,并且应该在工作线程上运行。
Quarkus gRPC 服务虚拟线程支持背后的理念是将服务方法调用卸载到虚拟线程上,而不是在事件循环线程或工作线程上运行。
要为服务方法启用虚拟线程支持,只需将 @RunOnVirtualThread 注释添加到方法上。如果 JDK 兼容(Java 19 或更高版本 - 我们推荐 21+),则调用将被卸载到一个新的虚拟线程。这样,就可以执行阻塞操作,而不会阻塞虚拟线程所挂载的平台线程。
配置 gRPC 服务以使用虚拟线程
让我们通过一个使用虚拟线程实现 gRPC 服务的示例。首先,请确保您的构建文件中包含 gRPC 扩展依赖项
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc</artifactId>
</dependency>
implementation("io.quarkus:quarkus-grpc")
您还需要确保您使用的是 Java 19 或更高版本(我们推荐 21+),您可以在 pom.xml
文件中用以下方式强制执行:
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
</properties>
使用以下命令运行您的应用程序:
java -jar target/quarkus-app/quarkus-run.jar
或者,要使用 Quarkus 开发模式,请在 quarkus-maven-plugin
配置中插入以下内容:
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
<goal>generate-code</goal>
<goal>generate-code-tests</goal>
</goals>
</execution>
</executions>
<configuration>
<source>21</source>
<target>21</target>
</configuration>
</plugin>
然后,您就可以在服务实现中使用 @RunOnVirtualThread
注释了
package io.quarkus.grpc.example.streaming;
import com.google.protobuf.ByteString;
import com.google.protobuf.EmptyProtos;
import io.grpc.testing.integration.Messages;
import io.grpc.testing.integration.TestService;
import io.quarkus.grpc.GrpcService;
import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
@GrpcService
public class TestServiceImpl implements TestService {
@RunOnVirtualThread
@Override
public Uni<EmptyProtos.Empty> emptyCall(EmptyProtos.Empty request) {
return Uni.createFrom().item(EmptyProtos.Empty.newBuilder().build());
}
@RunOnVirtualThread
@Override
public Uni<Messages.SimpleResponse> unaryCall(Messages.SimpleRequest request) {
var value = request.getPayload().getBody().toStringUtf8();
var resp = Messages.SimpleResponse.newBuilder()
.setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFromUtf8(value.toUpperCase())).build())
.build();
return Uni.createFrom().item(resp);
}
@Override
@RunOnVirtualThread
public Multi<Messages.StreamingOutputCallResponse> streamingOutputCall(Messages.StreamingOutputCallRequest request) {
var value = request.getPayload().getBody().toStringUtf8();
return Multi.createFrom().<String> emitter(emitter -> {
emitter.emit(value.toUpperCase());
emitter.emit(value.toUpperCase());
emitter.emit(value.toUpperCase());
emitter.complete();
}).map(v -> Messages.StreamingOutputCallResponse.newBuilder()
.setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFromUtf8(v)).build())
.build());
}
}
限制
接收流的 gRPC 方法,例如 |