国产日韩欧美一区二区三区三州_亚洲少妇熟女av_久久久久亚洲av国产精品_波多野结衣网站一区二区_亚洲欧美色片在线91_国产亚洲精品精品国产优播av_日本一区二区三区波多野结衣 _久久国产av不卡

?

基于數(shù)據(jù)特性的Spark任務(wù)性能優(yōu)化

2018-02-27 03:06吳毅堅(jiān)趙文耘
關(guān)鍵詞:傾斜度代碼節(jié)點(diǎn)

柴 寧 吳毅堅(jiān) 趙文耘

1(復(fù)旦大學(xué)軟件學(xué)院 上海 201203) 2(復(fù)旦大學(xué)計(jì)算機(jī)科學(xué)技術(shù)學(xué)院 上海 201203) 3(上海市數(shù)據(jù)科學(xué)重點(diǎn)實(shí)驗(yàn)室 上海 200433)

0 引 言

近年來(lái)移動(dòng)互聯(lián)網(wǎng)和社交網(wǎng)絡(luò)的發(fā)展迅速,互聯(lián)網(wǎng)上的數(shù)據(jù)開始呈指數(shù)級(jí)的增長(zhǎng),如何高效而快速地對(duì)數(shù)據(jù)進(jìn)行處理和分析逐漸成為了研究熱點(diǎn)。Google公司作為擁有海量搜索數(shù)據(jù)的互聯(lián)網(wǎng)公司,于2003年至2006年提出分布式文件系統(tǒng)HDFS和函數(shù)式編程模型Map-Reduce的概念[1]。Yahoo! 公司基于二者開發(fā)了可擴(kuò)展的分布式計(jì)算框架Hadoop。由于Hadoop的基于硬盤存儲(chǔ)數(shù)據(jù)的特點(diǎn),在進(jìn)行大數(shù)據(jù)集的多輪迭代運(yùn)算時(shí),硬盤的I/O和數(shù)據(jù)傳輸相對(duì)耗時(shí),達(dá)不到相對(duì)實(shí)時(shí)的處理速度[2]。這一點(diǎn)在對(duì)日志監(jiān)控,機(jī)器學(xué)習(xí)算法等領(lǐng)域,尤為明顯。

Spark作為分布式數(shù)據(jù)處理框架,采用內(nèi)存計(jì)算的方法引入彈性數(shù)據(jù)集RDD(Resilient Distributed Datasets)[3]的概念,將數(shù)據(jù)加載到內(nèi)存里,降低了數(shù)據(jù)交換的訪問(wèn)延遲,達(dá)到了準(zhǔn)實(shí)時(shí)分析大數(shù)據(jù)集的能力。Spark框架的推廣和普及,極大地提升了分布式數(shù)據(jù)處理任務(wù)的運(yùn)行效率。

然而,Spark框架本身也同樣存在了運(yùn)行效率的問(wèn)題,存在可優(yōu)化的空間。Spark運(yùn)行大致有如下兩個(gè)問(wèn)題:(1) 彈性數(shù)據(jù)集RDD帶來(lái)的內(nèi)存不足的問(wèn)題。出現(xiàn)內(nèi)存不足的情況是因?yàn)镾park選擇將數(shù)據(jù)持久化到機(jī)器內(nèi)存中。這樣設(shè)計(jì)的初衷是為了減少節(jié)點(diǎn)中的數(shù)據(jù)交換,可以加快數(shù)據(jù)任務(wù)的處理速度。但是當(dāng)單個(gè)節(jié)點(diǎn)需要計(jì)算的數(shù)據(jù)超過(guò)機(jī)器內(nèi)存的處理極限的時(shí)候,任務(wù)會(huì)因?yàn)閮?nèi)存不足而導(dǎo)致失敗。(2) 數(shù)據(jù)傾斜問(wèn)題導(dǎo)致的運(yùn)行效率低下。數(shù)據(jù)集合具有不同的數(shù)據(jù)特性。如一篇文章中不同單詞出現(xiàn)的次數(shù)是不相同的。我們把上述的數(shù)據(jù)分布不均的特性稱之為數(shù)據(jù)傾斜。具有數(shù)據(jù)傾斜特性的數(shù)據(jù)在Spark框架中會(huì)導(dǎo)致數(shù)據(jù)在不同節(jié)點(diǎn)分布不均的問(wèn)題。這不僅僅會(huì)影響數(shù)據(jù)處理效率,嚴(yán)重情況下也會(huì)導(dǎo)致任務(wù)失敗。

針對(duì)上述問(wèn)題,本文提出根據(jù)不同的數(shù)據(jù)特性自適應(yīng)地對(duì)Spark代碼進(jìn)行優(yōu)化的思路,從而達(dá)到對(duì)Spark數(shù)據(jù)處理任務(wù)進(jìn)行調(diào)優(yōu)的目的。Spark系統(tǒng)本身已具有優(yōu)化不同數(shù)據(jù)集合特性的能力,但是程序員在編寫代碼時(shí)要了解所需要處理數(shù)據(jù)集的數(shù)據(jù)特性卻并不容易。對(duì)于需要進(jìn)行性能優(yōu)化的任務(wù),優(yōu)化的步驟如下:(1) 程序自動(dòng)分析代碼片段[4],生成有向無(wú)環(huán)圖(DAG);(2) 計(jì)算圖中數(shù)據(jù)的傾斜度;(3) 根據(jù)不同的數(shù)據(jù)特性和場(chǎng)景自動(dòng)選擇生成相應(yīng)的優(yōu)化方案。

1 相關(guān)工作

