劉思宇+梁毅+陳誠
摘要:Spark是一種新型分布式海量數(shù)據(jù)處理平臺,在應(yīng)用執(zhí)行過程中,Spark以任務(wù)作為最小執(zhí)行單元。因此,任務(wù)執(zhí)行時間預(yù)測是指導(dǎo)Spark進行性能分析、優(yōu)化資源調(diào)度以及故障監(jiān)控的基礎(chǔ)。在Spark平臺中,由于計算數(shù)據(jù)分布不均及網(wǎng)絡(luò)資源的共享,導(dǎo)致同樣計算邏輯的任務(wù)在不同計算節(jié)點上執(zhí)行的時間可能產(chǎn)生很大差異,需根據(jù)實時運行環(huán)境進行動態(tài)預(yù)測。通過結(jié)合任務(wù)在不同節(jié)點所需數(shù)據(jù)量以及集群網(wǎng)絡(luò)狀況,對任務(wù)在不同節(jié)點的執(zhí)行時間進行預(yù)測。實驗表明,該方法對任務(wù)進行預(yù)估,誤差可保證在19%以內(nèi),任務(wù)執(zhí)行時間預(yù)估算法對Spark調(diào)優(yōu)有一定的指導(dǎo)作用。
關(guān)鍵詞:大數(shù)據(jù);Spark;預(yù)測;分布式;任務(wù)
DOIDOI:10.11907/rjdk.171509
中圖分類號:TP306
文獻標識碼:A 文章編號:1672-7800(2017)012-0019-03
Abstract:Spark is a new distributed big data processing platform. In the implementation of Spark, task is the minimum execution unit. Therefore, the prediction of the execution time of the task can guide Spark to perform performance analysis, optimize resource scheduling and fault monitoring. In Spark platform, due to the uneven distribution of computing data and the sharing of network resources, the task with same computing logic may have different execution time in different nodes, and it needs to be dynamically predicted according to the real time environment. Currently on the Spark platform, the prediction technology is rarely studied. This paper predicts the execution time of each task at different nodes by combing the amount of data required by different tasks in each node and the status of cluster network. Experiment show that the method can be used to estimate the tasks in the task set, the error can guarantee less than 19%. Therefore, the task execution time estimation algorithm proposed in this paper has some guiding effect on Spark tuning.
Key Words:big data; Spark; prediction; distributed; task
0 引言
大數(shù)據(jù)時代,新型海量數(shù)據(jù)處理平臺大量涌現(xiàn)。其中,以Spark數(shù)據(jù)處理平臺為典型代表的分布式內(nèi)存計算平臺得到廣泛關(guān)注 [1]。Spark是繼Hadoop之后提出的一種基于內(nèi)存的分布式大數(shù)據(jù)處理平臺,被譽為可以取代Map/Reduce的下一代大數(shù)據(jù)處理核心技術(shù)[2-3]。與Hadoop Map/Reduce相比,Spark基于內(nèi)存的運算可提升100倍處理速度[4]。
任務(wù)是Spark的最小執(zhí)行單元。由于不同任務(wù)所需數(shù)據(jù)可能存在于集群各節(jié)點上,且數(shù)據(jù)量不盡相同,導(dǎo)致同樣計算邏輯任務(wù)在不同的計算節(jié)點上執(zhí)行的時間產(chǎn)生很大差異,需要根據(jù)應(yīng)用運行的實時環(huán)境進行動態(tài)預(yù)測[5]。對任務(wù)執(zhí)行時間的有效預(yù)測可以指導(dǎo)Spark進行性能分析、優(yōu)化資源調(diào)度以及監(jiān)控平臺故障。目前,在Spark平臺還沒有任務(wù)執(zhí)行時間預(yù)估技術(shù)。
本文分析了Spark平臺任務(wù)的拉取、執(zhí)行過程,結(jié)合各任務(wù)在不同節(jié)點所需的數(shù)據(jù)量以及集群網(wǎng)絡(luò)狀況,對各任務(wù)執(zhí)行時間進行預(yù)測,為優(yōu)化資源調(diào)度、應(yīng)用性能分析奠定基礎(chǔ)。
1 相關(guān)技術(shù)
1.1 Spark系統(tǒng)模型
Spark 是由 UC Berkeley AMP 實驗室開發(fā)的開源通用的海量數(shù)據(jù)處理平臺,是對Map/Reduce型海量數(shù)據(jù)處理平臺的創(chuàng)新與豐富,其構(gòu)架如圖1所示。
Spark平臺采用Master/Slave結(jié)構(gòu),其中集群管理器作為Master端,負責平臺中的應(yīng)用與資源管理;計算節(jié)點作為Slave端,負責啟動任務(wù)執(zhí)行器Executor,由Executor負責任務(wù)的實際計算。在執(zhí)行應(yīng)用時,會在相應(yīng)節(jié)點運行Driver進程,負責整個應(yīng)用的執(zhí)行和管理。
Spark大數(shù)據(jù)處理平臺的計算模型使用有向無環(huán)圖(Directed Acyclic Graph, DAG),描述復(fù)雜數(shù)據(jù)處理邏輯并提供更豐富的數(shù)據(jù)操作原語。Spark引入新的分布式數(shù)據(jù)集抽象表達模型——彈性分布式數(shù)據(jù)集(Resilient Distributed Datasets,RDDs)。RDD作為Spark平臺的核心概念,用于描述分布存儲于多個節(jié)點的海量數(shù)據(jù)集。
1.2 Spark任務(wù)執(zhí)行模型
Spark計算模型中,依據(jù)數(shù)據(jù)操作類型,作業(yè)分為多個階段(Stage),各階段以一定的拓撲結(jié)構(gòu)執(zhí)行。在單個階段內(nèi)部,為了操作海量數(shù)據(jù)集,Spark會并行執(zhí)行一組完全相同的任務(wù)來處理RDD的每一分片。任務(wù)執(zhí)行完成后,輸出結(jié)果會進行分區(qū)處理,具有相同分區(qū)值的元組會傳給后繼階段中同一個任務(wù)進行相關(guān)計算。不同Stage之間的數(shù)據(jù)傳輸過程被稱為混洗(Shuffle)。同一階段任務(wù)啟動時,會首先選擇合適的節(jié)點,并拉取其它節(jié)點上屬于該任務(wù)的數(shù)據(jù)進行計算。
執(zhí)行任務(wù)時,Spark首先并行拉取遠程數(shù)據(jù),而后對拉取來的數(shù)據(jù)進行處理,Spark的任務(wù)執(zhí)行模型如圖2所示。
2 任務(wù)執(zhí)行時間預(yù)測方法
本節(jié)介紹任務(wù)執(zhí)行時間預(yù)測技術(shù),主要是不同節(jié)點上各任務(wù)數(shù)據(jù)量獲取及各任務(wù)執(zhí)行時間計算。
2.1 各節(jié)點數(shù)據(jù)量獲取
當新的Stage開始時,各任務(wù)需要根據(jù)上一階段不同任務(wù)寫入Spark既有模塊MapStatus中的文件位置信息,尋找合適的啟動位置,本方案擬定義新的方法對Map任務(wù)輸出到各節(jié)點的數(shù)據(jù)量進行統(tǒng)計,核心算法如下:
2.3 系統(tǒng)實現(xiàn)
系統(tǒng)基于Spark 1.6平臺實現(xiàn),主要針對Spark平臺中既有的各模塊進行改造,并新增了數(shù)據(jù)采集模塊和任務(wù)執(zhí)行代價預(yù)測模塊,圖3為系統(tǒng)架構(gòu)。
在Executor端,新增了用以獲取Executor間網(wǎng)絡(luò)通信狀況以及Executor輸出數(shù)據(jù)信息的ExecutorMonitor。
在Driver端,新增了PartitionSizeGetter組件以及TaskCostEstimator組件。其中PartitionSizeGetter模塊負責在任務(wù)啟動之前訪問Spark既有的組件MapOutputTracker,統(tǒng)計出該任務(wù)在不同節(jié)點所需拉取的數(shù)據(jù)量,為任務(wù)執(zhí)行代價估計模塊做準備。TaskCostEstimator通過PartitionSizeGetter模塊獲取不同任務(wù)在不同節(jié)點上所需的數(shù)據(jù)量,以及在Driver上的網(wǎng)絡(luò)通信狀況,通過計算模型進行綜合,對任務(wù)在不同位置的執(zhí)行時間進行預(yù)估。
3 性能評估
3.1 實驗環(huán)境及負載選擇
本系統(tǒng)基于Apache Spark 1.6實現(xiàn),所用操作系統(tǒng)為CentOS6.2,實驗測試環(huán)境由 4臺物理節(jié)點構(gòu)成,每臺節(jié)點的硬件配置、網(wǎng)絡(luò)狀況、操作系統(tǒng)以及JVM版本等如表1所示。在本測試環(huán)境中,1臺節(jié)點作為提交節(jié)點,其余3臺作為數(shù)據(jù)計算節(jié)點。
任務(wù)執(zhí)行時間通過Spark提供的UI界面監(jiān)控獲得,選擇BigDataBench中的標準負載WordCount進行實驗。
3.2 預(yù)測結(jié)果分析
監(jiān)測WordCount負載中執(zhí)行Count操作的各任務(wù)所花費的時間,圖4為任務(wù)執(zhí)行時間預(yù)測值與實際值的對比情況。
使用本文方法對各任務(wù)的執(zhí)行時間進行預(yù)測,所得到的預(yù)測曲線與實際曲線基本吻合,任務(wù)預(yù)測時間與實際執(zhí)行時間的平均誤差為13%,最大誤差不超過19%。
4 結(jié)語
本文面向Spark海量數(shù)據(jù)處理平臺,設(shè)計并實現(xiàn)了任務(wù)執(zhí)行時間的預(yù)測模型,該模型充分考慮了任務(wù)的數(shù)據(jù)拉取代價與數(shù)據(jù)處理代價,對任務(wù)執(zhí)行時間進行了綜合預(yù)測。實驗表明,本文預(yù)測方法可有效預(yù)測任務(wù)執(zhí)行時間,最大誤差不超過19%。
參考文獻:
[1] AVAILABLE.Apache spark [EB/OL]. https://spark.apache.org.
[2] AVAILABLE.Apache hadoop [EB/OL]. http://hadoop.apache.org/.
[3] DEAN J, GHEMAWAT S. MapReduce: simplified data processing on large clusters[EB/OL]. http://blog.csdn.net/cnlht/article/details/6181799.
[4] KRISH K R, ANWAR A, BUTT A R. HatS: a heterogeneity-aware tiered storage for hadoop[C].Ieee/acm International Symposium on Cluster, Cloud and Grid Computing. IEEE,2014:502-511.
[5] ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets[C].A fault-tolerant abstraction for in-memory cluster computing,2014.
(責任編輯:杜能鋼)