国产日韩欧美一区二区三区三州_亚洲少妇熟女av_久久久久亚洲av国产精品_波多野结衣网站一区二区_亚洲欧美色片在线91_国产亚洲精品精品国产优播av_日本一区二区三区波多野结衣 _久久国产av不卡

?

大數(shù)據(jù)環(huán)境下的分布式數(shù)據(jù)流處理關(guān)鍵技術(shù)探析

2017-05-24 14:45陳付梅韓德志戴永濤
計(jì)算機(jī)應(yīng)用 2017年3期
關(guān)鍵詞:流式數(shù)據(jù)流隊(duì)列

陳付梅,韓德志,畢 坤,戴永濤

(上海海事大學(xué) 信息工程學(xué)院,上海 201306) (*通信作者電子郵箱dezhihan88@sina.com)

大數(shù)據(jù)環(huán)境下的分布式數(shù)據(jù)流處理關(guān)鍵技術(shù)探析

陳付梅,韓德志*,畢 坤,戴永濤

(上海海事大學(xué) 信息工程學(xué)院,上海 201306) (*通信作者電子郵箱dezhihan88@sina.com)

大數(shù)據(jù)環(huán)境下的數(shù)據(jù)流處理實(shí)時(shí)性要求高,數(shù)據(jù)計(jì)算要求持續(xù)性和高可靠性。分布式數(shù)據(jù)流處理系統(tǒng)(DDSPS)能解決大數(shù)據(jù)環(huán)境下的數(shù)據(jù)流處理問題,它除具備分布式系統(tǒng)的可擴(kuò)展性和容錯(cuò)性優(yōu)勢(shì)外,還具有高的實(shí)時(shí)處理能力。詳細(xì)介紹了組成基于大數(shù)據(jù)的分布式數(shù)據(jù)流處理系統(tǒng)的四個(gè)子系統(tǒng)及其關(guān)鍵技術(shù),討論和比較了各個(gè)子系統(tǒng)的不同技術(shù)方案;同時(shí)介紹一種分布式拒絕服務(wù)(DDoS)攻擊檢測(cè)數(shù)據(jù)流處理系統(tǒng)結(jié)構(gòu)案例,其研究?jī)?nèi)容能為大數(shù)據(jù)環(huán)境下的數(shù)據(jù)流處理理論研究和應(yīng)用技術(shù)開發(fā)提供技術(shù)參考。

大數(shù)據(jù);流處理;消息隊(duì)列;數(shù)據(jù)處理;數(shù)據(jù)存儲(chǔ)

0 引言

2016年1月,中國(guó)互聯(lián)網(wǎng)信息中心(China Internet Network Information Center, CNNIC)發(fā)布了《第37次CNNIC報(bào)告:中國(guó)互聯(lián)網(wǎng)絡(luò)發(fā)展?fàn)顩r統(tǒng)計(jì)》[1],報(bào)告統(tǒng)計(jì)了目前中國(guó)互聯(lián)網(wǎng)大環(huán)境的發(fā)展情況。數(shù)據(jù)顯示,截至2015年12月,中國(guó)的網(wǎng)民規(guī)模達(dá)6.88億,全年共計(jì)新增網(wǎng)民3 951萬人;互聯(lián)網(wǎng)普及率為50.3%,較2014年底提升了2.4個(gè)百分點(diǎn),同時(shí)根據(jù)IDC(International Data Corporation, IDC)發(fā)布的數(shù)字宇宙報(bào)告顯示,至2020年數(shù)字宇宙將超出預(yù)期,達(dá)到40 ZB,相當(dāng)于地球上人均產(chǎn)生5 247 GB的數(shù)據(jù)[2]。上述數(shù)據(jù)意味著大數(shù)據(jù)時(shí)代已經(jīng)來臨,大量的信息呈現(xiàn)在用戶面前。比如國(guó)內(nèi)最大的電商平臺(tái)淘寶網(wǎng)每日訪問用戶達(dá)6 000萬,每日在線商品數(shù)目已經(jīng)超過了8億件。面對(duì)急速增長(zhǎng)的數(shù)據(jù)規(guī)模,用戶正面臨著“信息超載問題”,如果不借助于大數(shù)據(jù)分析系統(tǒng)和搜索引擎等輔助技術(shù),用戶從海量的互聯(lián)網(wǎng)資源中找到自己真正感興趣的信息是一件非常困難的事情,使得信息的有效利用率反而降低了。

在大數(shù)據(jù)時(shí)代,數(shù)據(jù)的來源已經(jīng)不再是人們所關(guān)心的問題,如何從形態(tài)多樣的海量數(shù)據(jù)中高效、快速、及時(shí)地挖掘出有用的信息這才是關(guān)鍵,這也大數(shù)據(jù)分析面臨的難題。單機(jī)系統(tǒng)不能滿足海量數(shù)據(jù)分析處理要求,Hadoop系統(tǒng)的開源解決了此難題;基于Google GFS(Google File System)[3]實(shí)現(xiàn)的HDFS(Hadoop Distributed File System)解決了海量數(shù)據(jù)的存儲(chǔ)問題;MapReduce則實(shí)現(xiàn)了海量數(shù)據(jù)的分布式計(jì)算,這大幅度降低了大數(shù)據(jù)處理的門檻,使得海量數(shù)據(jù)處理成為一種可能。雖然Hadoop系統(tǒng)具有近乎線性的擴(kuò)展能力、良好的容錯(cuò)性、分布式的計(jì)算能力等優(yōu)勢(shì),是很多公司處理海量數(shù)據(jù)的首選方案,但是,它仍存在一個(gè)關(guān)鍵的缺陷——缺乏實(shí)時(shí)性。Hadoop主要用于海量數(shù)據(jù)的離線計(jì)算,因此,從數(shù)據(jù)的產(chǎn)生到得到最終的數(shù)據(jù)結(jié)果之間存在時(shí)間差,不能滿足實(shí)時(shí)性要求高的應(yīng)用要求。

實(shí)時(shí)化、內(nèi)存計(jì)算、泛在化(普適應(yīng)計(jì)算或環(huán)境智能化)、智能化是當(dāng)今大數(shù)據(jù)技術(shù)的四大發(fā)展趨勢(shì)[4]。實(shí)時(shí)化作為發(fā)展趨勢(shì)之一,已經(jīng)受到越來越多的關(guān)注。而數(shù)據(jù)的實(shí)時(shí)處理,首先需要在數(shù)據(jù)產(chǎn)生的時(shí)候?qū)⑵滢D(zhuǎn)為源源不斷的數(shù)據(jù)流,并將數(shù)據(jù)流發(fā)送給流處理系統(tǒng),然后由流處理系統(tǒng)對(duì)數(shù)據(jù)流進(jìn)行在線(實(shí)時(shí),real-time)或近線(接近實(shí)時(shí),near-real-time)分析。流數(shù)據(jù)處理系統(tǒng)的核心思想是:從不斷流入的新數(shù)據(jù)中提取感興趣的、有效的信息,縮減數(shù)據(jù)從產(chǎn)生到被利用的時(shí)間間隔,丟棄部分無效數(shù)據(jù)或者已經(jīng)被處理后的原始數(shù)據(jù),在獲取有效信息的同時(shí)避免存儲(chǔ)海量原始數(shù)據(jù),降低數(shù)據(jù)的存儲(chǔ)成本。

大數(shù)據(jù)環(huán)境下的數(shù)據(jù)流具有五大特點(diǎn):實(shí)時(shí)性、易失性、無序性、突發(fā)性、無限性[5-6],因此,在大數(shù)據(jù)流處理過程中面臨著一些新的挑戰(zhàn)[7]:實(shí)時(shí)性要求高、流入數(shù)據(jù)量不可預(yù)知、數(shù)據(jù)和計(jì)算的持續(xù)性、數(shù)據(jù)計(jì)算要求高的可靠性。分布式流處理系統(tǒng)是解決大數(shù)據(jù)流最理想系統(tǒng),它具備了分布式系統(tǒng)的可擴(kuò)展性和容錯(cuò)性等優(yōu)勢(shì),同時(shí)又很好地應(yīng)對(duì)了大數(shù)據(jù)流的各種挑戰(zhàn)。

