訂閱
糾錯(cuò)
加入自媒體

基于Spark的數(shù)據(jù)分析實(shí)踐

2019-06-19 09:55
EAWorld
關(guān)注

SparkSQL Flow 支持的Sourse

支持從 Hive 獲得數(shù)據(jù);

支持文件:JSON,TextFile(CSV),ParquetFile,AvroFile

支持RDBMS數(shù)據(jù)庫(kù):PostgreSQL, MySQL,Oracle

支持 NOSQL 數(shù)據(jù)庫(kù):Hbase,MongoDB

SparkSQL Flow TextFile Source

textfile 為讀取文本文件,把文本文件每行按照 delimiter 指定的字符進(jìn)行切分,切分不夠的列使用 null 填充。

<source type="textfile" table_name="et_rel_pty_cong"              fields="cust_id,name1,gender1,age1:int"               delimiter=","              path="file:///Users/zhenqin/software/hive/user.txt"/>

可左右滑動(dòng)查看代碼

Tablename 為該文件映射的數(shù)據(jù)表名,可理解為數(shù)據(jù)的視圖;

Fields 為切分后的字段,使用逗號(hào)分隔,字段后可緊跟該字段的類型,使用冒號(hào)分隔;

Delimiter 為每行的分隔符;

Path 用于指定文件地址,可以是文件,也可是文件夾;

Path 指定地址需要使用協(xié)議,如:file:// 、 hdfs://,否則跟 core-site.xml 配置密切相關(guān);

SparkSQL Flow DB Source

<source type="mysql" table_name="et_rel_pty_cong"                table="user"                url="jdbc:mysql://localhost:3306/tdb?characterEncoding=UTF-8"                driver="com.mysql.jdbc.Driver"                user="root" password="123456"/>

可左右滑動(dòng)查看代碼

RDBMS 是從數(shù)據(jù)庫(kù)使用 JDBC讀取 數(shù)據(jù)集。支持 type 為:db、mysql、oracle、postgres、mssql;

tablename 為該數(shù)據(jù)表的抽象 table 名稱(視圖);

url、driver、user,password 為數(shù)據(jù)庫(kù) JDBC 驅(qū)動(dòng)信息,為必須字段;

SparkSQL 會(huì)加載該表的全表數(shù)據(jù),無(wú)法使用 where 條件。


SparkSQL Flow Transformer

<transform type="sql" table_name="cust_id_agmt_id_t" cached="true">            SELECT c_phone,c_type,c_num, CONCAT_VAL(cust_id) as cust_ids            FROM user_concat_testx            group by c_phone,c_type,c_num</transform>

可左右滑動(dòng)查看代碼

Transform 支持 cached 屬性,默認(rèn)為 false;如果設(shè)置為 true,相當(dāng)于把該結(jié)果緩存到內(nèi)存中,緩存到內(nèi)存中的數(shù)據(jù)在后續(xù)其它 Transform 中使用能提高計(jì)算效率。但是需使用大量?jī)?nèi)存,開(kāi)發(fā)者需要評(píng)估該數(shù)據(jù)集能否放到內(nèi)存中,防止出現(xiàn) OutofMemory 的異常。

SparkSQL Flow Targets

SparkSQL Flow Targets 支持輸出數(shù)據(jù)到一個(gè)或者多個(gè)目標(biāo)。這些目標(biāo),基本覆蓋了 Source 包含的外部系統(tǒng)。下面以 Hive 舉例說(shuō)明:

<target type="hive" table_name="cust_id_agmt_id_t"  savemode=”append”target_table_name="cust_id_agmt_id_h(yuǎn)"/>

可左右滑動(dòng)查看代碼

table_name 為 source 或者 Transform 定義的表名稱;

target_table_name 為 hive 中的表結(jié)果,Hive 表可不存在也可存在,sparksql 會(huì)根據(jù) DataFrame 的數(shù)據(jù)類型自動(dòng)創(chuàng)建表;

savemode 默認(rèn)為 overwrite 覆蓋寫入,當(dāng)寫入目標(biāo)已存在時(shí)刪除源表再寫入;支持 append 模式, 可增量寫入。

Target 有一個(gè)特殊的 show 類型的 target。用于直接在控制臺(tái)輸出一個(gè) DataFrame 的結(jié)果到控制臺(tái)(print),該 target 用于開(kāi)發(fā)和測(cè)試。

<target type="show" table_name="cust_id_agmt_id_t" rows=”10000”/>

