周煜敏 王 鵬 汪 衛(wèi)
(復(fù)旦大學(xué)計算機科學(xué)技術(shù)學(xué)院 上海 200433)
隨著云時代的到來,大數(shù)據(jù)和云計算已經(jīng)吸引了越來越多的業(yè)內(nèi)外人士的關(guān)注。在諸如金融、電信和大規(guī)模傳感器監(jiān)控等許多領(lǐng)域中,在線處理實時數(shù)據(jù),也稱為數(shù)據(jù)流處理也得到了越來越廣泛的應(yīng)用。數(shù)據(jù)流處理應(yīng)用程序通常需要低延遲、快速處理和實時反饋。
在物聯(lián)網(wǎng)的場景下,一套完整的系統(tǒng)由數(shù)千個傳感器節(jié)點的分布式集合組成,每個傳感器節(jié)點能夠從環(huán)境中感測多種類型的信息并以特定頻率發(fā)送數(shù)據(jù)。這就需要非常強大的分布式處理能力來滿足計算的需求。
在Storm、Spark以及Flink等實時大數(shù)據(jù)計算引擎不斷發(fā)展的同時,傳感器的分析專家則難以掌握這些計算引擎的處理原語。因此需要用一套簡單的自定義的語言支持,幫助分析專家能夠簡單地構(gòu)建計算邏輯,這樣才能夠最快速地為這些專家提供可靠的數(shù)據(jù)結(jié)果。
本文在深入分析了傳感器計算的需求后,發(fā)現(xiàn)絕大部分的計算都是基于傳感器數(shù)據(jù)的數(shù)值計算,而且都是基于滑動窗口的計算,將高頻的數(shù)據(jù)的統(tǒng)計量以秒、分鐘至小時級別的頻率計算,加以輔助的閾值設(shè)定和四則運算最終呈現(xiàn)結(jié)果。
本文試圖設(shè)計一套輕量但有效的方案,以支持在大量傳感器上運行的多個查詢計算。由于處理是實時的,傳統(tǒng)的靜態(tài)數(shù)據(jù)平臺和算法不適合傳感器的流數(shù)據(jù)。同時,由于數(shù)據(jù)的高頻率,還應(yīng)該滿足處理的高吞吐量以對應(yīng)數(shù)據(jù)的攝入速率。對于傳感器監(jiān)測分析師,他們可以將計算需求轉(zhuǎn)化為腳本,系統(tǒng)會自動解析并轉(zhuǎn)化Storm的流處理程序,將高頻數(shù)據(jù)處理之后再源源不斷地產(chǎn)生處理結(jié)果并及時反饋給分析師。
該系統(tǒng)還會針對優(yōu)化處理在計算的過程中出現(xiàn)的重復(fù)計算,衡量所涉及的通信和計算成本并通過中間結(jié)果共享的分區(qū)算法來減少網(wǎng)絡(luò)通信已達(dá)到更好的性能。
實時計算需要一個合適的分布式平臺輔助運行,社區(qū)已經(jīng)開發(fā)了許多系統(tǒng),例如早期的Aurora[1]、S4以及Storm[2]和Spark Streaming[3]等來處理秒級甚至毫秒級響應(yīng)中的大量數(shù)據(jù)。Storm具有高度可擴展性,易于使用,并且具有低延遲和有保證的數(shù)據(jù)處理能力。同時,Storm提出了拓?fù)?Topology)的計算概念[4],相比于傳統(tǒng)大數(shù)據(jù)引擎Hadoop的MapReduce更加靈活并且適合實時場景。這些特性都非常契合物聯(lián)網(wǎng)數(shù)據(jù)處理應(yīng)用的需求。與Storm不同,S4等系統(tǒng)無法保證每個元組都會被處理。而Spark Streaming則提出了一個新的模型,使用微批處理的方式用來近似進(jìn)行分布式流處理[5],但其延遲對于實時響應(yīng)來說很高,無法滿足實際的應(yīng)用需求。
為了在大數(shù)據(jù)流上進(jìn)行計算,已經(jīng)提出了許多支持復(fù)雜事件處理(CEP)的語言,包括SQL-TS[6]、Cayuga[7]等。雖然他們設(shè)計了不同的語法規(guī)則,但某些語言不適合物聯(lián)網(wǎng)大數(shù)據(jù)的應(yīng)用程序場景。在過去的研究中,也有相關(guān)的研究人員設(shè)計了一套實時處理應(yīng)用輔助開發(fā)框架以簡化開發(fā)人員工作[9]。而本文的工作專注于數(shù)據(jù)流上的聚合和滑動窗口計算,而上述語言和系統(tǒng)設(shè)計初衷是處理更復(fù)雜的流處理作業(yè),因此在分布式集群上并沒有良好的兼容性,使用這些語言會在解析子句和生成作業(yè)時會產(chǎn)生額外的開銷。
為了實現(xiàn)高吞吐量,應(yīng)該充分利用分布式集群。對性能的關(guān)注需要在有限的資源下完成工作。許多先前的研究已經(jīng)解決了在Storm上開發(fā)的一部分優(yōu)化問題。TMSH-Storm[8]有效降低了Storm的處理延遲和通信開銷,然而在多查詢的環(huán)境下,該方法的優(yōu)勢則并不明顯。
之前的許多文獻(xiàn)都討論了物聯(lián)網(wǎng)環(huán)境下傳感器的實時處理算法,例如:基于微簇的橋梁監(jiān)測數(shù)據(jù)流異常識別算法[10],主要利用主成分分析提取特征,優(yōu)化異常檢測的計算;基于復(fù)雜事件處理的用戶需求響應(yīng)性能實時監(jiān)測分析,主要在復(fù)雜事件處理上實現(xiàn)了R算法的內(nèi)嵌和可視化的仿真[11]。這些文獻(xiàn)著重解決靜態(tài)數(shù)據(jù)的上下文的處理優(yōu)化而并非流處理。此外,還有一些文獻(xiàn)作了滑動窗口相關(guān)的優(yōu)化,例如:具有不同長度和不同選擇謂詞的滑動窗口上的聚合的多查詢優(yōu)化[12],然而沒有利用先驗知識達(dá)到更好的效果。
實時查詢通常適用于無界數(shù)據(jù)流,而不是靜態(tài)數(shù)據(jù)集??紤]到內(nèi)存限制,有必要設(shè)計用于維護流歷史的摘要或概要的技術(shù)。對于大多數(shù)應(yīng)用程序,數(shù)據(jù)流的最新元素比舊的元素更重要。因此這種對最近數(shù)據(jù)的偏好產(chǎn)生了實時流數(shù)據(jù)上的滑動窗口的表達(dá)形式。窗口的大小和滑動間隔通常使用時間間隔(基于時間)或元組數(shù)來指定(基于元組)。
在本文中滑動窗口使用時間間隔標(biāo)準(zhǔn),時間滑動窗口W定義如下。
定義1將時間長度為Lms,每次滑動時間長度為Sms定義為滑動窗口,記作W(L,S)。
W(L,S)所對應(yīng)的時間窗口如圖1所示。
圖1 時間滑動窗口示意圖
當(dāng)需要計算W(L,S)中的數(shù)據(jù)時,就需要在內(nèi)存中存儲時間長度為L的數(shù)據(jù)以供計算,并且每Sms就需要給出當(dāng)前窗口的結(jié)果反饋。
首先介紹一下流數(shù)據(jù)和查詢的基本數(shù)據(jù)結(jié)構(gòu)。每個傳感器都有一個唯一的標(biāo)識號。傳入數(shù)據(jù)流采用(sensor_id,timestamp,data)的數(shù)據(jù)格式。它指的是在timestamp時間戳?xí)r,sensor_id對應(yīng)傳感器的值為data。
Timestamp的類型為long,使用的是Unix時間戳。
Data的類型包括布爾類型、整數(shù)數(shù)據(jù)和實數(shù)數(shù)據(jù)。系統(tǒng)會將其統(tǒng)一轉(zhuǎn)化為單精度浮點類型進(jìn)行存儲和計算。
每個傳感器將以靜態(tài)頻率發(fā)送該結(jié)構(gòu)的元組。sensor_id與其頻率之間的關(guān)系存儲在數(shù)據(jù)庫中,在優(yōu)化計算時該信息會被使用。
一位分析師可以對傳感器啟動一系列查詢,并添加一些進(jìn)一步的計算以獲得檢查所需的最終結(jié)果。這里將查詢組稱為工作流。不同的分析師希望監(jiān)控不同傳感器上的不同參數(shù),從而導(dǎo)致許多工作流一起執(zhí)行。為了消除歧義,應(yīng)該為工作流提供明確的定義,并為分析師提供語言標(biāo)準(zhǔn),為此本文設(shè)計了一套腳本語言。
本文將用戶的查詢劃分為以下3類:
(1) 滑動窗口聚合功能;
(2) 流聯(lián)合函數(shù);
(3) 基本算術(shù)計算支持。
對于第一類,我們提供四種基本聚合功能:滑動窗口的最大值、最小值、平均值、求和運算。這4種函數(shù)每一種都需要3個參數(shù),包括輸入的sensor_id,窗口長度L和滑動間隔S。計算所產(chǎn)生的結(jié)果可以計為用戶定義的新的流。例如:
A1=avg("S1",1 000,1 000)
該語句即代表了傳感器“S1”在1秒鐘的滑動窗口上的平均值,計算的結(jié)果成為一個新的流,并命名為A1。
對于第二類,本文為流提供聯(lián)合函數(shù)union,其中的每個參數(shù)都是一個不同的流id,可以使用原始數(shù)據(jù)的sensor_id或者是另一個用戶定義的流id。該函數(shù)還將生成一個帶有新用戶定義id的連接流。
對于第三類,本文提供4個算術(shù)運算(加,減,乘,除)和4個聚合函數(shù)(最大值,最小值,平均值,求和運算),接收不同的流id作為運算參數(shù),并持續(xù)計算結(jié)果輸出。與第一類計算不同,這些運算符給出結(jié)果的時間發(fā)生在任何輸入?yún)?shù)值發(fā)生變化的時候,而不是等待窗口時間之后更新其結(jié)果。
以下腳本是工作流的另一個示例。這意味著首先計算兩個傳感器的平均值,并在一個10分鐘的滑動窗口中以5分鐘的滑動間隔計算兩個傳感器的連接流,然后在同一時間計算三個輸出流的最大值和平均值窗口。
MD1Z=avg("8MD1-AZ",600 000,300 000);
UNI=union("8MD2-A","8MD3-A");
MD23=avg("UNI",600 000,300 000);
MD4Z=avg("8MD4-AZ",600 000,300 000);
UNIF=union("MD1Z","MD23","MD4Z");
out_MZ=max("UNIF",600 000,300 000);
out_AZ=avg("UNIF",600 000,300 000)。
本節(jié)主要講述該實時計算平臺的系統(tǒng)架構(gòu),如圖2所示。系統(tǒng)內(nèi)部主要分為三大模塊:腳本解析模塊、實時計算代碼生成模塊和分布式實時計算模塊。
圖2 計算平臺示意圖
腳本解析模塊負(fù)責(zé)解析腳本語言,提取出關(guān)鍵信息供后續(xù)邏輯搭建,為后續(xù)模塊的查詢優(yōu)化提供信息。
當(dāng)收到所有的腳本時,這些腳本首先通過語法分析模塊生成抽象語法樹,然后再通過腳本所攜帶的額外信息通過計算圖生成模塊進(jìn)一步生成計算圖。為了使分析計算組組之間的時間序列計算的計算能夠共享,需要將計算組中的每一個語句,也就是每個計算,當(dāng)成一個節(jié)點看待,而運算符和與該運算所用到的底層所有其他運算之間形成有向邊,對于用戶所給出的計算組進(jìn)行一個有向圖狀的描述,從而對于相同的操作可以進(jìn)行有效的合并。
本模塊通過ANTLR的解析,能夠識別出對于相同傳感器的聚集操作,對于其中不同的窗口進(jìn)行最大公約數(shù)的合并計算,以最大限度地節(jié)省不必要的計算。由于減少了一些重復(fù)互相有交集的時間片段數(shù)據(jù)存儲,因此在橋梁傳感器網(wǎng)絡(luò)監(jiān)測的高頻率數(shù)據(jù)流的應(yīng)用情景下,合并切分窗口來進(jìn)行分析計算會節(jié)省不少內(nèi)存消耗。
這些信息將通過信息收集模塊將代碼所需信息存儲起來,以便后續(xù)使用。
在1.4節(jié)中描述的腳本語言簡易的語法使得以前需要使用成千上萬行的Storm代碼才能完成的查詢邏輯,只需要幾十條語句便可以完成。當(dāng)用戶遇到查詢需求變更的時候,用戶只需要把簡短的幾十條語句作輕微的修改,然后由實時計算代碼生成模塊重新生成可以執(zhí)行的Storm代碼,進(jìn)行部署和運行。
實時計算代碼生成模塊通過Java反射機制,根據(jù)用戶的腳本需求,將計算圖進(jìn)一步轉(zhuǎn)化為Storm的Bolt具體的處理代碼。
上一模塊產(chǎn)生的計算圖的結(jié)果,分別經(jīng)過代碼生成模塊和優(yōu)化分區(qū)結(jié)果生成模塊的解析、相應(yīng)的源代碼和分區(qū)結(jié)果。
其中代碼生成模塊主要運用Java語言的反射機制,將計算圖的邏輯轉(zhuǎn)換成相對應(yīng)的Java函數(shù),并配置對應(yīng)的參數(shù)。而每一種函數(shù)都對應(yīng)一段具體的Storm原語的計算邏輯。
對于優(yōu)化分區(qū)結(jié)果生成模塊而言,由于多個腳本之間存在重復(fù)的查詢語句,因此代碼生成模塊中還包含了查詢共享發(fā)現(xiàn)模塊。該模塊負(fù)責(zé)把腳本中存在的重復(fù)查詢語句組進(jìn)行合并去重,減少數(shù)據(jù)流元組在網(wǎng)絡(luò)中的重復(fù)傳輸以及在集群中的重復(fù)計算。本文采用的查詢共享模塊的算法架構(gòu)和具體實現(xiàn)如下:
對于一個分布式集群,如果所有的計算能夠相對平均的分配到每一個計算節(jié)點上去,那么集群的計算能力就能夠得到最大程度的發(fā)揮,計算的吞吐量也得以提升。而實際情況中,在一個工作流中通常存在對于相同的流的計算的情況,如果這些計算能夠分區(qū)在一起將會共享計算結(jié)果,減少重復(fù)的計算,從而獲得更好的性能。同時,如果兩個不同的工作流程共享同一個傳感器計算或甚至相同的窗口聚集計算,那將這兩個工作流程合并在一起也能降低通信成本,從而提高性能。
基于以上想法,采用啟發(fā)式算法進(jìn)行分區(qū)優(yōu)化見算法1。
算法1分區(qū)優(yōu)化算法
輸入:分布式工作節(jié)點個數(shù)n,計算圖G,數(shù)據(jù)庫中存儲的傳感器數(shù)據(jù)頻率Freq[]
輸出:每個傳感器的分區(qū)結(jié)果Map:Partition
for i :=1 to n
W[i] :=0
//工作節(jié)點負(fù)載
foreach G的子圖G’
load[G’] := 0
foreach G’中所有傳感器sensor_id
load[G’] :=load[G’]+freq[sensor_id]
Arrays.sort(load)
foreach G的子圖G’(按load從大到小)
target :=W數(shù)組中最小值下標(biāo)
foreach G’中所有傳感器sensor_id
Partition.put(sensor_id,target)
算法首先將每一個結(jié)點的計算復(fù)雜度作為該節(jié)點的權(quán)重,然后以子圖的粒度進(jìn)行權(quán)重的計算。獲取劃分算法之后,通過加權(quán)輪詢算法判斷子圖和分區(qū)的對應(yīng)關(guān)系,以達(dá)到負(fù)載均衡。
分布式實時計算通過分布式Storm集群實現(xiàn),其Storm的拓?fù)浣Y(jié)構(gòu)如圖3所示。
圖3 Storm計算拓?fù)涫疽鈭D
在Storm拓?fù)渲?,?shù)據(jù)源模塊將持續(xù)發(fā)送原始傳感器數(shù)據(jù)的元組,在計算之前還需要一層Filter Bolt以過濾無關(guān)傳感器的流式數(shù)據(jù)。在查詢組中所設(shè)計到的要處理的數(shù)據(jù)種類是有限的,正如橋梁檢測系統(tǒng)中的原始數(shù)據(jù)通道可能有幾千個,而查詢組中涉及到的通道卻有可能只有非常少量的部分。因此本文增加了這一層過濾模塊,將不必要的傳感器數(shù)據(jù)從系統(tǒng)中過濾出去,有效減少了整個分布式系統(tǒng)的負(fù)載和計算壓力。
數(shù)據(jù)流中的元組從一個組件發(fā)往另外一個組件需要指定發(fā)送的分組方式,默認(rèn)的隨機分組并不能有效地解決大規(guī)模傳感器場景下數(shù)據(jù)不均衡所帶來的性能壓力。2.2節(jié)所述的分區(qū)優(yōu)化算法幫助系統(tǒng)產(chǎn)生了負(fù)載更均衡的數(shù)據(jù)分區(qū)對應(yīng)關(guān)系。我們利用了Storm系統(tǒng)提供的CustomGrouping API,將優(yōu)化算法輸出的Map
Calc Bolt接收代碼生成模塊傳遞的參數(shù)和代碼,能夠保證運算嚴(yán)格按照用戶腳本所定義的窗口運行。而上游的優(yōu)化分組策略能夠進(jìn)一步降低網(wǎng)絡(luò)傳輸?shù)拇鷥r,提升整個系統(tǒng)的計算效率。
Result Bolt和Calc Bolt的原理很類似,它會接收Calc Bolt的計算結(jié)果,以相對較低的負(fù)載完成上層結(jié)果的計算并在命令行進(jìn)行輸出。不同的是,Result Bolt的計算在接收數(shù)據(jù)的瞬間觸發(fā)而非等待窗口到達(dá)。
本系統(tǒng)結(jié)合物聯(lián)網(wǎng)傳感器計算的窗口聚合計算占比較大,計算同質(zhì)性較大等特點,完成了一個基于匹配的查詢優(yōu)化算法,使得對于海量流式數(shù)據(jù)在分布式系統(tǒng)中的處理更加平衡,從而節(jié)約資源,提高查詢效率和性能。
實驗環(huán)境使用由1個Master和4個Slave組成的Storm集群。每個節(jié)點都有64 GB內(nèi)存,6×2.0 GHz(Intel Xeon E5-2620)CPU和6 TB磁盤空間。所有節(jié)點都通過1 GB以太網(wǎng)連接。
實驗使用真實的上海市的大橋傳感器數(shù)據(jù),傳感器種類達(dá)21種,總數(shù)量達(dá)到了1 000,其中大多數(shù)傳感器的數(shù)據(jù)傳輸速率達(dá)到了20 Hz以上,每秒鐘傳輸?shù)臄?shù)據(jù)量約30 MB。數(shù)據(jù)通過傳感器采集系統(tǒng)的加密socket協(xié)議進(jìn)入系統(tǒng)。
工作流程由經(jīng)驗豐富的工程師設(shè)計,因此監(jiān)控結(jié)果在實際應(yīng)用中具有重要意義。此外,工程師來自不同的領(lǐng)域,包括一些交叉領(lǐng)域。不同類型的傳感器以復(fù)雜的方式使用。本文從工程師那里收集了1 024個不同的工作流程。
通過套接字獲取數(shù)據(jù)源并將其放入Apache Kafka[13]作為Spout的數(shù)據(jù)生成器。作為對比實驗,本文在sensor_id上使用fieldGrouping來對所有數(shù)據(jù)進(jìn)行分區(qū)(圖例中naive算法),而Storm的拓?fù)浣Y(jié)構(gòu)保持不變。這樣,所有的傳感器會以隨機的方式分配到不同的計算節(jié)點上進(jìn)行運算,可以有效地檢驗本文所述優(yōu)化和算法的有效性。
本文使用兩個性能指標(biāo):通信成本和節(jié)省的代碼行。通信成本通過每分鐘Filter Bolt與Calc Bolt任務(wù)之間傳遞的數(shù)據(jù)單元的數(shù)量來衡量。節(jié)省的代碼行是將腳本語言的行數(shù)和直接在Storm上編寫代碼執(zhí)行所有流處理邏輯的代碼行數(shù)的比較,該參數(shù)可以直觀地測量為傳感器監(jiān)測專家節(jié)省的工作量。
本文使用兩個評估參數(shù):計算涉及傳感器的數(shù)量n和計算中共享的傳感器的數(shù)量ns。n可以反映工作量的復(fù)雜性,而ns可以反映可以共享的數(shù)據(jù)量,也就是整個計算圖的連接性。
系統(tǒng)的通信成本遠(yuǎn)低于對比方法,在各種計算量的實驗中,平均提高了20%,并且特別在大計算量的實驗中更顯著(見圖4)。這是因為當(dāng)計算量更大時,會存在更多的重復(fù)計算的優(yōu)化空間,證明了分區(qū)算法的優(yōu)勢。
圖4 網(wǎng)絡(luò)傳輸量對比實驗結(jié)果
本文使用16個相同數(shù)據(jù)發(fā)送頻率的傳感器進(jìn)行了另一組腳本實驗,并且構(gòu)造了相同的計算邏輯。唯一的區(qū)別是本文改變不同的sensor_id以獲得不同的ns來改變數(shù)據(jù)計算的可共享性。當(dāng)ns增加時,系統(tǒng)的表現(xiàn)遠(yuǎn)遠(yuǎn)好于對比算法(參見圖5),實驗表明本文算法更好地利用了可利用的先驗知識讓計算盡可能在本地進(jìn)行??梢钥闯?,本系統(tǒng)在共享8個傳感器的計算中,達(dá)到最多20%的網(wǎng)絡(luò)傳輸減少,進(jìn)一步體現(xiàn)了本文算法的有效性。
圖5 網(wǎng)絡(luò)傳輸量和計算可共享傳感器數(shù)量關(guān)系實驗結(jié)果
另外本文測試了直接編寫Java代碼來直接實現(xiàn)計算邏輯,并和精簡的腳本語言進(jìn)行比較。實驗發(fā)現(xiàn):如果代碼不是由系統(tǒng)自動生成的,將會需要完成大量的重復(fù)編碼工作。從表1中看出,尤其是當(dāng)查詢數(shù)量很大時,腳本行數(shù)相對于Java的代碼行數(shù)有了極大的減少,這說明了本文提供的腳本語言為傳感器監(jiān)控專家節(jié)省了大量的精力。
表1 腳本優(yōu)化情況比較
本文提出了大規(guī)模傳感器流數(shù)據(jù)中的實時聚合計算框架的方法。主要貢獻(xiàn)是提供了適合這一類計算的簡單易用的腳本語言和相應(yīng)的分布式計算系統(tǒng)。腳本語言使分析人員能夠在滑動窗口的聚合的組合中構(gòu)建自己的計算邏輯。同時,該系統(tǒng)平臺可以將腳本語言解釋為Storm拓?fù)洌褂弥悄苡嬎愫头纸M方法來顯著提高性能。實驗證明了本文所述的系統(tǒng)使得傳感器監(jiān)控專家從編碼工作中解脫,在計算大規(guī)模傳感器的應(yīng)用中降低了流處理的通信成本,從而能夠在分布式環(huán)境中處理大量的查詢。