实现 gRPC 服务
作为 CDI bean 公开的 gRPC 服务实现会自动注册并由 quarkus-grpc 提供服务。
实现 gRPC 服务需要生成 gRPC 类。将您的 proto 文件放在 src/main/proto 中并运行 mvn compile 。 |
生成的代码
Quarkus 为 proto
文件中声明的服务生成一些实现类
-
使用 Mutiny API 的服务接口
-
类名是
${JAVA_PACKAGE}.${NAME_OF_THE_SERVICE}
-
-
使用 gRPC API 的实现基类
-
类名结构如下:
${JAVA_PACKAGE}.${NAME_OF_THE_SERVICE}Grpc.${NAME_OF_THE_SERVICE}ImplBase
-
例如,如果您使用以下 proto
文件片段
option java_package = "hello"; (1)
service Greeter { (2)
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
1 | hello 是生成类的 java 包。 |
2 | Greeter 是服务名称。 |
然后服务接口是 hello.Greeter
,实现基类是抽象静态嵌套类:hello.GreeterGrpc.GreeterImplBase
。
您需要实现服务接口或使用您的服务实现 bean 扩展基类,如下节所述。 |
使用 Mutiny API 实现服务
要使用 Mutiny API 实现 gRPC 服务,请创建一个实现服务接口的类。然后,实现服务接口中定义的方法。如果您不想实现服务方法,只需从方法体中抛出一个 java.lang.UnsupportedOperationException
异常(该异常将自动转换为相应的 gRPC 异常)。最后,实现该服务并添加 @GrpcService
注解
import io.quarkus.grpc.GrpcService;
import hello.Greeter;
@GrpcService (1)
public class HelloService implements Greeter { (2)
@Override
public Uni<HelloReply> sayHello(HelloRequest request) {
return Uni.createFrom().item(() ->
HelloReply.newBuilder().setMessage("Hello " + request.getName()).build()
);
}
}
1 | gRPC 服务实现 bean 必须使用 @GrpcService 注解进行注解,并且不应声明任何其他 CDI 限定符。所有 gRPC 服务都具有 jakarta.inject.Singleton 作用域。此外,请求上下文在服务调用期间始终处于活动状态。 |
2 | hello.Greeter 是生成的服务接口。 |
服务实现 bean 也可以扩展 Mutiny 实现基类,其中类名结构如下:Mutiny${NAME_OF_THE_SERVICE}Grpc.${NAME_OF_THE_SERVICE}ImplBase 。 |
使用默认 gRPC API 实现服务
要使用默认 gRPC API 实现 gRPC 服务,请创建一个扩展默认实现基类的类。然后,覆盖服务接口中定义的方法。最后,实现该服务并添加 @GrpcService
注解
import io.quarkus.grpc.GrpcService;
@GrpcService
public class HelloService extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
String name = request.getName();
String message = "Hello " + name;
responseObserver.onNext(HelloReply.newBuilder().setMessage(message).build());
responseObserver.onCompleted();
}
}
阻塞服务实现
默认情况下,gRPC 服务的所有方法都在事件循环中运行。因此,您不能阻塞。如果您的服务逻辑必须阻塞,请使用 io.smallrye.common.annotation.Blocking
注解该方法
@Override
@Blocking
public Uni<HelloReply> sayHelloBlocking(HelloRequest request) {
// Do something blocking before returning the Uni
}
处理流
gRPC 允许接收和返回流
service Streaming {
rpc Source(Empty) returns (stream Item) {} // Returns a stream
rpc Sink(stream Item) returns (Empty) {} // Reads a stream
rpc Pipe(stream Item) returns (stream Item) {} // Reads a streams and return a streams
}
使用 Mutiny,您可以按如下方式实现这些
import io.quarkus.grpc.GrpcService;
@GrpcService
public class StreamingService implements Streaming {
@Override
public Multi<Item> source(Empty request) {
// Just returns a stream emitting an item every 2ms and stopping after 10 items.
return Multi.createFrom().ticks().every(Duration.ofMillis(2))
.select().first(10)
.map(l -> Item.newBuilder().setValue(Long.toString(l)).build());
}
@Override
public Uni<Empty> sink(Multi<Item> request) {
// Reads the incoming streams, consume all the items.
return request
.map(Item::getValue)
.map(Long::parseLong)
.collect().last()
.map(l -> Empty.newBuilder().build());
}
@Override
public Multi<Item> pipe(Multi<Item> request) {
// Reads the incoming stream, compute a sum and return the cumulative results
// in the outbound stream.
return request
.map(Item::getValue)
.map(Long::parseLong)
.onItem().scan(() -> 0L, Long::sum)
.onItem().transform(l -> Item.newBuilder().setValue(Long.toString(l)).build());
}
}
健康检查
对于已实现的服务,Quarkus gRPC 以以下格式公开运行状况信息
syntax = "proto3";
package grpc.health.v1;
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
}
ServingStatus status = 1;
}
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
客户端可以指定完全限定的服务名称以获取特定服务的运行状况,或者跳过指定服务名称以获取 gRPC 服务器的总体状态。
有关更多详细信息,请查看 gRPC 文档
此外,如果将 Quarkus SmallRye Health 添加到应用程序,则 gRPC 服务的状态的就绪性检查将添加到 MicroProfile Health 端点响应,即 /q/health
。
伸缩
默认情况下,quarkus-grpc 启动在单个事件循环上运行的单个 gRPC 服务器。
如果您希望伸缩您的服务器,您可以通过设置 quarkus.grpc.server.instances
来设置服务器实例的数量。
服务器配置
构建时固定的配置属性 - 所有其他配置属性都可以在运行时覆盖
配置属性 |
类型 |
默认 |
---|---|---|
我们是否使用单独的 HTTP 服务器来服务 gRPC 请求。如果您想使用新的 Vert.x gRPC 支持(它使用现有的 Vert.x HTTP 服务器),请将其设置为 false。 环境变量: 显示更多 |
布尔值 |
|
类型 |
默认 |
|
布尔值 |
|
|
布尔值 |
|
|
布尔值 |
|
|
字符串 |
|
|
整数 |
|
|
整数 |
|
|
字符串 |
|
|
最大入站消息大小(以字节为单位)。 当使用单个服务器(使用 环境变量: 显示更多 |
整数 |
|
最大入站元数据大小(以字节为单位) 环境变量: 显示更多 |
整数 |
|
服务器证书或证书链的 classpath 路径或文件路径(PEM 格式)。 环境变量: 显示更多 |
path |
|
相应的证书私钥文件的 classpath 路径或文件路径(PEM 格式)。 环境变量: 显示更多 |
path |
|
可选的密钥库,用于保存证书信息,而不是指定单独的文件。密钥库可以在 classpath 上或外部文件中。 环境变量: 显示更多 |
path |
|
可选参数,用于指定密钥库文件的类型。如果未给定,则根据文件名自动检测类型。 环境变量: 显示更多 |
字符串 |
|
用于指定密钥库文件密码的参数。 环境变量: 显示更多 |
字符串 |
|
用于指定密钥库文件别名的参数。 环境变量: 显示更多 |
字符串 |
|
用于指定密钥库文件别名密码的参数。 环境变量: 显示更多 |
字符串 |
|
可选的信任存储,用于保存要信任的证书的证书信息 信任存储可以在 classpath 上或外部文件中。 环境变量: 显示更多 |
path |
|
可选参数,用于指定信任存储文件的类型。如果未给定,则根据文件名自动检测类型。 环境变量: 显示更多 |
字符串 |
|
用于指定信任存储文件密码的参数。 环境变量: 显示更多 |
字符串 |
|
要使用的密码套件。如果没有给出,则选择一个合理的默认值。 环境变量: 显示更多 |
字符串列表 |
|
设置已启用的 SSL/TLS 协议的有序列表。 如果未设置,则默认为 请注意,设置空列表并启用 SSL/TLS 是无效的。您必须至少有一个协议。 环境变量: 显示更多 |
字符串列表 |
|
配置引擎以要求/请求客户端身份验证。NONE、REQUEST、REQUIRED 环境变量: 显示更多 |
|
|
禁用 SSL,并改用纯文本。如果禁用,请配置 ssl 配置。 环境变量: 显示更多 |
布尔值 |
|
布尔值 |
|
|
证书文件的路径。 环境变量: 显示更多 |
字符串 |
|
私钥文件的路径。 环境变量: 显示更多 |
字符串 |
|
启用 gRPC 反射服务。默认情况下,反射服务仅在 环境变量: 显示更多 |
布尔值 |
|
gRPC 服务器 verticle 实例的数量。这对于跨多个内核轻松伸缩很有用。该数字不应超过事件循环的数量。 环境变量: 显示更多 |
整数 |
|
设置自定义的保持活动时间。这配置了在没有读取活动时发送 环境变量: 显示更多 |
||
设置自定义的 permit-keep-alive 持续时间。这配置了允许客户端配置的最激进的保持活动时间。服务器将尝试检测超出此速率的客户端,并在检测到时强制关闭连接。 环境变量: 显示更多 |
||
设置是否允许客户端即使在连接上没有未完成的 RPC 的情况下也发送保持活动 HTTP/2 PING。 环境变量: 显示更多 |
布尔值 |
|
字符串 |
关于 Duration 格式
要编写 duration 值,请使用标准的 您还可以使用简化的格式,以数字开头
在其他情况下,简化格式将被转换为
|
当您禁用 quarkus.grpc.server.use-separate-server 时,您将使用新的 Vert.x gRPC 服务器实现,该实现使用现有的 HTTP 服务器。这意味着服务器端口现在是 8080 (或使用 quarkus.http.port 配置的端口)。此外,大多数其他配置属性不再适用,因为应该已经正确配置了 HTTP 服务器。 |
当您启用 quarkus.grpc.server.xds.enabled 时,应该是 xDS 处理上面的大部分配置。 |
配置示例
启用 TLS
要启用 TLS,请使用以下配置。
请注意,配置中的所有路径都可以指定 classpath 上的资源(通常来自 src/main/resources
或其子文件夹)或外部文件。
quarkus.grpc.server.ssl.certificate=tls/server.pem
quarkus.grpc.server.ssl.key=tls/server.key
当配置 SSL/TLS 时,plain-text 会自动禁用。 |
带有相互身份验证的 TLS
要使用带有相互身份验证的 TLS,请使用以下配置
quarkus.grpc.server.ssl.certificate=tls/server.pem
quarkus.grpc.server.ssl.key=tls/server.key
quarkus.grpc.server.ssl.trust-store=tls/ca.jks
quarkus.grpc.server.ssl.trust-store-password=*****
quarkus.grpc.server.ssl.client-auth=REQUIRED
自定义服务器构建
当 Quarkus 构建 gRPC 服务器实例时,用户可以应用他们自己的 Server(Builder) 定制器。定制器按 priority
应用,数字越高,应用定制器的越晚。定制器在 Quarkus 应用用户的服务器配置之前应用;例如,非常适合某些初始默认值。
有两个 customize
方法,第一个使用 gRPC 的 ServerBuilder
作为参数 - 用于 Quarkus 的旧 gRPC 支持,另一个使用 GrpcServerOptions
- 用于新的 Vert.x gRPC 支持。用户应根据 gRPC 支持类型使用情况实现正确的 customize
方法,如果定制器是 gRPC 类型中性的,则可以同时实现。
public interface ServerBuilderCustomizer<T extends ServerBuilder<T>> {
/**
* Customize a ServerBuilder instance.
*
* @param config server's configuration
* @param builder Server builder instance
*/
default void customize(GrpcServerConfiguration config, T builder) {
}
/**
* Customize a GrpcServerOptions instance.
*
* @param config server's configuration
* @param options GrpcServerOptions instance
*/
default void customize(GrpcServerConfiguration config, GrpcServerOptions options) {
}
/**
* Priority by which the customizers are applied.
* Higher priority is applied later.
*
* @return the priority
*/
default int priority() {
return 0;
}
}
服务器拦截器
gRPC 服务器拦截器允许您在调用服务之前执行逻辑,例如身份验证。
您可以通过创建一个实现 io.grpc.ServerInterceptor
的 @ApplicationScoped
bean 来实现 gRPC 服务器拦截器
@ApplicationScoped
// add @GlobalInterceptor for interceptors meant to be invoked for every service
public class MyInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
// ...
}
}
也可以将生产者方法注解为全局拦截器
import io.quarkus.grpc.GlobalInterceptor;
import jakarta.enterprise.inject.Produces;
public class MyProducer {
@GlobalInterceptor
@Produces
public MyInterceptor myInterceptor() {
return new MyInterceptor();
}
}
查看 ServerInterceptor JavaDoc 以正确实现您的拦截器。 |
要将拦截器应用于所有公开的服务,请使用 @io.quarkus.grpc.GlobalInterceptor
注解它。要将拦截器应用于单个服务,请使用 @io.quarkus.grpc.RegisterInterceptor
在服务上注册它
import io.quarkus.grpc.GrpcService;
import io.quarkus.grpc.RegisterInterceptor;
@GrpcService
@RegisterInterceptor(MyInterceptor.class)
public class StreamingService implements Streaming {
// ...
}
当您有多个服务器拦截器时,您可以通过实现 jakarta.enterprise.inject.spi.Prioritized
接口来对它们进行排序。请注意,所有全局拦截器都在特定于服务的拦截器之前调用。
@ApplicationScoped
public class MyInterceptor implements ServerInterceptor, Prioritized {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
// ...
}
@Override
public int getPriority() {
return 10;
}
}
优先级最高的拦截器首先被调用。如果拦截器未实现 Prioritized
接口,则使用的默认优先级为 0
。
如果需要,还支持将 Vert.x RoutingContext
实例注入到您的 gRPC 服务中。Quarkus 默认不这样做,您需要将 RoutingContextGrpcInterceptor
添加到您的 gRPC 服务中。
@GrpcService
@RegisterInterceptor(RoutingContextGrpcInterceptor.class)
public class HelloWorldService extends GreeterGrpc.GreeterImplBase {
@Inject
RoutingContext context;
// ...
}
测试您的服务
测试 gRPC 服务的最简单方法是使用 gRPC 客户端,如 使用 gRPC 服务 中所述。
请注意,在使用客户端测试未使用的 TLS 的公开服务的情况下,无需提供任何配置。例如,要测试上面定义的 HelloService
,可以创建以下测试
public class HelloServiceTest implements Greeter {
@GrpcClient
Greeter client;
@Test
void shouldReturnHello() {
CompletableFuture<String> message = new CompletableFuture<>();
client.sayHello(HelloRequest.newBuilder().setName("Quarkus").build())
.subscribe().with(reply -> message.complete(reply.getMessage()));
assertThat(message.get(5, TimeUnit.SECONDS)).isEqualTo("Hello Quarkus");
}
}
手动试用您的服务
在开发模式下,您可以在 Quarkus Dev UI 中试用您的 gRPC 服务。只需转到 https://:8080/q/dev-ui 并单击 gRPC 磁贴下的服务。
请注意,您的应用程序需要公开“正常”的 HTTP 端口,以便 Dev UI 可以访问。如果您的应用程序不公开任何 HTTP 端点,您可以创建一个专用的 profile,其中包含对 quarkus-vertx-http
的依赖
<profiles>
<profile>
<id>development</id>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-http</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
有了它,您可以运行开发模式:mvn quarkus:dev -Pdevelopment
。
如果您使用 Gradle,您可以简单地为 quarkusDev
任务添加依赖项
dependencies {
quarkusDev 'io.quarkus:quarkus-vertx-http'
}
gRPC 服务器指标
启用指标收集
当应用程序也使用 quarkus-micrometer
扩展时,会自动启用 gRPC 服务器指标。Micrometer 收集应用程序实现的所有 gRPC 服务的指标。
例如,如果您将指标导出到 Prometheus,您将得到
# HELP grpc_server_responses_sent_messages_total The total number of responses sent
# TYPE grpc_server_responses_sent_messages_total counter
grpc_server_responses_sent_messages_total{method="SayHello",methodType="UNARY",service="helloworld.Greeter",} 6.0
# HELP grpc_server_processing_duration_seconds The total time taken for the server to complete the call
# TYPE grpc_server_processing_duration_seconds summary
grpc_server_processing_duration_seconds_count{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 6.0
grpc_server_processing_duration_seconds_sum{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 0.016216771
# HELP grpc_server_processing_duration_seconds_max The total time taken for the server to complete the call
# TYPE grpc_server_processing_duration_seconds_max gauge
grpc_server_processing_duration_seconds_max{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 0.007985236
# HELP grpc_server_requests_received_messages_total The total number of requests received
# TYPE grpc_server_requests_received_messages_total counter
grpc_server_requests_received_messages_total{method="SayHello",methodType="UNARY",service="helloworld.Greeter",} 6.0
服务名称、方法和类型可以在 tags 中找到。
禁用指标收集
要在使用 quarkus-micrometer
时禁用 gRPC 服务器指标,请将以下属性添加到应用程序配置
quarkus.micrometer.binder.grpc-server.enabled=false
使用虚拟线程
要在您的 gRPC 服务实现中使用虚拟线程,请查看专门的 指南。
gRPC 服务器授权
当 Vert.x gRPC 支持(它使用现有的 Vert.x HTTP 服务器)启用时,Quarkus 包括内置安全性,以允许 使用注解进行授权。
添加 Quarkus Security 扩展
安全功能由 Quarkus Security 扩展提供,因此请确保您的 pom.xml
文件包含以下依赖项
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-security</artifactId>
</dependency>
要将 Quarkus Security 扩展添加到现有的 Maven 项目,请从您的项目基本目录运行以下命令
quarkus extension add security
./mvnw quarkus:add-extension -Dextensions='security'
./gradlew addExtension --extensions='security'
受支持的身份验证机制概述
一些受支持的身份验证机制内置于 Quarkus 中,而另一些则需要您添加扩展。下表将特定的身份验证要求映射到您可以在 Quarkus 中使用的受支持的机制
身份验证要求 | 身份验证机制 |
---|---|
用户名和密码 |
|
客户端证书 |
|
自定义要求 |
|
Bearer 访问令牌 |
不要忘记安装至少一个提供基于所选身份验证要求的 IdentityProvider
的扩展。请参阅 基本身份验证指南,了解如何提供基于用户名和密码的 IdentityProvider
的示例。
如果您使用单独的 HTTP 服务器来服务 gRPC 请求,则 自定义身份验证 是您的唯一选择。将 quarkus.grpc.server.use-separate-server 配置属性设置为 false ,以便您可以使用其他机制。 |
安全 gRPC 服务
可以使用 标准安全注解 来保护 gRPC 服务,如下例所示
package org.acme.grpc.auth;
import hello.Greeter;
import io.quarkus.grpc.GrpcService;
import jakarta.annotation.security.RolesAllowed;
@GrpcService
public class HelloService implements Greeter {
@RolesAllowed("admin")
@Override
public Uni<HelloReply> sayHello(HelloRequest request) {
return Uni.createFrom().item(() ->
HelloReply.newBuilder().setMessage("Hello " + request.getName()).build()
);
}
}
大多数支持的机制示例都发送身份验证标头,请参阅 Consuming a gRPC Service 指南的 gRPC 标头 部分,了解有关 gRPC 标头的更多信息。
基本身份验证
Quarkus Security 为 基本身份验证 提供内置身份验证支持。
quarkus.grpc.server.use-separate-server=false
quarkus.http.auth.basic=true (1)
1 | 启用基本身份验证。 |
package org.acme.grpc.auth;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import org.acme.proto.Greeter;
import org.acme.proto.HelloRequest;
import io.grpc.Metadata;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.GrpcClientUtils;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Test;
@QuarkusTest
public class GreeterServiceTest {
private static final Metadata.Key<String> AUTHORIZATION = Metadata.Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER);
@GrpcClient
Greeter greeterClient;
@Test
void shouldReturnHello() throws ExecutionException, InterruptedException, TimeoutException {
Metadata headers = new Metadata();
// Set the headers - Basic auth for testing
headers.put(AUTHORIZATION, "Basic YWxpY2U6YWxpY2U="); // alice:alice with "admin" role
var client = GrpcClientUtils.attachHeaders(greeterClient, headers);
// Call the client
CompletableFuture<String> message = new CompletableFuture<>();
client.sayHello(HelloRequest.newBuilder().setName("Quarkus").build())
.subscribe().with(reply -> message.complete(reply.getMessage()));
// Get the values
String theValue = message.get(5, TimeUnit.SECONDS);
// Assert
assertThat(theValue, is("Hello Quarkus"));
}
}
相互 TLS 身份验证
Quarkus 提供相互 TLS (mTLS) 身份验证,以便您可以基于用户的 X.509 证书对用户进行身份验证。为此指南的 带有相互身份验证的 TLS 部分描述了为所有 gRPC 服务强制执行身份验证的最简单方法。但是,Quarkus Security 支持角色映射,您可以使用它来执行更细粒度的访问控制。
quarkus.grpc.server.use-separate-server=false
quarkus.http.insecure-requests=disabled
quarkus.http.ssl.certificate.files=tls/server.pem
quarkus.http.ssl.certificate.key-files=tls/server.key
quarkus.http.ssl.certificate.trust-store-file=tls/ca.jks
quarkus.http.ssl.certificate.trust-store-password=**********
quarkus.http.ssl.client-auth=required
quarkus.http.auth.certificate-role-properties=role-mappings.txt (1)
quarkus.native.additional-build-args=-H:IncludeResources=.*\\.txt
1 | 添加证书角色映射。 |
testclient=admin (1)
1 | 将 testclient 证书 CN(通用名称)映射到 SecurityIdentity 角色 admin 。 |
自定义身份验证
如果 Quarkus 提供的上述机制无法满足您的需求,您可以随时实现一个或多个 GrpcSecurityMechanism
bean。
GrpcSecurityMechanism
示例package org.acme.grpc.auth;
import jakarta.inject.Singleton;
import io.grpc.Metadata;
import io.quarkus.security.credential.PasswordCredential;
import io.quarkus.security.identity.request.AuthenticationRequest;
import io.quarkus.security.identity.request.UsernamePasswordAuthenticationRequest;
@Singleton
public class CustomGrpcSecurityMechanism implements GrpcSecurityMechanism {
private static final Metadata.Key<String> AUTHORIZATION = Metadata.Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER);
@Override
public boolean handles(Metadata metadata) {
String authString = metadata.get(AUTHORIZATION);
return authString != null && authString.startsWith("Custom ");
}
@Override
public AuthenticationRequest createAuthenticationRequest(Metadata metadata) {
final String authString = metadata.get(AUTHORIZATION);
final String userName;
final String password;
// here comes your application logic that transforms 'authString' to user name and password
return new UsernamePasswordAuthenticationRequest(userName, new PasswordCredential(password));
}
}