可左右滑動(dòng)查看代碼

Rows 用于控制輸出多少行數(shù)據(jù)。

SparkSQL Around

After 用于 Flow 在運(yùn)行結(jié)束后執(zhí)行的一個(gè)環(huán)繞,用于記錄日志和寫入狀態(tài)。類似 Java 的 try {} finally{ round.execute() }

多個(gè) round 一定會(huì)執(zhí)行,round 異常不會(huì)導(dǎo)致任務(wù)失敗。

<prepare>        <round type="mysql"               sql="insert into cpic_task_h(yuǎn)istory(id, task_type, catalog_model, start_time, retry_count, final_status, created_at)               values(${uuid}, ${task.type}, ${catalog.model}, ${starttime}, 0, ${status}, now())"               url="${jdbc.url}" .../></prepare><after>        <round type="mysql"               sql="update cpic_task_h(yuǎn)istory set               end_time = ${endtime}, final_status = ${status}, error_text = ${error} where id = ${uuid}"               url="${jdbc.url}”…/></after>

可左右滑動(dòng)查看代碼

Prepare round 和 after round 配合使用可用于記錄 SparkSQL Flow 任務(wù)的運(yùn)行日志。

SparkSQL Around可使用的變量

SparkSQL Around的執(zhí)行效果

Prepare round 可做插入(insert)動(dòng)作,after round 可做更新 (update)動(dòng)作,相當(dāng)于在數(shù)據(jù)庫(kù)表中從執(zhí)行開(kāi)始到結(jié)束有了完整的日志記錄。SparkSQL Flow 會(huì)保證round 一定能被執(zhí)行,而且 round 的執(zhí)行不影響任務(wù)的狀態(tài)。

SparkSQL Flow 提交

bin/spark-submit --master yarn-client --driver-memory 1G --num-executors 10 --executor-memory 2G --jars /lib/jsoup-1.11.3.jarlib/jsqlparser-0.9.6.jar,/lib/mysql-connector-java-5.1.46.jar --conf spark.yarn.jars=hdfs:///lib/spark2/*.jar --queue default --name FlowTest etl-flow-0.2.0.jar -f hive-flow-test.xml

可左右滑動(dòng)查看代碼

接收必須的參數(shù) –f,可選的參數(shù)為支持 Kerberos 認(rèn)證的租戶名稱principal,和其認(rèn)證需要的密鑰文件。

usage: spark-submit --jars etl-flow.jar --class                    com.yiidata.etl.flow.source.FlowRunner -f,--xml-file <arg>     Flow XML File Path    --keytabFile <arg>   keytab File Path(Huawei)    --krb5File <arg>     krb5 File Path(Huawei)    --principal <arg>    principal for hadoop(Huawei)

可左右滑動(dòng)查看代碼

SparkSQL Execution Plan

每個(gè)Spark Flow 任務(wù)本質(zhì)上是一連串的 SparkSQL 操作,在 SparkUI SQL tab 里可以看到 flow 中重要的數(shù)據(jù)表操作。

regiserDataFrameAsTable 是每個(gè) source 和 Transform 的數(shù)據(jù)在 SparkSQL 中的數(shù)據(jù)視圖,每個(gè)視圖都會(huì)在 SparkContex 中注冊(cè)一次。

聲明: 本文由入駐維科號(hào)的作者撰寫,觀點(diǎn)僅代表作者本人,不代表OFweek立場(chǎng)。如有侵權(quán)或其他問(wèn)題,請(qǐng)聯(lián)系舉報(bào)。

發(fā)表評(píng)論

0條評(píng)論,0人參與

請(qǐng)輸入評(píng)論內(nèi)容...

請(qǐng)輸入評(píng)論/評(píng)論長(zhǎng)度6~500個(gè)字

您提交的評(píng)論過(guò)于頻繁,請(qǐng)輸入驗(yàn)證碼繼續(xù)

  • 看不清,點(diǎn)擊換一張  刷新

暫無(wú)評(píng)論

暫無(wú)評(píng)論

人工智能 獵頭職位 更多
掃碼關(guān)注公眾號(hào)
OFweek人工智能網(wǎng)
獲取更多精彩內(nèi)容
文章糾錯(cuò)
x
*文字標(biāo)題:
*糾錯(cuò)內(nèi)容:
聯(lián)系郵箱:
*驗(yàn) 證 碼:

粵公網(wǎng)安備 44030502002758號(hào)