Java实现gRPC的服务端和客户端

gRPC简介

gRPC是一种 RPC 框架,特点:

  1. 序列化/反序列化使用Protocol Buffers,比起json等格式更高效
  2. 支持多种语言编写服务端/客户端

gRPC官网

Java开发gRPC流程

本章节内容基于 Basics tutorial | Java | gRPC

定义接口

protocol buffer使用proto文件定义接口,参考 Language Guide (proto3)  |  Protocol Buffers  |  Google Developers

以下为示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// 协议版本
syntax = "proto3";

// 编译后是否生成多个Java类
option java_multiple_files = true;
// 编译后生成Java类的package路径
option java_package = "idol.elie.grpc.routeguide.protobuf";
// 编译后生成的配置基类名
option java_outer_classname = "RouteGuideProto";

// proto内部使用的package
package routeguide;

// 接口
service RouteGuide {
// 客户端同步请求,服务端单次返回
rpc GetFeature(Point) returns (Feature) {};
// 客户端同步请求,服务端流式返回
rpc ListFeatures(Rectangle) returns (stream Feature);
// 客户端流式异步请求,服务端单次返回
rpc RecordRoute(stream Point) returns (RouteSummary);
// 客户端流式异步请求,服务端流式返回
rpc RouteChat(stream RouteNote) returns (stream RouteNote);
}
// 数据类
message Point {
int32 latitude = 1;
int32 longitude = 2;
}

message Rectangle {
Point p1 = 1;
Point p2 = 2;
}

message Feature {
string name = 1;
Point location = 2;
}

message FeatureDatabase {
repeated Feature features = 1;
}

message RouteNote {
Point location = 1;
string message = 2;
}

message RouteSummary {
int32 point_count = 1;
int32 feature_count = 2;
int32 distance = 3;
int32 elapsed_time = 4;
}

proto编译生成Java类

Java版本推荐使用maven插件进行编译,os-maven-plugin判断当前运行平台,protobuf-maven-plugin使用对应平台的protoc进行编译

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<properties>
<protoc.version>3.12.0</protoc.version>
</properties>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>

将编译生成的类复制到src的对应目录下即可开始正式进行开发

编写服务代码

BASE

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
RouteGuideGrpc为编译生成的基础配置类, 我们需要继承并实现RouteGuideImplBase
没有被继承实现的方法被调用后会向客户端报错
*/
public class RouteGuideImpl extends RouteGuideGrpc.RouteGuideImplBase {

}

// 创建web服务并监听
Server server = ServerBuilder
.forPort(port)
.addService(new RouteGuideImpl())
.build()
.start();
server.awaitTermination();

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class RouteGuideClient {
// 同步调用stub
private final RouteGuideGrpc.RouteGuideBlockingStub blockingStub;
// 异步调用stub
private final RouteGuideGrpc.RouteGuideStub stub;

public RouteGuideClient(Channel channel) {
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
stub = RouteGuideGrpc.newStub(channel);
}
}

ManagedChannel routeGuideChannel = ManagedChannelBuilder
.forTarget("localhost:25252") // target server
.usePlaintext() // not ssl
.build();
RouteGuideClient routeGuideClient = new RouteGuideClient(routeGuideChannel);

客户端同步请求,服务端单次返回

服务端

1
2
3
4
5
6
7
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
// 处理请求
responseObserver.onNext(Feature.newBuilder().build());
// 处理完成
responseObserver.onCompleted();
}

客户端

1
2
3
4
5
6
7
8
public void getFeature(int latitude, int longitude) {
try {
// 使用阻塞stub直接调用
Feature feature = blockingStub.getFeature(Point.newBuilder().build());
} catch (Exception e) {
logger.log(Level.WARNING, "getFeature error", e);
}
}

客户端同步请求,服务端流式返回

服务端

1
2
3
4
5
6
7
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
// 返回流式响应
responseObserver.onNext(Feature.newBuilder().build());
responseObserver.onNext(Feature.newBuilder().build());
responseObserver.onCompleted();
}

客户端

1
2
3
4
5
6
7
8
9
10
11
public void listFeatures() {
try {
Iterator<Feature> featureIterator = blockingStub.listFeatures(Rectangle.newBuilder().build());
// 用Iterator读取流式数据
for (var i = 0; featureIterator.hasNext(); ++i) {
// ...
}
} catch (Exception e) {
logger.log(Level.WARNING, "listFeatures error", e);
}
}

客户端流式异步请求,服务端单次返回

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public StreamObserver<Point> recordRoute(StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<>() {
@Override
public void onNext(Point value) {
// ...
}
@Override
public void onError(Throwable t) {
// ...
}
@Override
public void onCompleted() {
// 返回响应
responseObserver.onNext(RouteSummary.newBuilder().build());
responseObserver.onCompleted();
}
};
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void recordRoute() throws InterruptedException {
// 异步响应处理
StreamObserver<RouteSummary> responseObserver = new StreamObserver<>() {
@Override
public void onNext(RouteSummary value) {
// 处理返回值
}

@Override
public void onError(Throwable t) {
// ...
}

@Override
public void onCompleted() {
// ...
}
};
// 用响应处理器生成请求
StreamObserver<Point> requestObserver = stub.recordRoute(responseObserver);
// 流式请求
requestObserver.onNext(Point.newBuilder().build());
requestObserver.onNext(Point.newBuilder().build());
requestObserver.onCompleted();
}

客户端流式异步请求,服务端流式返回

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public StreamObserver<RouteNote> routeChat(StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<>() {
@Override
public void onNext(RouteNote value) {
// 流式返回
responseObserver.onNext(RouteNote.newBuilder().build());
}

@Override
public void onError(Throwable t) {
// ...
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 处理服务器返回的响应
StreamObserver<RouteNote> responseStreamObserver = new StreamObserver<>() {
@Override
public void onNext(RouteNote value) {
logger.info(value.getMessage());
}
@Override
public void onError(Throwable t) {

}
@Override
public void onCompleted() {

}
};
StreamObserver<RouteNote> requestStreamObserver = stub.routeChat(responseStreamObserver);
// 流式请求
requestStreamObserver.onNext(RouteNote.newBuilder().build());
requestStreamObserver.onNext(RouteNote.newBuilder().build());
requestStreamObserver.onCompleted();