過去,企業數據集成大多采用ETL(提取、轉換、加載)批處理模式,即在夜間或業務低峯期將數據從業務庫同步到數據倉庫。然而,在數字化轉型的浪潮下,實時推薦、實時風控、實時監控等場景要求數據能在秒級甚至毫秒級內得到處理和分析。
ETLCloud作為一個專業的數據集成平台,提供了強大的實時數據集成與ETL處理能力,能夠高效採集業務系統的增量數據並進行實時轉換。然而,在實際應用中,任何處理平台都會面臨資源(如內存、CPU)的物理限制。如果在某一時刻,涌入的數據流量遠遠超過平台的處理能力,就可能導致數據處理延遲、數據積壓,甚至在極端情況下影響系統穩定性。
為了解決這一問題,我們通常引入Apache Kafka作為分佈式流處理平台的典範,以其高吞吐、可持久化、多訂閲者的特性,將其作為實時數據管道的“中樞神經系統”和緩衝層。具體流程如下:
- 首先將產生的流數據可靠地推送至Kafka集羣。Kafka的高吞吐和持久化特性確保了數據在涌入高峯時也不會丟失。
- 其次,由ETLCloud等處理平台以自身最優的處理速度從Kafka中消費數據。Kafka的解耦特性允許處理平台根據自身能力平穩消費,避免了被數據洪峯沖垮的風險。
- 最後,這種“生產者-Kafka-消費者”的架構,通過將數據存儲與數據處理分離,不僅顯著提升了系統的彈性和容錯能力,更是實現了系統間解耦,為保障端到端的數據一致性提供了堅實的基礎。
那麼本文將對這一場景進行示例配置。
一、ETLCloud數據源配置
這裏我們要配置四個數據源,分別是
源端數據庫:產生增量流數據;
Kafka生產者:將流數據推送到Kafka主題;
Kafka消費者:消費推送到Kafka主題的消息;
目標端數據庫:將消費到的消息進行處理推送到目標庫;
來到ETLCLoud平台首頁,進入數據源管理模塊。
首先我們來創建Kafka的數據源
注意,一個Kafka的數據源只能在屬性這裏選擇是生產者和消費者其中一種屬性進行配置,所以我們要配置兩個Kafka數據源,一個消費者,一個生產者
消費者配置示例:
生產者配置示例:
配置完了Kafka的數據源接下來配置源端和目標端的數據源,這裏源端MySql產生增量流數據,最後將數據推送到PostgreSql中去
mysql數據源配置:
PostgreSql數據源:
二、配置數據庫監聽器
配置完了數據源,接下來配置數據庫監聽器,數據庫監聽器的作用是監聽源端的數據變更,捕獲到數據變更推送到後續流程。
來到實時數據集成模塊,新建監聽器:
三、配置Kafka消息處理流程
首先來到離線數據集成模塊,先創建一個流程
四、配置Kafka監聽器
接着,來到實時數據集成,創建Kafka監聽器
保存後,啓動Kafka監聽器後啓動數據庫監聽器
監聽器會監聽源端mysql表的數據推送到Kafka中去
五、運行
源端插入10條數據
數據庫監聽器裏面監聽到數據並往Kafka裏面推送
數據成功被推到Kafka主題
消息推送到主題後,Kafka監聽器監聽到消息並啓動離線流程,在流程中將數據推送到數據庫中去
檢查數據庫,數據成功插入
以上便是通過ETLCloud構建Kafka實時數據管道的過程,ETLCloud通過其可視化開發、強大轉換能力、多目標支持和企業級可靠性,將流式ETL的複雜技術細節封裝起來,讓數據工程師和分析師能夠更專注於業務邏輯本身,而非底層實現,極大地加速了企業從數據到實時洞察的進程,是構建現代實時數據架構的理想選擇。