動態

詳情 返回 返回

Spark SQL分析層優化 - 動態 詳情

導讀:本期是《深入淺出Apache Spark》系列分享的第四期分享,第一期分享了Spark core的概念、原理和架構,第二期分享了Spark SQL的概念和原理,第三期則為Spark SQL解析層的原理和優化案例。本次分享內容主要是Spark SQL分析層的原理和優化的案例,且此優化案例是對於理解分析層原理很重要的。
本期介紹會圍繞下面五點展開: 
前情提要 
Spark SQL 分析層原理 
優化案例 
總結 
Q&A
►►►前情提要

首先介紹數新智能與Spark有深度關係的兩個產品。
圖片

賽博數智引擎,即CyberEngine,它可以幫助用户管理、部署、運維各種組件。該引擎旨在打造一款雲原生的數據服務底座,同時支持各種任務的調度,包括存儲等各方面的管理和運維工作。​
圖片

​賽博數據平台,包括數據治理、數據開發、數據探索、數據查詢等各種能力,其中Spark是它支持的重要數據引擎之一。
圖片

上一講主要介紹了Spark SQL解析層的優化。Spark SQL 解析層實際上是一條SQL執行生命週期中的第一個階段,首先它需要被解析成一棵抽象語法樹。解析後的抽象語法樹由於缺少與元數據的綁定,所以我們無法知道UnresolvedAttribute是一個數據庫表的列,還是Parquet文件中的一個元數據字段;我們也無法知道或者是一UnresolvedRelation是一張數據庫表,還是HDFS上的文件目錄。這些分析依賴於Spark SQL的分析層,也就是本次要分享的內容。

►►►Spark SQL 分析層原理
1.  Spark SQL分析層原理
圖片
剛才回顧瞭解析層,或者説是Spark裏面的Parser,它能夠把輸入的SQL 文本轉換成一棵解析後的抽象語法樹,那麼下一步就需要分析層Analyzer,即分析器對抽象語法樹進行分析,把抽象語法樹跟元數據信息進行綁定,知道數據的位置信息、元數據信息等等,之後才能有機會真正地查詢、執行或者修改它。分析器分析的過程依賴於一些元數據的組件,如SessionCatalog ,在SQL組件中屬於元老級的組件。在Spark 2.5版本中引入的Data Source V2中一個很重要的組件叫CatalogManager,其具有用户註冊機制,可以讓用户註冊自己的Catalog。可以理解為用户通過使用Spark內置的SessionCatalog,就可以擁有自己的Catalog。在多個Catalog註冊的情況下,可以實現多個數據源之間的聯邦,擁有更豐富的業務場景,更加靈活。有些人覺得Spark內置的SessionCatalog對於Hive的支持會有一些侷限,那麼就可以用CatalogManager的方式去實現。 不管是老的SessionCatalog,還是新的CatalogManager,實際上最重要的是對Catalog、數據庫、表、字段等元數據信息的管理。當然這裏提到的表只是從Catalog的角度抽象的一個概念,實際上它未必是一個真的表,例如一個以Kafka作為數據源的Catalog,你可以把Kafka裏面的一個Topic抽象成一個表,或者也可以根據這個邏輯把Topic看作是一個數據庫。總之,這是一個很靈活、很便利的新擴展,是一個為了幫助分析器獲取元數據信息的組件。在分析層中除了解決元數據信息的綁定之外,還需要去解決一些其他的問題,如內置函數的綁定。比如在SQL裏面包含一些函數,這些函數有些是無參數的,有些是帶有多個參數的。那麼這些函數是不是都是Spark本身就支持的呢?如果輸入的是一個ClickHouse的函數呢?因為有些函數肯定是Spark不支持或不認識的,這是Spark在分析過程中首先要去分析的內容,它不能跳過分析直接去執行這個函數。對函數的分析過程,Spark依賴於FunctionRegistry這個函數註冊表組件,函數註冊表裏面綁定了一些Spark支持的內置函數。如果用户想要實現一些Spark沒有但想要擴展的功能的時候,Spark也提供了用户自定義函數的支持,有新舊兩套接口去實現UDF。當用户自己去實現UDF的時候,實際上是註冊到整個Spark的元數據體系裏面,或者説是函數的體系裏面,分析器首先會從函數註冊表裏去查找相關函數,來確保SQL執行的合法性和安全性。這裏介紹了分析層在SQL整個執行流程中的作用,接下里會從更深入的角度去介紹分析層的一些內容。​
圖片
剛剛提到解析後的查詢樹或者説邏輯執行樹、抽象語法樹需要跟元數據綁定,分析器就是負責這些處理的角色。之前在第一、二講中,整體介紹Spark SQL的執行流程時,介紹過兩個很核心的抽象概念——規則(即Rule)和規則執行器(即RuleExecutor)。本次分享分析器,以及下一講將要分享的優化器,實際上就是規則執行器的具體應用。分析器裏面的每一個分析規則、優化規則都是規則的實現。規則執行器能夠幫助邏輯計劃去應用這些規則,這就是規則執行器的簡單原理。當然規則還需要一定的匹配過程,比如有一些規則要解析表或者數據源的位置,有一些可能是為了識別某些屬性是一個別名還是表的名字、表的字段,每個規則都有具體的針對性工作。最後需要提一提——規則的執行是有細節的。在第二講中分享規則的時候,提到除了規則信息的抽象之外,就是規則的執行。例如處理抽象語法樹,對整棵樹應用一個規則的時候,它需要一定的封裝邏輯。
圖片
在Spark SQL的執行流程中,分析器本身就是規則執行器的具體實現,它裏面會應用很多規則,這些規則可能是分批的,每一批裏面又有很多規則,這些規則有些可能只執行一次,有些可能執行多次。而後分析器會對解析後的語法樹進行處理,也可以理解是應用了一系列分析規則之後,它就會變成分析後的邏輯語法樹。

