訂閱
糾錯(cuò)
加入自媒體

基于Spark的數(shù)據(jù)分析實(shí)踐

2019-06-19 09:55
EAWorld
關(guān)注

三、SparkSQL

Spark 從 1.3 版本開(kāi)始原有 SchemaRDD 的基礎(chǔ)上提供了類(lèi)似Pandas DataFrame API。新的DataFrame API不僅可以大幅度降低普通開(kāi)發(fā)者的學(xué)習(xí)門(mén)檻,同時(shí)還支持Scala、Java與Python三種語(yǔ)言。更重要的是,由于脫胎自SchemaRDD,DataFrame天然適用于分布式大數(shù)據(jù)場(chǎng)景。

一般的數(shù)據(jù)處理步驟:讀入數(shù)據(jù) -> 對(duì)數(shù)據(jù)進(jìn)行處理 -> 分析結(jié)果  -> 寫(xiě)入結(jié)果

SparkSQL 結(jié)構(gòu)化數(shù)據(jù)

處理結(jié)構(gòu)化數(shù)據(jù)(如 CSV,JSON,Parquet 等);

把已經(jīng)結(jié)構(gòu)化數(shù)據(jù)抽象成 DataFrame (HiveTable);

非結(jié)構(gòu)化數(shù)據(jù)通過(guò) RDD.map.filter 轉(zhuǎn)換成結(jié)構(gòu)化進(jìn)行處理;

按照列式數(shù)據(jù)庫(kù),只加載非結(jié)構(gòu)化中可結(jié)構(gòu)化的部分列(Hbase,MongoDB);

處理非結(jié)構(gòu)化數(shù)據(jù),不能簡(jiǎn)單的用 DataFrame 裝載。而是要用 SparkRDD 把數(shù)據(jù)讀入,在通過(guò)一系列的 Transformer Method 把非結(jié)構(gòu)化的數(shù)據(jù)加工為結(jié)構(gòu)化,或者過(guò)濾到不合法的數(shù)據(jù)。

SparkSQL DataFrame

SparkSQL 中一切都是 DataFrame,all in DataFrame. DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,類(lèi)似于傳統(tǒng)數(shù)據(jù)庫(kù)中的二維表格。DataFrame與RDD的主要區(qū)別在于,前者帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱(chēng)和類(lèi)型。如果熟悉 Python Pandas 庫(kù)中的 DataFrame 結(jié)構(gòu),則會(huì)對(duì) SparkSQL DataFrame 概念非常熟悉。

TextFile DataFrame

import.org.a(chǎn)pache.spark.sql._//定義數(shù)據(jù)的列名稱(chēng)和類(lèi)型valdt=StructType(List(id:String,name:String,gender:String,age:Int))
//導(dǎo)入user_info.csv文件并指定分隔符vallines = sc.textFile("/path/user_info.csv").map(_.split(","))
//將表結(jié)構(gòu)和數(shù)據(jù)關(guān)聯(lián)起來(lái),把讀入的數(shù)據(jù)user.csv映射成行,構(gòu)成數(shù)據(jù)集valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt))
//通過(guò)SparkSession.createDataFrame()創(chuàng)建表,并且數(shù)據(jù)表表頭val df= spark.createDataFrame(rowRDD, dt)

可左右滑動(dòng)查看代碼

讀取規(guī)則數(shù)據(jù)文件作為DataFrame

SparkSession.Builder builder = SparkSession.builder()Builder.setMaster("local").setAppName("TestSparkSQLApp")SparkSession spark = builder.getOrCreate();SQLContext sqlContext = spark.sqlContext();
# 讀取 JSON 數(shù)據(jù),path 可為文件或者目錄valdf=sqlContext.read().json(path);
# 讀取 HadoopParquet 文件vardf=sqlContext.read().parquet(path);
# 讀取 HadoopORC 文件vardf=sqlContext.read().orc(path);

可左右滑動(dòng)查看代碼

JSON 文件為每行一個(gè) JSON 對(duì)象的文件類(lèi)型,行尾無(wú)須逗號(hào)。文件頭也無(wú)須[]指定為數(shù)組;SparkSQL 讀取是只是按照每行一條 JSON Record序列化;

Parquet文件

