目錄
- 一、Spark Core
- 1、什麼是Spark?特點
- 二、安裝和部署Spark、Spark 的 HA
- 1、spark體系結構
- 2、spark的搭建
- 3、Spark的 HA
- 三、執行Spark的任務:兩個工具
- 1、spark-submit:用於提交Spark的任務
- 2、spark-shell 相當於REPL
- 四、WordCount(scala版本和java版本)
- 1、scala版本的WordCount
- 2、java版本的WordCount
- 五、分析Spark的任務流程
- 1、分析WordCount程序處理過程
- 2、Spark調度任務的過程
- 六、RDD和RDD特性、RDD的算子
- 1、RDD:彈性分佈式數據集
- 2、 算子
- 3、RDD的集合運算
- 4、分組操作:reduceByKey
- 5、cogroup
- 6、reduce操作(Action)
- 7、需求:按照value排序
- 七、RDD的高級算子
- 1、mapPartitionsWithIndex
- 2、aggregate
- 八、編程案例
- 1、分析日誌
- 2、創建自定義分區
- 3、使用JDBCRDD 操作數據庫
- 4、操作數據庫:把結果存放到數據庫中
Spark Core
Spark生態圈:
Spark Core : RDD(彈性分佈式數據集)
Spark SQL
Spark Streaming
Spark MLLib :協同過濾,ALS,邏輯迴歸等等 –> 機器學習
Spark Graphx : 圖計算
一、Spark Core
1、什麼是Spark?特點
Apache Spark™ is a unified analytics engine for large-scale data processing.
特點:快、易用、通用性、兼容性(完全兼容Hadoop)
快:快100倍(Hadoop 3 之前)
易用:支持多種語言開發
通用性:生態系統全
易用性:兼容Hadoop
二、安裝和部署Spark、Spark 的 HA
1、spark體系結構
Spark的運行方式
Yarn
Standalone:本機調試(demo)
Worker:從節點。每個服務器上,資源和任務的管理者。只負責管理一個節點
執行過程:
一個Worker 有多個 Executor。 Executor是任務的執行者,按階段(stage)劃分任務。————> RDD
客户端:Driver Program 提交任務到集羣中
1)spark-submit
2)spark-shell
2、spark的搭建
1)準備工作:JDK 配置主機名 免密碼登錄
2)偽分佈式模式
在一台虛擬機上模擬分佈式環境(Master和Worker在一個節點上)
配置spark-env.sh
vi spark-env.sh
配置slaves
vi slaves
hsiehchou121
瀏覽器訪問hsiehchou121:8080
在spark中使用scala語言
3)全分佈式環境
修改slave文件 拷貝到其他三台服務器 啓動
3、Spark的 HA
回顧HA(高可用)
(*)HDFS Yarn Hbase Spark 主從結構
(*)單點故障
(1)基於文件目錄的單點恢復
主要用於開發或測試環境。當spark提供目錄保存spark Application和worker的註冊信息,並將他們的恢復狀態寫入該目錄中,這時,一旦Master發生故障,就可以通過重新啓動Master進程(sbin/start-master.sh),恢復已運行的spark Application和worker的註冊信息
基於文件系統的單點恢復,主要是在spark-en.sh裏對SPARK_DAEMON_JAVA_OPTS設置
|
配置參數
|
參考值
|
|
spark.deploy.recoveryMode
|
設置為FILESYSTEM開啓單點恢復功能,默認值:NONE
|
|
spark.deploy.recoveryDirectory
|
Spark 保存恢復狀態的目錄
|
參考:
export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/hd/spark-2.1.0-bin-hadoop2.7/recovery”
(*)本質:還是隻有一個主節點Master,創建了一個恢復目錄,保存集羣狀態和任務的信息
當Master掛掉,重新啓動時,會從恢復目錄下讀取狀態信息,恢復出來原來的狀態
用途:這個只用於開發和測試,但是生產使用用zookeeper
(2)基於Zookeeper :和Hadoop類似
ZooKeeper提供了一個Leader Election機制,利用這個機制可以保證雖然集羣存在多個Master,但是隻有一個是Active的,其他的都是Standby。當Active的Master出現故障時,另外的一個Standby Master會被選舉出來。由於集羣的信息,包括Worker, Driver和Application的信息都已經持久化到ZooKeeper,因此在切換的過程中只會影響新Job的提交,對於正在進行的Job沒有任何的影響
|
配置參數
|
參考值
|
|
spark.deploy.recoveryMode
|
設置為ZOOKEEPER開啓單點恢復功能,默認值:NONE
|
|
spark.deploy.zookeeper.url
|
ZooKeeper集羣的地址
|
|
spark.deploy.zookeeper.dir
|
Spark信息在ZK中的保存目錄,默認:/spark
|
參考:
export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hsiehchou121:2181,hsiehchou122:2181,hsiehchou123:2181,hsiehchou124:2181 -Dspark.deploy.zookeeper.dir=/spark”
(*)複習一下zookeeper:
相當於一個數據庫,把一些信息存放在zookeeper中,比如集羣的信息
數據同步功能,選舉功能,分佈式鎖功能
數據同步:給一個節點中寫入數據,可以同步到其他節點
選舉:Zookeeper中存在不同的角色,Leader Follower。如果Leader掛掉,重新選舉Leader
分佈式鎖:秒殺。以目錄節點的方式來保存數據
修改 spark-env.sh
同步到其他三台服務器
[root@hsiehchou121 spark-2.1.0-bin-hadoop2.7]# scp conf/spark-env.sh hsiehchou122:/root/hd/spark-2.1.0-bin-hadoop2.7/conf
[root@hsiehchou121 spark-2.1.0-bin-hadoop2.7]# scp conf/spark-env.sh hsiehchou123:/root/hd/spark-2.1.0-bin-hadoop2.7/conf
[root@hsiehchou121 spark-2.1.0-bin-hadoop2.7]# scp conf/spark-env.sh hsiehchou124:/root/hd/spark-2.1.0-bin-hadoop2.7/conf
在hsiehchou121 start-all hsiehchou121 master hsiehchou122 Worker hsiehchou123 Worker hsiehchou124 Worker
在hsiehchou121 start-master hsiehchou121 master hsiehchou122 master(standby) hsiehchou122 Worker hsiehchou123 Worker hsiehchou124 Worker
在hsiehchou121 上kill master
hsiehchou122 master(Active) hsiehchou122 Worker hsiehchou123 Worker hsiehchou124 Worker
在網頁http://192.168.116.122:8080/ 可以看到相應信息
三、執行Spark的任務:兩個工具
1、spark-submit:用於提交Spark的任務
任務:jar
舉例:蒙特卡洛求PI(圓周率)
–class指明主程序的名字
其中100指定執行的次數
2、spark-shell 相當於REPL
spark-shell是Spark自帶的交互式Shell程序,方便用户進行交互式編程,用户可以在該命令行下用scala編寫spark程序
(*)啓動Spark Shell:spark-shell
也可以使用以下參數:
參數説明:
例如:
注意:
如果啓動spark shell時沒有指定master地址,但是也可以正常啓動spark shell和執行spark shell中的程序,其實是啓動了spark的local模式,該模式僅在本機啓動一個進程,沒有與集羣建立聯繫
作為一個獨立的Application運行
兩種模式:
(1)本地模式
spark-shell 後面不接任何參數,代表本地模式
./spark-shell
Spark context available as ‘sc’ (master = local[*], app id = local-1554372019995).
sc 是 SparkContext 對象名。 local[*] 代表本地模式,不提交到集羣中運行
(2)集羣模式
提交到集羣運行
Spark context available as ‘sc’ (master = spark://hsiehchou121:7077, app id = app-20190404190030-0000).
master = spark://hsiehchou121:7077
Spark session available as ‘spark’
Spark Session 是 2.0 以後提供的,利用 SparkSession 可以訪問spark所有組件
示例:WordCount程序
程序如下:
説明:
sc是SparkContext對象,該對象時提交spark程序的入口
textFile(“hdfs://192.168.116.121:9000/data.txt”)是hdfs中讀取數據
flatMap(_.split(” “))先map在壓平
map((_,1))將單詞和1構成元組
reduceByKey(+)按照key進行reduce,並將value累加
saveAsTextFile(“hdfs://192.168.116.121:9000/output/wc”)將結果寫入到hdfs中
(*)處理本地文件,把結果打印到屏幕上
vi /root/hd/tmp_files/test_WordCount.txt
I love China
I love Jiangsu
Jiangsu is a beautiful place in China
(*)處理HDFS文件,結果保存在hdfs上
-rw-r–r– 3 root supergroup 0 2019-04-04 19:12 /out/0404/test_WordCount/_SUCCESS
-rw-r–r– 3 root supergroup 16 2019-04-04 19:12 /out/0404/test_WordCount/part-00000
-rw-r–r– 3 root supergroup 65 2019-04-04 19:12 /out/0404/test_WordCount/part-00001
_SUCCESS 代表程序執行成功
part-00000 part-00001 結果文件,分區。裏面內容不重複
(*)單步運行WordCount —-> RDD
RDD 彈性分佈式數據集
(1)依賴關係 : 寬依賴和窄依賴
(2)算子:
函數:
Transformation : 延時計算 map flatMap textFile
Action : 立即觸發計算 collect
説明:scala複習
(*)flatten:把嵌套的結果展開
scala> List(List(2,4,6,8,10),List(1,3,5,7,9)).flatten
res21: List[Int] = List(2, 4, 6, 8, 10, 1, 3, 5, 7, 9)
(*)flatmap : 相當於一個 map + flatten
scala> var myList = List(List(2,4,6,8,10),List(1,3,5,7,9))
myList: List[List[Int]] = List(List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9))
scala> myList.flatMap(x=>x.map(_*2))
res22: List[Int] = List(4, 8, 12, 16, 20, 2, 6, 10, 14, 18)
myList.flatMap(x=>x.map(_*2))
執行過程:
1、將 List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9) 調用 map(_*2) 方法。x 代表一個List
2、flatten
3、在IDE中開發scala版本和Java版本的WorkCount
四、WordCount(scala版本和java版本)
1、scala版本的WordCount
新建一個工程,把jar引入到工程中
export Demo1.jar 點擊下一步,把jar包上傳到服務器上/root/hd/tmp_files/下
在spark裏面的bin目錄下輸入
2、java版本的WordCount
五、分析Spark的任務流程
1、分析WordCount程序處理過程
2、Spark調度任務的過程
提交到及羣眾運行任務時,spark執行任務調度
六、RDD和RDD特性、RDD的算子
1、RDD:彈性分佈式數據集
(*)Spark中最基本的數據抽象
(*)RDD的特性
- Internally, each RDD is characterized by five main properties:
* - A list of partitions
1)是一組分區
RDD由分區組成,每個分區運行在不同的Worker上,通過這種方式來實現分佈式計算
- A function for computing each split
在RDD中,提供算子處理每個分區中的數據 - -A list of dependencies on other RDDs
RDD存在依賴關係:寬依賴和窄依賴 - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
可以自定義分區規則來創建RDD - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
優先選擇離文件位置近的節點來執行
如何創建RDD
(1)通過SparkContext.parallelize方法來創建
(2)通過外部數據源來創建
2、 算子
1)Transformation
map(func):相當於for循環,返回一個新的RDD
filter(func):過濾
flatMap(func):flat+map 壓平
mapPartitions(func):對RDD中的每個分區進行操作
mapPartitionsWithIndex(func):對RDD中的每個分區進行操作,可以取到分區號
sample(withReplacement, fraction, seed):採樣
集合運算
union(otherDataset):對源RDD和參數RDD求並集後返回一個新的RDD
intersection(otherDataset):對源RDD和參數RDD求交集後返回一個新的RDD
distinct([numTasks])):去重
聚合操作:group by
groupByKey([numTasks]) :在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]):在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置
aggregateByKey(zeroValue)(seqOp,combOp,[numTasks]):按照key進行聚合
排序
sortByKey([ascending], [numTasks]):在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]):與sortByKey類似,但是更靈活
join(otherDataset, [numTasks]):在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]):在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable,Iterable))類型的RDD
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)
重分區:
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)
舉例:
(1)創建一個RDD,每個元素乘以2,再排序
過濾出大於20的元素:
(2)字符串(字符)類型的RDD
3、RDD的集合運算
4、分組操作:reduceByKey
reduceByKey will provide much better performance.
官方不推薦使用 groupByKey 推薦使用 reduceByKey
5、cogroup
在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable,Iterable))類型的RDD
對兩個RDD中的KV元素,每個RDD中相同key中的元素分別聚合成一個集合。與reduceByKey不同的是針對兩個RDD中相同的key的元素進行合併,與groupByKey返回值上與區別
6、reduce操作(Action)
聚合操作
7、需求:按照value排序
做法:
1、交換,把key 和 value交換,然後調用sortByKey方法
2、再次交換
(2)Action
reduce(func):通過func函數聚集RDD中的所有元素,這個功能必須是課交換且可並聯的
collect():在驅動程序中,以數組的形式返回數據集的所有元素
count():返回RDD的元素個數
first():返回RDD的第一個元素(類似於take(1))
take(n):返回一個由數據集的前n個元素組成的數組
takeSample(withReplacement,num, [seed]):返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子
takeOrdered(n, [ordering]):takeOrdered和top類似,只不過以和top相反的順序返回元素
saveAsTextFile(path):將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對於每個元素,Spark將會調用toString方法,將它裝換為文件中的文本
saveAsSequenceFile(path) :將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統
saveAsObjectFile(path) :saveAsObjectFile用於將RDD中的元素序列化成對象,存儲到文件中
countByKey():針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。
foreach(func):在數據集的每一個元素上,運行函數func進行更新。
與map類似,沒有返回值
3、特性
1)RDD的緩存機制
RDD通過persist方法或cache方法可以將前面的計算結果緩存,但是並不是這兩個方法被調用時立即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用
通過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的
緩存有可能丟失,或者存儲存儲於內存的數據由於內存不足而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執行。通過基於RDD的一系列轉換,丟失的數據會被重算,由於RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition
(*)作用:提高性能
(*)使用:標識RDD可以被緩存 persist cache
(*)可以緩存的位置:
舉例:測試數據,92萬條
進入spark-shell命令
2)RDD的容錯機制:通過檢查點來實現
檢查點(本質是通過將RDD寫入Disk做檢查點)是為了通過lineage(血統)做容錯的輔助,lineage過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之後有節點出現問題而丟失分區,從做檢查點的RDD開始重做Lineage,就會減少開銷
設置checkpoint的目錄,可以是本地的文件夾、也可以是HDFS。一般是在具有容錯能力,高可靠的文件系統上(比如HDFS, S3等)設置一個檢查點路徑,用於保存檢查點數據
/**
- Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
- directory set with
SparkContext#setCheckpointDirand all references to its parent - RDDs will be removed. This function must be called before any job has been
- executed on this RDD. It is strongly recommended that this RDD is persisted in
- memory, otherwise saving it on a file will require recomputation.
*/
(*)複習檢查點:
HDFS中的檢查點:有SecondaryNamenode來實現日誌的合併
(*)RDD的檢查點:容錯
概念:血統 Lineage
理解:表示任務執行的生命週期
WordCount textFile —> redceByKey
如果血統越長,越容易出錯
假如有檢查點,可以從最近的一個檢查點開始,往後面計算。不用重頭計算
(*)RDD檢查點的類型
(1)基於本地目錄:需要將Spark shell 或者任務運行在本地模式上(setMaster(“local”))
開發和測試
(2)HDFS目錄:用於生產
sc.setCheckPointDir(目錄)
舉例:設置檢查點
設置檢查點目錄:
scala> sc.setCheckpointDir(“hdfs://192.168.116.121:9000/sparkchkpt”)
標識rdd1可以執行檢查點操作
scala> rdd1.checkpoint
scala> rdd1.count
res2: Long = 921911
(3)依賴關係:寬依賴,窄依賴
劃分任務執行的stage
RDD和它依賴的父RDD(s)的關係有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)
窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用(一(父)對一(子))
總結:窄依賴我們形象的比喻為獨生子女
寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition(一(父)對多(子))
總結:窄依賴我們形象的比喻為超生
DAG(Directed Acyclic Graph)叫做有向無環圖,原始的RDD通過一系列的轉換就就形成了DAG,根據RDD之間的依賴關係的不同將DAG劃分成不同的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,由於有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,因此寬依賴是劃分Stage的依據
七、RDD的高級算子
1、mapPartitionsWithIndex
對RDD中的每個分區(帶有下標)進行操作,下標用index表示
通過這個算子,我們可以獲取分區號
def mapPartitionsWithIndex<a href="
f: %28Int, Iterator%5bT%5d%29 ⇒ Iterator%5bU%5d,
preservesPartitioning: Boolean = false”>U(implicit arg0: ClassTag[U]): RDD[U]
通過將函數應用於此RDD的每個分區來返回新的RDD,同時跟蹤原始分區的索引
preservesPartitioning指輸入函數是否保留分區器,除非是一對RDD並且輸入函數不修改keys,否則應該是false
參數:f是個函數參數 f 中第一個參數是Int,代表分區號,第二個Iterator[T]代表分區中的元素
舉例:把分區中的元素,包括分區號,都打印出來
2、aggregate
聚合操作。類似於分組
(*)先對局部進行聚合操作,再對全局進行聚合操作
調用聚合操作
分析結果:初始值是100,代表每個分區多了一個100
全局操作,也多了一個100
100+100+100 = 300
對RDD中的元素進行求和
- RDD.map
- 聚合操作(效率大於map)
(*)對字符串操作
結果分析:
- *abc *def
- **def*abc
(*)複雜的例子
1)
執行過程:
第一個分區:
第一次比較: “” “12” 長度最大值 2 2–>”2”
第二次比較: “2” “23” 長度最大值 2 2–>”2”
第二個分區:
第一次比較: “” “345” 長度最大值 3 3–>”3”
第二次比較: “3” “4567” 長度最大值 4 4–>”4”
結果:24 或者42
2)
執行過程:
第一個分區:
第一次比較: “” “12” 長度最小值 0 0–>”0”
第二次比較: “0” “23” 長度最小值 1 1–>”1”
第二個分區:
第一次比較: “” “345” 長度最小值 0 0–>”0”
第二次比較: “0” “4567” 長度最小值 1 1–>”1”
3)aggregateByKey:類似於aggregate,區別:操作的是 key value 的數據類型
1.將每個動物園(分區)中,動物數最多的動物,進行求和
動物園0
[partId : 0 , value = (cat,2) ], [partId : 0 , value = (cat,5) ], [partId : 0 , value = (mouse,4) ],
動物園1
[partId : 1 , value = (cat,12) ], [partId : 1 , value = (dog,12) ], [partId : 1 , value = (mouse,2) ])
2.將所有動物求和
aggregateByKey效率更高
4)coalesce與repartition
與分區有關
都是對RDD進行重分區
區別:
coalesce 默認不會進行Shuffle 默認 false 如需修改分區,需置為true
repartition 會進行Shuffle
5)其他高級算子
比較好的高級算子的博客(推薦)
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
八、編程案例
1、分析日誌
需求:找到訪問量最高的兩個網頁
(*)第一步:對網頁的訪問量求和
(*)第二步:排序,降序
日誌數據
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] “GET /MyDemoWeb/ HTTP/1.1” 200 259
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] “GET /MyDemoWeb/head.jsp HTTP/1.1” 200 713
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] “GET /MyDemoWeb/body.jsp HTTP/1.1” 200 240
192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] “GET /MyDemoWeb/java.jsp HTTP/1.1” 200 240
192.168.88.1 - - [30/Jul/2017:12:54:40 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:40 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:41 +0800] “GET /MyDemoWeb/mysql.jsp HTTP/1.1” 200 241
192.168.88.1 - - [30/Jul/2017:12:54:41 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] “GET /MyDemoWeb/web.jsp HTTP/1.1” 200 239
192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:53 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] “GET /MyDemoWeb/mysql.jsp HTTP/1.1” 200 241
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:56 +0800] “GET /MyDemoWeb/web.jsp HTTP/1.1” 200 239
192.168.88.1 - - [30/Jul/2017:12:54:56 +0800] “GET /MyDemoWeb/java.jsp HTTP/1.1” 200 240
192.168.88.1 - - [30/Jul/2017:12:54:57 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:57 +0800] “GET /MyDemoWeb/java.jsp HTTP/1.1” 200 240
192.168.88.1 - - [30/Jul/2017:12:54:58 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:58 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:59 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:59 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:55:43 +0800] “GET /MyDemoWeb/mysql.jsp HTTP/1.1” 200 241
192.168.88.1 - - [30/Jul/2017:12:55:43 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:55:43 +0800] “GET /MyDemoWeb/web.jsp HTTP/1.1” 200 239
192.168.88.1 - - [30/Jul/2017:12:55:43 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
結果:
(/hadoop.jsp,9)
(/oracle.jsp,9)
2、創建自定義分區
根據jsp文件的名字,將各自的訪問日誌放入到不同的分區文件中
3、使用JDBCRDD 操作數據庫
將RDD的數據保存到mysql數據庫中
mysql的company的emp數據
1 Tom 10 2400
2 Alis 11 1900
3 Kei 12 1500
4 Mi 11 900
結果
ArrayBuffer((Alis,1900), (Kei,1500))
JdbcRDD參數説明
|
參數名稱
|
類型
|
説明
|
|
sc
|
org.apache.spark.SparkContext
|
Spark Context對象
|
|
getConnection
|
scala.Function0[java.sql.Connection]
|
得到一個數據庫Connection
|
|
sql
|
scala.Predef.String
|
執行的SQL語句
|
|
lowerBound
|
scala.Long
|
下邊界值,即:SQL的第一個參數
|
|
upperBound
|
scala.Long
|
上邊界值,即:SQL的第二個參數
|
|
numPartitions
|
scala.Int
|
分區的個數,即:啓動多少個Executor
|
|
mapRow
|
scala.Function1[java.sql.ResultSet, T]
|
得到的結果集
|
JdbcRDD的缺點:從上面的參數説明可以看出,JdbcRDD有以下兩個缺點:
(1)執行的SQL必須有兩個參數,並類型都是Long
(2)得到的結果是ResultSet,即:只支持select操作
4、操作數據庫:把結果存放到數據庫中