目前針對(duì)Spark任務(wù)的優(yōu)化研究的主要的思路有三點(diǎn)。一是針對(duì)數(shù)據(jù)對(duì)象的緩存進(jìn)行優(yōu)化,二是針對(duì)Spark任務(wù)的運(yùn)行參數(shù)進(jìn)行優(yōu)化,三是針對(duì)任務(wù)調(diào)度中的資源分配進(jìn)行優(yōu)化。

通過(guò)對(duì)Spark框架內(nèi)存計(jì)算模型的研究和分析,同時(shí)對(duì)Spark框架中內(nèi)存使用行為進(jìn)行建模,可以針對(duì)Spark的緩存系統(tǒng)實(shí)現(xiàn)不同程度的優(yōu)化和改進(jìn)。比如實(shí)現(xiàn)Spark系統(tǒng)的緩存策略自動(dòng)化,通過(guò)代碼語(yǔ)義分析自動(dòng)識(shí)別有緩存意義的中間數(shù)據(jù)加載到緩存系統(tǒng)中。比如根據(jù)RDD的大小和權(quán)重信息,提出新的緩存算法,優(yōu)化Spark系統(tǒng)的緩存模型等[5]。

Spark框架默認(rèn)的參數(shù)配置不能使得所有數(shù)據(jù)分析任務(wù)都能夠高效運(yùn)行。因此可以針對(duì)不同的數(shù)據(jù)分析任務(wù)進(jìn)行Spark框架的參數(shù)配置,可以優(yōu)化Spark任務(wù)的運(yùn)行效率。將Spark任務(wù)的運(yùn)行數(shù)據(jù)和參數(shù)配置保存到數(shù)據(jù)庫(kù)中,同時(shí)針對(duì)Spark任務(wù)進(jìn)行特征工程提取任務(wù)特征,最后通過(guò)計(jì)算任務(wù)之間的相似度從數(shù)據(jù)庫(kù)中選擇合適的參數(shù)配置對(duì)任務(wù)進(jìn)行優(yōu)化[6]。

Spark框架在運(yùn)行數(shù)據(jù)分析任務(wù)時(shí)需要結(jié)合當(dāng)前集群的資源和任務(wù)所需要的資源進(jìn)行動(dòng)態(tài)分配。通過(guò)完整的分析Spark框架中任務(wù)執(zhí)行過(guò)程以及資源調(diào)度分配的策略,可以根據(jù)任務(wù)運(yùn)行數(shù)據(jù)提出資源調(diào)度分配優(yōu)化模型,并針對(duì)調(diào)度資源優(yōu)化提出了系統(tǒng)的解決方案[7]。

同時(shí),目前已經(jīng)有了較多技術(shù)文章針對(duì)數(shù)據(jù)傾斜問(wèn)題提出了對(duì)應(yīng)的優(yōu)化策略。美團(tuán)公司在技術(shù)文檔中分享了如何利用Spark框架的調(diào)度機(jī)制處理具有數(shù)據(jù)傾斜特征的數(shù)據(jù)任務(wù),可以對(duì)Spark任務(wù)增加分區(qū)數(shù)量或者對(duì)數(shù)據(jù)進(jìn)行離散化操作,也可以利用廣播變量將小數(shù)據(jù)集合分布到各個(gè)計(jì)算節(jié)點(diǎn)中。

此外針對(duì)Spark官網(wǎng)及Cloudera提供的任務(wù)調(diào)優(yōu)方案上也有針對(duì)數(shù)據(jù)結(jié)構(gòu)序列化、資源調(diào)度,以及任務(wù)運(yùn)行時(shí)的參數(shù)設(shè)置的優(yōu)化建議。

為此本文提出了針對(duì)具有不同的數(shù)據(jù)特性的數(shù)據(jù)源,進(jìn)行代碼的自動(dòng)分析,嘗試解決因數(shù)據(jù)傾斜帶來(lái)的Spark任務(wù)運(yùn)行的效率問(wèn)題。

2 Spark數(shù)據(jù)傾斜問(wèn)題

2.1 數(shù)據(jù)傾斜

數(shù)據(jù)傾斜意味著數(shù)據(jù)集合中不同屬性的值出現(xiàn)的次數(shù)不是均勻分布的,在統(tǒng)計(jì)學(xué)中屬于數(shù)據(jù)分配不均的問(wèn)題。科學(xué)研究中的很多數(shù)據(jù)都是分布不均勻的。比如在天體物理學(xué)領(lǐng)域描述宇宙演化的數(shù)據(jù)集Millennium simulation,數(shù)據(jù)集中每個(gè)節(jié)點(diǎn)的質(zhì)量分布如圖1所示。超過(guò)75%的數(shù)值出現(xiàn)不超過(guò)十次,而出現(xiàn)頻率最高的7個(gè)數(shù)值每一個(gè)的出現(xiàn)次數(shù)都超過(guò)了2 000萬(wàn)次[8]。

圖1 數(shù)據(jù)集節(jié)點(diǎn)質(zhì)量分布圖

為了解釋數(shù)據(jù)傾斜對(duì)Spark任務(wù)產(chǎn)生的影響。我們首基于MapReduce的高效粗糙集屬性約簡(jiǎn)算法先以一個(gè)統(tǒng)計(jì)詞頻的例子介紹Spark框架的工作流程[9]。

在程序的map階段,對(duì)輸入的文章中的每一行執(zhí)行map函數(shù),并生成鍵值對(duì)。其中key代表在一行數(shù)據(jù)中出現(xiàn)的單詞,value則代表單詞出現(xiàn)的次數(shù)。在程序的reduce階段并行地對(duì)每一個(gè)鍵值對(duì)執(zhí)行reducer函數(shù),將相同key出現(xiàn)的次數(shù)相加。在數(shù)據(jù)分配到reduce函數(shù)之前,Spark程序會(huì)執(zhí)行shuffle操作,將具有相同key的鍵值對(duì)分配到同一個(gè)執(zhí)行reduce函數(shù)的節(jié)點(diǎn)上。