分布流處理系統(tǒng)是一個(gè)很復(fù)雜的系統(tǒng),它由多個(gè)子系統(tǒng)組成,并且需要不同的子系統(tǒng)之間相互分工、共同協(xié)作[3]。一個(gè)完整的流式數(shù)據(jù)處理系統(tǒng)由4部分組成:1)數(shù)據(jù)收集系統(tǒng)(Data Collection System),用于收集、匯總原始數(shù)據(jù)。2)消息隊(duì)列系統(tǒng)(Message Management System),對(duì)收集到的實(shí)時(shí)數(shù)據(jù)進(jìn)行轉(zhuǎn)存、維護(hù),并將數(shù)據(jù)傳送給數(shù)據(jù)處理和分析系統(tǒng),是原始數(shù)據(jù)的中轉(zhuǎn)站、緩沖區(qū)。所謂的消息,是接收到的一條一條的數(shù)據(jù)。3)數(shù)據(jù)處理系統(tǒng)(Data Processing System),是整個(gè)流式數(shù)據(jù)處理系統(tǒng)的核心。它主要負(fù)責(zé)從消息隊(duì)列系統(tǒng)或其他數(shù)據(jù)發(fā)送系統(tǒng)中獲取數(shù)據(jù),并進(jìn)行及時(shí)的業(yè)務(wù)邏輯處理和分析,然后將處理后的結(jié)果數(shù)據(jù)保存到數(shù)據(jù)庫系統(tǒng)中或直接傳送給其他業(yè)務(wù)處理系統(tǒng),作進(jìn)一步的分析和展示。4)數(shù)據(jù)存儲(chǔ)系統(tǒng)(Data Storage System),是處理后的數(shù)據(jù)的最終歸屬地,也是連接流式數(shù)據(jù)處理系統(tǒng)跟其他系統(tǒng)之間的橋梁。本文將對(duì)大數(shù)據(jù)環(huán)境下的一個(gè)完整分布式流處理系統(tǒng)四個(gè)組成部分所采用的技術(shù)進(jìn)行介紹和探析,同時(shí)介紹一種分布式拒絕服務(wù)(Distributed Denial of Service, DDoS)攻擊檢測(cè)數(shù)據(jù)流處理系統(tǒng)結(jié)構(gòu),為大數(shù)據(jù)環(huán)境下的數(shù)據(jù)流處理理論研究和應(yīng)用技術(shù)開發(fā)提供參考。

1 數(shù)據(jù)收集

海量的數(shù)據(jù)是大數(shù)據(jù)出現(xiàn)的前提,而數(shù)據(jù)收集則是大數(shù)據(jù)的基石。日志數(shù)據(jù)收集在流數(shù)據(jù)收集中占有重要比重,許多公司的業(yè)務(wù)平臺(tái)每天都會(huì)分散的產(chǎn)生大量日志數(shù)據(jù),收集并匯總這些業(yè)務(wù)日志數(shù)據(jù),供離線和在線的分析系統(tǒng)使用。日志收集系統(tǒng)所需考慮的基本特征包括:高可靠性、高可用性和高可擴(kuò)展性?!胺稚⑹占?,集中處理”是當(dāng)前日志處理系統(tǒng)的一個(gè)主流思想。日志收集也是流式日志處理系統(tǒng)的前提和基礎(chǔ),日志只有被實(shí)時(shí)收集、匯總之后,才能進(jìn)行后續(xù)的相關(guān)處理操作。下面針對(duì)當(dāng)前流行的開源日志數(shù)據(jù)流收集系統(tǒng),進(jìn)行介紹和對(duì)比。

1.1 Scribe

Scribe[8]是Facebook為了滿足內(nèi)部大量日志處理而設(shè)計(jì)的日志收集系統(tǒng)。它能將分散在不同服務(wù)器上的不同應(yīng)用日志匯總到中央存儲(chǔ)系統(tǒng),通常是將日志存入HDFS中,為日志的集中處理提供了有力的保障。

Scribe數(shù)據(jù)的傳遞依賴于Thrift[9],Thrift通過一個(gè)中間語言(接口定義語言),來定義遠(yuǎn)程過程調(diào)用(Remote Procedure Call, RPC)的接口和數(shù)據(jù)類型,然后通過編譯器,生成不同語言的代碼,是跨語言服務(wù)的部署框架)。其架構(gòu)如圖1所示,由以下三部分構(gòu)成:

1)Scribe Agent。位于日志產(chǎn)生的應(yīng)用服務(wù)器上,實(shí)質(zhì)是一個(gè)Thrift客戶端,通過RPC負(fù)責(zé)將應(yīng)用系統(tǒng)產(chǎn)生的日志,發(fā)送到匯總服務(wù)端。

2)Scribe Collector。完成多個(gè)Agent發(fā)送過來的數(shù)據(jù)接收,并將數(shù)據(jù)存入可靠的存儲(chǔ)介質(zhì)中,如:本地磁盤、HDFS等,此部分并不是Scribe日志收集系統(tǒng)中的必需部分,可以跳過Collector直接將日志從Agent存入到存儲(chǔ)系統(tǒng)中。

3)存儲(chǔ)介質(zhì)。Scribe已經(jīng)實(shí)現(xiàn)了向不同類型的存儲(chǔ)介質(zhì)中寫入數(shù)據(jù)的功能,包括文件系統(tǒng)(如HDFS,位于本地磁盤或共享式的存儲(chǔ)系統(tǒng)中),網(wǎng)絡(luò)(直接發(fā)送給其他Scribe),緩存(可滿足故障恢復(fù)的要求,數(shù)據(jù)優(yōu)先寫入主存儲(chǔ)中,若主存儲(chǔ)故障,則存入到備份的存儲(chǔ)中),多存儲(chǔ)介質(zhì)(同時(shí)將數(shù)據(jù)寫入不同的存儲(chǔ)系統(tǒng)中,達(dá)到數(shù)據(jù)備份的目的)。

從架構(gòu)上分析,Scribe能在一定程度上保證數(shù)據(jù)不丟失。Scribe進(jìn)程能將消息在內(nèi)存中緩存一段時(shí)間,但是當(dāng)Scribe Agent出現(xiàn)故障時(shí),這些緩存的數(shù)據(jù)就會(huì)丟失,因此,從這方面來講,Scribe不能嚴(yán)格保證數(shù)據(jù)可靠性。

圖1 Scribe 體系架構(gòu)

1.2 Flume

Flume最初是由Cloudera的工程師設(shè)計(jì)用于合并日志數(shù)據(jù)的系統(tǒng)[10],后將其開源出來,并逐漸發(fā)展成為一款開源、高可靠、高擴(kuò)展、易管理、支持客戶擴(kuò)展的分布式數(shù)據(jù)流采集系統(tǒng),主要是用于日志數(shù)據(jù)的收集和聚合。

在原始的Flume版本中,一個(gè)完整的Flume系統(tǒng)由Agent(用于采集數(shù)據(jù))、Master(配置及通信管理)、Collector(對(duì)數(shù)據(jù)進(jìn)行聚合)構(gòu)成。而重構(gòu)后的新版Flume也稱為Flume NG(Next Generation),其系統(tǒng)中只有Agent一種角色。圖2為Flume NG的架構(gòu),由分布在不同節(jié)點(diǎn)的Agent負(fù)責(zé)收集不同的應(yīng)用所產(chǎn)生的數(shù)據(jù),并發(fā)往匯總的Agent節(jié)點(diǎn),最后存入大容量、高可靠的存儲(chǔ)系統(tǒng),如:HDFS。

圖2 Flume架構(gòu)

每一個(gè)Flume Agent的內(nèi)部都是由Source、Channel以及Sink組成。Source即為要收集數(shù)據(jù)的來源,負(fù)責(zé)產(chǎn)生或接收數(shù)據(jù),并發(fā)往Channel。Channel則是負(fù)責(zé)接收來自Source的數(shù)據(jù),并傳送到Sink,負(fù)責(zé)對(duì)數(shù)據(jù)提供可靠性保證。Sink則是從Channel拉取數(shù)據(jù),并將數(shù)據(jù)寫入到后端的存儲(chǔ)系統(tǒng)中,已經(jīng)實(shí)現(xiàn)的Sink包括:HDFS Sink(將數(shù)據(jù)寫入到HDFS中)、Hive Sink(將數(shù)據(jù)存入Hive中)、Avro Sink(將數(shù)據(jù)以Avro的方式進(jìn)行序列化,并發(fā)往后端的Avro接收端,也可以是Flume的Avro Source)等若干常見的數(shù)據(jù)存儲(chǔ)和接收系統(tǒng)。

