邰宇
摘 要 互聯(lián)網(wǎng)的快速發(fā)展促進(jìn)了海量數(shù)據(jù)的產(chǎn)生,而實(shí)時(shí)處理海量數(shù)據(jù)離不開(kāi)具有良好性能的分布式消息隊(duì)列,可明顯提高數(shù)據(jù)處理的效率,海量數(shù)據(jù)采用何種消息隊(duì)列進(jìn)行傳輸是關(guān)鍵問(wèn)題之一。分析研究應(yīng)用最頻繁的Apache Kafka、Rocket-MQ及Rabbit-MQ三種消息隊(duì)列特點(diǎn)及實(shí)現(xiàn)原理,在實(shí)時(shí)大數(shù)據(jù)計(jì)算場(chǎng)景下基于此對(duì)消息隊(duì)列分別搭建集群測(cè)試環(huán)境,比較有關(guān)結(jié)果后實(shí)現(xiàn)對(duì)消息隊(duì)列的性能設(shè)計(jì)優(yōu)化。
關(guān)鍵詞 海量數(shù)據(jù);消息隊(duì)列;大數(shù)據(jù)時(shí)代
中圖分類(lèi)號(hào) G2 文獻(xiàn)標(biāo)識(shí)碼 A 文章編號(hào) 1674-6708(2018)204-0106-02
1 概述
隨著互聯(lián)網(wǎng)技術(shù)的迅速發(fā)展,網(wǎng)絡(luò)日志在主流網(wǎng)站及應(yīng)用中每日產(chǎn)生的都是海量級(jí)的,其數(shù)據(jù)價(jià)值與其產(chǎn)生時(shí)間之間存在負(fù)相關(guān)關(guān)系。基于此研發(fā)的實(shí)時(shí)流計(jì)算系統(tǒng),使這些數(shù)據(jù)體現(xiàn)出最大的價(jià)值。將數(shù)據(jù)向計(jì)算系統(tǒng)傳輸已成為對(duì)計(jì)算效率產(chǎn)生影響的一個(gè)主要瓶頸,結(jié)合特定業(yè)務(wù)場(chǎng)景科學(xué)合理地選擇分布式消息隊(duì)列更為適宜,可在一定程度上使實(shí)時(shí)計(jì)算效率明顯提高。對(duì)不同分布式消息隊(duì)列在實(shí)時(shí)計(jì)算場(chǎng)景下實(shí)時(shí)性、并發(fā)性、可靠性及擴(kuò)展能力等方面表現(xiàn)出的差異比較,以確定最優(yōu)性能的消息隊(duì)列。
2 消息隊(duì)列
2.1 Kafka及其基本架構(gòu)
Kafka是可實(shí)現(xiàn)、發(fā)布及訂閱功能的分布式消息隊(duì)列系統(tǒng),生產(chǎn)者生產(chǎn)消息并將指定話題的消息向消息集群中發(fā)布,消費(fèi)者會(huì)對(duì)消息集群中的指定話題消息主動(dòng)訂閱,中間對(duì)持久化消息的存儲(chǔ)稱(chēng)為Broker。消息偏移量在消費(fèi)者中存儲(chǔ),因Kafka消息隊(duì)列無(wú)狀態(tài),用于對(duì)Kafka中當(dāng)前消費(fèi)者的消費(fèi)狀況進(jìn)行記錄。
若某個(gè)節(jié)點(diǎn)在集群中宕機(jī),系統(tǒng)還能提供正常服務(wù),但容易丟失存儲(chǔ)在宕機(jī)節(jié)點(diǎn)上的信息。無(wú)狀態(tài)的Kafka需消費(fèi)者定期維護(hù)消息隊(duì)列集群中消費(fèi)的偏移量,詳細(xì)記錄之前的消費(fèi)狀態(tài)。消息偏移量是不連續(xù)增量,在對(duì)下一個(gè)消息位置計(jì)算時(shí),應(yīng)將當(dāng)前消息長(zhǎng)度以原來(lái)偏移量為基礎(chǔ)進(jìn)行相加計(jì)算。
2.2 Rocket-MQ及其基本架構(gòu)
Rocket-MQ主要是由服務(wù)器端的NameServer、Broker和客戶(hù)端的Producer、Consumer四種節(jié)點(diǎn)組成,其Broker、Producer、Consumer與Kafka具有基本相同的功能。NameServer主要用于提供給Producer和Consumer生產(chǎn)消費(fèi)的Broker地址,Rocket-MQ集群隨著啟動(dòng)的Broker集群,發(fā)起連接指定NameServer的請(qǐng)求,Broker將以30s為周期會(huì)自動(dòng)發(fā)送具有目的topic消息的一次心跳,同時(shí)NameServer每隔兩分鐘對(duì)是否存在心跳進(jìn)行主動(dòng)檢測(cè),若未檢測(cè)到心跳,將自動(dòng)斷開(kāi)連接。若Broker掛掉,也將斷開(kāi)連接,NameServer迅速感知到并將topic和broker的關(guān)系更新。但不會(huì)向客戶(hù)端主動(dòng)通知,在客戶(hù)端啟動(dòng)時(shí),對(duì)部分NameServer指定具體的網(wǎng)址,客戶(hù)端自動(dòng)與指定NameServer進(jìn)行連接,若不能成功連接,客戶(hù)端就會(huì)嘗試連接其他NameServer地址,連接成功將每隔30s對(duì)路由信息進(jìn)行查詢(xún)。
2.3 Rabbit-MQ及其基本架構(gòu)
由Exchanges與Queues組成的Broker是Rabbit-MQ與其他2種消息隊(duì)列的主要區(qū)別,向Exchanges中push生產(chǎn)者生產(chǎn)的消息,系統(tǒng)利用RoutingKey將找到消息與Queue的對(duì)應(yīng)存儲(chǔ)位置。Queue利用routing keys進(jìn)行綁定,在消息傳輸中,若消費(fèi)者對(duì)客戶(hù)端的發(fā)送消息正確接收并消費(fèi),系統(tǒng)將這條消息從Queue中刪除。多個(gè)消費(fèi)者可接收發(fā)送來(lái)的同一消息,及時(shí)將數(shù)據(jù)向消費(fèi)者發(fā)送后同時(shí)在隊(duì)列中將這條數(shù)據(jù)刪除。
3 實(shí)驗(yàn)設(shè)計(jì)
采用本地虛擬機(jī)PC搭建測(cè)試環(huán)境,對(duì)測(cè)試主機(jī)進(jìn)行網(wǎng)絡(luò)配置。對(duì)實(shí)驗(yàn)系統(tǒng)進(jìn)行設(shè)計(jì),其主要過(guò)程為生產(chǎn)者向Broker集群中Push數(shù)據(jù),然后對(duì)消費(fèi)者Pull到Broker集群中的數(shù)據(jù)進(jìn)行計(jì)算。由程序自動(dòng)生成實(shí)驗(yàn)數(shù)據(jù),再向Broker集群中存儲(chǔ)。預(yù)先搭建好Storm實(shí)時(shí)計(jì)算系統(tǒng),3種消息隊(duì)列分別與生產(chǎn)者和消費(fèi)者建立連接,再對(duì)消息分別統(tǒng)計(jì)分析其生產(chǎn)和消費(fèi)效率。
4 性能優(yōu)化設(shè)計(jì)及實(shí)驗(yàn)結(jié)果
4.1 創(chuàng)新性
為使數(shù)據(jù)計(jì)算提高準(zhǔn)確性,采用全新Kafka消息結(jié)構(gòu),放棄消費(fèi)者利用對(duì)offset的維護(hù)消費(fèi)Kafka集群中的數(shù)據(jù)。消費(fèi)者對(duì)數(shù)據(jù)接口的讀取和消費(fèi)者對(duì)數(shù)據(jù)偏移量接口的修改分別進(jìn)行重新設(shè)計(jì),同時(shí)由消費(fèi)者調(diào)用讀取數(shù)據(jù)接口和修改偏移量接口,以確保其消費(fèi)端具有較高的可靠性。
4.2 消費(fèi)者可靠性設(shè)計(jì)
丟失傳送消息和重新傳送消費(fèi)過(guò)的消息是讀取Kafka原生的消費(fèi)者端數(shù)據(jù)中比較常見(jiàn)的2個(gè)問(wèn)題,因此,基于此采取可靠性設(shè)計(jì)方案。將主鍵Id分別添加到生產(chǎn)者中的每條消息中,在消費(fèi)者中若檢測(cè)出重復(fù)Id則進(jìn)行自動(dòng)過(guò)濾,以確保不會(huì)重新消費(fèi)已被消費(fèi)的消息。確保不丟失傳送數(shù)據(jù)的方法主要是采取消費(fèi)者同步處理數(shù)據(jù)和對(duì)數(shù)據(jù)偏移量修改,即消費(fèi)者將一條數(shù)據(jù)處理完后再依次將另一條數(shù)據(jù)依次進(jìn)行處理。
4.3 測(cè)試主要用例
測(cè)試主要是實(shí)現(xiàn)對(duì)以上3種消息隊(duì)列的磁盤(pán) IO、吞吐量及CPU資源消耗率之間存在差距的比較。正常啟動(dòng)Zookeeper和3個(gè)消息隊(duì)列,測(cè)試是將消息隊(duì)列集群與Storm計(jì)算集群?jiǎn)?dòng),push消息隊(duì)列中100萬(wàn)條準(zhǔn)備好的測(cè)試數(shù)據(jù),對(duì)topic分別創(chuàng)建,計(jì)算Storm計(jì)算集群pull出消息隊(duì)列中的數(shù)據(jù)。
4.4 實(shí)驗(yàn)結(jié)果
通過(guò)比較分析以實(shí)時(shí)流處理場(chǎng)景為基礎(chǔ)的吞吐量,Kafka最高,Rabbit-MQ的broker磁盤(pán)IO處于瓶頸。Rocket-MQ比較穩(wěn)定,磁盤(pán)IO使用率已接近全部。Rabbit-MQ在消耗CPU資源方面較大。再對(duì)服務(wù)端處理同步發(fā)送的有關(guān)消息隊(duì)列的性能進(jìn)行比較,Kafka消息隊(duì)列最高,Rabbit-MQ消息隊(duì)列最低。
5 結(jié)論
綜上所述,基于實(shí)時(shí)流處理業(yè)務(wù)場(chǎng)景對(duì)存儲(chǔ)和讀取需處理數(shù)據(jù)的消息隊(duì)列進(jìn)行選擇十分必要。通過(guò)對(duì)Kafka消息結(jié)構(gòu)的優(yōu)化,再比較基于Storm集群實(shí)時(shí)計(jì)算場(chǎng)景中性能較好的3種消息隊(duì)列,研究結(jié)果顯示以極大的實(shí)時(shí)計(jì)算數(shù)據(jù)量和較低延遲的要求為基礎(chǔ),綜合評(píng)價(jià)這3種消息隊(duì)列的吞吐量、磁盤(pán)IO及消耗CPU標(biāo)準(zhǔn)等有關(guān)指標(biāo),Kafka消息隊(duì)列的優(yōu)勢(shì)比較明顯。
參考文獻(xiàn)
[1]王巖,王純.一種基于Kafka的可靠的Consumer的設(shè)計(jì)方案[J].軟件,2016(17):61-66.
[2]馬浩然.基于NS3的分布式消息系統(tǒng)Kafka的仿真實(shí)現(xiàn)[J].軟件,2015(1):94-99.
[3]張鵬,李鵬霄,任彥,等.面向大數(shù)據(jù)的分布式流處理技術(shù)綜述[J].計(jì)算機(jī)研究與發(fā)展,2014(s2):1-9.
[4]周京暉.集成消息服務(wù)和定時(shí)通知的分布式內(nèi)存數(shù)據(jù)庫(kù)[J].軟件,2013,34(1):82-92.
[5]譚玉靖.基于ZooKeeper的分布式處理框架的研究與實(shí)現(xiàn)[D].北京:北京郵電大學(xué),2014.endprint