因?yàn)镾park具有shuffle機(jī)制,所以在數(shù)據(jù)傾斜的情況下,shuffle操作將位于不同節(jié)點(diǎn)中具有相同key的大量的數(shù)據(jù)拉到同一個(gè)節(jié)點(diǎn)中執(zhí)行reduce操作。而一個(gè)Spark 任務(wù)只能在一個(gè)partition中之行,所以某一些數(shù)據(jù)量異常巨大的key的任務(wù)運(yùn)行時(shí)間就會(huì)非常緩慢。

以Word Count出現(xiàn)數(shù)據(jù)傾斜為例,在map階段,每一篇文章都被劃分為每個(gè)獨(dú)立的單詞,從而發(fā)現(xiàn)文中大量的單詞都是hello,少部分的單詞是world。在Shuffle階段,我們需要將不同節(jié)點(diǎn)上具有相同key的鍵值對(duì)分配到相同的節(jié)點(diǎn)中。在Reduce階段,有大量的帶有單詞hello的鍵值對(duì)被分配到了同一臺(tái)機(jī)器上,而剩余的少量的world的key被分配到同一臺(tái)機(jī)器。兩個(gè)節(jié)點(diǎn)的機(jī)器配置和網(wǎng)絡(luò)帶寬都是相同的,但是其中一臺(tái)機(jī)器處理的數(shù)據(jù)量是另一臺(tái)機(jī)器的成百上千倍。整個(gè)任務(wù)的運(yùn)行瓶頸就在執(zhí)行reduce任務(wù)較多的節(jié)點(diǎn)上。當(dāng)數(shù)據(jù)量上升到TB、PB級(jí)別時(shí),就會(huì)出現(xiàn)運(yùn)行時(shí)間長(zhǎng)甚至內(nèi)存不足的情況。

2.2 現(xiàn)有處理方法及問(wèn)題

如何解決數(shù)據(jù)傾斜帶來(lái)的問(wèn)題,現(xiàn)有的方法是利用Spark本身的特性,緩解因?yàn)閿?shù)據(jù)傾斜導(dǎo)致的分區(qū)不均的問(wèn)題。根據(jù)3.1節(jié)的描述,數(shù)據(jù)傾斜的問(wèn)題是出現(xiàn)在Spark任務(wù)處理的shuffle階段,如果要處理數(shù)據(jù)傾斜的問(wèn)題,我們可以在shuffle階段進(jìn)行優(yōu)化。

優(yōu)化的目標(biāo)最終減少因?yàn)橹匦路峙鋽?shù)據(jù)導(dǎo)致的某一個(gè)Reduce節(jié)點(diǎn)中數(shù)據(jù)過(guò)多的問(wèn)題。處理方式有兩種思路,第一種是通過(guò)增加任務(wù)處理分區(qū)數(shù)或者是按照Key的維度對(duì)數(shù)據(jù)進(jìn)行離散化,嘗試從shuffle階段緩解數(shù)據(jù)傾斜的壓力。第二種是利用Spark的廣播變量的特性直接忽略shuffle階段,從根本上解決數(shù)據(jù)傾斜的問(wèn)題。具體的處理流程和分析在4.3節(jié)中介紹。

目前Spark將數(shù)據(jù)傾斜的優(yōu)化策略交給程序員中利用代碼手動(dòng)完成,這就經(jīng)常會(huì)導(dǎo)致程序運(yùn)行的效率低下甚至產(chǎn)生程序報(bào)錯(cuò)。主要原因有:

(1) 程序員對(duì)數(shù)據(jù)特性本身不敏感,沒(méi)有針對(duì)具有數(shù)據(jù)傾斜特性的Spark程序進(jìn)行優(yōu)化。

(2) 選擇錯(cuò)誤的優(yōu)化方案。錯(cuò)誤的優(yōu)化策略會(huì)占用系統(tǒng)的額外內(nèi)存和網(wǎng)絡(luò)帶寬。效果只會(huì)是適得其反,降低數(shù)據(jù)分析任務(wù)的性能。

隨著項(xiàng)目復(fù)雜度和代碼量的提高,優(yōu)化策略的問(wèn)題會(huì)變得越來(lái)越嚴(yán)重。如果可以使用自動(dòng)分析的方法,自動(dòng)根據(jù)數(shù)據(jù)的傾斜特性選擇相應(yīng)的代碼優(yōu)化策略,無(wú)疑會(huì)降低程序員的負(fù)擔(dān),避免上述的問(wèn)題。下面將對(duì)這方面進(jìn)行初步研究,通過(guò)分析與建模,目的對(duì)處理數(shù)據(jù)傾斜數(shù)據(jù)的spark任務(wù)進(jìn)行智能優(yōu)化,并加速任務(wù)的運(yùn)行速度。

3 自適應(yīng)數(shù)據(jù)傾斜優(yōu)化方案

3.1 數(shù)據(jù)傾斜度

為了更好地衡量數(shù)據(jù)的傾斜程度,本文對(duì)數(shù)據(jù)集合中的數(shù)據(jù)分布的均勻程度進(jìn)行了定義,提出了數(shù)據(jù)傾斜度的概念。數(shù)據(jù)傾斜度的計(jì)算借鑒了分類統(tǒng)計(jì)中的平均絕對(duì)偏差的概念,統(tǒng)計(jì)一個(gè)數(shù)據(jù)集合中每個(gè)Key出現(xiàn)的次數(shù),然后計(jì)算每個(gè)觀測(cè)值和算術(shù)平均值的偏差的絕對(duì)值的平均。同時(shí)為了對(duì)結(jié)果進(jìn)行正則和標(biāo)準(zhǔn)劃,我們引入了相對(duì)平均絕對(duì)偏差的計(jì)算方式,也就是用平均絕對(duì)偏差除以算數(shù)平均值。最后數(shù)據(jù)傾斜度的定義就取二分之一的相對(duì)平均絕對(duì)偏差[10]。如公式所示:

(1)

式中:G代表數(shù)據(jù)傾斜度,xi代表數(shù)據(jù)集合中每個(gè)key出現(xiàn)的次數(shù)。選取平均絕對(duì)偏差的作為計(jì)算數(shù)據(jù)傾斜度主要是考慮結(jié)果的通用性和高效性,在比較了多重分類統(tǒng)計(jì)中的度量之后,最終選擇了絕對(duì)偏差方案。數(shù)據(jù)傾斜度G的范圍在0~1之間,G越接近1表明數(shù)據(jù)的傾斜程度越大,G越接近0表明數(shù)據(jù)的分布平均。

3.2 有向無(wú)環(huán)圖

為了對(duì)Spark代碼進(jìn)行靜態(tài)分析,獲取程序運(yùn)行時(shí)中間數(shù)據(jù)的相互依賴,本文通過(guò)分析Spark程序運(yùn)行時(shí)的日志信息,和各個(gè)中間數(shù)據(jù)之間的相互依賴及各項(xiàng)操作生成了一個(gè)有向無(wú)環(huán)圖(DAG)。DAG代表了Spark代碼的真正計(jì)算路徑。

Spark任務(wù) DAG 圖上的每個(gè)節(jié)點(diǎn)表示一種 RDD類 型。在Spark代碼中,程序員在RDD上定義了一系列操作。這些操作可以是map接著reduce,也可以是一系列的map和reduce的集合,都會(huì)被Spark記錄下RDD之間的相互依賴,我們可以據(jù)此畫出一張關(guān)于計(jì)算路徑的有向無(wú)環(huán)圖(DAG)。用DAG圖可以從算法邏輯和數(shù)據(jù)規(guī)模大小兩個(gè)方面來(lái)準(zhǔn)確地刻畫任務(wù)的特征。

為了便于理解,下面列出了統(tǒng)計(jì)詞頻任務(wù)的Spark代碼。圖2是任務(wù)對(duì)應(yīng)的有向無(wú)環(huán)圖。圖的頂點(diǎn)(方框)表示系統(tǒng)中的數(shù)據(jù)類型RDD, 圖的邊代表不同操作之間的關(guān)系。圖中表示的過(guò)程是首先讀取文件,分別進(jìn)行flatMap和map操作,然后再將map的結(jié)果執(zhí)行reduceByKey的操作進(jìn)行聚合,最后將結(jié)果輸出到文件中。通過(guò)這樣的DAG已經(jīng)能夠清楚地刻畫任務(wù)執(zhí)行的基本流程[11]。

1. val text_file = spark.textFile(″source_path″);

2. val word_count=text_file.flatMap(lambda line: line.split())

.map(lambda word: (word, 1))

.reduceByKey(lambda a, b: a+b);

3. word_count.saveAsTextFile(′dest_path′)

圖2 統(tǒng)計(jì)詞頻任務(wù)的源代碼和有向無(wú)環(huán)圖

3.3 數(shù)據(jù)傾斜任務(wù)優(yōu)化

首先我們把數(shù)據(jù)傾斜的任務(wù)分為兩大類[12]:(1) 任務(wù)代碼中直接調(diào)用Spark的Map-Reduce方法;(2) 任務(wù)代碼中任務(wù)代碼中調(diào)用更抽象的rdd.join(),再有Spark的解釋器編編譯成具體的Map-Reduce 任務(wù)代碼。針對(duì)具體的任務(wù)分類,分別有不同的優(yōu)化方案。

針對(duì)普通的Map-Reduce的任務(wù),如果出現(xiàn)數(shù)據(jù)傾斜的情況,在執(zhí)行reduce的任務(wù)時(shí),不同節(jié)點(diǎn)執(zhí)行的數(shù)據(jù)量的不同,導(dǎo)致了數(shù)據(jù)任務(wù)的遲緩。解決方案通常有兩種思路。

1) 提高shuffle階段任務(wù)的最大并行度,即Spark框架中對(duì)于用戶設(shè)置的最大分區(qū),具體的參數(shù)名字是 spark.sql.shuffle.partitions。Spark框架對(duì)該值的默認(rèn)值是200,對(duì)于傾斜程度不同的數(shù)據(jù)處理任務(wù)需要?jiǎng)討B(tài)的進(jìn)行調(diào)整。如圖3所示,增加分區(qū)數(shù)字之后,每個(gè)reduce節(jié)點(diǎn)執(zhí)行的數(shù)據(jù)量變少,執(zhí)行速度也更快。

圖3 shuffle過(guò)程中增加分區(qū)數(shù)

2) 對(duì)shuffle階段的鍵值對(duì)進(jìn)行離散化操作。通過(guò)對(duì)數(shù)據(jù)進(jìn)行離散化,對(duì)數(shù)據(jù)先進(jìn)行聚合操作,然后重新對(duì)離散的數(shù)據(jù)進(jìn)行歸一化操作,最后在執(zhí)行一次聚合操作,從而有效緩解數(shù)據(jù)傾斜程度,加快數(shù)據(jù)任務(wù)處理效率。數(shù)據(jù)的離散化可以通過(guò)添加隨機(jī)前綴的方式,利用flatMap方法將一個(gè)key離散成多個(gè)key,分散到多個(gè)shuffle任務(wù)中執(zhí)行,從而解決單個(gè)處理數(shù)據(jù)量過(guò)多的問(wèn)題。見圖4。

