Java实现gRPC的部分进阶功能

本篇文章内容均基于gRPC官方文档提供的代码示例v1.38.0

你也可以直接看github上的最新版本

错误处理

StatusException gRPC常用的异常类型, 扩展自 Exception , 包含状态信息 Status status 和扩展信息 Metadata trailers

StatusRuntimeException gRPC常用的异常类型, 扩展自 RuntimeException , 包含状态信息 Status status 和扩展信息 Metadata trailers

服务端返回错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 错误扩展信息key
private static final Metadata.Key<DebugInfo> DEBUG_INFO_TRAILER_KEY = ProtoUtils.keyForProto(DebugInfo.getDefaultInstance());
// 错误扩展信息value
private static final DebugInfo DEBUG_INFO = DebugInfo.newBuilder()
.addStackEntries("stack_entry_1")
.addStackEntries("stack_entry_2")
.addStackEntries("stack_entry_3")
.setDetail("detailed error info.").build();
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
// 错误扩展信息
Metadata trailers = new Metadata();
trailers.put(DEBUG_INFO_TRAILER_KEY, DEBUG_INFO);
// 返回错误
responseObserver.onError(
Status.INTERNAL.withDescription(DEBUG_DESC).asRuntimeException(trailers));
}

客户端处理错误

1
2
3
4
5
6
7
8
9
10
private static final Metadata.Key<DebugInfo> DEBUG_INFO_TRAILER_KEY = ProtoUtils.keyForProto(DebugInfo.getDefaultInstance());
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
Metadata trailers = Status.trailersFromThrowable(t);
// 不符合条件则抛出VerifyException
Verify.verify(status.getCode() == Status.Code.INTERNAL);
Verify.verify(trailers.containsKey(DEBUG_INFO_TRAILER_KEY));
Verify.verify(status.getDescription().contains("Overbite"));
}

客户端请求重试

配置重试参数需要在构建channel时传入配置, 具体参数等可以参考 ManagedChannelServiceConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 重试参数
Map<String, Object> name0 = new HashMap<>();
name0.put("service", "helloworld.Greeter");
name0.put("method", "SayHello");
Map<String, Object> retryPolicies = new HashMap<>();
retryPolicies.put("backoffMultiplier", 2.0);
retryPolicies.put("maxAttempts", 5.0);
retryPolicies.put("initialBackoff", "0.5s");
retryPolicies.put("maxBackoff", "30s");
retryPolicies.put("retryableStatusCodes", Arrays.stream(Status.Code.values()).filter(code -> !Status.Code.OK.equals(code)).map(Enum::name).collect(Collectors.toList()));
Map<String, Object> methodConfig0 = new HashMap<>();
methodConfig0.put("name", Collections.singletonList(name0));
methodConfig0.put("retryPolicy", retryPolicies);
Map<String, Object> defaultServiceConfig = new HashMap<>();
defaultServiceConfig.put("methodConfig", Collections.singletonList(methodConfig0));
// 构建channel
helloWorldChannel = ManagedChannelBuilder.forTarget("localhost:50051")
.usePlaintext()
.enableRetry()
.defaultServiceConfig(defaultServiceConfig)
.build();

以上为Java代码方式构造参数, 官方提供的是使用json配置文件+gson解析的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
"methodConfig": [
{
"name": [
{
"service": "helloworld.Greeter",
"method": "SayHello"
}
],
"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "0.5s",
"maxBackoff": "30s",
"backoffMultiplier": 2,
"retryableStatusCodes": [
"UNAVAILABLE"
]
}
}
]
}
1
new Gson().fromJson(new JsonReader(new InputStreamReader(RetryingHelloWorldClient.class.getResourceAsStream("retrying_service_config.json"), UTF_8)),Map.class);

服务端拦截器

gRPC支持服务端设置拦截器链

继承 ServerInterceptor 接口并实现 interceptCall 方法, 在创建 server 时进行配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static class LatencyInjectionInterceptor implements ServerInterceptor {
@Override
public <HelloRequestT, HelloReplyT> Listener<HelloRequestT> interceptCall(ServerCall<HelloRequestT, HelloReplyT> call, Metadata headers, ServerCallHandler<HelloRequestT, HelloReplyT> next) {
// 这个Interceptor制造了长尾延迟
int random = new Random().nextInt(100);
long delay = 0;
if (random < 1) {
delay = 10_000;
} else if (random < 5) {
delay = 5_000;
} else if (random < 10) {
delay = 2_000;
}
if (delay > 0) {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
return next.startCall(call, headers);
}
}

Server server = ServerBuilder.forPort(port)
.addService(new GreeterImpl())
.intercept(new LatencyInjectionInterceptor())
.build()
.start();

请求对冲

对冲请求 是一种优化 长尾延迟 的方法, 参考 Hedged requests — Tackling tail latency | by Ricardo Linck | The Startup | Medium

gRPC自带对冲请求的支持, 类似上面的重试请求, 需要在构造channel时进行配置, 配置粒度精确到方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
"methodConfig": [
{
"name": [
{
"service": "helloworld.Greeter",
"method": "SayHello"
}
],
"hedgingPolicy": {
"maxAttempts": 3,
"hedgingDelay": "1s"
}
}
],
"retryThrottling": {
"maxTokens": 10,
"tokenRatio": 0.1
}
}

其他

gRPC提供的示例还包括

  • client自定义请求头
  • server自定义请求头
  • 请求压缩
  • 响应压缩
  • client自定义序列化反序列化逻辑
  • server自定义序列化反序列化逻辑
  • 背压流量控制
  • TLS
  • ALTS(Google’s Application Layer Transport Security)
  • jwt