Apache Spark最初在2009年誕生於美國加州大學伯克利分校的APM實驗室,並於2010年開源,如今是Apache軟件基金會下的頂級開源項目之一。Spark的目標是設計一種編程模型,能夠快速地進行數據分析。Spark提供了內存計算,減少了IO開銷。另外Spark是基於Scala編寫的,提供了交互式的編程體驗。經過10年的發展,Spark成為了炙手可熱的大數據處理平台,目前最新的版本是Spark3.0。本文主要是對Spark進行一個總體概覽式的介紹,後續內容會對具體的細節進行展開討論。本文的主要內容包括:



  • Spark的關注度分析

  • Spark的特點

  • Spark的一些重要概念

  • Spark組件概覽

  • Spark運行架構概覽

  • Spark編程小試牛刀



Spark的關注熱度分析



概況

下圖展示了近1年內在國內關於Spark、Hadoop及Flink的搜索趨勢

Spark大數據分析技術 spark 大數據_Hadoop

近1年內全球關於Spark、Hadoop及Flink的搜索趨勢,如下:

Spark大數據分析技術 spark 大數據_搜索_02


分析

從上面的4幅圖可以看出,近一年無論是在國內還是全球,關於Spark的搜索熱度始終是比Hadoop和Flink要高。近年來Flink發展迅猛,其在國內有阿里的背書,Flink天然的流處理特點使其成為了開發流式應用的首選框架。可以看出,雖然Flink在國內很火,但是放眼全球,熱度仍然不及Spark。所以學習並掌握Spark技術仍然是一個不錯的選擇,技術有很多的相似性,如果你已經掌握了Spark,再去學習Flink的話,相信你會有種似曾相識的感覺。


Spark的特點

  • 速度快
    Apache Spark使用DAG調度程序、查詢優化器和物理執行引擎,為批處理和流處理提供了高性能。
  • 易於使用
    支持使用Java,Scala,Python,R和SQL快速編寫應用程序。Spark提供了80多個高級操作算子,可輕鬆構建並行應用程序。
  • 通用性
    Spark提供了非常豐富的生態棧,包括SQL查詢、流式計算、機器學習和圖計算等組件,這些組件可以無縫整合在一個應用中,通過一站部署,可以應對多種複雜的計算場景
  • 運行模式多樣
    Spark可以使用Standalone模式運行,也可以運行在Hadoop,Apache Mesos,Kubernetes等環境中運行。並且可以訪問HDFS、Alluxio、Apache Cassandra、Apache HBase、Apache Hive等多種數據源中的數據。


Spark的一些重要概念

  • RDD
    彈性分佈式數據集(Resilient Distributed Dataset),是分佈式內存的一個抽象概念,提供了一種高度受限的共享內存模型
  • DAG
    有向無環圖(Directed Acyclic Graph),反映RDD之間的依賴關係
  • Application
    用户編寫的Spark程序,由 driver program 和 executors 組成
  • Application jar
    用户編寫的應用程序JAR包
  • Driver program
    用程序main()函數的進程,可以創建SparkContext
  • Cluster manager
    集羣管理器,屬於一個外部服務,用於資源請求分配(如:standalone manager, Mesos, YARN)
  • Deploy mode
    部署模式,決定Driver進程在哪裏運行。如果是cluster模式,會由框架本身在集羣內部某台機器上啓動Driver進程。如果是client模式,會在提交程序的機器上啓動Driver進程
  • Worker node
    集羣中運行應用程序的節點Executor運行在Worknode節點上的一個進程,負責運行具體的任務,併為應用程序存儲數據
  • Task
    運行在executor中的工作單元
  • Job
    一個job包含多個RDD及一些列的運行在RDD之上的算子操作,job需要通過action操作進行觸發(比如save、collect等)
  • Stage
    每一個作業會被分成由一些列task組成的stage,stage之間會相互依賴


Spark組件概覽

Spark生態系統主要包括Spark Core、SparkSQL、SparkStreaming、MLlib和GraphX等組件,具體如下圖所示:

Spark大數據分析技術 spark 大數據_Hadoop_03

  • Spark Core
    Spark core是Spark的核心,包含了Spark的基本功能,如內存計算、任務調度、部署模式、存儲管理等。SparkCore提供了基於RDD的API是其他高級API的基礎,主要功能是實現批處理。
  • Spark SQL
    Spark SQL主要是為了處理結構化和半結構化數據而設計的,SparkSQL允許用户在Spark程序中使用SQL、DataFrame和DataSetAPI查詢結構化數據,支持Java、Scala、Python和R語言。由於DataFrame API提供了統一的訪問各種數據源的方式(包括Hive、Avro、Parquet、ORC和JDBC),用户可以通過相同的方式連接任何數據源。另外,Spark SQL可以使用hive的元數據,從而實現了與Hive的完美集成,用户可以將Hive的作業直接運行在Spark上。Spark SQL可以通過spark-sql的shell命令訪問。
  • SparkStreaming
    SparkStreaming是Spark很重要的一個模塊,可實現實時數據流的可伸縮,高吞吐量,容錯流處理。在內部,其工作方式是將實時輸入的數據流拆分為一系列的micro batch,然後由Spark引擎進行處理。SparkStreaming支持多種數據源,如kafka、Flume和TCP套接字等
  • MLlib
    MLlib是Spark提供的一個機器學習庫,用户可以使用Spark API構建一個機器學習應用,Spark尤其擅長迭代計算,性能是Hadoop的100倍。該lib包含了常見機器學習算法,比如邏輯迴歸、支持向量機、分類、聚類、迴歸、隨機森林、協同過濾、主成分分析等。
  • GraphX
    GraphX是Spark中用於圖計算的API,可認為是Pregel在Spark上的重寫及優化,GraphX性能良好,擁有豐富的功能和運算符,能在海量數據上自如地運行復雜的圖算法。GraphX內置了許多圖算法,比如著名的PageRank算法。