圖4 shuffle過(guò)程中為key增加隨機(jī)后綴

針對(duì)RDD之間需要Join的任務(wù),如果出現(xiàn)數(shù)據(jù)傾斜的情況。也有如下兩種處理方法:

1) 如果其中一個(gè)RDD數(shù)據(jù)量較小,使用廣播變量方式減少shuffle階段的數(shù)據(jù)交換。Spark允許程序員在不同機(jī)器之間緩存一個(gè)只讀的變量,從而節(jié)省在不同的任務(wù)之間傳遞數(shù)據(jù)的消耗。這種變量被稱為廣播變量。廣播變量的優(yōu)勢(shì)包括,利用一種高效的方式在每個(gè)集群節(jié)點(diǎn)上緩存一個(gè)大量的數(shù)據(jù)集合。同時(shí),Spark也嘗試?yán)酶咝У膹V播算法來(lái)分布式的廣播變量,以期望降低數(shù)據(jù)交換的消耗,如圖5所示。

圖5 將RDD轉(zhuǎn)化為廣播變量,避免shuffle過(guò)程

2) 分拆RDD。根據(jù)每個(gè)KEY的傾斜程度,將RDD分拆為傾斜的和分布均勻的兩部分。可以將少數(shù)幾個(gè)KEY導(dǎo)致的數(shù)據(jù)傾斜分拆出去,然后進(jìn)行數(shù)據(jù)離散化操作,此時(shí)數(shù)據(jù)會(huì)分散到多個(gè)任務(wù)中執(zhí)行。數(shù)據(jù)聚合操作之后,再使用Union方法將分拆的兩個(gè)RDD進(jìn)行合并。如圖6所示。

圖6 根據(jù)key傾斜度將RDD分拆,reduce結(jié)束后再union

3.4 算法設(shè)計(jì)

針對(duì)Spark程序中經(jīng)常出現(xiàn)的數(shù)據(jù)傾斜導(dǎo)致的運(yùn)行效率的問(wèn)題,通過(guò)程序的智能分析,針對(duì)數(shù)據(jù)傾斜的不同應(yīng)用場(chǎng)景自動(dòng)地對(duì)代碼執(zhí)行優(yōu)化策略。針對(duì)策略的自動(dòng)化,算法的實(shí)現(xiàn)思路如下:

(1) 分析Spark代碼[13]。通過(guò)在Spark源碼中植入監(jiān)聽代碼,根據(jù)日志信息對(duì)數(shù)據(jù)結(jié)構(gòu)RDD和相應(yīng)的函數(shù)操作進(jìn)行建模,即可以得到當(dāng)前代碼的有向無(wú)環(huán)圖(DAG)。圖中的每一個(gè)點(diǎn)都代表一個(gè)RDD,圖中的每一個(gè)邊都代表RDD執(zhí)行的函數(shù)操作。

(2) 判斷RDD是否出現(xiàn)數(shù)據(jù)傾斜。方法是對(duì)DAG圖中的每一個(gè)點(diǎn),也就是RDD依次進(jìn)行采樣分析,根據(jù)數(shù)據(jù)集合大小和數(shù)據(jù)分布計(jì)算出數(shù)據(jù)傾斜度,如果數(shù)據(jù)傾斜度大于一定的閾值則被判斷為數(shù)據(jù)傾斜。

(3) 針對(duì)RDD的數(shù)據(jù)傾斜程度及RDD本身的數(shù)據(jù)場(chǎng)景,應(yīng)用不同的代碼優(yōu)化策略。

本方法需要對(duì)代碼執(zhí)行兩次。第一次是為了獲取處于不同階段的RDD,從中間分析出有數(shù)據(jù)傾斜的RDD。所以第一次運(yùn)行的時(shí)候,可以在代碼中對(duì)RDD進(jìn)行數(shù)據(jù)采樣,只運(yùn)行少量的數(shù)據(jù)集合,這樣在小數(shù)據(jù)集對(duì)代碼和RDD進(jìn)行分析,不會(huì)影響運(yùn)行的性能。代碼優(yōu)化后,再進(jìn)行代碼的第二次運(yùn)行。

方法的整體流程如圖7所示。

圖7 優(yōu)化算法整體架構(gòu)

下面是關(guān)于如何進(jìn)行代碼優(yōu)化的具體操作:

(1) 增加隨機(jī)后綴,并增加分區(qū)數(shù)。為數(shù)據(jù)集中分布不均勻的key分配一個(gè)隨機(jī)的后綴或者前綴,力圖將shuffle的key進(jìn)行離散化,使數(shù)據(jù)的分布更加均勻。待每個(gè)新的key聚合完成以后,把添加的前綴或者后綴去掉,恢復(fù)成原本的key, 再重新計(jì)算一個(gè)reduce操作。

(2) 生成廣播變量。針對(duì)兩個(gè)RDD join的情況,將其中一個(gè)略小的RDD轉(zhuǎn)化為broadcast對(duì)象,然后分發(fā)到執(zhí)行任務(wù)的分布式集群的每個(gè)node中。最后在RDD的flatMapToPair的函數(shù)中利用map完成RDD之間的聚合操作。

(3) 分拆傾斜RDD。兩個(gè)RDD Join,但是其中一個(gè)RDD存在數(shù)據(jù)傾斜的問(wèn)題,我們對(duì)有數(shù)據(jù)傾斜的RDD進(jìn)行隨機(jī)前綴操作。對(duì)另一個(gè)RDD進(jìn)行類似于數(shù)據(jù)膨脹的擴(kuò)容操作。然后和第一種模式的流程一樣,重新進(jìn)行Map-Reduce操作。

4 實(shí)驗(yàn)驗(yàn)證