2.  分析規則介紹
圖片
本節介紹一個簡單的例子,這個例子可以幫助大家快速瞭解如何去寫一個規則,為將來工作中使用Spark SQL對一些業務場景進行增強分析提供參考。例如ResolveRelations分析規則,如select xxx from Tab1,解析層會把這個SQL解析成抽象語法樹,這裏Tab1是一張真的表,但是解析器是不知道的,它只會把它簡單地封裝成一個UnresolvedRelation,即未分析的Relation,那麼這個Relation究竟是一個表還是其他什麼東西,是需要分析器來處理。分析器主要就是依靠ResolveRelations這個分析規則進行處理,上圖展示了分析規則的代碼片段,實際這個規則的代碼遠比展示的要多,但因為其他代碼對本次介紹的內容來説是一些干擾,所以就忽略了。從代碼片段可以看到它調用了plan.resolveOperatorsUpWithPruning的遍歷算法,resolveOperatorsUp是一個自底向上的過程,如果跟樹的遍歷算法類比的話,可以理解為它是深度遍歷的一種方式。首先它會幫你找到這棵樹的第一個葉子節點,但它不會立刻去執行計算,它還會往樹的更深處去找,直到找到最後的葉子節點,等這個葉子節點處理完了之後才會往上卷,這個就是對應的深度遍歷算法。但是既然Up了,為什麼後面還有一個WithPruning,Pruning的中文意思是裁剪,這裏為什麼要這麼用,後面再詳細介紹。拋開這個遍歷的過程,深度遍歷的方式會找到每一個UnresolvedRelation,每找到一個還不知道具體是幹什麼的relation之後,就會首先去調用 ResolveRelation的方法,去分析Tab1是一個文件目錄還是一個表,或者是其他什麼,如果調用完之後發現都不是,可能會返回一個none,代表它不是一個表,也不是Hadoop裏面的一個目錄。之後就會交給一個叫ResolveViews的方法,因為它可能是一個視圖,所以調用ResolveViews 的方法去獲取views的元信息。ResolveViews方法的代碼在這裏沒有展示,因為對本次的主題來説不是很重要,本次主要是想展示ResolveRelations這個分析規則模板實現的套路。

