Apache Spark最初在2009年誕生於美國加州大學伯克利分校的APM實驗室,並於2010年開源,如今是Apache軟件基金會下的頂級開源項目之一。Spark的目標是設計一種編程模型,能夠快速地進行數據分析。Spark提供了內存計算,減少了IO開銷。另外Spark是基於Scala編寫的,提供了交互式的編程體驗。經過10年的發展,Spark成為了炙手可熱的大數據處理平台,目前最新的版本是Spark3.0。本文主要是對Spark進行一個總體概覽式的介紹,後續內容會對具體的細節進行展開討論。本文的主要內容包括:
-
√Spark的關注度分析
-
√Spark的特點
-
√Spark的一些重要概念
-
√Spark組件概覽
-
√Spark運行架構概覽
-
√Spark編程小試牛刀
Spark的關注熱度分析
概況
下圖展示了近1年內在國內關於Spark、Hadoop及Flink的搜索趨勢
近1年內全球關於Spark、Hadoop及Flink的搜索趨勢,如下:
分析
從上面的4幅圖可以看出,近一年無論是在國內還是全球,關於Spark的搜索熱度始終是比Hadoop和Flink要高。近年來Flink發展迅猛,其在國內有阿里的背書,Flink天然的流處理特點使其成為了開發流式應用的首選框架。可以看出,雖然Flink在國內很火,但是放眼全球,熱度仍然不及Spark。所以學習並掌握Spark技術仍然是一個不錯的選擇,技術有很多的相似性,如果你已經掌握了Spark,再去學習Flink的話,相信你會有種似曾相識的感覺。
Spark的特點
- 速度快
Apache Spark使用DAG調度程序、查詢優化器和物理執行引擎,為批處理和流處理提供了高性能。 - 易於使用
支持使用Java,Scala,Python,R和SQL快速編寫應用程序。Spark提供了80多個高級操作算子,可輕鬆構建並行應用程序。 - 通用性
Spark提供了非常豐富的生態棧,包括SQL查詢、流式計算、機器學習和圖計算等組件,這些組件可以無縫整合在一個應用中,通過一站部署,可以應對多種複雜的計算場景 - 運行模式多樣
Spark可以使用Standalone模式運行,也可以運行在Hadoop,Apache Mesos,Kubernetes等環境中運行。並且可以訪問HDFS、Alluxio、Apache Cassandra、Apache HBase、Apache Hive等多種數據源中的數據。
Spark的一些重要概念
- RDD
彈性分佈式數據集(Resilient Distributed Dataset),是分佈式內存的一個抽象概念,提供了一種高度受限的共享內存模型 - DAG
有向無環圖(Directed Acyclic Graph),反映RDD之間的依賴關係 - Application
用户編寫的Spark程序,由 driver program 和 executors 組成 - Application jar
用户編寫的應用程序JAR包 - Driver program
用程序main()函數的進程,可以創建SparkContext - Cluster manager
集羣管理器,屬於一個外部服務,用於資源請求分配(如:standalone manager, Mesos, YARN) - Deploy mode
部署模式,決定Driver進程在哪裏運行。如果是cluster模式,會由框架本身在集羣內部某台機器上啓動Driver進程。如果是client模式,會在提交程序的機器上啓動Driver進程 - Worker node
集羣中運行應用程序的節點Executor運行在Worknode節點上的一個進程,負責運行具體的任務,併為應用程序存儲數據 - Task
運行在executor中的工作單元 - Job
一個job包含多個RDD及一些列的運行在RDD之上的算子操作,job需要通過action操作進行觸發(比如save、collect等) - Stage
每一個作業會被分成由一些列task組成的stage,stage之間會相互依賴
Spark組件概覽
Spark生態系統主要包括Spark Core、SparkSQL、SparkStreaming、MLlib和GraphX等組件,具體如下圖所示:
- Spark Core
Spark core是Spark的核心,包含了Spark的基本功能,如內存計算、任務調度、部署模式、存儲管理等。SparkCore提供了基於RDD的API是其他高級API的基礎,主要功能是實現批處理。 - Spark SQL
Spark SQL主要是為了處理結構化和半結構化數據而設計的,SparkSQL允許用户在Spark程序中使用SQL、DataFrame和DataSetAPI查詢結構化數據,支持Java、Scala、Python和R語言。由於DataFrame API提供了統一的訪問各種數據源的方式(包括Hive、Avro、Parquet、ORC和JDBC),用户可以通過相同的方式連接任何數據源。另外,Spark SQL可以使用hive的元數據,從而實現了與Hive的完美集成,用户可以將Hive的作業直接運行在Spark上。Spark SQL可以通過spark-sql的shell命令訪問。 - SparkStreaming
SparkStreaming是Spark很重要的一個模塊,可實現實時數據流的可伸縮,高吞吐量,容錯流處理。在內部,其工作方式是將實時輸入的數據流拆分為一系列的micro batch,然後由Spark引擎進行處理。SparkStreaming支持多種數據源,如kafka、Flume和TCP套接字等 - MLlib
MLlib是Spark提供的一個機器學習庫,用户可以使用Spark API構建一個機器學習應用,Spark尤其擅長迭代計算,性能是Hadoop的100倍。該lib包含了常見機器學習算法,比如邏輯迴歸、支持向量機、分類、聚類、迴歸、隨機森林、協同過濾、主成分分析等。 - GraphX
GraphX是Spark中用於圖計算的API,可認為是Pregel在Spark上的重寫及優化,GraphX性能良好,擁有豐富的功能和運算符,能在海量數據上自如地運行復雜的圖算法。GraphX內置了許多圖算法,比如著名的PageRank算法。
Spark運行架構概覽
從整體來看,Spark應用架構包括以下幾個主要部分:
- Driver program
- Master node
- Work node
- Executor
- Tasks
- SparkContext
在Standalone模式下,運行架構如下圖所示:
Driver program
Driver program是Spark應用程序的main()函數(創建SparkContext和Spark會話)。運行Driver進程的節點稱之為Driver node,Driver進程與集羣管理器(Cluster Manager)進行通信,向Executor發送調度的task。
Cluster Manager
稱之為集羣管理器,主要用於管理集羣。常見的集羣管理器包括YARN、Mesos和Standalone,Standalone集羣管理器包括兩個長期運行的後台進程,其中一個是在Master節點,另外一個是在Work節點。在後續集羣部署模式篇,將詳細探討這一部分的內容,此處先有有一個大致印象即可。
Worker node
熟悉Hadoop的朋友應該知道,Hadoop包括namenode和datanode節點。Spark也類似,Spark將運行具體任務的節點稱之為Worker node。該節點會向Master節點彙報當前節點的可用資源,通常在每一台Worker node上啓動一個work後台進程,用於啓動和監控Executor。
Executor
Master節點分配資源,使用集羣中的Work node創建Executor,Driver使用這些Executor分配運行具體的Task。每一個應用程序都有自己的Executor進程,使用多個線程執行具體的Task。Executor主要負責運行任務和保存數據。
Task
Task是發送到Executor中的工作單元
SparkContext
SparkContext是Spark會話的入口,用於連接Spark集羣。在提交應用程序之前,首先需要初始化SparkContext,SparkContext隱含了網絡通信、存儲體系、計算引擎、WebUI等內容。值得注意的是,一個JVM進程中只能有一個SparkContext,如果想創建新的SparkContext,需要在原來的SparkContext上調用stop()方法。
Spark編程小試牛刀
Spark實現分組取topN案例
描述:在HDFS上有訂單數據order.txt文件,文件字段的分割符號",",其中字段依次表示訂單id,商品id,交易額。樣本數據如下:
Order_00001,Pdt_01,222.8
Order_00001,Pdt_05,25.8
Order_00002,Pdt_03,522.8
Order_00002,Pdt_04,122.4
Order_00002,Pdt_05,722.4
Order_00003,Pdt_01,222.8
問題:使用sparkcore,求每個訂單中成交額最大的商品id
實現代碼
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
object TopOrderItemCluster {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("top n order and item")
val sc = new SparkContext(conf)
val hctx = new HiveContext(sc)
val orderData = sc.textFile("data.txt")
val splitOrderData = orderData.map(_.split(","))
val mapOrderData = splitOrderData.map { arrValue =>
val orderID = arrValue(0)
val itemID = arrValue(1)
val total = arrValue(2).toDouble
(orderID, (itemID, total))
}
val groupOrderData = mapOrderData.groupByKey()
/**
***groupOrderData.foreach(x => println(x))
***(Order_00003,CompactBuffer((Pdt_01,222.8)))
***(Order_00002,CompactBuffer((Pdt_03,522.8), (Pdt_04,122.4), (Pdt_05,722.4)))
***(Order_00001,CompactBuffer((Pdt_01,222.8), (Pdt_05,25.8)))
*/
val topOrderData = groupOrderData.map(tupleData => {
val orderid = tupleData._1
val maxTotal = tupleData._2.toArray.sortWith(_._2 > _._2).take(1)
(orderid, maxTotal)
}
)
topOrderData.foreach(value =>
println("最大成交額的訂單ID為:" + value._1 + " ,對應的商品ID為:" + value._2(0)._1)
/**
***最大成交額的訂單ID為:Order_00003 ,對應的商品ID為:Pdt_01
***最大成交額的訂單ID為:Order_00002 ,對應的商品ID為:Pdt_05
***最大成交額的訂單ID為:Order_00001 ,對應的商品ID為:Pdt_01
*/
)
//構造出元數據為Row的RDD
val RowOrderData = topOrderData.map(value => Row(value._1, value._2(0)._1))
//構建元數據
val structType = StructType(Array(
StructField("orderid", StringType, false),
StructField("itemid", StringType, false))
)
//轉換成DataFrame
val orderDataDF = hctx.createDataFrame(RowOrderData, structType)
// 將數據寫入Hive
orderDataDF.registerTempTable("tmptable")
hctx.sql("CREATE TABLE IF NOT EXISTS orderid_itemid(orderid STRING,itemid STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'")
hctx.sql("INSERT INTO orderid_itemid SELECT * FROM tmptable")
}
}
將上述代碼打包,提交到集羣運行,可以進入hive cli或者spark-sql的shell查看Hive中的數據。
總結
本文主要從整體上對Spark進行了介紹,主要包括Spark的搜索熱度分析、Spark的主要特點、Spark的一些重要概念以及Spark的運行架構,最後給出了一個Spark編程案例。本文是Spark系列分享的第一篇,可以先感受一下Spark的全局面貌,下一篇將分享Spark Core編程指南。