廖旺堅,黃永峰,包從開
(1.清華大學電子工程系,北京 100084;2.清華大學信息科學與技術國家實驗室(籌),北京 100084)
集群并行計算是指在大量性能較低、可靠性較差的通用計算機上并行完成巨大的計算任務,是大數據時代滿足低成本/高計算能力需求的最佳選擇。Hadoop是一個開源的集群并行計算框架,包含MapReduce計算模型和Hadoop分布式文件系統(tǒng)HDFS(Hadoop Distributed File System)存儲模型,具有硬件要求低、吞吐量大、容錯性好的特點,滿足了大數據計算的需要,成為當前工業(yè)界最流行的計算框架,也是學術界研究熱點。但是,隨著數據量迅猛增長,計算復雜度不斷增加,計算實時性越來越高,Hadoop計算速度較慢的弱點逐漸顯現。于是,Zaharia等[1 - 3]基于Hadoop,在前人不斷改進的基礎上重新實現其調度模型,形成了Spark計算框架。
Spark是一個快速、通用的大規(guī)模數據處理引擎[4],具有快速、易用、通用、兼容性好的特點。它使用有向無環(huán)圖DAG(Directed Acyclic Graph)進行作業(yè)調度,根據依賴關系將不同作業(yè)分解成大量子任務組成的DAG,減少了不同作業(yè)之間的數據傳遞過程,并且將前后作業(yè)間的中間數據使用內存緩存?zhèn)鬟f,使用線程池運行計算任務,使得計算速度相比傳統(tǒng)的Hadoop MapReduce有100倍量級的提升[3]。
目前對Spark性能的研究主要有三類:第一類是在Spark模型層面對作業(yè)調度、底層資源管理等方面做改進和優(yōu)化;第二類是在提交Spark作業(yè)時改變不同的參數,找到最優(yōu)值;第三類是在應用層面,編程時使用不同的實現方法,提高作業(yè)運行性能。Ousterhout等[5]對Spark性能做了詳盡測試和分析,認為CPU可以優(yōu)化的地方較少,而網絡使用遠未達到飽和狀態(tài),其優(yōu)化對性能的影響很小,但該文獻關于內存對性能的影響很少提及。陳僑安等[6]對大量舊任務提交參數和性能進行分析,對比任務相似度,預測出新任務最適合的參數,屬于第二類的方法。楊志偉等[7]改變任務調度策略,優(yōu)先將任務分發(fā)給資源空余度高的結點,提高集群整體運行性能,屬于第一類的方法。Gog等[8]采用內存分區(qū)管理的思路,給應用中處于生命周期的數據對象劃定內存區(qū)塊,運行時不對此區(qū)塊進行GC(Garbage Collector)操作,但這樣需要給每個對象指定明確的生命周期,使得編程更為復雜。Gidra等[9]針對分布式系統(tǒng)的GC進行改造,使各結點GC進程互相發(fā)布消息,避免無效的GC操作,提高運行效率,但增加了結點間通信消耗。
Spark的特點是大量使用內存提高計算速度,因此內存資源是Spark最基本的瓶頸,通過內存優(yōu)化,可以提升Spark的性能。本文針對第二類和第三類方法存在的不足,圍繞內存在Spark作業(yè)中的分配方法和作用原理,提出不同的內存優(yōu)化策略,并設計了相應的實驗進行論證。
Spark的作業(yè)執(zhí)行流程如圖1所示,用戶向Driver結點提交需要運行的作業(yè)后,Driver結點創(chuàng)建SparkContext。SparkContext根據程序中的Action操作,將作業(yè)劃分為連續(xù)的Job流,并依次提交到有向無環(huán)圖調度器DAGScheduler。DAGScheduler對提交的Job進行解析,生成由不同Stage流組成的有向無環(huán)圖,然后依次將每個Stage提交到線程調度器TaskSchedulerImpl。TashSchedulerImpl為Stage生成線程集TaskSet,并將TaskSet分發(fā)到各個工作結點Worker的線程管理器TaskManager。TaskManager根據收到的信息啟動Task開始運算,運算結束后得到的數據可能被下一個Stage所使用,稱為中間數據,由數據塊管理器BlockManager進行管理。BlockManager在內存充足的時候將中間數據存儲到內存,以提高訪問速度,在內存不夠的時候將中間數據存儲到本地磁盤、HDFS或丟棄。
Figure 1 Spark application execution flow圖1 Spark作業(yè)執(zhí)行流程
Spark作業(yè)運行在Java進程中,一個Java進程對應一個JVM。JVM在運行過程中管理的內存空間叫堆空間,用于存儲程序中創(chuàng)建的對象,在運行過程中可根據配置動態(tài)變化。如圖2所示,JVM堆空間分為年輕代(Young Generation)、年老代(Old Generation)和持久代(Permanent Generation),一般年輕代存放短生命周期的對象,年老代存放長生命周期的對象,持久代存放類、方法等元數據,年輕代又可分為Eden、Survivor1(S1)和Survivor2(S2)區(qū)間。
Figure 2 Memory management comparison between Spark and JVM圖2 Spark和JVM的內存管理對應關系
JVM運行過程中會使用垃圾收集器GC對內存數據進行清理。新生成的對象存放在Eden區(qū)間,Eden區(qū)間滿了后,會觸發(fā)Minor GC,將Eden和Survivor1區(qū)間仍然存活的對象復制到Survivor2區(qū)間。下一次Eden區(qū)間再滿,再觸發(fā)Minor GC,將Eden和Survivor2區(qū)間仍然存活的對象復制到Survivor1區(qū)間,如此不斷往復。當對象生命周期足夠長或Survivor區(qū)間滿了后,對象會被轉移到年老代區(qū)域。當年老代快滿的時候,會觸發(fā)Full GC,清理年老代不需要使用的對象。當持久代有類不需再使用并且需要空間存放別的數據時,會觸發(fā)Major GC,清理持久代的數據。
當GC被觸發(fā)時,JVM中所有運行的線程會暫時停止運行,等待GC完成后繼續(xù)。如果一次作業(yè)中GC被觸發(fā)的次數特別多,就會顯著延長作業(yè)執(zhí)行總時間,降低系統(tǒng)性能。其中Full GC需要時間比Minor GC更長,對性能影響更大。Java官方文檔[10]顯示,在理想情況下,使用30%的時間作為GC消耗,在使用2個CPU內核時,系統(tǒng)吞吐量會下降到80%,而使用32個CPU內核時,系統(tǒng)吞吐量會下降到10%。而Spark作業(yè)處理大規(guī)模數據,通常都會使用較多的CPU內核,因此GC對性能的影響需要引起重視。
Spark框架同樣對所使用的內存進行管理,其管理的內存主要有兩種用途。如圖2所示,第一種是用于程序執(zhí)行,如join、sort、shuffle、aggregation等算子執(zhí)行時使用的內存;第二種是用于存儲,如RDD(Resilient Distributed Datasets)數據緩存和廣播變量存儲等。用于這兩種用途的內存使用一個統(tǒng)一的空間(如圖2中的M,其存儲空間大小為M),Spark使用一個參數spark.memory.fraction設置M的大?。?/p>
spark.memory.fraction=M/(Java堆空間-300 MB)(默認0.6)
在內存M中,一部分用于任務執(zhí)行(如圖2中的Execution,其空間大小為E),一部分用于緩存數據(如圖2的Storage,其存儲空間大小為R)。Spark使用參數spark.memory.storageFraction設置兩者比例:
spark.memory.storageFraction=R/M(默認0.5)
從圖2可以看出,Spark管理的內存和JVM管理的堆空間屬于同一段物理內存空間,只是在不同層面上進行管理。Spark管理的內存基本等于JVM堆空間的年輕代和年老代的空間,一部分用于執(zhí)行運算和存儲(M),另一部分空間用于Spark的類等元數據的存儲。
我們對影響Spark作業(yè)執(zhí)行性能的主要因素進行分類,如圖3所示。
Figure 3 Influencing factors of Spark application performance圖3 Spark作業(yè)性能影響因素示意圖
圖3中可以看出,目前對于Spark的優(yōu)化一般從修改源代碼改變系統(tǒng)作業(yè)的執(zhí)行模型,提交作業(yè)時調整運行參數和編寫作業(yè)程序時算法優(yōu)化等幾個方面入手。系統(tǒng)運行參數又可分為CPU參數、內存參數和I/O網絡參數等。CPU參數配置包括單執(zhí)行容器使用的核數、單核的線程數等。網絡和I/O的配置包括緩沖區(qū)大小、數據分塊大小和文件并發(fā)數量等。內存的運行參數配置方面,可以通過改變單容器內存大小、對緩存數據進行預處理、改變JVM和Spark管理內存的分配比例等,對作業(yè)性能產生影響。
一般數據從磁盤讀取到內存以后,占用空間都會不同程度地增大。這主要是由于Java的對象存儲格式所決定的,如每個String對象會占用40 Bytes空間存儲文本宏信息,則平均每行40個字符的中文數據集轉化成RDD后,占用內存會比原始數據大50%。而對于英文文本來說,內存中存儲的UNICODE格式或UTF-16格式的字符,占用內存空間會比磁盤上存儲的UTF-8或ASCII碼格式大一倍。
Spark將作業(yè)運行過程中產生的RDD緩存到內存時,RDD中每一行的數據指針等頭數據都會緩存下來,而這會占用大量內存空間。序列化可以將RDD轉化為字節(jié)序列存儲,減小緩存數據占用空間,如果序列化后再使用壓縮算法對字節(jié)序列進行壓縮,能更大程度減少空間占用。特別是在內存空間緊張時,減少緩存數據大小,更多內存可以用于代碼執(zhí)行,減少系統(tǒng)的GC觸發(fā)次數,有效避免或減少對外存的使用,提高運行性能。
使用硬件資源越多,系統(tǒng)性能越好,這是業(yè)界普遍的認識。數據在內存中緩存是以RDD分塊的方式存在的,分塊的大小默認跟HDFS存儲文件的分塊大小一致。作業(yè)運行過程中,如果空閑內存不夠存下整個分塊,系統(tǒng)會移除掉部分舊的緩存數據,存放新的數據。如果移除掉舊的部分數據還不夠的話,系統(tǒng)會放棄使用內存緩存,改用外部存儲緩存或不使用緩存。
對于數據量大而計算代價小的作業(yè)而言,使用足夠大的內存,可以使得內存在緩存全部數據后還有足夠的空間執(zhí)行代碼,作業(yè)執(zhí)行過程不會觸發(fā)GC,系統(tǒng)性能可以達到最佳。而在可使用的運行內存較小時,系統(tǒng)調度策略會發(fā)生改變,減少數據的緩存操作,改由重新計算得到原來緩存的數據。如果減少緩存的數據很大,則騰出來的內存空間更多,反而會降低JVM的GC觸發(fā)次數,在重新計算消耗小的情況下,會使得整個系統(tǒng)運行過程穩(wěn)定,性能提高。
文獻[10]指出,JVM的GC算法是基于“Java程序中大部分的對象都是短生命周期”這一假設,即大部分的對象在Minor GC時就會被清理掉,沒機會轉入到年老代,因此GC過程中Full GC較少,對系統(tǒng)性能影響不大。而Spark中緩存數據在整個作業(yè)期間會一直存在,對于JVM來說都是長生命周期對象,并且普遍數據量大,占用大量的JVM堆空間。大數據背景下的計算任務,一個數據塊很容易就占滿所有的年老代空間,導致傳統(tǒng)GC不能起到應有的作用,反而降低系統(tǒng)性能。通過改變JVM中年輕代和年老代的比例,改變Spark中執(zhí)行內存和存儲內存的比例,可以調節(jié)JVM的GC消耗,減少不利影響,提高系統(tǒng)性能。
下一節(jié)我們運行大量Spark作業(yè),使用上述不同策略優(yōu)化,驗證不同優(yōu)化策略下參數配置對系統(tǒng)性能的影響。
為驗證和分析優(yōu)化策略的效果,搭建了Spark運行平臺。Spark計算層設置4個結點,其中1個主結點,3個工作結點,每個工作結點32 GB內存,8個CPU內核。HDFS存儲層設置6個結點,其中1個主結點,5個數據結點,HDFS文件塊大小設置為128 MB。
為驗證優(yōu)化策略,設計了兩類計算任務:
任務A對不同大小的中文微博數據集(1 GB~100 GB)進行詞頻統(tǒng)計。任務要求將數據集讀出后使用不同的方式緩存,然后做詞頻統(tǒng)計工作。
任務B對2 227萬條60維的向量數據聚類計算,數據以文本的格式存儲在HDFS上,共9.82 GB,79個分塊。任務要求將數據讀入后,轉化為Vector向量,然后再以不同方式緩存,最后進行迭代次數不大于8次的KMeans聚類操作。
實驗過程中,采用不同的參數配置運行A類任務和B類任務,觀察任務運行時間、GC詳情、線程執(zhí)行時間、數據塊存取、資源占用情況等指標,并作記錄和分析。
表1顯示了任務A的數據經過不同方式處理后,緩存占用的空間大小和作業(yè)運行時間。任務中數據集為1 096 MB的中文微博文本數據,共8 546萬行??梢钥闯觯琒park將數據導入后,占用空間比在磁盤占用空間增加17%,而序列化以后,空間減少14%,如果序列化后再進行壓縮,則占用空間比不處理減少42%。在運行時間上,壓縮比不壓縮平均降低運行時間2 s,同時GC時間減少80%,說明緩存空間的減少能夠有效降低GC消耗。但是,最終不處理情況下運行時間是最小的,因為序列化和壓縮,包括讀取緩存后的反序列化和解壓縮過程本身需要占用資源。序列化后再壓縮相比序列化后不壓縮,雖然壓縮過程需要時間,但壓縮后占用空間更少,GC消耗大大降低,降低的GC消耗時間遠超過壓縮過程增加的時間,總作業(yè)運行時間反而比序列化后不壓縮降低18%。
Table 1 Effects of different processingmethods on data size and performance表1 不同處理方式對數據大小和運行性能的影響
圖4顯示了對任務B用不同緩存方式處理后的性能。任務中數據直接緩存占用空間約10.7 GB,序列化后占用空間約10.3 GB。如果使用壓縮后再緩存的方式,占用空間約7.8 GB,減少24%,與文本數據的空間占用規(guī)律基本一致??梢钥闯觯谥苯邮褂孟到y(tǒng)內存和HDFS緩存中間數據的情況下,隨著提交內存的增加,系統(tǒng)運行性能提高,這是因為隨著內存的增加減少了對HDFS的使用,所以運行速度增加。如果將外部存儲方式由HDFS改為DISK,我們可以看到,在內存較少的情況下,系統(tǒng)運行時間比HDFS方式降低很多,因為去除了網絡消耗和多備份消耗后,本地硬盤的讀寫比HDFS讀寫消耗的時間要短。
Figure 4 Performance comparison of different caching modes圖4 不同緩存方式性能對比
從以上兩例中可以看出,減小緩存占用的空間可以減少讀寫時間,降低GC消耗,提高系統(tǒng)性能。但序列化和壓縮兩種減少空間的過程本身需要時間,在“一次緩存,多次使用”和內存嚴重受限的情況下,適用于這種方式提高系統(tǒng)性能。而在內存不夠時,用本地磁盤代替HDFS存儲中間數據,能夠降低讀寫時間消耗,在通常的情況下都能比較適用。
圖5顯示了任務B在提交作業(yè)時使用不同內存大小,對任務運行的總時間和GC占用的時間的影響??梢钥闯觯珿C時間和運行總時間有明顯的正相關性,GC耗時長則運行總時間長。在內存小的時候,GC用時明顯較大,在內存大的時候,GC時間則非常之少。當內存從1 500 MB到4 000 MB時,GC時間逐步下降,任務運行時間也隨內存增多而下降。內存大于6 000 MB以后,GC用時基本可以忽略不計,而運行總時間趨于穩(wěn)定。在內存小于1 600 MB時,提交內存越大反而運行時間越長,這符合策略3分析的結果。不過直接計算速度比不上內存緩存,所以相比最佳性能還是差一些。當內存在1 500 MB左右時,GC用時和任務運行總時間達到最高,GC用時占到了總用時的30%。
從這里可以看出,在提交Spark作業(yè)的時候,在內存足夠的情況下,應盡量使用更多的內存,而內存有限的情況下,也并不是內存越少性能就越差,同時也要避免如實驗中“1 500 MB”這樣的區(qū)間。
Figure 5 Task running time and GC time scatter distribution圖5 任務運行時間與GC時間散點分布
圖6展示了任務B運行過程中所有緩存的數據量,包括緩存在內存和磁盤的,以及系統(tǒng)因為內存空間不夠,為緩存新數據而從磁盤移除的老數據量??梢钥闯觯谶\行內存較小的時候,系統(tǒng)使用內存緩存和磁盤緩存都小,但都隨內存增加而增大。而到1 600 MB以后,內存緩存基本保持穩(wěn)定,磁盤緩存則顯著下降。說明內存足夠大以后,只用內存就能滿足所有緩存要求,所以磁盤使用量下降,同時也越來越不需要移除內存給新的數據騰空間,所以內存緩存移除數據同步下降,最后趨于零。前面內存越小,內存緩存也越小,但磁盤緩存也越小,同圖5類似,說明系統(tǒng)根據參數減少了緩存的使用,使用重新計算的方式得到需要的數據,這一部分需要消耗部分時間,但比使用磁盤緩存耗時反而更低。
Figure 6 Cache data trends圖6 緩存數據變化趨勢
綜合圖5和圖6可以看出,Spark作業(yè)運行時間與GC時間、存儲設備讀寫時間等都密切相關,由多種因素決定。在數據量太大而內存遠遠不夠的情況下運行任務,酌情減少內存的使用,可能會達到更好的效果。
圖7展示了使用不同的JVMNewRatio參數運行任務B的結果。NewRatio參數代表JVM中年老代和年輕代大小的比例,我們分別設置為1、2、4、8、15,對比運行效果。從圖7中可以看出,當NewRatio參數越來越大,也就是堆空間中年老代占比越來越多的時候,系統(tǒng)的Minor GC時間增加,而Full GC時間減少,任務運行的總時間減少,在參數為8時達到最小值,此時性能比參數為1時提升25%,比默認參數提升10%。
Figure 7 Running time and GC time in different heap space ratio圖7 不同堆空間比例時作業(yè)運行時間和GC時間
這是因為Spark任務中緩存的數據一般會保持較長時間,并且比較大,在NewRatio較大,即年輕代較小、年老代較大的情況下,會很快占滿年輕代的Eden區(qū)間,引發(fā)Minor GC,所以Minor GC次數比較多,時間比較長,但單次Minor GC的時間會短,因為年輕代空間較小,清理對象較快,如圖8所示。而由于年老代空間較大,對象的生命周期容忍度比較大,所以年老代的Full GC次數比較少。從圖8中可以看見,單次Full GC時間大約是單次Minor GC時間的10~20倍,因此Full GC的次數更能影響系統(tǒng)的性能。
Figure 8 Average time of single GC in different heap space ratio圖8 不同堆空間比例時平均單次GC時間對比
Spark有兩個參數調節(jié)管理內存的分配,spark.memory.fraction表示Spark管理的內存中執(zhí)行和存儲部分占的比例,實驗使用任務B在0.9~0.002測試,結果如圖9所示。從圖9中可以發(fā)現,當比例在0.001時運行時間達到最小值,也就是說提交運行內存為2 200 MB時,只使用不到20 MB作為執(zhí)行代碼和存儲使用的內存,此時性能比最低水平提升50%,比平均水平提升30%。查找監(jiān)測數據可以發(fā)現,該比例時運行作業(yè),JVM只有Minor GC,同時也幾乎沒有中間數據存儲在內存和硬盤,因為此時Spark使用的內存大小遠小于JVM年老代大小,所以根本不會觸發(fā)Full GC,在內存緩存很小很小的情況下,GC時間最短的性能最高。
Figure 9 Running time of different Spark memory execution storage ratio圖9 不同Spark內存執(zhí)行存儲比例時運行時間
本文分析Spark運行過程中JVM層面和Spark層面對內存的管理機制,從減少數據量、改變提交內存大小、改變JVM堆空間內存分配比例、改變Spark管理內存分配比例幾個方面探討了優(yōu)化Spark作業(yè)運行的方法,并通過實驗對方法進行了驗證。
通過實驗發(fā)現,總的來說,使用更大的內存能夠提升系統(tǒng)性能。但是,在大數據處理背景下,內存的增長跟不上數據的增長,在有限的內存條件下,適當減少內存使用促使系統(tǒng)用重新計算代替緩存,根據Spark緩存對象和作業(yè)性質的特點,優(yōu)化JVM中堆內存的分配和Spark管理內存的大小,能夠有效降低I/O和GC消耗,提高系統(tǒng)性能。
本文的貢獻點在于結合Spark對內存的管理和JVM對內存的管理,分析兩層之間的內存管理的相互聯(lián)系,根據數據對象在內存中的存在方式變化,提出優(yōu)化策略。實驗結果說明優(yōu)化策略能夠有效提升系統(tǒng)性能,具有一定的代表性,分析的結論對如何更好地使用和改進Spark也有一定的啟發(fā)作用。
參考文獻:
[1] Zaharia M.An architecture for fast and general data processing on large clusters[M].New York:Association for Computing Machinery and Morgan & Claypool,2013.
[2] Zaharia M,Chowdhury M,Franklin M J,et al.Spark: Cluster computing with working sets[C]∥Proc of the 2nd USENIX Conference on Hot Topics in Cloud Computing,2010:10.
[3] Zaharia M,Chowdhury M,Das T,et al.Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing[C]∥Proc of the 9th USENIX Conference on Networked Systems Design and Implementation,2012:141-146.
[4] Apache Spark-Lightning-fast cluster computing[EB/OL].[2016-09-14].http://spark.apache.org/.
[5] Ousterhout K,Rasti R,Ratnasamy S,et al.Making sense of performance in data analytics frameworks[C]∥Proc of the 12th USENIX Conference on Networked Systems Design & Implementation,2015:293-307.
[6] Chen Qiao-an,Li Feng,Cao Yue,et al.Parameter optimization for Spark jobs based on runtime data analysis[J].Computer Engineering & Science,2016,38(1):11-19.(in Chinese)
[7] Yang Zhi-wei,Zheng Quan,Wang Song,et al.Adaptive task scheduling strategy for heterogeneous spark cluster [J].Computer Engineering,2016,42(1):31-35.(in Chinese)
[8] Gog I,Giceva J,Schwarzkopf M,et al.Broom: Sweeping out garbage collection from big data systems[C]∥Proc of the 15th Usenix Conference on Hot Topics in Operating Systems,
2015:2.
[9] Gidra L, Thomas G,Sopena J,et al.NumaGiC: A garbage collector for big data on big NUMA machines[J].Acm Sigplan Notices,2015,50(4):661-673.
[10] Java SE 6 HotSpot[tm] Virtual machine garbage collection tuning[EB/OL].[2016-09-16].http://www.oracle.com/technetwork/java/javase/gc-tuning-6-140523.html.
附中文參考文獻:
[6] 陳僑安,李峰,曹越,等.基于運行數據分析的Spark任務參數優(yōu)化[J].計算機工程與科學,2016,38(1):11-19.
[7] 楊志偉,鄭烇,王嵩,等.異構Spark集群下自適應任務調度策略[J].計算機工程,2016,42(1):31-35.