目錄

  • 前言
  • SparkConf類的構造方法
  • Spark配置項的存儲
  • 設置配置項
  • 直接用Set類方法設置
  • 通過系統屬性加載
  • 克隆SparkConf
  • 獲取配置項
  • 校驗配置項
  • 總結

前言

從本文開始,討論Spark基礎支撐子系統的具體實現。首先來看WordCount中最先出現的SparkConf。

上一篇已經講過,SparkConf類負責管理Spark的所有配置項。在我們使用Spark的過程中,經常需要靈活配置各種參數,來使程序更好、更快地運行,因此也必然要與SparkConf類頻繁打交道。瞭解它的細節不無裨益。

SparkConf類的構造方法

下面先來看一看SparkConf類的構造方法。為了讀起來清晰明瞭,可能會在不影響理解的前提下適當刪去無關代碼、註釋,並調整順序。

代碼#1.1 - o.a.s.SparkConf類的構造方法

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
  import SparkConf._

  if (loadDefaults) {
    loadFromSystemProperties(false)
  }

  def this() = this(true)

  // ...
}

代碼#1.1中的import語句是從SparkConf類的伴生對象中導入一些東西,它們主要管理過期的、舊版本兼容的配置項,以及日誌輸出。Scala中沒有Java的靜態(static)概念,類的伴生對象中維護的成員和方法就可以視為類的靜態成員和靜態方法。

SparkConf類有一個主構造方法參數loadDefaults,它指示是否要從Java系統屬性(即System.getProperties()取得的屬性)加載默認的與Spark相關的配置。

Spark配置項的存儲

SparkConf內部是採用ConcurrentHashMap來維護所有配置項鍵值的。

代碼#1.2 - o.a.s.SparkConf.settings字段

private val settings = new ConcurrentHashMap[String, String]()

這自然是考慮到了併發環境下的線程安全性問題。另外,它的鍵與值類型都為String,説明所有Spark配置項都以字符串形式存儲。

設置配置項

要設置Spark配置項,有以下三種方法。

直接用Set類方法設置

這是我們開發過程中最常用的方法。SparkConf中提供了多種多樣的Set類方法,最基礎的set()方法重載如下。

代碼#1.3 - o.a.s.SparkConf.set()方法

def set(key: String, value: String): SparkConf = {
    set(key, value, false)
  }

  private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
    if (key == null) {
      throw new NullPointerException("null key")
    }
    if (value == null) {
      throw new NullPointerException("null value for " + key)
    }
    if (!silent) {
      logDeprecationWarning(key)
    }
    settings.put(key, value)
    this
  }

可見配置項的鍵值都不能為null。並且包括set()在內的所有Set類方法都返回this,所以支持鏈式調用,這樣使用起來比較簡潔。

另外,還有一些方法可以快速設置常用配置項,比如上篇代碼#0.1中出現過的setMaster()與setAppName()。它們最終也會調用set()方法。

代碼#1.4 - o.a.s.SparkConf.setAppName()與setMaster()方法

def setMaster(master: String): SparkConf = {
    set("spark.master", master)
  }

  def setAppName(name: String): SparkConf = {
    set("spark.app.name", name)
  }
通過系統屬性加載

如果上述代碼#1.1中的loadDefaults參數為true,那麼SparkConf會從Java系統屬性中加載配置項。如果調用無參的輔助構造方法,即直接new SparkConf()的話,也會將loadDefaults設為true。Java系統屬性可以通過System.setProperty()方法在程序中動態設置。

來看代碼#1.1中調用的loadFromSystemProperties()方法。

代碼#1.5 - o.a.s.SparkConf.loadFromSystemProperties()方法

private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      set(key, value, silent)
    }
    this
  }

它使用通用工具類Utils中的方法取得系統屬性,過濾出以字符串“spark.”為前綴的鍵,然後調用set()方法設置鍵值。由於系統屬性相關的參數是一次性初始化的,所以用Set類方法設置的值可以覆蓋它們。

克隆SparkConf

SparkConf類繼承了Cloneable特徵(trait,類似於Java接口的增強版)並覆寫了clone()方法,因此SparkConf是可以(深)克隆的。

代碼#1.6 - o.a.s.SparkConf.clone()方法

override def clone: SparkConf = {
    val cloned = new SparkConf(false)
    settings.entrySet().asScala.foreach { e =>
      cloned.set(e.getKey(), e.getValue(), true)
    }
    cloned
  }

雖然ConcurrentHashMap保證線程安全,不會影響SparkConf實例共享,但在高併發的情況下,鎖機制可能會帶來性能問題。我們就可以克隆SparkConf到多個組件中,以讓它們獲得相同的配置參數。

獲取配置項

獲取配置項只有一個途徑,即調用Get類方法。Get類方法同樣有很多實現,基礎的get()與getOption()如下所示。

代碼#1.7 - o.a.s.SparkConf.get()與getOption()方法

def get(key: String): String = {
    getOption(key).getOrElse(throw new NoSuchElementException(key))
  }

  def get(key: String, defaultValue: String): String = {
    getOption(key).getOrElse(defaultValue)
  }

  def getOption(key: String): Option[String] = {
    Option(settings.get(key)).orElse(getDeprecatedConfig(key, settings))
  }

獲取配置項時,會同時檢查過期配置(getDeprecatedConfig()方法是伴生對象中定義的),並且會使用Scala Option來包裝返回的結果,對於有值(Some)和無值(None)的情況可以靈活處理。

另外,Get類方法中有不少涉及數據類型轉換和單位轉換,如getDouble()、getLong()、getSizeAsMb()、getTimeAsSeconds()等等,都是為了使用方便,不再贅述。

校驗配置項

SparkConf中有一個方法validateSettings(),用來校驗配置項。它的源碼很長,但是邏輯比較簡單,主要是對過期配置項進行警告,以及對非法設置或不兼容的配置項拋出異常。

限於篇幅原因,這裏就不貼出該方法的源碼了。感興趣的看官可以自己找找看,裏面校驗了大量之後一定會用到的配置項。

總結

本文通過SparkConf類的部分源碼,簡述了SparkConf的構造方法、配置存儲,以及設置、獲取、校驗配置項的方法邏輯。

SparkConf是SparkContext初始化的必備前提。瞭解了SparkConf,就可以分析複雜得多的SparkContext了。