訂閱
糾錯
加入自媒體

基于XML描述的可編程函數(shù)式ETL實現(xiàn)

2019-07-02 10:24
EAWorld
關(guān)注

轉(zhuǎn)載本文需注明出處:微信公眾號EAWorld,違者必究。

引言:

傳統(tǒng) ETL 主要以 SQL 為主要技術(shù)手段,把數(shù)據(jù)經(jīng)抽取、清洗轉(zhuǎn)換之后加載到數(shù)據(jù)倉庫。但是在如今移動互聯(lián)網(wǎng)大力發(fā)展的場景下,產(chǎn)生大量碎片化和不規(guī)則的數(shù)據(jù)。政府,公安等行業(yè),傳統(tǒng)數(shù)據(jù)庫已經(jīng)遠(yuǎn)遠(yuǎn)無法滿足需求。數(shù)據(jù)原始文件通過文件導(dǎo)入到基礎(chǔ)庫,再通過大數(shù)據(jù) HQL等技術(shù)手段提取出二級庫,這中間的數(shù)據(jù)導(dǎo)入和 SQL ETL 的提取的過程,大量消耗 IO 性能和計算資源,在很多場景下已經(jīng)是數(shù)據(jù)處理的瓶頸所在。

普元在實施公安項目過程中開發(fā)了一種基于 XML 描述的可編程的函數(shù) ETL 轉(zhuǎn)換方法。主要用于大數(shù)據(jù)文件處理領(lǐng)域,能從原始數(shù)據(jù)文件直接、快速加載到專題庫的技術(shù)手段。技術(shù)方案主要解決了用 XML 的技術(shù)手段描述數(shù)據(jù)文件的格式,包含文件字段切分、字段類型、默認(rèn)值、異常值校驗、時間格式校驗。在處理時可添加自行開發(fā)的 JAVA UDF 函數(shù),函數(shù)實參支持變量、常量、表達(dá)式、函數(shù)和運算符重載。同時函數(shù)支持多層嵌套,即內(nèi)部函數(shù)的返回值最為外部函數(shù)的實參。該方案實現(xiàn)了 XML 內(nèi)函數(shù)體的語法解析并在運行過程中直接編譯為 Java 字節(jié)碼的技術(shù)。有效的解決了政府、公安、電信行業(yè)巨量的數(shù)據(jù)處理需要的大量計算資源和 IO 性能瓶頸,有效的提高了數(shù)據(jù)處理效率和降低了數(shù)據(jù)處理開發(fā)難度。

目錄:

一、基于 XML 控制文件解析數(shù)據(jù)文件方案介紹

二、XML 控制文件結(jié)構(gòu)和語法

三、函數(shù)和多層嵌套函數(shù)傳參

四、UDF 函數(shù)編寫方法

五、數(shù)據(jù)測試工具

六、FlumeOnYarn 架構(gòu)和分布式部署

一、基于 XML 控制文件解析數(shù)據(jù)文件方案介紹

對于數(shù)據(jù)開發(fā)項目,我們常常會面臨眾多的數(shù)據(jù)對接,部分場景不僅數(shù)據(jù)量大,且數(shù)據(jù)種類多,數(shù)據(jù)解析開發(fā)工作量巨大。對于大量數(shù)據(jù)對接,一般設(shè)計的 RPC 接口和 WebService 一般都達(dá)不到數(shù)據(jù)性能要求的。并且他們都是點對點的服務(wù),一旦上下游系統(tǒng)故障,都會造成整個數(shù)據(jù)對接異常。因此大部分都會選擇使用文件的方式進行數(shù)據(jù)對接。

對于非實時數(shù)據(jù)對接需求,這種方式的優(yōu)點:

在數(shù)據(jù)量大的情況下,可以通過文件傳輸,上游只寫入,無需關(guān)心數(shù)據(jù)業(yè)務(wù)和故障;

方案簡單,避免了網(wǎng)絡(luò)協(xié)議相關(guān)的概念;

維護簡單,只需保證磁盤寫入穩(wěn)定性即可;