1.3 Chukwa

Chukwa是Apache旗下的一款開源數(shù)據(jù)收集軟件[11],它可以將不同類型的數(shù)據(jù)匯聚成適合Hadoop處理的文件,并保存在HDFS中,并與Hadoop集成,可以快速方便地進(jìn)行各種MapReduce操作。Chukwa本身已經(jīng)實(shí)現(xiàn)了很多內(nèi)置的功能,能夠用于數(shù)據(jù)的收集和整理。

Chukwa的架構(gòu)如圖3所示,由Agent、Collector以及HDFS構(gòu)成。Agent是運(yùn)行在不同節(jié)點(diǎn)上負(fù)責(zé)收集數(shù)據(jù)的程序,而Agent又由多個(gè)Adapter組成,并由Adapter執(zhí)行實(shí)際的數(shù)據(jù)收集工作;Collector負(fù)責(zé)接收不同的Agent發(fā)送過來的數(shù)據(jù),并負(fù)責(zé)將數(shù)據(jù)寫入HDFS;HDFS是Chukwa中數(shù)據(jù)的最終存儲(chǔ)系統(tǒng),能夠滿足海量數(shù)據(jù)的存儲(chǔ)要求,并具有很好的容錯(cuò)性、可用性、擴(kuò)展性。Chukwa非常適合于將數(shù)據(jù)收集后需要進(jìn)行MapReduce操作的應(yīng)用場(chǎng)景。

圖3 Chukwa架構(gòu)

1.4 LogStash

LogStash[12]是著名的開源數(shù)據(jù)棧ELK (ElasticSearch, Logstash, Kibana)中的那個(gè)L,其主要功能就是進(jìn)行數(shù)據(jù)的收集,配合ElasticSearch進(jìn)行數(shù)據(jù)索引和檢索,Kibana用于數(shù)據(jù)的展示,即為一個(gè)完整實(shí)時(shí)數(shù)據(jù)分析平臺(tái)。LogStash是一款輕量級(jí)的日志收集處理軟件,可以極其方便地把分散的、多樣化的日志收集起來,并能根據(jù)業(yè)務(wù)需求實(shí)現(xiàn)自定義的處理,然后傳輸?shù)街付ǖ奈恢?,比如某個(gè)服務(wù)器或者文件。

圖4為L(zhǎng)ogStash架構(gòu),Input Plugin負(fù)責(zé)從不同的地方接收或讀取數(shù)據(jù),轉(zhuǎn)化為L(zhǎng)ogStash所支持的事件格式,其已經(jīng)實(shí)現(xiàn)了幾十種常用的Input Plugin。Filter Plugin則用于根據(jù)自定義的規(guī)則對(duì)事件進(jìn)行過濾或轉(zhuǎn)為特定的格式。Output Plugin則將事件發(fā)往指定的存儲(chǔ)位置,將數(shù)據(jù)進(jìn)行持久化存儲(chǔ),完成數(shù)據(jù)的匯總。LogStash已經(jīng)有非常豐富的Plugin,而且,如果已有的Plugin不能滿足要求,還能通過自己編碼來實(shí)現(xiàn)自定義的Plugin,因此,其靈活性非常好。

圖4 LogStash架構(gòu)

1.5 日志收集系統(tǒng)對(duì)比

日志數(shù)據(jù)流收集系統(tǒng)具備三個(gè)基本組件,分別是Agent(接收原始數(shù)據(jù),并將數(shù)據(jù)發(fā)給Collector)、Collector(接收多個(gè)Agent發(fā)送過來的數(shù)據(jù),匯總后將數(shù)據(jù)發(fā)往Store)、Store(中央存儲(chǔ)系統(tǒng),將匯總后的數(shù)據(jù)進(jìn)行持久化存儲(chǔ))。表1綜合對(duì)比了Scribe、Flume、Chukwa、LogStash四種日志數(shù)據(jù)流收集系統(tǒng)。

表1 四種日志數(shù)據(jù)流收集系統(tǒng)對(duì)比

所述四種數(shù)據(jù)流收集系統(tǒng)都具備一定高可用和擴(kuò)展性,且都是開源的系統(tǒng),完全可以進(jìn)行二次開發(fā),完成功能的自定義擴(kuò)展。由表1綜合考慮,F(xiàn)lume在各個(gè)方面都具備一定優(yōu)勢(shì),是一款通用的數(shù)據(jù)流收集軟件;若需要對(duì)數(shù)據(jù)流進(jìn)行檢索,則LogStash是非常不錯(cuò)的選擇;若要實(shí)現(xiàn)對(duì)收集的數(shù)據(jù)流進(jìn)行MapReduce操作,則可以選擇Chukwa。

2 消息隊(duì)列管理技術(shù)

在離線數(shù)據(jù)處理系統(tǒng)中,只需要將數(shù)據(jù)進(jìn)行匯總到中央存儲(chǔ)系統(tǒng),然后對(duì)匯總后的數(shù)據(jù)定期的集中處理即可。在數(shù)據(jù)流處理系統(tǒng)中,由于數(shù)據(jù)是源源不斷流入,且需要對(duì)新增的數(shù)據(jù)進(jìn)行實(shí)時(shí)處理。相比于離線數(shù)據(jù)處理系統(tǒng),數(shù)據(jù)流處理系統(tǒng)中需要一個(gè)消息隊(duì)列系統(tǒng)充當(dāng)數(shù)據(jù)緩沖區(qū)的角色,一方面快速接收數(shù)據(jù)收集系統(tǒng)發(fā)送過來的數(shù)據(jù),另一方面,當(dāng)數(shù)據(jù)處理系統(tǒng)處理能力未達(dá)到滿負(fù)載時(shí),盡量快地發(fā)送數(shù)據(jù)給數(shù)據(jù)流處理系統(tǒng),當(dāng)處理系統(tǒng)達(dá)到滿負(fù)載時(shí),緩存接收到的數(shù)據(jù),減緩數(shù)據(jù)發(fā)往數(shù)據(jù)流處理系統(tǒng)的速度。下面主要介紹當(dāng)前比較流行的幾款消息隊(duì)列系統(tǒng)。

2.1 RabbitMQ

RabbitMQ[13]是一個(gè)由Erlang開發(fā)的、基于高級(jí)消息隊(duì)列(Advanced Message Queue, AMQP)協(xié)議[14]的開源消息系統(tǒng)。其最初誕生于金融應(yīng)用系統(tǒng),用于轉(zhuǎn)發(fā)存儲(chǔ)分布式系統(tǒng)中的消息,在擴(kuò)展性、易用性、高可用性等方面優(yōu)勢(shì)突出。

如圖5所示,RabbitMQ中包括Producer(消息產(chǎn)生者)、Broker(消息隊(duì)列管理者)和Consumer(消息使用者)。

圖5 RabbitMQ架構(gòu)

Broker中的Exchange(消息交換機(jī))負(fù)責(zé)接收Producer發(fā)送過來的消息,并將消息根據(jù)Routing Key(路由關(guān)鍵字)綁定到不同的Queue(消息隊(duì)列)。每一條消息都會(huì)被綁定到至少一個(gè)Queue,而每一個(gè)Queue則是若干條消息的實(shí)體。Consumer再從不同的Queue中讀取數(shù)據(jù),進(jìn)行后續(xù)的分析和計(jì)算。

2.2 ZeroMQ

