目錄
- 一、環境準備
- (一)新建maven項目
- (二)添加框架支持
- (三)修改maven倉庫地址
- (四)pom文件
- (五)新建scala目錄
- 二、編寫具體代碼
- (一)全量抽取
- (二)增量抽取
- 題目一:兩個時間數據類型增量抽取字段
- 題目2 :一個時間數據類型增量抽取字段
- 題目3:一個數值型id作為增量抽取字段
- 三、將應用程序打包後提交到yarn平台運行
- (一)maven項目打jar包
- (二)平台環境的啓動
- (三)執行命令
一、環境準備
(一)新建maven項目
文件—》新建—》項目,選maven
(二)添加框架支持
選中項目,單機鼠標右鍵,選“添加框架支持”,將scala前的複選框勾上
(三)修改maven倉庫地址
文件——設置——搜索maven,按照題目要求修改本地倉庫
(四)pom文件
複製pom.xml的內容,點擊maven菜單中的刷新按鈕,重新加載項目
(五)新建scala目錄
在src下新建scala的目錄,並將scala目錄標記為源根。標記成功後文件夾為藍色。
二、編寫具體代碼
(一)全量抽取
全量抽取比較簡單,只需要讀取mysql數據庫中的數據,然後不經過任何篩選操作直接將數據全部寫入hive數據庫中。
(二)增量抽取
題目一:兩個時間數據類型增量抽取字段
抽取shtd_store庫中user_info的增量數據進入Hive的ods庫中表user_info。根據ods.user_info表中operate_time或create_time作為增量字段(即MySQL中每條數據取這兩個時間中較大的那個時間作為增量字段去和ods裏的這兩個字段中較大的時間進行比較),只將新增的數據抽入,字段名稱、類型不變,同時添加靜態分區,分區字段為etl_date,類型為String,且值為當前比賽日的前一天日期(分區字段格式為yyyyMMdd)。使用hive cli執行show partitions ods.user_info命令,將結果截圖粘貼至客户端桌面【Release\任務B提交結果.docx】中對應的任務序號下
```scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.Properties
object Chouqu {
def main(args: Array[String]): Unit = {
//創建SparkSession對象
val spark = SparkSession
.builder()
.appName("chouqu3")
//設置hive元服務的地址
.config("hive.metastore.uris", "thrift://master:9083")
//設置hive數據存儲目錄
.config("hive.metastore.warehouse.dir", "hdfs://master:8020/hive/warehouse")
//設置hive分區模式
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.enableHiveSupport()
.getOrCreate()
//導入隱式包
import spark.implicits._
//設置mysql數據庫相關連接屬性
val url = "jdbc:mysql://master:3306/shtd_store?useSSL=false"
val prop = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "1234")
//讀取mysql中的數據
val data = spark.read.jdbc(url, "user_info", prop)
//讀取hive中的數據
val hivedata = spark.read.table("ods.user_info")
//找到抽取數據的增量值,即hive中create_time,operate_time最大值
val maxtime =
//找到
spark.sql("select max(greatest(create_time,operate_time)) as maxtime from ods.user_info")
.withColumn("maxtime",$"maxtime".cast(StringType))
.head()
.getAs[String]("maxtime")
//獲取比賽前一天的時間,並格式化為yyyyMMdd格式
val curDate = LocalDateTime.now()
val preDate = curDate.minusDays(2).format(DateTimeFormatter.ofPattern("yyyyMMdd"))
data
/*
藉助greatest函數找到"operate_time","create_time"的較大值
藉助gt函數完成比較:gr——大於;lt——小於
藉助where函數完成數據篩選
*/
.where(greatest("operate_time","create_time").gt(maxtime))
//增加分區字段
.withColumn("etl_date",lit(predate))
.write
//添加分區
.partitionBy("etl_date")
//增量抽取,需保留原有數據,因此用Append模式
.mode(SaveMode.Append)
.format("hive")
//寫入hive中
.saveAsTable("ods.user_info")
spark.stop()
}
}
題目2 :一個時間數據類型增量抽取字段
2、抽取shtd_store庫中sku_info的增量數據進入Hive的ods庫中表sku_info。根據ods.sku_info表中create_time作為增量字段,只將新增的數據抽入,字段名稱、類型不變,同時添加靜態分區,分區字段為etl_date,類型為String,且值為當前比賽日的前一天日期(分區字段格式為yyyyMMdd)。使用hive cli執行show partitions ods.sku_info命令,將結果截圖粘貼至客户端桌面【Release\任務B提交結果.docx】中對應的任務序號下;
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
import java.util.Properties
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import java.time.LocalDate
import java.time.format.DateTimeFormatter
object Chouqu2 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.OFF)
val spark = SparkSession
.builder()
.config("hive.metastore.uris","thrift://master:9083")
.config("hive.metastore.warehouse.dir","hdfs://master:8020/user/hive/warehouse")
.config("hive.exec.dynamic.partition.mode","nonstrict")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val url = "jdbc:mysql://master:3306/shtd_store?useSSL=false"
val prop = new Properties()
prop.setProperty("user","root")
prop.setProperty("password","1234")
val mdf = spark.read.jdbc(url,"sku_info",prop)
val hdf = spark.read.table("ods.sku_info")
val maxtime = hdf
.agg(max("create_time") as "maxtime")
.withColumn("maxtime", $"maxtime".cast(StringType))
.collect()(0)
.getAs[String]("maxtime")
val curdate = LocalDate.now()
val predate = curdate.minusDays(1).format(DateTimeFormatter.ofPattern("yyyyMMdd"))
mdf
.where($"create_time".gt(maxtime))
.withColumn("etl_date",lit(predate))
.write
.partitionBy("etl_date")
.mode(SaveMode.Append)
.format("hive")
.saveAsTable("ods.sku_info")
spark.stop()
}
}
題目3:一個數值型id作為增量抽取字段
3、抽取shtd_store庫中base_province的增量數據進入Hive的ods庫中表base_province。根據ods.base_province表中id作為增量字段,只將新增的數據抽入,字段名稱、類型不變並添加字段create_time取當前時間,同時添加靜態分區,分區字段為etl_date,類型為String,且值為當前比賽日的前一天日期(分區字段格式為yyyyMMdd)。使用hive cli執行show partitions ods.base_province命令,將結果截圖粘貼至客户端桌面【Release\任務B提交結果.docx】中對應的任務序號下;
package com.fuxi
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.Properties
object Chouqu3 {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
//.master("local")
// .config("fs.defaultFS", "hdfs://master:8020")
//jar運行去掉改行
//.config("spark.sql.warehouse.dir", "hdfs://master:8020/hive/warehouse")
.config("hive.metastore.uris","thrift://master:9083")
.config("hive.metastore.warehouse.dir","hdfs://master:8020/hive/warehouse")
.config("hive.exec.dynamic.partition.mode","nonstrict")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val url = "jdbc:mysql://master:3306/shtd_store?useSSL=false"
val prop = new Properties()
prop.setProperty("user","root")
prop.setProperty("password","1234")
val mdf = spark.read.jdbc(url,"base_province",prop)
val hdf = spark.read.table("ods.base_province")
val maxId = hdf
.agg(max("id") as "maxId")
.withColumn("maxId",$"maxId".cast(LongType) )
.collect()(0)
.getAs[Long]("maxId")
val predate = LocalDate.now()
.minusDays(1)
.format(DateTimeFormatter.ofPattern("yyyyMMdd"))
mdf
.withColumn("etl_date",lit(predate))
.where($"id".gt(maxId))
.write
.partitionBy("etl_date")
.mode(SaveMode.Append)
.format("hive")
.saveAsTable("ods.base_province")
spark.stop()
}
}
三、將應用程序打包後提交到yarn平台運行
(一)maven項目打jar包
先點擊maven菜單項中clean,結束後再點擊package
(二)平台環境的啓動
啓動hadoop集羣:start-all.sh
啓動hive元服務:hive --service metastore&
(三)執行命令
spark-submit --master yarn --deploy-mode client --class com.yunxing.Chouqu /opt/jars/test.jar