這個五月,eKuiper 處在 1.6.0 版本新功能開發早期階段,產品功能開發主要包括 protobuf 編解碼支持。針對用户多次提出的用拖拽方式編寫數據處理規則需求,我們也在進行原型驗證。v1.5.1 的 bug 修復也在進行中,預計於 6 月初發布。
此外,作為邊緣流式數據處理軟件,eKuiper 還參與了中國信通院發起的《邊緣流式數據處理技術能力要求》標準評估,通過後 eKuiper 將成為首個通過此標準認定的產品。
protobuf 編解碼支持
目前 source 和 sink 關於編解碼的配置屬性 format 默認值為 JSON,用户可以通過新增的 protobuf 的選項選用 protobuf 的編解碼能力。相比於 JSON,protobuf 編碼的數據量更小,有利於節省雲邊之間傳輸的帶寬。該功能目前已經開發完成但尚未發佈,用户可以通過邊緣源碼的 v1.6.0 分支進行試用。
相比於無模式(schema)的 JSON 格式,protobuf 需要定義 proto 文件作為編解碼的 schema。在使用 protobuf 格式之前,用户需要先註冊 schema,並通過新增的 schemaId 屬性,指定編解碼選用的 schema。
Schema 管理
使用 REST API 註冊,schema 內容可通過文件路徑或者文本提供。
// POST /schemas
{
"id":"fileName",
"file":"http://myhost/files/abc.proto"
}
或者通過文本內容配置:
// POST /schemas
{
"id":"schemaName",
"content":"message Person {
required string name = 1;
required int32 id = 2;
optional string email = 3;
}"
}
無論何種方式,內容會被存儲於 etc/schemas/protobuf。
Sink 中使用 protobuf
- format,用於指定使用的編碼
- schemaId, 用於指定使用的 schema
{
"mqtt":{
"server":"tcp://127.0.0.1:1883",
"topic": "result",
"format":"protobuf",
"schemaId":"schemaName.Person" // protobuf的 ID分為兩部分,前面為文件名,後面為message名
}
}
Source 中使用 protobuf
FORMAT,支持 protobuf
SCHEMA_ID ,用於指定使用的 schema
CREATE STREAM demo() WITH (TYPE="NEURON", FORMAT="protobuf", SCHEMA_ID="schemaName.Person")
可視化拖拽編輯能力
之前 eKuiper 針對流式數據僅支持 SQL 形式編寫數據處理規則,有一定門檻,不方便業務人員直接參與規則編寫。為了進一步降低使用門檻,eKuiper 準備支持以可視化方式拖拽數據處理單元並進行簡單編輯配置,最終將多個數據處理單元連接起來自動形成數據處理規則,方便更多人直接使用 eKuiper 進行業務處理。目前這個方案正在原型設計階段。
即將到來
下個月我們將完善可視化拖拽編輯,併合併到主分支中。另外,eKuiper 將優化 sink 出錯之後的緩存機制,實現內存 + 磁盤的離線存儲,並在錯誤恢復後順序重發,以支持網絡斷開情況下更強的恢復能力,緩存更長時間的數據。
版權聲明: 本文為 EMQ 原創,轉載請註明出處。
原文鏈接:https://www.emqx.com/zh/blog/ekuiper-newsletter-202205