4.1 實(shí)驗(yàn)環(huán)境

實(shí)驗(yàn)采用的云計(jì)算集群是亞馬遜公司的EMR(Elastic Map Reduce)集群。EMR是亞馬遜公司提供的彈性Map-Reduce服務(wù)[13],服務(wù)內(nèi)容包括用戶可以自動(dòng)配置分布式計(jì)算集群,集群上可以動(dòng)態(tài)部署Hadoop、Hive、Spark等分布式計(jì)算框架,也可以動(dòng)態(tài)部署Pig、Phoenix、Presto等分布式查詢引擎,減少了數(shù)據(jù)開發(fā)人員大量的配置分布式開發(fā)環(huán)境的時(shí)間成本。同時(shí),作為彈性分布式集群服務(wù),用戶可以按需求動(dòng)態(tài)地啟動(dòng)集群服務(wù),配置集群規(guī)模,提交數(shù)據(jù)分析任務(wù),改善了整個(gè)數(shù)據(jù)分析流程的效率。同時(shí),作為亞馬遜的云服務(wù),EMR可以使用亞馬遜提供的其他云服務(wù)組件配合使用,比如分布式文件存儲(chǔ)系統(tǒng)S3(Simple Storage Service)或者亞馬遜EC2(Elastic Computer Cloud)[14]等。綜上所述,EMR具有配置靈活,服務(wù)類型豐富,運(yùn)維成本低等優(yōu)勢(shì)。因?yàn)?,我們?cè)趯?shí)驗(yàn)中采用了亞馬遜的EMR云服務(wù)作實(shí)驗(yàn)環(huán)境。

我們基于亞馬遜的EMR服務(wù)分別進(jìn)行了三組實(shí)驗(yàn),使用的集群配置都是相同的。我們使用了20臺(tái)節(jié)點(diǎn)配置的集群。配置如下:機(jī)器類型亞馬遜EC2 m4.xlarge,4核CPU,16 GB內(nèi)存,帶寬450 Mbit/s。使用的軟件及其版本為:EMR 4.7,Spark 2.1,Hadoop 1.7。

4.2 實(shí)驗(yàn)設(shè)計(jì)

實(shí)驗(yàn)?zāi)康氖菫榱蓑?yàn)證根據(jù)數(shù)據(jù)場(chǎng)景提出的代碼優(yōu)化策略是否可以減少程序的運(yùn)行時(shí)間,優(yōu)化任務(wù)運(yùn)行效果。

為了保證數(shù)據(jù)集合和數(shù)據(jù)傾斜度的大小可控,實(shí)驗(yàn)采用了模擬數(shù)據(jù)。具體模擬方法是:針對(duì)每個(gè)數(shù)據(jù)集合的RDD大小以及數(shù)據(jù)傾斜度大小,采用擴(kuò)充或者采樣RDD的方式來(lái)控制數(shù)據(jù)集合大小,采用key分配隨機(jī)前綴或者key進(jìn)行歸一化操作來(lái)控制數(shù)據(jù)傾斜度的大小。最終,利用現(xiàn)有的數(shù)據(jù)集合,我們就能模擬出合適的數(shù)據(jù)集合用于任務(wù)優(yōu)化效率[16]的實(shí)驗(yàn)。

首先我們將任務(wù)優(yōu)化效率定義為可量化的指標(biāo)。通過(guò)對(duì)任務(wù)優(yōu)化效率的分析,驗(yàn)證對(duì)于相同的數(shù)據(jù)分析任務(wù),本文提出的方法是否可以自動(dòng)選擇數(shù)據(jù)場(chǎng)景并針對(duì)任務(wù)進(jìn)行了優(yōu)化,同時(shí)在運(yùn)行效率上確實(shí)達(dá)到了任務(wù)優(yōu)化的效果。

通過(guò)對(duì)比相同的Spark任務(wù)在算法優(yōu)化前后的任務(wù)運(yùn)行時(shí)間,我們對(duì)任務(wù)優(yōu)化效率提出了定義。值得注意的是,在進(jìn)行代碼優(yōu)化前后的運(yùn)行時(shí)間對(duì)照實(shí)驗(yàn)的時(shí)候,針對(duì)優(yōu)化后的程序運(yùn)行時(shí)間,我們應(yīng)當(dāng)將第一部分試驗(yàn)中進(jìn)行數(shù)據(jù)場(chǎng)景判斷的時(shí)間也計(jì)算在內(nèi)。也就是需要將數(shù)據(jù)場(chǎng)景判斷所花費(fèi)的時(shí)間和優(yōu)化后程序的運(yùn)行時(shí)間結(jié)合在一起才是優(yōu)化方法的真正運(yùn)行時(shí)間。關(guān)于任務(wù)優(yōu)化效率如公式所示:

(2)

式中:E代表任務(wù)優(yōu)化效率;ti表示優(yōu)化前的任務(wù)運(yùn)行時(shí)間;tj表示優(yōu)化后的任務(wù)運(yùn)行時(shí)間。針對(duì)相同的數(shù)據(jù)分析任務(wù),通過(guò)優(yōu)化前的任務(wù)運(yùn)行時(shí)間減去優(yōu)化后的運(yùn)行時(shí)間,即得到總體任務(wù)的優(yōu)化時(shí)間。再得到優(yōu)化時(shí)間和優(yōu)化前任務(wù)運(yùn)行時(shí)間的比值,即是任務(wù)的優(yōu)化效率。優(yōu)化效率越接近于1,則表示優(yōu)化效果越好。

針對(duì)任務(wù)優(yōu)化效率的實(shí)驗(yàn),本文設(shè)計(jì)了三種不同的任務(wù)優(yōu)化實(shí)驗(yàn)。

