本文旨在詳解 Flink TaskManager 的內存模型以及其各部分內存佔比的計算邏輯。首先,結合官網展示了當前 Flink 的內存模型,並在之後結合 JVM 自身內存模型和管理機制結合講解 Flink 內存模型的各個部分,最後結合源碼解釋了各部分內存佔比的計算邏輯。
1 內存模型組成部分
本節摘自官網:https://nightlies.apache.org/flink/flink-docs-release-1.20/zh...
| 組成部分 | 配置參數 | 描述 |
|---|---|---|
| 框架堆內存(Framework Heap Memory) | taskmanager.memory.framework.heap.size | 用於 Flink 框架的 JVM 堆內存(進階配置)。 |
| 任務堆內存(Task Heap Memory) | taskmanager.memory.task.heap.size | 用於 Flink 應用的算子及用户代碼的 JVM 堆內存。 |
| 託管內存(Managed memory) | taskmanager.memory.managed.size taskmanager.memory.managed.fraction | 由 Flink 管理的用於排序、哈希表、緩存中間結果及 RocksDB State Backend 的本地內存。 |
| 框架堆外內存(Framework Off-heap Memory) | taskmanager.memory.framework.off-heap.size | 用於 Flink 框架的堆外內存(直接內存或本地內存)(進階配置)。 |
| 任務堆外內存(Task Off-heap Memory) | taskmanager.memory.task.off-heap.size | 用於 Flink 應用的算子及用户代碼的堆外內存(直接內存或本地內存)。 |
| 網絡內存(Network Memory) | taskmanager.memory.network.min taskmanager.memory.network.max taskmanager.memory.network.fraction | 用於任務之間數據傳輸的直接內存(例如網絡傳輸緩衝)。該內存部分為基於 Flink 總內存的受限的等比內存部分。這塊內存被用於分配網絡緩衝 |
| JVM Metaspace | taskmanager.memory.jvm-metaspace.size | Flink JVM 進程的 Metaspace。 |
| JVM 開銷 | taskmanager.memory.jvm-overhead.min taskmanager.memory.jvm-overhead.max taskmanager.memory.jvm-overhead.fraction | 用於其他 JVM 開銷的本地內存,例如棧空間、垃圾回收空間等。該內存部分為基於進程總內存的受限的等比內存部分。 |
2 Flink 內存模型對內存的邏輯劃分
Flink 作為一個 JVM 進程,首先複習一下 JVM 內存的簡單劃分:
- Heap memory:JVM 進程內部使用的內存,用於存儲 java 對象,受 GC 控制;
- Native memory/Off-heap:Java 進程使用的用户地址空間上的非堆內存,不受 GC 控制,包含元空間(Metaspace)以及 Direct memory;
- Direct memory:Native Memory,意味着共享硬件內的底層緩衝區,目標是為了減少內存拷貝,可簡單理解為 NIO 中提供的 allocate 方法獲取的內存;
- Metaspace:Java8 後替代永久代的設計,之前類和方法、字符串常量池等信息存儲在永久代,而永久代使用堆內存,容易出現 OOM,且增加了 GC 的複雜性,而 metaspace 使用 native memory,降低了該風險。
參考資料:
- What is the difference between off-heap, native heap, direct memory and native memory?
JVM 其實真正控制的內存部分就是堆內、堆外(Direct 部分)、元空間,嚴格來説元空間也屬於堆外,但是 JVM 對這部分特別定義為了元空間。因此 Flink 在啓動 TM 時可以利用 -XMs、-Xmx 等 JVM 配置控制堆內內存,利用 -XX:MaxDirectMemorySize 和 -XX:MaxMetaspaceSize 分別控制 direct memory 和元空間。其中 task off-heap、framework off-heap、network memory 這三部分均屬於 direct memory。
而其他內存,比如通過 JNI 調用 C 代碼申請的內存其實是不在 JVM 管控內的,因此這部分內存也沒辦法通過 JVM 的參數控制,比如 Flink 的內存模型中,jvm overhead 和 managed memory 這兩部分內存是完全不受 JVM 管控的,屬於 native memory(非 direct),只能通過容器或者系統對 JVM 整體進程進行監控 kill。
一般情況下,配置 Flink 內存最簡單的方法是配置總內存。此外,Flink 也支持更細粒度的內存配置方式。
3 內存計算推斷邏輯
3.1 推斷入口及推斷模式
TaskManager 的內存計算推斷的邏輯入口在 org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils#processSpecFromConfig。
public static TaskExecutorProcessSpec processSpecFromConfig(final Configuration config) {
try {
return createMemoryProcessSpec(
config, PROCESS_MEMORY_UTILS.memoryProcessSpecFromConfig(config));
} catch (IllegalConfigurationException e) {
throw new IllegalConfigurationException(
"TaskManager memory configuration failed: " + e.getMessage(), e);
}
}
org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils#memoryProcessSpecFromConfig 有三種推斷模式(優先級自上而下):
public CommonProcessMemorySpec<FM> memoryProcessSpecFromConfig(Configuration config) {
if (options.getRequiredFineGrainedOptions().stream().allMatch(config::contains)) {
// all internal memory options are configured, use these to derive total Flink and
// process memory
return deriveProcessSpecWithExplicitInternalMemory(config);
} else if (config.contains(options.getTotalFlinkMemoryOption())) {
// internal memory options are not configured, total Flink memory is configured,
// derive from total flink memory
return deriveProcessSpecWithTotalFlinkMemory(config);
} else if (config.contains(options.getTotalProcessMemoryOption())) {
// total Flink memory is not configured, total process memory is configured,
// derive from total process memory
return deriveProcessSpecWithTotalProcessMemory(config);
}
return failBecauseRequiredOptionsNotConfigured();
}
-
如果以下部分的內存已經顯式配置 size,進入
deriveProcessSpecWithExplicitInternalMemory:taskmanager.memory.task.heap.sizetaskmanager.memory.managed.size
- 顯式配置了
taskmanager.memory.flink.size,進入deriveProcessSpecWithTotalFlinkMemory - 顯式配置了
taskmanager.memory.process.size,進入deriveProcessSpecWithTotalProcessMemory
3.2 deriveProcessSpecWithExplicitInternalMemory
private CommonProcessMemorySpec<FM> deriveProcessSpecWithExplicitInternalMemory(
Configuration config) {
FM flinkInternalMemory = flinkMemoryUtils.deriveFromRequiredFineGrainedOptions(config);
JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead =
deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory(
config, flinkInternalMemory.getTotalFlinkMemorySize());
return new CommonProcessMemorySpec<>(flinkInternalMemory, jvmMetaspaceAndOverhead);
}
- 根據各部分內存的細粒度配置推斷整個 flink taskmanaged 的進程內存分配;
- 計算 jvm metaspace 和 jvm overhead 部分需要佔用的內存 jvmMetaspaceAndOverhead。
3.3 deriveProcessSpecWithTotalFlinkMemory
private CommonProcessMemorySpec<FM> deriveProcessSpecWithTotalFlinkMemory(
Configuration config) {
MemorySize totalFlinkMemorySize =
getMemorySizeFromConfig(config, options.getTotalFlinkMemoryOption());
FM flinkInternalMemory =
flinkMemoryUtils.deriveFromTotalFlinkMemory(config, totalFlinkMemorySize);
JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead =
deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory(config, totalFlinkMemorySize);
return new CommonProcessMemorySpec<>(flinkInternalMemory, jvmMetaspaceAndOverhead);
}
- 根據配置獲取 totalFlinkMemorySize;
- 根據 totalFlinkMemorySize 推斷 flink taskmanager 進程各部分的內存分配: flinkInternalMemory;
- 計算 jvm metaspace 和 jvm overhead 部分需要佔用的內存。
3.4 deriveProcessSpecWithTotalProcessMemory
private CommonProcessMemorySpec<FM> deriveProcessSpecWithTotalProcessMemory(
Configuration config) {
MemorySize totalProcessMemorySize =
getMemorySizeFromConfig(config, options.getTotalProcessMemoryOption());
JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead =
deriveJvmMetaspaceAndOverheadWithTotalProcessMemory(config, totalProcessMemorySize);
MemorySize totalFlinkMemorySize =
totalProcessMemorySize.subtract(
jvmMetaspaceAndOverhead.getTotalJvmMetaspaceAndOverheadSize());
FM flinkInternalMemory =
flinkMemoryUtils.deriveFromTotalFlinkMemory(config, totalFlinkMemorySize);
return new CommonProcessMemorySpec<>(flinkInternalMemory, jvmMetaspaceAndOverhead);
}
- 根據配置獲取 totalProcessMemorySize;
- 計算 jvm metaspace 和 jvm overhead 部分需要佔用的內存 jvmMetaspaceAndOverhead;
- 計算得到 totalFlinkMemorySize = totalProcessMemorySize - jvmMetaspaceAndOverhead;
- 根據 totalFlinkMemorySize 推斷 flink taskmanager 進程各部分的內存分配: flinkInternalMemory。
3.5 Jvm Metaspace 以及 overhead 部分推斷
- 從配置中獲取
taskmanager.memory.jvm-metaspace.size的配置(默認為 256M); - 將 metaspace 這部分內存加到 totalFlinkMemorySize 中得到 totalFlinkAndJvmMetaspaceSize;
-
如果配置了總進程內存,則利用總進程和其他部分的內存情況計算 得到 jvm overhead 部分的內存:
jvmOverheadSize = totalProcessSize - totalFlinkMemorySize - jvmMetaspaceSize - jvmOverheadSize。
-
如果未配置總進程內存,則利用 totalFlinkMemorySize + jvmMetaspaceSize 和
taskmanager.memory.jvm-overhead.fraction的值進行相對計算:jvmOverheadSize = max(jvmOverheadMinSize, min(jvmOverheadMaxSize, (totalFlinkMemorySize + jvmMetaspaceSize) * (jvmOverHeadFraction /(1- jvmOverHeadFraction)))
3.6 Flink 各部分內存的詳細計算邏輯(TaskExecutorFlinkMemoryUtils)
TaskExecutorFlinkMemoryUtils 提供了兩個方法:
- 給定 flink memory size(不包含 jvm metaspace 和 overhead),推斷內部的細粒度配置;
- 從細粒度的資源配置中推導出總的 flink memory size。
3.6.1 deriveFromTotalFlinkMemory
先計算幾個較為固定的內存部分:
taskmanager.memory.framework.heap.size,默認為 128 Mtaskmanager.memory.framework.off-heap.size,默認為 128 Mtaskmanager.memory.task.off-heap.size,默認為 0
檢查是否顯式配置了 taskmanager.memory.task.heap.size,
-
如果配置了
- 計算 heap memory:從配置中獲取
taskmanager.memory.task.heap.size; - 計算 managed memory:先看
taskmanager.memory.managed.size,如果未指定,則看taskmanager.memory.managed.fraction(默認為 0.4):
total flink memory * taskmanager.memory.managed.fraction,並限制在 0~Long.MAX_VALUE 之間。 -
計算 network buffer memory:
network buffer memory = total flink memory - framework heap/off-heap memory - task heap/off-heap memory - managed memory - 檢查 network buffer memory 是否符合配置規範,比如在
taskmanager.memory.network.min和taskmanager.memory.network.max之間,且如果顯式配置了taskmanager.memory.network.fraction,則與該比例需要一致。
- 計算 heap memory:從配置中獲取
-
如果沒配置
- 計算 managed memory,和上述邏輯一致;
- 計算 network buffer memory,和上述計算 managed memory 邏輯一致,唯一區別是 task.manager.memory.network.fraction 的默認值為 0.1;
-
計算 heap memory:
heap memory = total flink memory - framework heap/off-heap memory - task off-heap memory - managed memory - network buffer memory
3.6.2 deriveFromRequiredFineGrainedOptions
如果方法名描述得那樣,進入該方法的前提是 taskmanager.memory.task.heap.size 和
taskmanager.memory.managed.size 均已顯式配置。
- 先計算 heap memory 和 managed memory;
- 計算 framework heap memory 和 framework off-heap memory,默認為 128M;
- 計算 task off-heap memory,默認為 0;
- 檢查是否顯式配置了
taskmanager.memory.flink.size: -
如果配置了,計算
network buffer memory = total flink memory - framework heap/off-heap memory - task heap/off-heap memory - managed memory並檢查 network buffer memory 配置是否合理,見上文;
- 如果未配置,則優先通過配置計算 network buffer memory,再累加各部分內存得到 flink memory size。
4 內存配置優先級
從上述的計算邏輯可以看出來,大部分內存配置都有兩種,一個是具體大小的配置,一個是比例配置(fraction),當同時指定二者時,會優先採用指定的大小(Size)。 若二者均未指定,會根據默認佔比進行計算。
其次,內存的配置計算可以分為兩類:
第一類,配置了多少就給多少,如果細粒度配置這部分給完計算髮現超出總申請內存,則直接報錯,這部分內存包含:
- framework heap memory;
- framework off-heap memory;
- managed memory;
- task off-heap memory;
第二類,根據配置和實際分配情況做一些微調:
- task heap memory;
- network memory。
如果配置 task heap memory,那麼優先分配該塊,剩下的內存分配給 network memory,否則則優先分配 network memory。