ZeroMQ[15]是一個(gè)非常輕量級(jí)的消息系統(tǒng),也是一種基于消息隊(duì)列的多線程網(wǎng)絡(luò)庫。它是網(wǎng)絡(luò)通信中新的一層,介于應(yīng)用層和傳輸層之間,像框架一樣的一個(gè)Socket library,大幅度簡(jiǎn)化了Socket編程,而且性能更高效。與傳統(tǒng)消息隊(duì)列管理系統(tǒng)不同的是,ZeroMQ不再需要一個(gè)消息服務(wù)器(Broker)來存儲(chǔ)轉(zhuǎn)發(fā)消息,而是直接在發(fā)送端緩存。ZeroMQ是一個(gè)可嵌入的并發(fā)框架,不需要獨(dú)立部署任何服務(wù)進(jìn)程,但需要在其提供的API(Application Program Interface)基礎(chǔ)上編程實(shí)現(xiàn)消息管理邏輯,從這方面來講,它是一個(gè)比較復(fù)雜的系統(tǒng)。ZeroMQ設(shè)計(jì)初衷就是為了盡可能快地發(fā)送消息,且其具有良好的跨平臺(tái)、跨語言特性,能夠在Windows、Linux、OS X下運(yùn)行,能支持超過20種編程語言的編程操作。

圖6ZeroMQ架構(gòu)圖中的I/O(Input/Output)線程所涉及的I/O操作都是異步的。ZeroMQ會(huì)在初始化時(shí)要求用戶傳入接口參數(shù),并根據(jù)這些參數(shù)創(chuàng)建對(duì)應(yīng)的I/O線程,每個(gè)I/O線程都有與之綁定的Poller(輪詢器),Poller則采用Reactor模型[16]與不同操作系統(tǒng)平臺(tái)的I/O模型進(jìn)行通信。主線程與I/O線程通過消息盒子(Mail Box)進(jìn)行通信。Server開始監(jiān)聽或者Client發(fā)起連接時(shí),在主線程中創(chuàng)建連接器或監(jiān)聽器,通過消息盒子以發(fā)消息的形式將其綁定到I/O線程,I/O線程會(huì)把連接器或監(jiān)聽器添加到Poller中用以偵聽讀/寫事件。Server與Client在第一次通信時(shí),會(huì)發(fā)送認(rèn)證標(biāo)識(shí)符,用以進(jìn)行認(rèn)證。認(rèn)證結(jié)束后,雙方會(huì)為此次連接創(chuàng)建會(huì)話(Session),以后雙方就通過會(huì)話進(jìn)行通信。每個(gè)會(huì)話都會(huì)關(guān)聯(lián)到相應(yīng)的讀/寫管道,主線程收發(fā)消息只是分別從管道中讀/寫數(shù)據(jù)。會(huì)話并不直接跟Kernel交換I/O數(shù)據(jù),而是通過Plugin到會(huì)話中的Engine來與kernel交換I/O數(shù)據(jù)。

圖6 ZeroMQ整體架構(gòu)

2.3 Kafka

Kafka[17]是由LinkedIn開發(fā)的,作為其運(yùn)營(yíng)數(shù)據(jù)處理管道(Pipeline)和活動(dòng)流(Activity Stream)的基礎(chǔ),并于2010年將其開源,成為Apache下一個(gè)子項(xiàng)目。經(jīng)過幾年的發(fā)展,現(xiàn)在它已被用作數(shù)據(jù)管道和消息系統(tǒng)廣泛的使用在不同應(yīng)用領(lǐng)域。Kafka作為一個(gè)高性能的分布式發(fā)布/訂閱(Publish/Subscribe)消息隊(duì)列系統(tǒng),其具有以下特性:1)高吞吐量,能在低性能的設(shè)備上達(dá)到每秒數(shù)十萬的消息讀寫速度;2)支持水平擴(kuò)展,當(dāng)集群吞吐量不能滿足需求時(shí),只需要增加設(shè)備,就能讓其吞吐量近似線性地增長(zhǎng);3)容錯(cuò)性好,不管消息有沒有被消費(fèi)掉,都可以將數(shù)據(jù)存儲(chǔ)在磁盤上,可以對(duì)消息進(jìn)行多次讀取,且可以自動(dòng)將消息拷貝到不同的機(jī)器上,實(shí)現(xiàn)數(shù)據(jù)的冗余;4)保證消息有序,通過將消息分區(qū)存儲(chǔ),能保證每一個(gè)分區(qū)中的數(shù)據(jù)都能被有序地消費(fèi)。

Kafka的整體架構(gòu)如圖7所示,包括三種角色:生產(chǎn)者(Producer),向Kafka集群發(fā)送數(shù)據(jù)的一端,由不同的數(shù)據(jù)收集系統(tǒng)和組件構(gòu)成;代理集群(Broker Cluster),運(yùn)行Kafka相關(guān)進(jìn)程的一端,負(fù)責(zé)接收來自Producer的數(shù)據(jù),并將數(shù)據(jù)轉(zhuǎn)發(fā)給Consumer;消費(fèi)者(Consumer),即數(shù)據(jù)的使用者,如實(shí)時(shí)數(shù)據(jù)應(yīng)用系統(tǒng)等,完成對(duì)數(shù)據(jù)作業(yè)務(wù)邏輯相關(guān)的處理。

在Kafka中每一條消息至少屬于某一個(gè)主題(Topic),一個(gè)Topic則是某一類消息的分組,并根據(jù)消息的Topic進(jìn)行分區(qū)(Partition)并分散到不同服務(wù)器上的日志(log)文件中按順序存儲(chǔ)。每條消息所在的文件中會(huì)有一個(gè)不斷增長(zhǎng)的長(zhǎng)整型偏移量(offset),通過offeset能唯一標(biāo)識(shí)一條消息。Kafka中消息存儲(chǔ)和消費(fèi)有關(guān)的狀態(tài)信息,如:offeset,都是通過Zookeeper[18]來保存。雖然Kafka將數(shù)據(jù)存儲(chǔ)到了磁盤中,但是磁盤的順序讀寫速度是非常快,甚至能超過內(nèi)存的隨機(jī)讀寫速度,且Kafka中使用了Zero-Copy[19]技術(shù),因此Kafka能保證消息的快速讀取。

圖7 Kafka整體架構(gòu)

這三款消息隊(duì)列系統(tǒng)都是非常優(yōu)秀的,有很多共性,也有一些區(qū)別(如表2)。

表2 消息隊(duì)列系統(tǒng)對(duì)比

1)RabbixMQ采用通用高級(jí)消息隊(duì)列協(xié)議(AMQP),得到很多到公司的支持,且其能很好地支持消息的事物機(jī)制、數(shù)據(jù)的持久化,非常適用于金融行業(yè),但是在相同的配置情況下,其吞吐量比另外兩款消息隊(duì)列系統(tǒng)要低很多。

2)ZeroMQ實(shí)質(zhì)上是一個(gè)基于Socket的可嵌入的并發(fā)框架,其并沒有完整的實(shí)現(xiàn)消息隊(duì)列管理系統(tǒng),而是需要用戶通過調(diào)用相關(guān)的API來完成對(duì)消息的管理,因此,其使用起來要稍微復(fù)雜一些。其底層的相關(guān)技術(shù),能夠盡快地發(fā)送消息數(shù)據(jù),其吞吐量非常大,但是,不提供數(shù)據(jù)的持久化支持,即消息被消費(fèi)者接收后,就不能再次讀取,因此在一些需要非常高的吞吐量且不需要多次讀取消息,且也能容忍系統(tǒng)故障時(shí)丟失部分?jǐn)?shù)據(jù)的場(chǎng)景中比較適合。

3)Kafka則是RabbitMQ和ZeroMQ的折中方案,能支持消息的持久化,在盡可能保證數(shù)據(jù)不丟失的同時(shí),又使用Zero-Copy技術(shù)及順序的存儲(chǔ)和讀取消息機(jī)制,使其具有很高的吞吐量。其擴(kuò)展性也是非常的出色,只需要增加相應(yīng)的設(shè)備,即能使其吞吐量達(dá)到幾乎線性地增長(zhǎng)。Kafka比較適合互聯(lián)網(wǎng)的應(yīng)用場(chǎng)景,在很多的互聯(lián)網(wǎng)公司都被廣泛地使用。

3 分布式流式數(shù)據(jù)處理技術(shù)

數(shù)據(jù)被實(shí)時(shí)地收集和匯總形成數(shù)據(jù)流,為了盡快得到實(shí)時(shí)應(yīng)用系統(tǒng)需要的數(shù)據(jù)結(jié)果,需要數(shù)據(jù)分析系統(tǒng)能盡快完成對(duì)原始數(shù)據(jù)的處理。在大數(shù)據(jù)環(huán)境下,單臺(tái)服務(wù)器很難滿足短時(shí)間內(nèi)大量的數(shù)據(jù)計(jì)算要求,且考慮到業(yè)務(wù)和數(shù)據(jù)的增長(zhǎng),這些都要求數(shù)據(jù)分析系統(tǒng)具有良好的擴(kuò)展性。下面介紹目前幾種主流的分布式數(shù)據(jù)流處理系統(tǒng)。

