Stories

Detail Return Return

Spark源碼解析(一):RDD之Transfrom算子 - Stories Detail

一、延遲計算

RDD 代表的是分佈式數據形態,因此,RDD 到 RDD 之間的轉換,本質上是數據形態上的轉換(Transformations)

在 RDD 的編程模型中,一共有兩種算子,Transformations 類算子和 Actions 類算子。開發者需要使用 Transformations 類算子,定義並描述數據形態的轉換過程,然後調用 Actions 類算子,將計算結果收集起來、或是物化到磁盤。

在這樣的編程模型下,Spark 在運行時的計算被劃分為兩個環節。

  1. 基於不同數據形態之間的轉換,構建計算流圖(DAG,Directed Acyclic Graph)
  2. 通過 Actions 類算子,以回溯的方式去觸發執行這個計算流圖

換句話説,開發者調用的各類 Transformations 算子,並不立即執行計算,當且僅當開發者調用 Actions 算子時,之前調用的轉換算子才會付諸執行。在業內,這樣的計算模式有個專門的術語,叫作“延遲計算”(Lazy Evaluation)。

二、Spark算子分類

在 RDD 的開發框架下,哪些算子屬於 Transformations 算子,哪些算子是 Actions 算子呢?

這裏給出一張自己在極客看的課程中的圖

img

三、Transform算子執行流程(源碼)

Map轉換算是 RDD 的經典轉換操作之一了.就以它開頭.Map的源碼如下:

image-20230224103912741

1. sc.clean(f)

首先掉了一個sc.clean(f) , 我們進到clean函數裏看下:

image-20230224104117191

註釋中明確提到了這個函數的功能:clean 整理一個閉包,使其可以序列化併發送到任務.

這裏的代碼有些多,大概知道這個函數的功能是這樣就ok了,閉包的問題會在另一篇文章裏仔細介紹

2. MapPartitionsRDD

進入到函數後源碼如下:

image-20230224105719758

這是一個MapPartitionsRDD。我們仔細看它的構成,從而來理解它是如何描述MapPartitionsRDD的.

2.1 var prev:RDD[T]

這裏的 prev 就是父RDD,f 則是Map中傳入的處理函數,除了這兩個就沒有了,也就是説明 RDD中沒有存儲具體的數據本身

這再次印證了轉換不會產生任何數據.它只是單純了記錄父RDD以及如何轉換的過程就完了,不會在轉換階段產生任何數據集

2.2 preservesPartitioning

preservesPartitioning 表示是否保持父RDD的分區信息.
如果為false(默認為false),則會對結果重新分區.也就是Map系默認都會分區
如果為true,保留分區. 則按照 firstParent 保留分區   

image-20230224110557226

可以看到根據 dependencies 找到其第一個父 RDD

image-20230224110711910

2.3 compute 計算邏輯
2.3.1 compute方法

RDD 抽象類要求其所有子類都必須實現 compute 方法,該方法接受的參數之一是一個Partition 對象,目的是計算該分區中的數據。

override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))

可以看到,compute 方法調用當前 RDD 內的第一個父 RDD 的 iterator 方法,該方的目的是拉取父 RDD 對應分區內的數據。

iterator 方法會返回一個迭代器對象,迭代器內部存儲的每個元素即父 RDD 對應分區內已經計算完畢的數據記錄。得到的迭代器作為 f 方法的一個參數。fRDD 類的 map 方法中指定,即實際的轉換函數。

compute 方法會將迭代器中的記錄一一輸入 f 方法,得到的新迭代器即為所求分區中的數據。

其他 RDD 子類的 compute 方法與之類似,在需要用到父 RDD 的分區數據時候,就會調用 iterator 方法,然後根據需求在得到的數據之上執行粗粒度的操作。換句話説,compute 函數負責的是父 RDD 分區數據到子 RDD 分區數據的變換邏輯。

2.3.2 iterator方法

此方法的實現在 RDD 這個抽象類中

/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementers of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

interator首先檢查 存儲級別 storageLevel:此處可參考RDD持久化

如果存儲級別不是NONE, 説明分區的數據説明分區的數據要麼已經存儲在文件系統當中,要麼當前 RDD 曾經執行過 cachepersise 等持久化操作,此時需要從存儲空間讀取分區數據,調用 getOrCompute 方法

image-20230224114953570

getOrCompute 方法會根據 RDD 編號:id分區編號:partition.index 計算得到當前分區在存儲層對應的塊編號:blockId,通過存儲層提供的數據讀取接口提取出塊的數據。

代碼中的這幾句註釋給的非常到位,大致的判斷順序如下:

  • 塊命中的情況:也就是數據之前已經成功存儲到介質中,這其中可能是數據本身就在存儲介質中(比如通過讀取HDFS創建的RDD),也可能是 RDD 在經過持久化操作並且經歷了一次計算過程,這個時候我們就能成功讀取數據並將其返回
  • 塊未命中的情況:可能是數據已經丟失,或者 RDD 經過持久化操作,但是是當前分區數據是第一次被計算,因此會出現拉取得到數據為 None 的情況。這就意味着我們需要計算分區數據,繼續調用 RDDcomputeOrReadCheckpoint 方法來計算數據,並將計算得到的數據緩存到存儲介質中,下次就無需再重複計算。

如果當前RDD的存儲級別為 None,説明為未經持久化的 RDD,需要重新計算 RDD 內的數據,這時候調用 RDD 類的 computeOrReadCheckpoint 方法,該方法也在持久化 RDD 的分區獲取數據失敗時被調用。

image-20230224142431572

computeOrReadCheckpoint 方法會檢查當前 RDD 是否已經被標記成檢查點,如果未被標記成檢查點,則執行自身的 compute 方法來計算分區數據,否則就直接拉取父 RDD 分區內的數據。

需要注意的是,對於標記成檢查點的情況,當前 RDD 的父 RDD 不再是原先轉換操作中提供數據的父 RDD,而是被 Apache Spark 替換成一個 CheckpointRDD 對象,該對象中的數據存放在文件系統中,因此最終該對象會從文件系統中讀取數據並返回給 computeOrReadCheckpoint 方法

參考文章:

Cache 和 Checkpoint

Add a new Comments

Some HTML is okay.