基于Spark的數(shù)據(jù)分析實(shí)踐
三、SparkSQL
Spark 從 1.3 版本開(kāi)始原有 SchemaRDD 的基礎(chǔ)上提供了類(lèi)似Pandas DataFrame API。新的DataFrame API不僅可以大幅度降低普通開(kāi)發(fā)者的學(xué)習(xí)門(mén)檻,同時(shí)還支持Scala、Java與Python三種語(yǔ)言。更重要的是,由于脫胎自SchemaRDD,DataFrame天然適用于分布式大數(shù)據(jù)場(chǎng)景。
一般的數(shù)據(jù)處理步驟:讀入數(shù)據(jù) -> 對(duì)數(shù)據(jù)進(jìn)行處理 -> 分析結(jié)果 -> 寫(xiě)入結(jié)果
SparkSQL 結(jié)構(gòu)化數(shù)據(jù)
處理結(jié)構(gòu)化數(shù)據(jù)(如 CSV,JSON,Parquet 等);
把已經(jīng)結(jié)構(gòu)化數(shù)據(jù)抽象成 DataFrame (HiveTable);
非結(jié)構(gòu)化數(shù)據(jù)通過(guò) RDD.map.filter 轉(zhuǎn)換成結(jié)構(gòu)化進(jìn)行處理;
按照列式數(shù)據(jù)庫(kù),只加載非結(jié)構(gòu)化中可結(jié)構(gòu)化的部分列(Hbase,MongoDB);
處理非結(jié)構(gòu)化數(shù)據(jù),不能簡(jiǎn)單的用 DataFrame 裝載。而是要用 SparkRDD 把數(shù)據(jù)讀入,在通過(guò)一系列的 Transformer Method 把非結(jié)構(gòu)化的數(shù)據(jù)加工為結(jié)構(gòu)化,或者過(guò)濾到不合法的數(shù)據(jù)。
SparkSQL DataFrame
SparkSQL 中一切都是 DataFrame,all in DataFrame. DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,類(lèi)似于傳統(tǒng)數(shù)據(jù)庫(kù)中的二維表格。DataFrame與RDD的主要區(qū)別在于,前者帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱(chēng)和類(lèi)型。如果熟悉 Python Pandas 庫(kù)中的 DataFrame 結(jié)構(gòu),則會(huì)對(duì) SparkSQL DataFrame 概念非常熟悉。
TextFile DataFrame
import.org.a(chǎn)pache.spark.sql._//定義數(shù)據(jù)的列名稱(chēng)和類(lèi)型valdt=StructType(List(id:String,name:String,gender:String,age:Int))
//導(dǎo)入user_info.csv文件并指定分隔符vallines = sc.textFile("/path/user_info.csv").map(_.split(","))
//將表結(jié)構(gòu)和數(shù)據(jù)關(guān)聯(lián)起來(lái),把讀入的數(shù)據(jù)user.csv映射成行,構(gòu)成數(shù)據(jù)集valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt))
//通過(guò)SparkSession.createDataFrame()創(chuàng)建表,并且數(shù)據(jù)表表頭val df= spark.createDataFrame(rowRDD, dt)
可左右滑動(dòng)查看代碼
讀取規(guī)則數(shù)據(jù)文件作為DataFrame
SparkSession.Builder builder = SparkSession.builder()Builder.setMaster("local").setAppName("TestSparkSQLApp")SparkSession spark = builder.getOrCreate();SQLContext sqlContext = spark.sqlContext();
# 讀取 JSON 數(shù)據(jù),path 可為文件或者目錄valdf=sqlContext.read().json(path);
# 讀取 HadoopParquet 文件vardf=sqlContext.read().parquet(path);
# 讀取 HadoopORC 文件vardf=sqlContext.read().orc(path);
可左右滑動(dòng)查看代碼
JSON 文件為每行一個(gè) JSON 對(duì)象的文件類(lèi)型,行尾無(wú)須逗號(hào)。文件頭也無(wú)須[]指定為數(shù)組;SparkSQL 讀取是只是按照每行一條 JSON Record序列化;
Parquet文件
Configurationconfig = new Configuration();ParquetFileReaderreader = ParquetFileReader.open( HadoopInputFile.fromPath(new Path("hdfs:///path/file.parquet"),conf));Map<String, String>schema = reader.getFileMetaData().getKeyValueMetaData();String allFields= schema.get("org.a(chǎn)pache.spark.sql.parquet.row.metadata");
可左右滑動(dòng)查看代碼
allFiedls 的值就是各字段的名稱(chēng)和具體的類(lèi)型,整體是一個(gè)json格式進(jìn)行展示。
讀取 Hive 表作為 DataFrame
Spark2 API 推薦通過(guò) SparkSession.Builder 的 Builder 模式創(chuàng)建 SparkContext。 Builder.getOrCreate() 用于創(chuàng)建 SparkSession,SparkSession 是 SparkContext 的封裝。
在Spark1.6中有兩個(gè)核心組件SQLcontext和HiveContext。SQLContext 用于處理在 SparkSQL 中動(dòng)態(tài)注冊(cè)的表,HiveContext 用于處理 Hive 中的表。
從Spark2.0以上的版本開(kāi)始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql 即可執(zhí)行 Hive 中的表,也可執(zhí)行內(nèi)部注冊(cè)的表;
在需要執(zhí)行 Hive 表時(shí),只需要在 SparkSession.Builder 中開(kāi)啟 Hive 支持即可(enableHiveSupport())。
SparkSession.Builder builder = SparkSession.builder().enableHiveSupport();SparkSession spark = builder.getOrCreate();SQLContext sqlContext = spark.sqlContext();
可左右滑動(dòng)查看代碼
// db 指 Hive 庫(kù)中的數(shù)據(jù)庫(kù)名,如果不寫(xiě)默認(rèn)為 default
// tableName 指 hive 庫(kù)的數(shù)據(jù)表名
sqlContext.sql(“select * from db.tableName”)
可左右滑動(dòng)查看代碼
SparkSQL ThriftServer
//首先打開(kāi) Hive 的 Metastore服務(wù)
hive$bin/hive –-service metastore –p 8093
可左右滑動(dòng)查看代碼
//把 Spark 的相關(guān) jar 上傳到hadoophdfs指定目錄,用于指定sparkonyarn的依賴(lài) jar
spark$hadoop fs –put jars/*.jar /lib/spark2
可左右滑動(dòng)查看代碼
// 啟動(dòng) spark thriftserver 服務(wù)
spark$ sbin/start-thriftserver.sh --master yarn-client --driver-memory 1G --conf spark.yarn.jars=hdfs:///lib/spark2/*.jar
可左右滑動(dòng)查看代碼
當(dāng)hdfs 上傳了spark 依賴(lài) jar 時(shí),通過(guò)spark.yarn.jars 可看到日志 spark 無(wú)須每個(gè)job 都上傳jar,可節(jié)省啟動(dòng)時(shí)間
19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.0.5.jar19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.1.7.3.jar
可左右滑動(dòng)查看代碼
//通過(guò) spark bin 下的 beeline 工具,可以連接到 spark ThriftServer(SparkOnHive)
bin/beeline -u jdbc:hive2://ip:10000/default -n hadoop
可左右滑動(dòng)查看代碼
-u 是指定 beeline 的執(zhí)行驅(qū)動(dòng)地址;
-n 是指定登陸到 spark Session 上的用戶(hù)名稱(chēng);
Beeline 還支持傳入-e 可傳入一行 SQL,
-e <query> query that should be executed
也可通過(guò) –f 指定一個(gè) SQL File,內(nèi)部可用逗號(hào)分隔的多個(gè) SQL(存儲(chǔ)過(guò)程)
-f <exec file> script file that should be executed
SparkSQL Beeline 的執(zhí)行效果展示
SparkSQL ThriftServer
對(duì)于 SparkSQL ThriftServer 服務(wù),每個(gè)登陸的用戶(hù)都有創(chuàng)建的 SparkSession,并且執(zhí)行的對(duì)個(gè) SQL 會(huì)通過(guò)時(shí)間順序列表展示。
SparkSQL ThriftServer 服務(wù)可用于其他支持的數(shù)據(jù)庫(kù)工具創(chuàng)建查詢(xún),也用于第三方的 BI 工具,如 tableau。
四、SparkSQL Flow
SparkSQL Flow 是以 SparkSQL 為基礎(chǔ),開(kāi)發(fā)的統(tǒng)一的基于 XML 配置化的可執(zhí)行一連串的 SQL 操作,這一連串的 SQL 操作定義為一個(gè) Flow。下文開(kāi)始 SparkSQL Flow 的介紹:
SparkSQL Flow 是基于 SparkSQL 開(kāi)發(fā)的一種基于 XML 配置化的 SQL 數(shù)據(jù)流轉(zhuǎn)處理模型。該模型簡(jiǎn)化了 SparkSQL 、Spark RDD的開(kāi)發(fā),并且降低開(kāi)發(fā)了難度,適合了解數(shù)據(jù)業(yè)務(wù)但無(wú)法駕馭大數(shù)據(jù)以及 Spark 技術(shù)的開(kāi)發(fā)者。
一個(gè)由普元技術(shù)部提供的基于 SparkSQL 的開(kāi)發(fā)模型;
一個(gè)可二次定制開(kāi)發(fā)的大數(shù)據(jù)開(kāi)發(fā)框架,提供了靈活的可擴(kuò)展 API;
一個(gè)提供了 對(duì)文件,數(shù)據(jù)庫(kù),NoSQL 等統(tǒng)一的數(shù)據(jù)開(kāi)發(fā)視界語(yǔ)義;
基于 SQL 的開(kāi)發(fā)語(yǔ)言和 XML 的模板配置,支持 Spark UDF 的擴(kuò)展管理;
支持基于 Spark Standlone,Yarn,Mesos 資源管理平臺(tái);
支持開(kāi)源、華為、星環(huán)等平臺(tái)統(tǒng)一認(rèn)證。
SparkSQL Flow 適合的場(chǎng)景:
批量 ETL;
非實(shí)時(shí)分析服務(wù);
SparkSQL Flow XML 概覽
Properties 內(nèi)定義一組變量,可用于宏替換;
Methods 內(nèi)可注冊(cè) udf 和 udaf 兩種函數(shù);
Prepare 內(nèi)可定義前置 SQL,用于執(zhí)行 source 前的 sql 操作;
Sources 內(nèi)定義一個(gè)到多個(gè)數(shù)據(jù)表視圖;
Transformer 內(nèi)可定義 0 到多個(gè)基于 SQL 的數(shù)據(jù)轉(zhuǎn)換操作(支持 join);
Targets 用于定義 1 到多個(gè)數(shù)據(jù)輸出;
After 可定義 0到多個(gè)任務(wù)日志;
如你所見(jiàn),source 的 type 參數(shù)用于區(qū)分 source 的類(lèi)型,source 支持的種類(lèi)直接決定SparkSQL Flow 的數(shù)據(jù)源加載廣度;并且,根據(jù) type 不同,source 也需要配置不同的參數(shù),如數(shù)據(jù)庫(kù)還需要 driver,url,user和 password 參數(shù)。
Transformer 是基于 source 定的數(shù)據(jù)視圖可執(zhí)行的一組轉(zhuǎn)換 SQL,該 SQL 符合 SparkSQL 的語(yǔ)法(SQL99)。Transform 的 SQL 的執(zhí)行結(jié)果被作為中間表命名為 table_name 指定的值。
Targets 為定義輸出,table_name 的值需在 source 或者 Transformer 中定義。
發(fā)表評(píng)論
請(qǐng)輸入評(píng)論內(nèi)容...
請(qǐng)輸入評(píng)論/評(píng)論長(zhǎng)度6~500個(gè)字
最新活動(dòng)更多
-
即日-11.13立即報(bào)名>>> 【在線(xiàn)會(huì)議】多物理場(chǎng)仿真助跑新能源汽車(chē)
-
11月20日火熱報(bào)名中>> 2024 智能家居出海論壇
-
11月28日立即報(bào)名>>> 2024工程師系列—工業(yè)電子技術(shù)在線(xiàn)會(huì)議
-
12月19日立即報(bào)名>> 【線(xiàn)下會(huì)議】OFweek 2024(第九屆)物聯(lián)網(wǎng)產(chǎn)業(yè)大會(huì)
-
即日-12.26火熱報(bào)名中>> OFweek2024中國(guó)智造CIO在線(xiàn)峰會(huì)
-
即日-2025.8.1立即下載>> 《2024智能制造產(chǎn)業(yè)高端化、智能化、綠色化發(fā)展藍(lán)皮書(shū)》
推薦專(zhuān)題
- 1 【一周車(chē)話(huà)】沒(méi)有方向盤(pán)和踏板的車(chē),你敢坐嗎?
- 2 特斯拉發(fā)布無(wú)人駕駛車(chē),還未迎來(lái)“Chatgpt時(shí)刻”
- 3 特斯拉股價(jià)大跌15%:Robotaxi離落地還差一個(gè)蘿卜快跑
- 4 馬斯克給的“驚喜”夠嗎?
- 5 打完“價(jià)格戰(zhàn)”,大模型還要比什么?
- 6 馬斯克致敬“國(guó)產(chǎn)蘿卜”?
- 7 神經(jīng)網(wǎng)絡(luò),誰(shuí)是盈利最強(qiáng)企業(yè)?
- 8 比蘋(píng)果偉大100倍!真正改寫(xiě)人類(lèi)歷史的智能產(chǎn)品降臨
- 9 諾獎(jiǎng)進(jìn)入“AI時(shí)代”,人類(lèi)何去何從?
- 10 Open AI融資后成萬(wàn)億獨(dú)角獸,AI人才之爭(zhēng)開(kāi)啟
- 高級(jí)軟件工程師 廣東省/深圳市
- 自動(dòng)化高級(jí)工程師 廣東省/深圳市
- 光器件研發(fā)工程師 福建省/福州市
- 銷(xiāo)售總監(jiān)(光器件) 北京市/海淀區(qū)
- 激光器高級(jí)銷(xiāo)售經(jīng)理 上海市/虹口區(qū)
- 光器件物理工程師 北京市/海淀區(qū)
- 激光研發(fā)工程師 北京市/昌平區(qū)
- 技術(shù)專(zhuān)家 廣東省/江門(mén)市
- 封裝工程師 北京市/海淀區(qū)
- 結(jié)構(gòu)工程師 廣東省/深圳市