我們常常會面臨基于此架構(gòu)的數(shù)據(jù)對接。但基于此架構(gòu)數(shù)據(jù)處理工作都在下游(即數(shù)據(jù)使用方)。

面對大量數(shù)據(jù)對接和眾多的數(shù)據(jù)類型,我們對于每種數(shù)據(jù)文件解析、解碼、清洗消耗大量的人力,并且基于編碼的方式對于較多數(shù)據(jù)類型的場景代碼量大,且難以管理。因此經(jīng)過多次數(shù)據(jù)開發(fā)實踐,我們開發(fā)了一種基于 XML 描述的方式來解析和清洗數(shù)據(jù)文件的實現(xiàn)。

本架構(gòu)實現(xiàn)適合以下幾個方面:

基于文件的數(shù)據(jù)對接;

文件無法直接導(dǎo)入到目標(biāo)數(shù)據(jù)庫,需要做轉(zhuǎn)換,清洗為目標(biāo)格式;

如上數(shù)據(jù)對接架構(gòu)圖,F(xiàn)lume 基本實現(xiàn)了基于文件系統(tǒng)的自動掃描和讀取,因此架構(gòu)實現(xiàn)了基于 Flume Sink 的模塊。本架構(gòu)也可作為SDK 作為框架集成到現(xiàn)有數(shù)據(jù)處理方案中。

二、XML數(shù)據(jù)控制文件結(jié)構(gòu)和語法

<?xml version="1.0" encoding="UTF-8"?><schema><key>JD_TYPE_V1</key><type>textfile</type><delimiter>,</delimiter><fields><field type="int">exp_flag</field>    <field type="string">sender_id</field>        <field type="string">sender_num</field>  <field type="string" value="unknown">sender_address</field>  <field type="string">receiver_num</field>  <field type="date" pattern="yyyy-MM-dd HH:mm:ss">expect_time</field>  <field type="string" default="true" value="location(receiver_num)">receiver_num_origin</field>    <field type="string" default="true" value="yn(none(sender_num))">is_sender_num_null</field>    <field type="string" default="true" value="concat(caller_number, '-', called_number)">number_connect</field>    <field type="string" default="true" value="yn(all_true(none(sender_num), none(receiver_num)))">all_num_null</field>  <field type="string" default="true" value="province_code(sender_province)">sender_province_code</field>  </fields></schema>

(可左右滑動查看全部代碼)

如上 XML 描述了一種數(shù)據(jù)文件類型及該類型的切分方法,數(shù)據(jù)每行經(jīng)過切分后,產(chǎn)生的多個數(shù)據(jù)列的轉(zhuǎn)換方法。

理論上,每種數(shù)據(jù)類型應(yīng)該對應(yīng)一個控制文件,意味著控制文件來描述該種數(shù)據(jù)類型如何解析和轉(zhuǎn)換。

Key 主要標(biāo)注該控制文件處理的類型ID;

Delimiter 為文件列切割字符;

Fields 中包含每列的字段描述;

數(shù)據(jù)類型支持Java基本類型和date類型;

Skip為數(shù)據(jù)對齊語法,控制在列中忽略某列的值;

Default = true 屬性為數(shù)據(jù)對齊語法,給某列提供默認(rèn)值,提供默認(rèn)值的列在數(shù)據(jù)列中不移動位移;

Value 提供了給該字段提供當(dāng)列中無值時提供默認(rèn)值;value=null則指定列值為null;

Date 類型需 pattern 屬性;

三、函數(shù)和多層嵌套函數(shù)傳參

默認(rèn)值

詞法分析時字段field 的value 屬性值沒有以英文小括號閉合的實體。如下示例中的primeton:

<field type="string" default="true" value="primeton">data_vendor</field>

(可左右滑動查看全部代碼)

函數(shù)

函數(shù)是由一組字符串、數(shù)字、下劃線組成的合法函數(shù)名和0 到多個形式參數(shù)組成。在詞法分析時字段field 的 value 屬性值由英文小括號閉合的實體。如下示例中的:

location(),yn(),concat();<field type="string" default="true" value=" unix_timestamp ">curr_time</field><field type="string" default="true" value="location(receiver_num)">receiver_num_origin</field>    <field type="string" default="true" value="yn(none(sender_num))">is_sender_num_null</field>    <field type="string" default="true" value="concat(caller_number, '-', called_number)">number_connect</field>

(可左右滑動查看全部代碼)

函數(shù)名

函數(shù)體小括號前面的部分。一般由字符串、數(shù)字、下劃線組成的一組特定的名稱。如location(receiver_tel),location 即為該函數(shù)的函數(shù)名稱。

函數(shù)的形式參數(shù):

1.無參數(shù)

詞法分析時value的值滿足函數(shù)條件且函數(shù)體內(nèi)無參數(shù)。如下示例中:unix_timestamp() 獲得當(dāng)前系統(tǒng)內(nèi)的 Unix 時間戳;

<field type="string" default="true" value=" unix_timestamp()">curr_time</field>

(可左右滑動查看全部代碼)

2.常量型形參

詞法分析時函數(shù)體內(nèi)以英文單引號引用的值為函數(shù)體的常量型形參。如’100’,函數(shù)示例為:random_int(‘100’),生成 0-100 以內(nèi)的隨機整形數(shù)值;

<field type="string" default="true" value="random_int(‘100’)">rand_num</field>

(可左右滑動查看全部代碼)

3.變量型形參

詞法分析時函數(shù)體內(nèi)參數(shù)沒有英文單引號引用并且不以英文小括號閉合的為函數(shù)體的變量型形參。如下示例中的receiver_tel;

<field type="string" default="true" value="location(receiver_tel)">r_num_loc</field>

(可左右滑動查看全部代碼)

4.函數(shù)型形參

詞法分析時函數(shù)體內(nèi)沒有英文單引號并且以英文小括號閉合的參數(shù)類型參數(shù)為函數(shù)體的函數(shù)型參數(shù)。如下示例中的:none(sender_num)和none(receiver_num);

<field type="string" default="true" value="yn(all_true(none(sender_num), none(receiver_num)))">all_num_null</field>

(可左右滑動查看全部代碼)

詞法分析獲得到函數(shù)體的同時,使用函數(shù)名調(diào)用UdfRegistors.getUdf(udfName) 函數(shù),以檢驗當(dāng)前系統(tǒng)必要存在該函數(shù),否則則拋出無法識別的函數(shù)異常。

5.類型校驗

詞法分析階段獲得了字段 field 的取值是默認(rèn)值或者函數(shù),下一步需校驗其默認(rèn)值或函數(shù)的返回值是否能和定義的字段類型相匹配。如果是函數(shù)同時校驗函數(shù)的形參和實參類型是否相匹配。

<field type="string" default="true" value="primeton">data_vendor</field><field type="int"    default="true" value="2">call_flag</field>

(可左右滑動查看全部代碼)

如上示例中的primeton 需能轉(zhuǎn)換為 string 類型,call_flag 需能轉(zhuǎn)換為 int 類型。如果類型不能轉(zhuǎn)換,則會拋出類型無法轉(zhuǎn)換異常。對于函數(shù),通過 returnType 返回類型和字段類型進行校驗,可匹配或者是該類型的子類型則類型驗證通過。

四、UDF 函數(shù)編寫方法

編寫一個UDF函數(shù)的步驟:

繼承 UDF 類,實現(xiàn) eval 方法;

Eval 方法傳入的是一個數(shù)組參數(shù);

判斷參數(shù)長度是否和預(yù)期的一致;

判斷位置參數(shù)類型是否和預(yù)期的一致;

實現(xiàn)函數(shù)體;

返回eval函數(shù)執(zhí)行的返回值,理論上該返回值的類型應(yīng)該一致,不應(yīng)該同一函數(shù)返回多種類型值;

函數(shù)編寫者應(yīng)該保證函數(shù)體內(nèi)是線程安全的;

