(復(fù)旦大學(xué) 專用集成電路與系統(tǒng)國家重點實驗室,上海 200433)
隨著互聯(lián)網(wǎng)產(chǎn)業(yè)的不斷發(fā)展,數(shù)據(jù)計算的規(guī)模日趨龐大,大數(shù)據(jù)的分析處理技術(shù)也愈發(fā)凸顯其重要性。隨著智能化時代的到來,使得包括智能交通和自動駕駛中對道路畫面的實時識別和處理、智能家居中對人臉的實時檢測等在內(nèi)的各類物聯(lián)網(wǎng)和智能化應(yīng)用都對大數(shù)據(jù)的實時處理和響應(yīng)能力提出了很高的要求[1]。另一方面,現(xiàn)有的大數(shù)據(jù)實時處理框架僅挖掘了集群中的CPU計算能力,而CPU其自身的結(jié)構(gòu)特點和并發(fā)程序設(shè)計的復(fù)雜性限制了其對密集計算型應(yīng)用提供符合性價比要求的實時響應(yīng)能力。
實時處理系統(tǒng)的主要指標(biāo)在于數(shù)據(jù)處理的吞吐量和延時,降低延時的一個優(yōu)化措施在于降低系統(tǒng)中各個關(guān)鍵節(jié)點的處理時間。目前,圖形處理器(Graphic Process Unit,GPU)因其計算和緩存結(jié)構(gòu)上的眾核優(yōu)勢,越來越多地被應(yīng)用在協(xié)同計算和加速計算上[2]。GPU加速計算將程序中計算密集部分的工作負(fù)載轉(zhuǎn)移到GPU上通過多核并行的方式來加快執(zhí)行,因此將GPU資源納入到集群中參與可并行化的、數(shù)據(jù)密集型的計算,可以大大增強集群的計算能力。特別是對于大規(guī)模集群計算中的流式實時系統(tǒng),系統(tǒng)的延時取決于關(guān)鍵節(jié)點的延時,采用GPU資源參與計算可以大幅提升整個系統(tǒng)的性能。
Apache Storm[3]是實時數(shù)據(jù)處理平臺的代表,被廣泛應(yīng)用在各類實時數(shù)據(jù)分析領(lǐng)域。本文以Apache Storm為基礎(chǔ),設(shè)計開發(fā)Heterogeneous Storm(以下均稱為H-Storm)異構(gòu)計算平臺。通過引入GPU計算資源加快集群中節(jié)點的數(shù)據(jù)處理速度和降低關(guān)鍵節(jié)點的處理延時,以提升實時處理系統(tǒng)的吞吐量和計算應(yīng)用的性能。
ApacheStorm是一個分布式的大數(shù)據(jù)流處理應(yīng)用框架,理論上能夠可靠地實時處理無限的數(shù)據(jù)流。如圖1所示,Storm集群中主要分為兩類節(jié)點:控制節(jié)點和計算節(jié)點??刂乒?jié)點(Nimbus)負(fù)責(zé)管理集群資源和調(diào)度、監(jiān)控用戶任務(wù);計算節(jié)點(Supervisor)負(fù)責(zé)運行守護(hù)進(jìn)程和啟動任務(wù)執(zhí)行的Worker進(jìn)程;它們通過Zookeeper進(jìn)行數(shù)據(jù)交互。
圖1 Apache Storm集群的邏輯結(jié)構(gòu)
在Storm集群中運行的每個作業(yè)被表示為一個有向無環(huán)的拓?fù)鋱D,稱為一個Topology,如圖2所示。其中,每個結(jié)點是一個組件,組件有2種類型:Spout組件是數(shù)據(jù)源,負(fù)責(zé)生產(chǎn)消息(消息被稱為Tuple);Bolt組件封裝了數(shù)據(jù)處理邏輯。每個組件都含有一個或者多個Task,Task是最小的處理單元。消息從一個Task分發(fā)到另一個Task,Stream Grouping定義了消息分發(fā)策略。Executor線程是執(zhí)行Task任務(wù)的基本物理單元,多個Executor線程封裝成Worker進(jìn)程,每個Topology最終體現(xiàn)為多個節(jié)點上多個進(jìn)程的執(zhí)行。
圖2 Apache Storm作業(yè)的拓?fù)浣Y(jié)構(gòu)
Storm中的默認(rèn)調(diào)度算法,僅考慮了CPU計算資源。它將CPU核心和可用端口封裝為資源槽(WorkerSlot),根據(jù)每個Topology中指定的需求來進(jìn)行分配。調(diào)度算法一般分為2個階段:1)將各個組件的Executor線程分配到Worker進(jìn)程中;2)將Worker進(jìn)程分配到集群中計算節(jié)點上可用的WorkerSlot中。Apache Storm默認(rèn)的調(diào)度算法采用的是基于輪詢式(Round-Robin)的平均分配算法,不考慮任務(wù)的結(jié)構(gòu)和節(jié)點間的數(shù)據(jù)通信,即首先將Executor線程按序編號后平均分配到Worker中,然后將所有Supervisor節(jié)點上的可用WorkerSlot進(jìn)行交叉排序,直接依序選取需要的若干個WorkerSlot。默認(rèn)調(diào)度算法這種將Topology中所有Worker平均分配到所有節(jié)點上的調(diào)度機制,會導(dǎo)致集群中可能的負(fù)載不均衡。
目前對Storm平臺的研究主要集中在兩方面:Storm的任務(wù)調(diào)度策略和基于Storm的大數(shù)據(jù)處理應(yīng)用。Storm的調(diào)度策略研究主要是為了提高Storm的吞吐量和優(yōu)化處理延時,同時保證集群的負(fù)載均衡。文獻(xiàn)[4]提出了Storm的2種任務(wù)調(diào)度算法,Offline調(diào)度算法根據(jù)Topology組件間的偏序關(guān)系來進(jìn)行任務(wù)調(diào)度,Online調(diào)度算法則根據(jù)運行時監(jiān)測到的Executor線程間的通信數(shù)據(jù)來動態(tài)地調(diào)度任務(wù)。文獻(xiàn)[5]提出一種新的基于節(jié)點資源利用率的資源感知調(diào)度算法,該調(diào)度算法調(diào)度時會考慮各節(jié)點的CPU及內(nèi)存使用信息和每個Topology的資源需求信息。文獻(xiàn)[6]提出了一種基于兩層圖劃分的任務(wù)調(diào)度算法,綜合考慮了數(shù)據(jù)傳輸速率和任務(wù)間的通信模式。文獻(xiàn)[7]提出了基于節(jié)點通信數(shù)據(jù)統(tǒng)計來最小化進(jìn)程間和節(jié)點之間通信的Storm調(diào)度算法。
基于Storm的大數(shù)據(jù)應(yīng)用更是層出不窮。文獻(xiàn)[8]提出了基于Storm的實時交通畫面處理。文獻(xiàn)[9]使用Storm結(jié)合Kafka消息隊列來實現(xiàn)基于CCL算法的人臉識別系統(tǒng)。文獻(xiàn)[10]使用Storm來開發(fā)實時視頻的分析平臺。
上述研究都基于現(xiàn)有的Storm平臺,沒有涉及到GPU資源在集群中的管理和調(diào)度。文獻(xiàn)[11]提出的G-Storm,使用GPU來增強Storm的吞吐量,但只是在單機上實現(xiàn)了GPU的調(diào)用和消息的批處理,并沒有實現(xiàn)集群中GPU資源的調(diào)度。GPU與Hadoop的結(jié)合方面也有一些探索。文獻(xiàn)[12]提出在Hadoop中調(diào)用GPU進(jìn)行運算的四種方法。文獻(xiàn)[13]提出在Hadoop框架中使用GPU來對K-means聚類算法進(jìn)行加速。但由于Hadoop適用于批處理,且Hadoop與Storm存在差異性,因此無法用在Storm上。
上述對Storm和GPU資源調(diào)度的研究和應(yīng)用,都沒有能有效地將具有高度并行化計算能力的多GPU計算資源納入到大規(guī)模的計算集群中。因此,研究和設(shè)計一個多GPU的任務(wù)和計算資源調(diào)度算法,通過統(tǒng)一的管理和調(diào)度來進(jìn)一步增強Storm系統(tǒng)的并行計算和實時處理能力,具有很強的研究和應(yīng)用價值。
本文提出和設(shè)計開發(fā)的H-Storm集群同時包含CPU和GPU資源,對應(yīng)地,用戶Topology也包含CPU作業(yè)和GPU作業(yè)。因此,作為一個異構(gòu)的計算集群,會需要一個統(tǒng)一的資源管理和任務(wù)調(diào)度算法。H-Storm的實現(xiàn)主要分為2個部分:GPU資源的發(fā)現(xiàn)與H-Storm異構(gòu)集群的任務(wù)調(diào)度策略。
GPU資源的發(fā)現(xiàn)分為GPU資源的量化、上報和調(diào)用3個部分。GPU資源的量化用于挖掘GPU的并行任務(wù)處理能力,上報用于在H-Storm中管理GPU資源,調(diào)用即為在H-Storm中運行GPU程序。
2.1.1 GPU資源量化
大規(guī)模的Storm集群中往往存在很多Topology同時運行,每個Topology又由若干進(jìn)程組成,而集群中的GPU資源有限,因此需要將GPU資源量化以提供給多個進(jìn)程共享使用。本文采用類似原生Storm中CPU資源槽的概念來量化GPU資源。GPU板卡上的主要資源為計算核心和顯存,要劃分為資源槽時,必須首先考慮GPU的并行執(zhí)行能力。
本文主要采用Nvidia公司的GPU板卡。Nvidia公司的GPU產(chǎn)品自Fermi架構(gòu)起實現(xiàn)了并發(fā)核心執(zhí)行(Concurrent Kernel Execution)能力[14],這個特性被稱為Hyper-Q特性。并發(fā)核心的執(zhí)行是通過為不同的Kernel指定不同的Stream來實現(xiàn)的,但是僅限于相同GPU Context的不同Stream才能并發(fā)執(zhí)行,意味著一塊GPU板卡同一時刻只能被一個進(jìn)程占用,無法滿足Storm集群的并行計算需求。對此,Nvidia提供了多進(jìn)程服務(wù) (Multi-process Service,MPS)來解決這個問題[15]。
MPS服務(wù)允許來自不同進(jìn)程的核函數(shù)能夠并發(fā)地在GPU上運行,以提高GPU板卡的計算資源共享和帶寬利用率,但是要求GPU的計算能力[16]大于3.5,且要求客戶端數(shù)目不多于16個。MPS服務(wù)通過MPS Server來響應(yīng)和處理來自多個客戶端的GPU執(zhí)行請求,所有的信息傳遞接口(Message Passing Interface,MPI)子進(jìn)程通過MPS Server共用同一個Context,通過并發(fā)和分時調(diào)度來實行多進(jìn)程的并發(fā)執(zhí)行。
基于MPS,可以實現(xiàn)Storm中多進(jìn)程的并發(fā)。假設(shè)有N個Supervisor節(jié)點Si(i=1,2,…,N),每個Supervisor節(jié)點有Mi個GPU板卡,定義每個GPU板卡Gij(i=1,2,…,N,j=1,2,…,Mi)上的并發(fā)度為Pij,則在分配時對于每個節(jié)點Si需要滿足下列約束:
Pij≤1,SMarch<3.5
(1)
(2)
因為GPU程序一般需要CPU和GPU的協(xié)同計算,即CPU負(fù)責(zé)拷貝輸入輸出數(shù)據(jù),GPU負(fù)責(zé)計算,所以在衡量GPU的計算資源時,會為每個GPU資源槽分配一個對應(yīng)的CPU資源槽,并綁定一個端口用于CPU收發(fā)處理數(shù)據(jù)。假設(shè)Supervisor節(jié)點Si上可用的資源槽數(shù)目為Ai(i=1,2,…,N),已經(jīng)分配的資源槽數(shù)目為Ui(i=1,2,…,N),它們之間存在如下的約束關(guān)系:
(3)
因此,在分配GPU Worker到計算節(jié)點時,必須滿足式(1)~式(3)所示的Supervisor節(jié)點上GPU/CPU資源槽數(shù)目的約束,實現(xiàn)GPU資源的量化和并發(fā)任務(wù)處理,其資源槽的數(shù)目在集群配置文件中指定。
2.1.2 GPU資源上報
在存在GPU板卡的情況下,GPU資源上報和任務(wù)調(diào)度流程如圖3所示。首先通過配置文件將GPU資源量化為GPU資源槽。Supervisor守護(hù)進(jìn)程通過CUDA API自動將查詢到的GPU資源信息通過心跳包存儲到Zookeeper中。Nimbus根據(jù)讀取到的GPU資源信息,按照指定的異構(gòu)調(diào)度算法分配GPU任務(wù)到指定Supervisor節(jié)點上的指定GPU板卡,并將調(diào)度信息寫回Zookeeper。Supervisor再根據(jù)任務(wù)信息,啟動相應(yīng)的GPU Worker,創(chuàng)建指定GPU板卡的Context,執(zhí)行作業(yè)。
圖3 H-Storm中GPU資源發(fā)現(xiàn)與調(diào)度流程
2.1.3 GPU資源調(diào)用
CUDA作為C語言的擴展,用于開發(fā)GPU程序,本文使用JCuda庫利用Java的JNI(Java Native Interface)來實現(xiàn)在Java中對CUDA程序的調(diào)用。
在H-Storm中,GPU的調(diào)用層次如圖4所示。GPU Executor運行在GPU Worker中,是實際執(zhí)行GPU代碼的主體。GPU Executor分為兩部分,一部分是對數(shù)據(jù)進(jìn)行并行處理的CUDA代碼(.cu文件),另一部分是實現(xiàn)Spout/Bolt功能的功能實現(xiàn)代碼。功能實現(xiàn)代碼通過JCuda調(diào)用編譯和執(zhí)行CUDA代碼。JCuda通過本地的CUDA Runtime和Driver API來執(zhí)行GPU代碼,最終通過MPS服務(wù)調(diào)用GPU執(zhí)行。
圖4 Storm中GPU組件執(zhí)行調(diào)用層次
在H-Storm集群中,部分Supervisor節(jié)點存在一塊或者多塊GPU板卡,用戶作業(yè)可能包含CPU組件和GPU組件。GPU組件對節(jié)點GPU資源的依賴性和MPS服務(wù)的并行度限制,導(dǎo)致H-Storm集群中的調(diào)度策略更加復(fù)雜。因此,本文設(shè)計了H-Storm異構(gòu)集群中的任務(wù)調(diào)度策略,主要包括:綜合考慮GPU性能及負(fù)載的異構(gòu)任務(wù)調(diào)度算法;協(xié)同計算時CPU/GPU節(jié)點的自適應(yīng)流分發(fā)決策機制。
2.2.1 異構(gòu)任務(wù)調(diào)度算法
異構(gòu)任務(wù)調(diào)度算法實現(xiàn)將Topology中定義的所有GPU和CPU線程分配到節(jié)點資源槽中運行,算法考慮了Topology的結(jié)構(gòu)和當(dāng)前集群的可用資源信息,其偽代碼如算法1所示。
算法1H-Storm的異構(gòu)任務(wù)調(diào)度算法
Require:cluster集群信息,topologies作業(yè)信息
1. functionschedule(cluster,topologies)
2. needSchedulingTopos←cluster.needsScheduling Topologies (topologies)
3. for each topology in needSchedulingTopos do
4. GPUExecutors←getNeedAssignGpuExecutors(topology)
5. GPUWorkerNum←getNumGpuWorkers (topology)
6. executorToGpuWorker←assignExecutorToGpuWorker(topology,GPUExecutors,GPUWorkerNum)
7. executorToNodeSlot←assignGpuWorkerToNodeSlot(cluster,GPUWorkerNum,executorToGpuWorker)
8. assignGpuExecutors(cluster,executorToNodeSlot)
9. end for
10. assignRemainCPUExecutors(cluster,topologies)
11. return
12. end function
首先從Zookeeper中讀取并判斷當(dāng)前所有需要調(diào)度的Topology,對于每一個Topology執(zhí)行調(diào)度程序。調(diào)度程序在獲取當(dāng)前Topology中所有需要調(diào)度的GPU線程以及需要的GPU資源槽數(shù)目后,通過assignExecutorToGpuWorker函數(shù)將其GPU線程分配到指定數(shù)目的GPU Worker中。隨后,讀取當(dāng)前所有可用的節(jié)點GPU資源槽,通過assignGpu WorkerToNodeSlot將已有的GPU Worker進(jìn)程分配到Supervisor節(jié)點可用的GPU資源槽中。
在完成GPU線程分配后,需要分配CPU線程。如果CPU線程和GPU線程完全被分配到不同的進(jìn)程,那么進(jìn)程乃至節(jié)點之間的通信會導(dǎo)致系統(tǒng)性能的下降。因此,本文采用了CPU和GPU Worker部分復(fù)用的策略,即將部分CPU線程分配到空閑的GPU Worker中,以節(jié)省通信開銷。
1)線程分配到進(jìn)程的算法
將GPU線程匯聚到有限的Worker進(jìn)程中所采用的算法參考了文獻(xiàn)[4]中提出的偏序思想。2個組件之間存在連接,代表它們之間有流消息的傳遞,如果將2個存在連接關(guān)系的組件Task分配到同一個進(jìn)程中,可以減少進(jìn)程或節(jié)點的通信數(shù)據(jù)。
Topology的本質(zhì)是一張有向無環(huán)圖,組件之間存在偏序關(guān)系,對組件進(jìn)行拓?fù)渑判蚝罂梢缘玫狡湟蕾囮P(guān)系。因此,簡單的分配策略是,按拓?fù)漤樞虮闅v組件,依次將組件中的線程采用輪詢方式放置到Worker進(jìn)程中。對于每個組件,如果其輸入組件已經(jīng)分配到Worker進(jìn)程中,那么優(yōu)先將其按照輪詢方式分配到對應(yīng)的所有Worker進(jìn)程列表中。拓?fù)渑判虮WC了輸出組件不會早于輸入組件進(jìn)行分配。相應(yīng)的偽代碼如算法2所示。
算法2H-Storm任務(wù)調(diào)度算法中線程分配到進(jìn)程的偽代碼
Require:topology作業(yè)信息,GPUExecutors待調(diào)度GPU線程,GPUWorkerNumGPUWorker數(shù)目
1.functionassignExecutorToGpuWorker(topology,GPUExecutors,GPUWorkerNum)
2. componentConnections←topology.getGpuComponents()
3. oderedComponentList←DFSComponent(component Connections)
4. Min←executorCount/GPUWorkerNum
5. Max←(executorCount-GPUWorkerNum + 1)
6. maxExecutorNumPerWorker←(Min +α×(Max-Min))
7. executorToSlotMap<-newMap
8. for each component in orderedComponentList do
9. inputComponents←componentConnections.get(component)
10. assignedInputSlots←getAssignedSlotsForInputComp(input Components)
11. if assignedInputSlots exist and not empty and not full then
12. assign executors to slots in round-robin
13. else
14. assign executors to empty slots or not full slots in round-robin
15. end if
16. end for
17. return executorToSlotMap
18. end function
但是這種策略會導(dǎo)致存在連接關(guān)系的線程在一個Worker中的聚集,從而導(dǎo)致分配的Worker數(shù)目少于用戶初始指定的Worker數(shù)目。這種情況可以通過引入調(diào)控因子α來解決。假設(shè)Executor數(shù)目為E,指定的Worker數(shù)目為W,在不存在空閑Worker的情況下任意一個Worker中的最大線程數(shù)Nmax可以表達(dá)為:
Nmax=E-W+1
(4)
同樣,在負(fù)載平均的情況下任意一個Worker中的最小線程數(shù)Nmin可以表達(dá)為:
Nmin=「E/W?
(5)
因此,引入α后,每個Worker中的最大線程數(shù)N為:
N=Nmin+α×(Nmax-Nmin)
(6)
當(dāng)α取0時,N等于Nmin,所有的線程平均分配到各進(jìn)程中,意味著重點考慮所有線程在進(jìn)程中的負(fù)載均衡,但忽視了任務(wù)間的數(shù)據(jù)傳輸,會引起更多的傳輸開銷和延時。當(dāng)α取1時,N等于Nmax,意味著側(cè)重考慮減少進(jìn)程間的通信開銷,但由于進(jìn)程中的線程數(shù)可能較多,首先會造成頻繁的內(nèi)核線程切換;其次多個線程對相同計算資源的占用會增加等待時間;再者,大量的數(shù)據(jù)拷貝和發(fā)送會導(dǎo)致單個發(fā)送線程因繁忙而難以及時響應(yīng),也會導(dǎo)致Java虛擬機頻繁的垃圾回收,從而降低系統(tǒng)性能。因此,針對不同的系統(tǒng)部署,可以通過α在0~1之間的不同取值來獲得對上述2種情況的合理權(quán)衡。
2)進(jìn)程分配到節(jié)點GPU的算法
Executor線程分配到固定數(shù)目的Worker進(jìn)程中后,需要將Worker進(jìn)程分配到Supervisor節(jié)點上的GPU資源槽中。已知存在N個Supervisor節(jié)點Si(i=1,2,…,N),每個Supervisor節(jié)點上的GPU板卡數(shù)目為Mi(i=1,2,…,N),第i個Supervisor節(jié)點上的第j塊GPU板卡記為Gij(i=1,2,…,N,j=1,2,…,Mi) (Mi=0代表該節(jié)點上不存在GPU板卡),Aij代表Gij上可容納的總的并行度,Uij代表Gij上已使用的并行度,則Gij板卡的空閑率可表示為:
(7)
GPU板卡的型號不同,其運算性能也不同,H-Storm中使用每秒10億次的浮點運算數(shù)(Giga Floating-point Operations Per Second,GFlops)來衡量GPU板卡的性能。GFlops可按式(8)計算,即GPU板卡中SM(Streaming Multiprocessor)的數(shù)目與每個SM中SP(Streaming Processor)處理單元數(shù)目的乘積,再乘以GPU的時鐘頻率。在衡量性能時,同時會考慮GPU的架構(gòu),GPU架構(gòu)一般使用Major表示,越新的架構(gòu)Major值越大,計算能力越強。
performanceij=multiProcessorCount×
cudaCoresPerMP×clockRate
(8)
H-Storm集群中可能存在多個不同類型的板卡,需要根據(jù)其性能對其計算能力進(jìn)行歸一化。在分配GPU Worker到GPU板卡的過程中,通過引入調(diào)控因子β(0≤β≤1) 來實現(xiàn)對GPU的性能和空閑度的綜合權(quán)衡和計算負(fù)載的合理分配。定義一塊GPU板卡的綜合得分為:
scoreij=β×performanceij+(1-β)×spareij
(9)
其中,β參數(shù)的選取依賴于集群中的GPU配置,當(dāng)集群中所有的GPU型號相同時,顯然只需考慮空閑度,此時取β為0;當(dāng)集群中存在性能差異較大的GPU時,計算應(yīng)用在繁忙但性能很強的GPU上的運行時間反而可能小于在較空閑但是性能很差的GPU上的運行時間,此時應(yīng)優(yōu)先考慮性能,即β取值更接近1。因此,面向不同的集群配置,可以通過選取不同的β值來找到合適的均衡點。
算法依據(jù)綜合得分對GPU板卡進(jìn)行排序,在選取需要的GPU Worker時從高到低依次選取可用的資源槽。并且,每次選擇后必須更新GPU的空閑率。整個過程的偽代碼如算法3所示。
算法3H-Storm調(diào)度算法中進(jìn)程分配到節(jié)點的偽代碼
Require:cluster集群信息,GPUWorkerNumGPUWorker數(shù)目,executorToGpuWorker線程分配結(jié)果
1.function assignGpuWorkerToNodeSlot(cluster,GPUWorkerNum,executorToGpuWorker)
2. allGpuInfos ← cluster.getAllGpuInfos()
3. GPUScore ← newMap < uuid,score >
4. for GPUInfo in allGpuInfos do
5. uuid←GPUInfo.getUUID()
6. performance←computeGpuPerformance(GPUInfo)
7. spareRatio←cluster.getGpuSpareRatio(GPUInfo)
8. score←(β*performance + (1-β)*spareRatio)
9. GPUScore.add(uuid,score)
10. end for
11. slotIndexToGpuUUID←newMap
12. while (GPUWorkerNum--) do
13. sortGpuScoreByScore(GPUScore)
14. GPUWorkerNumToUUID←GPUScore.remove(0)
15. updateGpuSpareAndScore(GPUScore)
16. slotIndexToGpuUUID.add(GPUWorkerNumTo UUID)
17. end while
18. executorToNodeSlot←computeAssignment(slotIndex ToGpuUUID,executorToGpuWorker)
19. return executorToNodeSlot
20. end function
2.2.2 自適應(yīng)流分發(fā)決策機制
CPU/GPU的協(xié)同計算,意味著同一個任務(wù),既可以在CPU上運行,也可以在GPU上運行。協(xié)同計算的目的在于充分挖掘集群中異構(gòu)資源的計算能力,使得H-Storm總的計算能力等于CPU資源和GPU資源計算能力的疊加。在具體實現(xiàn)上,由于CPU和GPU的差異,需要分別用Java和JCuda來實現(xiàn)相同的功能。由于CPU和GPU計算速度的不一致,協(xié)同計算的難點在于任務(wù)分配的均衡性,在H- Storm中體現(xiàn)為:對于接收到的每一個數(shù)據(jù)流,應(yīng)該做何種決策來將數(shù)據(jù)分發(fā)給下游的異構(gòu)Bolt進(jìn)行處理。
決策分流是通過為CPU和GPU聲明不同的消息流,并使下游的異構(gòu)Bolt分別從不同的消息流獲取數(shù)據(jù)來實現(xiàn)的。本文采用固定時間窗口內(nèi)Spout消息的響應(yīng)延時來衡量CPU和GPU的計算能力,響應(yīng)延時同時也決定了系統(tǒng)的吞吐量,是較為準(zhǔn)確的衡量吞吐量的指標(biāo)(不選擇吞吐量的原因在于系統(tǒng)的吞吐量在計算能力充足的情況下受限于上游的發(fā)送速度,不能用吞吐量來反向調(diào)控發(fā)送速度,否則性能會受制于起始的分發(fā)比例)。假設(shè)CPU Bolt和GPU Bolt從相同的數(shù)據(jù)源Spout互斥地獲取數(shù)據(jù),每隔時間τ進(jìn)行一次采樣,每隔T時間進(jìn)行一次決策概率調(diào)整,決策概率記為γ,代表占比多少的數(shù)據(jù)將會發(fā)送給GPU Bolt,γ的初始值取為50%(即初始時給CPU和GPU Bolt分發(fā)等量的消息流)。T時間內(nèi)CPU流和GPU流的平均響應(yīng)延時分別記為avg_lat_cpu和avg_lat_gpu,這樣相應(yīng)窗口內(nèi)的GPU分發(fā)占比(記為δ)可表示為:
(10)
于是,綜合考慮當(dāng)前窗口的消息分發(fā)比和歷史分發(fā)比,決策概率可以調(diào)整為:
γ′=γ+θ×(δ-γ)
(11)
其中,θ用于調(diào)控決策概率γ的更新速度,θ的取值范圍為0~1。θ越大,表明決策概率越側(cè)重于當(dāng)前窗口的響應(yīng)延時比,也更容易使γ受到局部窗口內(nèi)突變的影響。
為將調(diào)整后的決策概率動態(tài)地寫入Zookeeper,本文設(shè)計了Spout和Bolt之間的信號通知機制,即組件之間可以通過Zookeeper來發(fā)送信號,信號通過Zookeeper節(jié)點的監(jiān)控回調(diào)機制來實現(xiàn)。Spout組件在回調(diào)時會動態(tài)修改決策概率。
Spout中分發(fā)到不同消息流的機制采用決策概率γ來調(diào)控,通過將γ量化為布爾決策窗來提高決策速度。假設(shè)γ=0.8,意味著GPU每處理4條消息,CPU將會處理一條消息,因此設(shè)置大小為5的決策窗,5條消息按照4∶1的比例分發(fā)。整個自適應(yīng)流分發(fā)決策機制如圖5所示。
圖5 CPU/GPU自適應(yīng)流分發(fā)決策機制
本文設(shè)計了Topology來測試H-Storm的性能,這些測試Topology中的部分組件具有大量的浮點數(shù)運算,采用GPU并行計算對這些組件中的計算任務(wù)進(jìn)行加速,進(jìn)而將H-Storm中實現(xiàn)的異構(gòu)任務(wù)調(diào)度器和Storm中原生的調(diào)度器進(jìn)行性能對比。
本文所設(shè)計和實現(xiàn)的H-Storm基于Apache Storm 的1.0.2版本,采用7臺服務(wù)器搭建一個充分異構(gòu)的H-Storm集群,服務(wù)器均為32 GB內(nèi)存,節(jié)點之間的帶寬為100 MB/s。其中一個節(jié)點配置為Nimbus節(jié)點(記為S0),其他6個為Supervisor節(jié)點(記為S1~S6)。S1~S3這3個節(jié)點上配置了GPU板卡,另外3個節(jié)點上不配置GPU板卡,如表1所示。
表1 集群環(huán)境中CPU/GPU配置情況
本文設(shè)計了一組測試用例用于調(diào)度策略中的α參數(shù)和β參數(shù)的性能調(diào)優(yōu)(α參數(shù)見2.2.1節(jié)中1)),β參數(shù)見2.2.1節(jié)中2)),這些參數(shù)的取值很大程度上影響了H-Storm的性能。
α參數(shù)調(diào)控進(jìn)程中的最大線程數(shù),影響GPU線程在GPU進(jìn)程中的聚集。為優(yōu)化H-Storm集群中的α參數(shù),本文設(shè)計了如圖6所示的Sequential Topology來測試不同的α參數(shù)對吞吐量和延時的影響,中間4個Bolt均為GPU Bolt,執(zhí)行矩陣加法,Checker Bolt校驗計算結(jié)果。Sequential Topology共有24個GPU線程,封裝到6個Worker中。由式(4)~式(6)可知,α參數(shù)過大會導(dǎo)致任務(wù)在Supervisor節(jié)點上的分配不均衡??紤]負(fù)載均衡,本文限制α參數(shù)的范圍為0.00~0.20,這樣每個Worker中的最大線程數(shù)目變化范圍為4~8。
圖6測試用SequentialTopology的結(jié)構(gòu)示意圖
圖7顯示在不同α的情況下吞吐量隨時間的變化,可以看到在α為0.00時,系統(tǒng)平均吞吐量較大,此時所有的線程平均分配到各進(jìn)程中;α為0.20時,系統(tǒng)平均吞吐量最小,此時線程集聚會導(dǎo)致計算資源的繁忙。圖8顯示在不同α的情況下響應(yīng)延時隨時間的變化(響應(yīng)延時是指一條消息從Spout發(fā)出后到其所有衍生的消息樹產(chǎn)生的所有消息都被響應(yīng)的時間,響應(yīng)延時包含處理延時),顯然α為0.00時可取得最低的延時。
圖7 α取不同值時的吞吐量
圖8 α取不同值時的響應(yīng)延時
實驗結(jié)果表明,在α參數(shù)為0.00時,GPU線程平均分配到GPU進(jìn)程中,此時系統(tǒng)有最佳性能;隨著α值增大,有更多互相通信的線程被分配到同一進(jìn)程中,但性能越來越差,說明在使用GPU進(jìn)行矩陣運算加速的用例中,線程聚集帶來的通信開銷的節(jié)省遠(yuǎn)比不上線程聚集導(dǎo)致的計算資源的繁忙。因此,在通信開銷遠(yuǎn)小于計算開銷的應(yīng)用場景中,建議使用平均的分配算法,即取α為0.00;而在計算開銷遠(yuǎn)小于通信開銷的情況下,建議增大α的取值來減小通信開銷(α不能過大,否則會導(dǎo)致集群負(fù)載不均衡和發(fā)送數(shù)據(jù)堆積現(xiàn)象)。由此,本文后續(xù)工作取α為0.00。
β參數(shù)調(diào)控GPU性能和空閑度之間的衡量和選取。實驗環(huán)境中存在如表2所示的GPU參數(shù)。其性能參數(shù)和GFlops的衡量值見表2。由式(9)可知,β的取值為0~1,β值越大,表示分配時更側(cè)重考慮性能;β值越小,表示分配時更側(cè)重于考慮空閑度。例如,假設(shè)集群中每個GPU上有4個GPU資源槽,Topology需要4個Worker,則首先會優(yōu)先選擇3個GTX 1060上的可用Worker, 對于第4個Worker的選擇,當(dāng)β參數(shù)大于0.5時,會使用GTX 1060,反之則使用GTX 970。
表2 本文實驗環(huán)境中GPU參數(shù)
本文使用如圖9所示的矩陣乘法運算Topology來測試β參數(shù)的取值,其中,MatrixReader Spout從文件中讀取矩陣A和B,MatrixDealer負(fù)責(zé)計算矩陣的乘積,MatrixPrinter負(fù)責(zé)存儲運算結(jié)果。為了只驗證GPU選取的不同帶來的性能影響,本文實驗未考慮網(wǎng)絡(luò)傳輸消耗。
圖9 矩陣乘法Benchmark的Topology結(jié)構(gòu)
圖10顯示了在Worker數(shù)目分別為5和15、矩陣維度分別為64和256的情況下,吞吐量和響應(yīng)延時隨β參數(shù)的變化??梢钥吹?當(dāng)Worker數(shù)目為5時,在β參數(shù)為0.1時,可取得最大的吞吐量和較低的響應(yīng)延時,此時計算任務(wù)很少,側(cè)重考慮空閑度可獲得最大的吞吐量;當(dāng)Worker數(shù)目為15時,在β參數(shù)為0.3時,可取得最大的吞吐量和最低的響應(yīng)延時,此時仍然優(yōu)先考慮空閑度。在2種情況下,隨著β值的增大,即對GPU性能的衡量權(quán)重上升后,系統(tǒng)的性能都越來越差,說明對性能的側(cè)重使得任務(wù)過多地被分配到性能較高的GPU上,導(dǎo)致其負(fù)載過重,系統(tǒng)性能降低。而本文環(huán)境中的GPU性能都比較強,在矩陣計算中并沒有很大的速度差異,因此對性能的側(cè)重并不能得到計算時間的大幅縮短,反而導(dǎo)致了MPS服務(wù)的繁忙,從而性能降低。因此,綜合實驗結(jié)果,本文環(huán)境中取β參數(shù)值為0.3。
在H-Storm實現(xiàn)中,α參數(shù)和β參數(shù)均可以在集群配置文件和Topology配置中根據(jù)集群狀況和應(yīng)用場景進(jìn)行設(shè)置。
圖10 不同矩陣維度下吞吐量和延時隨β的變化
本文使用如圖9所示的矩陣乘法Topology來比較H-Storm異構(gòu)任務(wù)調(diào)度器和原生的Storm調(diào)度器的性能,兩者的差異在于其中的MatrixDealer組件,原生Storm任務(wù)使用CPU執(zhí)行計算,而H-Storm異構(gòu)任務(wù)采用GPU并行加速實現(xiàn)。
首先測試在并行度為1時,不同矩陣維度的CPU和GPU Bolt的處理延時,結(jié)果如表3所示。隨著矩陣維度的增加,CPU和GPU的差別越來越大,當(dāng)矩陣為512×512時,GPU的加速比達(dá)到74倍,可見,GPU Bolt對于關(guān)鍵計算節(jié)點的加速效果較好。
表3 不同矩陣維度下CPU和GPU Bolt的處理延時
隨后,分別在不同矩陣維度下測試CPU和GPU Topology的吞吐量和延時。圖11顯示了Worker數(shù)分別為5和12的情況下,CPU和GPU Topology的響應(yīng)延時,可以看到,隨著矩陣維度的增加,CPU的響應(yīng)延時迅速增加,而GPU的響應(yīng)延時則增長幅度較小,且遠(yuǎn)低于CPU,這與表2的結(jié)果相符。當(dāng)Worker數(shù)目為5,矩陣為512×512時,系統(tǒng)的響應(yīng)延時降低了77倍。同時,當(dāng)Worker數(shù)目較多時,CPU和GPU系統(tǒng)的響應(yīng)延時會增加約20%~40%,原因是內(nèi)核調(diào)度和網(wǎng)絡(luò)傳輸會導(dǎo)致開銷的增加。
圖11 Worker數(shù)為5和12時響應(yīng)延時隨矩陣維度的變化
不同矩陣維度下CPU和GPU吞吐量的比較結(jié)果如表4所示。當(dāng)矩陣維度為32×32時,GPU Topology吞吐量只有CPU的一半,因為此時計算開銷遠(yuǎn)低于GPU的調(diào)用開銷;隨著矩陣維度的增加,GPU的處理能力越來越強,因此吞吐量也相對于CPU有了巨大的提升,在256×256時能夠達(dá)到32倍的加速比。對于1 024×1 024的矩陣,CPU的處理延時達(dá)到2 s,因此并行度為5時每秒只能處理2個~3個消息,而GPU則達(dá)到每秒133個。同時可以看到,Worker數(shù)目為12時的吞吐量相比Worker為5時并沒有獲得理論值的2.4倍增益,一方面是由于CPU和GPU各自的上下文切換開銷,另一方面是由于本文的實驗環(huán)境中100 Mb/s網(wǎng)速的限制,使得GPU的吞吐量受到了限制。盡管如此,GPU在關(guān)鍵節(jié)點的加速還是使得Topology取得了可觀的吞吐量增益和響應(yīng)延時優(yōu)化。
表4 Worker數(shù)為5和12時CPU與GPU的吞吐量比較
在H-Storm設(shè)計中,自適應(yīng)分發(fā)決策機制通過決策概率γ來調(diào)控CPU/GPU數(shù)據(jù)流的分發(fā),θ用于調(diào)控決策概率γ的更新速度,而γ參數(shù)是系統(tǒng)自動調(diào)整的,不需要手動調(diào)節(jié)。
對自適應(yīng)分發(fā)決策機制的測試使用了如圖12所示的CPU/GPU混合的矩陣乘法Topology。首先設(shè)置初始的γ值為0.5。圖13顯示了在不同θ取值情況下γ的更新情況,圖14顯示了不同θ取值情況下的吞吐量。其中θ為0.9時γ進(jìn)入穩(wěn)定狀態(tài)較快,并且能保持穩(wěn)定,此時平均的吞吐量也是最大。由式(11)知,θ最大意味著更新基本取決于這段時間內(nèi)的響應(yīng)延時比,因此實驗結(jié)果證實了響應(yīng)延時是吞吐量的決定性因素,而且響應(yīng)延時基本比較穩(wěn)定,所以基本通過響應(yīng)延時比來更新γ參數(shù)可以獲得更快的穩(wěn)定速度,因此本文隨后的測試中取θ為0.9。
圖12 動態(tài)協(xié)同CPU/GPU混合矩陣乘法Topology結(jié)構(gòu)
圖13 θ取不同值時調(diào)控參數(shù)γ的變化
圖14 θ取不同值情況下穩(wěn)定時的平均吞吐量變化
在θ取值為0.9、矩陣維度分別為32×32和128×128的情況下,分別測試只存在并行度為5的CPU Bolt、只存在并行度為5 的GPU Bolt和CPU/GPU同時存在的3種矩陣乘法Topology的性能,用于驗證自適應(yīng)分發(fā)決策機制的性能增益。圖15顯示了3種結(jié)構(gòu)的Topology的吞吐量的比較,可以看到協(xié)同調(diào)度的吞吐量幾乎是單獨的CPU和單獨的GPU的吞吐量的總和,CPU和GPU的硬件計算能力都得到了充分的利用,達(dá)到了自適應(yīng)分發(fā)決策機制所設(shè)計實現(xiàn)的H-Storm協(xié)同計算的目標(biāo)。
圖15 不同矩陣維度下3種Topology的吞吐量比較
本文根據(jù)目前大數(shù)據(jù)密集型計算的復(fù)雜性,提出將GPU資源納入集群中進(jìn)行關(guān)鍵節(jié)點加速和協(xié)同計算的技術(shù),并基于Apache Storm實時流處理系統(tǒng)設(shè)計H-Storm異構(gòu)計算平臺。首先通過MPS特性實現(xiàn)集群中GPU資源的發(fā)現(xiàn)和量化,進(jìn)而提出H-Storm異構(gòu)集群的任務(wù)調(diào)度策略,實現(xiàn)綜合考慮GPU性能及負(fù)載的任務(wù)調(diào)度算法和協(xié)同計算下自適應(yīng)的流分發(fā)決策機制,最后使用矩陣計算用例進(jìn)行了參數(shù)調(diào)優(yōu)和性能測試。實驗結(jié)果表明,相對于原生的Storm平臺,本文實現(xiàn)的H-Storm平臺在512×152矩陣乘法用例下可以達(dá)到54.9倍的吞吐量提升和77倍的響應(yīng)延時下降。H-Storm在實時密集型流計算領(lǐng)域有很大的優(yōu)勢,如何在Storm中動態(tài)衡量作業(yè)對GPU資源的消耗從而進(jìn)一步優(yōu)化調(diào)度算法,以及如何在H-Storm平臺上運行深度學(xué)習(xí)將是下一步的研究重點。
[1] 孫大為,張廣艷,鄭緯民.大數(shù)據(jù)流式計算:關(guān)鍵技術(shù)及系統(tǒng)實例[J].軟件學(xué)報,2014,25(4):839-862.
[2] 吳恩華,柳有權(quán).基于圖形處理器(GPU)的通用計算[J].計算機輔助設(shè)計與圖形學(xué)學(xué)報,2004,16(5):601-612.
[3] TOSHNIWAL A,TANEJA S,SHUKLA A,et al.Storm@twitter[C]//Proceedings of 2014 ACM SIGMOD International Conference on Management of Data.New York,USA:ACM Press,2014:147-156.
[4] ANIELLO L,BALDONI R,QUERZONI L.Adaptive online scheduling in Storm[C]//Proceedings of the 7th ACM International Conference on Distributed Event-based Systems.New York,USA:ACM Press,2013:207-218.
[5] PENG B,HOSSEINI M,HONG Z,et al.R-storm:resource-aware scheduling in Storm[C]//Proceedings of the 16th Annual Middleware Conference.New York,USA:ACM Press,2015:149-161.
[6] ESKANDARI L,HUANG Z,EYERS D.P-scheduler:adaptive hierarchical scheduling in apache Storm[C]//Proceedings of Australasian Computer Science Week Multi Conference.New York,USA:ACM Press,2016:26-32.
[7] XU J,CHEN Z,TANG J,et al.T-storm:traffic-aware online scheduling in storm[C]//Proceedings of the 34th IEEE International Conference on Distributed Computing Systems.Washington D.C.,USA:IEEE Press,2014:535-544.
[8] ZHANG W,XU L,LI Z,et al.A deep-intelligence framework for online video processing[J].IEEE Software,2016,33(2):44-51.
[9] RAO L B,ELAYARAJA C.Image analytics on big data in motion-implementation of image analysis CCL in Apache Kafka and Storm[J].WIT Transactions on Engineering and Sciences,2015,3(3):12-16.
[10] 韓 杰,陳耀武.基于Storm平臺的實時視頻分析系統(tǒng)[J].計算機工程,2015,41(12):26-29,35.
[11] CHEN Z,XU J,TANG J,et al.G-Storm:GPU-enabled high-throughput online data processing in Storm[C]//Proceedings of International Conference on Big Data.Washington D.C.,USA:IEEE Press,2015:307-312.
[12] ZHU J,LI J,HARDESTY E,et al.GPU-in-hadoop:enabling MapReduce across distributed heterogeneous platforms[C]//Proceedings of the 13th IEEE/ACIS International Conference on Computer and Information Science.Washington D.C.,USA:IEEE Press,2014:321-326.
[13] ZHENG H X,WU J M.Accelerate K-means algorithm by using GPU in the hadoop framework[C]//Proceedings of International Conference on Web-Age Information Management.Berlin,Germany:Springer,2014:177-186.
[14] RENNICH S.CUDA C/C++ streams and concurrency[C]//Proceedings of IEEE NVIDIA’11.Washington D.C.,USA:IEEE Press,2011:125-134.
[15] Nvidia Corp.Multi-process service(MPS) overview[EB/OL].[2016-12-02].https://docs.nvidia.com/deploy/mps/index.html.
[16] Nvidia Corp.CUDA toolkit documentation v8.0[EB/OL].[2016-12-02].https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#compute-capability.