►►►優化案例1. 優化前 - 3.2.0以前的Rule的全面遍歷
圖片
針對樹的遍歷,Spark 3.2.0版本引入了一個比較大的優化,這個優化對於分析層來説也是相當重要。剛剛在介紹ResolveRelations時,採用了Spark 3.2.0之後的代碼,其中使用的是帶有 Withpruning後綴的operatorsUpWithpruning方法,表示有裁剪的意思,那麼它究竟是怎麼裁剪的呢?回到Spark 3.2.0以前這塊代碼的實現。 Spark在分析層最主要依賴於以下幾個方法,在3.2.0版本之前resolveOperators方法默認調用resolveOperatorsDown方法,resolveOperatorsDown是一個廣度優先的遍歷方法,此外,還有采用深度遍歷的resolveOperatorsUp方法,這是Spark 3.2.0以前分析層遍歷樹的基本原理。在應用規則實現遍歷樹的時候,不管是遍歷整棵樹,還是隨着遍歷的層次不同,遍歷整個樹的一部分或者是一個葉子節點,都會完完整整地去遍歷。可以理解為這棵樹不管有多深,每一層有多少個節點,在遍歷這棵樹的時候,它會把規則應用到整棵樹的每一個節點。Spark SQL中有很多分析規則,有些分析規則可能只針對特殊的模式或者特殊的場景,如果用户寫的SQL跟這個規則完全沒有關係,並且SQL本身很複雜,對應的樹的深度很高、體量龐大,如果不斷地應用這個規則去執行,就會浪費大量的CPU時鐘。
圖片
resolveOperators等三個方法,主要是針對整個邏輯計劃層面,而邏輯計劃是一個樹的結構,除了邏輯計劃之外,Spark裏面的表達式本身也是樹的結構。舉個最簡單的例子,例如在where條件裏輸入a>10 and b<0,這就是由兩個表達式用and組成的一棵樹,除了and組成樹之外,a > 10本身也是顆樹,大於號是一個父節點,大於號下面的a字段和10屬性值是兩個葉子。在Spark 3.2.0之前,表達式的轉換通過上圖左邊展示的transformExpressions、transformExpressionsUp和transformExpressionsDown。 而對邏輯計劃的處理則依賴上圖右邊展示的resolveOperators、resolveOperatorsUp和resolveOperatorsDown。2.優化後 - Support travelsal pruning in transform/resolve 
圖片
Spark 3.2.0 之後的這個優化,我個人認為是分析層自Spark SQL誕生以來做的最重要的優化之一。大家有興趣可以去看Spark社區任務裏面一些具體的討論,包括推進的過程和commit的一些代碼。這個優化主要包括以下三部分:首先是TreePattern,它是一個枚舉值,不管是表達式、還是邏輯計劃,它都會匹配對應的模式去實現;第二是TreePatternBits,是一個可以便於TreePattern快速查找的數據結構;第三是帶有裁剪功能的Transform函數。3.TreePattern
圖片
TreePattern是一個用Scala實現的枚舉類模板,實際的枚舉類會非常多,但本次為了介紹方便,只列舉了幾個枚舉值。例如AGGREGATE_EXPRESSION,它跟在Spark表達式裏面很重要的聚合表達式相關。如果表達式裏面包含別名,那就會用到ALIAS。再比如上文列舉的and邏輯運算表達式,這裏也會有AND和and表達式相對應。本次分享的最後會具體介紹EXPRESSION_WITH_RANDOM_SEED枚舉值。
4.TreePatternBits
圖片
這個接口的出現是為了能夠快速地幫助一個樹形結構(不管是邏輯計劃的樹,還是一個表達式的樹)與具體的Treepattern進行匹配校驗。從這個trait的代碼可以看到它裏面用了 BitSet 這個通過位運算來進行快速判別的數據結構。實際上每個TreePattern本身都有一個ID屬性,可以通過ID來進行判定。比如這裏有三個方法,第一個方法叫containsPattern,這個方法的作用就判定當前這棵樹有沒有某個pattern,如果有這個pattern 才會處理。比如剛介紹的ResolveRelations,它肯定會有與relation相關的pattern,只有找到邏輯樹裏面與relation對應的pattern之後,才會進一步去執行規則。當然,有些條件可能比較苛刻,比如像優化器層做優化,有Limit算子(字面上可以理解為SQL裏的limit)、Sort算子(字面上可以理解為SQL裏的order by)。 當order by 加 limit 同時出現時,就能應用很多優化規則。如果只有limit出現,或者只有order by 出現,那它就不需要應用這個規則。所以第二個方法叫containsAllPatterns,當同時滿足多個pattern時,或者可以理解為通過它快速判斷語法樹同時有多個節點時,才會去應用某個規則。第三種方法是containsAnyPattern,當出現任何一種情況時,就可以應用某個規則。據我理解,上面提到的Limit算子、Sort算子、Aggregation算子以及各種各樣的表達式在Spark 3.2.0之後,絕大部分或者説全部都已經實現了這個特質。
5.帶有裁剪功能的Transform函數
圖片
上文講resolveOperators方法的時候,它裏面默認是調用resolveOperatorsDown,這是Spark 3.2.0之前的方法,而現在已經換成了resolveOperatorsWithPruning,雖然還沒有體現遍歷樹的方式,也沒有説是深度還是廣度,但已經開始去調用這個帶有裁剪功能的解析操作了。可以看到resolveOperatorsWithPruning裏面默認調用了resolveOperatorsDownWithPruning,從深度遍歷和廣度遍歷的角度來説,這個邏輯還是不變的,只不過它替換成了帶有裁剪功能的一個新的實現或者調用而已。同樣resolveOperatorsUp也替換成了resolveOperatorsUpWithPruning。 這樣也是為了兼容Spark之前的老版本,以確保用户升級到SPARK 3.2.0後原來生產的代碼也可以正常執行,保證用户的可遷移性。
6.resolveOperatorsDownWithPruning
圖片
以resolveOperatorsDownWithPruning為例來深入的剖析裁剪具體是怎麼做到的,為什麼要裁剪?之前有提到在Spark 3.2.0之前,不管語法樹跟某個規則有沒有關係,規則都會應用一遍,因此浪費了大量CPU時鐘。而從resolveOperatorsDownWithPruning的代碼來看,這個方法的簽名帶有一個condition,這個condition就是以TreePatternBits特質為入參,也可以認為這裏傳遞進來的是一個邏輯節點或一個邏輯計劃樹,然後返回一個布爾值。 就是説condition會判定一棵樹符不符合這個規則,是否需要把這個規則應用到這棵樹上,代碼中cond.apply(self) 意思是應用到它自己,把規則本身作為參數傳遞給一個匿名函數,傳遞完之後,TreePatternBits來判斷到底有沒有符合這個樹的TreePattern。只有在符合了這個條件之後,才會進到最外層if 的框裏,接下來後面還有一個isRuleIneffective,進入這個方法體之後,它會對裏面的葉子節點進行處理,即調用resolveOperatorsDownWithPruning,處理完之後,再一層一層地往裏調用。可以理解為把整個樹的子樹一層一層的傳遞給resolveOperatorsDownWithPruning。所以基於現在這個邏輯,所有的樹和子樹都會被傳遞,但不一定會應用某個規則,而之前就是什麼都不管就直接開始執行分析規則,這就是它們優化前後的區別。
7.resolveOperatorsUpWithPruning
圖片
resolveOperatorsUpWithPruning方法和resolveOperatorsDownWithPruning其實大同小異,都是通過condition或者説實現了treepatternbits特質的邏輯計劃節點來作為入參,然後來判斷每個pattern是否滿足規則。以上兩個方法的介紹中,沒有對代碼中的 isRuleIneffective(ruleId) 和markRuleAsIneffective(ruleId) 進行介紹,其實它們跟模式匹配這種裁剪的方式在思想上是類似的,它們能夠進一步幫助處理裁剪的規則,只不過方法實現上有所區別。這兩個就留給感興趣的讀者自己去看看它們的實現。
8.優化後表達式樹的轉換
圖片
上面着重講了resolve裁剪的實現,其對應的表達式也有相應的變化,比如左側這三個新的函數或者接口都帶有WithPruning。表達式的優化也是類似的,根據之前對邏輯計劃的裁剪優化的分析,相信看懂它不會有什麼問題,感興趣的讀者可以去看看它的實現過程。
9.RevolveRandomSeed
圖片
上面給大家分享了本次優化帶來的三個新的組件,一是TreePattern枚舉類,二是TreePatternBits特質,可以快速判別一個邏輯計劃節點跟某一個規則是否匹配的一個接口,三是增加了一批便利的帶有裁剪功能的轉換實現。那麼這三者最後是怎麼應用到一起呢?我從Spark的源碼中挑了一個邏輯較為簡單的RevolveRandomSeed規則來講解。從這個規則的名字可以看到它是一個解決隨機種子表達式的特殊分析處理規則。從這個分析規則的代碼中可以看到,它把之前的resolveOperatorsUp方法替換成為了帶有裁剪功能的resolveOperatorsUpWithPruning,同時傳遞參數的時候,調用了containsPattern,這個contentsPattern就是上面提的那個數據結構特質裏面用於快速判別一個邏輯計劃跟一個模式是否匹配的數據結構,通過它來快速判斷這個邏輯計劃裏面包不包含EXPRESSION_WITH_RANDOM_SEED模式。如果包含了,才會走裏邊的邏輯,如果不包含,那麼就直接返回,不會有過多的消耗。再往下看就是針對邏輯計劃的介紹,可以看到進入這個分析規則的內部,以前的transformExpressionsUp現在也替換成了帶有裁剪功能的transformExpressionsUpWithPruning,並調用了containsPattern的方法來進行判斷。以上這個例子我相信已經足夠簡單的讓大家去理解Spark 3.2.0版本帶來的分析層、優化層的重大優化。
►►►總結
圖片
以上就是本次分析層的介紹。最後這個標題叫Stop earlier without traversing the entire tree,感興趣的讀者可以去Spark官網查看其詳細內容,在此不做展開介紹。

