魏星貝,李陶深,2**,許 嘉,2 ,呂 品,2 ,楊 寧
(1.廣西大學(xué)計算機(jī)與電子信息學(xué)院,廣西南寧 530004;2.廣西高校并行與分布式計算技術(shù)重點(diǎn)實(shí)驗(yàn)室,廣西南寧 530004)
近年來,隨著數(shù)據(jù)采集設(shè)備的普及,以傳感器網(wǎng)絡(luò)[1]、金融服務(wù)[2]、網(wǎng)絡(luò)監(jiān)控[3]、航空航天以及氣候監(jiān)測為代表的重要應(yīng)用源源不斷地產(chǎn)生數(shù)據(jù)流,這些數(shù)據(jù)流亟待分析處理。數(shù)據(jù)流的產(chǎn)生具有無限性、連續(xù)性和快速性,因此數(shù)據(jù)流的分析處理要求及時性,以保證分析結(jié)果的時效性。一條數(shù)據(jù)流S可以形式化表示為S={s1,s2,s3,…,si,…},其中si表示第i個到達(dá)后端分析處理系統(tǒng)的流元組,si.v表示該流元組的值,si.ts表示該流元組的產(chǎn)生時間,稱為該流元組的時間戳。對數(shù)據(jù)流的分析處理,通常是基于流元組的時間戳語義進(jìn)行的。例如,手機(jī)導(dǎo)航跟蹤用戶移動設(shè)備地理位置數(shù)據(jù)流,就是基于時間順序的最新元組信息,給用戶實(shí)時推薦行進(jìn)的路線。但是,由于網(wǎng)絡(luò)延遲、處理器的并行操作或是異步數(shù)據(jù)流合并等原因[4],使得數(shù)據(jù)流上流元組不能按其時間戳的先后順序到達(dá)后端分析處理系統(tǒng),導(dǎo)致數(shù)據(jù)流出現(xiàn)亂序現(xiàn)象。例如在高速公路上,當(dāng)手機(jī)導(dǎo)航上傳數(shù)據(jù)中心的數(shù)據(jù)流出現(xiàn)亂序現(xiàn)象時,定位信息會大量遺漏丟失,產(chǎn)生異常跳動的現(xiàn)象,破壞了連接結(jié)果的完整性,影響了實(shí)時推薦的路線引導(dǎo)建議的準(zhǔn)確性。
為了減少亂序的影響,提升連接結(jié)果完整性,人們提出了基于緩存的亂序數(shù)據(jù)流處理方法,即緩存一定已到達(dá)的流元組,等待遲來的流元組,換取結(jié)果質(zhì)量的提升。其中,Abadi等[5]提出的K-slack方法就是基于緩存的亂序數(shù)據(jù)流處理方法的典型代表。該方法通常用一個大小為K時間單位的緩存來存儲已到達(dá)的流元組,即每個流元組到達(dá)系統(tǒng)后還需等待K個時間單位才能被釋放以繼續(xù)處理,釋放按緩存內(nèi)流元組的時間戳從小至大依次進(jìn)行。在K-slack方法中,到達(dá)的流元組需等待K個時間單位后才被分析處理,有效避免了延遲時間小于K個時間單位的遲到元組對結(jié)果質(zhì)量帶來的負(fù)面影響,但仍然會丟失延遲時間大于K個時間單位的遲到元組的連接結(jié)果。之后,Babu等[6]和Mutschler等[7]進(jìn)一步改進(jìn)了K-slack方法,使緩存區(qū)參數(shù)K隨數(shù)據(jù)流延遲大小變化進(jìn)行動態(tài)調(diào)整,直到K值等于當(dāng)前最大的延遲,從而優(yōu)化了緩存的大小,降低了對遲到流元組的平均等待時間,提高了連接處理的執(zhí)行效率。近年,Ji等[8-10]基于用戶指定的結(jié)果質(zhì)量指標(biāo)優(yōu)化參數(shù)K的取值:將連接結(jié)果質(zhì)量定義為連接結(jié)果集的召回率,給定用戶指定的結(jié)果質(zhì)量指標(biāo),基于連接處理過程中收集的統(tǒng)計數(shù)據(jù)優(yōu)化和調(diào)整參數(shù)K的取值。由于參數(shù)K和流元組到達(dá)系統(tǒng)后的等待時間相關(guān),該方法在保證結(jié)果質(zhì)量指標(biāo)前提下盡可能降低了對遲到流元組的平均等待時間。上述方法雖然保證了連接結(jié)果在時間域上的有序性,但還是增大了流元組的連接處理時延。楊寧等[11]研究設(shè)計一種混合嵌入分布式流處理模塊和分布式批處理模塊的亂序數(shù)據(jù)流分布式聚合查詢處理技術(shù),該技術(shù)通過限制自適應(yīng)地優(yōu)化流處理模塊所用的緩沖區(qū)大小來降低流處理的查詢處理延遲;利用存儲的歷史流數(shù)據(jù),以批處理的方式實(shí)現(xiàn)對極其晚到流元組的查詢處理,進(jìn)而保障聚合查詢結(jié)果的最終正確性。
除了K-slack方法以外,人們在數(shù)據(jù)流亂序處理方法中還運(yùn)用了基于標(biāo)點(diǎn)元組的方法和基于推測的方法?;跇?biāo)點(diǎn)元組的方法是在數(shù)據(jù)流中插入標(biāo)志時間進(jìn)度的標(biāo)點(diǎn)元組,標(biāo)點(diǎn)元組后到來的流元組時間戳都比標(biāo)點(diǎn)元組時間戳大,以此避免錯過對一些遲到元組的處理。例如,心跳機(jī)制[12]以及部分有序保證機(jī)制[13-14]都是基于標(biāo)點(diǎn)元組的方法。Mencagli等[15]以多核系統(tǒng)為背景,研究解決亂序流式大數(shù)據(jù)上的連續(xù)偏好查詢(例如Top-k查詢和Skyline查詢)的并行執(zhí)行問題,采用基于K-slack的緩存技術(shù)產(chǎn)生標(biāo)點(diǎn)元組,并基于標(biāo)點(diǎn)元組確定亂序數(shù)據(jù)流發(fā)送進(jìn)度。在基于標(biāo)點(diǎn)元組的方法中,如果標(biāo)點(diǎn)元組遲遲不到,那么可能會使得窗口等待閉合的時間延長,不利于實(shí)時性要求較高的連接處理操作,嚴(yán)重影響查詢處理的效率?;谕茰y的方法是一種激進(jìn)的處理方法[16-17],該方法以假設(shè)數(shù)據(jù)流元組是有序到達(dá)的為前提,先激進(jìn)地處理已到達(dá)系統(tǒng)的流元組,輸出處理結(jié)果,直到后續(xù)遲到流元組的到來。僅當(dāng)確認(rèn)之前輸出結(jié)果不正確時,該方法才進(jìn)行結(jié)果撤回,利用存儲的歷史數(shù)據(jù)重新計算和輸出結(jié)果。基于推測的方法加快了亂序數(shù)據(jù)流的處理效率,常用于處理亂序事件流,實(shí)現(xiàn)對復(fù)合事件的實(shí)時檢測,但由于需存儲大量的歷史數(shù)據(jù),增大了內(nèi)存開銷,且遲到元組頻繁出現(xiàn)可能導(dǎo)致錯誤結(jié)果連續(xù)撤回,增大連接開銷。
一些研究人員從時間維度、外形輪廓和結(jié)構(gòu)變化上的相似性等3個角度,對基于時間關(guān)聯(lián)性的數(shù)據(jù)流相似性進(jìn)行研究。Aghabozorgi等[18]利用大量數(shù)據(jù)流的統(tǒng)計量對數(shù)據(jù)流進(jìn)行宏觀上的比對,聚類比較了數(shù)據(jù)流的不同階段或不同的數(shù)據(jù)流之間相似性。Mukhoti等[19]對數(shù)據(jù)流提取模糊關(guān)聯(lián)模式用以預(yù)測事件。Jacques-Silva等[20]討論了Facebook如何基于歷史數(shù)據(jù)構(gòu)建分布式計算環(huán)境下亂序流式大數(shù)據(jù)的流元組延遲估計模型,并基于該估計模型和用戶對系統(tǒng)處理單元的處理延遲的需求生成一定精度的標(biāo)點(diǎn)元組,從而權(quán)衡單處理單元的處理延遲和連接查詢的結(jié)果精度這兩個重要指標(biāo)。朱睿等[21]針對數(shù)據(jù)流上的連續(xù)Top-k查詢設(shè)計了哈希過濾器,可以有效過濾不可能成為查詢結(jié)果的亂序流元組,從而降低對亂序流元組的等待時間。許嘉等[22]提出了一種基于EMD距離的數(shù)據(jù)流分布式相似性連接技術(shù)(EMD-DDSJ),該技術(shù)基于數(shù)據(jù)局部性特征增強(qiáng)了連接算法對不相似直方圖元組對間EMD計算的過濾性能,提高了各連接計算節(jié)點(diǎn)的執(zhí)行效率;通過一種基于反饋的負(fù)載均衡策略,有效提升EMD-DDSJ技術(shù)的整體執(zhí)行性能。
為了降低亂序數(shù)據(jù)流的平均連接處理時延,滿足用戶及時性需求[23],本研究提出了質(zhì)量驅(qū)動的亂序數(shù)據(jù)流連接處理技術(shù)(簡稱QJoin)。該技術(shù)將通過緩存一定量的歷史數(shù)據(jù)并采用對稱連接的策略實(shí)現(xiàn)對到達(dá)系統(tǒng)流元組的即時處理并輸出連接結(jié)果,以期顯著降低流元組的平均處理時延,提高連接處理的速率;基于用戶指定的結(jié)果質(zhì)量指標(biāo)來優(yōu)化內(nèi)存使用量,降低平均內(nèi)存開銷。最后,基于真實(shí)數(shù)據(jù)集對QJoin技術(shù)進(jìn)行實(shí)驗(yàn)驗(yàn)證,以說明該技術(shù)的有效性。
在數(shù)據(jù)流的連接操作中,用戶非常關(guān)注處理的實(shí)時性和準(zhǔn)確性,因此必須考慮數(shù)據(jù)流亂序問題的處理。在處理數(shù)據(jù)流亂序問題上,基于緩存的方法是最常見的處理方法之一。經(jīng)研究分析,現(xiàn)有的基于緩存處理亂序方法多以最優(yōu)結(jié)果完整性或最優(yōu)處理效率為目的,以數(shù)據(jù)流的整個歷史的最大延遲或者平均延遲作為參考,對緩存大小進(jìn)行調(diào)整,沒有考慮數(shù)據(jù)流的時間關(guān)聯(lián)性,忽略了臨近時間段的延遲變化對緩存的影響?,F(xiàn)有的方法很少從用戶的角度來綜合考慮結(jié)果完整性、存儲開銷、處理效率的有效折中,使得晚到的元組到來后不能即時進(jìn)行連接處理,增加了數(shù)據(jù)流平均連接處理時延,導(dǎo)致處理效率不高。
針對以上問題,本研究提出了一種基于質(zhì)量驅(qū)動的亂序數(shù)據(jù)流連接處理技術(shù)QJoin的框架(圖1)。QJoin的設(shè)計思想:關(guān)注數(shù)據(jù)流的及時性處理需求,特別是晚到數(shù)據(jù)流的連接與調(diào)度,將基于緩存的方法和對稱連接方法[24]有機(jī)結(jié)合起來,實(shí)現(xiàn)對亂序數(shù)據(jù)流流元組的即時處理。其技術(shù)特點(diǎn)在于:綜合權(quán)衡了用戶結(jié)果質(zhì)量與緩存開銷,考慮了數(shù)據(jù)流上的時間關(guān)聯(lián)性,基于臨近周期連接處理過程收集統(tǒng)計的數(shù)據(jù),優(yōu)化緩存的大小,更好地實(shí)現(xiàn)對數(shù)據(jù)流的及時性處理。
圖1 QJoin的技術(shù)框架Fig.1 Technique framework of QJoin
QJoin采取了以下的技術(shù)處理手段:
(1)每條數(shù)據(jù)流的流元組到達(dá)系統(tǒng)后,進(jìn)入存儲流實(shí)現(xiàn)在內(nèi)存中的緩存,同時進(jìn)入連接流實(shí)現(xiàn)和另一條數(shù)據(jù)流在內(nèi)存中緩存元組之間的連接處理。以圖1中亂序流R的元組ri(i=1,2,…)為例,當(dāng)ri到來時,同時進(jìn)行兩個工作:一是進(jìn)入存儲流完成在流R緩存中的存儲;二是進(jìn)入連接流實(shí)現(xiàn)和數(shù)據(jù)流S緩存元組之間的連接處理,直到生成結(jié)果流,從連接流中丟棄。亂序流S的元組sj(j=1,2,…)到來時,操作是類似的。
(2)存儲流和連接流對于每個流元組的處理都是即時的。每條流在內(nèi)存中的緩存都運(yùn)行一定的過期清理策略,從緩存中刪除過期的流元組。
(3)在進(jìn)行對稱連接處理的過程中,QJoin技術(shù)不斷基于臨近的周期的歷史元組計算用戶指定質(zhì)量指標(biāo),收集統(tǒng)計信息進(jìn)行估計結(jié)果質(zhì)量,統(tǒng)計信息包括如圖1中各元組延遲和生產(chǎn)力、各周期結(jié)果數(shù)目,在滿足用戶指定的結(jié)果質(zhì)量的同時,盡可能降低對歷史數(shù)據(jù)的內(nèi)存緩存量,從而優(yōu)化緩存的大小。
QJoin技術(shù)采用對稱連接的方式處理亂序數(shù)據(jù)流連接,同時緩存一定量的歷史數(shù)據(jù)。假設(shè)緩存區(qū)大小設(shè)定為可以容納住所有需要連接的元組,具體的處理步驟如下:
Step 1:流元組r∈R到達(dá)系統(tǒng)后,由存儲流實(shí)現(xiàn)在內(nèi)存中的流R緩存區(qū)的存儲,同時由連接流即刻完成r和流S緩存區(qū)中落在滑動窗口內(nèi)的流元組的連接,輸出連接結(jié)果,連接流上的元組r丟棄;
Step 2:對于到達(dá)系統(tǒng)的流元組s∈S,同樣由連接流即刻完成s和對面流R緩存區(qū)中落在滑動窗口內(nèi)的流元組的連接,輸出連接結(jié)果,連接流上的元組s丟棄;
Step 3:流R緩存區(qū)和流S緩存區(qū)中,當(dāng)元組數(shù)目超出緩存區(qū)的大小就會被移出緩存區(qū),進(jìn)行丟棄。
數(shù)據(jù)流的延遲定義為當(dāng)前流上到來的最大時間戳與遲到元組時間戳的差,QJoin利用延遲統(tǒng)計量d,定時將流R緩存區(qū)和流S緩存區(qū)中滿足x.ts≤T-d的流元組清除,其中x為R流或S流的流元組,T為R流和S流上最大時間戳中的最小值,標(biāo)記為當(dāng)前時刻。QJoin技術(shù)在對稱連接方法的基礎(chǔ)上,考慮到流上延遲分布與待連接流緩存的關(guān)系,元組延遲與結(jié)果質(zhì)量存在關(guān)聯(lián)性,滿足用戶指定結(jié)果質(zhì)量的同時,自適應(yīng)調(diào)整元組過期,優(yōu)化內(nèi)存使用量。
由于在對稱連接中,只要連接流上元組到來就可以與對面的緩存內(nèi)元組即時連接,所以即使是因存在亂序問題而導(dǎo)致元組遲到的現(xiàn)象,只要其待連接的元組還在對面緩存區(qū)中,就可以有效地完成連接操作,保證了處理的及時性和結(jié)果的完整性。因此,緩存區(qū)的大小設(shè)定受到對面連接流上遲到元組的影響,需要儲存這些遲到元組待連接的元組。
QJoin技術(shù)中,使用結(jié)果召回率作為處理亂序數(shù)據(jù)流的質(zhì)量標(biāo)準(zhǔn)。結(jié)果召回率是實(shí)際連接得到的結(jié)果數(shù)目占本應(yīng)該連接得到的理想結(jié)果數(shù)目的百分比[25]。QJoin技術(shù)考慮用戶對連接處理結(jié)果的及時性需求,允許用戶指定一個用戶周期P,以P周期的結(jié)果召回率來替代整個流歷史的結(jié)果召回率。同時,由于數(shù)據(jù)流元組間的時間關(guān)聯(lián)性,用最新的P周期歷史來計算結(jié)果召回率,可敏銳地捕捉到結(jié)果召回率的變化,以幫助后續(xù)的亂序流處理操作得到更好的結(jié)果質(zhì)量。
在QJoin中,假設(shè)用戶給定了周期P,則周期P內(nèi)實(shí)際的流連接質(zhì)量為召回率QP:
(1)
QJoin中用戶可以指定結(jié)果質(zhì)量(召回率),表示為Quser,要求P周期內(nèi)求得的召回率QP滿足:QP≥Quser。
1.4.1 緩存自適應(yīng)調(diào)整
在QJoin中,需要緩存足夠大,能包含窗口內(nèi)所有應(yīng)到來的元組時,必須考慮到延遲元組的影響:需緩存的元組包括落在窗內(nèi)的元組和窗外的遲到元組,即緩存大小與窗內(nèi)元組和元組延遲分布有關(guān)。QJoin技術(shù)在用戶指定質(zhì)量要求下,自適應(yīng)調(diào)整緩存大小,方法如下:使用一個大小為周期P的大滑動窗口,滑動步長為自適應(yīng)周期L,從流上第一個P周期結(jié)束時刻起,利用最近的L周期歷史元組特性,進(jìn)行下一個L周期的緩存估計設(shè)置,即當(dāng)大窗口每滑動一次,前進(jìn)L周期,基于最近的L周期歷史進(jìn)行一次緩存自適應(yīng)調(diào)整,要求L
在每一次緩存自適應(yīng)調(diào)整中,需要滿足目標(biāo)函數(shù)。設(shè)R流與S流的占用的緩存分別為x、y,求出對應(yīng)的(x,y),使流占用的總緩存M(x,y)盡可能小的目標(biāo)函數(shù)如下:
minM(x,y)=x+y,
s.t.QL(x,y)≥QL,
0≤x≤X,
0≤y≤Y,
(2)
其中,M(x,y)為總緩存大小,是R流緩存大小x與S流緩存大小y的和。當(dāng)數(shù)據(jù)流的流速一定時,x與y受存放時間的影響。存放時間就是元組過期前在緩存中的時間,決定元組何時過期移出內(nèi)存,受元組的延遲d與窗口w大小影響。當(dāng)窗口大小固定,存放時間的變動只受元組的延遲影響,保存時間增加d時間單位時,延遲為d的元組就可進(jìn)入存儲流參與連接,因此設(shè)流R的流速為Vr,緩存x與R流延遲dx的關(guān)系可以表示為x=(dx+w)×Vr,同理,流S的流速為Vs,緩存y與S流延遲dy的關(guān)系可以表示為y=(dy+w)×Vs,緩存問題可以轉(zhuǎn)化為時間問題。
QL(x,y)為最近L周期歷史下,R流緩存大小設(shè)置為x與S流緩存大小設(shè)置為y時的結(jié)果質(zhì)量,QL為基于P周期內(nèi)用戶要求質(zhì)量求得的L周期的質(zhì)量期望(具體求解見1.4.2),X為受R流當(dāng)前最大延遲與窗口大小影響的最大緩存,Y為S流受當(dāng)前最大延遲與窗口影響的最大緩存。
1.4.2L周期用戶質(zhì)量期望
(3)
1.4.3L周期受緩存影響的質(zhì)量QL(x,y)
當(dāng)數(shù)據(jù)流的流速V一定時,L階段受緩存容量影響的實(shí)際質(zhì)量QL(x,y)轉(zhuǎn)化為受R流延遲dx與S流延遲dy影響的質(zhì)量QL(dx,dy):
(4)
其中,Nprod(dx,dy)為L階段內(nèi)受R流延遲dx與S流延遲dy影響產(chǎn)生的結(jié)果數(shù)目,NL為L周期理想狀態(tài)應(yīng)該產(chǎn)生的結(jié)果數(shù)目。Nprod(dx,dy)受到選擇度sel(dx,dy)與交叉連接的結(jié)果數(shù)N×(dx,dy)的影響,計算公式為
Nprod(dx,dy)=sel(dx,dy)×N×(dx,dy)。
(5)
下面分別給出L階段內(nèi)交叉連接數(shù)Nx(dx,dy),選擇度sel(dx,dy)的求解過程。
1)Nx(dx,dy)的求解
L時間段交叉連接數(shù)目,是L時間段內(nèi)到來的R流元組與其對應(yīng)的S流窗內(nèi)所有元組的連接數(shù)Nx(dy)和此時S流元組與其對應(yīng)的R流窗內(nèi)元組的連接數(shù)Nx(dx)的和。交叉連接數(shù)Nx(dx)的求解方式與交叉連接數(shù)Nx(dy)的求解方式類似,這里以流R的交叉連接數(shù)Nx(dx)求解為例。
設(shè)窗口大小為w,對于任意輸入元組r∈R,只有對應(yīng)的S流元組s滿足|r.ts-s.ts|≤w時,才能進(jìn)行連接,則對元組r而言,其交叉連接數(shù)是S流窗內(nèi)元組數(shù)目|W′s|。因此L周期內(nèi),若已知數(shù)據(jù)流R的平均流速Vr,可求輸入的R流元組數(shù)目,對每個R流元組對應(yīng)的S流窗內(nèi)元組數(shù),可求出流R的L階段交叉連接數(shù)Nx(dy):
N×(dy)=Vr×L×|W′s|,
(6)
其中,|W′s|受延遲dy影響,由實(shí)際情況可知,緩存越大,窗口內(nèi)遲到元組被連接上的數(shù)目越多,然而緩存中輸入流元組越新的地方,元組遲到的可能性越大,因此通過對窗口w進(jìn)一步切割,設(shè)置基礎(chǔ)窗b[21]來計算受遲到元組影響的窗口內(nèi)元組數(shù)目。
為了更清晰地描述遲到元組對窗口內(nèi)元組的影響,需先求出遲到元組t的延遲分布特性。設(shè)隨機(jī)變量D表示元組粗粒度的延遲,g表示實(shí)際的延遲粒度,當(dāng)delay(t)∈[0,g],令D=0;當(dāng)delay(t)∈(g,2g],令D=1;當(dāng)delay(t)∈(2g,3g],令D=2;余下的依次類推。設(shè)fD(d)為隨機(jī)變量D的概率密度,表示為fD(d)=P[D=d],d=1,2,3,…,是延遲為D=d的元組出現(xiàn)的概率。設(shè)基礎(chǔ)窗大小為b時間單位,將大小為w的窗口被分成n個小窗口,以S流窗舉例,S流窗內(nèi)元組數(shù)目相當(dāng)于n個小窗口內(nèi)元組數(shù)目的和,每個小窗口的元組數(shù)目W′s是由平均流速VS和基礎(chǔ)窗大小b及落入到基礎(chǔ)窗的元組概率的積決定的,計算公式如下:
(7)
L周期的本來應(yīng)該產(chǎn)生的結(jié)果數(shù)NL同樣是選擇度sel與交叉連接數(shù)N×的積,計算公式如下:
NL=sel×N×,
(8)
其中,交叉連接數(shù)N×表示在最理想狀態(tài),當(dāng)緩存能包含所有遲到元組的情形下,可能得到的交叉連接數(shù)N×,選擇度sel同樣放在后面講具體細(xì)節(jié)。對L階段內(nèi)交叉連接結(jié)果數(shù)N×:
N×=N×(Maxdx)+N×(Maxdy),
(9)
其中,Maxdx表示為在R流上最大的延遲,Maxdy表示為S流上最大的延遲,N×(Maxdy)和N×(Maxdx)分別是理想狀態(tài)下R流與S流交叉連接數(shù)目,求解方式類似。以流R的交叉連接數(shù)目N×(Maxdy)為例,設(shè)窗口大小為w,若已知數(shù)據(jù)流R的平均流速Vr,可求出在L周期輸入的R流元組數(shù)目,每個R流元組對應(yīng)的S流窗內(nèi)元組數(shù)在理想狀態(tài)下包括所有實(shí)際落在當(dāng)前窗口內(nèi)的元組與遲到元組,因此L階段內(nèi)流R的交叉連接數(shù)目N×(Maxdy)表示為
N×(Maxdy)=Vs×L×Vr×(w+Maxdy)。
(10)
2)sel(dx,dy)的求解
選擇度是符合相似度函數(shù)的實(shí)際連接次數(shù)占所有參與連接的實(shí)際連接次數(shù)的百分比,基于最近的L周期內(nèi)延遲與元組產(chǎn)出結(jié)果的關(guān)系來求得。在最近L周期內(nèi),當(dāng)元組t輸入時,統(tǒng)計延遲delay(t),元組的連接數(shù)N′(t)和元組的結(jié)果數(shù)N(t)。受延遲dx和dy影響的最近L階段的選擇度計算如下:
sel(dx,dy)=
(11)
同理,理想狀態(tài)下的選擇度可認(rèn)為是受最大延遲的影響,最近L階段的理想選擇度計算如下:
sel(Maxdx,Maxdy)=
(12)
假設(shè)有兩條亂序數(shù)據(jù)流R和S,QJoin技術(shù)中緩存自適應(yīng)調(diào)整的偽代碼為
算法1 QJoin技術(shù)中的緩存自適應(yīng)調(diào)整算法
輸入:自適應(yīng)間隔L、基礎(chǔ)窗口大小b、窗口大小w、延遲增加的粒度g、相似函數(shù)的閾值θ、流R中當(dāng)前最大延遲流Maxdx、流S中當(dāng)前最大延遲Maxdy、每個元組的連接數(shù)目、每個元組的連接結(jié)果數(shù)目、每個P-L周期實(shí)際連接結(jié)果數(shù)目、從用戶指定質(zhì)量Quser得到的L周期質(zhì)量期望QL、流R的流速Vr、流S的流速Vs
輸出:(x,y),其中x表示R緩存大小,y表示S緩存大小
Begin
1dx=0;dy=0; //對元組延遲的初
始化
2while(dy<=Maxdy)do//將延遲查找范圍限制在當(dāng)前歷史流上最大延遲內(nèi)
3while(dx<=Maxdx)do
4if(QL(dx,dy) 5elserecord(dx,dy); 6dy=dy+g; 7foreach(dx,dy) in record(dx,dy)do 8x=(dx+w) *Vr; //計算緩存的總使用量 9y=(dy+w) *Vs; 10M(x,y)=x+y; 11if(getMin(M(x,y)) //比較所有記錄值 12return(x,y); End. 在上述算法中,每個自適應(yīng)周期結(jié)束后對緩存進(jìn)行一次調(diào)整,其中1-6行是利用延遲特性與質(zhì)量的關(guān)系,求出所有可以滿足L周期質(zhì)量期望的需要緩存元組的延遲。如果緩存了小于等于該延遲值的元組后得到的結(jié)果質(zhì)量滿足L周期的質(zhì)量預(yù)期QL,就記錄下來,否則就增大一個g延遲粒度。第7-12行是利用延遲與緩存的關(guān)系,返回適宜的緩存。通過比較計算得到的所有記錄值,求出使總緩存值最小的R流緩存x,S流緩存y。QJoin技術(shù)中,考慮了緩存的最理想情況,即延遲最大的元組都可以在緩存中找到所有需要連接的元組,這時得到的召回率是L周期內(nèi)理想情況召回率;此外,還考慮了最近L周期召回率與緩存之間關(guān)系,使用用戶質(zhì)量指標(biāo)和統(tǒng)計量采樣,得到更合理的緩存,以降低緩存開銷。 本實(shí)驗(yàn)使用一臺CPU 3.1 GHz、16 G內(nèi)存、500 G硬盤的PC設(shè)備進(jìn)行試驗(yàn)測試。操作系統(tǒng)是Windows 10,所有代碼用Java語言編寫。實(shí)驗(yàn)數(shù)據(jù)集包括2段球賽訓(xùn)練數(shù)據(jù)D1和D2,源于一場足球比賽數(shù)據(jù)[1],由德國紐倫堡體育足球場上的傳感器系統(tǒng)采集。該數(shù)據(jù)包含兩條數(shù)據(jù)流(R流和S流),分別由足球上的傳感器和運(yùn)動員身上的傳感器采集。數(shù)據(jù)集中每個元組包含信息(sID,ts,location),其中sID用于區(qū)分R流和S流,ts表示元組時間戳,location是運(yùn)動員們在球場的位置信息。具體信息如表1。 表1 數(shù)據(jù)集特性Table 1 Feature of datasets 本實(shí)驗(yàn)使用的查詢語句為 SELECT * FROMR[2 sec],S[2 sec] WHERE distance(R.location,S.location)<=5 m。 重要參數(shù)默認(rèn)設(shè)置值包括用戶指定質(zhì)量周期P=1 min,自適應(yīng)調(diào)整周期為L=1 sec,基礎(chǔ)窗口大小為b=10 ms,自適應(yīng)調(diào)整粒度為g=10 ms。 為了使結(jié)果顯示更清晰明確,實(shí)驗(yàn)中使用連接過程中平均內(nèi)存開銷作為度量標(biāo)準(zhǔn)。當(dāng)數(shù)據(jù)流流速一定時,平均內(nèi)存開銷越大,可存儲的遲到元組延遲就越大。 首先考察QJoin技術(shù)中重要參數(shù)設(shè)置對內(nèi)存開銷的影響。為此,分別對用戶指定質(zhì)量周期P、自適應(yīng)調(diào)整周期L、基礎(chǔ)窗口大小b、自適應(yīng)調(diào)整粒度g進(jìn)行設(shè)定值調(diào)整來進(jìn)行比較實(shí)驗(yàn),其他條件為默認(rèn)設(shè)置值。圖2為用戶指定最小召回率為Quser=0.90和Quser=0.95,使用數(shù)據(jù)集合D1時,重要參數(shù)設(shè)置對算法影響的實(shí)驗(yàn)結(jié)果。實(shí)驗(yàn)中使用平均內(nèi)存開銷(即緩存的元組數(shù)目)來顯示實(shí)驗(yàn)的結(jié)果。圖2a中觀察到周期P對內(nèi)存的平均開銷影響并不大,只是在周期P設(shè)置為60 s,顯示微小的差異,因此最終周期P默認(rèn)設(shè)置為60 s。圖2b可以清晰顯示出當(dāng)自適應(yīng)周期為0.1 s時平均內(nèi)存開銷更少,實(shí)際應(yīng)用時可以設(shè)置自適應(yīng)周期L為0.1 s。由圖2c中觀察到基礎(chǔ)窗大小b的選取過于細(xì)小或者寬大,都會使估計不夠準(zhǔn)確,或平均內(nèi)存開銷增大。由圖2d可觀察到當(dāng)自適應(yīng)調(diào)整粒度g取10 ms時,平均內(nèi)存開銷較低。 圖2 不同參數(shù)對QJoin技術(shù)平均內(nèi)存開銷的影響Fig.2 Effect of different parameters on the average memory cost of QJoin technology 由于MP-K-slack[7]技術(shù)具有典型性,通常被作為相關(guān)技術(shù)研究的實(shí)驗(yàn)比較對象,因此本研究也是將QJoin技術(shù)和MP-K-slack技術(shù)進(jìn)行比較。 1)流元組平均處理時延比較 流元組平均處理時延是所有元組進(jìn)入系統(tǒng)到最終輸出連接結(jié)果的時間間隔平均值。圖3給出了QJoin技術(shù)和MP-K-slack技術(shù)關(guān)于流元組平均處理時延的實(shí)驗(yàn)對比結(jié)果。 MP-K-slack技術(shù)的處理思路是設(shè)置一個K時間單位的緩存,初始值為0,當(dāng)前流歷史上最大時間戳標(biāo)注為當(dāng)前時刻tcurr,每到來一個元組就插入到緩存中,與當(dāng)前時刻tcurr比較,若大于tcurr,就更新tcurr。當(dāng)tcurr更新時,做如下兩個操作:1)更新K=max{K,D(x)},其中D(x)=tcurr-x.ts,是元組x的延遲,是上一次tcurr更新時計算得到的;2)將滿足x.ts+K<=tcurr的元組,從緩存中彈出。從工作原理來看,MP-K-slack技術(shù)隨延遲分布波動,始終以當(dāng)前最大延遲作為等待時間,正常元組需要等待較長時間后才能釋放進(jìn)行連接處理,流元組平均處理時延較長。而本研究提出的QJoin技術(shù)中,元組一旦進(jìn)入系統(tǒng)就開始連接,并快速輸出結(jié)果。當(dāng)用戶要求的召回率超過0.85時,相比于MP-K-slack技術(shù),QJoin技術(shù)的流元組處理時延降低了約80%-95%(圖3),原因是MP-K-slack技術(shù)必須要緩存元組更久,才能有效處理盡可能多的遲到元組,滿足召回率,而QJoin技術(shù)在對稱連接和合理緩存的情形下可以直接參與連接,可以更快地進(jìn)行流元組的連接,有利于提高系統(tǒng)進(jìn)行連接處理的處理速率。 圖3 QJoin技術(shù)和MP-K-slack技術(shù)的流元組平均處理時延比較Fig.3 Comparision of average tuple processing delay of algorithms QJoin and MP-K-Slack 2)平均內(nèi)存開銷比較 與MP-K-slack技術(shù)相比,在用戶要求召回率越高的情況下,本研究提出的QJoin技術(shù)平均內(nèi)存的開銷較低,存儲使用量明顯降低了約50%-80%(圖4)。原因在于:MP-K-slack技術(shù)為了滿足足夠的召回率,必須要緩存阻塞元組更久,就會使更多的元組滯留在緩存區(qū)中,特別是在流速較快、延遲較大的遲到元組較多的數(shù)據(jù)流中(圖4a),而QJoin技術(shù)是基于對稱連接的技術(shù),在滿足召回率的情形下只需要合理緩存適量的歷史元組,因此優(yōu)化效果明顯,對內(nèi)存的需求更低。 圖4 QJoin技術(shù)和MP-K-slack技術(shù)平均內(nèi)存開銷比較Fig.4 Comparison of average memory cost between QJoin and MP-K-Slack 本文研究了質(zhì)量驅(qū)動下的亂序數(shù)據(jù)流連接處理問題,提出一種質(zhì)量驅(qū)動的亂序數(shù)據(jù)流連接處理技術(shù)QJoin。該技術(shù)基于數(shù)緩存和對稱連接方法實(shí)現(xiàn)對亂序數(shù)據(jù)流流元組的即時處理,顯著降低了流元組的平均等待時延,提升了基于滑動窗口語義的亂序數(shù)據(jù)流連接處理的處理速率。采用質(zhì)量驅(qū)動的理念,基于連接處理過程中收集的統(tǒng)計數(shù)據(jù)優(yōu)化緩存的大小,使得在滿足用戶指定的結(jié)果質(zhì)量的同時,大大降低了對歷史數(shù)據(jù)的內(nèi)存緩存量;利用歷史數(shù)據(jù)元組緩存,較好地保證了遲到元組的連接處理完整性,從而實(shí)現(xiàn)在滿足用戶結(jié)果質(zhì)量要求的前提下盡可能降低了系統(tǒng)內(nèi)存開銷。與現(xiàn)有的MP-K-slack方法相比,QJoin技術(shù)在滿足用戶結(jié)果質(zhì)量的同時,不僅能夠保證較低的數(shù)據(jù)流流元組處理時延,比MP-K-slack方法最大降低了約95%,還有效降低了內(nèi)存使用開銷,比MP-K-slack方法最大降低了約80%。2 結(jié)果與分析
2.1 實(shí)驗(yàn)環(huán)境設(shè)置
2.2 參數(shù)設(shè)置對內(nèi)存開銷的影響
2.3 QJoin技術(shù)和MP-K-slack技術(shù)性能比較
3 結(jié)論