第一個(gè)實(shí)驗(yàn)是為了驗(yàn)證在不同的數(shù)據(jù)場(chǎng)景下本文方法都能夠達(dá)到一定的任務(wù)優(yōu)化效率,縮短任務(wù)的運(yùn)行時(shí)間。實(shí)驗(yàn)內(nèi)容是針對(duì)三種不同的數(shù)據(jù)傾斜場(chǎng)景分別是用未優(yōu)化的代碼和優(yōu)化后的代碼進(jìn)行數(shù)據(jù)分析,對(duì)比兩個(gè)任務(wù)的運(yùn)行時(shí)間,最后計(jì)算任務(wù)優(yōu)化效率。同時(shí),在實(shí)驗(yàn)過(guò)程中保證三種不同的場(chǎng)景下數(shù)據(jù)集合和數(shù)據(jù)傾斜度都保持相同。這里,數(shù)據(jù)集合的大小是10 GB,數(shù)據(jù)傾斜度是0.5。

第二個(gè)實(shí)驗(yàn)是為了驗(yàn)證在數(shù)據(jù)集合數(shù)據(jù)傾斜度相同的情況下,數(shù)據(jù)集合大小對(duì)任務(wù)運(yùn)行時(shí)間和任務(wù)優(yōu)化效率的影響。實(shí)驗(yàn)內(nèi)容是在數(shù)據(jù)分析任務(wù)中采用的數(shù)據(jù)集合是數(shù)據(jù)集合大小不同但是數(shù)據(jù)傾斜程相同的數(shù)據(jù),同樣對(duì)比的是代碼優(yōu)化前后的程序運(yùn)行時(shí)間。三組實(shí)驗(yàn)的數(shù)據(jù)集合大小都是100 GB。這里,所有數(shù)據(jù)集的數(shù)據(jù)傾斜度都是0.5,數(shù)據(jù)集合的大小從1 GB到1 TB不等。

第三個(gè)實(shí)驗(yàn)是為了驗(yàn)證在數(shù)據(jù)集合大小相同的情況下,數(shù)據(jù)傾斜程度對(duì)任務(wù)運(yùn)行時(shí)間和任務(wù)優(yōu)化效率的影響。實(shí)驗(yàn)內(nèi)容是在數(shù)據(jù)分析任務(wù)中采用的數(shù)據(jù)集合是數(shù)據(jù)傾斜程度不同但是數(shù)據(jù)集大小相同的數(shù)據(jù)。同樣對(duì)比的是代碼優(yōu)化前后的程序運(yùn)行時(shí)間。這里,數(shù)據(jù)集合的大小都是100 GB,數(shù)據(jù)集合的數(shù)據(jù)傾斜度從0.1到0.9不等。

4.3 實(shí)驗(yàn)結(jié)果

第一個(gè)實(shí)驗(yàn)中,針對(duì)三種不同的數(shù)據(jù)場(chǎng)景,優(yōu)化后的代碼運(yùn)行效率都要比優(yōu)化前的代碼片段有了不同程度的提升。實(shí)驗(yàn)結(jié)果如圖8所示,橫軸表示三種不同的數(shù)據(jù)場(chǎng)景,縱軸表示方法優(yōu)化前后程序的運(yùn)行時(shí)間,單位是分鐘。橫軸中每個(gè)數(shù)據(jù)場(chǎng)景對(duì)應(yīng)兩個(gè)柱狀圖,左邊直方圖代表優(yōu)化前的任務(wù)運(yùn)行時(shí)間,中間的直方圖表示分析數(shù)據(jù)場(chǎng)景的時(shí)間,右側(cè)的直方圖表示優(yōu)化后的任務(wù)運(yùn)行時(shí)間。根據(jù)優(yōu)化前后的任務(wù)運(yùn)行時(shí)間以及任務(wù)運(yùn)行效率的定義,可以計(jì)算出隨機(jī)前綴、廣播變量和分拆RDD的方法的優(yōu)化效率分別提升了40%、47%和58%。從而證明本文方法對(duì)Spark數(shù)據(jù)傾斜任務(wù)具有一定的優(yōu)化效果。同時(shí),數(shù)據(jù)場(chǎng)景分析只占總體運(yùn)行時(shí)間的小部分。因此可以得出結(jié)論,優(yōu)化方法的前期數(shù)據(jù)采樣工作對(duì)優(yōu)化算法的運(yùn)行效率沒(méi)有影響。

圖8 不同數(shù)據(jù)場(chǎng)景下代碼優(yōu)化后運(yùn)行效率有了大幅度提升

第二個(gè)實(shí)驗(yàn),針對(duì)相同的數(shù)據(jù)傾斜度,在數(shù)據(jù)集合大小不同的情況下,對(duì)比本文方法的優(yōu)化效率。其中圖9是實(shí)驗(yàn)結(jié)果。其中X軸表示的是數(shù)據(jù)集合的大小,Y軸表示優(yōu)化前后代碼的優(yōu)化效率。實(shí)驗(yàn)二的所有數(shù)據(jù)集的數(shù)據(jù)傾斜度都是0.5。實(shí)驗(yàn)結(jié)果表明,在數(shù)據(jù)傾斜度大小相等或者近似相等的情況下。優(yōu)化后的代碼相比優(yōu)化前的代碼,運(yùn)行效率有了明顯提升。同時(shí),隨著數(shù)據(jù)大小的增加,當(dāng)數(shù)據(jù)集合達(dá)到100 GB以后,優(yōu)化效率曲線開始保持平穩(wěn)。

圖9 實(shí)驗(yàn)結(jié)果

