博客 / 詳情

返回

gRPC實戰--如何在NodeJS中有效使用gRPC流

gRPC實戰包含一系列文章,包括原創和翻譯。最終會形成一個完整的系列,後續會不斷完善,增加新的內容:

  • gRPC簡介:why,what,how?
  • gRPC服務健康檢查最佳實踐
  • Kubernetes中使用envoy負載均衡gRPC流量
  • 用Golang編寫通過gRPC進行通信的服務
  • 如何在NodeJS中有效使用gRPC流

=============================================================

g10.png

本文將説明如何在NodeJS應用程序的GRPC中使用流。

什麼是gRPC中的流

gRPC中的流可幫助我們在單個RPC調用中發送消息流。

g11.png

gRPC 的流式,分為三種類型:

  • server-side streaming RPC:服務器端流式 RPC
  • Client-side streaming RPC:客户端流式 RPC
  • Bidirectional streaming RPC:雙向流式 RPC

gRPC中的流使用場景

  • 大規模數據包
  • 實時場景

在本文中,我們將重點關注以下流:

  • Server Streaming GRPC:在這種情況下,客户端向服務器發出單個請求,服務器將消息流發送回客户端。
  • Client Streaming GRPC:在這種情況下,客户端將消息流發送到服務器。然後,服務器處理流並將單個響應發送回客户端。

Server Streaming gRPC

現在讓我們為服務器流gRPC創建服務器和客户端代碼。

創建 .proto 文件

創建一個名為proto的文件夾。在該文件夾中創建一個名為employee.proto的文件。將以下內容複製到employee.proto中:

syntax = "proto3";

package employee;

service Employee {

  rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {}
}


message EmployeeRequest {
  repeated int32 employeeIdList = 1;
}

message EmployeeResponse{
  string message = 1;
}

請參閲我的grpc基礎文章,以瞭解有關.proto文件和協議緩衝區的更多信息。

在這裏,我們創建一個名為paySalary的rpc,它接受EmployeeRequest作為請求併發送EmployeeResponse流作為響應。我們使用關鍵字流來指示服務器將發送消息流。

上面也定義了EmployeeRequestEmployeeResponse。 repeate關鍵字表示將發送數據列表。

在此示例中,paySalary的請求將是員工ID的列表。服務器將通過一條消息流做出響應,告知是否已向員工支付薪水。

為服務器創建虛擬數據

創建一個名為data.js的文件,並將以下代碼複製到其中。

//Hardcode some data for employees
let employees = [{
    id: 1,
    email: "abcd@abcd.com",
    firstName: "First1",
    lastName: "Last1"   
},
{
    id: 2,
    email: "xyz@xyz.com",
    firstName: "First2",
    lastName: "Last2"   
},
{
    id: 3,
    email: "temp@temp.com",
    firstName: "First3",
    lastName: "Last3"   
},
];

exports.employees = employees;

我們將其用作服務器的數據源。

創建Server

創建一個名為server.js的文件。將以下代碼複製到server.js中

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');


let packageDefinition = protoLoader.loadSync(
  PROTO_PATH,
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
  });
let employee_proto = grpc.loadPackageDefinition(packageDefinition)

接下來,將以下代碼片段添加到server.js中

let { paySalary } = require('./pay_salary.js');

function main() {
  let server = new grpc.Server();
  server.addService(employee_proto.Employee.service, 
    { paySalary: paySalary }
  );
  server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure());
  server.start();
}

main();

在上面的腳本中,我們將啓動GRPC Server並將Employee Service和paySalary實現一起添加到其中。

但是paySalary函數在pay_salary.js文件中定義。

讓我們創建一個pay_salary.js文件。

將以下腳本添加到pay_salary.js文件中

let { employees } = require('./data.js');
const _ = require('lodash');

function paySalary(call) {
    let employeeIdList = call.request.employeeIdList;
  
    _.each(employeeIdList, function (employeeId) {
      let employee = _.find(employees, { id: employeeId });
      if (employee != null) {
        let responseMessage = "Salary paid for ".concat(
          employee.firstName,
          ", ",
          employee.lastName);
        call.write({ message: responseMessage });
      }
      else{
        call.write({message: "Employee with Id " + employeeId + " not found in record"});
      }
  
    });
    call.end();
  
}
exports.paySalary = paySalary;

paySalary函數將調用作為輸入。 call.request將包含客户端發送的請求。

call.request.employeeIdList將包含客户端發送的員工ID的列表。

然後,我們遍歷EmployeeId,併為每個員工ID進行一些處理。

對於每個員工ID,我們最後都調用call.write函數。 call.write將在流中將單個消息寫回到客户端。

在這種情況下,對於每位員工,call.write都會發回工資是否已經支付。

處理完所有員工編號後,我們將調用call.end函數。 call.end指示流已完成。

這是最終的server.js文件

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');