3.1 Storm

Storm[20]最初是由Twitter開發(fā)并開源的、基于分布式的實(shí)時(shí)數(shù)據(jù)處理系統(tǒng),在Twitter、Yahoo、Alibaba等很多知名的大公司都得到廣泛的應(yīng)用。其具有很好的容錯(cuò)性、擴(kuò)展性,且能到次秒級(jí)的延時(shí),非常適合于低延時(shí)的應(yīng)用場(chǎng)景。其組成系統(tǒng)組成為:

1)Nimbus,集群的主節(jié)點(diǎn),負(fù)責(zé)集群資源的管理、任務(wù)的調(diào)度分配。

2)Supervisor,負(fù)責(zé)接收Nimbus分配的任務(wù),啟動(dòng)和停止屬于自己管理的工作進(jìn)程。

3)Zookeeper,是Storm重點(diǎn)依賴的外部組件,提供Supervisor和Nimbus之間協(xié)調(diào)的服務(wù),Nimbus和Supervisor心跳和任務(wù)運(yùn)行情況都是保存在Zookeeper上。

Storm實(shí)現(xiàn)的數(shù)據(jù)流模型如圖8所示,包括:Topoloy(拓?fù)?,類似于Hadoop上的MapReduce任務(wù),數(shù)據(jù)在節(jié)點(diǎn)之間流動(dòng)方向所組成的一個(gè)圖,且包含數(shù)據(jù)的處理邏輯;Tuple(消息元組),最小的消息處理和傳遞單元,每個(gè)Tuple都是不可變數(shù)組;Spout(噴嘴),從Storm外部接收數(shù)據(jù)轉(zhuǎn)為內(nèi)部的數(shù)據(jù)來源,并將原始數(shù)據(jù)轉(zhuǎn)為處Tuple;Bolt(螺栓),接收來自Spout或上一級(jí)的Bolt的Tuple,在其內(nèi)部作簡(jiǎn)單的數(shù)據(jù)轉(zhuǎn)換和計(jì)算,并產(chǎn)生多個(gè)輸出Tuple流,發(fā)送給其他的Bolt,協(xié)作完成復(fù)雜的計(jì)算邏輯。

如圖8所示,Storm會(huì)通過Spout將外部的流式數(shù)據(jù)讀入Topology中,將其轉(zhuǎn)為消息處理單元Tuple,且給每一個(gè)Tuple分配一個(gè)消息ID(Identity),開始消息的處理流程。再將Tuple輸出到Bolt中,對(duì)于復(fù)雜的數(shù)據(jù)處理過程,Storm會(huì)將其分解成若干個(gè)簡(jiǎn)單的處理邏輯,并根據(jù)特定的順序在不同的Bolt中進(jìn)行處理和流通,直到經(jīng)過最后一個(gè)Bolt的計(jì)算,此時(shí)才會(huì)將該消息ID標(biāo)記為處理完成。

3.2 Samza

Samza[21]是LinkedIn開源的一個(gè)分布式流處理系統(tǒng)。Samza具有一些非常優(yōu)秀的特性:通過簡(jiǎn)單的API可以非常方便地處理流式數(shù)據(jù);具有很好的容錯(cuò)性,能在用戶沒有感知到的情況下恢復(fù)處理失敗的任務(wù);任務(wù)狀態(tài)管理,出現(xiàn)故障時(shí),能快速準(zhǔn)確地恢復(fù)到失敗之前的狀態(tài)。

一個(gè)完整的Samza系統(tǒng)由三個(gè)組件構(gòu)成:①Kafka為Samza提供實(shí)時(shí)的消息數(shù)據(jù)來源,也可以作為Samza數(shù)據(jù)處理后的數(shù)據(jù)存儲(chǔ)系統(tǒng);②Samza進(jìn)行流式數(shù)據(jù)處理,用戶可以使用它提供的API簡(jiǎn)單方便的處理流式數(shù)據(jù),而不用關(guān)心處理過程及容錯(cuò)性等的管理;③Yarn[22]是Samza中的資源分配和任務(wù)管理系統(tǒng),客戶端(Client)提交任務(wù)時(shí)會(huì)向Yarn集群中的RM(資源管理器,Resource Manager)申請(qǐng)資源,RM以容器(Container)的形式將資源封裝起來,并在容器里執(zhí)行相應(yīng)的Samza計(jì)算任務(wù)。

圖9為Samza系統(tǒng)的流式數(shù)據(jù)處理模型。當(dāng)用戶提交一個(gè)任務(wù)時(shí),首先會(huì)向Yarn中的Resource Manager申請(qǐng)所需的資源。接著Yarn在Node Manager節(jié)點(diǎn)上啟動(dòng)容器,供Samza運(yùn)行相應(yīng)的任務(wù)。然后Samza進(jìn)程從Kafka中不同分區(qū)中實(shí)時(shí)拉取數(shù)據(jù),并進(jìn)行相應(yīng)的計(jì)算。最后將處理后的結(jié)果再次存入到Kafka進(jìn)入到下一輪的計(jì)算或者輸出到其他存儲(chǔ)系統(tǒng)。

圖8 Storm數(shù)據(jù)流模型

圖9 Samza流處理模型

3.3 Flink

Flink[23]起源于柏林理工大學(xué)的一個(gè)研究性項(xiàng)目,2014年被Apache孵化器所接受,并迅速地成為了ASF(Apache Software Foundation)的頂級(jí)項(xiàng)目之一。Flink是一個(gè)能同時(shí)適用于流數(shù)據(jù)和批處理的分布式處理引擎,其體現(xiàn)了一個(gè)最新的設(shè)計(jì)理念:數(shù)據(jù)處理應(yīng)該是流式的,批處理只是流處理的一個(gè)特例,也就是說所有的任務(wù)都可以當(dāng)成流來處理。這也是Flink跟其他流處理系統(tǒng)的最大區(qū)別。

圖10展示了Flink流計(jì)算的數(shù)據(jù)處理模型,在分布式數(shù)據(jù)處理系統(tǒng)中數(shù)據(jù)會(huì)在多個(gè)節(jié)點(diǎn)(Node)之間進(jìn)行傳輸,在不同節(jié)點(diǎn)之間的數(shù)據(jù)傳輸分為兩種情況:

1)流處理。對(duì)實(shí)時(shí)達(dá)到的數(shù)據(jù)流在一個(gè)節(jié)點(diǎn)上處理之后,會(huì)將處理后的結(jié)果緩存在當(dāng)前節(jié)點(diǎn)中,并立刻將數(shù)據(jù)傳輸給后續(xù)節(jié)點(diǎn),進(jìn)行下一步的處理,一直重復(fù)這個(gè)流程,直到得到最終結(jié)果。

2)批處理。當(dāng)前節(jié)點(diǎn)會(huì)把需要處理的所有數(shù)據(jù)逐條處理,序列化并緩存起來,但不會(huì)立刻將該處理后的結(jié)果發(fā)送給下一個(gè)節(jié)點(diǎn),當(dāng)緩存不足時(shí),會(huì)將數(shù)據(jù)持久化到磁盤,只有當(dāng)所有數(shù)據(jù)都被處理完成后,才會(huì)將處理后的數(shù)據(jù)通過網(wǎng)絡(luò)傳輸?shù)较乱粋€(gè)節(jié)點(diǎn)。

Flink通過設(shè)置緩存數(shù)據(jù)的超時(shí)時(shí)間,來同時(shí)應(yīng)對(duì)流處理和批處理系統(tǒng):若超時(shí)時(shí)間為0,則會(huì)執(zhí)行上述1)中的流程;若超時(shí)時(shí)間為無窮大,則執(zhí)行2)的數(shù)據(jù)處理流程。此外,還可以通過設(shè)置超時(shí)時(shí)間的長(zhǎng)短,來達(dá)到調(diào)節(jié)流處理延時(shí)的目的。

圖10 Flink數(shù)據(jù)處理模型

3.4 Spark Streaming

