吳廣建 于夢(mèng)潔
摘? 要: SDC(Streamsets Data Collector)是一款可拖拽式的大數(shù)據(jù)ETL工具,可以不用寫代碼就能實(shí)現(xiàn)大量數(shù)據(jù)的處理,但要實(shí)現(xiàn)任務(wù)定時(shí)管理和多數(shù)據(jù)源等復(fù)雜功能則需要利用其公司的非開(kāi)源產(chǎn)品。文章介紹利用SDC內(nèi)部接口,設(shè)計(jì)開(kāi)發(fā)定時(shí)組件以及結(jié)合其自帶組件,實(shí)現(xiàn)管道(PipeLine)的定時(shí)任務(wù)調(diào)度和多數(shù)據(jù)源應(yīng)用。實(shí)驗(yàn)結(jié)果表明,拓展的框架組件可以實(shí)現(xiàn)偽實(shí)時(shí)及復(fù)雜的定時(shí)任務(wù),并結(jié)合內(nèi)部組件完成本機(jī)及遠(yuǎn)程多數(shù)據(jù)源整合應(yīng)用,能解決用戶對(duì)具體定時(shí)場(chǎng)景的應(yīng)用問(wèn)題。
關(guān)鍵詞: ETL工具; 管道; 定時(shí)組件; 多數(shù)據(jù)源
中圖分類號(hào):TP391? ? ? ? ? 文獻(xiàn)標(biāo)志碼:A? ? ?文章編號(hào):1006-8228(2019)07-19-03
Abstract: SDC(Streamsets Data Collector) is a drag-and-drop big data ETL tool that can handle large amounts of data without writing code, but for complex functions such as task timing management and multiple data sources, it is needed to work with its company's non-open source products. In this paper, SDC internal interface is used to design and develop a timing component and with the combination of SDC internal components to realize timing task scheduling and multi-data source application of pipeline. The experimental results show that the extended framework components can realize pseudo-real-time and complex timing tasks, and combine the internal components to complete the integration application of local and remote multiple data sources, which solves the application problem of users to specific timing scenarios.
Key words: big data; ETL tool; pipeline; timing component; multi-data source
0 引言
隨著大數(shù)據(jù)時(shí)代的到來(lái),數(shù)據(jù)量越來(lái)越多,早在2010年全球數(shù)據(jù)就跨入了ZB時(shí)代,到2020年全球?qū)碛?5ZB的數(shù)據(jù)量。由于數(shù)據(jù)量巨大、數(shù)據(jù)類型多樣、價(jià)值密度低,大數(shù)據(jù)技術(shù)為我們分析問(wèn)題和解決問(wèn)題提供了新的思路和方法,其研究漸漸成為熱點(diǎn)[3]。數(shù)據(jù)分析是整個(gè)大數(shù)據(jù)處理流程里最核心的部分,在數(shù)據(jù)分析的過(guò)程中, 會(huì)發(fā)現(xiàn)數(shù)據(jù)的價(jià)值所在[4]。數(shù)據(jù)分析處理工具的種類越來(lái)越多,我們對(duì)工具的性能、方便、及易用性要求越來(lái)越高,各式各樣的數(shù)據(jù)處理工具用在數(shù)據(jù)處理的不同場(chǎng)景[5],以滿足我們現(xiàn)在數(shù)據(jù)分析的需要。本文介紹的框架可以實(shí)現(xiàn)不用書寫一段代碼就可以實(shí)現(xiàn)端到端的數(shù)據(jù)傳送,可用于數(shù)據(jù)的離線提取、實(shí)時(shí)更新等,本文利用自定義定時(shí)組件、RPC組件和事件觸發(fā)組價(jià),實(shí)現(xiàn)了多數(shù)據(jù)源的整合定時(shí)更新。
1 簡(jiǎn)介
2014年,前Clouderra開(kāi)發(fā)團(tuán)隊(duì)負(fù)責(zé)人和前Informatica產(chǎn)品負(fù)責(zé)人創(chuàng)建了Streamsets公司,他們意識(shí)到大數(shù)據(jù)正在打破傳統(tǒng)的基于結(jié)構(gòu)數(shù)據(jù)的數(shù)據(jù)集成范式和數(shù)據(jù)移動(dòng)模式,因此開(kāi)發(fā)了多種大數(shù)據(jù)應(yīng)用產(chǎn)品,其中公司核心產(chǎn)品是一款開(kāi)源的數(shù)據(jù)移動(dòng)引擎框架SDC,是一款具有UI界面可拖拽的大數(shù)據(jù)ETL工具。
1.1 SDC相關(guān)組件
⑴ PipeLine
Pipeline畫布內(nèi)部,可以定義一個(gè)Origin數(shù)據(jù)源,多個(gè)processor處理器和多個(gè)Destination目的地。在Data Collector中可以運(yùn)行多個(gè)PipeLine, 當(dāng)PipeLine運(yùn)行時(shí)可以實(shí)時(shí)監(jiān)控每條數(shù)據(jù),提供數(shù)據(jù)狀態(tài)的可視化界面。PipeLine還提供數(shù)據(jù)報(bào)警、事件觸發(fā)、傳遞參數(shù),集群模式運(yùn)行等功能。
⑵ Original
Original組件對(duì)接外部接口或設(shè)備進(jìn)行數(shù)據(jù)的抽取,是數(shù)據(jù)的來(lái)源。每個(gè)PipeLine中只能有一個(gè)數(shù)據(jù)源(origin)例如:JDBC Query Consumer,Kafka Consumer。
⑶ Processor
Processors組件可以將origin抽取來(lái)的數(shù)據(jù)進(jìn)行清洗,然后再傳到Destination組件,processor組件可以添加多個(gè)。例如:字段刪除、分割、合并等。
⑷ Destination
Destination組件將清洗過(guò)后的數(shù)據(jù)裝載到目的地址,目的地址可以為多個(gè)。例如:將數(shù)據(jù)上傳的關(guān)系型數(shù)據(jù)庫(kù)、或者通過(guò)RPC http傳給另一個(gè)PipeLine。
⑸ Executor
在SDC中存在兩種類型的事件,一種是PipeLine相關(guān)事件,只有在PipeLine啟動(dòng)或者停止時(shí)產(chǎn)生;另一種是stage相關(guān)事件,是單個(gè)組件產(chǎn)生的事件,例如:JDBC query Consumer查詢完數(shù)據(jù)時(shí)會(huì)產(chǎn)生”jdbc-query-success”,”no-more-data”事件。
1.2 SCH(streamsets Control hub)[6]
SCH是一個(gè)非開(kāi)源的產(chǎn)品,在官網(wǎng)上付費(fèi)訂閱后才可使用。它為企業(yè)中的團(tuán)隊(duì)開(kāi)發(fā)提供了方便,主要功能:實(shí)現(xiàn)了PipeLine的自動(dòng)部署,PipeLine之間實(shí)現(xiàn)多個(gè)數(shù)據(jù)的拓?fù)浣Y(jié)構(gòu),實(shí)現(xiàn)PipeLine的定時(shí)操作,實(shí)現(xiàn)控制創(chuàng)建、版本管理、版本回滾等。官方為SCH提供了云服務(wù),也可將SCH安裝在遠(yuǎn)程的服務(wù)器上進(jìn)行PipeLine的管理工作。
2 SDC應(yīng)用
2.1 應(yīng)用擴(kuò)展
由于SCH是非開(kāi)源的組件,不利用SCH情況下無(wú)法實(shí)現(xiàn)SDC的定時(shí)任務(wù)調(diào)度,而且對(duì)于每個(gè)管道畫布只允許有一個(gè)數(shù)據(jù)源(origin)存在,這就限制了多個(gè)數(shù)據(jù)源進(jìn)行數(shù)據(jù)的整合問(wèn)題。為了解釋如何解決以上兩個(gè)問(wèn)題,下面展示了簡(jiǎn)單的應(yīng)用場(chǎng)景案例。案例中具體分為三個(gè)部分:①利用開(kāi)源的Quartz[7]包開(kāi)發(fā)SDC擴(kuò)展組件——基于Cron[8]表達(dá)式的定時(shí)任務(wù)組件;②利用系統(tǒng)自帶組件RPC實(shí)現(xiàn)多個(gè)數(shù)據(jù)源的數(shù)據(jù)整合;③利用組件的事件觸發(fā)機(jī)制和組件Pipeline Finisher Executor實(shí)現(xiàn)停止管道。圖1詳細(xì)描述了應(yīng)用的具體流程,圖中虛線箭頭代表發(fā)送http請(qǐng)求觸發(fā)管道,圖中實(shí)線箭頭表示數(shù)據(jù)流。
2.1.1 Cron PipeLine
Cron PipeLine中包括三個(gè)組件:第一個(gè)為自定義組件CRON trigger,用于定時(shí)觸發(fā)cron表達(dá)式產(chǎn)生數(shù)據(jù)流;第二個(gè)組件為系統(tǒng)自帶組件Stream Selector,功能是將第一個(gè)組件產(chǎn)生的數(shù)據(jù)流根據(jù)指定條件進(jìn)行分發(fā);第三個(gè)組件為自定義組件PipeLine Trigger,功能是根據(jù)第一個(gè)組件的數(shù)據(jù)流進(jìn)行定時(shí)啟動(dòng)指定管道。
⑴ CRON trigger組件
自定義CRON trigger組件為origin組件,利用開(kāi)源的作業(yè)調(diào)度框架Quartz和Quartz所支持的Cron表達(dá)式進(jìn)行origin組件開(kāi)發(fā)擴(kuò)展[9],以下為偽代碼。
① 利用LinkedBlockingQueue創(chuàng)建容器,以保證線程安全。
② 參考Quartz文檔,創(chuàng)建scheduler,job,trigger,利用界面?zhèn)鱽?lái)的cron表達(dá)式產(chǎn)生數(shù)據(jù)放入上面創(chuàng)建容器。
③ 在produce方法中將容器中數(shù)據(jù)拉出傳給下一個(gè)組件(LinkedBlockingQueue.poll(15,TimeUnit.SECONDS)可降低CPU占有率)。
⑵ PipeLine Trigger組件
自定義PipeLine Trigger組件利用SDC所提供的restApi接口,通過(guò)發(fā)送http請(qǐng)求控制PipeLine的啟動(dòng)。組件設(shè)計(jì)可以開(kāi)啟本地服務(wù)的Pipeline,也可開(kāi)啟遠(yuǎn)程服務(wù)的PipeLine,遠(yuǎn)程服務(wù)要提供遠(yuǎn)程地址及遠(yuǎn)程用戶密碼,以下為偽代碼。
① 傳入用戶名和密碼用于用戶校驗(yàn)。
② 根據(jù)SDC提供接口,發(fā)送http請(qǐng)求開(kāi)啟指定pipeline。
2.1.2 多數(shù)據(jù)源
多數(shù)據(jù)源是利用SDC的兩個(gè)RPC組件實(shí)現(xiàn)的。分別為發(fā)送端SDC RPC組件和接受端SDC RPC組件,原理是利用rpc協(xié)議[10]實(shí)現(xiàn)數(shù)據(jù)的傳輸。這兩個(gè)組件所在管道可以在同一個(gè)主機(jī)上,也可以在同一個(gè)內(nèi)網(wǎng)或者外網(wǎng)。
圖1中,在PipeLine1中連接數(shù)據(jù)源,將抽取的數(shù)據(jù)傳入發(fā)送端SDC RPC組件,發(fā)送端SDC RPC組件中配置對(duì)應(yīng)的數(shù)據(jù)傳輸目的地址和SDC RPC ID,在PipeLine3中接受端SDC RPC組件通過(guò)監(jiān)聽(tīng)指定的端口號(hào)和設(shè)置SDC RPC ID驗(yàn)證接受傳輸來(lái)的數(shù)據(jù)(SDC RPC ID用于識(shí)別發(fā)送端SDC RPC組件)。
3 結(jié)論及展望
本文主要闡述了SDC框架的拓展及應(yīng)用,通過(guò)配合自定義定時(shí)組件、組件SDC RPC和組件PipeLine Finish Executor,實(shí)現(xiàn)了多數(shù)據(jù)源整合的定時(shí)策略,通過(guò)多次框架組件測(cè)試表明整個(gè)組件實(shí)用性強(qiáng),能實(shí)現(xiàn)偽實(shí)時(shí)及復(fù)雜的定時(shí)任務(wù),解決了具體場(chǎng)景定時(shí)任務(wù)應(yīng)用問(wèn)題。由于組件實(shí)現(xiàn)了cron表達(dá)式輸入實(shí)現(xiàn)定時(shí),有些使用者對(duì)cron表達(dá)式不了解。因此組件擴(kuò)展還需進(jìn)一步研究,從而實(shí)現(xiàn)界面鼠標(biāo)點(diǎn)擊定時(shí)以及對(duì)任務(wù)的進(jìn)一步優(yōu)化管理。大數(shù)據(jù)時(shí)代導(dǎo)致大數(shù)據(jù)研發(fā)公司層出不窮,各式各樣的大數(shù)據(jù)框架供我們應(yīng)用在不同的大數(shù)據(jù)處理情景下,相信不久的將來(lái)將出現(xiàn)更加優(yōu)秀的大數(shù)據(jù)ETL處理框架。
參考文獻(xiàn)(References):
[1] Streamsets Data Collector. Available at https://streamsets.com/products/sdc
[2] Ralph Kimball著,周連科譯.數(shù)據(jù)倉(cāng)庫(kù)工具箱[M].清華大學(xué)出版社,2017.
[3] 陶雪嬌,胡曉峰,劉洋.大數(shù)據(jù)研究綜述[J].系統(tǒng)仿真學(xué)報(bào),2013.25(S1):142-146
[4] 劉智慧,張泉靈.大數(shù)據(jù)技術(shù)研究綜述[J].浙江大學(xué)學(xué)報(bào)(工學(xué)版),2014,48(06):957-972.
[5] 程學(xué)旗,靳小龍,王元卓,郭嘉豐,張鐵贏,李國(guó)杰.大數(shù)據(jù)系統(tǒng)和分析技術(shù)綜述[J].軟件學(xué)報(bào),2014.25(09):1889-1908
[6] Streamsets Control Hub.Available at https://streamsets.com/products/sch
[7] quartz. Available at http://www.quartz-scheduler.org/documentation
[8] Richard Blum著,門佳,武海峰譯.Linux命令行與shell腳本編程大全[M].人民郵電出版社,2016.
[9] Streamsets tutorials. Available at https://github.com/streamsets/tutorials
[10] 李智慧.大型分布式網(wǎng)站架構(gòu)[M].電子工業(yè)出版社,2018.