►►►Q&A
Q1:在實際應用中應該如何監控和調試 Spark SQL 查詢計劃,以確保 ResolveRelations規則的正確執行?
A:ResultRelations是Spark分析器裏很重要的一個分析規則,它能分析relation對應的這個文件包括哪些信息,這些都是Spark社區經過了大量增強的一個很健壯的東西。 怎麼監控它能正確執行?假如它不能夠正確執行,那可能就是Spark的bug 了,可以反饋給Spark社區,讓Spark社區去解決。

Q2:邏輯計劃算子樹規則應用的優化效果,在批處理的背景下,用户應該是無感知的嗎?
A:針對用户的SQL場景,大多數情況下用户肯定是無感知的,因為這塊優化畢竟只是針對邏輯計劃進行的優化,邏輯計劃優化帶來的性能提升可能沒有針對物理算子進行的優化帶來的提升那麼明顯,但是也不全然如此。比如説用到一些服務化的Spark的時候,如HiveThriftServer2,Kyuubi等長運行週期的服務,如果它要處理的SQL併發很高,且用户的SQL本身又很複雜,那麼這種優化用户應該還是有感知的。

Q3:同樣的數據量和運算量,用Spark做離線批量計算,用Flink做實時流計算,能節省多少資源,資源主要是節省在哪些地方?
A:我覺得節約最主要還是離線批量計算本身帶來的優勢。例如網絡傳輸消息,從時效性來説,如果只傳輸一條信息,它會立即去傳輸,時效比較快,而從整個批量的角度來説,每傳一個消息,就有一個數據包,這些消息加起來的數據量實際上比成批去傳輸的數據量要更大,如果網絡的連接不是長連接而是短連接,還要大量的在連接建立方面的開銷。所以Spark或者Hive批處理場景的誕生就是為了提高數據執行的處理效率,包括節省計算資源、提高數據的吞吐量等。從這個角度來説,按Filnk現在的發展也可以按批去處理一些數據,並不是來一條消息就立馬下發的模式,實際上它現在也有消息的緩存,不管它是發送端還是接收端,都有緩存隊列這種類似批的概念。當然具體的資源節省可能還是需要用實際的數據進行測試,以上只是偏理論的分析。

Q4:邏輯計劃優化,除了常見的謂詞下推、列剪裁、常量累加,還有哪些優化?
A:其實優化是非常多的,比如你剛剛説的謂詞下推,還有Project 下推、聚合下推。而常量,除了你説的那種常量之外,還有對random表達式進行比較運算的優化,Limit下推、Limit和Sort這種top n場景的優化,對規則的重寫優化等等。

user avatar u_13482808 頭像 zyx178 頭像 zeran 頭像 actionopensource 頭像 yishenjiroudekaixinguo 頭像 tdengine 頭像 libin9iai 頭像
點贊 7 用戶, 點贊了這篇動態!
點贊

Add a new 評論

Some HTML is okay.