Configurationconfig = new Configuration();ParquetFileReaderreader = ParquetFileReader.open(        HadoopInputFile.fromPath(new Path("hdfs:///path/file.parquet"),conf));Map<String, String>schema = reader.getFileMetaData().getKeyValueMetaData();String allFields= schema.get("org.a(chǎn)pache.spark.sql.parquet.row.metadata");

可左右滑動(dòng)查看代碼

allFiedls 的值就是各字段的名稱(chēng)和具體的類(lèi)型,整體是一個(gè)json格式進(jìn)行展示。

讀取 Hive 表作為 DataFrame

Spark2 API 推薦通過(guò) SparkSession.Builder 的 Builder 模式創(chuàng)建 SparkContext。 Builder.getOrCreate() 用于創(chuàng)建 SparkSession,SparkSession 是 SparkContext 的封裝。

在Spark1.6中有兩個(gè)核心組件SQLcontext和HiveContext。SQLContext 用于處理在 SparkSQL 中動(dòng)態(tài)注冊(cè)的表,HiveContext 用于處理 Hive 中的表。

從Spark2.0以上的版本開(kāi)始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql 即可執(zhí)行 Hive 中的表,也可執(zhí)行內(nèi)部注冊(cè)的表;

在需要執(zhí)行 Hive 表時(shí),只需要在 SparkSession.Builder 中開(kāi)啟 Hive 支持即可(enableHiveSupport())。

SparkSession.Builder builder = SparkSession.builder().enableHiveSupport();SparkSession spark = builder.getOrCreate();SQLContext sqlContext = spark.sqlContext();

可左右滑動(dòng)查看代碼

// db 指 Hive 庫(kù)中的數(shù)據(jù)庫(kù)名,如果不寫(xiě)默認(rèn)為 default

// tableName 指 hive 庫(kù)的數(shù)據(jù)表名

sqlContext.sql(“select * from db.tableName”)

可左右滑動(dòng)查看代碼

SparkSQL ThriftServer

//首先打開(kāi) Hive 的 Metastore服務(wù)

hive$bin/hive –-service metastore –p 8093

可左右滑動(dòng)查看代碼

//把 Spark 的相關(guān) jar 上傳到hadoophdfs指定目錄,用于指定sparkonyarn的依賴(lài) jar

