gRPC實戰包含一系列文章,包括原創和翻譯。最終會形成一個完整的系列,後續會不斷完善,增加新的內容:
- gRPC簡介:why,what,how?
- gRPC服務健康檢查最佳實踐
- Kubernetes中使用envoy負載均衡gRPC流量
- 用Golang編寫通過gRPC進行通信的服務
- 如何在NodeJS中有效使用gRPC流
=============================================================
本文將説明如何在NodeJS應用程序的GRPC中使用流。
什麼是gRPC中的流
gRPC中的流可幫助我們在單個RPC調用中發送消息流。
gRPC 的流式,分為三種類型:
- server-side streaming RPC:服務器端流式 RPC
- Client-side streaming RPC:客户端流式 RPC
- Bidirectional streaming RPC:雙向流式 RPC
gRPC中的流使用場景
- 大規模數據包
- 實時場景
在本文中,我們將重點關注以下流:
- Server Streaming GRPC:在這種情況下,客户端向服務器發出單個請求,服務器將消息流發送回客户端。
- Client Streaming GRPC:在這種情況下,客户端將消息流發送到服務器。然後,服務器處理流並將單個響應發送回客户端。
Server Streaming gRPC
現在讓我們為服務器流gRPC創建服務器和客户端代碼。
創建 .proto 文件
創建一個名為proto的文件夾。在該文件夾中創建一個名為employee.proto的文件。將以下內容複製到employee.proto中:
syntax = "proto3";
package employee;
service Employee {
rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {}
}
message EmployeeRequest {
repeated int32 employeeIdList = 1;
}
message EmployeeResponse{
string message = 1;
}
請參閲我的grpc基礎文章,以瞭解有關.proto文件和協議緩衝區的更多信息。
在這裏,我們創建一個名為paySalary的rpc,它接受EmployeeRequest作為請求併發送EmployeeResponse流作為響應。我們使用關鍵字流來指示服務器將發送消息流。
上面也定義了EmployeeRequest和EmployeeResponse。 repeate關鍵字表示將發送數據列表。
在此示例中,paySalary的請求將是員工ID的列表。服務器將通過一條消息流做出響應,告知是否已向員工支付薪水。
為服務器創建虛擬數據
創建一個名為data.js的文件,並將以下代碼複製到其中。
//Hardcode some data for employees
let employees = [{
id: 1,
email: "abcd@abcd.com",
firstName: "First1",
lastName: "Last1"
},
{
id: 2,
email: "xyz@xyz.com",
firstName: "First2",
lastName: "Last2"
},
{
id: 3,
email: "temp@temp.com",
firstName: "First3",
lastName: "Last3"
},
];
exports.employees = employees;
我們將其用作服務器的數據源。
創建Server
創建一個名為server.js的文件。將以下代碼複製到server.js中
const PROTO_PATH = __dirname + '/proto/employee.proto';
const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');
let packageDefinition = protoLoader.loadSync(
PROTO_PATH,
{
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
let employee_proto = grpc.loadPackageDefinition(packageDefinition)
接下來,將以下代碼片段添加到server.js中
let { paySalary } = require('./pay_salary.js');
function main() {
let server = new grpc.Server();
server.addService(employee_proto.Employee.service,
{ paySalary: paySalary }
);
server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure());
server.start();
}
main();
在上面的腳本中,我們將啓動GRPC Server並將Employee Service和paySalary實現一起添加到其中。
但是paySalary函數在pay_salary.js文件中定義。
讓我們創建一個pay_salary.js文件。
將以下腳本添加到pay_salary.js文件中
let { employees } = require('./data.js');
const _ = require('lodash');
function paySalary(call) {
let employeeIdList = call.request.employeeIdList;
_.each(employeeIdList, function (employeeId) {
let employee = _.find(employees, { id: employeeId });
if (employee != null) {
let responseMessage = "Salary paid for ".concat(
employee.firstName,
", ",
employee.lastName);
call.write({ message: responseMessage });
}
else{
call.write({message: "Employee with Id " + employeeId + " not found in record"});
}
});
call.end();
}
exports.paySalary = paySalary;
paySalary函數將調用作為輸入。 call.request將包含客户端發送的請求。
call.request.employeeIdList將包含客户端發送的員工ID的列表。
然後,我們遍歷EmployeeId,併為每個員工ID進行一些處理。
對於每個員工ID,我們最後都調用call.write函數。 call.write將在流中將單個消息寫回到客户端。
在這種情況下,對於每位員工,call.write都會發回工資是否已經支付。
處理完所有員工編號後,我們將調用call.end函數。 call.end指示流已完成。
這是最終的server.js文件
const PROTO_PATH = __dirname + '/proto/employee.proto';
const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');
let packageDefinition = protoLoader.loadSync(
PROTO_PATH,
{
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
let employee_proto = grpc.loadPackageDefinition(packageDefinition)
let { paySalary } = require('./pay_salary.js');
function main() {
let server = new grpc.Server();
server.addService(employee_proto.Employee.service,
{ paySalary: paySalary }
);
server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure());
server.start();
}
main();
創建Client
創建一個名為client_grpc_server_stream.js的文件。將以下代碼複製到文件中。
const PROTO_PATH = __dirname + '/proto/employee.proto';
const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');
let packageDefinition = protoLoader.loadSync(
PROTO_PATH,
{keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;
接下來,將以下腳本片段添加到客户端。
function main() {
let client = new employee_proto.Employee('localhost:4500',
grpc.credentials.createInsecure());
let employeeIdList = [1,10,2];
let call = client.paySalary({employeeIdList: employeeIdList});
call.on('data',function(response){
console.log(response.message);
});
call.on('end',function(){
console.log('All Salaries have been paid');
});
}
main();
client變量將具有存根,這將有助於我們在服務器中調用該函數。
employeeIdList是提供給服務器的輸入。
let call = client.paySalary({employeeIdList: employeeIdList}); 腳本調用服務器中的paySalary函數,並將employeeIdList作為請求傳遞。由於服務器將要發送消息流,因此調用對象將幫助我們偵聽流事件。
我們會偵聽呼叫對象中的“數據”事件,以查看流中來自服務器的任何消息。如下面的腳本所示。
call.on('data',function(response){
console.log(response.message);
});
在這裏,只要我們從服務器收到任何消息,我們就只打印響應消息。
我們在調用對象中偵聽“結束”事件,以瞭解服務器流何時結束。如下面的腳本所示。
call.on('end',function(){
console.log('All Salaries have been paid');
});
在此流結束時,我們正在打印“已支付所有薪水”。
這是client_gprc_server_stream.js的完整代碼。
const PROTO_PATH = __dirname + '/proto/employee.proto';
const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');
let packageDefinition = protoLoader.loadSync(
PROTO_PATH,
{keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;
function main() {
let client = new employee_proto.Employee('localhost:4500',
grpc.credentials.createInsecure());
let employeeIdList = [1,10,2];
let call = client.paySalary({employeeIdList: employeeIdList});
call.on('data',function(response){
console.log(response.message);
});
call.on('end',function(){
console.log('All Salaries have been paid');
});
}
main();
運行代碼
打開命令提示符,然後使用以下腳本啓動服務器。
node server.js
打開一個新的命令提示符,並使用以下腳本運行客户端。
node client_grpc_server_stream.js
在運行客户端時,我們將獲得以下輸出。
Salary paid for First1, Last1
Employee with Id 10 not found in record
Salary paid for First2, Last2
All Salaries have been paid
在這種情況下,客户端已向服務器發送了3個Id的1,10,2。服務器一一處理ID,然後將消息流發送給客户端。流中的所有消息完成後,將顯示消息“已支付所有薪水”。
Client Streaming GRPC
現在,讓我們為客户端流GRPC創建服務器和客户端代碼。
創建.proto文件
在先前創建的employee.proto文件中,添加以下內容
service Employee {
rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {}
rpc generateReport (stream ReportEmployeeRequest) returns (ReportEmployeeResponse) {}
}
message ReportEmployeeRequest {
int32 id = 1;
}
message ReportEmployeeResponse{
string successfulReports = 1;
string failedReports = 2;
}
在這裏,我們添加了一個名為generateReport的新rpc,它接受ReportEmployeeRequest流作為請求並返回ReportEmployeeResponse作為響應。
因此,向rpc輸入的內容是員工ID的流,服務器的響應將是單個響應,其中指出生成了多少報告以及有多少報告失敗。
這是我們更改後的完整的employee.proto文件:
syntax = "proto3";
package employee;
service Employee {
rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {}
rpc generateReport (stream ReportEmployeeRequest) returns (ReportEmployeeResponse) {}
}
message EmployeeRequest {
repeated int32 employeeIdList = 1;
}
message EmployeeResponse{
string message = 1;
}
message ReportEmployeeRequest {
int32 id = 1;
}
message ReportEmployeeResponse{
string successfulReports = 1;
string failedReports = 2;
}
創建Server
這是添加了新rpc的完整server.js代碼:
const PROTO_PATH = __dirname + '/proto/employee.proto';
const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');
let packageDefinition = protoLoader.loadSync(
PROTO_PATH,
{
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;
let { paySalary } = require('./pay_salary.js');
let { generateReport } = require('./generate_report.js');
function main() {
let server = new grpc.Server();
server.addService(employee_proto.Employee.service,
{ paySalary: paySalary ,
generateReport: generateReport }
);
server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure());
server.start();
}
main();
在上面的腳本中,我們可以看到我們還向grpc服務器添加了generateReport函數。我們還可以看到generateReport函數來自generate_report.js文件。
創建一個名為generate_report.js的文件。
將以下腳本添加到文件中:
let { employees } = require('./data.js');
const _ = require('lodash');
function generateReport(call, callback){
let successfulReports = [];
let failedReports = [];
call.on('data',function(employeeStream){
let employeeId = employeeStream.id;
let employee = _.find(employees, { id: employeeId });
if (employee != null) {
successfulReports.push(employee.firstName);
}
else{
failedReports.push(employeeId);
}
});
call.on('end',function(){
callback(null,{
successfulReports: successfulReports.join(),
failedReports: failedReports.join()
})
})
}
exports.generateReport = generateReport;
generateReport函數接受兩個輸入,即調用和回調
為了從客户端獲取消息流,我們需要在調用對象中監聽數據事件。這是在以下腳本中完成的。
call.on('data',function(employeeStream){
let employeeId = employeeStream.id;
let employee = _.find(employees, { id: employeeId });
if (employee != null) {
successfulReports.push(employee.firstName);
}
else{
failedReports.push(employeeId);
}
});
來自客户端的每條消息都會調用data事件。該消息存在於employeeStream變量中。收到消息後,我們嘗試生成報告,並確定報告是成功還是失敗。
調用對象上的結束事件表示客户端流已結束。以下代碼顯示瞭如何監聽結束事件。
call.on('end',function(){
callback(null,{
successfulReports: successfulReports.join(),
failedReports: failedReports.join()
})
})
在這種情況下,當結束事件發生時,我們將所有成功和失敗報告組合到一個響應對象中,並使用回調對象將其發送回客户端。
創建Client
創建一個名為client_grpc_client_stream.js的文件。將以下腳本添加到其中。
const PROTO_PATH = __dirname + '/proto/employee.proto';
const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');
const _ = require('lodash');
let packageDefinition = protoLoader.loadSync(
PROTO_PATH,
{
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;
上面的腳本具有與服務器代碼相同的功能。
將以下腳本也添加到client_grpc_client_stream.js。
function main() {
let client = new employee_proto.Employee('localhost:4500',
grpc.credentials.createInsecure());
let call = client.generateReport(function (error, response) {
console.log("Reports successfully generated for: ", response.successfulReports);
console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports);
});
let employeeIdList = [1, 10, 2];
_.each(employeeIdList, function (employeeId) {
call.write({ id: employeeId });
})
call.end();
}
main();
讓我們看看上面的腳本在做什麼。
let call = client.generateReport(function (error, response) {
console.log("Reports successfully generated for: ", response.successfulReports);
console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports);
});
在腳本的這一部分中,我們正在創建一個調用對象並調用generateReport函數。同樣在generateReport函數內部,我們指示客户端一旦收到服務器的響應,應該怎麼做。在這種情況下,我們將打印服務器發送回的成功和失敗報告。
let employeeIdList = [1, 10, 2];
_.each(employeeIdList, function (employeeId) {
call.write({ id: employeeId });
})
在腳本的以上部分中,我們遍歷了員工ID,並將消息流發送到服務器。我們使用call.write將消息以流的形式發送到服務器。
最後,一旦我們在流中發送了所有消息,就可以使用call.end函數指示流已完成,如下所示:
call.end();
下面給出了client_grpc_client_stream的完整代碼。
const PROTO_PATH = __dirname + '/proto/employee.proto';
const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');
const _ = require('lodash');
let packageDefinition = protoLoader.loadSync(
PROTO_PATH,
{
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;
function main() {
let client = new employee_proto.Employee('localhost:4500',
grpc.credentials.createInsecure());
let call = client.generateReport(function (error, response) {
console.log("Reports successfully generated for: ", response.successfulReports);
console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports);
});
let employeeIdList = [1, 10, 2];
_.each(employeeIdList, function (employeeId) {
call.write({ id: employeeId });
})
call.end();
}
main();
運行代碼
打開命令提示符,然後使用以下腳本啓動服務器。
node server.js
打開一個新的命令提示符,並使用以下腳本運行客户端。
node client_grpc_server_stream.js
在運行客户端時,我們將獲得以下輸出。
Reports successfully generated for: First1,First2
Reports failed since Following Employee Id\'s do not exist: 10
在這種情況下,客户端已向服務器發送了3個Id的1,10,2作為消息流。然後,服務器處理流中的消息,並將單個響應發送回客户端,以顯示成功的報告數量和失敗的報告數量。