let packageDefinition = protoLoader.loadSync(
  PROTO_PATH,
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
  });
let employee_proto = grpc.loadPackageDefinition(packageDefinition)

let { paySalary } = require('./pay_salary.js');

function main() {
  let server = new grpc.Server();
  server.addService(employee_proto.Employee.service, 
    { paySalary: paySalary }
  );
  server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure());
  server.start();
}

main();

創建Client

創建一個名為client_grpc_server_stream.js的文件。將以下代碼複製到文件中。

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');

let packageDefinition = protoLoader.loadSync(
    PROTO_PATH,
    {keepCase: true,
     longs: String,
     enums: String,
     defaults: true,
     oneofs: true
    });
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;

接下來,將以下腳本片段添加到客户端。

function main() {
  let client = new employee_proto.Employee('localhost:4500',
                                       grpc.credentials.createInsecure());
                                       
  let employeeIdList = [1,10,2];
  let call = client.paySalary({employeeIdList: employeeIdList});

  call.on('data',function(response){
    console.log(response.message);
  });

  call.on('end',function(){
    console.log('All Salaries have been paid');
  });

}

main();

client變量將具有存根,這將有助於我們在服務器中調用該函數。

employeeIdList是提供給服務器的輸入。

let call = client.paySalary({employeeIdList: employeeIdList}); 腳本調用服務器中的paySalary函數,並將employeeIdList作為請求傳遞。由於服務器將要發送消息流,因此調用對象將幫助我們偵聽流事件。

我們會偵聽呼叫對象中的“數據”事件,以查看流中來自服務器的任何消息。如下面的腳本所示。

call.on('data',function(response){
    console.log(response.message);
  });

在這裏,只要我們從服務器收到任何消息,我們就只打印響應消息。

我們在調用對象中偵聽“結束”事件,以瞭解服務器流何時結束。如下面的腳本所示。

call.on('end',function(){
    console.log('All Salaries have been paid');
  });

在此流結束時,我們正在打印“已支付所有薪水”。

這是client_gprc_server_stream.js的完整代碼。

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');

let packageDefinition = protoLoader.loadSync(
    PROTO_PATH,
    {keepCase: true,
     longs: String,
     enums: String,
     defaults: true,
     oneofs: true
    });
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;

function main() {
  let client = new employee_proto.Employee('localhost:4500',
                                       grpc.credentials.createInsecure());
                                       
  let employeeIdList = [1,10,2];
  let call = client.paySalary({employeeIdList: employeeIdList});

  call.on('data',function(response){
    console.log(response.message);
  });

  call.on('end',function(){
    console.log('All Salaries have been paid');
  });

}

main();

運行代碼

打開命令提示符,然後使用以下腳本啓動服務器。

node server.js

打開一個新的命令提示符,並使用以下腳本運行客户端。

node client_grpc_server_stream.js   

在運行客户端時,我們將獲得以下輸出。

Salary paid for First1, Last1
Employee with Id 10 not found in record
Salary paid for First2, Last2
All Salaries have been paid

在這種情況下,客户端已向服務器發送了3個Id的1,10,2。服務器一一處理ID,然後將消息流發送給客户端。流中的所有消息完成後,將顯示消息“已支付所有薪水”。

Client Streaming GRPC

現在,讓我們為客户端流GRPC創建服務器和客户端代碼。

創建.proto文件

在先前創建的employee.proto文件中,添加以下內容

service Employee {

  rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {}

  rpc generateReport (stream ReportEmployeeRequest) returns (ReportEmployeeResponse) {}
}

message ReportEmployeeRequest {
  int32 id = 1;
}

message ReportEmployeeResponse{
  string successfulReports = 1;
  string failedReports = 2;
}

在這裏,我們添加了一個名為generateReport的新rpc,它接受ReportEmployeeRequest流作為請求並返回ReportEmployeeResponse作為響應。

因此,向rpc輸入的內容是員工ID的流,服務器的響應將是單個響應,其中指出生成了多少報告以及有多少報告失敗。

這是我們更改後的完整的employee.proto文件:

syntax = "proto3";

package employee;

service Employee {

  rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {}

  rpc generateReport (stream ReportEmployeeRequest) returns (ReportEmployeeResponse) {}
}


message EmployeeRequest {
  repeated int32 employeeIdList = 1;
}

message EmployeeResponse{
  string message = 1;
}

message ReportEmployeeRequest {
  int32 id = 1;
}

message ReportEmployeeResponse{
  string successfulReports = 1;
  string failedReports = 2;
}

創建Server

這是添加了新rpc的完整server.js代碼:

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');


let packageDefinition = protoLoader.loadSync(
  PROTO_PATH,
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
  });
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;


let { paySalary } = require('./pay_salary.js');
let { generateReport } = require('./generate_report.js');

