于金良 朱志祥 李聰穎
摘 要:為了解決實時流式數(shù)據(jù)的采集問題,研究了一種分布式消息隊列Kafka,它可以實時采集流式數(shù)據(jù),處理數(shù)據(jù)時先從它訂閱,就可以以流數(shù)據(jù)的形式處理數(shù)據(jù)。該隊列具有部署簡單、易于管理、吞吐量高、高容錯性等優(yōu)點。經(jīng)測試,該隊列可以滿足實際生產(chǎn)中對吞吐量的需求。
關(guān)鍵詞:分布式;消息隊列;主題;流數(shù)據(jù)
中圖分類號:TP274.2 文獻(xiàn)標(biāo)識碼:A 文章編號:2095-1302(2016)08-00-03
0 引 言
當(dāng)前大數(shù)據(jù)處理的數(shù)據(jù)形式主要分為兩種。一種是固定的批量數(shù)據(jù),這種數(shù)據(jù)是一個文件或者一個數(shù)據(jù)庫,其數(shù)據(jù)量是固定的,我們只需一次讀取,然后進(jìn)行計算;另外一種是活躍的流式數(shù)據(jù),這種數(shù)據(jù)是一個數(shù)據(jù)流,是實時生成、可傳輸?shù)模缫粋€網(wǎng)站的流量page views、用戶搜索的內(nèi)容等,這些數(shù)據(jù)是實時的,且數(shù)據(jù)量很大,其采集比較困難,傳統(tǒng)的消息隊列很難應(yīng)用到此種場景中。要想處理這些數(shù)據(jù)就需要先采集這些數(shù)據(jù),本文介紹了一種可以實時采集這些數(shù)據(jù)的分布式消息隊列Kafka。
1 簡介
Kafka是LinkedIn于2010年12月份開源的消息系統(tǒng),它主要用于處理活躍的流式數(shù)據(jù)。活躍的流式數(shù)據(jù)在Web網(wǎng)站的應(yīng)用中十分常見,包括網(wǎng)站的pv、用戶訪問的內(nèi)容、搜索的內(nèi)容等。這些數(shù)據(jù)通常以日志的形式記錄下來,然后每隔一段時間進(jìn)行一次統(tǒng)計處理。
傳統(tǒng)的日志分析系統(tǒng)提供了一種離線處理日志信息的可擴(kuò)展方案,但若要進(jìn)行實時處理,通常會有較大延遲。而現(xiàn)有的消息(隊列)系統(tǒng)能夠很好地處理實時或者近似實時的應(yīng)用,但未處理的數(shù)據(jù)通常不會寫到磁盤上,這對于Hadoop之類(一小時或者一天只處理一部分?jǐn)?shù)據(jù))的離線應(yīng)用而言,可能會存在些許問題。Kafka正是為了解決以上問題而設(shè)計的,它能夠很好地提供離線和在線應(yīng)用。
Kafka對消息按topic進(jìn)行歸類保存,消息的發(fā)布者稱為生產(chǎn)者(Producer),消息的接收者稱為消費(fèi)者(Consumer)。Kafka是分布式,它的集群由多個Kafka實例組成,每個實例是一個Broker。Kafka集群的信息及生產(chǎn)者與消費(fèi)者的元數(shù)據(jù)都由Zookeeper保存,它本身無需保存這些數(shù)據(jù)。
2 設(shè)計原理
Kafka的設(shè)計初衷是希望作為一個統(tǒng)一的數(shù)據(jù)收集平臺,能夠?qū)崟r收集數(shù)據(jù)、支撐大數(shù)據(jù),并具備良好的容錯能力。
2.1 存儲
Kafka不會在消息被消費(fèi)后直接刪除,而是將消息持久化在磁盤中,使用文件存儲消息,而文件系統(tǒng)的優(yōu)化幾乎是不可能的,為了提高性能,采用了緩存/直接內(nèi)存映射的方法。為了減少對磁盤的訪問次數(shù),Broker將數(shù)據(jù)暫時緩存起來,當(dāng)消息的數(shù)量達(dá)到一定值時,再flush到磁盤中,減少了在磁盤I/O上消耗的時間。
2.2 高吞吐量
為了提高Kafka的吞吐量,它采用了批量傳輸發(fā)送的方法,即生產(chǎn)者發(fā)布消息時,先將消息緩存,當(dāng)達(dá)到一定量時,批量發(fā)送到Broker;對于消費(fèi)者也一樣,Broker會批量發(fā)送多條消息??紤]到網(wǎng)絡(luò)I/O,Kafka將在網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù)進(jìn)行壓縮,它支持的壓縮方式有g(shù)zip/snappy等。而在創(chuàng)建主題時,可以分為多個分區(qū),進(jìn)一步提高讀寫的吞吐量。
2.3 負(fù)載均衡
生產(chǎn)者根據(jù)用戶指定的算法,將消息發(fā)送到指定的分區(qū);存在多個分區(qū),每個分區(qū)都有自己的副本,每個副本分布在不同的Broker節(jié)點上;多個分區(qū)需要選取出主分區(qū),主分區(qū)負(fù)責(zé)讀寫,并由Zookeeper負(fù)責(zé)故障恢復(fù);通過Zookeeper管理Broker與消費(fèi)者的動態(tài)加入與離開。
2.4 自動擴(kuò)容
由于在大數(shù)據(jù)行業(yè)中數(shù)據(jù)量的大小難以估計,Kafka支持集群的橫向擴(kuò)展,當(dāng)需要增加Broker結(jié)點時,新增的Broker、生產(chǎn)者、消費(fèi)者會向Zookeeper注冊,并及時作出調(diào)整。
3 技術(shù)架構(gòu)
Kafka是使用scala語言開發(fā)的,同時支持多種編程語言的客戶端(c++、java、python、go等),其總體架構(gòu)如圖1所示。
Kafka的消息分為以下幾個層次:
(1)主題(Topic):一類消息,例如頁面瀏覽日志,用戶搜索日志等都可以以主題的形式存在,Kafka集群能夠同時負(fù)責(zé)一個或多個主題的分發(fā)。
(2)分區(qū)(Partition):是在主題物理上的分組,一個主題可以分為多個分區(qū),每個分區(qū)是一個有序的隊列。分區(qū)中的每條消息都會被分配一個有序的id(offset)。
(3) 消息(Message):最小訂閱單元。
具體流程如下所示:
(1)生產(chǎn)者根據(jù)指定的分區(qū)方法(round-robin、hash等),將消息發(fā)布到指定主題的分區(qū)中;
(2)Kafka集群接收到生產(chǎn)者發(fā)過來的消息后,將其持久化到硬盤,并保留消息指定時長(可配置),而不關(guān)注消息是否被消費(fèi);
(3)消費(fèi)者從Kafka集群拉數(shù)據(jù),并控制獲取消息的offset。
4 主題與分區(qū)
一個topic是對一組消息的歸納。Kafka對每個主題的日志進(jìn)行了分區(qū),如圖2所示。
每個分區(qū)都由一系列有序的、不可變的消息組成,這些消息被連續(xù)追加到分區(qū)中。分區(qū)中的每個消息都有一個連續(xù)的序列號叫做offset,用來在分區(qū)中唯一的標(biāo)識這個消息。
在一個可配置的時間段內(nèi),Kafka集群保留所有發(fā)布的消息,不管這些消息有沒有被消費(fèi)。比如將消息的保存策略設(shè)置為2天,那么在一個消息發(fā)布到Kafka的兩天時間內(nèi),它都可以被消費(fèi)。之后它將被丟棄以釋放空間。Kafka的性能是和數(shù)據(jù)量無關(guān)的常量級,所以保留太多的數(shù)據(jù)并不是問題。
實際上每個消費(fèi)者唯一需要維護(hù)的是已消費(fèi)消息在消息隊列中的位置,即offset。一般情況下隨著消費(fèi)者不斷的讀取消息,offset的值隨之不斷增加,其實消費(fèi)者可以以任意的順序讀取消息,比如它可以將offset設(shè)置成一個舊值來重讀之前的消息。
結(jié)合以上特點可以發(fā)現(xiàn),Kafka消費(fèi)者是輕量級的,它們可以在不對集群和其他消費(fèi)者造成影響的情況下讀取消息。
將日志分區(qū)可以達(dá)到以下目的:首先這使得每個日志的數(shù)量不會太大,可以在單個服務(wù)上保存。另外每個分區(qū)可以單獨(dú)發(fā)布和消費(fèi),為并發(fā)操作主題提供了一種可能。
4.1 分布式
每個分區(qū)在Kafka集群的若干服務(wù)中都有副本,這些持有副本的服務(wù)可以共同處理數(shù)據(jù)和請求,副本數(shù)量可以配置。副本使Kafka具備了容錯能力。
每個分區(qū)都由一個服務(wù)器作為主服務(wù),零或若干服務(wù)器作為從服務(wù),主服務(wù)負(fù)責(zé)處理消息的讀和寫,從服務(wù)則復(fù)制主服務(wù),若主服務(wù)宕機(jī)了,從服務(wù)中的一臺則會自動成為主服務(wù)。集群中的每個服務(wù)都會同時扮演兩個角色:作為它所持有的一部分分區(qū)的主,同時作為其他分區(qū)的從,這樣集群就會有較好的負(fù)載均衡。
4.2 生產(chǎn)者
生產(chǎn)者(Producer)將消息發(fā)布到它指定的主題中,并決定發(fā)布到哪個分區(qū)中。一般是由負(fù)載均衡機(jī)制隨機(jī)選擇一個分區(qū),也可通過特定的分區(qū)函數(shù)來選擇分區(qū)。使用更多的是第二種。
4.3 消費(fèi)者
發(fā)布消息通常有兩種模式:隊列模式(queuing)和發(fā)布-訂閱模式(publish-subscribe)。隊列模式中,消費(fèi)者(Consumers)可以同時從服務(wù)端讀取消息,每個消息只被其中一個消費(fèi)者讀到;發(fā)布-訂閱模式中消息被廣播到所有的消費(fèi)者中。多個消費(fèi)者可以加入到一個消費(fèi)者組中,共同競爭一個主題,主題中的消息將被分發(fā)到組中的一個成員中。同一組中的消費(fèi)者可以在不同的程序中、不同的機(jī)器上。如果所有的消費(fèi)者都在一個組中,這就成為了傳統(tǒng)的隊列模式,在消費(fèi)者組中實現(xiàn)負(fù)載均衡。
5 性能測試
通過一臺4核、8 G內(nèi)存的臺式機(jī),向一個擁有5臺Broker的Kafka集群發(fā)布、訂閱消息。消息的平均大小為100個字節(jié)。測試結(jié)果如下:
生產(chǎn)者:每秒可發(fā)布30萬條消息,且可通過調(diào)整request.required.acks參數(shù)來保證數(shù)據(jù)的可靠性。
消費(fèi)者:每秒可消費(fèi)5萬條數(shù)據(jù)。通過使用java語言編寫Kafka生產(chǎn)者與消費(fèi)者對Kafka性能進(jìn)行測試。
6 結(jié) 語
分布式消息隊列Kafka具有部署簡單、易于管理、高吞吐量、高容錯性等優(yōu)點,經(jīng)測試,可以滿足實際生產(chǎn)中對吞吐量的需求。
參考文獻(xiàn)
[1]孫韓林.一種基于云計算的網(wǎng)絡(luò)流量分析系統(tǒng)結(jié)構(gòu)[J].西安郵電大學(xué)學(xué)報,2013,18(4):75-79.
[2]金澈清,錢衛(wèi)寧,周傲英.流數(shù)據(jù)分析與管理綜述[J].軟件學(xué)報,2004,15(8):1172-1181.
[3]曹婧華,冉彥中,許志軍.分布式消息隊列的設(shè)計與實現(xiàn)[J].河南科技大學(xué)學(xué)報(自然科學(xué)版),2010,31(4):35-38,109.
[4]王博,陳莉君.Hadoop遠(yuǎn)程過程調(diào)用機(jī)制的分析和應(yīng)用[J].西安郵電學(xué)院學(xué)報,2012,17(6):74-77.
[5]盧本捷.分布式消息隊列的理論、實現(xiàn)與應(yīng)用[D].武漢:華中科技大學(xué),2004.
[6]崔小燕.Linux集群系統(tǒng)分析[J].西安郵電學(xué)院學(xué)報,2006,11(5):103-106.
[7]于自強(qiáng).海量流數(shù)據(jù)挖掘相關(guān)問題研究[D].濟(jì)南:山東大學(xué),2015.
[8]謝曉燕,張靜雯.一種基于Linux集群技術(shù)的負(fù)載均衡方法[J].西安郵電大學(xué)學(xué)報,2014,19(3):64-68.
[9]陸慶,周世杰,秦志光,等.消息隊列中間件系統(tǒng)中消息隊列與消息分發(fā)技術(shù)研究[J].計算機(jī)應(yīng)用研究,2003(8):51-53.