spark$hadoop fs –put jars/*.jar /lib/spark2

可左右滑動(dòng)查看代碼

// 啟動(dòng) spark thriftserver 服務(wù)

spark$ sbin/start-thriftserver.sh --master yarn-client --driver-memory 1G --conf spark.yarn.jars=hdfs:///lib/spark2/*.jar

可左右滑動(dòng)查看代碼

當(dāng)hdfs 上傳了spark 依賴(lài) jar 時(shí),通過(guò)spark.yarn.jars 可看到日志 spark 無(wú)須每個(gè)job 都上傳jar,可節(jié)省啟動(dòng)時(shí)間

19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.0.5.jar19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.1.7.3.jar

可左右滑動(dòng)查看代碼

//通過(guò) spark bin 下的 beeline 工具,可以連接到 spark ThriftServer(SparkOnHive)

bin/beeline -u jdbc:hive2://ip:10000/default -n hadoop

可左右滑動(dòng)查看代碼

-u 是指定 beeline 的執(zhí)行驅(qū)動(dòng)地址;

-n 是指定登陸到 spark Session 上的用戶(hù)名稱(chēng);

Beeline 還支持傳入-e 可傳入一行 SQL,

-e <query>                      query that should be executed

也可通過(guò) –f 指定一個(gè) SQL File,內(nèi)部可用逗號(hào)分隔的多個(gè) SQL(存儲(chǔ)過(guò)程)

-f <exec file>                  script file that should be executed

SparkSQL Beeline 的執(zhí)行效果展示

SparkSQL ThriftServer

對(duì)于 SparkSQL ThriftServer 服務(wù),每個(gè)登陸的用戶(hù)都有創(chuàng)建的 SparkSession,并且執(zhí)行的對(duì)個(gè) SQL 會(huì)通過(guò)時(shí)間順序列表展示。

SparkSQL ThriftServer 服務(wù)可用于其他支持的數(shù)據(jù)庫(kù)工具創(chuàng)建查詢(xún),也用于第三方的 BI 工具,如 tableau。

四、SparkSQL Flow

SparkSQL Flow 是以 SparkSQL 為基礎(chǔ),開(kāi)發(fā)的統(tǒng)一的基于 XML 配置化的可執(zhí)行一連串的 SQL 操作,這一連串的 SQL 操作定義為一個(gè) Flow。下文開(kāi)始 SparkSQL Flow 的介紹:

SparkSQL Flow 是基于 SparkSQL 開(kāi)發(fā)的一種基于 XML 配置化的 SQL 數(shù)據(jù)流轉(zhuǎn)處理模型。該模型簡(jiǎn)化了 SparkSQL 、Spark RDD的開(kāi)發(fā),并且降低開(kāi)發(fā)了難度,適合了解數(shù)據(jù)業(yè)務(wù)但無(wú)法駕馭大數(shù)據(jù)以及 Spark 技術(shù)的開(kāi)發(fā)者。

一個(gè)由普元技術(shù)部提供的基于 SparkSQL 的開(kāi)發(fā)模型;

一個(gè)可二次定制開(kāi)發(fā)的大數(shù)據(jù)開(kāi)發(fā)框架,提供了靈活的可擴(kuò)展 API;

一個(gè)提供了 對(duì)文件,數(shù)據(jù)庫(kù),NoSQL 等統(tǒng)一的數(shù)據(jù)開(kāi)發(fā)視界語(yǔ)義;

基于 SQL 的開(kāi)發(fā)語(yǔ)言和 XML 的模板配置,支持 Spark UDF 的擴(kuò)展管理;

支持基于 Spark Standlone,Yarn,Mesos 資源管理平臺(tái);

支持開(kāi)源、華為、星環(huán)等平臺(tái)統(tǒng)一認(rèn)證。

SparkSQL Flow 適合的場(chǎng)景:

批量 ETL;

非實(shí)時(shí)分析服務(wù);

SparkSQL Flow XML 概覽

Properties 內(nèi)定義一組變量,可用于宏替換;

Methods 內(nèi)可注冊(cè) udf 和 udaf 兩種函數(shù);

Prepare 內(nèi)可定義前置 SQL,用于執(zhí)行 source 前的 sql 操作;

Sources 內(nèi)定義一個(gè)到多個(gè)數(shù)據(jù)表視圖;

Transformer 內(nèi)可定義 0 到多個(gè)基于 SQL 的數(shù)據(jù)轉(zhuǎn)換操作(支持 join);

Targets 用于定義 1 到多個(gè)數(shù)據(jù)輸出;

After 可定義 0到多個(gè)任務(wù)日志;

如你所見(jiàn),source 的 type 參數(shù)用于區(qū)分 source 的類(lèi)型,source 支持的種類(lèi)直接決定SparkSQL Flow 的數(shù)據(jù)源加載廣度;并且,根據(jù) type 不同,source 也需要配置不同的參數(shù),如數(shù)據(jù)庫(kù)還需要 driver,url,user和 password 參數(shù)。

Transformer 是基于 source 定的數(shù)據(jù)視圖可執(zhí)行的一組轉(zhuǎn)換 SQL,該 SQL 符合 SparkSQL 的語(yǔ)法(SQL99)。Transform 的 SQL 的執(zhí)行結(jié)果被作為中間表命名為 table_name 指定的值。

Targets 為定義輸出,table_name 的值需在 source 或者 Transformer 中定義。

<上一頁(yè)  1  2  3  4  下一頁(yè)>  
聲明: 本文由入駐維科號(hào)的作者撰寫(xiě),觀點(diǎn)僅代表作者本人,不代表OFweek立場(chǎng)。如有侵權(quán)或其他問(wèn)題,請(qǐng)聯(lián)系舉報(bào)。

發(fā)表評(píng)論

0條評(píng)論,0人參與

請(qǐng)輸入評(píng)論內(nèi)容...

請(qǐng)輸入評(píng)論/評(píng)論長(zhǎng)度6~500個(gè)字

您提交的評(píng)論過(guò)于頻繁,請(qǐng)輸入驗(yàn)證碼繼續(xù)

  • 看不清,點(diǎn)擊換一張  刷新

暫無(wú)評(píng)論

暫無(wú)評(píng)論

人工智能 獵頭職位 更多
掃碼關(guān)注公眾號(hào)
OFweek人工智能網(wǎng)
獲取更多精彩內(nèi)容
文章糾錯(cuò)
x
*文字標(biāo)題:
*糾錯(cuò)內(nèi)容:
聯(lián)系郵箱:
*驗(yàn) 證 碼:

粵公網(wǎng)安備 44030502002758號(hào)