顏曉蓮,章 剛,邱曉紅
(1.江西理工大學(xué) 軟件工程學(xué)院(南昌),江西 南昌 330013;2.江西北大科技園,江西 南昌 330013)
分布式消息系統(tǒng)作為分布式系統(tǒng)重要的模塊間消息傳遞組件,利用可靠、高效的與平臺(tái)無(wú)關(guān)的消息傳遞與分發(fā),可實(shí)現(xiàn)分布式系統(tǒng)內(nèi)部解耦以及分布式系統(tǒng)各模塊的有效集成,因而受到業(yè)界高度關(guān)注[1]。
ApacheKafka[2-5]是當(dāng)前較為主流、基于發(fā)布-訂閱機(jī)制的高吞吐量分布式消息系統(tǒng),前期由LinkedIn開(kāi)發(fā)后由Apache基金管理并開(kāi)源。其優(yōu)勢(shì)包括:(1)支持上層應(yīng)用端多語(yǔ)言開(kāi)發(fā),如C#、JAVA、PHP、Python、Ruby等;(2)支持與平臺(tái)無(wú)關(guān)的消息傳遞與分發(fā);(3)支持準(zhǔn)實(shí)時(shí)性的大規(guī)模消息處理;(4)支持on-line水平擴(kuò)展。相對(duì)其他消息系統(tǒng),Kafka憑借眾多技術(shù)優(yōu)勢(shì),已在各行業(yè)企業(yè)級(jí)應(yīng)用中普及。
在Kafka中,多個(gè)Broker(服務(wù)器)組成Kafka集群,并被ZooKeeper集中管理。Producer為消息生產(chǎn)者,Consumer為消息消費(fèi)者,Kafka將每個(gè)新產(chǎn)生的消息進(jìn)行劃分并歸類(lèi)到某個(gè)主題Topic中(Topic可理解為邏輯存儲(chǔ)單元)。每個(gè)Topic被劃為多個(gè)分區(qū)Partition(Partition可理解為實(shí)際存儲(chǔ)單元),這些分區(qū)Partition按某種規(guī)則均勻地部署到多個(gè)Broker上。根據(jù)Kafka系統(tǒng)定義和推薦,Producer生產(chǎn)的消息,依據(jù)Hash算法被分發(fā)至其所屬Topic相應(yīng)的Partition上。
Consumer作為消費(fèi)者訂閱其關(guān)注的主題Topic(可訂閱多個(gè)),Kafka按Range策略(即均勻分配)將Consumer分配至其關(guān)注Topic下眾多Partition之一上,由該P(yáng)artition作為服務(wù)接入端,并依次消費(fèi)其關(guān)注的主題Topic下所有Partition的感興趣消息。當(dāng)訂閱流量或分發(fā)消息數(shù)量增加時(shí),Kafka可通過(guò)配置文件管理增加Partition數(shù)量,實(shí)現(xiàn)on-line水平擴(kuò)展從而提升系統(tǒng)性能與吞吐量。
伴隨大數(shù)據(jù)時(shí)代來(lái)臨,各行各業(yè)對(duì)大數(shù)據(jù)技術(shù)的需求越發(fā)強(qiáng)烈。Kafka作為消息中間件,不僅在分布式系統(tǒng)中扮演重要角色,同時(shí)也已成為大數(shù)據(jù)流處理框架Apache Samza的核心組件之一。但隨著Kafka應(yīng)用的多樣化,其自身的一些不足逐漸顯現(xiàn)。
其中不足之一便是Partition過(guò)載問(wèn)題(Partition overload problem,POP)。Partition在Kafka中扮演承上啟下角色,上連消息生產(chǎn)者Producer,下連消息消費(fèi)者Consumer,Partition服務(wù)性能決定著B(niǎo)roker及Kafka整體性能,對(duì)POP問(wèn)題研究將為后期優(yōu)化Broker、Consumer乃至Kafka整個(gè)系統(tǒng)性能打下基礎(chǔ)。
在綜合衡量已有研發(fā)成果及文中所關(guān)注的重心的基礎(chǔ)上,認(rèn)為POP問(wèn)題指消息分發(fā)、消息存儲(chǔ)或消息消費(fèi)、消息訂閱等操作造成主題Topic下Partition過(guò)度服務(wù),并影響到支撐Partition的實(shí)際物理載體Broker的性能。
通常而言,在大型商業(yè)應(yīng)用影響下,某時(shí)刻會(huì)造成某個(gè)(或多個(gè))主題Topic源源不斷地涌入新消息,并依據(jù)Hash算法向其Partition分發(fā),此時(shí)Partition不僅要處理消息存儲(chǔ)還要處理Consumer的服務(wù)請(qǐng)求,當(dāng)新消息數(shù)量達(dá)到某閾值時(shí),必將導(dǎo)致Partition過(guò)載,而這將影響到Partition的物理載體Broker的性能。
雖然,Kafka可通過(guò)配置文件增加Partition數(shù)量,緩解Partition過(guò)載現(xiàn)象出現(xiàn),但依然存在如下問(wèn)題:(1)這種由人為主觀判定及人為修改的方式,不僅準(zhǔn)確度無(wú)法保證而且極為僵化;(2)Partition文件配置管理與基于Hash算法的消息分發(fā)相互獨(dú)立、相互分離,無(wú)法根據(jù)Partition實(shí)際情況建立協(xié)同工作機(jī)制。這些問(wèn)題的存在,已使得Kafka無(wú)法滿足當(dāng)前多樣化應(yīng)用需求。
當(dāng)前有關(guān)Kafka中Partition過(guò)載問(wèn)題討論極為少見(jiàn)。研究成果較為常見(jiàn)的包括:(1)ZooKeeper集中管理機(jī)制[6-7],主要討論業(yè)務(wù)復(fù)雜化后,Broker、Consumer、Consumer Group等注冊(cè)管理,Topic與Broker映射關(guān)系以及Partition分配等管理機(jī)制,有助于提升系統(tǒng)整體效率;(2)Broker[8-9]負(fù)載均衡,主要討論虛擬化背景下Broker如何實(shí)現(xiàn)接入負(fù)載均衡,有助于提升Broker資源利用率;(3)Consumer[10-11]負(fù)載均衡,主要討論大規(guī)模數(shù)據(jù)處理環(huán)境下,傳統(tǒng)Kafka易造成的高開(kāi)銷(xiāo)、高誤差率等問(wèn)題,有助于降低系統(tǒng)耗能、提升服務(wù)質(zhì)量。這些雖都對(duì)Kafka系統(tǒng)實(shí)現(xiàn)優(yōu)化,但都無(wú)法解釋Partition過(guò)載問(wèn)題。
針對(duì)此,提出一種改進(jìn)型Partition負(fù)載優(yōu)化算法(IPOOA算法),該算法實(shí)現(xiàn)消息分發(fā)預(yù)測(cè)以及消息分發(fā)與文件配置管理協(xié)同,從而可有效緩解Partition過(guò)載問(wèn)題出現(xiàn)。
算法思想:新消息產(chǎn)生后,IPOOA算法先根據(jù)實(shí)際業(yè)務(wù)提取業(yè)務(wù)關(guān)鍵字Key,依據(jù)Hash分發(fā)規(guī)則計(jì)算分發(fā)至Partition,接著算法評(píng)估該P(yáng)artition的即時(shí)服務(wù)耗量,如果即時(shí)服務(wù)耗量在閾值范圍內(nèi),則新消息被分發(fā)至該P(yáng)artition,否則算法依次計(jì)算與該P(yáng)artition相似度較高的候選Partition,并評(píng)估候選Partition的即時(shí)服務(wù)耗量,如果滿足閾值范圍,則新消息被分發(fā)至候選Partition,否則重復(fù)計(jì)算候選Partition,直至迭代次數(shù)超過(guò)半數(shù)Partition總量。如果依然沒(méi)有完成消息分發(fā)任務(wù),則通知Kafka自動(dòng)修改配置文件新增Partition并存儲(chǔ)新消息,從而能夠有效緩解Partition過(guò)載。
按照Kafka的定義,消息分發(fā)機(jī)制共包括Hash分發(fā)、隨機(jī)分發(fā)以及輪詢分發(fā)等(如圖1所示),實(shí)際中企業(yè)級(jí)應(yīng)用使用范圍較廣的是Hash分發(fā)機(jī)制。
(a)Hash分發(fā)機(jī)制
(b)隨機(jī)分發(fā)機(jī)制
(c)輪詢分發(fā)機(jī)制
該機(jī)制大致過(guò)程如下:
Step1:指定消息的Key(通常選取實(shí)際業(yè)務(wù)所含關(guān)鍵字符);
Step2:基于Key實(shí)現(xiàn)Hash(Key);
Step3:根據(jù)mod(Hash(key))結(jié)果將消息分發(fā)至指定Partition;
Step4:返回Step1。
Hash分發(fā)機(jī)制相對(duì)其余兩種方式,其能夠較好地保證消息均勻有序分發(fā),因而被行業(yè)廣泛普及使用。但Hash消息分發(fā)無(wú)法根據(jù)Partition實(shí)際負(fù)載情況進(jìn)行有序分發(fā),從而易加重Partition負(fù)載。
1.2.1 即時(shí)服務(wù)耗量(instant service consumption,ISC)
ISC反映當(dāng)前t時(shí)刻Partition中消息消費(fèi)產(chǎn)生的服務(wù)消耗量,對(duì)任意消息k而言,在時(shí)刻t產(chǎn)生的服務(wù)消耗量Cmt由t時(shí)刻消息k訂閱數(shù)CmtNum及t時(shí)刻消息k訪問(wèn)連接數(shù)CmtCon線性加權(quán)組成,如式(1):
Cmtt(k)=λ1CmtNumt(k,N1)+λ2CmtCont(k,N2)
(1)
其中,λ1∈(0,1)和λ2∈(0,1)為權(quán)重系數(shù),N1和N2分別為訂閱總數(shù)和連接總數(shù)。
t時(shí)刻Partition的ISC可表示為:
(2)
1.2.2 Partition相似度(partition similarity,PS)
PS反映某一時(shí)刻兩個(gè)Partition所存消息的相似程度,對(duì)任意Partition而言,在時(shí)刻t所存儲(chǔ)消息隊(duì)列表示為Partitiont={Meg1,Meg2,…,MegNUM},NUM為消息總數(shù)。則時(shí)刻t任意兩個(gè)Partition的PS可根據(jù)加權(quán)閔可夫斯基距離(Minkowski distance)計(jì)算,如式(3):
其中,p≥1為指數(shù)參數(shù),θ∈(0,1)為權(quán)重系數(shù)。
1.2.3 算法過(guò)程
Step1:初始化Partition配置文件,載入Kafka系統(tǒng)中,并設(shè)置各類(lèi)參數(shù)λ1,λ2,p,θ以及Θ(Cmt閾值),設(shè)定迭代次數(shù),轉(zhuǎn)入Step2;
Step2:等待新消息導(dǎo)入,并根據(jù)Hash分發(fā)算法計(jì)算其分發(fā)至Partition,轉(zhuǎn)入Step3;
Step3:根據(jù)式(1)、式(2)計(jì)算該P(yáng)artition的ISC值,轉(zhuǎn)入Step4;
Step4:判定該P(yáng)artition的ISC值是否滿足閾值Θ,如果滿足則新消息存儲(chǔ)并轉(zhuǎn)入Step2.;否則轉(zhuǎn)入Step5;
Step5:根據(jù)式(3)依次計(jì)算該P(yáng)artition與候選Partition的PS值,并挑選出最優(yōu)PS值,轉(zhuǎn)入Step3;
Step6:如果迭代次數(shù)超過(guò)半數(shù)Partition總量,則通知Kafka自動(dòng)修改配置文件新增Partition,并將新消息存儲(chǔ)在新增Partition上,根據(jù)實(shí)際情況轉(zhuǎn)入Step2或轉(zhuǎn)入Step7;
Step7:退出算法。
軟硬件環(huán)境:選取12個(gè)Broker(服務(wù)器)作為Kafka集群,CPU型號(hào)為Xeon E5-2620V3,內(nèi)存8G,SATA硬盤(pán)300G,操作系統(tǒng)為SUSE Linux Enterprise Server 15。
核心參數(shù)設(shè)置:在綜合考慮文獻(xiàn)對(duì)參數(shù)取值的建議和基于多次重復(fù)實(shí)驗(yàn)的結(jié)果,參數(shù)設(shè)定如下:p=1 ORp=2,λ1,λ2∈(0.35,0.65),θ1,θ2,…∈(0.1,0.9),Θ∈[0.5,0.65],其中實(shí)驗(yàn)中所有權(quán)重系數(shù)之和都為1。
場(chǎng)景模擬:12個(gè)Broker服務(wù)器分成3個(gè)功能區(qū),其中3個(gè)服務(wù)器作為Producer消息生產(chǎn)者不斷模擬分發(fā)消息,3個(gè)服務(wù)器作為Consumer消息消費(fèi)者不斷模擬消費(fèi)消息,Producer與Consumer隨機(jī)分布在不同區(qū)域,另外6個(gè)服務(wù)器作Kafka集群服務(wù)器集中管理,處理Producer消息分發(fā)以及Consumer消息消費(fèi)[12-15]。
對(duì)比算法:為展示實(shí)驗(yàn)的客觀性,分別選取傳統(tǒng)Kafka算法[2-5],基于Broker負(fù)載均衡的BL算法[8]和基于Consumer負(fù)載均衡的CL算法[10]與融合文中IPOOA算法的Kafka相比較。
測(cè)試指標(biāo):為體現(xiàn)實(shí)驗(yàn)的全面性,將從多個(gè)維度驗(yàn)證算法的性能:(1)Kafka集群CPU使用率(Kafka CPU rate,KCR);(2)Kafka服務(wù)延時(shí)率(Kafka service delay rate,KSDR);(3)Kafka系統(tǒng)收斂延時(shí)比(Kafka system convergence delay rate,KSCDR)。
實(shí)驗(yàn)方案:
實(shí)驗(yàn)1:在并發(fā)規(guī)模為2 000環(huán)境下,KCR、KSDR及KSCDR對(duì)比如表1所示。
表1 在并發(fā)規(guī)模為2 000環(huán)境下,4種算法的KCRKSDRKSCDR對(duì)比 %
實(shí)驗(yàn)2:在并發(fā)規(guī)模為3 500環(huán)境下,KCR、KSDR及KSCDR對(duì)比如表2所示。
表2 在并發(fā)規(guī)模為3 500環(huán)境下,4種算法的KCRKSDRKSCDR對(duì)比 %
實(shí)驗(yàn)3:在并發(fā)規(guī)模為5 000環(huán)境下,KCR、KSDR及KSCDR對(duì)比如表3所示。
表3 在并發(fā)規(guī)模為5 000環(huán)境下,4種算法的KCRKSDRKSCDR對(duì)比 %
實(shí)驗(yàn)總結(jié):在并發(fā)規(guī)模逐漸增加下,融合文中算法的Kafka系統(tǒng)(IPOOA_Kafka)在各項(xiàng)指標(biāo)層面相對(duì)較優(yōu),主要原因在于IPOOA_Kafka能夠?qū)崿F(xiàn)預(yù)測(cè)消息分發(fā)以及消息分發(fā)與文件配置協(xié)同工作,從而能緩解Partition過(guò)載問(wèn)題出現(xiàn),提升系統(tǒng)整體性能。
針對(duì)Kafka中Partition文件配置管理所存在的被動(dòng)、僵化及孤立等不足,使得Partition過(guò)載問(wèn)題無(wú)法有效解決,提出一種改進(jìn)型Partition過(guò)載優(yōu)化算法。該算法通過(guò)即時(shí)服務(wù)耗量,Partition相似度和配置文件自動(dòng)修改相結(jié)合,實(shí)現(xiàn)消息分發(fā)預(yù)測(cè)以及消息分發(fā)與文件配置管理協(xié)同,從而可有效緩解Partition過(guò)載問(wèn)題出現(xiàn)。實(shí)驗(yàn)從Kafka集群CPU使用率、Kafka服務(wù)延時(shí)率、Kafka系統(tǒng)收斂延時(shí)比等幾個(gè)方面驗(yàn)證了算法的有效性及合理性。未來(lái)將重點(diǎn)圍繞消息分發(fā)、消息訂閱及文件配置管理等多層面協(xié)同展開(kāi)研究。