gRPC 是一個高性能、開源的遠程過程調用(RPC)框架,由 Google 開發。它旨在提供跨語言的通信能力,適用於從移動設備到數據中心服務器的各種環境。
1. 核心概念
-
Protocol Buffers(protobuf):
- gRPC 使用 Protocol Buffers 作為其接口描述語言和數據序列化協議。開發者通過定義
.proto文件來指定服務和消息格式。Protocol Buffers 提供了一種緊湊的二進制格式,支持快速序列化和反序列化。
- gRPC 使用 Protocol Buffers 作為其接口描述語言和數據序列化協議。開發者通過定義
-
HTTP/2 協議:
- gRPC 基於 HTTP/2 進行通信。HTTP/2 提供了多路複用、頭部壓縮、流控制和雙向流等特性,顯著提高了傳輸效率和性能。
-
服務定義和代碼生成:
- 開發者在
.proto文件中定義服務和 RPC 方法。gRPC 編譯器protoc根據這些定義生成客户端和服務器端的代碼存根,簡化了開發工作。
- 開發者在
-
多種通信模式:
gRPC 支持四種服務方法類型,這使得 gRPC 可以適應多種應用場景:- 單一響應 RPC
- 服務器流式 RPC
- 客户端流式 RPC
- 雙向流式 RPC
-
安全性:
- gRPC 支持 TLS/SSL 加密,提供安全的通信通道。它還支持多種身份驗證機制。
2. 實現原理
- 服務定義和編譯
-
定義服務:在
.proto文件中定義服務接口和消息類型。syntax = "proto3"; service Greeter { rpc SayHello (HelloRequest) returns (HelloResponse); } message HelloRequest { string name = 1; } message HelloResponse { string message = 1; } - 編譯生成代碼:使用
protoc編譯器生成客户端和服務器代碼。這些代碼包含了與服務交互所需的存根和骨架。
- 客户端和服務器的實現
-
服務器端:實現生成的服務接口,定義具體的業務邏輯。啓動 gRPC 服務器,註冊服務。
public class GreeterImpl extends GreeterGrpc.GreeterImplBase { @Override public void sayHello(HelloRequest req, StreamObserver<HelloResponse> responseObserver) { HelloResponse response = HelloResponse.newBuilder() .setMessage("Hello, " + req.getName()) .build(); responseObserver.onNext(response); responseObserver.onCompleted(); } } -
客户端:使用生成的客户端存根與服務器交互。客户端可以選擇同步或異步調用。
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051) .usePlaintext() .build(); GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel); HelloResponse response = stub.sayHello(HelloRequest.newBuilder().setName("World").build());
- 傳輸層:基於 Netty 的實現
- Netty 集成:gRPC 的 Java 實現使用 Netty 作為傳輸層框架。Netty 提供了異步事件驅動的 I/O 模型,支持高效的網絡通信。
- HTTP/2 支持:Netty 提供對 HTTP/2 的支持,使得 gRPC 可以利用 HTTP/2 的特性,如多路複用和頭部壓縮。
- 數據序列化和反序列化
- gRPC 使用 Protocol Buffers 進行消息的序列化和反序列化。Protocol Buffers 提供了高效的二進制格式,減少了數據傳輸的開銷。
- 攔截器和中間件
- gRPC 支持攔截器機制,允許在請求和響應的生命週期中插入自定義邏輯。這類似於中間件,可以用於實現日誌記錄、認證、審計等功能。
- 負載均衡和服務發現
- gRPC 提供了多種負載均衡策略,並且可以與服務發現機制集成。客户端可以自動選擇最佳的服務器實例來處理請求。
3. 代碼示例
要在 Java 項目中使用 gRPC,你需要設置 Maven 項目並添加必要的依賴項,然後編寫服務和客户端代碼。以下是一個簡單的示例,展示如何在 Java 中使用 gRPC。
3.1. 設置 Maven 項目
首先,確保你的 pom.xml 文件包含以下 gRPC 和 Protocol Buffers 相關的依賴和插件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>grpc-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<grpc.version>1.56.0</grpc.version>
<protobuf.version>3.21.12</protobuf.version>
<os.detected.classifier>osx-x86_64</os.detected.classifier>
</properties>
<dependencies>
<!-- gRPC dependencies -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<!-- Protocol Buffers -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Maven Compiler Plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
</configuration>
</plugin>
<!-- Protobuf Maven Plugin -->
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- OS Maven Plugin -->
<plugin>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.0</version>
</plugin>
</plugins>
</build>
</project>
os.detected.classifier 與使用的系統有關
3.2. 定義 Protocol Buffers 文件
首先,定義 .proto 文件以支持各種類型的流:
在 src/main/proto 目錄下創建一個文件 hello.proto:
syntax = "proto3";
option java_package = "com.example.grpc";
option java_outer_classname = "HelloProto";
service Greeter {
// 單一響應
rpc SayHello (HelloRequest) returns (HelloReply) {}
// 客户端流
rpc SayHelloClientStream (stream HelloRequest) returns (HelloReply) {}
// 服務器流
rpc SayHelloServerStream (HelloRequest) returns (stream HelloReply) {}
// 雙向流
rpc SayHelloBidirectionalStream (stream HelloRequest) returns (stream HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
基於 .proto 生成 Java 文件
在命令行中運行以下命令來生成 Java 類:
mvn clean compile
這將使用 protobuf-maven-plugin 生成 gRPC 服務和消息類。
生成類在 /target目錄下,即 /target/generated-sources/protobuf/grpc-java/org/example/grpc
3.3. 實現 gRPC 服務
在服務端實現這些不同類型的流,同時使用攔截器來處理請求頭:
import io.grpc.*;
import io.grpc.stub.StreamObserver;
import io.netty.util.internal.StringUtil;
import org.example.grpc.interceptor.ServerHeaderInterceptor;
import org.example.grpc.interceptor.ServerLoggingInterceptor;
import java.io.IOException;
import java.util.logging.Logger;
public class HelloServer {
private static final Logger logger = Logger.getLogger(HelloServer.class.getName());
private Server server;
public static void main(String[] args) throws IOException, InterruptedException {
final HelloServer server = new HelloServer();
server.start();
server.blockUntilShutdown();
}
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new GreeterImpl())
.intercept(new ServerHeaderInterceptor()) // 添加攔截器
.intercept(new ServerLoggingInterceptor())
.build()
.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.err.println("*** shutting down gRPC server since JVM is shutting down");
HelloServer.this.stop();
System.err.println("*** server shut down");
}));
}
private void stop() {
if (server != null) {
server.shutdown();
}
}
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
/**
* 單一響應
*/
@Override
public void sayHello(HelloProto.HelloRequest req, StreamObserver<HelloProto.HelloReply> responseObserver) {
System.out.println("【單一響應】收到消息 - " + req.getName());
HelloProto.HelloReply reply = HelloProto.HelloReply.newBuilder()
.setMessage("【單一響應】Hello " + req.getName())
.build();
System.out.println("【單一響應】響應消息 - " + req.getName());
responseObserver.onNext(reply);
System.out.println("【單一響應】服務端發起關閉");
responseObserver.onCompleted();
}
/**
* 客户端流
*/
@Override
public StreamObserver<HelloProto.HelloRequest> sayHelloClientStream(final StreamObserver<HelloProto.HelloReply> responseObserver) {
return new StreamObserver<HelloProto.HelloRequest>() {
StringBuilder names = new StringBuilder();
@Override
public void onNext(HelloProto.HelloRequest request) {
if (names.length() > 0) {
names.append(", ");
}
System.out.println("【客户端流】收到消息 - " + request.getName());
names.append(request.getName());
}
@Override
public void onError(Throwable t) {
logger.warning("Error in client stream: " + t);
}
@Override
public void onCompleted() {
System.out.println("【客户端流】收到客户端已關閉消息");
HelloProto.HelloReply reply = HelloProto.HelloReply.newBuilder()
.setMessage("【客户端流】Hello " + names.toString())
.build();
System.out.println("【客户端流】響應消息 - " + names.toString());
responseObserver.onNext(reply);
System.out.println("【客户端流】服務端發起關閉");
responseObserver.onCompleted();
}
};
}
/**
* 服務端流
*/
@Override
public void sayHelloServerStream(HelloProto.HelloRequest req, StreamObserver<HelloProto.HelloReply> responseObserver) {
for (int i = 0; i < 5; i++) {
String msg = req.getName() + StringUtil.SPACE + i;
HelloProto.HelloReply reply = HelloProto.HelloReply.newBuilder()
.setMessage("【服務端流】Hello " + msg)
.build();
System.out.println("【服務端流】響應消息 - " + msg);
responseObserver.onNext(reply);
}
System.out.println("【服務端流】服務端發起關閉");
responseObserver.onCompleted();
}
/**
* 雙向流
*/
@Override
public StreamObserver<HelloProto.HelloRequest> sayHelloBidirectionalStream(final StreamObserver<HelloProto.HelloReply> responseObserver) {
return new StreamObserver<HelloProto.HelloRequest>() {
@Override
public void onNext(HelloProto.HelloRequest request) {
HelloProto.HelloReply reply = HelloProto.HelloReply.newBuilder()
.setMessage("Hello " + request.getName())
.build();
System.out.println("【雙向流】收到消息 - " + request.getName());
System.out.println("【雙向流】回覆消息 - " + request.getName());
responseObserver.onNext(reply);
}
@Override
public void onError(Throwable t) {
logger.warning("Error in bidirectional stream: " + t);
}
@Override
public void onCompleted() {
System.out.println("【雙向流】收到客户端已關閉消息");
System.out.println("【雙向流】服務端發起關閉");
responseObserver.onCompleted();
}
};
}
}
}
3.4. 實現 gRPC 客户端
編寫客户端代碼來調用這些不同類型的流:
抱歉,以下是完整的客户端代碼示例,包括所有類型的流調用:
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.example.grpc.interceptor.ClientLoggingInterceptor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
public class HelloClient {
private static final Logger logger = Logger.getLogger(HelloClient.class.getName());
private final ManagedChannel channel;
private final GreeterGrpc.GreeterBlockingStub blockingStub;
private final GreeterGrpc.GreeterStub asyncStub;
public HelloClient(String host, int port) {
channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext()
.intercept(new ClientLoggingInterceptor())
.build();
blockingStub = GreeterGrpc.newBlockingStub(channel);
asyncStub = GreeterGrpc.newStub(channel);
}
public static void main(String[] args) throws Exception {
HelloClient client = new HelloClient("localhost", 50051);
try {
client.greet("world");
client.greetClientStream("Alice", "Bob", "Charlie");
client.greetServerStream("Dave");
client.greetBidirectionalStream("Eve", "Frank");
} finally {
client.shutdown();
}
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
/**
* 單一響應
*/
public void greet(String name) {
HelloProto.HelloRequest request = HelloProto.HelloRequest.newBuilder().setName(name).build();
HelloProto.HelloReply response;
try {
System.out.println("【單一響應】發送消息: " + name);
response = blockingStub.sayHello(request);
System.out.println("【單一響應】收到響應: " + response.getMessage());
} catch (Exception e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getMessage());
}
}
/**
* 客户端流
*/
public void greetClientStream(String... names) {
StreamObserver<HelloProto.HelloReply> responseObserver = new StreamObserver<HelloProto.HelloReply>() {
@Override
public void onNext(HelloProto.HelloReply value) {
System.out.println("【客户端流】收到響應: " + value.getMessage());
}
@Override
public void onError(Throwable t) {
logger.warning("【客户端流】Error in client stream: " + t);
}
@Override
public void onCompleted() {
System.out.println("【客户端流】收到服務端已關閉消息");
}
};
StreamObserver<HelloProto.HelloRequest> requestObserver = asyncStub.sayHelloClientStream(responseObserver);
for (String name : names) {
HelloProto.HelloRequest request = HelloProto.HelloRequest.newBuilder().setName(name).build();
System.out.println("【客户端流】發送消息 - " + name);
requestObserver.onNext(request);
}
System.out.println("【客户端流】客户端發起關閉");
requestObserver.onCompleted();
}
/**
* 服務端流
*/
public void greetServerStream(String name) {
HelloProto.HelloRequest request = HelloProto.HelloRequest.newBuilder().setName(name).build();
StreamObserver<HelloProto.HelloReply> responseObserver = new StreamObserver<HelloProto.HelloReply>() {
@Override
public void onNext(HelloProto.HelloReply value) {
System.out.println("【服務端流】收到響應: " + value.getMessage());
}
@Override
public void onError(Throwable t) {
logger.warning("【服務端流】Error in server stream: " + t);
}
@Override
public void onCompleted() {
System.out.println("【服務端流】收到服務端已關閉消息");
}
};
asyncStub.sayHelloServerStream(request, responseObserver);
}
/**
* 雙向流
*/
public void greetBidirectionalStream(String... names) {
StreamObserver<HelloProto.HelloReply> responseObserver = new StreamObserver<HelloProto.HelloReply>() {
@Override
public void onNext(HelloProto.HelloReply value) {
System.out.println("【雙向流】收到響應: " + value.getMessage());
}
@Override
public void onError(Throwable t) {
logger.warning("【雙向流】Error in bidirectional stream: " + t);
}
@Override
public void onCompleted() {
System.out.println("【雙向流】收到服務端已關閉消息");
}
};
StreamObserver<HelloProto.HelloRequest> requestObserver = asyncStub.sayHelloBidirectionalStream(responseObserver);
for (String name : names) {
HelloProto.HelloRequest request = HelloProto.HelloRequest.newBuilder().setName(name).build();
requestObserver.onNext(request);
}
System.out.println("【雙向流】客户端發起關閉");
requestObserver.onCompleted();
}
}
3.5. 實現攔截器
gRPC 支持攔截器來處理請求頭等。以下是一個簡單的攔截器示例:
服務端攔截器 1
import io.grpc.*;
public class ServerHeaderInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
System.out.println("【ServerHeaderInterceptor】Received headers: " + headers);
// 可以在這裏檢查或修改請求頭
return next.startCall(call, headers);
}
}
服務端攔截器 2
import io.grpc.*;
public class ServerLoggingInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
System.out.println("【ServerLoggingInterceptor】Server received call to method: " + call.getMethodDescriptor().getFullMethodName());
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(call, headers)) {
@Override
public void onMessage(ReqT message) {
System.out.println("【ServerLoggingInterceptor】Server received message: " + message);
super.onMessage(message);
}
@Override
public void onHalfClose() {
System.out.println("【ServerLoggingInterceptor】Server stream half-closed");
super.onHalfClose();
}
@Override
public void onComplete() {
System.out.println("【ServerLoggingInterceptor】Server stream completed");
super.onComplete();
}
@Override
public void onCancel() {
System.out.println("【ServerLoggingInterceptor】Server stream cancelled");
super.onCancel();
}
@Override
public void onReady() {
System.out.println("【ServerLoggingInterceptor】Server stream ready");
super.onReady();
}
};
}
}
客户端攔截器 1
import io.grpc.*;
public class ClientLoggingInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
System.out.println("【ClientLoggingInterceptor】Client sending request to method: " + method.getFullMethodName());
super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onMessage(RespT message) {
System.out.println("【ClientLoggingInterceptor】Client received message: " + message);
super.onMessage(message);
}
@Override
public void onClose(Status status, Metadata trailers) {
System.out.println("【ClientLoggingInterceptor】Client call closed with status: " + status);
super.onClose(status, trailers);
}
}, headers);
}
@Override
public void sendMessage(ReqT message) {
System.out.println("【ClientLoggingInterceptor】Client sending message: " + message);
super.sendMessage(message);
}
};
}
}
3.6. 運行服務和客户端
啓動服務器
- 編譯並運行
HelloWorldServer的main方法。 - 服務器將在
localhost的端口50051上啓動並開始監聽。
啓動客户端
- 編譯並運行
HelloWorldClient的main方法。 -
客户端將連接到服務器並執行以下操作:
- 使用單一響應調用
SayHello。 - 使用客户端流調用
SayHelloClientStream。 - 使用服務器流調用
SayHelloServerStream。 - 使用雙向流調用
SayHelloBidirectionalStream。
- 使用單一響應調用
使用測試化工具作為客户端
現在很多測試化工具以及支持調試 gRPC 服務了,例如 Apifox,只需要將 .proto 接口描述文件上傳到客户端,就可以自動生成客户端服務。
4. 代碼解釋
4.1. 同步與異步
GreeterBlockingStub:用於同步阻塞調用,適合簡單的請求-響應模式,不需要高併發。GreeterStubdd:用於異步非阻塞調用,適合高併發和複雜流式通信。
選擇使用哪種存根類,取決於你的應用需求和併發要求。在需要處理大量併發請求或複雜流式通信的場景下,GreeterStub 更加合適。而對於簡單的請求-響應通信且不關心阻塞的場景,GreeterBlockingStub 則提供了更簡單的編程模型。
4.1.1.GreeterGrpc.GreeterBlockingStub
-
同步調用:
GreeterBlockingStub提供同步的阻塞調用。這意味着當你調用一個 RPC 方法時,調用線程會阻塞,直到服務器返回響應或發生錯誤。
-
使用場景:
- 適用於簡單的
請求-響應模式,客户端可以承受阻塞調用。 - 適用於需要按順序處理請求,並且不需要高併發的場景。
- 適用於簡單的
-
優點:
- 編程簡單,易於理解和使用。
- 適合不需要高併發的簡單應用。
-
缺點:
- 調用是阻塞的,可能會導致資源浪費,尤其是在需要處理大量併發請求時。
-
示例使用:
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051) .usePlaintext() .build(); GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(channel); HelloRequest request = HelloRequest.newBuilder().setName("World").build(); HelloResponse response = blockingStub.sayHello(request); System.out.println("Greeting: " + response.getMessage());
4.1.2.GreeterGrpc.GreeterStub
-
異步調用:
GreeterStub提供異步的非阻塞調用。調用一個 RPC 方法時,方法會立即返回,結果會在回調中處理。
-
使用場景:
- 適用於需要高併發和低延遲的場景。
- 適合複雜的流式通信模式(如
客户端流、服務器流和雙向流)。
-
優點:
- 非阻塞調用,提高了系統的併發能力和資源利用率。
- 更適合實時性要求高的應用。
-
缺點:
- 編程相對複雜,需要處理回調或使用異步框架來管理響應。
-
示例使用:
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051) .usePlaintext() .build(); GreeterGrpc.GreeterStub asyncStub = GreeterGrpc.newStub(channel); HelloRequest request = HelloRequest.newBuilder().setName("World").build(); asyncStub.sayHello(request, new StreamObserver<HelloResponse>() { @Override public void onNext(HelloResponse response) { System.out.println("Greeting: " + response.getMessage()); } @Override public void onError(Throwable t) { System.err.println("Error: " + t); } @Override public void onCompleted() { System.out.println("Completed"); } });
4.2. onCompleted
在 gRPC 中,當服務器端調用 StreamObserver#onCompleted() 時,客户端會收到通知。這是 gRPC 框架處理流式通信的一部分,用於指示流的正常結束。
-
服務器端調用
onCompleted():- 當服務器完成了對所有消息的處理併發送完所有需要的響應消息後,服務器端會調用
responseObserver.onCompleted()。 - 這表示服務器端的響應流已經結束,沒有更多的消息會發送到客户端。
- 當服務器完成了對所有消息的處理併發送完所有需要的響應消息後,服務器端會調用
-
客户端接收
onCompleted()通知:- 客户端的
StreamObserver實現會收到onCompleted()調用的通知。 - 客户端可以在
onCompleted()方法中執行一些收尾工作,比如釋放資源或更新狀態。
- 客户端的
4.2.1. 調用場景
1. 單一響應(Unary RPC)
- 客户端和服務器端:在單一響應模式中,
onCompleted()不需要顯式調用。請求和響應都是單個消息,gRPC 框架會自動處理流的結束。
2. 客户端流(Client Streaming RPC)
- 客户端:需要調用
onCompleted()。客户端在發送完所有請求消息後,應調用onCompleted()來通知服務器請求流已結束。服務器在接收到onCompleted()後開始處理請求。 - 服務器端:不需要顯式調用
onCompleted()。服務器端在處理完所有請求後返回單個響應,流會自動結束。
3. 服務器流(Server Streaming RPC)
- 客户端:不需要調用
onCompleted()。客户端發送單個請求後,等待服務器的響應流。 - 服務器端:需要調用
onCompleted()。服務器在發送完所有響應消息後,調用onCompleted()來通知客户端響應流已結束。
4. 雙向流(Bidirectional Streaming RPC)
- 客户端:需要調用
onCompleted()。客户端在發送完所有請求消息後,應調用onCompleted()來通知服務器請求流已結束。 - 服務器端:需要調用
onCompleted()。服務器在發送完所有響應消息後,調用onCompleted()來通知客户端響應流已結束。
5. 編程規範
當然,在 gRPC 的單一響應模式中,雖然框架會自動處理流的結束,但明確調用 responseObserver.onCompleted() 是一個良好的編程實踐。這麼做有幾個原因:
- 明確性和可讀性:即使框架會自動處理調用,明確調用
onCompleted()可以讓代碼更具可讀性,清晰地表明流在此處結束。這有助於其他開發人員理解代碼邏輯。 - 一致性:在所有流模式中顯式調用
onCompleted()可以保持代碼風格的一致性。這有助於開發人員在處理不同類型的流時使用相同的模式和習慣。 - 未來的兼容性:雖然當前版本的 gRPC 可能在某些情況下自動處理
onCompleted(),但顯式調用確保代碼在未來版本中的行為是一致的。 - 自定義行為:在某些情況下,你可能需要在調用
onCompleted()之前執行一些特定的邏輯或清理操作。顯式調用onCompleted()讓你有機會在結束流之前插入這些操作。
雖然在單一響應模式下顯式調用 onCompleted() 並不是絕對必要的,但它仍然是一個推薦的做法,尤其是在團隊開發中,有助於代碼的維護和理解。