Spark Streaming是Spark[24]中用于流處理的一個(gè)組件。Spark是一個(gè)通用的并行計(jì)算框架,由加州伯克利大學(xué)(UCBerkeley)的AMP(Algorithms Machines People)實(shí)驗(yàn)室開發(fā),并于2010年開源,2013年成長(zhǎng)為Apache旗下為大數(shù)據(jù)領(lǐng)域最活躍的開源項(xiàng)目之一。Spark也是基于MapReduce模型實(shí)現(xiàn)的分布式計(jì)算框架,擁有Hadoop MapReduce所具有的優(yōu)點(diǎn),并且增加了很多優(yōu)秀的特性。

Spark同樣適用于批處理和流處理,其數(shù)據(jù)處理的實(shí)現(xiàn)都是基于彈性分布式數(shù)據(jù)集(Resident Distributed Dataset, RDD)[25]。RDD是其本質(zhì)是一個(gè)基于內(nèi)存的數(shù)據(jù)集,記錄了數(shù)據(jù)塊列表、數(shù)據(jù)塊上的數(shù)據(jù)如何轉(zhuǎn)化的函數(shù)、與父RDD之間的依賴關(guān)系(Lineage)、以及針對(duì)Key-Value類型數(shù)據(jù)的分區(qū)函數(shù)、數(shù)據(jù)的偏好位置(用于數(shù)據(jù)計(jì)算本地化)。Spark Streaming是Spark生態(tài)系統(tǒng)中用于處理流式數(shù)據(jù)的一個(gè)模塊,其本質(zhì)是將用戶設(shè)定的固定時(shí)間內(nèi)新接收的數(shù)據(jù)轉(zhuǎn)為一個(gè)RDD,進(jìn)而分成很多小段的批處理,此操作稱為離散流(Discretized Stream, DStream)[26]。一個(gè)DStream實(shí)質(zhì)是一個(gè)時(shí)間間隔很短的微批處理(micro-batching),所以,Spark Streaming的本質(zhì)是將所有的數(shù)據(jù)處理形式都當(dāng)作批處理對(duì)待。

圖11為Spark Streaming流式數(shù)據(jù)處理模型,在Spark Streaming中會(huì)把每個(gè)時(shí)間間隔內(nèi)新接收的數(shù)據(jù)存入到一個(gè)新的RDD中,然后對(duì)每一個(gè)RDD進(jìn)行相同的轉(zhuǎn)化操作(Transformation)和動(dòng)作(Action),且它能支持窗口(Window)操作,即將不同時(shí)刻得到的RDD的數(shù)據(jù)統(tǒng)一進(jìn)行操作,使之成為一個(gè)新的RDD,這樣就可以將新數(shù)據(jù)跟歷史數(shù)據(jù)相結(jié)合。

圖11 Spark Streaming數(shù)據(jù)處理模型

Spark Streaming流處理的理念與其他的流處理系統(tǒng)存在本質(zhì)區(qū)別,但是在很多的應(yīng)用場(chǎng)景中是可以容忍秒級(jí)別的延時(shí),且將流數(shù)據(jù)進(jìn)行微批處理在一定程度上能提高系統(tǒng)的吞吐量。此外,它在Spark的全棧式生態(tài)系統(tǒng)中,能很好地與批處理、Spark ML(Machine Language)、Spark Graph、Spark SQL(Structured Query Language)等相結(jié)合,解決數(shù)據(jù)的后續(xù)處理與分析問題,這是其他系統(tǒng)所不能比擬的。

3.5 分布式流式數(shù)據(jù)處理系統(tǒng)對(duì)比

表3中從不同角度比較Storm、Samza、Flink和Spark Streaming和特性。

表3 流式數(shù)據(jù)處理系統(tǒng)對(duì)比

在實(shí)時(shí)性方面,Spark Streaming由于采用微批處理的方式,所以延時(shí)最大,會(huì)存在秒級(jí)的延時(shí),而其他三個(gè)流處理系統(tǒng)都是次秒級(jí)的延時(shí)。

分布式系統(tǒng)都會(huì)重點(diǎn)考慮容錯(cuò)性,因此,這些分布式流處理系統(tǒng)都具有很好的容錯(cuò)性。

而在語言支持上,Storm支持C/C++、Python、基于JVM(Java Virtual Machine)等大多數(shù)編程語言,相對(duì)而言,其語言支持特性是最好的,而且其他的系統(tǒng)一般只支持Python和基于JVM的編程語言。

在狀態(tài)管理上,Storm最初是不支持狀態(tài)管理的,后來才提供高層抽象——Trident來支持狀態(tài)管理,而其他流處理系統(tǒng)都支持狀態(tài)管理。

數(shù)據(jù)處理語義方面,都能保證數(shù)據(jù)至少被處理一次,這種方式在特定場(chǎng)景下,會(huì)存在部分?jǐn)?shù)據(jù)被多次處理的情況,而Storm、Spark Streaming、Flink通過特定的配置,能達(dá)到數(shù)據(jù)剛好被處理一次的要求。

適用場(chǎng)景及生態(tài)系統(tǒng)完整性方面,Spark Streaming是最全面的,既支持批處理,又支持流處理,且還支持分布式圖計(jì)算、機(jī)器學(xué)習(xí)庫等高級(jí)功能;Flink緊跟Spark的步伐,也具備非常完善的生態(tài)系統(tǒng),Storm也有部分其他的功能支持,而Samza則只支持流處理,缺乏其他場(chǎng)景的應(yīng)用支持。

4 數(shù)據(jù)存儲(chǔ)技術(shù)

一方面,在一些場(chǎng)景中需要將海量的原始數(shù)據(jù)保存一段很長(zhǎng)的時(shí)間,供后續(xù)的數(shù)據(jù)分析及防止系統(tǒng)故障導(dǎo)致的數(shù)據(jù)丟失。另一方面,在流式數(shù)據(jù)處理系統(tǒng)中,原始數(shù)據(jù)被處理之后,部分的數(shù)據(jù)會(huì)被立刻交付給應(yīng)用系統(tǒng)加以應(yīng)用,但也有部分是需要共享或者長(zhǎng)期保存的,這就要求將處理后的結(jié)果存儲(chǔ)到可靠介質(zhì)中。

表4列出了四種常用的數(shù)據(jù)存儲(chǔ)方式,下面從不同方面進(jìn)行對(duì)比介紹:

HDFS(Hadoop Distributed File System),是谷歌GFS(Google File System)的開源實(shí)現(xiàn),是一個(gè)分布式的數(shù)據(jù)存儲(chǔ)系統(tǒng),支持大規(guī)模的數(shù)據(jù)存儲(chǔ),具有很好的容錯(cuò)性,其存儲(chǔ)能力隨著集群數(shù)量的增加呈線性增長(zhǎng),其具備很高的吞吐量,但是不適合低延遲數(shù)據(jù)訪問,無法高效存儲(chǔ)大量小文件,不支持多用戶寫入及任意修改文件。HDFS作為Hadoop生態(tài)系統(tǒng)中的主要存儲(chǔ)系統(tǒng),在實(shí)時(shí)性要求不是很高的情況下,已經(jīng)成為很多公司的首選存儲(chǔ)方案;

HBase[27],作為一個(gè)分布式的、面向列存儲(chǔ)的開源NoSQl數(shù)據(jù)庫,其理論基礎(chǔ)來源于谷歌的BigTable[28],支持上百萬列的大表,其數(shù)據(jù)最終存儲(chǔ)在HDFS中;但是,它克服了HDFS實(shí)時(shí)性和隨機(jī)讀寫的缺陷,可以支持?jǐn)?shù)據(jù)的隨機(jī)讀寫、實(shí)時(shí)訪問,從而彌補(bǔ)了Hadoop生態(tài)系統(tǒng)中實(shí)時(shí)數(shù)據(jù)讀寫的空白。在CAP定理[29]中,HBase選擇了CP,即:C(Consistency,一致性)和P(Partition tolerance,分區(qū)容錯(cuò)性),因此HBase在可用性上稍有欠缺,需要結(jié)合Zookeeper來完善其高可用性;

