李俊麗
(晉中學(xué)院 計算機科學(xué)與技術(shù)系,山西 晉中 030619)
隨著大數(shù)據(jù)時代的到來,數(shù)據(jù)量以驚人的速度增長。大數(shù)據(jù)應(yīng)用的出現(xiàn)給數(shù)據(jù)處理帶來了巨大的挑戰(zhàn)[1,2],越來越多的高效并行計算平臺,如MapReduce[3]和Spark[4-6],被廣泛采用來處理大數(shù)據(jù)?;バ畔⑹菍蓚€隨機變量之間共享的信息量的度量?;バ畔⒌挠嬎懔亢艽螅貏e對于處理大規(guī)模的類別數(shù)據(jù)。互信息可以廣泛應(yīng)用于數(shù)據(jù)挖掘[7,8]算法中。為了提高互信息計算的效率,Spark內(nèi)存計算模型是最好的選擇,但要面對Spark數(shù)據(jù)傾斜的性能優(yōu)化問題。針對Spark中的數(shù)據(jù)傾斜問題,近年來提出了很多算法和模型。例如,文獻[9]提出了Spark平臺上基于特征分組的并行離群挖掘算法。SCID算法[10]設(shè)計了一種Pond-sampling算法來收集數(shù)據(jù)分布信息,并對總體數(shù)據(jù)分布進行估計。在數(shù)據(jù)劃分過程中,SCID實現(xiàn)了Bin-packing算法對Map任務(wù)的輸出進行桶狀處理。此外,在分區(qū)過程中,還會進一步切割大型分區(qū)。SP-Partitioner算法[11]將到達的批次數(shù)據(jù)作為候選樣本,在系統(tǒng)抽樣的基礎(chǔ)上選擇樣本,預(yù)測中間數(shù)據(jù)的特征。該方法根據(jù)預(yù)測結(jié)果生成參考表,指導(dǎo)下一批數(shù)據(jù)的均勻分布。文獻[12]優(yōu)化了笛卡爾(笛卡兒積)算子。由于計算笛卡爾積需要連接操作,因此可能會出現(xiàn)數(shù)據(jù)傾斜。文獻[13]提出了SASM(Spark adaptive skew mitigation),通過將大分區(qū)遷移到其它節(jié)點,同時平衡各任務(wù)之間的大小,來緩解數(shù)據(jù)傾斜問題。與這些現(xiàn)有的方法不同,DVP算法針對文獻[9]中并行互信息計算中出現(xiàn)的數(shù)據(jù)傾斜問題進行研究和改進。DVP算法探索了數(shù)據(jù)虛擬劃分,其中虛擬前綴附加在一個大分區(qū)中的所有鍵之前,然后是一個輔助散列。DVP中的虛擬分區(qū)確保消除了大分區(qū)。DVP算法是在Spark計算平臺上設(shè)計并實現(xiàn)的一種數(shù)據(jù)虛擬劃分的方案,主要針對數(shù)據(jù)傾斜情況下大規(guī)模類別數(shù)據(jù)的互信息并行計算,解決了數(shù)據(jù)分布不均勻?qū)е碌臄?shù)據(jù)傾斜問題。
互信息是信息論中對兩個隨機變量關(guān)聯(lián)程度的統(tǒng)計描述,可以表示為這兩個隨機變量概率的函數(shù)。
假設(shè)DS是一個包含n個對象的數(shù)據(jù)集,每個對象都由m個特征表示。我們使用H(yi,yj)和MI(yi;yj)分別表示集合DS上計算的特征yi和yj之間的熵和互信息。熵可以表示如下
(1)
其中,Pij(yi=vik∧yj=vjl)為特征yi和yj分別等于vik和vjl的概率。式(1)中di和dj為特征yi和yj的取值個數(shù);vik和vjl可以在集合D(yi)和D(yj)中找到,其中D(yi)={vi1,…,>vidi},D(yj)={vj1,…,>vjdj}。熵H(yi,>yj)是概率Pij和logPij的乘積的函數(shù)。
MI(yi;yj)作為特征yi和yj之間的互信息。我們將互信息MI(yi;yj)表示為
(2)
其中,概率Pij,特征yi和yj,值vik和vjl,域di和dj,集合D(yi)和D(yj)與式(1)中的相同,Pi和Pj分別為特征yi和yj等于vik和vjl的概率。
互信息可以廣泛應(yīng)用于數(shù)據(jù)挖掘算法中,DVP算法中的互信息是作為度量指標來量化類別數(shù)據(jù)特征之間的相似性。
在Spark Shuffle階段,Spark必須將相同的鍵從每個節(jié)點拉到節(jié)點上的任務(wù)中。這樣的過程可能會給單個節(jié)點帶來沉重的負載。此時,如果某個鍵對應(yīng)的數(shù)據(jù)量特別大,就會出現(xiàn)傾斜。
圖1描述了分區(qū)2總體上比分區(qū)1和分區(qū)3大。由于輸入數(shù)據(jù)分布不均勻,使用系統(tǒng)的默認哈希分區(qū)可能導(dǎo)致子RDD中每個分區(qū)的大小存在較大差異,從而導(dǎo)致數(shù)據(jù)傾斜。當遇到數(shù)據(jù)傾斜問題時,整個Spark作業(yè)的執(zhí)行時間由運行時間最長的任務(wù)控制,這使得Spark作業(yè)運行得相當慢。在最壞的情況下,由于最慢的任務(wù)處理了過多的數(shù)據(jù),Spark作業(yè)可能耗盡內(nèi)存。
圖1 Spark Shuffle數(shù)據(jù)分布
接下來建立了一個數(shù)據(jù)傾斜模型來量化由Spark創(chuàng)建的分區(qū)之間的數(shù)據(jù)傾斜度引起的問題。
圖1描述了Spark集群中默認的哈希分布機制,該機制執(zhí)行以下3個步驟。首先,Map任務(wù)檢索輸入數(shù)據(jù)。然后,這些數(shù)據(jù)由Map任務(wù)處理,Map任務(wù)生成以鍵值對格式組織的中間結(jié)果。最后,使用鍵將中間結(jié)果分組到分區(qū)中。最后一步中的一個障礙是,由于數(shù)據(jù)傾斜,這些分區(qū)的大小不均勻。
假設(shè)根據(jù)鍵值聚合數(shù)據(jù)時有p個唯一的鍵,我們設(shè)K表示鍵,K={k1,>…,>kp}。我們把V表示為集合k中所有鍵的值
(3)
假設(shè)有p個分區(qū),每個分區(qū)中的值共享一個鍵。值得注意的是,所有分區(qū)的大小可能不同。例如,第i和第j個分區(qū)的大小分別為li和lj。這兩個分區(qū)的大小可能不同(即li≠lj)。
現(xiàn)在使用域dom(ki)的大小來度量鍵ki的第i個分區(qū)的大小,它的形式是|dom(ki)|。平均分區(qū)大小由|dom(K)|avg表示,|dom(K) |avg由平均域大小來衡量,具體如下表示
(4)
分區(qū)之間的數(shù)據(jù)傾斜度定義為分區(qū)大小的偏差(即,|dom(ki)|)。設(shè)(ki)為第i個分區(qū)或域dom(ki)的傾斜度。在形式上,域dom(ki)的傾斜度s(ki)如下表示
(5)
聚合操作符是Spark Shuffle階段的性能瓶頸。并行計算互信息的一個關(guān)鍵挑戰(zhàn)在于countByKey或reduceByKey操作符(參見算法1第(4)行和第(12)行),它引入了包含兩個階段的shuffle。在shuffle過程中,第一階段執(zhí)行shuffle write操作分區(qū)數(shù)據(jù)。具有相同鍵的已處理數(shù)據(jù)被寫入相同的磁盤文件。
一旦countByKey或reduceByKey操作符執(zhí)行,第二階段中的每個任務(wù)都會執(zhí)行shuffle read操作。執(zhí)行此操作的任務(wù)提取屬于前一階段任務(wù)節(jié)點的鍵,然后對同一鍵執(zhí)行全局聚合或連接操作。在這個場景中,鍵值被累積。如果數(shù)據(jù)分布不均勻,就會發(fā)生數(shù)據(jù)傾斜。
數(shù)據(jù)虛擬劃分是一種針對shuffle操作(例如,reduceByKey)可能引起數(shù)據(jù)傾斜而進行的虛擬分區(qū)機制。為了減少shuffle操作中的數(shù)據(jù)傾斜,DVP算法只在統(tǒng)計單個特征的取值時進行虛擬分區(qū),因為特征對的取值不容易發(fā)生數(shù)據(jù)傾斜。圖2描述了虛擬分區(qū)的過程。
圖2 虛擬分區(qū)過程
在這里,首先為RDD中的每個鍵添加一個隨機前綴,然后是reduceByKey聚合操作。通過向同一個鍵添加隨機前綴并將其更改為幾個不同的鍵,一個任務(wù)最初處理的數(shù)據(jù)被分散到多個任務(wù)中,以便進行本地聚合。這種虛擬分區(qū)的策略減少了單個任務(wù)處理的過量數(shù)據(jù)。刪除每個鍵的前綴后,再次執(zhí)行全局聚合操作以獲得最終結(jié)果。
DVP算法主要由以下基本步驟完成:首先,使用關(guān)鍵字val定義一個可變長數(shù)組doubleCol用于存放特征對的計算結(jié)果。其次,使用map映射操作將RDD數(shù)據(jù)datapre轉(zhuǎn)換為鍵值對的形式,即pair((x(m);x(n));1)。值得注意的是,((x(m);x(n))是特征對m和n的取值;1表示特征對的值出現(xiàn)一次,并且記錄每一對特征對取值的整體出現(xiàn)情況。然后,使用關(guān)鍵字val定義另一個可變長數(shù)組singleCol用于存放單個特征的計算結(jié)果,由于在計算單特征值時容易出現(xiàn)數(shù)據(jù)傾斜,為了緩解數(shù)據(jù)傾斜問題,最后需要對單個特征的取值進行數(shù)據(jù)虛擬劃分。
具體算法如下:
算法1:DVP算法
輸入:數(shù)據(jù)集DS(nobjects ×mfeatures),由數(shù)據(jù)集生成的名為 datapre的RDD
輸出:兩個變長數(shù)組
(1) val doubleCol = new Array [ArrayBuffer [Map [(String, String), Long] ] ](dimension) //關(guān)鍵字val定義了一個可變長度數(shù)組,即doubleCol
(2) for (m= 0;m≤dimension;m++)
(3) for (n= 0;n≤dimension;n++)
(4) doubleCol(m)(n)+ = datapre:map(x≥((x(m);x(n)); 1))>.countByKey()>.toMap //將RDD數(shù)據(jù)的datapre轉(zhuǎn)換為pair ((x(m); >x(n));>1)使用映射轉(zhuǎn)換,并計算特征對取值的整體出現(xiàn)情況
(5) end for
(6) end for
(7)val singleCol = ArrayBuffer[Map[String,Long]]() //關(guān)鍵字val定義了一個可變長度數(shù)組,即singleCol
(8) for (k= 0;k≤dimension;k++)
(9) singleCol+= datapre.map
(10) val random:Random = new Random()
(11) val prefix:Int = random.nextInt(10)
(12) prefix+-+x(k),1)).reduceByKey(-+-).map (line ≥(line.-1.split("-")(1), line.-2)).reduceByKey (-+-).collectAsMap().toMap // 數(shù)據(jù)虛擬劃分
(13) end for
DVP算法在一個配備了24個節(jié)點的Spark集群中實現(xiàn)并驗證,每個節(jié)點都有一個Intel處理器(即,E5-1620 v2系列3.7 GHz),4芯16 GB RAM。主節(jié)點硬盤配置為500 GB;其它節(jié)點的磁盤容量是2 TB。集群中的所有數(shù)據(jù)節(jié)點都通過千兆以太網(wǎng)連接;使用SSH協(xié)議保證節(jié)點之間的通信。我們在Spark的standalone模式下實現(xiàn)了DVP算法。
在DVP實現(xiàn)中使用的編程語言是Scala,這是一種在Java虛擬機(JVM)上運行的函數(shù)式面向?qū)ο笳Z言。Scala無縫集成了現(xiàn)有的Java程序。利用集成開發(fā)環(huán)境IntelliJ IDEA開發(fā)了DVP算法。
表1中列出了DVP算法中所用到的Spark集群的配置情況。
表1 Spark集群中的軟件配置
DVP使用人工合成類別數(shù)據(jù)集來進行性能評估。為了評估DVP算法,構(gòu)造了兩種類型的合成數(shù)據(jù)集:均勻分布數(shù)據(jù)集和正態(tài)分布數(shù)據(jù)集。通過以下兩個步驟生成數(shù)據(jù)集。首先,創(chuàng)建一個相對較小的類別屬性數(shù)據(jù)集。接下來,不斷復(fù)制第一步中創(chuàng)建的數(shù)據(jù)集,以擴大數(shù)據(jù)集的大小。合成數(shù)據(jù)集包含100個特征,這些數(shù)據(jù)集的大小分別為8 GB、16 GB、24 GB和32 GB。數(shù)據(jù)集見表2。
表2 人工合成數(shù)據(jù)集
DEFH是使用最廣泛的哈希算法,是Spark中的一種默認機制。當鍵值呈現(xiàn)均勻分布時,可以獲得較好的性能。
(1)不同數(shù)據(jù)大小下的執(zhí)行時間:圖3為DVP和DEFH算法處理不同數(shù)據(jù)大小的均勻分布數(shù)據(jù)和正態(tài)分布數(shù)據(jù)所使用的運行時間。分別將數(shù)據(jù)大小設(shè)置為8 GB、16 GB、24 GB和32 GB。計算節(jié)點的數(shù)量配置為24個。
圖3 不同數(shù)據(jù)大小下均勻分布和正態(tài) 分布數(shù)據(jù)的執(zhí)行時間
由圖3(a)可以看出,在分布均勻的數(shù)據(jù)集中,由于虛擬分區(qū)的副作用,DVP算法的運行時間比DEFH稍長。然而,從圖3(b)可以看出,對于正態(tài)分布的數(shù)據(jù)集,DVP算法優(yōu)于DEFH。這是預(yù)期的結(jié)果,因為正態(tài)分布數(shù)據(jù)集包含了分布不均勻的數(shù)據(jù),導(dǎo)致了數(shù)據(jù)傾斜,從而耗費了時間。而由虛擬分區(qū)支持的DVP算法可以很好地處理傾斜數(shù)據(jù)。
另外,從圖3(a)和圖3(b)還可以看出,增加數(shù)據(jù)量會導(dǎo)致所有算法的運行時間增加。直觀地說,這是因為處理大規(guī)模數(shù)據(jù)需要更長的時間。
(2)不同計算節(jié)點下的執(zhí)行時間:圖4展示了DVP和DEFH算法在不同數(shù)量的計算節(jié)點上處理均勻分布數(shù)據(jù)和正態(tài)分布數(shù)據(jù)所使用的時間。節(jié)點的數(shù)量分別配置為4、8、16和24。數(shù)據(jù)大小設(shè)置為8 GB。圖4(a)顯示,由于虛擬分區(qū)的開銷,我們的DVP算法在均勻分布數(shù)據(jù)中的運行時間要比DEFH的運行時間長。這一趨勢與圖3(a)所示一致。圖4(b)顯示,在不均勻分布的情況下,正態(tài)分布數(shù)據(jù)集中DVP算法的性能要明顯優(yōu)于DEFH。DVP在DEFH上的性能改進歸功于數(shù)據(jù)虛擬劃分,它有效地緩解了數(shù)據(jù)的傾斜。同樣,這些結(jié)果和圖3(b)所描述是一致的。
圖4 不同數(shù)量節(jié)點下均勻分布和正態(tài) 分布數(shù)據(jù)的執(zhí)行時間
另外,從圖4(a)和圖4(b)還可以看出,隨著計算節(jié)點數(shù)量的不斷增加,兩個算法的運行時間都有所減少。這主要是因為集群計算能力的不斷增加。
(3)數(shù)據(jù)傾斜度的影響:由于均勻分布數(shù)據(jù)集中不會發(fā)生數(shù)據(jù)傾斜,因此選擇正態(tài)分布數(shù)據(jù)集進行實驗。從處理時間的角度對數(shù)據(jù)傾斜度的影響進行了評價。
圖5顯示了不同數(shù)據(jù)傾斜度下DVP和DEFH算法的處理時間,傾斜度從1到3不等,增量為0.5。我們觀察到,DVP算法的處理時間對數(shù)據(jù)傾斜度的敏感性小于DEFH。例如,當我們將傾斜度從1.5提高到3時,DVP和DEFH算法的處理時間分別增加了7.2%和28.4%。實驗結(jié)果表明,DVP算法利用數(shù)據(jù)虛擬劃分有效地緩解了數(shù)據(jù)傾斜帶來的性能問題。因此,在不平衡數(shù)據(jù)集中,DVP算法優(yōu)于DEFH。在較高的數(shù)據(jù)傾斜度下,DVP算法對數(shù)據(jù)傾斜的改善更為顯著。
圖5 數(shù)據(jù)傾斜度對DVP和DEFH處理時間的影響
(4)Shuffling-Cost分析:通過改變計算節(jié)點的數(shù)量來比較DVP和DEFH算法的Shuffling-Cost成本。以節(jié)點的shuffle-write-size作為對算法的Shuffling-Cost進行監(jiān)控。以正態(tài)分布數(shù)據(jù)集(8 G)為例,圖6是兩個算法Shuffling-Cost對比。
圖6 不同數(shù)量計算節(jié)點上的DVP和DEFH的 Shuffling-Cost
顯而易見,所有測試用例中DVP算法的shuffle-write-size都明顯小于DEFH。更重要的是,隨著計算節(jié)點數(shù)量的不斷增加,兩種解決方案之間的shuffle-write-size差距也在擴大。DEFH依賴于Spark的默認哈希分區(qū),導(dǎo)致任務(wù)頻繁地跨多個節(jié)點訪問數(shù)據(jù)。例如,當節(jié)點數(shù)量從4個更改為24個時,DEFH算法的shuffle-write-size從667.0 MB上升到1590.0 MB。與DEFH算法不同的是,DVP算法利用了數(shù)據(jù)虛擬劃分技術(shù)來減少Spark環(huán)境中的數(shù)據(jù)傳輸量。因此,DVP算法的shuffle-write-size僅僅從260.0 MB跳到628.0 MB。就shuffling-cost而言,這比DEFH的情況要好得多。
(5)可擴展性分析:在這組實驗中,通過增加計算節(jié)點的數(shù)量(分別設(shè)置為4、8、16和24)和調(diào)節(jié)數(shù)據(jù)集大小(配置分別為8 GB、16 GB、24 GB和32 GB),對DVP算法進行可擴展性分析,評估DVP算法在集群系統(tǒng)中處理大規(guī)模數(shù)據(jù)的能力。
圖7(a)顯示了Spark集群中節(jié)點數(shù)量對并行互信息計算時間的影響。由圖7(a)可以看出,隨著計算節(jié)點數(shù)量的增加,DVP算法的執(zhí)行時間明顯減少。大數(shù)據(jù)集(如32 GB)的下降趨勢非常明顯。當數(shù)據(jù)集很小(例如4 GB)時,集群擴展性能提高很微弱。結(jié)果表明,DVP是一種對大數(shù)據(jù)集具有高擴展性的并行計算方法。
圖7 DVP的可擴展性分析
圖7(b)展示了計算節(jié)點數(shù)量對系統(tǒng)加速的影響。從圖7(b)可以看出,對于大多數(shù)數(shù)據(jù)集來說,DVP的加速率接近線性。例如,在32 GB的情況下,DVP的加速性能幾乎與線性性能相當。結(jié)果表明,我們的并行計算算法能夠保持大規(guī)模高維類別數(shù)據(jù)集的計算性能。
上述DVP的高可擴展性主要歸功于以下幾個因素。首先,并行互信息計算的時間在很大程度上取決于任意兩個特征之間的互信息計算,這種互信息計算時間與分配給節(jié)點的數(shù)據(jù)對象數(shù)量成正比。其次,所有計算節(jié)點都獨立地并行計算。最后,由于數(shù)據(jù)虛擬劃分,DVP在所有節(jié)點之間保持了良好的負載平衡性能。
本文基于Spark平臺開發(fā)了DVP算法,在大規(guī)模類別數(shù)據(jù)的背景下并行計算互信息。DVP的核心是數(shù)據(jù)虛擬分區(qū)方案。更具體地說,虛擬分區(qū)技術(shù)緩解了shuffle過程中出現(xiàn)的數(shù)據(jù)傾斜問題。最后在一個24節(jié)點驅(qū)動的Spark集群上采用人工合成類別數(shù)據(jù)集驗證了DVP算法。
大量的實驗結(jié)果表明,該算法在效率和負載均衡等方面優(yōu)于Spark集群默認的DEFH算法。此外, 在Spark處理大型類別數(shù)據(jù)集時,DVP能夠很好地減輕數(shù)據(jù)傾斜,從而優(yōu)化網(wǎng)絡(luò)性能。
在未來的工作中,將重點從內(nèi)存資源的角度優(yōu)化shuffle性能。當數(shù)據(jù)分布變得不均勻時,分配給分區(qū)的數(shù)據(jù)量就不平衡。因此,任務(wù)所需的內(nèi)存空間本質(zhì)上是不同的。如果給每個任務(wù)分配固定比例的內(nèi)存空間,任務(wù)中頻繁的內(nèi)存溢出將是不可避免的。這樣的內(nèi)存資源問題會對Spark的整體性能產(chǎn)生負面影響。打算研究一種分配內(nèi)存資源的方法,以進一步優(yōu)化所有任務(wù)之間的shuffle進程。該技術(shù)有望從內(nèi)存計算的角度提高Spark應(yīng)用程序的性能。