王 剛,李盛恩
(山東建筑大學(xué)計(jì)算機(jī)科學(xué)與技術(shù)學(xué)院,山東濟(jì)南 250101)
MapReduce中數(shù)據(jù)傾斜解決方法的研究
王 剛,李盛恩
(山東建筑大學(xué)計(jì)算機(jī)科學(xué)與技術(shù)學(xué)院,山東濟(jì)南 250101)
隨著移動(dòng)互聯(lián)網(wǎng)和物聯(lián)網(wǎng)的飛速發(fā)展,數(shù)據(jù)規(guī)模呈爆炸性增長(zhǎng)態(tài)勢(shì),人們已經(jīng)進(jìn)入大數(shù)據(jù)時(shí)代。MapReduce是一種分布式計(jì)算框架,具備海量數(shù)據(jù)處理的能力,已成為大數(shù)據(jù)領(lǐng)域研究的熱點(diǎn)。但是MapReduce的性能嚴(yán)重依賴于數(shù)據(jù)的分布,當(dāng)數(shù)據(jù)存在傾斜時(shí),MapReduce默認(rèn)的Hash劃分無(wú)法保證Reduce階段節(jié)點(diǎn)負(fù)載平衡,負(fù)載重的節(jié)點(diǎn)會(huì)影響作業(yè)的最終完成時(shí)間。為解決這一問(wèn)題,利用了抽樣的方法。在用戶作業(yè)執(zhí)行前運(yùn)行一個(gè)MapReduce作業(yè)進(jìn)行并行抽樣,抽樣獲得key的頻次分布后結(jié)合數(shù)據(jù)本地性實(shí)現(xiàn)負(fù)載均衡的數(shù)據(jù)分配策略。搭建了實(shí)驗(yàn)平臺(tái),在實(shí)驗(yàn)平臺(tái)上測(cè)試WordCount實(shí)例。實(shí)驗(yàn)結(jié)果表明,采用抽樣方法實(shí)現(xiàn)的數(shù)據(jù)劃分策略性能要優(yōu)于MapReduce默認(rèn)的哈希劃分方法,結(jié)合了數(shù)據(jù)本地性的抽樣劃分方法的效果要優(yōu)于沒(méi)有考慮數(shù)據(jù)本地性的抽樣劃分方法。
大數(shù)據(jù);MapReduce;負(fù)載均衡;抽樣
伴隨著互聯(lián)網(wǎng)、物聯(lián)網(wǎng)和移動(dòng)互聯(lián)網(wǎng)的快速發(fā)展,每天會(huì)產(chǎn)生海量數(shù)據(jù),數(shù)據(jù)處于爆炸式的增長(zhǎng)狀態(tài),這預(yù)示著大數(shù)據(jù)時(shí)代的到來(lái)。MapReduce[1]是Google提出的一種分布式計(jì)算框架。由于其具有高可擴(kuò)展性、高效性和容錯(cuò)性等特點(diǎn),在大規(guī)模數(shù)據(jù)處理中得到了廣泛應(yīng)用。用戶使用MapReduce處理海量數(shù)據(jù)時(shí),只需根據(jù)業(yè)務(wù)邏輯編寫(xiě)Map和Reduce函數(shù)即可,并行化、容錯(cuò)、數(shù)據(jù)分發(fā)和負(fù)載平衡等復(fù)雜的技術(shù)由MapReduce運(yùn)行時(shí)庫(kù)自動(dòng)完成。Hadoop是MapReduce的開(kāi)源實(shí)現(xiàn),在云計(jì)算和大數(shù)據(jù)處理等領(lǐng)域應(yīng)用廣泛,成為了研究的熱點(diǎn)。
性能優(yōu)化的重點(diǎn)就是負(fù)載均衡。在MapReduce分布式計(jì)算框架下,用戶提交的作業(yè)被劃分成若干個(gè)塊,每個(gè)塊被分配給一個(gè)Mapper執(zhí)行,Map階段產(chǎn)生的中間結(jié)果經(jīng)過(guò)劃分函數(shù)交給Reducer執(zhí)行并產(chǎn)生最終的結(jié)果。整個(gè)作業(yè)的完成時(shí)間由Reducer運(yùn)行最慢的決定。當(dāng)節(jié)點(diǎn)的負(fù)載出現(xiàn)不均衡時(shí),負(fù)載重的節(jié)點(diǎn)會(huì)制約作業(yè)的完成時(shí)間。因此,每個(gè)節(jié)點(diǎn)能否在一致的時(shí)間內(nèi)完成是影響分布式計(jì)算性能的關(guān)鍵因素。
文中首先通過(guò)抽樣獲取key的頻率分布信息,然后根據(jù)數(shù)據(jù)本地性特征,利用貪心策略實(shí)現(xiàn)Reduce節(jié)點(diǎn)的負(fù)載均衡。
1.1 MapReduce
Google過(guò)去在處理海量數(shù)據(jù)時(shí),采用了高配置服務(wù)集群的方法,但是隨著數(shù)據(jù)規(guī)模越來(lái)越大,傳統(tǒng)方法在性能方面表現(xiàn)不足。Google設(shè)計(jì)了新的分布式計(jì)算模型MapReduce,MapReduce可以部署在廉價(jià)的商用機(jī)器上,提高了處理海量數(shù)據(jù)的性能,降低了硬件成本。MapReduce最大的優(yōu)勢(shì)就是簡(jiǎn)單易用。
Hadoop開(kāi)源實(shí)現(xiàn)了Google的MapReduce模型,并且提供了分布式文件系統(tǒng)HDFS。用戶提交作業(yè)后,數(shù)據(jù)被等大小切分給Map節(jié)點(diǎn)處理,Map節(jié)點(diǎn)執(zhí)行Map函數(shù)產(chǎn)生中間結(jié)果并根據(jù)劃分函數(shù)保存在本地磁盤。Reduce節(jié)點(diǎn)讀取中間結(jié)果執(zhí)行Reduce函數(shù)產(chǎn)生最終的結(jié)果。在這個(gè)過(guò)程中,用戶不必關(guān)注分布式處理的細(xì)節(jié),作業(yè)調(diào)度、數(shù)據(jù)劃分以及容錯(cuò)處理這些細(xì)節(jié)由MapReduce自動(dòng)完成。除了Google的內(nèi)部實(shí)現(xiàn)外,MapReduce還有一個(gè)應(yīng)用廣泛的開(kāi)源實(shí)現(xiàn)Hadoop。
1.2 已有工作
MapReduce默認(rèn)的劃分方法是把哈希值相同的key分配給同一個(gè)Reducer節(jié)點(diǎn),在數(shù)據(jù)傾斜的情況下,容易造成Reducer端負(fù)載不均,影響任務(wù)的完成時(shí)間。目前研究人員針對(duì)Reducer端負(fù)載不平衡做了大量的研究工作。文獻(xiàn)[2]提出的SkewTune系統(tǒng)對(duì)Hadoop進(jìn)行功能增強(qiáng),當(dāng)有空閑節(jié)點(diǎn)時(shí),系統(tǒng)會(huì)將當(dāng)前負(fù)載最重的任務(wù)分配給空閑節(jié)點(diǎn),從而縮短整個(gè)作業(yè)的執(zhí)行時(shí)間。文獻(xiàn)[3]提出了LEEN算法。該算法設(shè)計(jì)了最優(yōu)的劃分函數(shù),通過(guò)把一個(gè)key分配到最合適的分組來(lái)實(shí)現(xiàn)負(fù)載均衡。這些方法都是在MapReduce運(yùn)行過(guò)程中進(jìn)行調(diào)整,操作復(fù)雜,通用性低。
除了修改MapReduce框架來(lái)消除負(fù)載不均外,目前還有一種常用的方法就是抽樣。文獻(xiàn)[4]通過(guò)抽樣把key劃分成大、中、小三種負(fù)載,劃分函數(shù)根據(jù)負(fù)載大小的不同會(huì)有不同的處理方式。文獻(xiàn)[5]先執(zhí)行一個(gè)MapReduce作業(yè),抽樣統(tǒng)計(jì)key的分布情況,從而給出自定義的劃分函數(shù)。文獻(xiàn)[6]基于range partition提出了改進(jìn)的方法。文獻(xiàn)[7]在簡(jiǎn)單采樣的基礎(chǔ)上提出性能更優(yōu)的動(dòng)態(tài)劃分方法。
基于以上研究工作,文中嘗試?yán)脭?shù)據(jù)本地性和抽樣來(lái)完善Reduce負(fù)載均衡機(jī)制。首先通過(guò)抽樣獲取key的分布,其次理論分析Hash算法的不足,結(jié)合數(shù)據(jù)本地性提出貪心策略的Reduce端負(fù)載均衡,最后通過(guò)大規(guī)模的數(shù)據(jù)驗(yàn)證算法的有效性。
2.1 數(shù)據(jù)傾斜
MapReduce的性能很大程度上依賴于數(shù)據(jù)的分布[8],如果數(shù)據(jù)分布不均勻性能就無(wú)法保證。但是科學(xué)數(shù)據(jù)往往都是存在傾斜的,MapReduce在處理傾斜數(shù)據(jù)時(shí),Map階段的中間結(jié)果利用哈希函數(shù)分配給Reduce階段的節(jié)點(diǎn),MapReduce哈希劃分:partition-Num=key.hashCode()%REDUCER_NUM。這種方法可以保證每個(gè)Reducer處理的劃分中包含的分組數(shù)目相同,但無(wú)法保證分組內(nèi)部記錄總數(shù)相同,特別是在數(shù)據(jù)傾斜的情況下[9]。使用 Hash算法時(shí),多個(gè) key的hashcode與Reduce節(jié)點(diǎn)數(shù)量求余數(shù)之后可能具有相同的值,從而使數(shù)據(jù)劃分集中于某一個(gè)Reduce節(jié)點(diǎn),造成數(shù)據(jù)分布不均衡。Hash算法沒(méi)有考慮key的頻次,可能存在一些頻次大的key被劃分到同一Reduce節(jié)點(diǎn),造成數(shù)據(jù)不均衡[10]。
如圖1所示,有3個(gè)數(shù)據(jù)節(jié)點(diǎn),Map端輸入數(shù)據(jù)有9個(gè)key值,每個(gè)key值的數(shù)據(jù)量不相等,但是每個(gè)數(shù)據(jù)節(jié)點(diǎn)的總量是相等的。圖中Node1的key值K3的數(shù)據(jù)量為12,表示為K3:12。計(jì)算可得Node1的數(shù)據(jù)總量為70。則由Hadoop默認(rèn)的哈希劃分函數(shù)分區(qū)之后,Reduce端輸入的數(shù)據(jù)量不相等,出現(xiàn)了數(shù)據(jù)傾斜,三個(gè)Reducer的數(shù)據(jù)量分別為34,56,120,Reducer3的數(shù)據(jù)量比其他兩個(gè)節(jié)點(diǎn)的數(shù)據(jù)量多,數(shù)據(jù)傾斜會(huì)影響任務(wù)的最終完成時(shí)間。
圖1 哈希劃分不平衡示例
2.2 抽 樣
從總體單位中抽取部分單位作為樣本的方法就是抽樣[11]。其基本要求是要保證所抽取的樣品單位對(duì)全部樣品具有充分的代表性。文中算法針對(duì)大規(guī)模的數(shù)據(jù)進(jìn)行處理,如果對(duì)所有的數(shù)據(jù)進(jìn)行統(tǒng)計(jì),成本太高,因此采用抽樣方法,獲取key的頻率分布[12]。
在抽樣前,需要總體單位有序,根據(jù)樣本容量確定抽樣間隔。假設(shè)總體單位容量為M,樣本容量為N,則抽樣間隔為K=M/N。從總體中隨機(jī)確定一個(gè)單位作為第一個(gè)樣本,然后每隔K個(gè)距離確定一個(gè)樣本單位,達(dá)到樣本容量即停止。抽取的樣本容量越多,抽樣的準(zhǔn)確度越高。
2.3 負(fù)載均衡方法
文中提出基于采樣的方法,對(duì)Mapper的輸出結(jié)果進(jìn)行采樣,增加一個(gè)mapreduce job來(lái)獲得key的分布信息。這個(gè)過(guò)程主要包括兩個(gè)步驟:
步驟1:運(yùn)行一個(gè)mapreduce job獲得中間數(shù)據(jù)集樣本,然后統(tǒng)計(jì)key的分布,根據(jù)樣本分布產(chǎn)生劃分。劃分結(jié)果用一個(gè)映射數(shù)據(jù)結(jié)構(gòu)表示:(k,p)鍵為k被劃分到了p。
步驟2:運(yùn)行真正的數(shù)據(jù)處理任務(wù)。劃分函數(shù)根據(jù)步驟1獲得的(k,p)產(chǎn)生劃分策略而不再利用散列劃分。
然而,在MapReduce現(xiàn)有的調(diào)度策略中并未充分考慮數(shù)據(jù)本地性[13],在任務(wù)的調(diào)度過(guò)程中只是簡(jiǎn)單地從隊(duì)列中取出第一個(gè)待分配的任務(wù)給當(dāng)前可用節(jié)點(diǎn)而忽略了中間數(shù)據(jù)的分布特點(diǎn),因此可能導(dǎo)致大量的中間文件必須跨網(wǎng)絡(luò)傳輸?shù)皆摴?jié)點(diǎn),如圖2所示。
在圖2中,Partion2和Partion3都要跨網(wǎng)絡(luò)傳輸,增加了時(shí)間開(kāi)銷。為了減少網(wǎng)絡(luò)傳輸帶來(lái)的開(kāi)銷,減少作業(yè)運(yùn)行時(shí)間,文中提出了數(shù)據(jù)本地性感知的抽樣劃分算法。
定義1:設(shè)M表示輸入數(shù)據(jù)的總量,可以用輸入數(shù)據(jù)的行數(shù)近似表示,N表示參與計(jì)算的節(jié)點(diǎn)數(shù),則劃分算法應(yīng)該使節(jié)點(diǎn)的負(fù)載接近M/N。設(shè)P表示鍵值key被劃分的分區(qū),V表示節(jié)點(diǎn)已分配的數(shù)據(jù)總量,TK表示鍵為key的總記錄數(shù),元組(key,sum,node)表示節(jié)點(diǎn)node上鍵key的數(shù)量為sum。
結(jié)合抽樣技術(shù)和數(shù)據(jù)本地性算法的具體執(zhí)行過(guò)程如下:
Step1:在每個(gè)節(jié)點(diǎn)上進(jìn)行抽樣,抽樣的結(jié)果形式為(key,sum,node)。
Step2:統(tǒng)計(jì)所有節(jié)點(diǎn)上鍵為key的總數(shù),用 TK表示。
Step3:將中間結(jié)果按數(shù)量從大到小排序。
Step4:遍歷(key,sum,node),如果(sum+V)小于M/N,則將key劃分到節(jié)點(diǎn)node。
Step5:遍歷處理步驟4中沒(méi)有涉及的 key,把鍵key分配到V最小的節(jié)點(diǎn)中。
Step6:在抽樣過(guò)程中,沒(méi)有抽取的key認(rèn)為是小概率數(shù)據(jù),不影響Reduce端的負(fù)載均衡。沒(méi)有抽取的key使用Hadoop默認(rèn)的哈希劃分。
圖1的例子利用文中的負(fù)載均衡算法進(jìn)行數(shù)據(jù)劃分之后的結(jié)果如圖3所示。可以看出,文中所提的利用數(shù)據(jù)本地性的抽樣方法獲得了較好的Reduce端負(fù)載均衡,優(yōu)化了默認(rèn)的哈希劃分和簡(jiǎn)單的抽樣劃分。
算法1:基于數(shù)據(jù)本地性的抽樣劃分。
Input:pairs of(key,sum,node),M,N
Output:partition result P
1.T←total rows of each key value in all node,put all key into K ,initialize the map P and list V
2.while K is not null
3.if P[key]is not partition and V[node]+TK[key]<=M/N
4.P.add(key,node)
5.V[node]+=TK[key]
6.remove the key from K
7.end if
8.end while
9.while K is not null
10.node←search minimum V[node]in V
11.if P[key]is not partition
12.P.add(key,node)
13.V[node]+=TK[key]
14.remove the key form K
15.end if
16.end while
17.return P
文中搭建Hadoop集群來(lái)驗(yàn)證算法的有效性。實(shí)驗(yàn)集群由6臺(tái)計(jì)算機(jī)組成,每臺(tái)計(jì)算機(jī)內(nèi)存2 G,磁盤空間500 G,奔騰處理器。Hadoop版本1.0.0,操作系統(tǒng)為CentOS6.6,JDK1.6。實(shí)驗(yàn)所采用的測(cè)試方法為利用Hadoop進(jìn)行WordCount計(jì)算,并分別與默認(rèn)Hadoop和文獻(xiàn)[6]中的方法進(jìn)行比較。
實(shí)驗(yàn)結(jié)果如圖4所示。從圖4中可以看到,當(dāng)數(shù)據(jù)分布均勻時(shí),即傾斜度為0時(shí),Hash劃分的性能是最好的。隨著數(shù)據(jù)的傾斜度上升,抽樣的方法運(yùn)行時(shí)間上升緩慢,而Hash上升很快。原因在于當(dāng)數(shù)據(jù)分布均勻時(shí),Hash劃分可以保證負(fù)載均勻,而抽樣的方法增加了抽樣過(guò)程的代價(jià),導(dǎo)致運(yùn)行時(shí)間增加,但是代價(jià)很小。當(dāng)傾斜嚴(yán)重時(shí),抽樣的劃分使負(fù)載均衡,性能受傾斜影響不大。
從圖4中還可以看出,基于數(shù)據(jù)本地性的抽樣劃分性能比僅簡(jiǎn)單抽樣劃分的性能要好。
文中研究了數(shù)據(jù)傾斜下的負(fù)載均衡優(yōu)化問(wèn)題,分析了MapReduce中導(dǎo)致節(jié)點(diǎn)負(fù)載不均的原因,提出了基于數(shù)據(jù)本地性的抽樣劃分方法。實(shí)驗(yàn)結(jié)果表明,與傳統(tǒng)的Hash劃分和只簡(jiǎn)單抽樣的劃分相比,文中提出的方法具有更高的效率。
[1] Dean J,Ghemawat S.MapReduce:simplified data processing on large clusters[J].Communications of the ACM,2008,51 (1):107-113.
[2] Kwon Y C,Balazinska M,Howe B,et al.Skewtune:mitigating skew in MapReduce applications[C]//Proceedings of the 2012 ACM SIGMOD international conference on management of data.[s.l.]:ACM,2012:25-36.
[3] Ibrahim S,Jin H,Lu L,et al.Handling partitioning skew in MapReduce using LEEN[J].Peer-to-Peer Networking and Applications,2013,6(4):409-424.
[4] Ramakrishnan S R,Swart G,Urmanov A.Balancing reducer skew in MapReduce workloads using progressive sampling[C]//Proceedings of the third ACM symposium on cloud computing.[s.l.]:ACM,2012.
[5] Xu Y,Zou P,Qu W,et al.Sampling-based partitioning in MapReduce for skewed data[C]//ChinaGrid annual conference. [s.l.]:IEEE,2012:1-8.
[6] 韓 蕾,孫徐湛,吳志川,等.MapReduce上基于抽樣的數(shù)據(jù)劃分最優(yōu)化研究[J].計(jì)算機(jī)研究與發(fā)展,2013,50(S): 77-84.
[7] 周家?guī)?,?琦,高 軍.一種基于動(dòng)態(tài)劃分的MapReduce負(fù)載均衡方法[J].計(jì)算機(jī)研究與發(fā)展,2013,50(S):369-377.
[8] 宛 婉,周國(guó)祥.Hadoop平臺(tái)的海量數(shù)據(jù)并行隨機(jī)抽樣[J].計(jì)算機(jī)工程與應(yīng)用,2014,50(20):115-118.
[9] 萬(wàn) 聰,王翠榮,王 聰,等.MapReduce模型中reduce階段負(fù)載均衡分區(qū)算法研究[J].小型微型計(jì)算機(jī)系統(tǒng),2015,36(2):240-243.
[10]傅 杰,都志輝.一種周期性MapReduce作業(yè)的負(fù)載均衡策略[J].計(jì)算機(jī)科學(xué),2013,40(3):38-40.
[11]李 喬,鄭 嘯.云計(jì)算研究現(xiàn)狀綜述[J].計(jì)算機(jī)科學(xué),2011,38(4):32-37.
[12]劉寒梅,韓宏瑩.基于反饋調(diào)度的MapReduce負(fù)載均衡分區(qū)算法研究[J].信息通信,2015(10):41-42.
[13]李航晨,秦小麟,沈 堯.數(shù)據(jù)本地性感知的MapReduce負(fù)載均衡策略[J].計(jì)算機(jī)科學(xué),2015,42(10):50-56.
Research on Handling Data Skew in MapReduce
WANG Gang,LI Sheng-en
(School of Computer Science and Technology,Shandong Jianzhu University,Jinan 250101,China)
With the rapid development of mobile Internet and the Internet of Things,the data size explosively grows,and people have been in the era of big data.As a distributed computing framework,MapReduce has the ability of processing massive data and becomes a focus in big data.But the performance of MapReduce depends on the distribution of data.The Hash partition function defaulted by MapReduce can’t guarantee load balancing when data is skewed.The time of job is affected by the node which has more data to process.In order to solve the problem,sampling is used.It does a MapReduce job to sample before dealing with user’s job in this paper.After learning the distribution of key,load balance of data partition is achieved using data locality.The example of WordCount is tested in experimental platform.Results show that data partition using sample is better than Hash partition,and taking data locality is much better than that using sample but no data locality.
big data;MapReduce;load balancing;sampling
TP301
A
1673-629X(2016)09-0201-04
10.3969/j.issn.1673-629X.2016.09.045
2015-10-22
2016-02-24< class="emphasis_bold">網(wǎng)絡(luò)出版時(shí)間:
時(shí)間:2016-08-01
國(guó)家自然科學(xué)基金資助項(xiàng)目(61170052)
王 剛(1990-),男,碩士研究生,CCF會(huì)員,研究方向?yàn)榇髷?shù)據(jù)、數(shù)據(jù)庫(kù);李盛恩,教授,研究方向?yàn)閿?shù)據(jù)庫(kù)、數(shù)據(jù)挖掘。
http://www.cnki.net/kcms/detail/61.1450.TP.20160801.0842.012.html