编辑此页面

Quarkus 虚拟线程支持 gRPC 服务

本指南将介绍如何在实现 gRPC 服务时利用 Java 虚拟线程。

本指南着重于在 gRPC 扩展中使用虚拟线程。请参阅 使用 Quarkus 虚拟线程支持编写更简单的响应式 REST 服务,以了解更多关于 Java 虚拟线程和 Quarkus 虚拟线程支持的通用信息。

默认情况下,Quarkus gRPC 扩展在事件循环线程上调用服务方法。有关此主题的更多详细信息,请参阅 Quarkus 响应式架构文档。但是,您也可以使用 @Blocking 注释来指示服务是阻塞的,并且应该在工作线程上运行。

Quarkus gRPC 服务虚拟线程支持背后的理念是将服务方法调用卸载到虚拟线程上,而不是在事件循环线程或工作线程上运行。

要为服务方法启用虚拟线程支持,只需将 @RunOnVirtualThread 注释添加到方法上。如果 JDK 兼容(Java 19 或更高版本 - 我们推荐 21+),则调用将被卸载到一个新的虚拟线程。这样,就可以执行阻塞操作,而不会阻塞虚拟线程所挂载的平台线程。

配置 gRPC 服务以使用虚拟线程

让我们通过一个使用虚拟线程实现 gRPC 服务的示例。首先,请确保您的构建文件中包含 gRPC 扩展依赖项

pom.xml
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-grpc</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-grpc")

您还需要确保您使用的是 Java 19 或更高版本(我们推荐 21+),您可以在 pom.xml 文件中用以下方式强制执行:

pom.xml
<properties>
    <maven.compiler.source>21</maven.compiler.source>
    <maven.compiler.target>21</maven.compiler.target>
</properties>

使用以下命令运行您的应用程序:

java -jar target/quarkus-app/quarkus-run.jar

或者,要使用 Quarkus 开发模式,请在 quarkus-maven-plugin 配置中插入以下内容:

pom.xml
<plugin>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-maven-plugin</artifactId>
    <version>${quarkus.version}</version>
    <executions>
        <execution>
            <goals>
                <goal>build</goal>
                <goal>generate-code</goal>
                <goal>generate-code-tests</goal>
            </goals>
        </execution>
    </executions>
    <configuration>
      <source>21</source>
      <target>21</target>
    </configuration>
</plugin>

然后,您就可以在服务实现中使用 @RunOnVirtualThread 注释了

package io.quarkus.grpc.example.streaming;

import com.google.protobuf.ByteString;
import com.google.protobuf.EmptyProtos;

import io.grpc.testing.integration.Messages;
import io.grpc.testing.integration.TestService;
import io.quarkus.grpc.GrpcService;
import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

@GrpcService
public class TestServiceImpl implements TestService {

    @RunOnVirtualThread
    @Override
    public Uni<EmptyProtos.Empty> emptyCall(EmptyProtos.Empty request) {
        return Uni.createFrom().item(EmptyProtos.Empty.newBuilder().build());
    }

    @RunOnVirtualThread
    @Override
    public Uni<Messages.SimpleResponse> unaryCall(Messages.SimpleRequest request) {
        var value = request.getPayload().getBody().toStringUtf8();
        var resp = Messages.SimpleResponse.newBuilder()
                .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFromUtf8(value.toUpperCase())).build())
                .build();
        return Uni.createFrom().item(resp);
    }

    @Override
    @RunOnVirtualThread
    public Multi<Messages.StreamingOutputCallResponse> streamingOutputCall(Messages.StreamingOutputCallRequest request) {
        var value = request.getPayload().getBody().toStringUtf8();
        return Multi.createFrom().<String> emitter(emitter -> {
            emitter.emit(value.toUpperCase());
            emitter.emit(value.toUpperCase());
            emitter.emit(value.toUpperCase());
            emitter.complete();
        }).map(v -> Messages.StreamingOutputCallResponse.newBuilder()
                .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFromUtf8(v)).build())
                .build());
    }
}
限制

接收的 gRPC 方法,例如 Multi,不能使用 @RunOnVirtualThread,因为方法不能是阻塞的,并且必须立即产生其结果(MultiUni)。

相关内容