曹云鵬,王海峰
(1.臨沂大學(xué) 信息科學(xué)與工程學(xué)院,山東 臨沂 276002;2.山東省網(wǎng)絡(luò)環(huán)境智能計(jì)算技術(shù)重點(diǎn)實(shí)驗(yàn)室 臨沂大學(xué)研究所,山東 臨沂 276002)(*通信作者電子郵箱gadfly7@126.com)
隨著互聯(lián)網(wǎng)、物聯(lián)網(wǎng)、云計(jì)算等技術(shù)的迅猛發(fā)展,信息空間中各類應(yīng)用層出不窮,這些應(yīng)用在改變?nèi)祟惿罘绞降耐瑫r產(chǎn)生了巨大數(shù)據(jù)資源。全球數(shù)據(jù)量以指數(shù)增長,并呈現(xiàn)出爆炸式的發(fā)展趨勢,根據(jù)國際數(shù)據(jù)公司(International Data Corporation, IDC)的報告,預(yù)計(jì)到2020年全球數(shù)據(jù)總量達(dá)到45 ZB。一個大規(guī)模生產(chǎn)、分享和應(yīng)用數(shù)據(jù)的時代已經(jīng)來臨,在電子政務(wù)、智能交通、智能制造、精準(zhǔn)醫(yī)學(xué)、海洋監(jiān)測、現(xiàn)代農(nóng)業(yè)、電子商務(wù)和物流管理等應(yīng)用領(lǐng)域中都需要對海量數(shù)據(jù)進(jìn)行處理和深入分析,海量數(shù)據(jù)的分析與查詢成為一個重要研究應(yīng)用領(lǐng)域。MapReduce是谷歌公司提出的一種分布式大數(shù)據(jù)處理的計(jì)算模型,基于MapReduce的應(yīng)用程序能運(yùn)行在大規(guī)模異構(gòu)集群中,采用典型的主從模式分割大數(shù)據(jù)處理作業(yè),映射到各個從屬節(jié)點(diǎn)中并行計(jì)算,最后匯聚和合并計(jì)算結(jié)果。因此普通用戶在MapReduce處理海量數(shù)據(jù)時根據(jù)具體算法邏輯編寫Map和Reduce函數(shù),并行化、容錯、數(shù)據(jù)分發(fā)和負(fù)載均衡等復(fù)雜的管理工作由底層框架自動完成?,F(xiàn)有的Hadoop、Spark都是基于MapReduce計(jì)算模式的開源框架,用戶提交的作業(yè)被劃分成若干數(shù)據(jù)塊,每個數(shù)據(jù)塊分配到集群節(jié)點(diǎn)的一個Mapper執(zhí)行。當(dāng)Map計(jì)算階段完成后會產(chǎn)生大量中間結(jié)果,再把中間數(shù)據(jù)作為Reducer的輸入來匯總計(jì)算結(jié)果,整個計(jì)算作業(yè)的性能由Reducer運(yùn)行最慢的來決定。MapReduce計(jì)算模式是大數(shù)據(jù)并行處理應(yīng)用最廣泛的模型,因此具有較大研究價值。
現(xiàn)有研究和工程實(shí)踐中發(fā)現(xiàn)MapReduce計(jì)算模型的中間數(shù)據(jù)處理階段(Shuffle過程)會產(chǎn)生海量通信數(shù)據(jù)。在Facebook的數(shù)據(jù)中心內(nèi)Shuffle匯聚階段產(chǎn)生的網(wǎng)絡(luò)流量占到數(shù)據(jù)中心總流量的46%。MapReduce計(jì)算模型的性能主要影響因素是Map階段的計(jì)算性能和Shuffle過程中的網(wǎng)絡(luò)通信性能[1]。在Shuffle階段集群中大量計(jì)算節(jié)點(diǎn)利用網(wǎng)絡(luò)來交換并重新定位中間數(shù)據(jù),為Reduce階段的匯總計(jì)算提供輸入數(shù)據(jù)。例如在Facebook數(shù)據(jù)中心的MapReduce計(jì)算集群中,所有MapReduce作業(yè)Shuffle網(wǎng)絡(luò)傳輸占總運(yùn)行時間的33%,其中26%的作業(yè)Shuffle傳輸占到50%以上,有16%的作業(yè)網(wǎng)絡(luò)傳輸占據(jù)70%以上的運(yùn)行時間[1],因此中間數(shù)據(jù)的通信成為性能優(yōu)化的瓶頸。
本文針對多租戶的異構(gòu)集群計(jì)算環(huán)境,研究Map密集型的大數(shù)據(jù)處理作業(yè)的中間數(shù)據(jù)通信局部性,利用機(jī)器學(xué)習(xí)的方法來挖掘MapReduce作業(yè)的通信局部性,嘗試探索中間數(shù)據(jù)通信活躍度與作業(yè)特征之間的相關(guān)性,通過作業(yè)分類來實(shí)施通信優(yōu)化策略,降低集群中跨機(jī)架交換機(jī)數(shù)據(jù)通信量。實(shí)驗(yàn)結(jié)果表明,本文提出的通信優(yōu)化方案對Shuffle密集型作業(yè)的計(jì)算性能優(yōu)化效果達(dá)到4%~5%,而且數(shù)據(jù)量越大的優(yōu)化效果越明顯。此外,隨著異構(gòu)集群規(guī)模的增加,通信優(yōu)化效果比較穩(wěn)定,表明該方法對集群擴(kuò)展性具有很好的適應(yīng)性。
MapReduce是一個典型的大數(shù)據(jù)并行處理計(jì)算模型。從開源框架Hadoop推出以來,學(xué)術(shù)界開展了大量性能參數(shù)調(diào)優(yōu)的研究。在MapReduce計(jì)算模型中Shuffle過程需要從各個節(jié)點(diǎn)匯聚大量Map任務(wù)的輸出數(shù)據(jù),再傳送到Reduce任務(wù)作為輸出,這個過程會產(chǎn)生大量磁盤I/O讀寫和網(wǎng)絡(luò)讀寫,因此Shuffle階段是典型的I/O密集型任務(wù)。本文關(guān)注Shuffle階段中間數(shù)據(jù)的通信優(yōu)化問題,現(xiàn)有研究主要從以下三個角度來優(yōu)化Shuffle中間數(shù)據(jù)的讀寫性能:1)從軟件整體架構(gòu)和流程優(yōu)化角度來研究。Yu等[2]提出優(yōu)化的Hadoop框架來實(shí)現(xiàn)Shuffle、Merge和Reduce階段的交替,減少數(shù)據(jù)復(fù)制和網(wǎng)絡(luò)傳輸,提高數(shù)據(jù)移動性能;Rahman等[3]針對Shuffle密集型的MapReduce計(jì)算作業(yè)設(shè)計(jì)了一個利用Lustre文件存儲系統(tǒng)來保存中間計(jì)算數(shù)據(jù)的體系優(yōu)化方案,并利用RDMA(Remote Direct Memory Access)機(jī)制的權(quán)限目錄選擇來實(shí)時處理中間數(shù)據(jù)。2)從任務(wù)調(diào)度角度來優(yōu)化中間數(shù)據(jù)通信。Shi等[4]提出智能Shuffle任務(wù)調(diào)度策略,用來解決中間數(shù)據(jù)重新定位產(chǎn)生的網(wǎng)絡(luò)擁塞和Reduce任務(wù)不均衡問題,該調(diào)度方法適合多種網(wǎng)絡(luò)拓?fù)浣Y(jié)構(gòu);Arslan等[5]考慮數(shù)據(jù)局部性和網(wǎng)絡(luò)拓?fù)鋼砣麅蓚€因素,設(shè)計(jì)了一個Reduce任務(wù)的調(diào)度策略,把Reduce任務(wù)盡量調(diào)度到離Map產(chǎn)生的中間數(shù)據(jù)近的節(jié)點(diǎn)中,以此提高通信性能和能效;Zheng等[6]根據(jù)Map和Shuffle階段對計(jì)算資源的需求不同,對CPU密集和輸入/輸出(I/O)密集的任務(wù)采用聯(lián)合調(diào)度方式來提高資源復(fù)用率。3)從MapReduce計(jì)算模式中考慮細(xì)粒度的數(shù)據(jù)劃分、放置策略等。Ke等[7]針對Shuffle中間數(shù)據(jù)通信問題,設(shè)計(jì)了一種新的數(shù)據(jù)分區(qū)和聚合方式來減少中間數(shù)據(jù)通信;此后該研究團(tuán)隊(duì)在Hadoop基礎(chǔ)上增加了一個中間數(shù)據(jù)聚合測試體系設(shè)計(jì),以減少Shuffle階段的網(wǎng)絡(luò)通信量[8];對Shuffle再進(jìn)行更細(xì)粒度的分析,分成排序、分組和數(shù)據(jù)傳輸?shù)入A段,通過優(yōu)化Shuffle中操作序列的方式來提高中間數(shù)據(jù)通信性能[9];Yu等[10]引入虛擬Shuffle的思想,通過虛擬Shuffle來解決磁盤讀寫競爭和中間數(shù)據(jù)移動性能瓶頸的問題;從MapReduce框架中數(shù)據(jù)放置策略優(yōu)化的角度來減少跨數(shù)據(jù)中心的數(shù)據(jù)傳輸和跨機(jī)架的數(shù)據(jù)通信[11];Lee等[12]挖掘MapReduce計(jì)算模式中數(shù)據(jù)塊的深度局部性,通過分析數(shù)據(jù)局部性來調(diào)整數(shù)據(jù)塊放置策略,解決Shuffle階段的數(shù)據(jù)重定位引起的通信瓶頸問題。
本文方案從MapReduce任務(wù)調(diào)度的角度來優(yōu)化Shuffle階段的數(shù)據(jù)傳輸性能,采用統(tǒng)計(jì)分類的方法來挖掘作業(yè)中間數(shù)據(jù)網(wǎng)絡(luò)通信的局部性,比Arslan方案[5]有更好的準(zhǔn)確性,最后通過重新部署Map任務(wù)的分配將通信活躍的作業(yè)都集中到同一個機(jī)架內(nèi),減少跨機(jī)架數(shù)據(jù)傳輸?shù)耐ㄐ帕俊1疚姆桨溉コ紤]數(shù)據(jù)局部性和網(wǎng)絡(luò)擁塞的復(fù)雜性,具有較好的應(yīng)用價值。
本節(jié)中的優(yōu)化方法不考慮網(wǎng)絡(luò)擁塞和數(shù)據(jù)傳輸?shù)募?xì)節(jié),將MapReduce作業(yè)中通信活躍的作業(yè)調(diào)度到同一個集群機(jī)架中,這樣節(jié)點(diǎn)之間大量的Shuffle數(shù)據(jù)傳輸被控制在機(jī)架之內(nèi),通過減少跨機(jī)架數(shù)據(jù)傳輸來提高中間數(shù)據(jù)傳輸性能。
先提取MapReduce作業(yè)的特征,然后建立貝葉斯分類模型。利用分類模型把作業(yè)分成通信活躍和通信不活躍的兩大類,重新調(diào)度通信活躍作業(yè),將其各任務(wù)部署到集群同一機(jī)架內(nèi),以此減少中間數(shù)據(jù)跨機(jī)架的傳輸延遲。另外為了實(shí)現(xiàn)負(fù)載均衡,將通信不活躍的作業(yè)依據(jù)比例向性能排序的計(jì)算節(jié)點(diǎn)上調(diào)度。
該方案需要解決以下四個問題:1)Map密集型作業(yè)特征提取。提取出大數(shù)據(jù)實(shí)時處理作業(yè)的運(yùn)行前靜態(tài)特征,建立作業(yè)特征向量為下一步的分類做好準(zhǔn)備。2)異構(gòu)集群中通信活躍度的度量方式。為了研究作業(yè)特征與集群內(nèi)跨機(jī)架通信局部性的非線性關(guān)系,建立通信活躍度模型以及設(shè)計(jì)可行的測量方法。3)采用機(jī)器學(xué)習(xí)方法構(gòu)造一個作業(yè)分類預(yù)測模型,通過預(yù)測模型實(shí)現(xiàn)大數(shù)據(jù)作業(yè)的通信局部性的預(yù)判。4)在作業(yè)分類預(yù)測模型基礎(chǔ)上提出優(yōu)化的大數(shù)據(jù)布局方案來解決中間數(shù)據(jù)的通信性能瓶頸問題。
在MapReduce模型中計(jì)算作業(yè)初始化后,根據(jù)集群中資源配置情況形成若干個Map任務(wù)和Reduce任務(wù),這些計(jì)算任務(wù)要分布到集群中不同節(jié)點(diǎn)并發(fā)執(zhí)行。對于Map密集型作業(yè)有大量的Map任務(wù)處理原始輸入數(shù)據(jù),并且在各節(jié)點(diǎn)中形成中間計(jì)算結(jié)果;Reduce任務(wù)則以中間結(jié)果為輸入,計(jì)算得出最終處理結(jié)果。目前MapReduce系統(tǒng)中Map任務(wù)分發(fā)是靜態(tài)的,其分配結(jié)果是固定的分布。然而Reduce任務(wù)的分配是動態(tài)的,具有不確定性。現(xiàn)有研究中使用較多的作業(yè)特征是MapReduce作業(yè)的運(yùn)行時性能參數(shù),例如Map、Reduce階段的執(zhí)行時間,Reduce階段的數(shù)據(jù)復(fù)制、排序的時間,運(yùn)行時刻集群資源的利用率(CPU、內(nèi)存、磁盤和網(wǎng)卡的利用率)。然而運(yùn)行時刻特征很難應(yīng)用到作業(yè)分類預(yù)測中,因此必須使用作業(yè)運(yùn)行前的Map任務(wù)分配信息作為特征。提取靜態(tài)作業(yè)特征的思路如下:一個MapReduce作業(yè)被劃分成多個任務(wù),然后被分配到集群的各個計(jì)算節(jié)點(diǎn)中;將集群計(jì)算節(jié)點(diǎn)視為一個矩陣,矩陣中各元素的值為分配到該節(jié)點(diǎn)的Map任務(wù)數(shù);最后以Map任務(wù)矩陣作為作業(yè)的特征向量。
下面舉例說明作業(yè)特征提取的具體方案。
假設(shè)異構(gòu)集群G是一個由計(jì)算節(jié)點(diǎn)組成的集合,其中包括機(jī)架G={R1,R2,…,Rp},每個機(jī)架中包括若干節(jié)點(diǎn)Ri={n1,n2,…,nq}。MapReduce作業(yè)J根據(jù)Map任務(wù)處理的數(shù)據(jù)塊來實(shí)現(xiàn)Map任務(wù)劃分和映射,比如Map任務(wù)Mij表示運(yùn)行在集群G中第i個機(jī)架上第j個節(jié)點(diǎn)nij中的Map任務(wù)。任何實(shí)現(xiàn)MapReduce計(jì)算模型的系統(tǒng),或許使用多種任務(wù)分發(fā)和調(diào)度算法,但是最終每個作業(yè)調(diào)度后都會形成一個Map任務(wù)矩陣J=[Mij]p×q,該矩陣表示作業(yè)在集群運(yùn)行之前的分發(fā)狀態(tài),矩陣中的元素Mij表示映射到該節(jié)點(diǎn)nij中的Map任務(wù)數(shù)量;若集群中的某節(jié)點(diǎn)失效或者不存在,則設(shè)置Mij為空(Null)。在此以一個5個機(jī)架每個機(jī)架有6節(jié)點(diǎn)的集群為例,假設(shè)一個作業(yè)Ji的Map任務(wù)矩陣如式(1)所示:
(1)
在任務(wù)矩陣中非零元素表示分配到到該節(jié)點(diǎn)的Map任務(wù)數(shù)量,比如M02=7為集群中R0機(jī)架中第三個節(jié)點(diǎn)n02中分配了7個Map計(jì)算任務(wù);所有零元素表示該節(jié)點(diǎn)未分配到任務(wù),比如M00=0;Null節(jié)點(diǎn)表示該節(jié)點(diǎn)不存在或者處于故障狀態(tài)。以該任務(wù)矩陣為基礎(chǔ)提取作業(yè)的運(yùn)行前特征。由于Null節(jié)點(diǎn)是一種隨機(jī)狀態(tài)并且在樣本訓(xùn)練時完全可以確定,因此將Null節(jié)點(diǎn)簡化為零元素節(jié)點(diǎn),即為處于工作狀態(tài)的節(jié)點(diǎn)視為未分配到任務(wù)階段。然后再用一個較大的Map任務(wù)數(shù)來規(guī)范化矩陣(Max=100),把矩陣中的非零元素規(guī)范到區(qū)間[0,1]內(nèi),應(yīng)用上述轉(zhuǎn)化規(guī)則后任務(wù)矩陣由式(1)變?yōu)槭?2):
(2)
總之,Map任務(wù)矩陣是任務(wù)節(jié)點(diǎn)之間并行特征的表現(xiàn)形式,在此提取節(jié)點(diǎn)并行特征并轉(zhuǎn)化成特征向量,比如式(2)矩陣轉(zhuǎn)化為特征向量組如下:
Ti=(t0,t1,t2,t3,t4)
(3)
向量組的每個向量是式(2)矩陣中的一行,例如:
t0=(0,0.04,0.07,0,0,0)
另外,該特征向量提取方法不失一般性,可以適應(yīng)各種MapReduce生態(tài)系統(tǒng)的調(diào)度算法。由于在相同的作業(yè)調(diào)度算法前提下研究作業(yè)的分類以及通信優(yōu)化,MapReduce作業(yè)的靜態(tài)特征只受作業(yè)數(shù)據(jù)規(guī)模大小影響,因此該提取特征方法通用性較好。此外,MapReduce作業(yè)特征是一種由調(diào)度算法生成的預(yù)先調(diào)度方案,這種預(yù)調(diào)度方案用于作業(yè)的分類判斷中,根據(jù)分類判斷再重新生成能夠提高中間數(shù)據(jù)通信局部性的調(diào)度方案,最后實(shí)施的是優(yōu)化后的調(diào)度方案。
接下來需要對MapReduce作業(yè)分類。為了有效控制中間數(shù)據(jù)的通信局部性,本文將作業(yè)分成通信活躍和不活躍兩大類,因此要解決對中間數(shù)據(jù)通信活躍度的量化問題。在此引入集群通信活躍度,然后根據(jù)通信活躍度進(jìn)行作業(yè)分類預(yù)測。設(shè)一個大數(shù)據(jù)作業(yè)Ji的計(jì)算節(jié)點(diǎn)集合Ri={n1,n2,…,np},然而這個計(jì)算節(jié)點(diǎn)集合Ri也是一個通信集合。若該通信集合中一對跨機(jī)架節(jié)點(diǎn)ni與nj的數(shù)據(jù)通信量為dj:
dj(〈ni,nj〉,ni∈Ri,nj∈Rj,i≠j)
(4)
則作業(yè)Ji的中間數(shù)據(jù)通信量為Di:
(5)
其中:k是作業(yè)Ji通信集合中跨機(jī)架通信的節(jié)點(diǎn)對數(shù)目。因此一個作業(yè)Ji的集群通信活躍度CAi定義如下:
(6)
其中:取一個作業(yè)跨機(jī)架通信量的平均值作為其集群通信活躍度的量化指標(biāo),并且使用單個跨機(jī)架通信的最大值dmax把CA歸一化到區(qū)間[0,1]。接著要確定通信活躍度的閾值參數(shù)。為了減少閾值確定的主觀性,使用樣本數(shù)據(jù)分布分析的方式。首先從Hadoop基準(zhǔn)程序集合和修改現(xiàn)有的大數(shù)據(jù)分析作業(yè)選擇50個樣本作業(yè),然后采用Hadoop的作業(yè)并行分布方法運(yùn)算,再具體測量作業(yè)中間數(shù)據(jù)通信量。具體方法如下:Hadoop中通過事件來確定Map階段開始和結(jié)束,并且反饋到Web監(jiān)控頁面中。在此監(jiān)聽Map結(jié)束事件來統(tǒng)計(jì)Map結(jié)束到整個作業(yè)結(jié)束期間的數(shù)據(jù)通信(暫時忽略Shuffle階段結(jié)束到作業(yè)結(jié)束階段的傳輸數(shù)據(jù),這個階段的網(wǎng)絡(luò)通信量非常小)。再監(jiān)聽每個物理節(jié)點(diǎn)Shuffle handler進(jìn)程的端口,統(tǒng)計(jì)該端口以超文本傳輸協(xié)議(HyperText Transfer Protocol, HTTP)傳輸?shù)闹虚g數(shù)據(jù)量。最后統(tǒng)計(jì)出機(jī)架內(nèi)和跨機(jī)架的通信量。最后通過式(6)計(jì)算出每個作業(yè)的通信活躍度,統(tǒng)計(jì)結(jié)果如圖1所示。
圖1 樣本作業(yè)通信活躍度分布
通過觀察樣本作業(yè)通信活躍度的分布可看出,在0.3~0.4存在一個明顯的分界線。利用樣本通信活躍度置信區(qū)間分析,接受30%的作業(yè)作為通信活躍作業(yè)時閾值可設(shè)定為0.38。
本節(jié)建立MapReduce作業(yè)分類器,通過樣本作業(yè)的分類結(jié)果來訓(xùn)練分類器,然后使用分類器預(yù)測未來出現(xiàn)MapReduce作業(yè)。該分類器的輸入是作業(yè)的預(yù)調(diào)度Map任務(wù)分配矩陣信息,輸出為通信活躍類型的判斷結(jié)果。分類是機(jī)器學(xué)習(xí)中一個有監(jiān)督學(xué)習(xí)的方法。通過分析歷史數(shù)據(jù)的分類情況來構(gòu)造一個分類函數(shù)或者分類模型,再利用該模型把待預(yù)測數(shù)據(jù)映射到某一特定類別中。構(gòu)造分類器模型需要一個樣本數(shù)據(jù)作為訓(xùn)練學(xué)習(xí)的輸入,每個樣本數(shù)據(jù)包含若干屬性,由屬性組成特征向量。另外訓(xùn)練數(shù)據(jù)集的樣本數(shù)據(jù)除特征向量之外還有與之對應(yīng)的類標(biāo)簽。在此大數(shù)據(jù)作業(yè)Ti=((t0,t1,t2,t3,t4),C),其中:(t0,t1,t2,t3,t4)表示作業(yè)的特征向量;C為類標(biāo)簽(C1為通信活躍作業(yè),C0為通信惰性作業(yè))。
樸素貝葉斯分類是當(dāng)前公認(rèn)的一種簡單而有效的概率分類方法,在一般貝葉斯算法的基礎(chǔ)上通過假定各種因素之間不存在關(guān)系,即各個因素完全獨(dú)立而得到的一種簡化貝葉斯分類法。本文工作選用樸素貝葉斯模型主要有兩個原因:1)從訓(xùn)練學(xué)習(xí)角度而言,樸素貝葉斯分類模型非常適合小樣本訓(xùn)練集合,而本文工作樣本數(shù)據(jù)采集難度較大,很難形成較大的訓(xùn)練數(shù)據(jù)集。2)樸素貝葉斯算法簡單而且性能好,而計(jì)算的高效性和高精度能夠滿足本研究MapReduce作業(yè)分類的性能需求。
樸素貝葉斯分類的思想是利用貝葉斯定理來預(yù)測一個未知類別數(shù)據(jù)屬于各個類別的可能性,并且選擇可能性最大的一個類別作為該樣本的預(yù)測結(jié)果。假設(shè)有k個類的集合{C1,C2,…,Ck}和一個未知類別的樣本X={x1,x2,…,xk},則貝葉斯定理為:
(7)
由于P(X)對所有類別都是常數(shù),因此通過式(8)可以計(jì)算出X屬于每一個類的概率,并取最大概率值的類為預(yù)測值。
(8)
為了簡化研究異構(gòu)集群采用固定布局,比如4機(jī)架8節(jié)點(diǎn),每個作業(yè)特征向量則為Ti=(t0,t1,t2,t3),類別C1表示通信活躍作業(yè),C0表示通信惰性作業(yè)。貝葉斯分類器利用未知作業(yè)的預(yù)調(diào)度信息,即由特定調(diào)度算法生成的Map任務(wù)分配矩陣。再將該任務(wù)分配矩陣轉(zhuǎn)化成分類器的輸入向量,由分類器判斷作業(yè)的類別,為下一節(jié)的調(diào)度策略作準(zhǔn)備。
本節(jié)的調(diào)度算法利用作業(yè)分類預(yù)測結(jié)果,對MapReduce作業(yè)的子任務(wù)重新實(shí)施分配部署,減少集群中跨機(jī)架通信傳輸量。先由作業(yè)分類預(yù)測模型來判斷作業(yè)的通信類型,結(jié)果為通信活躍和通信不活躍兩類;再根據(jù)通信作業(yè)類型實(shí)行調(diào)度,對通信活躍的作業(yè)盡量部署到同一機(jī)架,以減少跨機(jī)架通信的性能損失;對于通信惰性大的作業(yè)則維持原來的分配方案。總之,作業(yè)優(yōu)化調(diào)度的內(nèi)涵與反饋控制模型相似,利用作業(yè)的預(yù)調(diào)度信息實(shí)現(xiàn)作業(yè)類型判斷,然后重新調(diào)整最終的調(diào)度方案,這是利用預(yù)調(diào)度信息反饋優(yōu)化最終調(diào)度策略的思想。具體作業(yè)優(yōu)化調(diào)度算法如算法1所示。
在算法1中算法輸出是每個作業(yè)最終的Map部署方案,該部署方案是一個作業(yè)在集群中各節(jié)點(diǎn)上部署的Map任務(wù)數(shù)量。首先取出作業(yè)隊(duì)列的一個作業(yè)J,根據(jù)常規(guī)方法進(jìn)行預(yù)先調(diào)度(行3)),然后提取作業(yè)特征并送入分類預(yù)測函數(shù)處理(行4))。若作業(yè)預(yù)測類型是通信活躍型C1(行5)),則從集群中隨機(jī)選擇一個機(jī)架R,將作業(yè)的所有任務(wù)部署到該機(jī)架的節(jié)點(diǎn)中(行6)~10))。在部署的過程中,先對該中各節(jié)點(diǎn)根據(jù)性能排序(行7)),然后依據(jù)比例對機(jī)架Rj中的m個節(jié)點(diǎn)分配Map任務(wù)(比例公式如行9)所示),最后更新該任務(wù)的分配矩陣(行11))。另一個方面,對于通信惰性大的作業(yè)(作業(yè)類型為C0),則維持原有的調(diào)度方案(行13)~15))。算法1的創(chuàng)新之處在于充分利用Map密集型作業(yè)的通信局部性原理,根據(jù)作業(yè)類型預(yù)測來控制跨機(jī)架通信量。該算法復(fù)雜性為O(n),其中n為集群機(jī)架內(nèi)節(jié)點(diǎn)數(shù)目。由于物理集群擴(kuò)展性限制,機(jī)架內(nèi)節(jié)點(diǎn)數(shù)在一個固定范圍內(nèi),而且數(shù)目較少,因此作業(yè)調(diào)度算法引起的性能損失非常小,與大數(shù)據(jù)作業(yè)的運(yùn)行時間比較而言可忽略。
算法1 Map密集型作業(yè)優(yōu)化部署算法。
輸入 大數(shù)據(jù)作業(yè)隊(duì)列Q(J1,J2,…,Jn)。
輸出 作業(yè)Ji的任務(wù)矩陣。
1) While(Queue(J) is not Null)
2)Ji=Deque(J)
3)Si=Scheduling(Ji)
4)Ji.type=Prediction(Si)
5) IFJi.type=C1Then
6)Rj=Random(R)
7) Sort(Rj)
8) For(i=0 tominRj) Do
9)
10) EndFor
11) update(Ji.S)
12) EndIF
13) IFJi.type=C0Then
14)Ji.S=S
15) EndWhile
本文使用32個節(jié)點(diǎn)的集群,通過三層交換機(jī)建立仿真數(shù)據(jù)中心的仿真實(shí)驗(yàn)環(huán)境,4個機(jī)架,每個機(jī)架交換機(jī)下8個節(jié)點(diǎn)。每個計(jì)算節(jié)點(diǎn)配置兩個Intel Xeon E5620 2.4 GHz的CPU,每個CPU擁有獨(dú)立的一兩級片內(nèi)緩存,共享三級緩存;每個節(jié)點(diǎn)配有16 GB DDR RAM和至少500 GB的SATA硬盤空間。軟件操作系統(tǒng)使用Ubuntu15.0, JDK1.8, Hadoop1.2.1等。
在MapReduce作業(yè)中,Map密集型作業(yè)占比例非常高,因此本文關(guān)注Map密集型作業(yè),實(shí)驗(yàn)選擇的基準(zhǔn)作業(yè)程序都是Map密集型的,分別是Hibench基準(zhǔn)程序集中的Sort、WordCount、TeraSort、Bayesian Classification和K-means Cluster。實(shí)驗(yàn)選擇4個機(jī)架,每個機(jī)架連接4個節(jié)點(diǎn)的集群規(guī)模。
表1中列出每個實(shí)驗(yàn)作業(yè)的數(shù)據(jù)規(guī)模以及運(yùn)行后的數(shù)據(jù)特點(diǎn)。為了方便表述,實(shí)驗(yàn)中本文通信優(yōu)化方案記為IO(Intermediate Optimization),基準(zhǔn)方案為Hadoop中的默認(rèn)計(jì)算方式簡記為MR(MapReduce)。
MapReduce作業(yè)計(jì)算性能主要由計(jì)算時間和中間數(shù)據(jù)傳輸時間來決定的,而本文關(guān)注優(yōu)化中間數(shù)據(jù)傳輸時間,在保證計(jì)算時間不變的情況下作業(yè)的運(yùn)行時間能夠體現(xiàn)數(shù)據(jù)傳輸時間的優(yōu)化效果。首先對比兩種方法的作業(yè)運(yùn)行時間。如圖2所示,基準(zhǔn)作業(yè)st、ts、bc都出現(xiàn)大約有4%~5%性能提升,然而wc和kc兩個作業(yè)并未有明顯的性能改善。
圖2 通信優(yōu)化方案與基準(zhǔn)方法運(yùn)行時間比較
作業(yè)名簡稱數(shù)據(jù)量/GBMap輸入中間數(shù)據(jù)Reduce輸出Sortst120120.000120.0000WordCountwc20011.2304.1000TeraSortts1000140.0001000.0000Bayesian Classificationbc7849.00043.0000K-means Clusterkc660.3300.0046
從表1可知,在5個MapReduce基準(zhǔn)作業(yè)中,st、ts、bc三個作業(yè)會產(chǎn)生大量中間數(shù)據(jù),屬于Shuffle密集型作業(yè),本文的通信優(yōu)化方案針對中間數(shù)據(jù)通信進(jìn)行優(yōu)化,因此對這三個基準(zhǔn)作業(yè)能帶來一定的性能提升;wc和kc兩個作業(yè)不是Shuffle密集型的作業(yè),計(jì)算過程中產(chǎn)生的中間數(shù)據(jù)量非常小,因此性能提升效果并不明顯。由于本文是優(yōu)化作業(yè)網(wǎng)絡(luò)傳輸延遲,因此為了提高實(shí)驗(yàn)的針對性,對Shuffle密集型的作業(yè)TeraSort作進(jìn)一步分析以展示性能優(yōu)化效果。如圖3所示,數(shù)據(jù)集合的規(guī)模分別為32 GB、64 GB、128 GB、512 GB和1 TB,縱坐標(biāo)表示Shuffle階段占整個作業(yè)計(jì)算時間的比例,比例相對減小則說明通信過程得到優(yōu)化。隨著數(shù)據(jù)規(guī)模的增加,所提優(yōu)化方案的Shuffle比例相對減小,說明隨著數(shù)據(jù)規(guī)模的增加通信優(yōu)化對整個計(jì)算性能起了明顯的優(yōu)化作用。數(shù)據(jù)量越大中間傳輸優(yōu)化效果越明顯,體現(xiàn)了本文調(diào)度方案對大數(shù)據(jù)密集計(jì)算的性能提升具有較好的應(yīng)用價值。
圖3 通信優(yōu)化方案與基準(zhǔn)方案TeraSort作業(yè)性能對比
本節(jié)中驗(yàn)證作業(yè)優(yōu)化調(diào)度方案對集群擴(kuò)展的適應(yīng)性,分析集群擴(kuò)展性對調(diào)度算法的影響。實(shí)驗(yàn)中一直使用4個機(jī)架,每個機(jī)架的交換機(jī)分別連接2、4和8個節(jié)點(diǎn),分別構(gòu)造成8、16和32個節(jié)點(diǎn)的實(shí)驗(yàn)床。在每種集群規(guī)模情況下執(zhí)行Shuffle密集型的作業(yè)TeraSort來觀察Shuffle所占比例的變化,如圖3所示。隨著節(jié)點(diǎn)數(shù)量的增加,Shuffle所占比例有了下降趨勢,表明隨著節(jié)點(diǎn)增加通信優(yōu)化的效果才能表現(xiàn)出來。當(dāng)每個機(jī)架只有2個節(jié)點(diǎn)時,Shuffle所占比例最大;然而每個機(jī)架有8個節(jié)點(diǎn)時,作業(yè)處理的數(shù)據(jù)規(guī)模較小時(32 GB),Shuffle所占比例較大,通信優(yōu)化的效果也不明顯,但是隨著數(shù)據(jù)規(guī)模增加通信優(yōu)化效果開始明顯;當(dāng)集群規(guī)模增加,但是計(jì)算作業(yè)數(shù)據(jù)量小時,集群管理的復(fù)雜性提高而中間數(shù)據(jù)通信量小,因此網(wǎng)絡(luò)背景流量掩蓋了網(wǎng)絡(luò)通信的優(yōu)化效果。
圖4 集群擴(kuò)展性對通信優(yōu)化方案影響
在計(jì)算集群或者數(shù)據(jù)中心中,多租戶提交MapReduce作業(yè)是一種常見的應(yīng)用場景,為了驗(yàn)證中間數(shù)據(jù)調(diào)度優(yōu)化算法的應(yīng)用效果,在本節(jié)實(shí)驗(yàn)中模擬集群生產(chǎn)環(huán)境的多租戶多作業(yè)的應(yīng)用場景,并且觀察性能表現(xiàn)。先選用表1中的基準(zhǔn)作業(yè)5類Map密集的作業(yè),然后每個作業(yè)選擇不同的10個數(shù)據(jù)集來模擬50個用戶作業(yè)。采用隨機(jī)算法來模擬多個用戶提交給32節(jié)點(diǎn)集群來計(jì)算。從作業(yè)響應(yīng)時間和作業(yè)計(jì)算時間兩個角度來觀察性能優(yōu)化效果,以進(jìn)入作業(yè)隊(duì)列到開始執(zhí)行的間隔作為響應(yīng)時間,作業(yè)開始執(zhí)行到執(zhí)行完畢為作業(yè)計(jì)算時間。分別統(tǒng)計(jì)5類作業(yè)的平均響應(yīng)時間和作業(yè)計(jì)算時間,如表2~3所示。
表2 作業(yè)平均響應(yīng)時間對比
從表2可看出,在多作業(yè)環(huán)境中本文優(yōu)化方案對降低平均響應(yīng)時間有一定的效果,平均響應(yīng)時間降低了2%~4%;對于中間數(shù)據(jù)通信量小的作業(yè),平均響應(yīng)時間優(yōu)化效果不明顯。但是通過平均響應(yīng)時間的方差數(shù)據(jù)可看出,通信優(yōu)化方案的計(jì)算過程更加穩(wěn)定。
表3 作業(yè)執(zhí)行時間對比
表3顯示本文方案對作業(yè)的平均計(jì)算時間也有一定程度的優(yōu)化效果,而且計(jì)算過程更加穩(wěn)定,對于Shuffle密集的作業(yè)計(jì)算性能優(yōu)化效果明顯。由于本文重點(diǎn)考慮中間數(shù)據(jù)傳輸?shù)男阅軆?yōu)化,對中間數(shù)據(jù)量小的作業(yè)性能效果不顯著。
MapReduce計(jì)算模式的大數(shù)據(jù)處理作業(yè)中以Map密集型作業(yè)為主,Map密集型作業(yè)又能分成Shuffle密集和Shuffle稀疏兩種類型。其中Shuffle密集類型作業(yè)中間計(jì)算數(shù)據(jù)量大,有本文提出的通信優(yōu)化方法對該類型作業(yè)優(yōu)化效果明顯。首先以MapReduce作業(yè)的預(yù)調(diào)度信息為依據(jù),提取作業(yè)運(yùn)行前的靜態(tài)特征;然后采用貝葉斯分類器建立集群機(jī)架通信局部性與作業(yè)特征之間的非線性關(guān)系;再根據(jù)未知作業(yè)特征信息來預(yù)測作業(yè)分類,根據(jù)作業(yè)類別重新優(yōu)化調(diào)度方案并且執(zhí)行優(yōu)化后的調(diào)度方案。實(shí)驗(yàn)結(jié)果表明,本文通信優(yōu)化方案不僅減少了作業(yè)的整體計(jì)算時間,并且減小了Shuffle階段的比例,體現(xiàn)出對中間數(shù)據(jù)網(wǎng)絡(luò)傳輸延遲的優(yōu)化效果;而且該優(yōu)化方案隨著數(shù)據(jù)規(guī)模的增加優(yōu)化效果更好,適合大數(shù)據(jù)密集型的應(yīng)用場景。另外,該方案對集群擴(kuò)展具有良好的適應(yīng)性。在仿真的多租戶多作業(yè)場景中,通信優(yōu)化方案減少了作業(yè)響應(yīng)時間和執(zhí)行時間,因此在工程實(shí)際中有較好的應(yīng)用價值。目前通信優(yōu)化方案對網(wǎng)絡(luò)拓?fù)浣Y(jié)構(gòu)依賴性較大,并且對各種網(wǎng)絡(luò)拓?fù)浣Y(jié)構(gòu)的適應(yīng)性并未得到驗(yàn)證,這也是下一步研究方向。
參考文獻(xiàn)(References)
[1] CHOWDHURY M, ZAHARIA M, MA J, et al. Managing data transfers in computer clusters with orchestra[J]. ACM SIGCOMM Computer Communication Review, 2011, 41(4): 98-109.
[2] YU W, WANG Y, QUE X, et al. Design evaluation of network-levitated merge for Hadoop acceleration[J]. IEEE Transactions on Parallel and Distributed Systems, 2014, 25(3): 602-611.
[3] RAHMAN M W, ISLAM N S, LU X, et al. A comprehensive study of MapReduce over lustre for intermediate data placement and shuffle strategies on HPC clusters[J]. IEEE Transactions on Parallel and Distributed Systems, 2017, 28(3): 633-646.
[4] SHI W, WANG Y, CORRIVEAU J, et al. Smart shuffling in MapReduce: a solution to balance network traffic and workloads[C]// Proceedings of the 2015 IEEE/ACM 8th International Conference on Utility and Cloud Computing. Piscataway, NJ: IEEE, 2015: 35-44.
[5] ARSLAN E, SHEKHAR M, KOSAR T. Locality and network-aware reduce task scheduling for data-intensive applications[C]// DataCloud 2014: Proceedings of the 5th International Workshop on Data-Intensive Computing in the Clouds. Piscataway, NJ: IEEE, 2014: 14-24.
[6] ZHENG H, WAN Z, WU J. Optimizing MapReduce framework through joint scheduling of overlapping phases[C]// Proceedings of the 2016 25th International Conference on Computer Communication and Networks. Piscataway, NJ: IEEE, 2016: 1-9.
[7] KE H, LI P, GUO S, et al. On traffic-aware partition and aggregation in MapReduce for big data applications[J]. IEEE Transactions on Parallel and Distributed Systems, 2016, 27(3): 818-828.
[8] KE H, LI P, GUO S, et al. Aggregation on the fly: reducing traffic for big data in the cloud[J]. IEEE Network, 2015, 29(5): 17-23.
[9] WANG J H, QIU M K, GUO B, et al. Phase-reconfigurable shuffle optimization for Hadoop MapReduce[J]. IEEE Transaction on Cloud Computing, 2017,PP(99): 1.
[10] YU W, WANG Y, QUE X, et al. Virtual shuffling for efficient data movement in MapReduce[J]. IEEE Transactions on Computers, 2015, 64(2): 556-568.
[11] 荀亞玲, 張繼福, 秦嘯.MapReduce集群環(huán)境下的數(shù)據(jù)放置策略[J]. 軟件學(xué)報, 2015, 26(8): 2056-2073.(XUN Y L, ZHANG J F, QIN X. Data placement strategy for MapReduce cluster environment[J]. Journal of Software, 2015, 26(8): 2056-2073.)
[12] LEE S, JO J-Y, KIM Y. Performance improvement of MapReduce process by promoting deep data locality[C]// Proceedings of the 2016 IEEE International Conference on Data Science and Advanced Analytics. Piscataway, NJ: IEEE, 2016: 293-301.
This work is partially supported by the Natural Science Foundation of Shandong Province (ZR2017MF050, ZR2015FL014), the Higher Educational Science and Technology Program of Shandong Province (J17KA049), the Independent Innovation and Achievements Transformation Special Project of Shandong Province (2014ZZCX02702), the Primary Research and Development Project of Shandong Province (2016GGX109001).