Stories

Detail Return Return

【Flink】TaskManager 內存模型及計算邏輯詳解 - Stories Detail

本文旨在詳解 Flink TaskManager 的內存模型以及其各部分內存佔比的計算邏輯。首先,結合官網展示了當前 Flink 的內存模型,並在之後結合 JVM 自身內存模型和管理機制結合講解 Flink 內存模型的各個部分,最後結合源碼解釋了各部分內存佔比的計算邏輯。

1 內存模型組成部分

本節摘自官網:https://nightlies.apache.org/flink/flink-docs-release-1.20/zh...
image.png

組成部分 配置參數 描述
框架堆內存(Framework Heap Memory) taskmanager.memory.framework.heap.size 用於 Flink 框架的 JVM 堆內存(進階配置)。
任務堆內存(Task Heap Memory) taskmanager.memory.task.heap.size 用於 Flink 應用的算子及用户代碼的 JVM 堆內存。
託管內存(Managed memory) taskmanager.memory.managed.size taskmanager.memory.managed.fraction 由 Flink 管理的用於排序、哈希表、緩存中間結果及 RocksDB State Backend 的本地內存。
框架堆外內存(Framework Off-heap Memory) taskmanager.memory.framework.off-heap.size 用於 Flink 框架的堆外內存(直接內存或本地內存)(進階配置)。
任務堆外內存(Task Off-heap Memory) taskmanager.memory.task.off-heap.size 用於 Flink 應用的算子及用户代碼的堆外內存(直接內存或本地內存)。
網絡內存(Network Memory) taskmanager.memory.network.min taskmanager.memory.network.max taskmanager.memory.network.fraction 用於任務之間數據傳輸的直接內存(例如網絡傳輸緩衝)。該內存部分為基於 Flink 總內存的受限的等比內存部分。這塊內存被用於分配網絡緩衝
JVM Metaspace taskmanager.memory.jvm-metaspace.size Flink JVM 進程的 Metaspace。
JVM 開銷 taskmanager.memory.jvm-overhead.min taskmanager.memory.jvm-overhead.max taskmanager.memory.jvm-overhead.fraction 用於其他 JVM 開銷的本地內存,例如棧空間、垃圾回收空間等。該內存部分為基於進程總內存的受限的等比內存部分。

2 Flink 內存模型對內存的邏輯劃分

Flink 作為一個 JVM 進程,首先複習一下 JVM 內存的簡單劃分:

  • Heap memory:JVM 進程內部使用的內存,用於存儲 java 對象,受 GC 控制;
  • Native memory/Off-heap:Java 進程使用的用户地址空間上的非堆內存,不受 GC 控制,包含元空間(Metaspace)以及 Direct memory;
  • Direct memory:Native Memory,意味着共享硬件內的底層緩衝區,目標是為了減少內存拷貝,可簡單理解為 NIO 中提供的 allocate 方法獲取的內存;
  • Metaspace:Java8 後替代永久代的設計,之前類和方法、字符串常量池等信息存儲在永久代,而永久代使用堆內存,容易出現 OOM,且增加了 GC 的複雜性,而 metaspace 使用 native memory,降低了該風險。

參考資料:

  1. What is the difference between off-heap, native heap, direct memory and native memory?

JVM 其實真正控制的內存部分就是堆內、堆外(Direct 部分)、元空間,嚴格來説元空間也屬於堆外,但是 JVM 對這部分特別定義為了元空間。因此 Flink 在啓動 TM 時可以利用 -XMs、-Xmx 等 JVM 配置控制堆內內存,利用 -XX:MaxDirectMemorySize 和 -XX:MaxMetaspaceSize 分別控制 direct memory 和元空間。其中 task off-heap、framework off-heap、network memory 這三部分均屬於 direct memory。

而其他內存,比如通過 JNI 調用 C 代碼申請的內存其實是不在 JVM 管控內的,因此這部分內存也沒辦法通過 JVM 的參數控制,比如 Flink 的內存模型中,jvm overhead 和 managed memory 這兩部分內存是完全不受 JVM 管控的,屬於 native memory(非 direct),只能通過容器或者系統對 JVM 整體進程進行監控 kill。

一般情況下,配置 Flink 內存最簡單的方法是配置總內存。此外,Flink 也支持更細粒度的內存配置方式。

3 內存計算推斷邏輯

3.1 推斷入口及推斷模式

TaskManager 的內存計算推斷的邏輯入口在 org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils#processSpecFromConfig。

public static TaskExecutorProcessSpec processSpecFromConfig(final Configuration config) {
    try {
        return createMemoryProcessSpec(
                config, PROCESS_MEMORY_UTILS.memoryProcessSpecFromConfig(config));
    } catch (IllegalConfigurationException e) {
        throw new IllegalConfigurationException(
                "TaskManager memory configuration failed: " + e.getMessage(), e);
    }
}