function main() {
  let server = new grpc.Server();
  server.addService(employee_proto.Employee.service, 
    { paySalary: paySalary ,
      generateReport: generateReport }
  );
  server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure());
  server.start();
}

main();

在上面的腳本中,我們可以看到我們還向grpc服務器添加了generateReport函數。我們還可以看到generateReport函數來自generate_report.js文件。

創建一個名為generate_report.js的文件。

將以下腳本添加到文件中:

let { employees } = require('./data.js');
const _ = require('lodash');

function generateReport(call, callback){

    let successfulReports = [];
    let failedReports = [];
    call.on('data',function(employeeStream){
        let employeeId = employeeStream.id;
        let employee = _.find(employees, { id: employeeId });
        if (employee != null) {
          successfulReports.push(employee.firstName);
        }
      else{
          failedReports.push(employeeId);
      }

    });
    call.on('end',function(){
        callback(null,{
            successfulReports: successfulReports.join(),
            failedReports: failedReports.join()
        })
    })
}

exports.generateReport = generateReport;

generateReport函數接受兩個輸入,即調用和回調

為了從客户端獲取消息流,我們需要在調用對象中監聽數據事件。這是在以下腳本中完成的。

call.on('data',function(employeeStream){
        let employeeId = employeeStream.id;
        let employee = _.find(employees, { id: employeeId });
        if (employee != null) {
          successfulReports.push(employee.firstName);
        }
      else{
          failedReports.push(employeeId);
      }

    });

來自客户端的每條消息都會調用data事件。該消息存在於employeeStream變量中。收到消息後,我們嘗試生成報告,並確定報告是成功還是失敗。

調用對象上的結束事件表示客户端流已結束。以下代碼顯示瞭如何監聽結束事件。

call.on('end',function(){
        callback(null,{
            successfulReports: successfulReports.join(),
            failedReports: failedReports.join()
        })
    })

在這種情況下,當結束事件發生時,我們將所有成功和失敗報告組合到一個響應對象中,並使用回調對象將其發送回客户端。

創建Client

創建一個名為client_grpc_client_stream.js的文件。將以下腳本添加到其中。

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');
const _ = require('lodash');

let packageDefinition = protoLoader.loadSync(
  PROTO_PATH,
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
  });
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;

上面的腳本具有與服務器代碼相同的功能。

將以下腳本也添加到client_grpc_client_stream.js

function main() {
  let client = new employee_proto.Employee('localhost:4500',
    grpc.credentials.createInsecure());

  let call = client.generateReport(function (error, response) {
    console.log("Reports successfully generated for: ", response.successfulReports);
    console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports);
  });

  let employeeIdList = [1, 10, 2];
  _.each(employeeIdList, function (employeeId) {
        call.write({ id: employeeId });
  })

  call.end();
}

main();

讓我們看看上面的腳本在做什麼。

let call = client.generateReport(function (error, response) {
    console.log("Reports successfully generated for: ", response.successfulReports);
    console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports);
  });

在腳本的這一部分中,我們正在創建一個調用對象並調用generateReport函數。同樣在generateReport函數內部,我們指示客户端一旦收到服務器的響應,應該怎麼做。在這種情況下,我們將打印服務器發送回的成功和失敗報告。

let employeeIdList = [1, 10, 2];
  _.each(employeeIdList, function (employeeId) {
        call.write({ id: employeeId });
  })

在腳本的以上部分中,我們遍歷了員工ID,並將消息流發送到服務器。我們使用call.write將消息以流的形式發送到服務器。

最後,一旦我們在流中發送了所有消息,就可以使用call.end函數指示流已完成,如下所示:

call.end();

下面給出了client_grpc_client_stream的完整代碼。

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');
const _ = require('lodash');

let packageDefinition = protoLoader.loadSync(
  PROTO_PATH,
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
  });
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;

function main() {
  let client = new employee_proto.Employee('localhost:4500',
    grpc.credentials.createInsecure());

  let call = client.generateReport(function (error, response) {
    console.log("Reports successfully generated for: ", response.successfulReports);
    console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports);
  });

  let employeeIdList = [1, 10, 2];
  _.each(employeeIdList, function (employeeId) {
        call.write({ id: employeeId });
  })

  call.end();
}

main();

運行代碼

打開命令提示符,然後使用以下腳本啓動服務器。

node server.js

打開一個新的命令提示符,並使用以下腳本運行客户端。

node client_grpc_server_stream.js   

在運行客户端時,我們將獲得以下輸出。

Reports successfully generated for:  First1,First2
Reports failed since Following Employee Id\'s do not exist:  10

在這種情況下,客户端已向服務器發送了3個Id的1,10,2作為消息流。然後,服務器處理流中的消息,並將單個響應發送回客户端,以顯示成功的報告數量和失敗的報告數量。

user avatar snakesss 頭像 jiujuan 頭像
2 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.