UDF 實現(xiàn)如下:

public abstract class UDF {   /**   * 是否支持該組參數(shù)類型,不支持拋出UnsupportedTypeException異常。默認(rèn)返回 true   */   public void support(Class<?>... paramsClass)throws UnsupportedTypeException;   /*** 該 UDF 返回值類型,用于校驗嵌套函數(shù)類型是否匹配?煞祷睾唵晤愋,map,array,record 等類型.默認(rèn)返回 String 類型*/   public Class<?> returnType();/*** UDF 執(zhí)行函數(shù),當(dāng)輸入不符合預(yù)期時,向外拋出異常* @param params 函數(shù)的輸入實參* @return 函數(shù)輸出結(jié)果,簡單類型或者復(fù)雜類型,支持簡單類型,map,array,record 類型*/public abstract Object eval(Object... params);}

(可左右滑動查看全部代碼)

一個判斷是否包含子串的UDF 寫法:

所有的UDF都通過一個核心注冊類(這點類似 Hive 的FunctionRegistry)

public final class UdfRegistors {   /**    * UDF 函數(shù)映射    */static final Map<String, UDF> UDF_CACHED = new HashMap<String, UDF>();    static {UDF_CACHED.put("copy", new CopyUDF());  // 復(fù)制一個變量的值      UDF_CACHED.put("eq", new EqUDF()); // 判斷兩個變量是否相等      UDF_CACHED.put("yn", new YnUDF()); // 根據(jù)輸入true,false 轉(zhuǎn)換為 Y、NUDF_CACHED.put("null", new NullUDF()); // 判斷變量是否為null// add udf methodUDF_CACHED.put("location", new LocationUDF());     // 獲得手機號碼的歸屬地   UDF_CACHED.put("nation_code", new NationCodeUDF()); // 根據(jù)國家名稱獲取國家代碼    UDF_CACHED.put("province_code", new ProvinceCodeUDF()); //根據(jù)省名稱獲取省代碼    UDF_CACHED.put("city_code", new CityCodeUDF());    // 根據(jù)城市名稱獲取城市代碼    UDF_CACHED.put("phone_num", new PhoneNumUDF());  // 校驗是否是手機號或者固話UDF_CACHED.put("number_format", new NumberFormatUDF()); //校驗是否可以轉(zhuǎn)化成數(shù)字}/*** 添加一個UDF函數(shù)     * @param key UDF 函數(shù)     * @param value UDF 函數(shù) eval 應(yīng)線程安全    * @return     */    public static boolean addUdf(String key, UDF value) {        return UDF_CACHED.put(Optional.of(key).map((it)->it.toLowerCase()).get(), value) 。 null;    }    /**     * 獲得內(nèi)置的 udf 函數(shù)     */    public static UDF getUdf(String udfName) {        return UDF_CACHED.get(udfName.toLowerCase());    }}

(可左右滑動查看全部代碼)

UDF 函數(shù)注冊時期:

可在編譯期綁定內(nèi)置的 UDF 函數(shù);

可在系統(tǒng)啟動時配置自加載的 UDF 函數(shù);

可在運行期動態(tài)注入UDF 函數(shù);

五、數(shù)據(jù)測試工具

數(shù)據(jù)對接過程,面對數(shù)據(jù)是否能轉(zhuǎn)換為目標(biāo)結(jié)果常常無從所知;赬ML 控制文件的數(shù)據(jù)解析,可實現(xiàn)一個測試工具。該工具通過上傳數(shù)據(jù)文件和上傳 XML 控制文件,可對數(shù)據(jù)文件隨機的讀取行進行匹配測試,只要數(shù)據(jù)列和目標(biāo) XML文件能通過列匹配測試,則數(shù)據(jù)可通過 ETL 解析清洗。否則繼續(xù)修改 XML 控制文件,直到順利通過匹配。

六、FlumeOnYarn 架構(gòu)和分布式部署

本架構(gòu)適合以文件作為數(shù)據(jù)對接的方案,另一方面,通過擴展 Flume 即可實現(xiàn)拿來主義。Flume 內(nèi)部實現(xiàn)對 Channel 的 Transaction,對于每個以文件構(gòu)造的 Event 對象是原子操作,要么全部成功,要么失敗。flume依賴事務(wù)來保證event的可靠性。Flume 默認(rèn)沒有分布式實現(xiàn),因此開發(fā)了 FlumeOnYarn 的架構(gòu),用于支持 Flume 的分布式部署。

FlumeOnYarn優(yōu)勢:

無需每個節(jié)點安裝 Flume,可一鍵啟動和停止;

配置文件在客戶端節(jié)點修改,自動復(fù)制到 Yarn 上各實例,無需每個節(jié)點修改;

基于 CDH或HDP的發(fā)行版,即使實現(xiàn)了 Web 可視化化的配置和分布式部署,但是對于 Flume 只能實現(xiàn)單配置文件實例,無法實現(xiàn)多配置實例;

集群的規(guī)?梢愿鶕(jù)數(shù)據(jù)量大小進行實時的調(diào)整(增減節(jié)點),實現(xiàn)彈性處理。通過命令或者 api 即可控制(CDH 等需要在頁面添加 host,繁瑣且不易動態(tài)調(diào)整);

多個租戶或者同一租戶多個處理實例互不影響,且能隔離(Yarn Container);

FlumeOnYarn 架構(gòu)

上圖所示,提交FlumeOnYarn 需要客戶端,該客戶端沒有太多和Flume安裝包結(jié)構(gòu)特殊的地方,只是在 lib 下添加了 flume-yarn 的架構(gòu)支持和 bin 下 flume-on-yarn 的啟動腳本。

Flume OnYarn 客戶端程序

通過 bin/flume-on-yarn 即可提交 FlumeOnYarn Application 集群。如下的命令即可一次性申請多個 Yarn 資源節(jié)點,實現(xiàn)一鍵部署:

bin/flume-on-yarn yarn -s --name agent_name –conf  conf/flume-h(huán)dfs.conf  --num-instances 5

(可左右滑動查看全部代碼)

總結(jié)

推薦閱讀

元數(shù)據(jù)新型存儲架構(gòu)的探索

基于 Spark 的數(shù)據(jù)分析實踐

本地讀寫的多活數(shù)據(jù)存儲架構(gòu)設(shè)計要義

關(guān)于作者:震秦,普元資深開發(fā)工程師,專注于大數(shù)據(jù)開發(fā) 8 年,擅長 Hadoop 生態(tài)內(nèi)各工具的使用和優(yōu)化。參與某公關(guān)廣告(上市)公司DMP 建設(shè),負(fù)責(zé)數(shù)據(jù)分層設(shè)計和批處理,調(diào)度實現(xiàn),完成交付使用;參與國內(nèi)多省市公安社交網(wǎng)絡(luò)項目部署,負(fù)責(zé)產(chǎn)品開發(fā)(Spark 分析應(yīng)用);參與數(shù)據(jù)清洗加工為我方主題庫并部署上層應(yīng)用。

關(guān)于EAWorld:微服務(wù),DevOps,數(shù)據(jù)治理,移動架構(gòu)原創(chuàng)技術(shù)分享。關(guān)注微信公眾號EAWorld!

聲明: 本文由入駐維科號的作者撰寫,觀點僅代表作者本人,不代表OFweek立場。如有侵權(quán)或其他問題,請聯(lián)系舉報。

發(fā)表評論

0條評論,0人參與

請輸入評論內(nèi)容...

請輸入評論/評論長度6~500個字

您提交的評論過于頻繁,請輸入驗證碼繼續(xù)

暫無評論

暫無評論

人工智能 獵頭職位 更多
掃碼關(guān)注公眾號
OFweek人工智能網(wǎng)
獲取更多精彩內(nèi)容
文章糾錯
x
*文字標(biāo)題:
*糾錯內(nèi)容:
聯(lián)系郵箱:
*驗 證 碼:

粵公網(wǎng)安備 44030502002758號