李明東 王英 焦杰
摘要:本文基于Hadoop平臺設(shè)計了一個實時數(shù)據(jù)處理系統(tǒng),通過對主流實時計算框架的研究,解決了Spark,puma沒能解決的數(shù)據(jù)源主動接入問題.本系統(tǒng)設(shè)計主要包括核心計算模塊設(shè)計、數(shù)據(jù)接入模塊設(shè)計和存儲模塊設(shè)計.主要用到的算法包括可靠性機制算法、信號量機制算法、事務(wù)性機制算法等.實踐結(jié)果表明,系統(tǒng)處理效率高且運行穩(wěn)定.
關(guān)鍵詞:Hadoop;實時數(shù)據(jù)處理;可靠性機制
中圖分類號:TP311.13? 文獻標(biāo)識碼:A? 文章編號:1673-260X(2019)04-0047-03
隨著大數(shù)據(jù)技術(shù)的快速發(fā)展,要求現(xiàn)有平臺不僅能夠處理海量數(shù)據(jù),還要能夠快速的對接批量數(shù)據(jù),實現(xiàn)數(shù)據(jù)的實時處理與結(jié)果的展現(xiàn);本文以Hadoop主流數(shù)據(jù)處理平臺為基礎(chǔ),使用storm計算框架完成各個模塊的功能設(shè)計.在整個系統(tǒng)框架下,分別對可靠性等機制進行了內(nèi)外部環(huán)境的功能及性能的測試.為后續(xù)數(shù)據(jù)可視化的實時展現(xiàn)以及預(yù)測提供了堅實的理論和實驗基礎(chǔ).
1 實時數(shù)據(jù)處理系統(tǒng)結(jié)構(gòu)與功能設(shè)計
基于ARM微處理器芯片的智能遠程防盜系統(tǒng)的結(jié)構(gòu)功能設(shè)計主要包括:電源模塊、無線本系統(tǒng)開發(fā)核心基于Hadoop架構(gòu),結(jié)合了kafka[1]、HBase、Thritf,以及Zookeeper[2]集群等開源工具,使用Storm作為數(shù)據(jù)計算模塊.實時數(shù)據(jù)處理系統(tǒng)的環(huán)境和服務(wù)部署框架如圖1所示.
1.1 Storm-YARN
Hadoop集群中的3臺機器提供給Storm- YARN使用,一臺作為Nimbus,另外兩臺作為Supervisor機,每個開啟4個工作進程.
1.2 Kafka
用于提供高吞吐消息服務(wù)的Kafka隊列部署在一臺Linux物理機器上,Kafka可以有效地解決在線數(shù)據(jù)活躍導(dǎo)致與系統(tǒng)之間速度不匹配的問題.Kafka通過追加數(shù)據(jù)的方法完成對磁盤數(shù)據(jù)的長久保持,提高系統(tǒng)運算能力的同時又能穩(wěn)定存儲數(shù)據(jù).
1.3 HBase
Hadoop集群中的3臺機器被劃分用于部署HBase服務(wù).HBase架構(gòu)圖如圖2所示.
HBase中客戶端通過遠程過程調(diào)用機制與HRegionServer和HMaster進行通信.當(dāng)用戶對數(shù)據(jù)進行讀寫操作時,客戶端通過遠程過程調(diào)用機制與HRegionServer通信,對數(shù)據(jù)進行創(chuàng)建、權(quán)限、刪除等操作時,客戶端通過遠程過程調(diào)用機制與HMaster通信.
1.4 實時系統(tǒng)中Hadoop集群配置
本系統(tǒng)Hadoop集群包括10臺機器,一臺機器為Namenode,其余為Datanode.Namenode用于維護文件系統(tǒng)樹,包括樹內(nèi)的文件和目錄,Datanode存儲和檢索數(shù)據(jù)塊,并維護數(shù)據(jù)塊存儲列表,一定周期內(nèi)將信息發(fā)送給Namenode[3].
1.5 Zookeeper集群配置
Zookeeper集群分配4臺機器,Zookeeper采用與文件系統(tǒng)相似的目錄節(jié)點樹來存儲數(shù)據(jù),數(shù)據(jù)的集群管理通過維護和檢測數(shù)據(jù)的變化以實現(xiàn),此外Zookeeper在本系統(tǒng)中為HBase等程序提供服務(wù).
2 實時數(shù)據(jù)接入
2.1 實時數(shù)據(jù)處理
首先啟動kafka消息隊列服務(wù),將用戶數(shù)據(jù)源接入系統(tǒng)緩沖池,第二步啟動位于數(shù)據(jù)源層的數(shù)據(jù)源接入模組,讀取配置,向外提供服務(wù).用戶向系統(tǒng)發(fā)送一項任務(wù)時,系統(tǒng)首先對任務(wù)進行邏輯解析,將解析后的任務(wù)發(fā)送到計算層,完成實時計算和存儲.系統(tǒng)外應(yīng)用使用應(yīng)用程序編程接口將數(shù)據(jù)發(fā)送到系統(tǒng),并在消息隊列中進行緩存,數(shù)據(jù)在消息隊列中排隊等待,對數(shù)據(jù)處理需要在計算層有相關(guān)的在線處理進程.
在采用C/S架構(gòu)的數(shù)據(jù)源接入層中,外部應(yīng)用被稱為Client,系統(tǒng)即為Sever端.Client端可以通過發(fā)送數(shù)據(jù)給Server后,等待Server確定后繼續(xù)發(fā)送數(shù)據(jù)或者不經(jīng)過Server確定一直發(fā)送數(shù)據(jù)這兩種方式傳輸數(shù)據(jù)[4].實時數(shù)據(jù)接入流程圖如圖3所示.
2.2 數(shù)據(jù)處理模式設(shè)計
數(shù)據(jù)處理[4]包括對數(shù)據(jù)的統(tǒng)計、提取、過濾、計算TopN、數(shù)據(jù)聚合等,還要對中文數(shù)據(jù)流進行分詞操作.數(shù)據(jù)處理流程如圖4所示.
2.3 實時處理系統(tǒng)實現(xiàn)
實時處理系統(tǒng)由數(shù)據(jù)接入模塊、存儲模塊、核心計算模塊組成.
(1)數(shù)據(jù)接入工作流程如圖5所示.
模塊中分為客戶端、服務(wù)器、通信以及消息隊列;客戶端發(fā)送流式數(shù)據(jù)至服務(wù)器,同時為了提升消息的傳輸效率和質(zhì)量,在客戶端中加入了Retey機制,并設(shè)置最大的Retry次數(shù)是5次,當(dāng)連續(xù)5次調(diào)用失敗才算最終失敗.
當(dāng)客戶端調(diào)用失敗時拋出異常,系統(tǒng)調(diào)用handleTException方法處理異服務(wù)器需要能快速響應(yīng)客戶端的請求,因此本文服務(wù)器采用線程池工作模式,設(shè)置最小線程數(shù)8,最大線程數(shù)256,這樣提高了服務(wù)器響應(yīng)速度,又最大程度減少了資源的消耗[5].
Kafka消息隊列部署在一臺linux機器上,Kafka將來自同一數(shù)據(jù)源的消息即同一主題,默認分區(qū)個數(shù)為10.在Kafka中,生產(chǎn)者產(chǎn)生消息并且將消息發(fā)送給服務(wù)器;消費者負責(zé)使用消息,這三者的關(guān)系如圖6所示.
通信部分有handleMsg以及handleMsg兩個接口方法,用戶根據(jù)需求選擇調(diào)用.
(2)實時數(shù)據(jù)處理系統(tǒng)核心計算框架
本系統(tǒng)中的實時計算部分是基于Storm框架開發(fā),spout組件提供數(shù)據(jù)噴發(fā)服務(wù),Bolt組件提供數(shù)據(jù)處理操作,二者構(gòu)成Storm的在線計算任務(wù).核心計算框架如圖7所示.
3 實時數(shù)據(jù)處理系統(tǒng)算法的設(shè)計與處理
3.1 可靠性機制算法設(shè)計
在基于Storm平臺的可靠性機制算法下構(gòu)建流程如下圖所示.
如果消息處理失敗,則調(diào)用fail方法.首先將消息隊列頭中的消息移除,消息處理結(jié)果被標(biāo)記為失敗,進行計時.消息處理成功,調(diào)用ack方法,將消息隊列頭中的消息移除,消息處理結(jié)構(gòu)被標(biāo)記為成功,開始計時,計時結(jié)束調(diào)用nextTuple方法發(fā)送接下來的消息[6].
3.2 信號量機制算法
本系統(tǒng)基于Storm信號量機制開發(fā)了一個組件Signalspout.Signalspout組件用于發(fā)送清空緩存等操作的信號給其他組件,只需signalspout組件定時發(fā)射信號就能實現(xiàn)從一個方面控制多個時間粒度[7],signalspout工作原理如圖9所示.
3.3 事務(wù)性機制算法
使用TridentTopology事務(wù)性在線任務(wù)完成該算法,Trident包括Partition-local操作、Merge/Join操作、流分組操作、Pepartitionning操作、Aggregation操作[8].
4 總結(jié)
本文基于Hadoop平臺設(shè)計了一個實時數(shù)據(jù)處理系統(tǒng),彌補了spark、Hadoop平臺不能供多用戶實時操作數(shù)據(jù)的不足.HBaseBolt組件實現(xiàn)了存儲消息序列到HBase數(shù)據(jù)庫中,將tuple數(shù)據(jù)樣例轉(zhuǎn)變?yōu)閜ut實例進行存儲.改進后的實時處理系統(tǒng)確保數(shù)據(jù)源組件spout發(fā)出的信息能被bolts及時捕捉并處理.系統(tǒng)采用的信號量機制控制對時間粒度不同時,控制數(shù)據(jù)分流并進行置零計數(shù);通過多次運行試驗,系統(tǒng)處理數(shù)據(jù)及時且運行穩(wěn)定,提升了平臺處理數(shù)據(jù)的效率.
參考文獻:
〔1〕曲風(fēng)富.京東基于Samza的流失計算實踐[J].程序員,2014(2):40-43.
〔2〕Yang L,Yan Z.A method to avoid single failure of Namenode in HDFSZookeeper[J].Software,2016.
〔3〕金曉軍.Trident Storm與流計算經(jīng)驗[J].程序員,2015(10):99-103.
〔4〕朱珠.基于Hadoop的海量數(shù)據(jù)處理模型研究與應(yīng)用[D].北京郵電大學(xué),2014.
〔5〕陳飛.基于MapReduce的數(shù)據(jù)清洗算法研究[D].昆明理工大學(xué),2016.101-103.
〔6〕徐媛媛.基于MapReduce的相似性連接研究[D].寧波大學(xué),2014.22-25.
〔7〕雷斌.面向復(fù)雜距離度量的MapReduce相似性連接技術(shù)研究[D].東北大學(xué),2016.55-58.
〔8〕韓來明.基于遺傳算法的分布式數(shù)據(jù)挖MapReduce架構(gòu)研究[D].天津大學(xué),2015.31-35.