org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils#memoryProcessSpecFromConfig 有三種推斷模式(優先級自上而下):

public CommonProcessMemorySpec<FM> memoryProcessSpecFromConfig(Configuration config) {
    if (options.getRequiredFineGrainedOptions().stream().allMatch(config::contains)) {
        // all internal memory options are configured, use these to derive total Flink and
        // process memory
        return deriveProcessSpecWithExplicitInternalMemory(config);
    } else if (config.contains(options.getTotalFlinkMemoryOption())) {
        // internal memory options are not configured, total Flink memory is configured,
        // derive from total flink memory
        return deriveProcessSpecWithTotalFlinkMemory(config);
    } else if (config.contains(options.getTotalProcessMemoryOption())) {
        // total Flink memory is not configured, total process memory is configured,
        // derive from total process memory
        return deriveProcessSpecWithTotalProcessMemory(config);
    }
    return failBecauseRequiredOptionsNotConfigured();
}
  1. 如果以下部分的內存已經顯式配置 size,進入 deriveProcessSpecWithExplicitInternalMemory

    • taskmanager.memory.task.heap.size
    • taskmanager.memory.managed.size
  2. 顯式配置了 taskmanager.memory.flink.size,進入 deriveProcessSpecWithTotalFlinkMemory
  3. 顯式配置了 taskmanager.memory.process.size,進入 deriveProcessSpecWithTotalProcessMemory

3.2 deriveProcessSpecWithExplicitInternalMemory

private CommonProcessMemorySpec<FM> deriveProcessSpecWithExplicitInternalMemory(
        Configuration config) {
    FM flinkInternalMemory = flinkMemoryUtils.deriveFromRequiredFineGrainedOptions(config);
    JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead =
            deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory(
                    config, flinkInternalMemory.getTotalFlinkMemorySize());
    return new CommonProcessMemorySpec<>(flinkInternalMemory, jvmMetaspaceAndOverhead);
}
  1. 根據各部分內存的細粒度配置推斷整個 flink taskmanaged 的進程內存分配;
  2. 計算 jvm metaspace 和 jvm overhead 部分需要佔用的內存 jvmMetaspaceAndOverhead。

3.3 deriveProcessSpecWithTotalFlinkMemory

private CommonProcessMemorySpec<FM> deriveProcessSpecWithTotalFlinkMemory(
            Configuration config) {
    MemorySize totalFlinkMemorySize =
            getMemorySizeFromConfig(config, options.getTotalFlinkMemoryOption());
    FM flinkInternalMemory =
            flinkMemoryUtils.deriveFromTotalFlinkMemory(config, totalFlinkMemorySize);
    JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead =
            deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory(config, totalFlinkMemorySize);
    return new CommonProcessMemorySpec<>(flinkInternalMemory, jvmMetaspaceAndOverhead);
}
  1. 根據配置獲取 totalFlinkMemorySize;
  2. 根據 totalFlinkMemorySize 推斷 flink taskmanager 進程各部分的內存分配: flinkInternalMemory;
  3. 計算 jvm metaspace 和 jvm overhead 部分需要佔用的內存。

3.4 deriveProcessSpecWithTotalProcessMemory

private CommonProcessMemorySpec<FM> deriveProcessSpecWithTotalProcessMemory(
        Configuration config) {
    MemorySize totalProcessMemorySize =
            getMemorySizeFromConfig(config, options.getTotalProcessMemoryOption());
    JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead =
            deriveJvmMetaspaceAndOverheadWithTotalProcessMemory(config, totalProcessMemorySize);
    MemorySize totalFlinkMemorySize =
            totalProcessMemorySize.subtract(
                    jvmMetaspaceAndOverhead.getTotalJvmMetaspaceAndOverheadSize());
    FM flinkInternalMemory =
            flinkMemoryUtils.deriveFromTotalFlinkMemory(config, totalFlinkMemorySize);
    return new CommonProcessMemorySpec<>(flinkInternalMemory, jvmMetaspaceAndOverhead);
}
  1. 根據配置獲取 totalProcessMemorySize;
  2. 計算 jvm metaspace 和 jvm overhead 部分需要佔用的內存 jvmMetaspaceAndOverhead;
  3. 計算得到 totalFlinkMemorySize = totalProcessMemorySize - jvmMetaspaceAndOverhead;
  4. 根據 totalFlinkMemorySize 推斷 flink taskmanager 進程各部分的內存分配: flinkInternalMemory。