Cassandra[30],最初由Facebook開發(fā),非常適合于社交網(wǎng)絡(luò)的數(shù)據(jù)存儲(chǔ),在亞馬遜分布式引擎——Dynamo[31]的基礎(chǔ)上,結(jié)合BigTable的列族(Column Family)數(shù)據(jù)模型,并采用P2P(Peer to Peer)去中心化節(jié)點(diǎn)管理方式,側(cè)重于CAP理論中的AP:A(Availability,可用性)和P(Partition tolerance,分區(qū)容錯(cuò)性),而采用最終一致性。支持多數(shù)據(jù)中心的數(shù)據(jù)復(fù)制,并提供類SQL語言——CQL(Cassandra Query Language)的支持。

Redis[32],是一款基于內(nèi)存的key-value存儲(chǔ)系統(tǒng)。由于基于內(nèi)存存儲(chǔ),其具有很高的吞吐量,同時(shí)也支持將數(shù)據(jù)持久化到磁盤,提供強(qiáng)大的數(shù)據(jù)類型支持,包括lists、sets、ordered sets以及hashes等。此外,Redis中所有操作都是原子性的,Redis 3.0[33]以后提供了Cluster(集群)支持,使得其擴(kuò)展性大幅度增強(qiáng),但是其數(shù)據(jù)存儲(chǔ)容量比其他分布式數(shù)據(jù)庫系統(tǒng)略小。

表4中的四種常用數(shù)據(jù)存儲(chǔ)技術(shù)分別適用于不同的大數(shù)據(jù)應(yīng)用場(chǎng)景,延時(shí)大小、擴(kuò)展性、容錯(cuò)性、高可用性等方面都是大數(shù)據(jù)環(huán)境下需要考慮的關(guān)鍵因素,沒有最好,只有更適合業(yè)務(wù)場(chǎng)景的解決方案。

表4 四種常用的數(shù)據(jù)存儲(chǔ)方式對(duì)比

5 分布式數(shù)據(jù)流的DDoS攻擊檢測(cè)

為了更好地理解分布式數(shù)據(jù)流處理系統(tǒng)組成,本章介紹一種大數(shù)據(jù)環(huán)境下的分布式拒絕服務(wù)(Distributed Denial of Service, DDoS)攻擊檢測(cè)數(shù)據(jù)流處理系統(tǒng),其結(jié)構(gòu)如圖12所示。整個(gè)系統(tǒng)包括:數(shù)據(jù)收集、數(shù)據(jù)分析、數(shù)據(jù)存儲(chǔ)、模型(或算法)訓(xùn)練、入侵檢測(cè)。

圖12 大數(shù)據(jù)環(huán)境下的DDoS攻擊檢測(cè)數(shù)據(jù)流處理系統(tǒng)

系統(tǒng)的數(shù)據(jù)流向?yàn)椋?)數(shù)據(jù)的來源為不同的服務(wù)器,通過各種抓包軟件,如:TcpDump、NetFlow、Sniff等,對(duì)特定的網(wǎng)卡或端口進(jìn)行數(shù)據(jù)包抓取,并通過Flume將不同服務(wù)器上的網(wǎng)絡(luò)數(shù)據(jù)匯總,將數(shù)據(jù)抓、分析和檢測(cè)分離,減輕應(yīng)用服務(wù)器的負(fù)擔(dān)。2)數(shù)據(jù)匯聚之后,將所抓取的網(wǎng)絡(luò)數(shù)據(jù)作為Kafka Producer的消息源,并傳送到Kafka Broker,讓Broker對(duì)所有網(wǎng)絡(luò)數(shù)據(jù)進(jìn)行有序的管理。3)Spark Streaming則實(shí)時(shí)從Kafka Broker中拉取數(shù)據(jù),再將數(shù)據(jù)分散到不同的Spark Executor進(jìn)行分析和統(tǒng)計(jì)。4)Spark將抓取的網(wǎng)絡(luò)數(shù)據(jù)處理后,一方面可以將結(jié)果傳給其他的應(yīng)用,作進(jìn)一步的分析;另一方面可以將結(jié)果持久化,存儲(chǔ)在數(shù)據(jù)庫(HDFS或其他數(shù)據(jù)庫)中,供后續(xù)分析使用。5)對(duì)得到的實(shí)時(shí)數(shù)據(jù),可以使之與之前得到的歷史數(shù)據(jù)進(jìn)行合并進(jìn)行模型(或算法)訓(xùn)練或者直接通過模型進(jìn)行DDoS檢測(cè),并得到檢測(cè)結(jié)果。整個(gè)系統(tǒng)數(shù)據(jù)搜集使用Flume、消息隊(duì)列管理使用Kafka、數(shù)據(jù)實(shí)時(shí)分析使用Spark Streaming、數(shù)據(jù)存儲(chǔ)使用HDFS或其他數(shù)據(jù)庫,具有好的擴(kuò)展性、容錯(cuò)性和實(shí)時(shí)處理能力,能充分滿足大數(shù)據(jù)環(huán)境下的各種DDoS攻擊檢測(cè)需求。

6 結(jié)語

本文主要研究了組成大數(shù)據(jù)環(huán)境下分布式數(shù)據(jù)流處理系統(tǒng)的各個(gè)子系統(tǒng),包括數(shù)據(jù)收集子系統(tǒng)、消息隊(duì)列管理子系統(tǒng)、流式數(shù)據(jù)處理子系統(tǒng)和數(shù)據(jù)存儲(chǔ)子系統(tǒng),詳細(xì)介紹了四類子系統(tǒng)中涉及的相關(guān)技術(shù),并從不同的應(yīng)用角度進(jìn)行了比較,本文的研究?jī)?nèi)容能為大數(shù)據(jù)環(huán)境下的數(shù)據(jù)流處理的理論研究和應(yīng)用系統(tǒng)開發(fā)提供參考,有一定的理論和應(yīng)用價(jià)值。

References)

[1] 國(guó)家圖書館研究院.CNNIC發(fā)布第37次《中國(guó)互聯(lián)網(wǎng)絡(luò)發(fā)展?fàn)顩r統(tǒng)計(jì)報(bào)告》[J].國(guó)家圖書館學(xué)刊,2016(2):20.(The Research Institute of the National Library.The 37th China Internet network development state statistic report issued by CNNIC [J]. Journal of the National Library of China, 2016(2):20.)

[2] IDC. The digital universe of opportunities: rich data and the increasing value of the Internet of things [EB/OL]. [2014- 04- 15]. http://www.emc.com/leadership/digital-universe/2014iview/executive-summary.htm.

[3] 戴永濤.分布式流處理系統(tǒng)研究與應(yīng)用[D].上海:上海海事大學(xué),2016:1-40.(DAI Y T. Research and application of distributed streaming system [D]. Shanghai: Shanghai Maritime University, 2016:1-40.)

[4] 趙勇.架構(gòu)大數(shù)據(jù)——大數(shù)據(jù)技術(shù)及算法解析[M].北京:電子工業(yè)出版社,2015:394-410.(ZHAO Y. Big Data Structure—The Technology and Algorithm Analysis of Big Data [M]. Beijing: Publishing House of Electronics Industry, 2015: 394-410.)

[5] 孟小峰,慈祥.大數(shù)據(jù)管理:概念、技術(shù)與挑戰(zhàn)[J].計(jì)算機(jī)研究與發(fā)展,2013,50(1):146-169.(MENG X F, CI X. Big data management: concepts, technology and challenges [J]. Journal of Computer Research and Development, 2013, 50(1): 146-169.)

[6] 孫大為,張廣艷,鄭緯民.大數(shù)據(jù)流式計(jì)算:關(guān)鍵技術(shù)及系統(tǒng)實(shí)例[J].軟件學(xué)報(bào),2014,25(4):839-862.(SUN D W, ZHANG G Y, ZHENG W M. Big data flow calculation: the key technology and system instance[J]. Journal of Software, 2014, 25(4): 839-862.)

[7] MICHAEL K, MILLER K W. Big data: new opportunities and new challenges [J]. Computer, 2013, 46(6): 22-24.

[8] Facebook. Scribe Wiki [EB/OL]. [2015- 02- 03]. https://github.com/facebookarchive/scribe/wiki.

[9] Apache Software Foundation. Apache Thrift [EB/OL].[2016- 05- 01]. http://thrift.apache.org/.

[10] Apache Software Foundation. Apache Flume [EB/OL]. [2016- 04- 09]. http://flume.apache.org/.

