朱蔚林,木偉民,金宗澤,王偉平
(1.中國科學(xué)院 信息工程研究所,北京 100093;2.中國科學(xué)院大學(xué),北京 100049)
數(shù)據(jù)的價(jià)值隨著時(shí)間推移會(huì)慢慢降低,在社會(huì)生活中,特別是商業(yè)場景中這一現(xiàn)象更加顯著。流處理系統(tǒng)的出現(xiàn)讓用戶能夠快速地從龐雜的數(shù)據(jù)流中提煉出數(shù)據(jù)蘊(yùn)含的價(jià)值。由于數(shù)據(jù)流的持續(xù)產(chǎn)生且體積龐大,給系統(tǒng)的存儲(chǔ)能力帶來了巨大挑戰(zhàn)。由此導(dǎo)致數(shù)據(jù)流統(tǒng)計(jì)具有one-pass訪問、有限內(nèi)存和實(shí)時(shí)等特點(diǎn),也為統(tǒng)計(jì)帶來了極大的挑戰(zhàn)。
現(xiàn)如今,大數(shù)據(jù)時(shí)代涌現(xiàn)了如S4[1-2]、Storm[3]、Spark Streaming[4]等一系列數(shù)據(jù)流處理平臺(tái)[5-6],這些平臺(tái)由于面向的應(yīng)用場景不同而各有特點(diǎn)。S4由Yahoo在2010年提出,S4為了保證良好的可擴(kuò)展性,采用去中心化結(jié)構(gòu),但是S4消息路由僅支持按照Key值進(jìn)行分布,而且沒有提供消息處理的反饋機(jī)制,使得容錯(cuò)成為了很大的問題。Storm面向純流式處理,以低延遲為核心設(shè)計(jì)目標(biāo),將數(shù)據(jù)流處理任務(wù)抽象為有向無環(huán)圖,稱作Topology,同時(shí)在at-least-once語義的基礎(chǔ)上實(shí)現(xiàn)了exactly-once語義,但是其弱中心化的結(jié)構(gòu)也沒有徹底解決單點(diǎn)的問題。Spark Streaming構(gòu)建在Spark的基礎(chǔ)上,沿用了Spark中創(chuàng)新性的存儲(chǔ)結(jié)構(gòu)RDD(彈性分布式數(shù)據(jù)集),將數(shù)據(jù)流切分為一個(gè)個(gè)RDD,增大了處理粒度以提高吞吐量,但是也因此增加了延遲。
在數(shù)據(jù)流上有一種普遍的應(yīng)用場景是在數(shù)據(jù)流上進(jìn)行數(shù)據(jù)統(tǒng)計(jì)[7],例如對(duì)于電信網(wǎng)絡(luò)、社交網(wǎng)絡(luò)數(shù)據(jù)流等等。在這些數(shù)據(jù)流上的實(shí)時(shí)統(tǒng)計(jì)無論對(duì)于輿情控制,還是提供更高質(zhì)量的服務(wù)都有重要意義。然而上述平臺(tái)并不能完美契合此類應(yīng)用場景。
數(shù)據(jù)流統(tǒng)計(jì)場景下一個(gè)很典型的處理是分組統(tǒng)計(jì)。以社交網(wǎng)絡(luò)數(shù)據(jù)流為例,假設(shè)數(shù)據(jù)流S中每個(gè)元組包含uid,topic,time三個(gè)字段,uid唯一標(biāo)識(shí)一個(gè)用戶個(gè)體,topic為用戶參與的話題,time為該條信息產(chǎn)生的時(shí)間分片,現(xiàn)在需要統(tǒng)計(jì)各個(gè)時(shí)間分片的熱點(diǎn)話題。如果將數(shù)據(jù)流類比為傳統(tǒng)的關(guān)系數(shù)據(jù)庫,會(huì)提交以下SQL語句:
SELECT topic, time, count(1)
FROM S GROUP BY topic, time
而對(duì)于數(shù)據(jù)流而言,數(shù)據(jù)流具有快速流動(dòng)、易失的特性,絕不可能像數(shù)據(jù)庫一樣在一個(gè)時(shí)刻看到整個(gè)數(shù)據(jù)流的全部數(shù)據(jù),而且熱點(diǎn)話題具有很強(qiáng)的時(shí)效性,統(tǒng)計(jì)結(jié)果延遲要盡量做到最小。同時(shí),電信網(wǎng)絡(luò)、社交網(wǎng)絡(luò)由于用戶眾多,數(shù)據(jù)量往往很大,因此對(duì)流統(tǒng)計(jì)平臺(tái)的性能要求很高,如流統(tǒng)計(jì)的吞吐率和延遲等。
面對(duì)這樣的應(yīng)用場景,系統(tǒng)主要的需求在于高吞吐率和低延遲。目前流行的通用分布式流處理平臺(tái)如Spark Streaming和Storm等,系統(tǒng)結(jié)構(gòu)較復(fù)雜,在這種高吞吐量的實(shí)時(shí)連續(xù)統(tǒng)計(jì)場景上性能有所局限。對(duì)此,面向基于窗口的連續(xù)查詢需求,提出了一個(gè)高吞吐、低延遲的分布式數(shù)據(jù)流統(tǒng)計(jì)模型Mars,同時(shí)提供了強(qiáng)大的容錯(cuò)性。
數(shù)據(jù)流可以理解為一個(gè)不斷增長的數(shù)據(jù)集合[8-9],將其定義如下:
定義1:設(shè)t表示任意時(shí)間戳,t時(shí)刻到來的數(shù)據(jù)集為dt,稱{…dt-1,dt,dt+1}為數(shù)據(jù)d的數(shù)據(jù)流。
數(shù)據(jù)流具有實(shí)時(shí)性、易失性、突發(fā)性、無序性、無限性等特征。在數(shù)據(jù)流上的統(tǒng)計(jì)查詢處理由于這些特征,劃分為不同的模型。
按照統(tǒng)計(jì)的時(shí)序范圍來劃分,數(shù)據(jù)流上的統(tǒng)計(jì)處理分為界標(biāo)模型[10]、滑動(dòng)窗口模型[11]以及跳動(dòng)窗口模型[12]。
1.1.1 界標(biāo)模型
設(shè)當(dāng)前時(shí)間戳為t,s為一個(gè)指定的在t之前的時(shí)間戳,稱作界標(biāo),界標(biāo)模型統(tǒng)計(jì)的是從界標(biāo)時(shí)間戳s開始一直到當(dāng)前時(shí)間戳t范圍內(nèi)數(shù)據(jù)流{ds…dt}上的統(tǒng)計(jì)結(jié)果。
1.1.2 滑動(dòng)窗口模型
設(shè)當(dāng)前時(shí)間戳為t,滑動(dòng)窗口模型統(tǒng)計(jì)的是當(dāng)前時(shí)間戳之前一定時(shí)間范圍內(nèi)的數(shù)據(jù)流,設(shè)滑動(dòng)窗口大小為n,即時(shí)間范圍為n,滑動(dòng)窗口模型所處理的數(shù)據(jù)流為{dt-n…dt}。
除了上述基于時(shí)間戳的滑動(dòng)窗口模型,另外還有基于元組數(shù)量的滑動(dòng)窗口,即在內(nèi)存中保存一定數(shù)量的元組。
1.1.3 跳動(dòng)窗口模型
跳動(dòng)窗口是滑動(dòng)窗口的一個(gè)延伸。在滑動(dòng)窗口模型中,窗口以一定時(shí)間范圍或者元組數(shù)量為單位向前滑動(dòng),兩次連續(xù)的處理的數(shù)據(jù)集有重疊,而跳動(dòng)窗口模型相鄰兩次處理數(shù)據(jù)集無重疊,這一次處理時(shí)間范圍的起點(diǎn)是上一次處理時(shí)間范圍的終點(diǎn)(以基于時(shí)間的窗口為例)。因此,每個(gè)跳動(dòng)窗口的處理結(jié)果之間無交集,同時(shí)它們的并集就是整個(gè)數(shù)據(jù)集上全量的處理結(jié)果。
根據(jù)統(tǒng)計(jì)查詢提交形式的不同,查詢類型分為Ad-hoc查詢[13]和連續(xù)查詢[14]。
1.2.1 Ad-hoc查詢
Ad-hoc查詢也稱即席查詢,Ad-hoc查詢請(qǐng)求可以在統(tǒng)計(jì)系統(tǒng)運(yùn)行時(shí)的任一時(shí)刻提交,統(tǒng)計(jì)系統(tǒng)接收到查詢請(qǐng)求后立即處理該查詢并在產(chǎn)生結(jié)果后立即返回查詢結(jié)果。目前處理Ad-hoc查詢的數(shù)據(jù)流處理平臺(tái)較少。
1.2.2 連續(xù)查詢
連續(xù)查詢請(qǐng)求在查詢統(tǒng)計(jì)系統(tǒng)啟動(dòng)時(shí)或某一特定時(shí)間戳提交,在這一時(shí)間戳之后,直到用戶主動(dòng)取消查詢請(qǐng)求,系統(tǒng)一直處理該查詢,每隔一定時(shí)間間隔輸出查詢結(jié)果。Mars面向的查詢類型即是連續(xù)查詢。
對(duì)數(shù)據(jù)流做統(tǒng)計(jì)處理時(shí),最小的單位是元組。理想狀態(tài)下,數(shù)據(jù)流中的每一個(gè)元組都應(yīng)該被處理一次,且僅僅處理一次。但是在系統(tǒng)層面,由于分布式系統(tǒng)本身的復(fù)雜性和不確定性,以及在某些應(yīng)用場景下對(duì)系統(tǒng)需求的不同,往往會(huì)對(duì)精度做一定程度上的犧牲。根據(jù)對(duì)每一元組處理次數(shù)的保證,統(tǒng)計(jì)模型的語義分為三種。
1.3.1 至多一次(at most once)
至多一次語義是最松的約束,該語義模型盡力使得每一元組被處理到,但不保證對(duì)任意元組處理的必然性。由于約束寬松,實(shí)現(xiàn)這樣的模型系統(tǒng)開銷往往是最小的,由于這樣的特性,該語義適合對(duì)吞吐量要求較高、但不要求統(tǒng)計(jì)精確度的應(yīng)用場景。
1.3.2 至少一次(at least once)
至少一次語義保證數(shù)據(jù)流中每一元組都至少被處理一次,但是在一些特殊情況發(fā)生時(shí),有可能會(huì)造成重復(fù)處理某些元組。在這樣的語義約束下,統(tǒng)計(jì)模型實(shí)現(xiàn)時(shí)需要設(shè)計(jì)嚴(yán)格的容錯(cuò)機(jī)制,確保在任何可控故障發(fā)生時(shí),每個(gè)元組都會(huì)被處理,但是容錯(cuò)性會(huì)帶來不菲的系統(tǒng)開銷。
1.3.3 精確一次(exactly once)
精確一次語義是理想狀態(tài)下的語義,也是最嚴(yán)格的語義約束。這一語義一般是在至少一次語義的模型基礎(chǔ)上,對(duì)統(tǒng)計(jì)結(jié)果進(jìn)行去重而得到的。因此,系統(tǒng)實(shí)現(xiàn)時(shí),精確一次語義比至少一次語義又多了一重開銷,只有在精確度要求極其苛刻的應(yīng)用場景下會(huì)使用這一語義約束,例如銀行系統(tǒng)或證券系統(tǒng)。
Mars的系統(tǒng)架構(gòu)如圖1所示。
圖1 Mars系統(tǒng)架構(gòu)
由圖1可見,Mars依賴的外部組件有四個(gè):分布式消息中間件提供消息服務(wù),Mars的輸入和輸出都通過分布式消息中間件完成,統(tǒng)計(jì)任務(wù)數(shù)據(jù)庫儲(chǔ)存用戶的統(tǒng)計(jì)需求,分布式協(xié)調(diào)系統(tǒng)提供對(duì)分布式集群運(yùn)行時(shí)的狀態(tài)管理等等,分布式緩存負(fù)責(zé)異步解耦和臨時(shí)緩存中間數(shù)據(jù)。
Mapper集群、Reducer集群是Mars的核心組件。Mapper集群從消息中間件拉取消息并處理,將中間結(jié)果順序緩存在分布式緩存中;Reducer集群從分布式緩存中順序讀取分布式緩存中的中間數(shù)據(jù),處理后將最終的統(tǒng)計(jì)結(jié)果再發(fā)送回消息隊(duì)列。
Mapper和Reducer的靈感來自于MapReduce編程模型,盡管MapReduce編程模型是為了批處理場景而提出的,但是它將大規(guī)模數(shù)據(jù)處理過程抽象為Map和Reduce兩個(gè)階段,對(duì)于數(shù)據(jù)流統(tǒng)計(jì)問題同樣具有重要的指導(dǎo)意義。Mars將MapReduce模型擴(kuò)展到了集群概念上,每一個(gè)Mapper或者Reducer計(jì)算單元都是分布式集群中的一個(gè)節(jié)點(diǎn),分別稱作Mapper或Reducer。所有的Mapper節(jié)點(diǎn)組成Mapper集群,所有的Reducer節(jié)點(diǎn)組成Reducer集群。
Mapper集群采用去中心化結(jié)構(gòu),集群內(nèi)各個(gè)節(jié)點(diǎn)是對(duì)等關(guān)系,使用去中心化結(jié)構(gòu)的核心目標(biāo)是將計(jì)算粒度切分,在進(jìn)行較大窗口下的統(tǒng)計(jì)時(shí),如果使用集中式結(jié)構(gòu),緩存窗口內(nèi)的全部數(shù)據(jù)將造成極大的內(nèi)存開銷和時(shí)間開銷。而Mars將這樣的大窗口切分為小窗口分布到集群上并行處理,有效解決了該問題。同時(shí)這一設(shè)計(jì)保證了Mapper集群良好的可擴(kuò)展性,使得Mars的計(jì)算能力隨著集群規(guī)模的擴(kuò)大可以得到近似于線性的增加。
Reducer集群的統(tǒng)計(jì)功能是將經(jīng)過Mapper切分的細(xì)粒度統(tǒng)計(jì)結(jié)果合并為任務(wù)需求窗口大小的結(jié)果,因此Reducer集群采用主從架構(gòu)。集群啟動(dòng)時(shí),各個(gè)節(jié)點(diǎn)首先借助分布式協(xié)調(diào)系統(tǒng)選舉出一個(gè)領(lǐng)導(dǎo)者,該領(lǐng)導(dǎo)者負(fù)責(zé)給各個(gè)節(jié)點(diǎn)分配任務(wù),并監(jiān)聽節(jié)點(diǎn)狀態(tài),當(dāng)集群規(guī)模發(fā)生改變時(shí)重新分配任務(wù)。同時(shí),各個(gè)從節(jié)點(diǎn)也監(jiān)聽主節(jié)點(diǎn)的狀態(tài),當(dāng)主節(jié)點(diǎn)發(fā)生故障時(shí)重新選舉領(lǐng)導(dǎo)者。
Mapper和Reducer通過分布式緩存?zhèn)鬟f數(shù)據(jù)[15]的協(xié)議設(shè)計(jì)是Mars的一個(gè)關(guān)鍵點(diǎn)。Mars使用了一種特殊的序號(hào)機(jī)制保證Mapper和Reducer協(xié)作步進(jìn),同時(shí)保證兩個(gè)階段異步運(yùn)行。
初始狀態(tài)時(shí),首先針對(duì)每個(gè)處理任務(wù)在分布式緩存中為Mapper集群和Reducer集群初始化一個(gè)序號(hào),稱為SEQ。當(dāng)Mapper節(jié)點(diǎn)處理完輸入的原始數(shù)據(jù)集后,將緩存中的SEQ自增1,使用自增操作的主要目的是使得多節(jié)點(diǎn)并行處理統(tǒng)一統(tǒng)計(jì)任務(wù)時(shí)不會(huì)得到統(tǒng)一SEQ,造成數(shù)據(jù)覆蓋。
而Reducer集群出于容錯(cuò)性考慮,使用了延遲更新SEQ的策略。
容錯(cuò)是所有分布式流處理系統(tǒng)應(yīng)當(dāng)關(guān)注的問題,雖然集群中每個(gè)節(jié)點(diǎn)發(fā)生故障的概率很小,但是一旦發(fā)生,由于數(shù)據(jù)流不斷流動(dòng)的特點(diǎn),丟失的數(shù)據(jù)便很難找回。大多數(shù)現(xiàn)有的故障恢復(fù)策略都是通過冗余備份策略實(shí)現(xiàn)的[16],Mars所采用的策略也類似。
在Mapper端,每一個(gè)節(jié)點(diǎn)拉取數(shù)據(jù)后都首先將數(shù)據(jù)存一份本地文件,同時(shí)將文件名與需要該數(shù)據(jù)的統(tǒng)計(jì)任務(wù)id列表的對(duì)應(yīng)關(guān)系注冊到分布式緩存中,每當(dāng)某一任務(wù)處理完該數(shù)據(jù)時(shí),從列表中刪去該任務(wù)id,直到列表為空時(shí)刪除本地文件,最后向分布式消息中間件反饋ack消息。假如在ack之前該節(jié)點(diǎn)發(fā)生了宕機(jī),由于消息中間件未接收到ack消息,當(dāng)發(fā)生超時(shí)后,消息中間件會(huì)向其他節(jié)點(diǎn)重發(fā)該消息。這一機(jī)制保證了每一條元組都會(huì)被處理至少一次。
在Reducer端,采用延遲更新SEQ的方式來保證容錯(cuò)性。以一個(gè)統(tǒng)計(jì)任務(wù)的處理過程為例,如圖2所示。Reducer集群有兩個(gè)節(jié)點(diǎn),R1正在處理數(shù)據(jù),初始R1從緩存中得到序號(hào)1,于是從緩存中得到序號(hào)為1的數(shù)據(jù)集并處理,處理完成后在內(nèi)存中序號(hào)加1變?yōu)?,并不更新到緩存中。接著在緩存中讀取序號(hào)為2的數(shù)據(jù)集,假如在處理過程中,歸并后的統(tǒng)計(jì)結(jié)果尚未輸出之前R1節(jié)點(diǎn)發(fā)生宕機(jī),經(jīng)過新一輪的領(lǐng)導(dǎo)者選舉以及任務(wù)分配,該任務(wù)遷移到R2節(jié)點(diǎn)。這時(shí)R2節(jié)點(diǎn)從緩存中獲取到的序號(hào)依然為1,不會(huì)造成數(shù)據(jù)丟失。
圖2 Reducer容錯(cuò)機(jī)制示意圖
綜上,Mars強(qiáng)大的容錯(cuò)機(jī)制保證了Mars的at-least-once語義。
使用Java語言實(shí)現(xiàn)了Mars,并在典型的分組統(tǒng)計(jì)使用場景下對(duì)Mars進(jìn)行了測試,同時(shí)與Storm和Spark Streaming進(jìn)行對(duì)比。
實(shí)驗(yàn)采用由30臺(tái)服務(wù)器組成的集群。其中,消息中間件擁有3個(gè)服務(wù)節(jié)點(diǎn)和30個(gè)數(shù)據(jù)節(jié)點(diǎn),分布式緩存為15節(jié)點(diǎn)的主備集群,分布式協(xié)調(diào)服務(wù)擁有7個(gè)服務(wù)節(jié)點(diǎn)。
實(shí)驗(yàn)的數(shù)據(jù)源選用了模擬的網(wǎng)絡(luò)數(shù)據(jù)流S,數(shù)據(jù)流的每個(gè)元組包含20個(gè)字段,其中關(guān)鍵字段有timestamp、type、sip、dip、port、location等。
每個(gè)字段的內(nèi)容根據(jù)一個(gè)已知的集合中以均勻概率隨機(jī)生成,生成后每個(gè)元組的平均大小為150字節(jié)。共生成元組10億條,整個(gè)數(shù)據(jù)集大小140 G。在每個(gè)單元實(shí)驗(yàn)前,提前將數(shù)據(jù)集加載到分布式消息中間件中。
在上述數(shù)據(jù)流上,構(gòu)建了如下的統(tǒng)計(jì)場景:
SELECT sip, type, timestamp, count(*)
FROM S
GROUP BY sip, type, timestamp
WINDOWING 60 s
其中,WINDOWING關(guān)鍵字表示以60 s大小的跳動(dòng)窗口進(jìn)行統(tǒng)計(jì)。需要特別說明的是,對(duì)時(shí)間戳進(jìn)行分組統(tǒng)計(jì)是為了使統(tǒng)計(jì)結(jié)果具有應(yīng)用價(jià)值,Mapper會(huì)對(duì)時(shí)間戳以窗口大小,即60 s為單位進(jìn)行歸一化。
性能一般以吞吐量為表征,吞吐量計(jì)算公式如下:
T=tps×bs×ts
其中,T表示吞吐量;tps表示消息中間件每秒處理的事務(wù)數(shù),實(shí)驗(yàn)中Mapper集群作為消息中間件的消費(fèi)者,tps相當(dāng)于每秒消費(fèi)的數(shù)據(jù)集個(gè)數(shù);bs表示每個(gè)數(shù)據(jù)集包含的元組數(shù);ts表示每個(gè)元組的大小(平均)。
分別在不同的數(shù)據(jù)集大小和不同的集群規(guī)模下對(duì)上述統(tǒng)計(jì)需求進(jìn)行實(shí)驗(yàn),結(jié)果如下所述。
3.4.1 集群規(guī)模固定,數(shù)據(jù)集大小不同
實(shí)驗(yàn)在20個(gè)節(jié)點(diǎn)的集群上完成,吞吐量取統(tǒng)計(jì)過程中整個(gè)集群吞吐量的平均值。由圖3可見,隨著每個(gè)數(shù)據(jù)集所包含的元組數(shù)量的增長,一開始吞吐量上升很快,當(dāng)數(shù)據(jù)及大小達(dá)到5 000時(shí),吞吐量達(dá)到峰值,隨著數(shù)據(jù)及大小繼續(xù)增加,吞吐量呈緩慢下降的趨勢。不難分析出,當(dāng)數(shù)據(jù)集大小為1這種極端情況時(shí),每次網(wǎng)絡(luò)開銷只傳輸一個(gè)元組,效率極低;當(dāng)數(shù)據(jù)集大小增大到5 000個(gè)元組時(shí),網(wǎng)絡(luò)開銷和解包開銷達(dá)到一個(gè)平衡點(diǎn),故性能達(dá)到最優(yōu);當(dāng)數(shù)據(jù)集大小繼續(xù)增大時(shí),雖然每次網(wǎng)絡(luò)開銷得到的數(shù)量足夠大,但是反序列化數(shù)據(jù)流會(huì)占用大量的CPU資源,導(dǎo)致用于統(tǒng)計(jì)的系統(tǒng)資源減少,從而吞吐量下降。
圖3 不同數(shù)據(jù)集大小時(shí)的吞吐量變化曲線
3.4.2 數(shù)據(jù)集大小固定,集群規(guī)模變化
在上述實(shí)驗(yàn)得出的最優(yōu)數(shù)據(jù)集大小下,吞吐量取統(tǒng)計(jì)過程中整個(gè)集群吞吐量的平均值。由圖4可見,吞吐量隨著集群規(guī)模的逐步增大,幾乎呈線性增長,當(dāng)集群規(guī)模增大到20個(gè)節(jié)點(diǎn)時(shí),性能達(dá)到最優(yōu),此時(shí)整個(gè)集群吞吐量達(dá)到3.5 GB/s,可以計(jì)算得單節(jié)點(diǎn)吞吐量為179 MB/s。
圖4 不同集群規(guī)模時(shí)的吞吐量變化曲線
當(dāng)集群規(guī)模繼續(xù)擴(kuò)大時(shí),吞吐量并未繼續(xù)增加,這是由于從分布式消息中間件消費(fèi)數(shù)據(jù)時(shí)是讀盤操作。
測試數(shù)據(jù)中使用的數(shù)據(jù)是提前發(fā)送并緩存在消息隊(duì)列上的。系統(tǒng)從消息隊(duì)列中消費(fèi)數(shù)據(jù)時(shí),消息隊(duì)列會(huì)從磁盤上讀取數(shù)據(jù)。由于消息隊(duì)列特性,每個(gè)消息隊(duì)列節(jié)點(diǎn)只會(huì)從一塊磁盤上讀取數(shù)據(jù)。一共有35個(gè)消息隊(duì)列節(jié)點(diǎn),磁盤讀取速度上限大約是100 Mbps,因此整個(gè)消息隊(duì)列所能提供的最大消費(fèi)速率約為3.5 G。當(dāng)集群規(guī)模大于20時(shí),由于消息隊(duì)列磁盤讀取速度已達(dá)上限,速度無法繼續(xù)增加。
同時(shí),將上述的統(tǒng)計(jì)需求分別使用Spark Streaming和Storm的編程接口進(jìn)行了實(shí)現(xiàn),二者同樣使用Mars的分布式消息中間件作為輸入和輸出。
3.5.1 性能對(duì)比
由于Spark Streaming和Storm本身已經(jīng)對(duì)數(shù)據(jù)集進(jìn)行了抽象,故無需在不同數(shù)據(jù)集大小的情況下進(jìn)行對(duì)比。在不同的集群規(guī)模下,實(shí)驗(yàn)結(jié)果如圖5所示。
圖5 Mars,Spark Streaming,Storm性能對(duì)比曲線
由圖5可見,與Storm和Spark Streaming相比,Mars在分組統(tǒng)計(jì)需求下具有較明顯的性能優(yōu)勢。在集群規(guī)模為20時(shí),Mars的吞吐量是Spark Streaming的1.46倍,是Storm的2.82倍。
3.5.2 實(shí)時(shí)性對(duì)比
實(shí)時(shí)性方面,實(shí)驗(yàn)計(jì)算了部分處理日志中記錄的每個(gè)元組的平均延遲。Storm專門為了流處理場景設(shè)計(jì),平均延遲最小,為653 ms;Spark Streaming由于需要“攢”數(shù)據(jù),平均延遲達(dá)到了2 383 ms;Mars介于兩者之間,平均延遲為1 372 ms。
3.5.3 語義準(zhǔn)確性對(duì)比
由于統(tǒng)計(jì)需求是在連續(xù)跳動(dòng)窗口上的分組統(tǒng)計(jì),沒有過濾對(duì)數(shù)據(jù)量產(chǎn)生變化的計(jì)算,因此如果統(tǒng)計(jì)過程保證了exactly-once語義,那么統(tǒng)計(jì)結(jié)果中分組統(tǒng)計(jì)量的和應(yīng)與原始記錄數(shù)量保持一致。
該實(shí)驗(yàn)使用上述實(shí)驗(yàn)數(shù)據(jù)中一個(gè)4 000萬數(shù)據(jù)量的子集完成,實(shí)驗(yàn)集群在實(shí)驗(yàn)過程中分別將其中三個(gè)節(jié)點(diǎn)斷網(wǎng)以模擬故障,實(shí)驗(yàn)結(jié)果如表1所示。
表1 Mars Spark Streaming,Storm語義準(zhǔn)確性對(duì)比
由表1可見,Mars雖然僅實(shí)現(xiàn)了at-least-once語義,但由于其良好的容錯(cuò)性,在發(fā)生節(jié)點(diǎn)故障時(shí)并沒有造成數(shù)據(jù)丟失或重復(fù),實(shí)現(xiàn)了與Storm相同級(jí)別的語義限制。
提出了一個(gè)面向基于窗口的連續(xù)查詢需求的分布式數(shù)據(jù)流統(tǒng)計(jì)模型。該模型在保證at-least-once語義的前提下,實(shí)現(xiàn)了優(yōu)異的性能,尤其在基于窗口的分組統(tǒng)計(jì)這一統(tǒng)計(jì)場景下,相比目前流行的分布式數(shù)據(jù)流處理平臺(tái)具有較明顯的優(yōu)勢。同時(shí),Mars具有良好的可擴(kuò)展性,使其在面對(duì)不同規(guī)模的數(shù)據(jù)場景時(shí)有良好的適應(yīng)性。
與此同時(shí),由于該統(tǒng)計(jì)模型是面向大規(guī)模實(shí)時(shí)流統(tǒng)計(jì)場景的,因此對(duì)于需要進(jìn)行迭代計(jì)算、復(fù)雜計(jì)算的流處理場景支持并不完善。該模型的下一步發(fā)展和完善應(yīng)當(dāng)是面對(duì)更多流處理場景,進(jìn)行通用化拓展。
[1] CHAUHAN J, CHOWDHURY S A, MAKAROFF D. Performance evaluation of Yahoo! S4:a first look[C]//Seventh international conference on P2P,parallel,grid,cloud and internet computing.[s.l.]:IEEE,2012:58-65.
[2] NEUMEYER L,ROBBINS B,NAIR A,et al.S4:distributed stream computing platform[C]//Proceedings of the 10th IEEE international conference on data mining workshops.Sydney:IEEE Press,2010:170-177.
[3] SIMONCELLI D,DUSI M,GRINGOLI F,et al.Scaling out the performance of service monitoring applications with BlockMon[C]//Proceedings of the 14th international conference on passive and active measurement.Hong Kong:IEEE Press,2013:253-255.
[4] ZAHARIA M,DAS T,LI H,et al.Discretized streams:an efficient and fault-tolerant model for stream processing on large clusters[C]//USENIX conference on hot topics in cloud computing.[s.l.]:USENIX,2012.
[5] 張 鵬,李鵬霄,任 彥,等.面向大數(shù)據(jù)的分布式流處理技術(shù)綜述[J].計(jì)算機(jī)研究與發(fā)展,2014,51:1-9.
[6] 崔星燦,禹曉輝,劉 洋,等.分布式流處理技術(shù)綜述[J].計(jì)算機(jī)研究與發(fā)展,2015,52(2):318-332.
[7] GüNDüZ S,?ZSU M T.A web page prediction model based on click-stream tree representation of user behavior[C]//Proceedings of the ninth ACM SIGKDD international conference on knowledge discovery and data mining.[s.l.]:ACM,2003:535-540.
[8] BABCOCK B,BABU S,DATAR M,et al.Models and issues in data stream systems[C]//Proceedings of the twenty-first ACM SIGMOD-SIGACT-SIGART symposium on principles of database systems.[s.l.]:ACM,2002:1-16.
[9] 孫大為,張廣艷,鄭緯民.大數(shù)據(jù)流式計(jì)算:關(guān)鍵技術(shù)及系統(tǒng)實(shí)例[J].軟件學(xué)報(bào),2014,25(4):839-862.
[10] PERNG C S,WANG H,ZHANG S R,et al.Landmarks:a new model for similarity-based pattern querying in time series databases[C]//16th international conference on data engineering.[s.l.]:IEEE,2000:33-42.
[11] BABCOCK B,DATAR M,MOTWANI R.Sampling from a moving window over streaming data[C]//Proceedings of the thirteenth annual ACM-SIAM symposium on discrete algorithms.[s.l.]:[s.n.],2002:633-634.
[12] ZHU Y,SHASHA D.Statstream:statistical monitoring of thousands of data streams in real time[C]//Proceedings of the 28th international conference on very large data bases.[s.l.]:[s.n.],2002:358-369.
[13] 熊全洪,魏 娟,劉 武.即席查詢研究[J].現(xiàn)代商貿(mào)工業(yè),2008,20(12):345-346.
[14] CHANDRASEKARAN S,FRANKLIN M J.Streaming queries over streaming data[C]//Proceedings of the 28th international conference on very large data bases.[s.l.]:[s.n.],2002:203-214.
[15] 何小東,尹海波.基于共享緩沖區(qū)的數(shù)據(jù)流處理框架設(shè)計(jì)與實(shí)現(xiàn)[J].計(jì)算機(jī)工程與設(shè)計(jì),2012,33(11):4398-4401.
[16] 陳晗鳴,羅 威,李明輝.分布式系統(tǒng)中基于主/副版本的實(shí)時(shí)容錯(cuò)調(diào)度綜述[J].計(jì)算機(jī)應(yīng)用研究,2012,29(11):4017-4022.