本篇為【寫給go開發者的gRPC教程系列】第二篇
第一篇:protobuf基礎
第二篇:通信模式 👈
第三篇:攔截器
第四篇:錯誤處理
上一篇介紹瞭如何編寫 protobuf 的 idl,並使用 idl 生成了 gRPC 的代碼,現在來看看如何編寫客户端和服務端的代碼
Simple RPC (Unary RPC)
syntax = "proto3";
package ecommerce;
import "google/protobuf/wrappers.proto";
option go_package = "ecommerce/";
message Order {
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}
service OrderManagement {
rpc getOrder(google.protobuf.StringValue) returns (Order);
}
定義如上的 idl,需要關注幾個事項
- 使用
protobuf最新版本syntax = "proto3"; protoc-gen-go要求 pb 文件必須指定 go 包的路徑。即option go_package = "ecommerce/";- 定義的
method僅能有一個入參和出參數。如果需要傳遞多個參數需要定義成message - 使用
import引用另外一個文件的 pb。google/protobuf/wrappers.proto是 google 內置的類型
生成 go 和 grpc 的代碼
$ protoc -I ./pb \
--go_out ./ecommerce --go_opt paths=source_relative \
--go-grpc_out ./ecommerce --go-grpc_opt paths=source_relative \
./pb/product.proto
ecommerce
├── product.pb.go
└── product_grpc.pb.go
pb
└── product.proto
server 實現
1、由 pb 文件生成的 gRPC 代碼中包含了 service 的接口定義,它和我們定義的 idl 是吻合的
service OrderManagement {
rpc getOrder(google.protobuf.StringValue) returns (Order);
}
type OrderManagementServer interface {
GetOrder(context.Context, *wrapperspb.StringValue) (*Order, error)
mustEmbedUnimplementedOrderManagementServer()
}
2、我們的業務邏輯就是實現這個接口
package main
import (
"context"
"log"
pb "github.com/liangwt/note/grpc/unary_rpc_example/ecommerce"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/wrapperspb"
)
var _ pb.OrderManagementServer = &OrderManagementImpl{}
var orders = make(map[string]pb.Order)
type OrderManagementImpl struct {
pb.UnimplementedOrderManagementServer
}
// Simple RPC
func (s *OrderManagementImpl) GetOrder(ctx context.Context, orderId *wrapperspb.StringValue) (*pb.Order, error) {
ord, exists := orders[orderId.Value]
if exists {
return &ord, status.New(codes.OK, "").Err()
}
return nil, status.Errorf(codes.NotFound, "Order does not exist. : ", orderId)
}
3、在實現完業務邏輯之後,我們可以創建並啓動服務
package main
import (
"net"
pb "github.com/liangwt/note/grpc/unary_rpc_example/ecommerce"
"google.golang.org/grpc"
)
func main() {
s := grpc.NewServer()
pb.RegisterOrderManagementServer(s, &OrderManagementImpl{})
lis, err := net.Listen("tcp", ":8009")
if err != nil {
panic(err)
}
if err := s.Serve(lis); err != nil {
panic(err)
}
}
服務端代碼實現的流程如下
client 實現
1、由 pb 文件生成的 gRPC 代碼中包含了 client 的實現,它和我們定義的 idl 也是吻合的
service OrderManagement {
rpc getOrder(google.protobuf.StringValue) returns (Order);
}
type orderManagementClient struct {
cc grpc.ClientConnInterface
}
func NewOrderManagementClient(cc grpc.ClientConnInterface) OrderManagementClient {
return &orderManagementClient{cc}
}
func (c *orderManagementClient) GetOrder(ctx context.Context, in *wrapperspb.StringValue, opts ...grpc.CallOption) (*Order, error) {
out := new(Order)
err := c.cc.Invoke(ctx, "/ecommerce.OrderManagement/getOrder", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
2、直接使用 client 來進行 rpc 調用
package main
import (
"context"
"log"
"time"
pb "github.com/liangwt/note/grpc/unary_rpc_example/ecommerce"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/wrapperspb"
)
func main() {
conn, err := grpc.Dial("127.0.0.1:8009", grpc.WithInsecure())
if err != nil {
panic(err)
}
defer conn.Close()
client := pb.NewOrderManagementClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// Get Order
retrievedOrder, err := client.GetOrder(ctx, &wrapperspb.StringValue{Value: "101"})
if err != nil {
panic(err)
}
log.Print("GetOrder Response -> : ", retrievedOrder)
}
客户端代碼實現的流程如下
小總結
✨ 前文提到過protobuf協議是平台無關的。演示的客户端和服務端都是 golang 的,即使客户端和服務端不同語言也是類似的可以通信的
✨ 對於上面介紹的的這種類似於http1.x的模式:客户端發送請求,服務端響應請求,一問一答的模式在 gRPC 裏叫做Simple RPC (也稱Unary RPC)。gRPC 同時也支持其他類型的交互方式。
Server-Streaming RPC 服務器端流式 RPC
服務器端流式 RPC,顯然是單向流,並代指 Server 為 Stream 而 Client 為普通 RPC 請求
簡單來講就是客户端發起一次普通的 RPC 請求,服務端通過流式響應多次發送數據集,客户端 Recv 接收數據集。大致如圖:
pb 定義
syntax = "proto3";
package ecommerce;
option go_package = "ecommerce/";
import "google/protobuf/wrappers.proto";
message Order {
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}
service OrderManagement {
rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
}
server 實現
✨ 注意與Simple RPC的區別:因為我們的服務端是流式響應的,因此對於服務端來説函數入參多了一個stream OrderManagement_SearchOrdersServer參數用來寫入多個響應,可以把它看作是客户端的對象
✨ 可以通過調用這個流對象的Send(...),來往客户端寫入數據
✨ 通過返回nil或者error來表示全部數據寫完了
func (s *server) SearchOrders(query *wrapperspb.StringValue,
stream pb.OrderManagement_SearchOrdersServer) error {
for _, order := range orders {
for _, str := range order.Items {
if strings.Contains(str, query.Value) {
err := stream.Send(&order)
if err != nil {
return fmt.Errorf("error send: %v", err)
}
}
}
}
return nil
}
client 實現
✨ 注意與Simple RPC的區別:因為我們的服務端是流式響應的,因此 RPC 函數返回值stream是一個流,可以把它看作是服務端的對象
✨ 使用stream的Recv函數來不斷從服務端接收數據
✨ 當Recv返回io.EOF代表流已經結束
c := pb.NewOrderManagementClient(conn)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
stream, err := c.SearchOrders(ctx, &wrapperspb.StringValue{Value: "Google"})
if err != nil{
panic(err)
}
for{
order, err := stream.Recv()
if err == io.EOF{
break
}
log.Println("Search Result: ", order)
}
小總結
Client-Streaming RPC 客户端流式 RPC
客户端流式 RPC,顯然也是單向流,客户端通過流式發起多次 RPC 請求給服務端,服務端發起一次響應給客户端,大致如圖:
服務端沒有必要等到客户端發送完所有請求再響應,可以在收到部分請求之後就響應
pb 定義
syntax = "proto3";
package ecommerce;
option go_package = "ecommerce/";
import "google/protobuf/wrappers.proto";
message Order {
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}
service OrderManagement {
rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
}
server 實現
✨ 注意與Simple RPC的區別:因為我們的客户端是流式請求的,因此請求參數stream OrderManagement_UpdateOrdersServer就是流對象
✨ 可以從stream OrderManagement_UpdateOrdersServer的Recv函數讀取消息
✨ 當Recv返回io.EOF代表流已經結束
✨ 使用stream OrderManagement_UpdateOrdersServer的SendAndClose函數關閉併發送響應
// 在這段程序中,我們對每一個 Recv 都進行了處理
// 當發現 io.EOF (流關閉) 後,需要將最終的響應結果發送給客户端,同時關閉正在另外一側等待的 Recv
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
ordersStr := "Updated Order IDs : "
for {
order, err := stream.Recv()
if err == io.EOF {
// Finished reading the order stream.
return stream.SendAndClose(
&wrapperspb.StringValue{Value: "Orders processed " + ordersStr})
}
// Update order
orders[order.Id] = *order
log.Println("Order ID ", order.Id, ": Updated")
ordersStr += order.Id + ", "
}
}
Client 實現
✨ 注意與Simple RPC的區別:因為我們的客户端是流式響應的,因此 RPC 函數返回值stream是一個流
✨ 可以通過調用這個流對象的Send(...),來往這個對象寫入數據
✨ 使用stream的CloseAndRecv函數關閉併發送響應
c := pb.NewOrderManagementClient(conn)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
stream, err := c.UpdateOrders(ctx)
if err != nil {
panic(err)
}
if err := stream.Send(&pb.Order{
Id: "00",
Items: []string{"A", "B"},
Description: "A with B",
Price: 0.11,
Destination: "ABC",
}); err != nil {
panic(err)
}
if err := stream.Send(&pb.Order{
Id: "01",
Items: []string{"C", "D"},
Description: "C with D",
Price: 1.11,
Destination: "ABCDEFG",
}); err != nil {
panic(err)
}
res, err := stream.CloseAndRecv()
if err != nil {
panic(err)
}
log.Printf("Update Orders Res : %s", res)
小總結
Bidirectional-Streaming RPC 雙向流式 RPC
雙向流式 RPC,顧名思義是雙向流。由客户端以流式的方式發起請求,服務端同樣以流式的方式響應請求
首個請求一定是 Client 發起,但具體交互方式(誰先誰後、一次發多少、響應多少、什麼時候關閉)根據程序編寫的方式來確定(可以結合協程)
假設該雙向流是按順序發送的話,大致如圖:
pb 定義
syntax = "proto3";
package ecommerce;
option go_package = "ecommerce/";
import "google/protobuf/wrappers.proto";
message Order {
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}
message CombinedShipment {
string id = 1;
string status = 2;
repeated Order orderList = 3;
}
service OrderManagement {
rpc processOrders(stream google.protobuf.StringValue)
returns (stream CombinedShipment);
}
server 實現
✨ 函數入參OrderManagement_ProcessOrdersServer是用來寫入多個響應和讀取多個消息的對象引用
✨ 可以通過調用這個流對象的Send(...),來往這個對象寫入響應
✨ 可以通過調用這個流對象的Recv(...)函數讀取消息,當Recv返回io.EOF代表流已經結束
✨ 通過返回nil或者error表示全部數據寫完了
func (s *server) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error {
batchMarker := 1
var combinedShipmentMap = make(map[string]pb.CombinedShipment)
for {
orderId, err := stream.Recv()
log.Printf("Reading Proc order : %s", orderId)
if err == io.EOF {
log.Printf("EOF : %s", orderId)
for _, shipment := range combinedShipmentMap {
if err := stream.Send(&shipment); err != nil {
return err
}
}
return nil
}
if err != nil {
log.Println(err)
return err
}
destination := orders[orderId.GetValue()].Destination
shipment, found := combinedShipmentMap[destination]
if found {
ord := orders[orderId.GetValue()]
shipment.OrderList = append(shipment.OrderList, &ord)
combinedShipmentMap[destination] = shipment
} else {
comShip := pb.CombinedShipment{Id: "cmb - " + (orders[orderId.GetValue()].Destination), Status: "Processed!"}
ord := orders[orderId.GetValue()]
comShip.OrderList = append(shipment.OrderList, &ord)
combinedShipmentMap[destination] = comShip
log.Print(len(comShip.OrderList), comShip.GetId())
}
if batchMarker == orderBatchSize {
for _, comb := range combinedShipmentMap {
log.Printf("Shipping : %v -> %v", comb.Id, len(comb.OrderList))
if err := stream.Send(&comb); err != nil {
return err
}
}
batchMarker = 0
combinedShipmentMap = make(map[string]pb.CombinedShipment)
} else {
batchMarker++
}
}
}
Client 實現
✨ 函數返回值OrderManagement_ProcessOrdersClient是用來獲取多個響應和寫入多個消息的對象引用
✨ 可以通過調用這個流對象的Send(...),來往這個對象寫入響應
✨ 可以通過調用這個流對象的Recv(...)函數讀取消息,當Recv返回io.EOF代表流已經結束
c := pb.NewOrderManagementClient(conn)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
stream, err := c.ProcessOrders(ctx)
if err != nil {
panic(err)
}
go func() {
if err := stream.Send(&wrapperspb.StringValue{Value: "101"}); err != nil {
panic(err)
}
if err := stream.Send(&wrapperspb.StringValue{Value: "102"}); err != nil {
panic(err)
}
if err := stream.CloseSend(); err != nil {
panic(err)
}
}()
for {
combinedShipment, err := stream.Recv()
if err == io.EOF {
break
}
log.Println("Combined shipment : ", combinedShipment.OrderList)
}
小總結
雙向流相對還是比較複雜的,大部分場景都是使用事件機制進行異步交互,需要精心的設計
示例代碼
https://github.com/liangwt/gr...