3.5 Jvm Metaspace 以及 overhead 部分推斷

  1. 從配置中獲取 taskmanager.memory.jvm-metaspace.size 的配置(默認為 256M);
  2. 將 metaspace 這部分內存加到 totalFlinkMemorySize 中得到 totalFlinkAndJvmMetaspaceSize;
  3. 如果配置了總進程內存,則利用總進程和其他部分的內存情況計算 得到 jvm overhead 部分的內存:

    jvmOverheadSize = totalProcessSize - totalFlinkMemorySize - jvmMetaspaceSize
  4. jvmOverheadSize。
  5. 如果未配置總進程內存,則利用 totalFlinkMemorySize + jvmMetaspaceSize 和 taskmanager.memory.jvm-overhead.fraction 的值進行相對計算:

    jvmOverheadSize = max(jvmOverheadMinSize, min(jvmOverheadMaxSize, (totalFlinkMemorySize + jvmMetaspaceSize) * (jvmOverHeadFraction /(1- jvmOverHeadFraction)))

3.6 Flink 各部分內存的詳細計算邏輯(TaskExecutorFlinkMemoryUtils)

TaskExecutorFlinkMemoryUtils 提供了兩個方法:

  1. 給定 flink memory size(不包含 jvm metaspace 和 overhead),推斷內部的細粒度配置;
  2. 從細粒度的資源配置中推導出總的 flink memory size。

3.6.1 deriveFromTotalFlinkMemory

先計算幾個較為固定的內存部分:

  • taskmanager.memory.framework.heap.size,默認為 128 M
  • taskmanager.memory.framework.off-heap.size,默認為 128 M
  • taskmanager.memory.task.off-heap.size,默認為 0

檢查是否顯式配置了 taskmanager.memory.task.heap.size

  • 如果配置了

    • 計算 heap memory:從配置中獲取 taskmanager.memory.task.heap.size
    • 計算 managed memory:先看 taskmanager.memory.managed.size ,如果未指定,則看 taskmanager.memory.managed.fraction(默認為 0.4):
      total flink memory * taskmanager.memory.managed.fraction,並限制在 0~Long.MAX_VALUE 之間。
    • 計算 network buffer memory:

      network buffer memory = total flink memory
                              - framework heap/off-heap memory 
                              - task heap/off-heap memory 
                              - managed memory
    • 檢查 network buffer memory 是否符合配置規範,比如在 taskmanager.memory.network.mintaskmanager.memory.network.max 之間,且如果顯式配置了 taskmanager.memory.network.fraction ,則與該比例需要一致。
  • 如果沒配置

    • 計算 managed memory,和上述邏輯一致;
    • 計算 network buffer memory,和上述計算 managed memory 邏輯一致,唯一區別是 task.manager.memory.network.fraction 的默認值為 0.1;
    • 計算 heap memory:

      heap memory = total flink memory
                    - framework heap/off-heap memory 
                    - task off-heap memory 
                    - managed memory
                    - network buffer memory

3.6.2 deriveFromRequiredFineGrainedOptions

如果方法名描述得那樣,進入該方法的前提是 taskmanager.memory.task.heap.size
taskmanager.memory.managed.size 均已顯式配置。

  1. 先計算 heap memory 和 managed memory;
  2. 計算 framework heap memory 和 framework off-heap memory,默認為 128M;
  3. 計算 task off-heap memory,默認為 0;
  4. 檢查是否顯式配置了 taskmanager.memory.flink.size
  5. 如果配置了,計算

    network buffer memory = total flink memory 
                             - framework heap/off-heap memory 
                             - task heap/off-heap memory 
                             - managed memory

    並檢查 network buffer memory 配置是否合理,見上文;

  6. 如果未配置,則優先通過配置計算 network buffer memory,再累加各部分內存得到 flink memory size。

4 內存配置優先級

從上述的計算邏輯可以看出來,大部分內存配置都有兩種,一個是具體大小的配置,一個是比例配置(fraction),當同時指定二者時,會優先採用指定的大小(Size)。 若二者均未指定,會根據默認佔比進行計算。

其次,內存的配置計算可以分為兩類:

第一類,配置了多少就給多少,如果細粒度配置這部分給完計算髮現超出總申請內存,則直接報錯,這部分內存包含:

  • framework heap memory;
  • framework off-heap memory;
  • managed memory;
  • task off-heap memory;

第二類,根據配置和實際分配情況做一些微調:

  • task heap memory;
  • network memory。

如果配置 task heap memory,那麼優先分配該塊,剩下的內存分配給 network memory,否則則優先分配 network memory。

user avatar wuliaodeliema Avatar wangdachui_5d9d33e8767fc Avatar
Favorites 2 users favorite the story!
Favorites

Add a new Comments

Some HTML is okay.