Flink Watermark(水位線)機制詳解
Watermark是Flink處理事件時間(Event Time)的核心機制,用於處理亂序數據和觸發窗口計算。讓我全面深入地介紹:
一、核心概念
1. 什麼是Watermark?
**Watermark(水位線)**是一個時間戳標記,表示:
- “所有時間戳 ≤ Watermark 的數據都已到達”
- “時間戳 > Watermark 的數據可能還在路上”
Watermark(t) 的含義:
時間戳 ≤ t 的數據已經全部到達(或大部分到達)
2. 為什麼需要Watermark?
在分佈式流處理中,數據可能會:
- ⏰ 亂序到達:網絡延遲、多數據源等原因
- 🐌 延遲到達:某些數據比其他數據晚很多
- ❓ 何時觸發計算:不知道數據是否都到齊了
Watermark解決的核心問題:
在數據可能亂序的情況下,如何確定"某個時間窗口的數據已經全部到達,可以觸發計算了"?
3. Watermark示意圖
數據流(事件時間):
時間戳: 1 3 2 5 4 8 7 10 9
| | | | | | | | |
↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓
[==================數據流==================>
Watermark插入(假設允許2秒延遲):
數據: 1 3 2 W(1) 5 4 W(3) 8 7 W(6) 10 9 W(8)
↑ ↑ ↑ ↑
Watermark(1) Watermark(3) ...
Watermark(6)表示:
- 時間戳 ≤ 6 的數據已經全部到達
- 窗口[0,5)可以觸發計算了
- 時間戳為3的延遲數據仍能被處理
二、Watermark的工作原理
1. Watermark與窗口觸發
窗口觸發條件:
當 Watermark >= 窗口結束時間 時,窗口觸發計算
示例:窗口 [0, 10)
- Watermark = 5 → 窗口不觸發(還有數據可能到達)
- Watermark = 9 → 窗口不觸發(還有數據可能到達)
- Watermark = 10 → 窗口觸發!(時間戳<10的數據已全部到達)
2. 完整流程示例
場景:5秒滾動窗口,允許3秒延遲
數據到達順序(事件時間):
t=1s → 進入窗口[0,5)
t=3s → 進入窗口[0,5)
t=2s → 進入窗口[0,5)(亂序)
t=7s → 進入窗口[5,10),生成Watermark(4)
t=6s → 進入窗口[5,10)(亂序)
t=10s → 進入窗口[10,15),生成Watermark(7)
t=9s → 進入窗口[5,10)(亂序)
Watermark推進過程:
1. 收到t=7s,當前最大時間戳=7
→ Watermark = 7 - 3 = 4
→ 窗口[0,5)不觸發(4 < 5)
2. 收到t=10s,當前最大時間戳=10
→ Watermark = 10 - 3 = 7
→ 窗口[5,10)不觸發(7 < 10)
3. 收到t=13s,當前最大時間戳=13
→ Watermark = 13 - 3 = 10
→ 窗口[5,10)觸發!(10 >= 10)
→ 輸出窗口[5,10)的計算結果
三、Watermark生成策略
1. 週期性生成(Periodic Watermarks)
特點:按固定時間間隔生成Watermark
// 方式1:有界亂序(最常用)
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
// 原理:
// Watermark = 當前最大事件時間 - 允許的最大亂序時間
// 例如:最大事件時間=10s,允許亂序=3s → Watermark=7s
// 方式2:單調遞增(無亂序)
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
// 原理:
// Watermark = 當前最大事件時間
// 適用於數據嚴格按時間順序到達的場景
// 方式3:自定義週期性Watermark
WatermarkStrategy
.forGenerator((context) -> new WatermarkGenerator<Event>() {
private long maxTimestamp = Long.MIN_VALUE;
private final long maxOutOfOrderness = 3000L; // 3秒
@Override
public void onEvent(Event event, long eventTimestamp,
WatermarkOutput output) {
// 每條數據到達時更新最大時間戳
maxTimestamp = Math.max(maxTimestamp, event.getTimestamp());
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 週期性生成Watermark(默認200ms一次)
output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
}
})
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
2. 標點式生成(Punctuated Watermarks)
特點:根據特定數據標記生成Watermark
// 自定義標點式Watermark
WatermarkStrategy
.forGenerator((context) -> new WatermarkGenerator<Event>() {
@Override
public void onEvent(Event event, long eventTimestamp,
WatermarkOutput output) {
// 遇到特殊標記數據時生成Watermark
if (event.hasWatermarkMarker()) {
output.emitWatermark(new Watermark(event.getTimestamp()));
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 標點式不使用週期性生成
}
})
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
// 應用場景:
// 1. 數據源自帶Watermark標記
// 2. Kafka等消息隊列的特殊控制消息
// 3. 需要精確控制Watermark生成時機
四、完整代碼示例
示例1:基礎Watermark使用
public class BasicWatermarkExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 設置Watermark生成間隔(默認200ms)
env.getConfig().setAutoWatermarkInterval(1000L); // 1秒
// 模擬亂序數據
DataStream<Event> events = env.fromElements(
new Event("sensor1", 1000L, 25.5), // 1秒
new Event("sensor1", 3000L, 26.0), // 3秒
new Event("sensor1", 2000L, 25.8), // 2秒(亂序)
new Event("sensor1", 7000L, 27.0), // 7秒
new Event("sensor1", 5000L, 26.5), // 5秒(亂序)
new Event("sensor1", 11000L, 28.0) // 11秒
);
// 分配時間戳和Watermark(允許3秒亂序)
DataStream<Event> withWatermarks = events.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((event, timestamp) -> event.timestamp)
);
// 5秒滾動窗口
withWatermarks
.keyBy(event -> event.sensorId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<Event, String, String, TimeWindow>() {
@Override
public void process(
String sensorId,
Context ctx,
Iterable<Event> events,
Collector<String> out
) {
double sum = 0;
int count = 0;
for (Event event : events) {
sum += event.temperature;
count++;
}
out.collect(String.format(
"Sensor: %s, Window: [%d-%d], Avg Temp: %.2f, Count: %d",
sensorId,
ctx.window().getStart() / 1000,
ctx.window().getEnd() / 1000,
sum / count,
count
));
}
})
.print();
env.execute("Basic Watermark Example");
}
static class Event {
String sensorId;
Long timestamp;
Double temperature;
Event(String sensorId, Long timestamp, Double temperature) {
this.sensorId = sensorId;
this.timestamp = timestamp;
this.temperature = temperature;
}
}
}
/* 執行過程分析:
數據到達:t=1s, 3s, 2s, 7s, 5s, 11s
Watermark推進:
1. t=1s → maxTimestamp=1s → Watermark=-2s(1-3)
2. t=3s → maxTimestamp=3s → Watermark=0s(3-3)
3. t=2s → maxTimestamp=3s → Watermark=0s(不變)
4. t=7s → maxTimestamp=7s → Watermark=4s(7-3)
5. t=5s → maxTimestamp=7s → Watermark=4s(不變)
6. t=11s → maxTimestamp=11s → Watermark=8s(11-3)
→ 窗口[0,5)觸發!(8>=5)
窗口[0,5)包含的數據:
- t=1s ✅
- t=3s ✅
- t=2s ✅(亂序數據被正確處理)
輸出:
Sensor: sensor1, Window: [0-5], Avg Temp: 25.77, Count: 3
*/
示例2:觀察Watermark推進過程
public class WatermarkObserverExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // 單並行度便於觀察
DataStream<Event> events = env.fromElements(
new Event(1000L),
new Event(3000L),
new Event(2000L), // 亂序
new Event(5000L),
new Event(4000L), // 亂序
new Event(8000L),
new Event(11000L)
);
// 允許2秒亂序
events
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((event, ts) -> event.timestamp)
)
.process(new ProcessFunction<Event, String>() {
@Override
public void processElement(
Event event,
Context ctx,
Collector<String> out
) {
long currentWatermark = ctx.timerService().currentWatermark();
out.collect(String.format(
"Event: t=%ds, CurrentWatermark: %ds",
event.timestamp / 1000,
currentWatermark == Long.MIN_VALUE ?
-999 : currentWatermark / 1000
));
}
})
.print();
env.execute("Watermark Observer");
}
static class Event {
Long timestamp;
Event(Long timestamp) { this.timestamp = timestamp; }
}
}
/* 輸出:
Event: t=1s, CurrentWatermark: -999s (初始值)
Event: t=3s, CurrentWatermark: 1s (3-2=1)
Event: t=2s, CurrentWatermark: 1s (maxTs仍是3)
Event: t=5s, CurrentWatermark: 3s (5-2=3)
Event: t=4s, CurrentWatermark: 3s (maxTs仍是5)
Event: t=8s, CurrentWatermark: 6s (8-2=6)
Event: t=11s, CurrentWatermark: 9s (11-2=9)
觀察:
- Watermark單調遞增,不會回退
- 亂序數據不影響Watermark(只看最大時間戳)
- Watermark = 最大事件時間 - 允許延遲
*/
示例3:多流Watermark對齊
public class MultiStreamWatermarkExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 數據源1:快速流(低延遲)
DataStream<Event> fastStream = env
.fromElements(
new Event("fast", 1000L),
new Event("fast", 2000L),
new Event("fast", 3000L),
new Event("fast", 10000L) // 快速推進到10秒
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner((e, ts) -> e.timestamp)
);
// 數據源2:慢速流(高延遲)
DataStream<Event> slowStream = env
.fromElements(
new Event("slow", 1000L),
new Event("slow", 2000L),
new Event("slow", 3000L) // 只推進到3秒
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner((e, ts) -> e.timestamp)
);
// 合流
fastStream
.union(slowStream)
.keyBy(event -> "key")
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<Event, String, String, TimeWindow>() {
public void process(String key, Context ctx,
Iterable<Event> events, Collector<String> out) {
int count = 0;
for (Event e : events) count++;
out.collect(String.format(
"Window [%d-%d]: %d events",
ctx.window().getStart() / 1000,
ctx.window().getEnd() / 1000,
count
));
}
})
.print();
env.execute("Multi-Stream Watermark");
}
static class Event {
String source;
Long timestamp;
Event(String source, Long timestamp) {
this.source = source;
this.timestamp = timestamp;
}
}
}
/* Watermark對齊原理:
合流後的Watermark = min(所有上游的Watermark)
fastStream Watermark: 1s → 2s → 3s → 10s
slowStream Watermark: 1s → 2s → 3s
合流後Watermark: 1s → 2s → 3s → 3s(被慢流拖住)
影響:
- 即使fastStream推進到10s,窗口[0,5)仍不觸發
- 因為合流後Watermark只有3s < 5s
- slowStream成為瓶頸(數據傾斜問題)
*/
五、延遲數據處理
1. 什麼是延遲數據?
延遲數據:事件時間 < 當前Watermark 的數據
示例:
當前Watermark = 10s
收到一條t=7s的數據 → 延遲數據(7 < 10)
窗口[5,10)已經在Watermark=10s時觸發計算
t=7s的數據到達時窗口已關閉 → 默認被丟棄!
2. 延遲數據處理策略
策略1:設置允許的延遲時間(Allowed Lateness)
dataStream
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(2)) // 允許窗口關閉後2秒內的延遲數據
.sum(1);
/* 工作原理:
窗口[0,5):
1. Watermark=5s → 窗口首次觸發,輸出結果1
2. 收到t=3s的延遲數據 → 重新計算,輸出結果2(更新)
3. 收到t=4s的延遲數據 → 重新計算,輸出結果3(更新)
4. Watermark=7s → 窗口徹底關閉(5+2=7)
5. 之後的延遲數據被丟棄
優點:
- 容忍一定程度的延遲
- 結果更準確
缺點:
- 需要保持窗口狀態更長時間
- 可能產生多次輸出
*/
策略2:側輸出流(Side Output)收集延遲數據
// 定義延遲數據標籤
OutputTag<Event> lateDataTag = new OutputTag<Event>("late-data"){};
SingleOutputStreamOperator<String> result = dataStream
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sideOutputLateData(lateDataTag) // 延遲數據輸出到側輸出流
.sum(1);
// 獲取延遲數據
DataStream<Event> lateData = result.getSideOutput(lateDataTag);
// 處理延遲數據
lateData.print("Late Data"); // 可以單獨處理或記錄日誌
/* 優點:
- 不丟失任何數據
- 可以單獨分析延遲數據
- 用於監控和告警
應用場景:
- 數據質量監控
- 延遲數據統計
- 後續補償處理
*/
策略3:組合使用
OutputTag<Event> lateDataTag = new OutputTag<Event>("late-data"){};
SingleOutputStreamOperator<String> result = dataStream
.keyBy(event -> event.sensorId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(2)) // 允許2秒延遲
.sideOutputLateData(lateDataTag) // 超過2秒的進側輸出
.aggregate(new MyAggregateFunction());
// 主流:正常和輕度延遲的數據
result.print("Main Output");
// 側輸出流:嚴重延遲的數據
result.getSideOutput(lateDataTag).print("Severe Late Data");
/* 數據分類:
1. 正常數據:t <= Watermark
→ 正常進入窗口
2. 輕度延遲:Watermark < t < Watermark+AllowedLateness
→ 進入窗口,觸發重新計算
3. 嚴重延遲:t >= Watermark+AllowedLateness
→ 輸出到側輸出流
時間線示例(窗口[0,5),允許延遲2秒):
Watermark=3s: t=2s → 正常數據
Watermark=5s: 窗口觸發,輸出結果
Watermark=6s: t=4s → 輕度延遲,重新計算
Watermark=7s: 窗口徹底關閉
Watermark=8s: t=3s → 嚴重延遲,進側輸出流
*/
完整示例:延遲數據處理
public class LateDataHandlingExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 定義延遲數據標籤
OutputTag<Event> lateDataTag = new OutputTag<Event>("late-data"){};
DataStream<Event> events = env.fromElements(
new Event("sensor1", 1000L, 25.0),
new Event("sensor1", 2000L, 26.0),
new Event("sensor1", 3000L, 27.0),
new Event("sensor1", 8000L, 28.0), // 推進Watermark到6s
new Event("sensor1", 4000L, 26.5), // 延遲數據1(在允許範圍內)
new Event("sensor1", 10000L, 29.0), // 推進Watermark到8s
new Event("sensor1", 2500L, 25.5) // 延遲數據2(超過允許延遲)
);
DataStream<Event> withWatermarks = events.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((event, ts) -> event.timestamp)
);
SingleOutputStreamOperator<String> result = withWatermarks
.keyBy(event -> event.sensorId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(1)) // 允許1秒延遲
.sideOutputLateData(lateDataTag) // 嚴重延遲數據輸出到側輸出
.process(new ProcessWindowFunction<Event, String, String, TimeWindow>() {
@Override
public void process(
String key,
Context ctx,
Iterable<Event> events,
Collector<String> out
) {
double sum = 0;
int count = 0;
for (Event event : events) {
sum += event.temperature;
count++;
}
out.collect(String.format(
"[%s] Window [%d-%d]: Avg=%.2f, Count=%d, Watermark=%d",
ctx.currentProcessingTime(),
ctx.window().getStart() / 1000,
ctx.window().getEnd() / 1000,
sum / count,
count,
ctx.currentWatermark() / 1000
));
}
});
// 主輸出流
result.print("Main");
// 延遲數據流
result.getSideOutput(lateDataTag)
.map(event -> String.format(
"Late Data: t=%ds, temp=%.1f",
event.timestamp / 1000,
event.temperature
))
.print("Late");
env.execute("Late Data Handling");
}
static class Event {
String sensorId;
Long timestamp;
Double temperature;
Event(String sensorId, Long timestamp, Double temperature) {
this.sensorId = sensorId;
this.timestamp = timestamp;
this.temperature = temperature;
}
}
}
/* 輸出:
Main> Window [0-5]: Avg=26.00, Count=3, Watermark=6
↑ Watermark=6s時觸發,包含t=1s,2s,3s
Main> Window [0-5]: Avg=26.13, Count=4, Watermark=6
↑ t=4s的延遲數據到達,重新計算(4s在允許延遲內)
Late> Late Data: t=2s, temp=25.5
↑ t=2.5s的數據超過允許延遲,進入側輸出流
觀察:
1. 窗口首次觸發:Watermark=6s (8-2=6 >= 5)
2. t=4s延遲數據觸發重算:4s在[5-1, 5+1]範圍內
3. t=2.5s嚴重延遲:Watermark已=8s,窗口在6s徹底關閉
*/
六、Watermark傳播機制
1. 單流傳播
Source → Map → KeyBy → Window
↓ ↓ ↓ ↓
W1 → W1 → W1 → W1
Watermark在算子間傳播:
- 每個算子收到Watermark後向下游轉發
- 保持單調遞增
2. 多流合併
Stream1 (W1=10s) ┐
├→ Union → (Watermark = min(10,5) = 5s)
Stream2 (W2=5s) ┘
規則:合流後的Watermark取所有上游的最小值
原因:保守策略,確保不會漏掉任何數據
3. 分流廣播
┌→ Stream1 (W=10s)
Source ─┤
└→ Stream2 (W=10s)
規則:所有分支獲得相同的Watermark
七、Watermark最佳實踐
1. 選擇合適的亂序時間
// ❌ 太小:丟失延遲數據
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1))
// 1秒延遲可能不夠
// ✅ 適中:平衡準確性和延遲
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
// ❌ 太大:結果延遲高
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(10))
// 10分鐘太保守,實時性差
2. 監控Watermark延遲
dataStream
.process(new ProcessFunction<Event, Event>() {
@Override
public void processElement(Event event, Context ctx, Collector<Event> out) {
long watermark = ctx.timerService().currentWatermark();
long eventTime = event.timestamp;
long lag = eventTime - watermark; // Watermark延遲
if (lag > 60000) { // 延遲超過1分鐘
// 記錄日誌或發送告警
System.err.println("High watermark lag: " + lag + "ms");
}
out.collect(event);
}
});
3. 處理空閒數據源
// 問題:某個分區長時間無數據,Watermark不推進
// 解決:設置空閒超時
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withIdleness(Duration.ofMinutes(1)) // 1分鐘無數據則視為空閒
.withTimestampAssigner((event, ts) -> event.timestamp);
/* 效果:
- 分區空閒1分鐘後,不再影響全局Watermark
- 其他活躍分區的Watermark可以正常推進
*/
4. Kafka數據源的Watermark
// Kafka分區獨立生成Watermark
FlinkKafkaConsumer<Event> consumer = new FlinkKafkaConsumer<>(...);
DataStream<Event> stream = env
.addSource(consumer)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withIdleness(Duration.ofMinutes(1)) // 重要!處理空閒分區
.withTimestampAssigner((event, ts) -> event.timestamp)
);
/* 注意事項:
1. Kafka每個分區獨立生成Watermark
2. 全局Watermark = min(所有分區的Watermark)
3. 某個分區空閒會拖慢全局Watermark
4. 必須設置withIdleness處理空閒分區
*/
八、關鍵要點總結
核心概念
- ✅ Watermark定義:時間戳標記,表示"≤該時間戳的數據已全部到達"
- ✅ 觸發條件:Watermark >= 窗口結束時間時觸發窗口計算
- ✅ 單調遞增:Watermark只能前進,不能後退
- ✅ 亂序處理:通過設置允許的亂序時間容忍延遲數據
生成策略
- ✅ 有界亂序:Watermark = 最大事件時間 - 允許延遲(最常用)
- ✅ 單調遞增:Watermark = 最大事件時間(無亂序場景)
- ✅ 自定義生成:根據業務需求定製Watermark邏輯
延遲數據
- ⚠️ allowedLateness:允許窗口關閉後繼續接收延遲數據
- ⚠️ sideOutputLateData:將嚴重延遲數據輸出到側輸出流
- ⚠️ 多次輸出:延遲數據可能導致窗口重複計算並輸出
多流場景
- ⚠️ Watermark對齊:多流合併取最小Watermark
- ⚠️ 空閒數據源:使用withIdleness避免空閒分區拖慢Watermark
- ⚠️ 數據傾斜:慢速分區會成為Watermark瓶頸
最佳實踐
- ✅ 根據業務容忍度選擇合適的亂序時間
- ✅ 監控Watermark延遲,及時發現數據源問題
- ✅ 使用側輸出流記錄延遲數據,便於分析和告警
- ✅ Kafka等多分區數據源必須設置空閒超時
Watermark是Flink事件時間處理的核心,理解其原理對於開發高質量的實時應用至關重要!
本文章為轉載內容,我們尊重原作者對文章享有的著作權。如有內容錯誤或侵權問題,歡迎原作者聯繫我們進行內容更正或刪除文章。