[11] Apache Software Foundation. Apache Chukwa [EB/OL]. [2016- 04- 05]. http://chukwa.apache.org/.

[12] Elasticsearch. Elasticsearch Logstash[EB/OL]. [2016- 04- 11]. https://www.elastic.co/products/logstash.

[13] Pivotal Software. RabbitMQ [EB/OL]. [2016- 04- 11]. http://www.rabbitmq.com/.

[14] Erlang. Erlang introduction [EB/OL]. [2016- 04- 11]. http://www.erlang.org/.

[15] HINTJENS P. ZeroMQ : Messaging for Many Applications [M]. Sebastopol: O’Reilly Media, 2013.

[16] SCHMIDT D C. Reactor: an object behavioral pattern for demultiplexing and dispatching handles for synchronous events [J]. Compilers Principles Techniques & Tools, 1999, 261(2): 201-208.

[17] Apache Software Foundation. Kafka [EB/OL]. [2016- 04- 11]. http://kafka.apache.org/.

[18] Apache Software Foundation. Apache Zookeeper [EB/OL]. [2016- 04- 11]. http://zookeeper.apache.org/.

[19] PALANIAPPAN S K, NAGARAJA P B. Efficient data transfer through zero copy [EB/OL]. [2016- 03- 13]. https://www.ibm.com/developerworks/linux/library/j-zerocopy/j-zerocopy-pdf.pdf.

[20] Apache Software Foundation. Apache Storm [EB/OL]. [2016- 04- 07]. http://storm.apache.org/.

[21] Apache Software Foundation. Apache Samza [EB/OL]. [2016- 04- 08]. http://samza.apache.org/.

[22] MURTHY A. Apache Hadoop YARN—background and an overview [EB/OL]. [2016- 04- 08]. http://hortonworks.com/blog/apache-hadoop-yarn-background-and-an-overview/.

[23] Apache Software Foundation. Apache Flink [EB/OL]. [2016- 04- 08]. http://flink.apache.org/.

[24] Apache Software Foundation. Apache Spark [EB/OL]. [2016- 04- 08]. http://spark.apache.org/.

[25] ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing [C]// Proceedings of the 9th Usenix Conference on Networked Systems Design and Implementation. Berkely: USENIX Association, 2012: 2.

[26] ZAHARIA M, DAS T, LI H, et al. Discretized streams: an efficient and fault-tolerant model for stream processing on large clusters [C]// Proceedings of the 4th USENIX Conference on Hot Topics in Cloud Computing. Berkeley, CA: USENIX Association, 2012: 10.

[27] Apache Software Foundation. Apache Hbase [EB/OL]. [2016- 04- 06]. http://hbase.apache.org/.

[28] CHANG F, DEAN J, GHEMAWAT S, et al. Bigtable: a distributed storage system for structured data [C]// Proceedings of the 7th USENIX Symposium on Operating Systems Design and Implementation. Berkeley, CA: USENIX Association, 2006: 15.

[29] 百度百科.CAP原則 [EB/OL]. [2016- 03- 18]. http://baike.baidu.com/link?url=i-7VhglR7AO5k63IHraug1jk0t6LE03jVJsKJQiU L4VU22oNGDa3u2vr_PT8m27-b4ZG_vpxtY7laL8jHNzq9q.(Baidu Encyclopedia. CAP principle [EB/OL]. [2016- 03- 18]. http://baike.baidu.com/link?url=i-7VhglR7AO5k63IHraug1jk0t6LE03jVJsKJQiU L4VU22oNGDa3u2vr_PT8m27-b4ZG_vpxtY7laL8jHNzq9q.)

[30] LAKSHMAN A, MALIK P. Cassandra: a decentralized structured storage system [J]. ACM SIGOPS Operating Systems Review, 2010, 44(2): 35-40.

[31] DECANDIA G, HASTORUN D, JAMPANI M, et al. Dynamo: amazon’s highly available key-value store [J]. ACM SIGOPS Operating Systems Review, 2007, 41(6): 205-220.

[32] Redis Labs. Redis [EB/OL]. [2016- 04- 11]. http://redis.io/.

[33] Redis Labs. Redis cluster tutorial [EB/OL]. [2016- 04- 11]. http://redis.io/topics/cluster-tutorial.

This work is partially supported by the National Natural Science Foundation of China (61672338, 61373028).

CHEN Fumei, born in 1989,M.S. candidate, Her research interests include cloud computing, big data real-time analysis.

HAN Dezhi, born in 1966, Ph.D., professor. His research interests include cloud computing, cloud storage and security technologies, big data application technology.

BI Kun, born in 1981, Ph.D., lecturer, His research interests include cloud computing, cloud storage, big data application technology.

DAI Yongtao, born in 1991, M.S. candidate, His research interests include cloud computing, distributed computing, data mining, network security technology.

Key technologies of distributed data stream processing based on big data

CHEN Fumei, HAN Dezhi*, BI Kun, DAI Yongtao

(CollegeofInformationEngineering,ShanghaiMaritimeUniversity,Shanghai201306,China)

In the big data environment, the real-time processing requirement of data stream is high, and data calculations require persistence and high reliability. Distributed Data Stream Processing System (DDSPS) can solve the problem of data stream processing in big data environment. Besides, it has the advantages of scalability and fault-tolerance of distributed system, and also has high real-time processing capability. Four subsystems and their key technologies of the DDSPS based on big data were introduced in detail. The different technical schemes of each subsystem were discussed and compared. At the same time, an example of data stream processing system structure to detect Distributed Denial of Service (DDoS) attacks was introduced, which can provide the technical reference for data stream processing theory research and application technology development under big data environment.

big data; stream processing; message queue; data processing; data storage

2016- 09- 20;

2016- 10- 18。

國(guó)家自然科學(xué)基金資助項(xiàng)目(61373028, 61672338)。

陳付梅(1989—),女,山東臨沂人,碩士研究生,主要研究方向:云計(jì)算、大數(shù)據(jù)實(shí)時(shí)分析; 韓德志(1966—),男,河南信陽人,教授,博士, CCF高級(jí)會(huì)員,主要研究方向:云計(jì)算、云存儲(chǔ)及其安全技術(shù)、大數(shù)據(jù)應(yīng)用技術(shù); 畢坤(1981—),男,山東青島人,講師,博士,主要研究方向:云計(jì)算、云存儲(chǔ)、大數(shù)據(jù)應(yīng)用技術(shù); 戴永濤(1991—),男,湖南邵陽人,碩士研究生,主要研究方向:云計(jì)算、分布式計(jì)算、數(shù)據(jù)挖掘、網(wǎng)絡(luò)安全技術(shù)。

1001- 9081(2017)03- 0620- 08

10.11772/j.issn.1001- 9081.2017.03.620

TP391; TP311.13

A

猜你喜歡
流式數(shù)據(jù)流隊(duì)列
常熟開關(guān)新品來襲!CSX3系列電氣防火限流式保護(hù)器
流式大數(shù)據(jù)數(shù)據(jù)清洗系統(tǒng)設(shè)計(jì)與實(shí)現(xiàn)
汽車維修數(shù)據(jù)流基礎(chǔ)(上)
隊(duì)列隊(duì)形體育教案
汽車維修數(shù)據(jù)流基礎(chǔ)(下)
隊(duì)列里的小秘密
基于XML的數(shù)據(jù)流轉(zhuǎn)換在民航離港系統(tǒng)中應(yīng)用
基于多隊(duì)列切換的SDN擁塞控制*
一種汽車空調(diào)平行流式冷凝器側(cè)進(jìn)風(fēng)性能研究
在隊(duì)列里
利津县| 余江县| 建水县| 治多县| 台东市| 阿坝| 盈江县| 寿宁县| 洪洞县| 忻州市| 衡阳县| 稷山县| 东阳市| 石棉县| 大埔区| 定远县| 古蔺县| 延寿县| 宝应县| 福鼎市| 石狮市| 衢州市| 卓资县| 黄大仙区| 本溪| 元阳县| 汨罗市| 汤原县| 金塔县| 柞水县| 通城县| 鸡东县| 龙里县| 饶河县| 隆回县| 额敏县| 苍南县| 郸城县| 当阳市| 高唐县| 江陵县|