使用 gRPC 服务
gRPC 客户端可以注入到您的应用程序代码中。
消费 gRPC 服务需要生成 gRPC 类。将您的 proto 文件放在 src/main/proto 中,然后运行 mvn compile 。 |
Stub 和注入
gRPC 生成提供了几种 stub,提供了消费 gRPC 服务的不同方式。您可以注入
-
使用 Mutiny API 的服务接口,
-
使用 gRPC API 的阻塞 stub,
-
基于 Mutiny 的响应式 stub,
-
gRPC
io.grpc.Channel
,它允许您创建其他类型的 stub。
import io.quarkus.grpc.GrpcClient;
import hello.Greeter;
import hello.GreeterGrpc.GreeterBlockingStub;
import hello.MutinyGreeterGrpc.MutinyGreeterStub;
class MyBean {
// A service interface using the Mutiny API
@GrpcClient("helloService") (1)
Greeter greeter;
// A reactive stub based on Mutiny
@GrpcClient("helloService")
MutinyGreeterGrpc.MutinyGreeterStub mutiny;
// A blocking stub using the gRPC API
@GrpcClient
GreeterGrpc.GreeterBlockingStub helloService; (2)
@GrpcClient("hello-service")
Channel channel;
}
1 | gRPC 客户端注入点必须用 @GrpcClient 限定符进行注释。此限定符可用于指定用于配置底层 gRPC 客户端的名称。例如,如果将其设置为 hello-service ,则使用 quarkus.grpc.clients.hello-service.host 配置服务主机。 |
2 | 如果未通过 GrpcClient#value() 指定名称,则将改用字段名称,例如在本例中为 helloService 。 |
Stub 类名源自您 proto
文件中使用的服务名称。例如,如果您使用 Greeter
作为服务名称,如
option java_package = "hello";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
那么服务接口名称为:hello.Greeter
,Mutiny stub 名称为:hello.MutinyGreeterGrpc.MutinyGreeterStub
,阻塞 stub 名称为:hello.GreeterGrpc.GreeterBlockingStub
。
示例
服务接口
import io.quarkus.grpc.GrpcClient;
import io.smallrye.mutiny.Uni;
import hello.Greeter;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/hello")
public class ExampleResource {
@GrpcClient (1)
Greeter hello;
@GET
@Path("/mutiny/{name}")
public Uni<String> helloMutiny(String name) {
return hello.sayHello(HelloRequest.newBuilder().setName(name).build())
.onItem().transform(HelloReply::getMessage);
}
}
1 | 服务名称源自注入点 - 使用字段名称。必须设置 quarkus.grpc.clients.hello.host 属性。 |
阻塞 Stub
import io.quarkus.grpc.GrpcClient;
import hello.GreeterGrpc.GreeterBlockingStub;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/hello")
public class ExampleResource {
@GrpcClient("hello") (1)
GreeterGrpc.GreeterBlockingStub blockingHelloService;
@GET
@Path("/blocking/{name}")
public String helloBlocking(String name) {
return blockingHelloService.sayHello(HelloRequest.newBuilder().setName(name).build()).getMessage();
}
}
1 | 必须设置 quarkus.grpc.clients.hello.host 属性。 |
处理流
gRPC 允许发送和接收流
service Streaming {
rpc Source(Empty) returns (stream Item) {} // Returns a stream
rpc Sink(stream Item) returns (Empty) {} // Reads a stream
rpc Pipe(stream Item) returns (stream Item) {} // Reads a streams and return a streams
}
使用 Mutiny stub,您可以按如下方式与这些流进行交互
package io.quarkus.grpc.example.streaming;
import io.grpc.examples.streaming.Empty;
import io.grpc.examples.streaming.Item;
import io.grpc.examples.streaming.MutinyStreamingGrpc;
import io.quarkus.grpc.GrpcClient;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/streaming")
@Produces(MediaType.APPLICATION_JSON)
public class StreamingEndpoint {
@GrpcClient
MutinyStreamingGrpc.MutinyStreamingStub streaming;
@GET
public Multi<String> invokeSource() {
// Retrieve a stream
return streaming.source(Empty.newBuilder().build())
.onItem().transform(Item::getValue);
}
@GET
@Path("sink/{max}")
public Uni<Void> invokeSink(int max) {
// Send a stream and wait for completion
Multi<Item> inputs = Multi.createFrom().range(0, max)
.map(i -> Integer.toString(i))
.map(i -> Item.newBuilder().setValue(i).build());
return streaming.sink(inputs).onItem().ignore().andContinueWithNull();
}
@GET
@Path("/{max}")
public Multi<String> invokePipe(int max) {
// Send a stream and retrieve a stream
Multi<Item> inputs = Multi.createFrom().range(0, max)
.map(i -> Integer.toString(i))
.map(i -> Item.newBuilder().setValue(i).build());
return streaming.pipe(inputs).onItem().transform(Item::getValue);
}
}
客户端配置
对于您在应用程序中注入的每个 gRPC 服务,您可以配置以下属性
全局配置
构建时固定的配置属性 - 所有其他配置属性都可以在运行时覆盖
配置属性 |
类型 |
默认 |
---|---|---|
如果设置为 true,并且使用了 Stork 负载均衡器,则将主动请求与所有可用服务实例的连接。这意味着更好的负载均衡,但代价是拥有多个活动连接。 Environment variable: 显示更多 |
布尔值 |
|
每个客户端配置
构建时固定的配置属性 - 所有其他配置属性都可以在运行时覆盖
配置属性 |
类型 |
默认 |
---|---|---|
使用新的 Vert.x gRPC 客户端支持。默认情况下,我们仍然使用之前的 Java gRPC 支持。 Environment variable: 显示更多 |
布尔值 |
|
如果使用之前的 Java gRPC 支持,请使用 Vert.x 事件循环来处理 gRPC 客户端。 Environment variable: 显示更多 |
布尔值 |
|
类型 |
默认 |
|
显式启用 XDS 的使用。 Environment variable: 显示更多 |
布尔值 |
|
使用安全凭据。 Environment variable: 显示更多 |
布尔值 |
|
可选的显式目标。 Environment variable: 显示更多 |
字符串 |
|
显式启用进程内使用。 Environment variable: 显示更多 |
布尔值 |
|
设置进程内名称。 Environment variable: 显示更多 |
字符串 |
|
延迟 gRPC ClientCall 的线程数 Environment variable: 显示更多 |
整数 |
|
延迟 gRPC 调用的截止时间(毫秒) Environment variable: 显示更多 |
long |
|
gRPC ClientCall 的重试次数 Environment variable: 显示更多 |
整数 |
|
刷新检查的初始延迟(秒) Environment variable: 显示更多 |
long |
|
刷新周期(秒) Environment variable: 显示更多 |
long |
|
gRPC 服务端口。 Environment variable: 显示更多 |
整数 |
|
gRPC 服务测试端口。 Environment variable: 显示更多 |
整数 |
|
服务公开的主机名/IP。 Environment variable: 显示更多 |
字符串 |
|
服务器证书或证书链的 classpath 路径或文件路径(PEM 格式)。 Environment variable: 显示更多 |
path |
|
相应的证书私钥文件的 classpath 路径或文件路径(PEM 格式)。 Environment variable: 显示更多 |
path |
|
一个可选的信任库,其中包含要信任的证书信息。信任库可以在类路径或外部文件中。 Environment variable: 显示更多 |
path |
|
要使用的 TLS 配置的名称。 如果未设置且已配置默认 TLS 配置( 如果未设置 TLS 配置,并且未配置 重要提示:仅在使用 Quarkus(基于 Vert.x)gRPC 客户端时支持此功能。 Environment variable: 显示更多 |
字符串 |
|
是否启用 SSL/TLS。 Environment variable: 显示更多 |
布尔值 |
|
启用信任所有证书。 默认情况下禁用。 Environment variable: 显示更多 |
布尔值 |
|
信任证书文件(Pem 格式)的逗号分隔列表。 Environment variable: 显示更多 |
字符串列表 |
|
密钥文件(JKS 格式)的路径。 Environment variable: 显示更多 |
字符串 |
|
密钥文件的密码。 Environment variable: 显示更多 |
字符串 |
|
密钥文件(PFX 格式)的路径。 Environment variable: 显示更多 |
字符串 |
|
密钥的密码。 Environment variable: 显示更多 |
字符串 |
|
密钥文件(Pem 格式)路径的逗号分隔列表。 Environment variable: 显示更多 |
字符串列表 |
|
证书文件(Pem 格式)路径的逗号分隔列表。 Environment variable: 显示更多 |
字符串列表 |
|
密钥文件(JKS 格式)的路径。 Environment variable: 显示更多 |
字符串 |
|
密钥文件的密码。 Environment variable: 显示更多 |
字符串 |
|
密钥文件(PFX 格式)的路径。 Environment variable: 显示更多 |
字符串 |
|
密钥的密码。 Environment variable: 显示更多 |
字符串 |
|
是否应在 SSL/TLS 握手中验证主机名。 Environment variable: 显示更多 |
布尔值 |
|
使用名称解析器。默认为 dns。如果设置为“stork”,主机将被视为 SmallRye Stork 服务名称 Environment variable: 显示更多 |
字符串 |
|
是否应使用 Environment variable: 显示更多 |
布尔值 |
|
发送 keep alive ping 的时间间隔。 Environment variable: 显示更多 |
||
字节的流控制窗口。默认为 1MiB。 Environment variable: 显示更多 |
整数 |
|
进入空闲模式之前没有活动 RPC 的持续时间。 Environment variable: 显示更多 |
||
keep alive ping 的发送方等待确认的时间。 Environment variable: 显示更多 |
||
当连接上没有未完成的 RPC 时,是否执行 keep-alive。 Environment variable: 显示更多 |
布尔值 |
|
最大试探性尝试次数。 Environment variable: 显示更多 |
整数 |
|
最大重试次数。重试必须显式启用。 Environment variable: 显示更多 |
整数 |
|
对于每个通道或子通道,在示踪剂中保留的最大通道跟踪事件数。 Environment variable: 显示更多 |
整数 |
|
单个 gRPC 帧允许的最大消息大小(以字节为单位)。默认为 4 MiB。 Environment variable: 显示更多 |
整数 |
|
允许接收的元数据的最大大小(以字节为单位)。默认为 8192B。 Environment variable: 显示更多 |
整数 |
|
HTTP/2 连接的协商类型。可接受的值为: Environment variable: 显示更多 |
字符串 |
|
覆盖 TLS 和 HTTP 虚拟主机使用的权限。 Environment variable: 显示更多 |
字符串 |
|
用于重试的每个 RPC 缓冲区限制(以字节为单位)。 Environment variable: 显示更多 |
long |
|
是否启用重试。注意:重试默认禁用。 Environment variable: 显示更多 |
布尔值 |
|
重试缓冲区大小(以字节为单位)。 Environment variable: 显示更多 |
long |
|
使用自定义用户代理。 Environment variable: 显示更多 |
字符串 |
|
使用自定义负载均衡策略。可接受的值为: Environment variable: 显示更多 |
字符串 |
|
每次调用的压缩方式。可接受的值为 Environment variable: 显示更多 |
字符串 |
|
每次调用的截止时间。 Environment variable: 显示更多 |
关于 Duration 格式
要写入持续时间值,请使用标准的 您还可以使用简化的格式,以数字开头
在其他情况下,简化格式将被转换为
|
client-name
是在 @GrpcClient
中设置的名称,或者在未显式定义时从注入点派生的名称。
以下示例使用 *hello* 作为客户端名称。请记住将其替换为您在 @GrpcClient
注释中使用的名称。
当您启用 quarkus.grpc.clients."client-name".xds.enabled 时,xDS 应处理上述大部分配置。 |
自定义通道构建
当 Quarkus 构建 gRPC Channel 实例(gRPC 客户端在较低网络级别与 gRPC 服务通信的方式)时,用户可以应用自己的 Channel(Builder) 自定义器。自定义器按 priority
应用,数字越大,自定义器应用得越晚。自定义器在 Quarkus 应用用户客户端配置之前应用;例如,适用于所有客户端的初始默认值。
有两个 customize
方法,第一个方法使用 gRPC 的 ManagedChannelBuilder
作为参数 - 用于 Quarkus 的旧版 gRPC 支持,另一个使用 GrpcClientOptions
- 用于新的 Vert.x gRPC 支持。用户应根据 gRPC 支持类型使用实现正确的 customize
方法,如果自定义器是 gRPC 类型中性的,则可以实现两者。
public interface ChannelBuilderCustomizer<T extends ManagedChannelBuilder<T>> {
/**
* Customize a ManagedChannelBuilder instance.
*
* @param name gRPC client name
* @param config client's configuration
* @param builder Channel builder instance
* @return map of config properties to be used as default service config against the builder
*/
default Map<String, Object> customize(String name, GrpcClientConfiguration config, T builder) {
return Map.of();
}
/**
* Customize a GrpcClientOptions instance.
*
* @param name gRPC client name
* @param config client's configuration
* @param options GrpcClientOptions instance
*/
default void customize(String name, GrpcClientConfiguration config, GrpcClientOptions options) {
}
/**
* Priority by which the customizers are applied.
* Higher priority is applied later.
*
* @return the priority
*/
default int priority() {
return 0;
}
}
启用 TLS
要启用 TLS,请使用以下配置。请注意,配置中的所有路径都可以指定类路径中的资源(通常来自 src/main/resources
或其子文件夹)或外部文件。
quarkus.grpc.clients.hello.host=localhost
# either a path to a classpath resource or to a file:
quarkus.grpc.clients.hello.ssl.trust-store=tls/ca.pem
配置 SSL/TLS 时,plain-text 会自动禁用。 |
带双向认证的 TLS
要使用带双向认证的 TLS,请使用以下配置
quarkus.grpc.clients.hello.host=localhost
quarkus.grpc.clients.hello.plain-text=false
# all the following may use either a path to a classpath resource or to a file:
quarkus.grpc.clients.hello.ssl.certificate=tls/client.pem
quarkus.grpc.clients.hello.ssl.key=tls/client.key
quarkus.grpc.clients.hello.ssl.trust-store=tls/ca.pem
客户端 Stub 截止时间
如果您需要为 gRPC stub 配置截止时间,即指定一个时间段,在此之后 stub 将始终返回 DEADLINE_EXCEEDED
状态错误。您可以通过 quarkus.grpc.clients."service-name".deadline
配置属性指定截止时间,例如
quarkus.grpc.clients.hello.host=localhost
quarkus.grpc.clients.hello.deadline=2s (1)
1 | 为所有注入的 stub 设置截止时间。 |
不要使用此功能来实现 RPC 超时。要实现 RPC 超时,请使用 Mutiny call.ifNoItem().after(…) 或 Fault Tolerance @Timeout 。 |
gRPC 标头
与 HTTP 类似,gRPC 调用除了消息之外,还可以携带标头。标头可用于身份验证等。
要为 gRPC 调用设置标头,请创建一个附加了标头的客户端,然后在此客户端上执行调用
import jakarta.enterprise.context.ApplicationScoped;
import examples.Greeter;
import examples.HelloReply;
import examples.HelloRequest;
import io.grpc.Metadata;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.GrpcClientUtils;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class MyService {
@GrpcClient
Greeter client;
public Uni<HelloReply> doTheCall() {
Metadata extraHeaders = new Metadata();
if (headers) {
extraHeaders.put("my-header", "my-interface-value");
}
Greeter alteredClient = GrpcClientUtils.attachHeaders(client, extraHeaders); (1)
return alteredClient.sayHello(HelloRequest.newBuilder().setName(name).build()); (2)
}
}
1 | 修改客户端以附加调用 extraHeaders |
2 | 使用修改后的客户端执行调用。原始客户端保持不变 |
GrpcClientUtils
可与所有类型的客户端协同工作。
客户端拦截器
gRPC 客户端拦截器可以由实现 io.grpc.ClientInterceptor
接口的 CDI bean 来实现。您可以使用 @io.quarkus.grpc.RegisterClientInterceptor
注释注入的客户端,为特定客户端实例注册指定的拦截器。@RegisterClientInterceptor
注释是可重复的。或者,如果您想将拦截器应用于任何注入的客户端,请用 @io.quarkus.grpc.GlobalInterceptor
注释拦截器 bean。
import io.quarkus.grpc.GlobalInterceptor;
import io.grpc.ClientInterceptor;
@GlobalInterceptor (1)
@ApplicationScoped
public class MyInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
// ...
}
}
1 | 此拦截器应用于所有注入的 gRPC 客户端。 |
还可以将生产者方法注释为全局拦截器
import io.quarkus.grpc.GlobalInterceptor;
import jakarta.enterprise.inject.Produces;
public class MyProducer {
@GlobalInterceptor
@Produces
public MyInterceptor myInterceptor() {
return new MyInterceptor();
}
}
请查阅 ClientInterceptor JavaDoc 以正确实现您的拦截器。 |
@RegisterClientInterceptor
示例import io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.RegisterClientInterceptor;
import hello.Greeter;
@ApplicationScoped
class MyBean {
@RegisterClientInterceptor(MySpecialInterceptor.class) (1)
@GrpcClient("helloService")
Greeter greeter;
}
1 | 为该特定客户端注册 MySpecialInterceptor 。 |
当您有多个客户端拦截器时,可以通过实现 jakarta.enterprise.inject.spi.Prioritized
接口来对它们进行排序
@ApplicationScoped
public class MyInterceptor implements ClientInterceptor, Prioritized {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
// ...
}
@Override
public int getPriority() {
return 10;
}
}
优先级最高的拦截器会首先被调用。如果拦截器未实现 Prioritized
接口,则默认优先级为 0
。
gRPC 客户端指标
启用指标收集
当应用程序还使用 quarkus-micrometer
扩展时,gRPC 客户端指标会自动启用。Micrometer 收集应用程序使用的所有 gRPC 客户端的指标。
例如,如果您将指标导出到 Prometheus,您将得到
# HELP grpc_client_responses_received_messages_total The total number of responses received
# TYPE grpc_client_responses_received_messages_total counter
grpc_client_responses_received_messages_total{method="SayHello",methodType="UNARY",service="helloworld.Greeter",} 6.0
# HELP grpc_client_requests_sent_messages_total The total number of requests sent
# TYPE grpc_client_requests_sent_messages_total counter
grpc_client_requests_sent_messages_total{method="SayHello",methodType="UNARY",service="helloworld.Greeter",} 6.0
# HELP grpc_client_processing_duration_seconds The total time taken for the client to complete the call, including network delay
# TYPE grpc_client_processing_duration_seconds summary
grpc_client_processing_duration_seconds_count{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 6.0
grpc_client_processing_duration_seconds_sum{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 0.167411625
# HELP grpc_client_processing_duration_seconds_max The total time taken for the client to complete the call, including network delay
# TYPE grpc_client_processing_duration_seconds_max gauge
grpc_client_processing_duration_seconds_max{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 0.136478028
服务名称、方法和类型可以在 *tags* 中找到。
自定义异常处理
如果任何 gRPC 服务或服务器拦截器抛出(自定义)异常,您可以将自己的 ExceptionHandlerProvider 作为 CDI bean 添加到您的应用程序中,以提供对这些异常的自定义处理。
例如
@ApplicationScoped
public class HelloExceptionHandlerProvider implements ExceptionHandlerProvider {
@Override
public <ReqT, RespT> ExceptionHandler<ReqT, RespT> createHandler(ServerCall.Listener<ReqT> listener,
ServerCall<ReqT, RespT> serverCall, Metadata metadata) {
return new HelloExceptionHandler<>(listener, serverCall, metadata);
}
@Override
public Throwable transform(Throwable t) {
if (t instanceof HelloException he) {
return new StatusRuntimeException(Status.ABORTED.withDescription(he.getName()));
} else {
return ExceptionHandlerProvider.toStatusException(t, true);
}
}
private static class HelloExceptionHandler<A, B> extends ExceptionHandler<A, B> {
public HelloExceptionHandler(ServerCall.Listener<A> listener, ServerCall<A, B> call, Metadata metadata) {
super(listener, call, metadata);
}
@Override
protected void handleException(Throwable t, ServerCall<A, B> call, Metadata metadata) {
StatusRuntimeException sre = (StatusRuntimeException) ExceptionHandlerProvider.toStatusException(t, true);
Metadata trailers = sre.getTrailers() != null ? sre.getTrailers() : metadata;
call.close(sre.getStatus(), trailers);
}
}
}
开发模式
默认情况下,在开发模式下启动应用程序时,会启动 gRPC 服务器,即使没有配置任何服务。您可以使用以下属性配置 gRPC 扩展的开发模式行为。
构建时固定的配置属性 - 所有其他配置属性都可以在运行时覆盖
配置属性 |
类型 |
默认 |
---|---|---|
即使没有实现 gRPC 服务,也在开发模式下启动 gRPC 服务器。默认设置为 Environment variable: 显示更多 |
布尔值 |
|
注入模拟客户端
在您的 @QuarkusTest
中,您可以使用 @InjectMock
来注入 gRPC 服务的 Mutiny 客户端
@QuarkusTest
public class GrpcMockTest {
@InjectMock
@GrpcClient("hello")
Greeter greeter;
@Test
void test1() {
HelloRequest request = HelloRequest.newBuilder().setName("neo").build();
Mockito.when(greeter.sayHello(Mockito.any(HelloRequest.class)))
.thenReturn(Uni.createFrom().item(HelloReply.newBuilder().setMessage("hello neo").build()));
Assertions.assertEquals(greeter.sayHello(request).await().indefinitely().getMessage(), "hello neo");
}
}
只有 Mutiny 客户端可以被 *模拟*,通道和其他 stub 不能被模拟。 |