Spark運行架構概覽

從整體來看,Spark應用架構包括以下幾個主要部分:

  • Driver program
  • Master node
  • Work node
  • Executor
  • Tasks
  • SparkContext

Standalone模式下,運行架構如下圖所示:

Spark大數據分析技術 spark 大數據_Hadoop_04



Driver program

Driver program是Spark應用程序的main()函數(創建SparkContext和Spark會話)。運行Driver進程的節點稱之為Driver node,Driver進程與集羣管理器(Cluster Manager)進行通信,向Executor發送調度的task。



Cluster Manager

稱之為集羣管理器,主要用於管理集羣。常見的集羣管理器包括YARN、Mesos和Standalone,Standalone集羣管理器包括兩個長期運行的後台進程,其中一個是在Master節點,另外一個是在Work節點。在後續集羣部署模式篇,將詳細探討這一部分的內容,此處先有有一個大致印象即可。



Worker node

熟悉Hadoop的朋友應該知道,Hadoop包括namenode和datanode節點。Spark也類似,Spark將運行具體任務的節點稱之為Worker node。該節點會向Master節點彙報當前節點的可用資源,通常在每一台Worker node上啓動一個work後台進程,用於啓動和監控Executor。



Executor

Master節點分配資源,使用集羣中的Work node創建Executor,Driver使用這些Executor分配運行具體的Task。每一個應用程序都有自己的Executor進程,使用多個線程執行具體的Task。Executor主要負責運行任務和保存數據。



Task

Task是發送到Executor中的工作單元



SparkContext

SparkContext是Spark會話的入口,用於連接Spark集羣。在提交應用程序之前,首先需要初始化SparkContext,SparkContext隱含了網絡通信、存儲體系、計算引擎、WebUI等內容。值得注意的是,一個JVM進程中只能有一個SparkContext,如果想創建新的SparkContext,需要在原來的SparkContext上調用stop()方法。

Spark編程小試牛刀



Spark實現分組取topN案例

描述:在HDFS上有訂單數據order.txt文件,文件字段的分割符號",",其中字段依次表示訂單id,商品id,交易額。樣本數據如下:

Order_00001,Pdt_01,222.8
Order_00001,Pdt_05,25.8
Order_00002,Pdt_03,522.8
Order_00002,Pdt_04,122.4
Order_00002,Pdt_05,722.4
Order_00003,Pdt_01,222.8

問題:使用sparkcore,求每個訂單中成交額最大的商品id



實現代碼

import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object TopOrderItemCluster {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("top n order and item")
    val sc = new SparkContext(conf)
    val hctx = new HiveContext(sc)
    val orderData = sc.textFile("data.txt")
    val splitOrderData = orderData.map(_.split(","))
    val mapOrderData = splitOrderData.map { arrValue =>
      val orderID = arrValue(0)
      val itemID = arrValue(1)
      val total = arrValue(2).toDouble
      (orderID, (itemID, total))
    }
    val groupOrderData = mapOrderData.groupByKey()

    /**
      ***groupOrderData.foreach(x => println(x))
      ***(Order_00003,CompactBuffer((Pdt_01,222.8)))
      ***(Order_00002,CompactBuffer((Pdt_03,522.8), (Pdt_04,122.4), (Pdt_05,722.4)))
      ***(Order_00001,CompactBuffer((Pdt_01,222.8), (Pdt_05,25.8)))
      */
   
    val topOrderData = groupOrderData.map(tupleData => {
      val orderid = tupleData._1
      val maxTotal = tupleData._2.toArray.sortWith(_._2 > _._2).take(1)
      (orderid, maxTotal)
    }
    )
    topOrderData.foreach(value =>
      println("最大成交額的訂單ID為:" + value._1 + " ,對應的商品ID為:" + value._2(0)._1)

      /**
        ***最大成交額的訂單ID為:Order_00003 ,對應的商品ID為:Pdt_01
        ***最大成交額的訂單ID為:Order_00002 ,對應的商品ID為:Pdt_05
        ***最大成交額的訂單ID為:Order_00001 ,對應的商品ID為:Pdt_01
        */
      
    )
    //構造出元數據為Row的RDD
    val RowOrderData = topOrderData.map(value => Row(value._1, value._2(0)._1))
    //構建元數據
    val structType = StructType(Array(
      StructField("orderid", StringType, false),
      StructField("itemid", StringType, false))
    )
    //轉換成DataFrame
    val orderDataDF = hctx.createDataFrame(RowOrderData, structType)
   // 將數據寫入Hive
    orderDataDF.registerTempTable("tmptable")
    hctx.sql("CREATE TABLE IF NOT EXISTS orderid_itemid(orderid STRING,itemid STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'")
      hctx.sql("INSERT INTO orderid_itemid SELECT * FROM tmptable")
  }

}

將上述代碼打包,提交到集羣運行,可以進入hive cli或者spark-sql的shell查看Hive中的數據。

總結

本文主要從整體上對Spark進行了介紹,主要包括Spark的搜索熱度分析、Spark的主要特點、Spark的一些重要概念以及Spark的運行架構,最後給出了一個Spark編程案例。本文是Spark系列分享的第一篇,可以先感受一下Spark的全局面貌,下一篇將分享Spark Core編程指南。