季航旭,姜 蘇,趙宇海,吳 剛,王國仁
(1.東北大學(xué)計算機科學(xué)與工程學(xué)院,遼寧 沈陽 110819;2.北京理工大學(xué)計算機學(xué)院,北京 100081)
隨著信息技術(shù)的快速發(fā)展,各個領(lǐng)域積累的數(shù)據(jù)量日漸增多。數(shù)據(jù)量的增加以及數(shù)據(jù)挖掘算法的研究與普及,使得人們越來越重視數(shù)據(jù)中隱含的價值,因此如何快速地從數(shù)據(jù)中獲取有價值的信息成為各個研究領(lǐng)域的關(guān)注點。為了應(yīng)對快速增長的數(shù)據(jù),人們開發(fā)出了一代又一代大數(shù)據(jù)處理系統(tǒng)并產(chǎn)生了大量相關(guān)的優(yōu)化技術(shù)。目前比較流行的大數(shù)據(jù)處理系統(tǒng)有Hadoop[1]、Storm[2]、Samza[3]、Spark[4,5]和Flink[6]等,它們都采用分布式集群的方式進行平臺的搭建和系統(tǒng)的部署,并有著各自獨特的優(yōu)勢。
目前,大數(shù)據(jù)計算系統(tǒng)已經(jīng)普及,基于它們的數(shù)據(jù)查詢和數(shù)據(jù)分析等任務(wù)也日益復(fù)雜化、多樣化,如實時智能推薦、復(fù)雜事件處理等。分布式計算系統(tǒng)經(jīng)常面臨的挑戰(zhàn)是資源分配與作業(yè)調(diào)度,這是分布式環(huán)境與生俱來的問題。由于分布式環(huán)境存在計算資源異構(gòu)、帶寬異構(gòu)和單個作業(yè)內(nèi)部計算方式復(fù)雜等情況,作業(yè)執(zhí)行過程中經(jīng)常出現(xiàn)由于資源分配不合理、調(diào)度優(yōu)化不足導(dǎo)致的效率低、吞吐量低等缺點。更加令人堪憂的是,分布式計算具有計算結(jié)點規(guī)模大、計算任務(wù)復(fù)雜等特點,計算引擎往往要同時運行復(fù)雜繁多的分布式多作業(yè),也就是所謂的分布式多作業(yè)。分布式多作業(yè)相比單作業(yè)在作業(yè)執(zhí)行過程中將更加不利于計算資源的充分利用,這對于分布式大數(shù)據(jù)任務(wù)的執(zhí)行將更加雪上加霜。目前,仍然沒有一個完美的資源分配與管理機制滿足分布式多作業(yè)的需求,因此資源的合理分配與回收仍然是提升大數(shù)據(jù)處理系統(tǒng)計算性能的關(guān)鍵。
現(xiàn)在最常用的大數(shù)據(jù)計算系統(tǒng)(如Flink、Spark)都是以多層執(zhí)行圖(Graph)的方式表示作業(yè)的具體信息與執(zhí)行過程。多層執(zhí)行圖是計算系統(tǒng)在作業(yè)提交與作業(yè)執(zhí)行之間生成的一系列有向無環(huán)圖DAG(Directed Acyclic Graph),也是計算引擎中最核心的數(shù)據(jù)結(jié)構(gòu),它們決定了分布式作業(yè)在每個節(jié)點上的資源部署。也就是說,分布式任務(wù)的執(zhí)行都是根據(jù)執(zhí)行圖中的信息在每個節(jié)點上進行任務(wù)部署。因此,如何在多作業(yè)執(zhí)行過程中使DAG的合并達到最優(yōu),以及如何優(yōu)化作業(yè)的提交順序與調(diào)度策略,將是高效執(zhí)行多作業(yè)的重要保障。
本文通過對主流的大數(shù)據(jù)處理系統(tǒng)的研究和探索,結(jié)合目前流行的大數(shù)據(jù)處理系統(tǒng)優(yōu)化技術(shù),提出并實現(xiàn)了在作業(yè)層面上的多作業(yè)合并算法與調(diào)度策略。本文的主要貢獻點在于:
(1)提出了一種啟發(fā)式作業(yè)合并算法。通過采集到的作業(yè)特征,以作業(yè)并行度為基礎(chǔ)分析DAG結(jié)構(gòu)上的差異性,合并浪費資源的作業(yè),釋放占用資源較少的作業(yè)資源,提高集群資源的利用率。
(2)提出了一種基于負(fù)載均衡的多作業(yè)調(diào)度算法。根據(jù)基于作業(yè)特征的多路K-means聚類算法的分析結(jié)果使用基于負(fù)載均衡的多作業(yè)自平衡輪詢調(diào)度算法提交作業(yè),進一步達到系統(tǒng)負(fù)載均衡。
(3)使用目前最新一代大數(shù)據(jù)計算系統(tǒng)Flink對本文提出的作業(yè)合并算法與多作業(yè)調(diào)度算法的有效性進行了驗證。結(jié)果表明,2種作業(yè)合并算法都可以減少作業(yè)的運行時間,提高系統(tǒng)吞吐量;基于負(fù)載均衡的多作業(yè)調(diào)度算法在最好情況下可減少40%的線程啟動數(shù)。
DAG是分布式計算領(lǐng)域中很常見的一種數(shù)據(jù)結(jié)構(gòu),通常由一系列用戶自定義的算子組成,在各種大數(shù)據(jù)處理系統(tǒng)中都能發(fā)現(xiàn)它的身影,比如Storm、Spark和Flink等。DAG計算將計算任務(wù)分解成為若干個子任務(wù)[7],并將這些子任務(wù)之間的邏輯關(guān)系或順序構(gòu)建成DAG結(jié)構(gòu)。大數(shù)據(jù)計算引擎中的DAG計算通常可以抽象為3層結(jié)構(gòu):應(yīng)用表達層、執(zhí)行引擎層和物理執(zhí)行層。應(yīng)用表達層位于最上層,定義相關(guān)算子和轉(zhuǎn)換,將計算任務(wù)分解成由若干子任務(wù)形成的DAG結(jié)構(gòu),其優(yōu)點是表達的便捷性,便于開發(fā)者快速描述或構(gòu)建大數(shù)據(jù)應(yīng)用。執(zhí)行引擎層介于應(yīng)用表達層和物理執(zhí)行層之間,將應(yīng)用表達層構(gòu)建的DAG計劃任務(wù)通過轉(zhuǎn)換和映射,部署到下層的物理機集群中運行,任務(wù)的調(diào)度[8]、底層的容錯恢復(fù)機制、數(shù)據(jù)與集群信息的傳遞等都要依賴執(zhí)行引擎層。下層是物理執(zhí)行層,即物理集群。
Flink是Apache 開發(fā)的一個同時用于處理批數(shù)據(jù)和流數(shù)據(jù)的有狀態(tài)的計算框架和分布式處理引擎。Flink使用4層DAG結(jié)構(gòu)來描述和表達作業(yè)的執(zhí)行流程,每一層都對作業(yè)執(zhí)行流程做了不同程度的封裝、優(yōu)化和相關(guān)屬性的配置。DAG結(jié)構(gòu)是Flink作業(yè)執(zhí)行和部署的核心,主要包含數(shù)據(jù)流圖(StreamGraph)、作業(yè)圖(JobGraph)、執(zhí)行圖(ExecutionGraph)和物理執(zhí)行圖,F(xiàn)link正是通過這4層圖結(jié)構(gòu)把整個作業(yè)的優(yōu)化、資源分配和算子部署進行分離。Flink的4層DAG結(jié)構(gòu)如圖1所示。
Figure 1 Four-layer DAG structure of Flink
圖1中,數(shù)據(jù)流圖是用戶通過API接口編寫的、用來表達用戶所要執(zhí)行的計劃任務(wù)的邏輯結(jié)構(gòu)。作業(yè)圖是在數(shù)據(jù)流圖的基礎(chǔ)上進行優(yōu)化以及調(diào)整各種參數(shù)配置后的數(shù)據(jù)結(jié)構(gòu),它裹挾著作業(yè)運行期間所需的必要信息。這些信息被客戶端提交到集群中的協(xié)調(diào)中心,即作業(yè)管理器(JobManager)。執(zhí)行圖可以被視作并行化的作業(yè)圖,當(dāng)接收到一個新的作業(yè)圖時,會把其中的每一個算子按照其并行度轉(zhuǎn)化成多個可實際部署的子任務(wù)(在執(zhí)行圖中以Execution表示)。當(dāng)執(zhí)行圖中的一系列子任務(wù)真正在從結(jié)點機器上運行的時候,才會構(gòu)成物理執(zhí)行圖。
目前最流行的大數(shù)據(jù)處理平臺默認(rèn)情況下都以FIFO的形式調(diào)度作業(yè)。Wang等[9]為了解決在虛擬化云環(huán)境中同時運行的多個作業(yè)之間的干擾問題,開發(fā)了數(shù)據(jù)驅(qū)動分析模型,估計多個作業(yè)之間的干擾對作業(yè)執(zhí)行時間的影響,并為此提出了一種干擾感知作業(yè)調(diào)度算法。黃廷輝等[10]通過對分布式系統(tǒng)關(guān)鍵技術(shù)的分析,得出I/O和CPU的不匹配是影響計算性能的一個重要因素,提出合并文件的運行方式,可以減少緩存文件的數(shù)量,提高I/O效率,不過仍存在內(nèi)存成本高的弊端。
Flink系統(tǒng)中資源是按處理槽(Slot)進行劃分的,支持多種已有的成熟的資源管理器,例如Yarn和Mesos等。Storm作為曾經(jīng)最流行的流式大數(shù)據(jù)處理系統(tǒng),默認(rèn)是采用輪詢的調(diào)度方式管理作業(yè)的[11]。Qian等[12]為了解決Storm集群中擴展更多新計算機時帶來的負(fù)載不均衡問題,設(shè)計了S-Storm,為負(fù)載均衡群集中均勻分配Slot??傊壳暗姆植际阶鳂I(yè)調(diào)度算法和資源分配算法都是基于作業(yè)對資源的需求或者集群中結(jié)點資源的使用情況,進行作業(yè)的調(diào)度和資源的分配的,它們面向的是單個作業(yè),并沒有考慮作業(yè)間的關(guān)系對集群性能的影響。
一個復(fù)雜的DAG通常由多種類型的算子組成,有些算子只涉及本地運算,它們以內(nèi)存共享的方式傳輸數(shù)據(jù),運行效率高,給系統(tǒng)增加的負(fù)載小。也有些算子會通過網(wǎng)絡(luò)協(xié)議棧傳輸數(shù)據(jù),除了網(wǎng)絡(luò)本身的不可靠性會增加延遲,還有網(wǎng)絡(luò)緩沖數(shù)據(jù)、序列化、反序列化和用戶態(tài)/內(nèi)核態(tài)之間的切換等耗時操作持續(xù)地占用系統(tǒng)資源。為了便于描述,本文定義了全局算子和本地算子這2個概念。
定義1(全局算子) 全局算子指在分布式集群中,需要從其他結(jié)點獲取數(shù)據(jù)進行處理的算子,如Join和Reduce等。
定義2(本地算子) 本地算子指在分布式集群環(huán)境中,不需要從其他結(jié)點獲取數(shù)據(jù),只對本地數(shù)據(jù)進行處理的算子,如Filter、Map和FlatMap等。
本文在研究作業(yè)合并和作業(yè)調(diào)度時需要提取DAG的相關(guān)特征量,計算作業(yè)之間的差異性并通過聚類算法對作業(yè)進行區(qū)分。聚類算法是一種經(jīng)典的群分析方法[13],它以數(shù)據(jù)間距度量數(shù)據(jù)相似性[14],把相似的數(shù)據(jù)集中到一起,是一種發(fā)現(xiàn)數(shù)據(jù)集內(nèi)部結(jié)構(gòu)特征的無監(jiān)督學(xué)習(xí)算法[15]。聚類算法按聚類思想可以分為:劃分法聚類、密度法聚類[16]、圖論聚類法[17]和網(wǎng)格法聚類等。
本文采用的K-means算法是一種典型的劃分聚類法,其思想是預(yù)先指定聚類數(shù)目和聚類中心,計算點與點之間的距離,把每一個點歸類到與其距離最近的聚類中心。距離的度量方式很多,本文使用歐氏距離(式(1))和曼哈頓距離(式(2))相結(jié)合的方式度量作業(yè)之間的距離,其中n為樣本點維度。
(1)
(2)
歐氏距離從幾何空間的角度衡量元素間的距離,常用于測量度量標(biāo)準(zhǔn)一樣的數(shù)據(jù)間的距離;曼哈頓距離用來計算數(shù)據(jù)在多維屬性上的差之和,可以減弱離群數(shù)據(jù)帶來的影響。
本節(jié)詳細(xì)介紹基于啟發(fā)的作業(yè)合并算法。首先對作業(yè)進行分析,解析作業(yè)的DAG圖,以及作業(yè)任務(wù)量與作業(yè)分配到的內(nèi)存資源之間的關(guān)系;然后分別采用基于并行度的作業(yè)合并算法和基于DAG結(jié)構(gòu)差異性的作業(yè)合并算法,對占用系統(tǒng)內(nèi)存資源較多的作業(yè)進行合并,從而提高系統(tǒng)的吞吐量。
本文采用廣度優(yōu)先遍歷的方式提取作業(yè)執(zhí)行圖中相關(guān)的信息,一個典型的作業(yè)執(zhí)行圖如圖2所示,主要包含以下信息:數(shù)據(jù)源文件路徑、作業(yè)并行度和算子總數(shù)等。
Figure 2 Job execution graph
處理的數(shù)據(jù)量和作業(yè)分配到的內(nèi)存資源需要通過計算獲得。算法根據(jù)文件路徑信息訪問文件大小,從系統(tǒng)配置文件中讀取為Slot分配的內(nèi)存大小。作業(yè)的分類貫穿于信息采集過程,算法根據(jù)數(shù)據(jù)來源、文件大小、作業(yè)分配到的內(nèi)存資源大小和作業(yè)的執(zhí)行邏輯將作業(yè)分為可合并型作業(yè)與不可合并型作業(yè)。在作業(yè)執(zhí)行流的遍歷過程中,算法以矩陣結(jié)構(gòu)存儲頂點間的連接信息,元素值的大小表示算子間的連接數(shù)。表1是對圖2的信息提取。
Table 1 Statistics of the number of connections between operators
并行度決定了作業(yè)在執(zhí)行時所占集群內(nèi)存資源的總量,且和集群中的Slot是對應(yīng)的,意味著并行度相同的作業(yè)將分配到相同大小的內(nèi)存資源。因此,對于沒有充分占用內(nèi)存資源的作業(yè),合并并行度相同的作業(yè),可使2個作業(yè)共用1個作業(yè)的內(nèi)存資源,同時不會對作業(yè)執(zhí)行邏輯造成影響。
影響作業(yè)執(zhí)行的因素有很多,定義3~定義5的3個度量:任務(wù)量大小比值(F)、DAG最大深度比值(D)和DAG全局算子數(shù)比值(G),決定作業(yè)的特征。
定義3(任務(wù)量大小比值(F)) 任務(wù)量大小比值是表示2個作業(yè)處理任務(wù)量大小差異性的重要指標(biāo)之一,其計算如式(3)所示:
(3)
其中,x和y分別表示2個作業(yè)所處理的數(shù)據(jù)集數(shù)量,wf_mi、wf_mj分別表示2個不同作業(yè)處理的文件集合中單個文件的大小。通過實驗得知,F(xiàn)的閾值取值為[0.5,2]。
定義4(DAG最大深度比值(D)) 表示2個作業(yè)的執(zhí)行圖中最長算子鏈長度的比值,它是反映2個作業(yè)DAG差異性最明顯的指標(biāo),其計算如式(4)所示:
(4)
其中,dept_m和dept_n分別表示2個作業(yè)執(zhí)行圖的最大深度。DAG深度越大的作業(yè)執(zhí)行時間越長,因此合并后的作業(yè)在數(shù)據(jù)量相當(dāng)?shù)那闆r下,其執(zhí)行時間取決于合并前DAG深度較大的作業(yè)。D的閾值取值為[0.5,2]。
定義5(DAG全局算子數(shù)比值(G)) 表示2個作業(yè)圖在全局算子數(shù)量上的差異。全局算子和數(shù)據(jù)傳輸緊密相關(guān),是影響作業(yè)執(zhí)行速度的重要指標(biāo)之一,體現(xiàn)2個作業(yè)在傳輸上的差異。其計算如式(5)所示:
(5)
其中,G表示2個并行度相同的作業(yè)的全局算子數(shù)的比值,gol_m和gol_n分別表示2個作業(yè)中全局算子的個數(shù)。DAG中全局算子的個數(shù)越多,執(zhí)行時間越長。通過實驗得知,G的閾值取值為[0.5,2]。
基于并行度的作業(yè)合并算法執(zhí)行過程如算法1所示。
算法1基于并行度的作業(yè)合并算法
輸入:待合并作業(yè)j;不包含j的待合并作業(yè)集合Jobs。
輸出:合并后的作業(yè)mergeJob。
1.forjobinJobsdo
2.ifjob.parallelism==j.parallelism
3.計算j與job任務(wù)量比值F;
4.ifF∈[0.5,2]do
5. 計算j與job的DAG圖最大深度比值D;
6.ifD∈[0.5,2]do
7. 計算j與job的全局算子的比值G;
8.endif
9.ifG∈[0.5,2]do
10.mergeJob=merge(j,job);
11.removejobfromJobs,returnmergeJob;
12.endif
13.endif
14.endif
15.endfor
(1)首先從待合并作業(yè)緩沖池的作業(yè)集中取出一個作業(yè)j,然后遍歷Jobs,從中取出一個與j并行度相同的作業(yè)job。
(2)使用3個度量值衡量作業(yè)job與j的匹配程度,如果job與j在上述3個比值上都能落到對應(yīng)的閾值空間,兩者匹配,調(diào)用merge函數(shù)合并job與j,返回合并后的結(jié)果,終止循環(huán);否則繼續(xù)循環(huán)。
(3)循環(huán)結(jié)束后,檢查mergeJob的值是否為空,如果mergeJob的值為空,說明Jobs中沒有與j并行度相同并且符合3個條件的job,那么j會轉(zhuǎn)而參與基于DAG圖結(jié)構(gòu)差異性的作業(yè)合并計算。
對于作業(yè)緩沖池中剩余的由于F、D、G取值落在閾值空間以外而無法合并的作業(yè),采用基于DAG結(jié)構(gòu)差異性的作業(yè)合并算法處理。
算法以DAG結(jié)構(gòu)差異性為切入點,Slot只隔離內(nèi)存資源,因此為了避免作業(yè)對CPU資源的爭搶,盡量選擇異構(gòu)程度高的作業(yè)進行合并。算法增加2個度量為基于DAG結(jié)構(gòu)差異性的作業(yè)合并算法提供支持。
定義6(作業(yè)并行度比值(P)) 作業(yè)并行度是作業(yè)最明顯的特征之一,并行度比值是衡量2個作業(yè)在并行度上的差異最明顯的指標(biāo)。其計算如式(6)所示:
(6)
其中,P表示2個作業(yè)并行度的比值,parallelism_m和parallelism_n表示2個作業(yè)的并行度。并行度是對應(yīng)于集群中的Slot數(shù)量,因此基于DAG的作業(yè)合并算法在合并作業(yè)時首先需要考慮的就是作業(yè)并行度。通過實驗得知,P的閾值取值為[0.5,2]。
定義7(DAG結(jié)構(gòu)相似性(S)) DAG結(jié)構(gòu)相似性反映2個作業(yè)在執(zhí)行邏輯上的差異,以歐氏距離為基礎(chǔ)定義了DAG結(jié)構(gòu)相似性,其計算如式(7)所示:
(7)
其中,o表示算子的數(shù)量。
在特征提取過程中使用矩陣保存作業(yè)執(zhí)行流程圖的基本信息,M和N分別表示存儲作業(yè)執(zhí)行流程圖基本信息的矩陣,Mij和Nij分別表示矩陣中的元素。算法執(zhí)行過程如算法2所示。
算法2基于DAG結(jié)構(gòu)差異性的作業(yè)合并算法
輸入:待合并作業(yè)j;不包含j的待合并作業(yè)集合Jobs。
輸出:合并后的作業(yè)mergeJob。
1.按并行度大小給Jobs中的作業(yè)從小到大排序
2.中間作業(yè)集合為midJobs;
3.forjobinJobsdo
4. 計算j與job任務(wù)量比值F;
5.ifF∈[0.5,2]do
6. 計算j與job的DAG圖最大深度比值D
7.ifD∈[0.5,2]do
8. 計算j與job的全局算子的比值G
9.ifG∈[0.5,2]do
10. 計算j與job并行度比值P
11.ifP∈[0.5,2]do
12. addjobtomidJobs
13.endif
14.endif
15.endif
16.endif
17.endfor
18.forjobinmidJobs
19.計算j與jobDAG圖矩陣間的歐氏距離U;
20.更新U獲取最小值,并記錄相應(yīng)的job;
21.endfor
22.mergeJob=merge(j,job)
23.returnmergeJob
(1)從待合并作業(yè)中取出一個作業(yè)j,然后遍歷Jobs,獲取一個與j并行度相同的作業(yè)job;
(2)在循環(huán)中使用4個度量值衡量作業(yè)job與作業(yè)j的匹配程度,如果符合對應(yīng)的閾值空間,則把作業(yè)job加入到中間作業(yè)集midJobs中;
(3)遍歷中間作業(yè)集合midJobs,使用歐氏距離從中間數(shù)據(jù)集合中選出與作業(yè)j在歐氏距離上相似度最小的作業(yè)job,合并作業(yè)j與job并返回結(jié)果。
除了作業(yè)合并之外,作業(yè)的執(zhí)行順序與調(diào)度策略也是影響多作業(yè)執(zhí)行效率的重要因素。因此,本文提出基于負(fù)載均衡的多作業(yè)調(diào)度算法,其由3部分組成:
(1)預(yù)處理模塊:進行相關(guān)特征的提取工作,包括作業(yè)并行度、算子個數(shù)和算子類型等;(2)分類模塊:采用K-means聚類算法根據(jù)提取的特征信息對作業(yè)進行聚類分析,聚類算法在負(fù)載均衡方面應(yīng)用廣泛[18,19],經(jīng)過聚類把作業(yè)分成3個類別:大作業(yè)、中等作業(yè)和小作業(yè);(3)調(diào)度模塊:調(diào)度模塊根據(jù)聚類結(jié)果,使用自平衡輪詢調(diào)度算法計算作業(yè)的提交順序,同時充分利用集群的Slot資源,防止Slot閑置。
基于負(fù)載均衡的多作業(yè)調(diào)度算法主要使用作業(yè)并行度、算子總數(shù)、各類型算子個數(shù)和作業(yè)圖深度為參考,通過遍歷對信息進行采集。該算法執(zhí)行過程如算法3所示。
算法3DAG特征提取算法
輸入:作業(yè)DAG結(jié)構(gòu)圖Plan。
輸出:提取到的信息集合infoList。
1.fornodeinPlando
2.max=Math.max(max,BFS(node));
3.ifnodeis not visited
4. add node’s characters toinfoList,node.visited=true;
5.node相連接的未被訪問的頂點入隊列Q;
6.whileQis not emptydo
7.v=Q頭元素出隊列;
8.addv’s characters toinfoList,v.visited= true;
9.v相連接的未被訪問的頂點入隊列Q;
10.endwhile
11.endif
12.infoList.max=max
13.endfor
14.returninfoList
(1)使用深度優(yōu)先搜索DFS(Depth First Search)計算從Sink算子到距離最遠(yuǎn)的Source算子的距離,并記錄在max中;如果node頂點未被訪問過,將頂點信息存入infoList中。
(2)將與node頂點相連的頂點加入隊列Q,如果Q不為空,從Q中取出一個頂點v,將v的信息記錄到infoList中,與v相連的未訪問過的頂點加入隊列。
(3)更新infoList中的DAG深度,for循環(huán)直到遍歷完P(guān)lan中的所有頂點,返回infoList。
聚類分析模塊將根據(jù)特征信息對作業(yè)進行分類,使用4種數(shù)據(jù)度量作業(yè)之間的相似性,分別是作業(yè)并行度、各類算子個數(shù)、作業(yè)執(zhí)行流程圖深度和全局算子的個數(shù)。算法采用歐氏距離與曼哈頓距離相結(jié)合的方式測量作業(yè)間的距離。ope[i]是以數(shù)組的形式存儲,dept、全局算子ops的大小是衡量作業(yè)流程復(fù)雜性的度量標(biāo)準(zhǔn)。
定義8(作業(yè)在不同算子類型上的差異性) 算子及算子類型最能區(qū)分作業(yè)的不同,算子類型的差異性反映了作業(yè)的總體差異性,其計算如式(8)所示:
(8)
其中,mope[i]與nope[i]分別為作業(yè)m與作業(yè)n的不同類型的算子的個數(shù)。
定義9(作業(yè)在DAG結(jié)構(gòu)深度上的差異性) DAG結(jié)構(gòu)深度是作業(yè)最明顯的特征之一,它描述了作業(yè)運行時數(shù)據(jù)流通的最大路徑,其計算如式(9)所示:
distancedept(m,n)=|mdept-ndept|
(9)
其中,mdept與ndept分別為作業(yè)m與作業(yè)n的DAG結(jié)構(gòu)深度。
定義10(作業(yè)在Task線程數(shù)上的差異性) 作業(yè)在集群中開啟的線程數(shù)直接反映作業(yè)對系統(tǒng)CPU資源的占用量,作業(yè)在Task線程數(shù)上的差異性計算如式(10)所示:
distancetasknum(m,n)=|mpara*mops-npara*nops|
(10)
其中,mpara與npara分別為作業(yè)m與作業(yè)n的并行度,mops與nops分別為作業(yè)m與作業(yè)n的全局算子數(shù)量。
定義11(作業(yè)的差異性) 本文從3個角度衡量了作業(yè)之間的差異性,其計算如式(11)所示:
distance(m,n)=distanceope(m,n)+distancedept(m,n)+distancetasknum(m,n)
(11)
本文提出的基于作業(yè)特征的多路K-means聚類分析算法如算法4所示。
算法4基于作業(yè)特征的多路K-means聚類分析算法
輸入:作業(yè)及其特征集合PlanList。
輸出:聚類結(jié)果ClusterResult。
1. 根據(jù)并行度乘以算子總數(shù)的大小對PlanList進行排序;
2. 獲取初始聚類中心點集合;
3.fori=1 to 3do
4.center_i=K_means(PlanList,center_i);
5.endfor
6.fori=1 to 3do
7. 計算每個聚類中心點將PlanList劃分的程度;
8.endfor
9.center=K_means(PlanList,center);
10.根據(jù)center將PlanList分組并放入ClusterResult中
11.returnClusterResult
(1)對作業(yè)及其特征集合PlanList按并行度乘以算子總數(shù)大小進行排序。
(2)從排好序的PlanList中選擇3個作業(yè)作為聚類中心;以排好序的PlanList的隊列頭作業(yè)、隊列尾作業(yè)和中間作業(yè)作為聚類中心;從排好序的PlanLsit中分別取隊列頭3個作業(yè)、隊列中間3個作業(yè)、隊列尾部3個作業(yè),取其平均值作為聚類中心。
(3)調(diào)用K_means算法循環(huán)更新每個聚類中心的值;計算每個聚類中心將PlanList劃分的程度,劃分程度度量標(biāo)準(zhǔn)為,聚類結(jié)果每類作業(yè)的數(shù)量越平均越好。選取聚類結(jié)果好的2個聚類中心取其平均值,調(diào)用K_means算法進行最后聚類;計算聚類結(jié)果,并輸出結(jié)果。
通過多路聚類的方式優(yōu)化了聚類中心點的選取,通過基于作業(yè)特征的多路K-means聚類分析可以把作業(yè)集合根據(jù)聚類中心點聚集成3個作業(yè)類別,為算法提供可靠的支持。
本文以輪詢調(diào)度法[20 - 23]為基礎(chǔ)實現(xiàn)了多作業(yè)的提交優(yōu)化,目的是在不浪費集群Slot資源的情況下,使集群開啟的Task線程數(shù)量保持平穩(wěn),以此達到在多作業(yè)情況下平衡集群性能的目的。集群中作業(yè)工作的線程數(shù)量是由作業(yè)并行度和算子個數(shù)決定的,因此控制作業(yè)的提交順序,可以達到控制集群開啟的Task線程數(shù)量的目的。作業(yè)能否提交成功取決于集群剩余并行度是否滿足作業(yè)的并行度需求,如果作業(yè)的并行度比集群中可用并行度大,作業(yè)就會被拒絕,因此輪詢的作業(yè)提交方式并不會嚴(yán)格執(zhí)行,而且集群空閑的Slot資源會隨著作業(yè)的提交和結(jié)束動態(tài)地變化。針對這種情況本文設(shè)計了自平衡的輪詢調(diào)度算法,如算法5所示。
算法5基于負(fù)載均衡的多作業(yè)自平衡輪詢調(diào)度算法
輸入:聚類結(jié)果ClusterResult;最后的聚類中心center。
輸出:下一個提交的作業(yè)Job。
1. 對K-means聚類結(jié)果收集排序;
2. 平分排好序的作業(yè)到3個隊列中,并設(shè)置指針p;
3.翻轉(zhuǎn)隊列midQueue、minQueue,查詢集群剩余Slot;
4.ifslotNum> 0do
5.ifjobNum> 0do
6.pre=p;queue=Queue[p];
7.whilequeueis not emptydo
8.max= 0;
9.foreleminqueuedo
10.ifelem.parallelism≤slotNumdo
11.ifmax 12.job=elem;max=elem.parallelism; 13.endif 14.endif 15.endfor 16.endwhile 17.ifmax!= 0do 18.p=(p++)%3; 19.endif 20.ifmax== 0do 21.p=(p++)%3; 22.ifp==predo返回 4; 23.endif 24. 返回 7; 25.endif 26.endif 27.endif (1)對K-means聚類產(chǎn)生的3個集合中的元素按元素距離聚類中心點的距離大小進行排序;比較3個聚類中心點的大小,按聚類中心點的大小,從大到小合并3個排好序的作業(yè)集合。 (2)將合并后的集合平均分成3份,并放入3個隊列中,將midQueue和minQueue隊列進行逆轉(zhuǎn)。 (3)每隔5 s查詢一次集群剩余Slot資源,從指針指向的隊列開始,遍歷隊列中的元素找到集群中空閑Slot資源能滿足的最大并行度的作業(yè)提交。每次提交作業(yè)后,修改指針指向下一隊列。 (4)對3個集合進行判斷,如果出現(xiàn)隊列為空,并且總作業(yè)的數(shù)量大于2,按順序收集3個集合中的隊列,再平分所有的作業(yè)到3個集合中,并更改指針使其指向midQueue,否則不再進行作業(yè)收集。 本文使用2種類型的作業(yè)來進行對比實驗,一種是單詞統(tǒng)計(WordCount),另一種是表連接(Join)。因為全局算子中最復(fù)雜的算子就是Join類型算子,其他簡單類型的算子使用最多的是Filter、Map和FlatMap,因此WordCount作業(yè)和Join作業(yè)足以覆蓋實際應(yīng)用中的大部分場景。 本文實驗采用大數(shù)據(jù)測試基準(zhǔn)TPC-H生成的數(shù)據(jù)集,是事務(wù)性能管理委員會TPC(Transaction Processing Performance Council)發(fā)布的權(quán)威數(shù)據(jù)庫評測基準(zhǔn),可以保證生成的模擬數(shù)據(jù)具有真實性、客觀性與健壯性。在WordCount實驗中本文選用5個基本的大數(shù)據(jù)集來模擬批處理環(huán)境中的大規(guī)模數(shù)據(jù)處理。在表連接實驗中,本文選取TPC-H生成的Lineitem表和Orders表作為數(shù)據(jù)源,其中Lineitem有16個字段,前3個字段Orderkey、Partkey和Suppkey是主鍵。Orders表有9個字段,前2個字段Orderkey和Custkey是主鍵。 實驗的評估指標(biāo)有3個: (1)作業(yè)運行時間:在相同硬件條件下,任務(wù)量相同、處理邏輯相同的作業(yè)處理速度越快,表明系統(tǒng)性能越好。 (2)作業(yè)吞吐量:單位時間內(nèi)集群處理的平均數(shù)據(jù)量大小,即被處理的總數(shù)量(totalDataVolume)與運行總時間(totalProcessTime)的比值,其定義如式(12)所示: (12) (3)集群開啟的最高Task線程數(shù):本文提出的基于負(fù)載均衡的多作業(yè)調(diào)度算法以降低集群同一時刻開啟的最高Task線程數(shù)為首要目標(biāo)。 本文所描述的相關(guān)技術(shù)細(xì)節(jié)均在Flink 1.8.0版本中進行實現(xiàn),實驗運行的軟硬件環(huán)境如下所示: (1)硬件環(huán)境:采用的分布式環(huán)境由4臺服務(wù)器組成,1臺主結(jié)點,3臺從結(jié)點,結(jié)點之間通過千兆以太網(wǎng)連接。配置為:CPU:Intel Xeon E5-2603 V4 *2,核心數(shù)目:6核心;內(nèi)存:128 GB(從結(jié)點64 GB);硬盤:400 GB SSD。 (2)軟件環(huán)境:操作系統(tǒng):CentOs 7;Flink版本:1.8.0,JDK版本:1.8.0;存儲環(huán)境:Hadoop 2.7.5。 (1)基于并行度的作業(yè)合并算法實驗。 作業(yè)合并算法實驗對一對相同的WordCount作業(yè)和一對不同的Join作業(yè)分別進行順序執(zhí)行和合并執(zhí)行。表2展示了作業(yè)的基本信息。 Table 2 Job sets information for experiment 1 圖3對比了2個WordCount作業(yè)在相同實驗環(huán)境、相同數(shù)據(jù)集上順序執(zhí)行和合并執(zhí)行的執(zhí)行結(jié)果。其中圖3a對比了執(zhí)行時間,合并執(zhí)行的執(zhí)行時間減少了5%~23%。在內(nèi)存資源足夠使用的前提下,單個WordCount程序?qū)篊PU的利用沒有達到時刻滿負(fù)荷運行的狀態(tài),所以作業(yè)合并不僅能提高集群的內(nèi)存資源利用,也能提升集群CPU資源的利用。圖3b對比了吞吐量,采用了作業(yè)合并算法后系統(tǒng)可以更快地到達吞吐量峰值。 Figure 3 Results of WordCount job merging based on the number of parallelism 圖4對比了Join1和Join2作業(yè)在相同實驗環(huán)境、相同數(shù)據(jù)集上順序執(zhí)行和合并執(zhí)行的執(zhí)行結(jié)果。其中圖4a對比了運行時間,圖4b對比了系統(tǒng)吞吐量。盡管效果不如WordCount作業(yè)明顯,但基于并行度的作業(yè)合并算法對運行時間仍有一定縮減,吞吐量提升效果在4.5%~20%。 Figure 4 Results of Join job merging based on the number of parallelism (2)基于DAG結(jié)構(gòu)差異的作業(yè)合并算法實驗。 實驗先后提交了2個并行度不同的WordCount作業(yè)和Join作業(yè),來模擬基于DAG結(jié)構(gòu)差異性的作業(yè)合并。 圖5和圖6從運行時間和吞吐量2個方面展示了作業(yè)合并算法的提升效果。合并執(zhí)行的執(zhí)行時間明顯低于順序執(zhí)行的總時間,并且差距明顯,因為本實驗不是在滿并行度的條件下進行的,實際執(zhí)行時可能會出現(xiàn)不同情況,對于WordCount作業(yè),基于DAG結(jié)構(gòu)差異性的作業(yè)合并算法具有明顯的優(yōu)勢。 Figure 5 Results of WordCount job merging based on DAG structure difference Figure 6 Results of Join job merging based on DAG structure difference (3)基于負(fù)載均衡的多作業(yè)調(diào)度算法實驗 對于多作業(yè)調(diào)度算法,實驗以4個作業(yè)為基礎(chǔ),表3列出了作業(yè)算子的基本信息,這些作業(yè)特征信息是衡量作業(yè)之間差異性的關(guān)鍵。實驗?zāi)M了7個任務(wù)量大小不同的作業(yè),采用隨機的方式模擬了作業(yè)的提交順序,將其執(zhí)行結(jié)果與多作業(yè)調(diào)度算法的結(jié)果進行比較。表4展示了作業(yè)對應(yīng)的編號以及其處理任務(wù)量信息,表5展示了優(yōu)化前后作業(yè)的提交順序。 Table 3 Job sets information for experiment 3 Table 4 Job number and corresponding processing task volume Table 5 Order of job submission 圖7展示了基于負(fù)載均衡的多作業(yè)調(diào)度算法的提升效果。從圖7a可以看出,通過調(diào)優(yōu)作業(yè)的提交順序可縮短作業(yè)處理的時間,但存在某些按FIFO提交模式的順序比優(yōu)化后的輪詢提交順序要好,該情況的出現(xiàn)是因為算法在執(zhí)行過程中并未考慮到任務(wù)量的大小。從圖7b可以看出,基于負(fù)載均衡的多作業(yè)調(diào)度算法在吞吐量性能上提升了5%左右。圖7c 展示了集群開啟的Task線程數(shù)對比,基于負(fù)載均衡的多作業(yè)調(diào)度算法執(zhí)行作業(yè)集時,集群開啟的最大線程數(shù)在多數(shù)情況下有所減少,最好情況下減少了40%的線程。 Figure 7 Running results of multi-job scheduling algorithm based on load balancing 本文通過分析作業(yè)與系統(tǒng)資源之間的關(guān)系,以及作業(yè)與作業(yè)之間的關(guān)系,提出并實現(xiàn)了提高集群資源利用率和負(fù)載均衡能力的算法,本文提出的優(yōu)化主要包含2個方面: (1)提出了啟發(fā)式的作業(yè)合并算法,通過分析作業(yè)任務(wù)量和作業(yè)分配到的集群資源之間的關(guān)系,合并對集群資源利用率低的作業(yè),使它們共用同一個作業(yè)的資源。該算法解決了集群部分作業(yè)資源利用率低的問題,并通過實驗驗證了該算法在不同類型作業(yè)上對集群性能提升的有效性。 (2)提出了基于負(fù)載均衡的多作業(yè)調(diào)度算法,首先對作業(yè)進行特征提??;然后通過多路K-means聚類算法將作業(yè)分為3類:大作業(yè)、中等作業(yè)和小作業(yè);之后采用自平衡輪詢調(diào)度算法提交分類好的作業(yè),保證大作業(yè)不會在集群中集中執(zhí)行,降低了集群由于開啟過多線程造成集群性能下降的概率,并通過實驗驗證了該算法的有效性。 分布式系統(tǒng)在多作業(yè)執(zhí)行層面還有許多需要優(yōu)化和提高的部分,未來可繼續(xù)研究的問題有: (1)動態(tài)調(diào)度。目前的分布式大數(shù)據(jù)處理系統(tǒng)未能做到在作業(yè)執(zhí)行過程中動態(tài)地調(diào)整作業(yè)的執(zhí)行流程,這種方式不利于資源的動態(tài)回收與共享。針對這一問題,系統(tǒng)需要做出相應(yīng)的優(yōu)化和改進。 (2)優(yōu)化多作業(yè)并行度。并行度是作業(yè)執(zhí)行的關(guān)鍵,目前在分布式大數(shù)據(jù)處理平臺的應(yīng)用中,一般都是從業(yè)人員根據(jù)數(shù)據(jù)與業(yè)務(wù)特性手動優(yōu)化并行度,這樣就給并行度的優(yōu)化帶來了很多困難。因此,研究和設(shè)計出一套并行度設(shè)置的優(yōu)化方案,也是分布式大數(shù)據(jù)系統(tǒng)應(yīng)用方面的一個研究課題。6 實驗
6.1 數(shù)據(jù)集與評估指標(biāo)
6.2 實驗環(huán)境設(shè)置
6.3 實驗結(jié)果與分析
7 結(jié)束語