馬浩然
摘要:在數(shù)據(jù)已滲透到我們生活的各個領(lǐng)域的時代,人們對于數(shù)據(jù)的挖掘和使用愈發(fā)頻繁。作為以消息為單位進(jìn)行數(shù)據(jù)共享的分布式架構(gòu),分布式消息系統(tǒng)成為數(shù)據(jù)處理的核心技術(shù)。傳統(tǒng)的分布式消息系統(tǒng)大多用于處理數(shù)據(jù)量小的關(guān)鍵性數(shù)據(jù),然而在信息劇增的今天,人們對信息的關(guān)注領(lǐng)域在不斷擴(kuò)大,挖掘的信息量在不斷增多,傳統(tǒng)的消息處理架構(gòu)已不能滿足我們對數(shù)據(jù)的處理需求,一個高吞吐量,可實時消費的高性能分布式消息系統(tǒng)成為必需。Kafka即是一種處理海量數(shù)據(jù)的分布式消息系統(tǒng)。本文總結(jié)了Kafka系統(tǒng)的特征和架構(gòu)策略,對其進(jìn)行抽象建模,通過網(wǎng)絡(luò)仿真工具NS3,設(shè)計實際系統(tǒng)的場景部署,最后運行仿真系統(tǒng),得出數(shù)據(jù)并分析,以幫助我們理解和評估Kafka分布式消息系統(tǒng)。
關(guān)鍵詞:計算機軟件;分布式消息系統(tǒng);卡夫卡;網(wǎng)絡(luò)仿真模擬器
中圖分類號:TP311.5
文獻(xiàn)標(biāo)識碼:A
1 相關(guān)背景及技術(shù)
1.1分布式消息系統(tǒng)的概念
分布式系統(tǒng)是指分散的物理機通過互聯(lián)網(wǎng)連接建立起的一套軟件系統(tǒng),具有高度的內(nèi)聚性和透明性。分布式環(huán)境中需要進(jìn)行大量,高效,可靠的數(shù)據(jù)傳輸,而不同平臺之間協(xié)議的多樣性,不兼容性提高了分布式交互的復(fù)雜度。因此,能在客戶端和服務(wù)端提供同步和異步的連接,實現(xiàn)應(yīng)用程序之間的協(xié)同,保證不同平臺之間高效通信的消息中間件機制得以采用。綜上所述,基于消息中間件機制的分布式架構(gòu)即稱為分布式消息系統(tǒng)。
1.2分布式消息系統(tǒng)的發(fā)展
消息中間件機制的不同,決定了分布式消息系統(tǒng)架構(gòu)迥然而異,最直接且關(guān)鍵的影響是消息處理模式的不同。在分布式系統(tǒng)發(fā)展初期,消息的傳遞采用的是點對點的通道模式,即發(fā)送方處理消息時需明確注明接收方的地址,盡管發(fā)送方和接收方是松耦合連接,相互通信不需要保持同步,但過于依賴地址和通道,使得系統(tǒng)不夠靈活,難以擴(kuò)展尤其是消息應(yīng)用面向企業(yè)級發(fā)展后,數(shù)據(jù)集遠(yuǎn)遠(yuǎn)擴(kuò)大,點對點模式的更加暴露了點對點通道模式的局限性。因此,消息中間件開始向發(fā)布/訂閱模式轉(zhuǎn)變,并逐漸成為目前消息處理的一種核心模式。與點對點模式不同,發(fā)布/訂閱模式中的發(fā)送方并不將消息發(fā)送給特定的接收方,而是將消息分類發(fā)送給消息代理方,接收方通過與代理方通信,接收自己感興趣的消息。即消息的“發(fā)布者”與“訂閱者”并不直接關(guān)聯(lián),這種發(fā)送方與接收方的解耦增強了系統(tǒng)的可擴(kuò)展性。目前比較典型的發(fā)布/訂閱中間件包括Microsoft MSMQ,RabbitMQ,ActiveMQ,以及Kafka等等。
1.3什么是Kafka?
Kafka由社交網(wǎng)站Linkedin開發(fā),為系統(tǒng)日志的實時處理提供數(shù)據(jù)“管道”。Kafka采用的是發(fā)布/訂閱的消息處理模式,用于低延時環(huán)境下收集和提交海量日志數(shù)據(jù),且適用于實時和離線的消息處理。
作為一個相對新穎的分布式發(fā)布/訂閱消息系統(tǒng),Kafka有著自己獨特設(shè)計策略:1吐吞量,是Kafka最關(guān)鍵的特性,Kafka設(shè)計初衷就是用來處理海量的系統(tǒng)日志;2持久化消息存儲,對于海量且安全性不高的消息,考慮開銷代價,Kafka采用的是本地文件系統(tǒng)的存儲方式,且存儲設(shè)計采用高效的Partition機制。3Pull模型,Kafka采用消費者主動從代理獲取消息的“拉”模型消費機制,消費狀態(tài)保存在消費端,而不在服務(wù)端。4基于zookeeper的負(fù)載均衡,Kafka使用分布式協(xié)調(diào)服務(wù)zookeeper來管理和平衡客戶端負(fù)載。
1.4網(wǎng)絡(luò)仿真工具NS3
研究網(wǎng)絡(luò)系統(tǒng)必然需要實際的網(wǎng)絡(luò)環(huán)境,但實現(xiàn)真實的網(wǎng)絡(luò)系統(tǒng)往往代價很高,尤其是分布式系統(tǒng)。因此,網(wǎng)絡(luò)仿真就成為我們首選方法。所謂網(wǎng)絡(luò)仿真就,就是使用計算機程序?qū)W(wǎng)絡(luò)通信進(jìn)行模型抽象,模仿真實網(wǎng)絡(luò)的特征和行為,并通過程序的運行得出可靠的數(shù)據(jù),為研究提供分析和驗證。
當(dāng)前有許多優(yōu)秀的網(wǎng)絡(luò)仿真軟件,本文中采用的是NS3(Network Simulator 3)。NS3是一種面向網(wǎng)絡(luò)系統(tǒng)的離散事件仿真軟件,由C++和Python語言編寫,適用于Linux,Unix等多種操作系統(tǒng)。它包含了網(wǎng)絡(luò)組件的模擬接口,如網(wǎng)絡(luò)傳輸協(xié)議,通信媒介,socket服務(wù),客戶端/服務(wù)端應(yīng)用程序等;事件調(diào)度器,以供執(zhí)行相關(guān)事件,用來模型實際中的通信“行為”;以及基于文本的跟蹤日志,非常方便仿真結(jié)果的分析。
本文使用NS3仿真工具對分布式消息系統(tǒng)Kafka進(jìn)行抽象建模,模擬出現(xiàn)實網(wǎng)絡(luò)通信場景;通過應(yīng)用程序?qū)崿F(xiàn)消息的生產(chǎn)者,消費者和代理者,消息數(shù)據(jù)的設(shè)計,基于主題的分類方法,基于partition的存儲策略,基于隊列的發(fā)送和接收方式,基于zookeeper的調(diào)度管理和負(fù)載均衡策略等等;通過不同的參數(shù)模擬不同場景的運行狀況,得出數(shù)據(jù)并進(jìn)行分析。
2 仿真建模
2.1架構(gòu)設(shè)計
闡明一個分布式系統(tǒng)首先需解釋它的物理拓?fù)浜瓦壿嬐負(fù)洹N锢硗負(fù)涿枋龅氖窍到y(tǒng)的各個部分相互連接而成的結(jié)構(gòu)。邏輯拓?fù)浞从车南到y(tǒng)各個部分的職能區(qū)別和交互關(guān)系。
本文實現(xiàn)的仿真系統(tǒng)采用的是星形物理拓?fù)浣Y(jié)構(gòu),即使用一個“全局路由”作為中心節(jié)點,網(wǎng)絡(luò)中的其它節(jié)點均與中心節(jié)點連接,任意兩個節(jié)點間的通信均要通過中心節(jié)點,發(fā)送消息需先發(fā)送到中心節(jié)點,中心節(jié)點再負(fù)責(zé)將消息轉(zhuǎn)發(fā)至目的節(jié)點,如圖1所示。
NS3工具封裝了節(jié)點類,以代替實際網(wǎng)絡(luò)中的主機,它模擬了現(xiàn)實中的網(wǎng)卡設(shè)備,協(xié)議棧,驅(qū)動程序,IP地址等功能,以及作為中心節(jié)點的“全局路由”節(jié)點。本文通過使用這些模板類來搭建仿真系統(tǒng)的物理模型。
本文所述的邏輯拓?fù)浞磻?yīng)的是研究對象Kafka的架構(gòu)策略。Kafka是一個基于主題分類的發(fā)布/訂閱系統(tǒng),包括消息的生產(chǎn)者(producer),消費者(consumer),代理者(broker)和管理者(zookeeper)四個主體,生產(chǎn)者和消費者與代理者分別進(jìn)行消息傳輸,其行為稱作“發(fā)布”和“訂閱”,代理者提供相關(guān)的存儲介質(zhì)和存儲策略,負(fù)責(zé)消息的持久化存儲和轉(zhuǎn)發(fā)。管理者負(fù)責(zé)協(xié)調(diào),分配其它三個主體之間的交互,保證系統(tǒng)的處于平衡狀態(tài)。如圖2所示。
NS3工具封裝了包類,socket服務(wù)類和應(yīng)用程序類。本文使用包類模擬實際網(wǎng)絡(luò)中的數(shù)據(jù)載體,即消息;socket類模擬實際網(wǎng)絡(luò)的發(fā)送,接收;應(yīng)用程序類是本系統(tǒng)的關(guān)鍵所在,我們用它來模擬Kafka的“參與者”及其設(shè)計思想,如基于partition存儲策略的代理者,基于Pull模型的消費者,基于zookeeper進(jìn)行調(diào)度管理和負(fù)載均衡的管理者等。
2.2細(xì)節(jié)實現(xiàn)
基于前兩章所述,本文在NS3仿真工具下實現(xiàn)以下仿真系統(tǒng),本節(jié)將對系統(tǒng)的關(guān)鍵模塊和實現(xiàn)策略進(jìn)行詳細(xì)描述。
2.2.1創(chuàng)建物理拓?fù)?/p>
上節(jié)已經(jīng)提過仿真系統(tǒng)物理拓?fù)涞膶崿F(xiàn)方法,這里使用NS3節(jié)點類,設(shè)置三個節(jié)點容器,分別儲存生產(chǎn)者,消費者和代理者節(jié)點。用戶可任意添加每種節(jié)點的數(shù)目,且為每個節(jié)點添加虛擬網(wǎng)卡,傳輸協(xié)議,IP地址等,保證節(jié)點間的正常通信。其中影響系統(tǒng)性能的兩個關(guān)鍵屬性:節(jié)點與節(jié)點間的傳輸速率和延遲。
2.2.2數(shù)據(jù)載體一消息
實際網(wǎng)絡(luò)通過包的形式封裝數(shù)據(jù)進(jìn)行收發(fā),NS3使用了同樣的設(shè)計思想。每一個網(wǎng)絡(luò)包代表一條消息,仿真中一個消息包含兩個組成部分,真實數(shù)據(jù)和元數(shù)據(jù)。與真實網(wǎng)絡(luò)不同的是,在仿真中使用的“真實數(shù)據(jù)”實際上是一個虛擬的零字節(jié)緩存,并不占據(jù)內(nèi)存空間,僅僅代表一條消息的負(fù)載大?。辉獢?shù)據(jù)是用來描述真實數(shù)據(jù)信息的數(shù)據(jù),盡管它不是我們需要消費的信息,但對我們至關(guān)重要,在運行過程中起著解釋和控制的作用,這也是本文關(guān)于消息設(shè)計的關(guān)鍵所在。本系統(tǒng)通過繼承標(biāo)簽基類設(shè)計出一個消息標(biāo)簽MsgTag,它包含三部分基本信息:1)真實數(shù)據(jù)信息;如消息主題,消息編號,消息大小。2)位置信息。如生產(chǎn)者序號,所屬partition序號,消費偏移量值。3)時間信息。每條消息的生產(chǎn)時間,發(fā)布時間,被請求消費時間,獲得消費時間等,這里的“時間”指的是NS3仿真模擬器控制的離散時間軸的上某一時間點。在仿真系統(tǒng)中,應(yīng)用程序負(fù)責(zé)維護(hù)以上標(biāo)簽信息,并根據(jù)它們控制程序的進(jìn)度和方向。
2.2.3生產(chǎn)者Producer
通過繼承應(yīng)用程序基類設(shè)計生產(chǎn)者模型。生產(chǎn)者首先依據(jù)參數(shù)生產(chǎn)相關(guān)主題,數(shù)量和大小的消息,其中消息大小采用指定范圍隨機數(shù);然后生產(chǎn)者向管理者“詢問”可用代理,獲取發(fā)布目的地址,調(diào)用底層Socket服務(wù)與之連接;最后生產(chǎn)者將消息加入發(fā)送隊列,并設(shè)置發(fā)送時間間隔,將消息發(fā)送給代理。
2.2.4消息代理Broker
代理者模型同樣通過繼承應(yīng)用程序基類實現(xiàn),它的主要功能包括消息的接收和存儲兩部分。
與真實網(wǎng)絡(luò)的socket服務(wù)一樣,NS3仿真系統(tǒng)的socket也會將超過指定大小的消息進(jìn)行拆分,分別進(jìn)行發(fā)送和接收,所以在接收方需要將被拆分的“碎片包”進(jìn)行重組。NS3沒有提供相關(guān)的組包方法,但提供了可用的字節(jié)標(biāo)簽接口,字節(jié)標(biāo)簽標(biāo)記了每個包的拆分位置。Broker模塊采取隊列的形式來接收包,通過字節(jié)標(biāo)簽來判斷“碎片包”是否為同一個包,并進(jìn)行重組。
Kafka依賴于本地的文件系統(tǒng)進(jìn)行持久化存儲。且存儲策略基于partition機制,即每個話題(Topic)分為若干個partition,每個partition分為若干個segment,每個segment存儲若干條消息。代理接收到消息后會依次順序添加至segment文件,且每條消息使用位偏移量進(jìn)行記錄?;谶@些設(shè)計思想,Broker模型設(shè)計持久化存儲采用了Map和Vector嵌套的數(shù)據(jù)結(jié)構(gòu),其中Topic是以主題和Partition為鍵值對的Map結(jié)構(gòu),partition和segment分別為vector結(jié)構(gòu)。此外,為Broker添加一個負(fù)載等級屬性,它會根據(jù)Broker存儲的消息數(shù)量進(jìn)行更改,反應(yīng)了每個消息代理的空間負(fù)載程度。
2.2.5消費者Consumer
消費者模型同樣繼承應(yīng)用程序基類,它的主要功能包括發(fā)送請求,接收并存儲消息,消息數(shù)據(jù)的分析。
上文提到Kafka的消息分類基于主題,存儲基于partition和segment,記錄基于位偏移量offset,因此消費者模型的消費思路為:將需要消費信息的主題和位偏移量發(fā)送給管理者,管理者根據(jù)主題尋找可消費的Broker,根據(jù)位偏移量尋找消息的partition和segment位置,并將結(jié)果返回給消費者,消費者根據(jù)收到的結(jié)果與對應(yīng)Broker通信,取回消息。這個過程體現(xiàn)了Kafka的其中一個設(shè)計思想:Pull模型。
消費者接收和存儲消息采用了Broker模型的組包算法和存儲結(jié)構(gòu)。但在本系統(tǒng)的消費者模型中,我們更加關(guān)心消費結(jié)果,即一條消息從生產(chǎn)者產(chǎn)生,經(jīng)Broker存儲,最后由消費者消費的過程中消息發(fā)生的變化,它體現(xiàn)在上文提到的消息元數(shù)據(jù)中。NS3提供了時間戳接口,消息在關(guān)鍵的生產(chǎn),存儲,消費等關(guān)鍵動作時,使用此接口方法為其添加對應(yīng)時間軸點的時間戳,并存儲在消息元數(shù)據(jù)中。當(dāng)消費者消費一條消息后,可以從其元數(shù)據(jù)的時間戳屬性中得到此消息的生產(chǎn),存儲時和消費時間等等,通過對比這些信息,我們可以對系統(tǒng)的功能和性能進(jìn)行評估和進(jìn)一步研究。
2.2.6管理者Zookeeper
Zookeeper是一個針對大型分布式系統(tǒng)的協(xié)調(diào)服務(wù),Kafka使用它來協(xié)調(diào)控制分布式網(wǎng)絡(luò)中各個節(jié)點的通信,維護(hù)系統(tǒng)的負(fù)載均衡,本系統(tǒng)通過繼承應(yīng)用程序基類模擬Zookeeper。它的功能包括兩大部分:1)維護(hù)系統(tǒng)信息。這里使用了Map嵌套結(jié)構(gòu)生成一個節(jié)點信息表和一個代理消息存儲表。每個生產(chǎn)者,代理者和消費者節(jié)點被創(chuàng)建時都會在節(jié)點信息表中注冊基本信息,如節(jié)點名稱,編號,IP地址,運行狀態(tài)等等;Broker在存儲消息時會在代理消息存儲表中注冊每條消息的位置信息,如Broker序號與主題的對應(yīng)關(guān)系,每個主題下的partiton和segment與消息的對應(yīng)關(guān)系。2)協(xié)調(diào)控制節(jié)點間的通信。這里包含兩個重要算法,一是生產(chǎn)者發(fā)布消息時對代理的選擇,zookeeper模塊通過對比代理的負(fù)載等級選取負(fù)載最輕的代理節(jié)點返回給發(fā)布者,這樣可以保證代理系統(tǒng)的空間負(fù)載趨于平衡狀態(tài)。二是消費者請求消費時對代理的選擇,zookeeper通過代理的運行狀態(tài)選取最“閑”的代理節(jié)點返回給消費者,這樣保證最大程度減輕代理系統(tǒng)的通信壓力,提高總體系統(tǒng)的性能。
3 設(shè)計場景并運行
為驗證仿真系統(tǒng)與Kafka系統(tǒng)的一致性,我們通過設(shè)置參數(shù)設(shè)計如下場景:
1.為系統(tǒng)添加3個生產(chǎn)者,4個代理者,和2個消費者;
2.設(shè)置消息大小為100字節(jié),3個生產(chǎn)者分別發(fā)布8000,5000,11000條消息;
3.每個代理者存儲結(jié)構(gòu)負(fù)責(zé)管理10個主題,每個主題分為10個partition,每個partition分為100個segmentfile,每個segmentfile可存儲100條消息;
4.兩個消費者采用隨機消費的方式進(jìn)行消費,分別消費300和1100條消息。
運行上述場景,得出數(shù)據(jù),這里我們選取了其中一個代理節(jié)點和一個消費節(jié)點的信息數(shù)據(jù)進(jìn)行分析。如圖3.1-3.3。
圖3.1為仿真系統(tǒng)代理節(jié)點得到的部分實際數(shù)據(jù),圖3.2,圖3.3描述了生產(chǎn)者從發(fā)布消息和代理者接收消息的時間趨勢,以及他們的時間差。由圖中曲線可以看出,接收時間滯后于發(fā)布時間,接收時間差在開始會有一個比較高的峰值,之后趨于平穩(wěn),初步估計由于系統(tǒng)的調(diào)度和下層網(wǎng)絡(luò)連接導(dǎo)致的。
圖4.1為仿真系統(tǒng)中消費者節(jié)點得到的部分實際數(shù)據(jù),從中根據(jù)包標(biāo)簽屬性可以看出符合我們隨機消費的要求。圖4.2,圖4.3描述了消息消費的時間趨勢和時間差,由圖中看出它們并沒有明顯的規(guī)律可循。這是因為Kafka采取了與傳統(tǒng)系統(tǒng)不同的消費模式:PULL模型?!袄蹦P鸵韵M者為主動方發(fā)起消費行為,這使得消息的大小,類型,存儲位置等都會影響到其被消費的時間延遲。
4 結(jié)論
作為新一代分布式消息系統(tǒng),在大數(shù)據(jù)背景下的今天,Kafka為我們處理海量數(shù)據(jù)提供了研究方向。本文對分布式消息系統(tǒng)Kafka進(jìn)行了抽象建模,并基于網(wǎng)絡(luò)仿真工具NS3模擬實現(xiàn)了其基本功能,最后設(shè)計場景并運行程序,得到相關(guān)數(shù)據(jù)并進(jìn)行了分析。以上工作旨在深入了解分布式消息系統(tǒng)Katka的設(shè)計架構(gòu),理解其基本原理,工作流程和異于傳統(tǒng)架構(gòu)的特征,為之后的相關(guān)研究提供基本思路和工作環(huán)境。