第三個(gè)實(shí)驗(yàn)是采用數(shù)據(jù)集大小相同但是數(shù)據(jù)傾斜程度不同的數(shù)據(jù),同樣對(duì)比的是優(yōu)化前后的代碼效率。三組實(shí)驗(yàn)的數(shù)據(jù)集合大小都是100 GB。圖10是對(duì)應(yīng)的結(jié)果。其中X軸表示的是數(shù)據(jù)傾斜度的變化,Y軸表示優(yōu)化前后代碼的優(yōu)化效率。實(shí)驗(yàn)結(jié)果表明,在數(shù)據(jù)集合大小相等或者近似相等的情況下。優(yōu)化后的代碼相比較優(yōu)化前的代碼,運(yùn)行效率有了大幅度提升。同時(shí),隨著數(shù)據(jù)傾斜度的增加,優(yōu)化效率一直在不斷提高。

圖10 隨著數(shù)據(jù)傾斜程度增加,算法優(yōu)化效率顯著提升

5 結(jié) 語(yǔ)

場(chǎng)景選擇代碼優(yōu)化策略。實(shí)驗(yàn)結(jié)果表明,這種調(diào)優(yōu)方法具有一定的可行性。在接下來(lái)的研究中我們可以獲取更多Spark運(yùn)行時(shí)的數(shù)據(jù),從調(diào)度策略和內(nèi)存資源分配等不同的方向豐富數(shù)據(jù)模型,進(jìn)一步提高算法的優(yōu)化性能。

[1] Dean J,Ghemawat S.MapReduce:Simplified Data Processing on Large Clusters[C]//Conference on Symposium on Opearting Systems Design & Implementation.DBLP,2004:137-150.

[2] 林海銘.基于Hadoop MapReduce的大規(guī)模線性有限元法并行實(shí)現(xiàn)[J].計(jì)算機(jī)應(yīng)用與軟件,2017,34(3):21-26.

[3] 倪麗萍,馬馳宇,劉小軍.社會(huì)化信息對(duì)股市波動(dòng)影響分析—基于SparkR平臺(tái)的實(shí)現(xiàn)[J].計(jì)算機(jī)應(yīng)用與軟件,2017,34(3):181-188,266.

[4] 余雙雙,曾一,劉慧君,等.基于UML模型的多態(tài)性與Java接口代碼信息一致性檢測(cè)的方法[J].計(jì)算機(jī)應(yīng)用與軟件,2017,34(2):8-13,47.

[5] 陳康,王彬,馮琳.Spark計(jì)算引擎的數(shù)據(jù)對(duì)象緩存優(yōu)化研究[J].中興通訊技術(shù),2016,22(2):23-27.

[6] 陳僑安,李峰,曹越,等.基于運(yùn)行數(shù)據(jù)分析的Spark任務(wù)參數(shù)優(yōu)化[J].計(jì)算機(jī)工程與科學(xué),2016,38(1):11-19.

[7] 韓海雯.MapReduce計(jì)算任務(wù)調(diào)度的資源配置優(yōu)化研究[D].華南理工大學(xué),2013.

[8] Gufler B,Augsten N,Reiser A,et al.Handling Data Skew in MapReduce[C]//Closer 2011-Proceedings of the,International Conference on Cloud Computing and Services Science,Noordwijkerhout,Netherlands,7-9 May.DBLP,2011:574-583.

[9] 李成,許胤龍,郭帆,等.基于MapReduce的內(nèi)存并行Join算法研究[J].計(jì)算機(jī)應(yīng)用與軟件,2016,33(7):257-260,277.

[10] 裔傳俊,劉亮.采用邊緣分類和平均偏差比較的分形圖像編碼[J].計(jì)算機(jī)應(yīng)用與軟件,2015,32(2):211-214.

[11] 陳龍,蘇厚勤.BPEL文檔基于DAG自動(dòng)生成框架的研究與實(shí)現(xiàn)[J].計(jì)算機(jī)應(yīng)用與軟件,2016,33(5):87-89,143.

[12] 李濤,劉斌.Spark平臺(tái)下的高效Web文本分類系統(tǒng)的研究[J].計(jì)算機(jī)應(yīng)用與軟件,2016,33(11):33-36.

[13] 黃賽杰,陳銘松,金乃詠.一種基于約束求解的Verilog語(yǔ)言靜態(tài)分析方法[J].計(jì)算機(jī)應(yīng)用與軟件,2015,32(12):1-3,87.

[14] Keshavjee K,Bosomworth J,Copen J,et al.Best Practices in EMR Implementation:A Systematic Review[J].AMIA.Annual Symposium proceedings/AMIA Symposium.AMIA Symposium,2006,2006(3):982.

[15] Simson L.Garfinkel.An Evaluation of Amazon’s Grid Computing Services:EC2,S3 and SQS[C]//Center for.2007.

[16] 程慧敏,李學(xué)俊,吳洋,等.云環(huán)境下基于多目標(biāo)優(yōu)化的科學(xué)工作流數(shù)據(jù)布局策略[J].計(jì)算機(jī)應(yīng)用與軟件,2017,34(3):1-6.

猜你喜歡
傾斜度代碼節(jié)點(diǎn)
盆底超聲量化膀胱頸移動(dòng)度與尿道傾斜度對(duì)女性壓力性尿失禁的診斷價(jià)值分析
牙齒近遠(yuǎn)中傾斜度的影像學(xué)評(píng)價(jià)方法進(jìn)展
概念格的一種并行構(gòu)造算法
結(jié)合概率路由的機(jī)會(huì)網(wǎng)絡(luò)自私節(jié)點(diǎn)檢測(cè)算法
采用貪婪啟發(fā)式的異構(gòu)WSNs 部分覆蓋算法*
Crosstalk between gut microbiota and antidiabetic drug action
創(chuàng)世代碼
創(chuàng)世代碼
創(chuàng)世代碼
創(chuàng)世代碼