王寶軍 詹英
摘要: 對(duì)于許多應(yīng)用領(lǐng)域不斷產(chǎn)生的數(shù)據(jù)流,面向數(shù)據(jù)流聚集查詢的應(yīng)用最為廣泛。本文在構(gòu)造壓縮桶的基礎(chǔ)上,提出了基于時(shí)間維度壓縮數(shù)據(jù)流的算法,來(lái)動(dòng)態(tài)地形成壓縮數(shù)據(jù)流,并進(jìn)一步給出了使用壓縮桶獲得數(shù)據(jù)流聚集查詢的數(shù)學(xué)方法。
關(guān)鍵詞: 數(shù)據(jù)流; 壓縮桶; 聚集查詢; 時(shí)間維度
中圖分類號(hào):TP393文獻(xiàn)標(biāo)識(shí)碼:A 文章編號(hào):1006-8228(2012)04-29-03
Aggregate compression algorithm for data stream
Wang Baojun, Zhan Ying
(Zhejiang Institute of Communications, Hangzhou, Zhejiang 311112, China)
Abstract: In many fields, data stream continues to grow in terms of generation speed. Aggregate query for data stream was most widely used. By constructing compression buckets, the authors provides in this paper a compression algorithm for data stream based on time dimension, in order to dynamically form compression data stream, and give mathematical method of aggregate query for data stream, by use of compression buckets.
Key words: data stream; compression buckets; aggregate query; time dimension
0 引言
數(shù)據(jù)流是隨著網(wǎng)絡(luò)的廣泛應(yīng)用而出現(xiàn)的一種新的數(shù)據(jù)形式。數(shù)據(jù)流聚集查詢是數(shù)據(jù)流管理與知識(shí)發(fā)現(xiàn)系統(tǒng)中一種重要的數(shù)據(jù)知識(shí)發(fā)現(xiàn)模型,但快速流動(dòng)的流數(shù)據(jù)與有限的處理能力之間的矛盾使得流數(shù)據(jù)的聚集查詢分析比關(guān)系數(shù)據(jù)庫(kù)的聚集分析更困難。
目前國(guó)內(nèi)外已經(jīng)對(duì)數(shù)據(jù)流聚集查詢模式展開(kāi)了研究。Dobra A等人研究利用隨機(jī)草圖技術(shù),提取數(shù)據(jù)流的輪廓,減少數(shù)據(jù)的處理量來(lái)加快數(shù)據(jù)處理速度,并提出了一種草圖分割技術(shù)來(lái)提高算法的性能[1]。Gilbert A C等人研究采用小波技術(shù)對(duì)數(shù)據(jù)流進(jìn)行壓縮,實(shí)現(xiàn)了近似聚集查詢[2]。Madden研究了傳感器網(wǎng)絡(luò)中的聚集查詢問(wèn)題,重點(diǎn)是如何動(dòng)態(tài)地建立路由樹(shù),實(shí)現(xiàn)流水線聚集操作[3,4]。Ahnad Y提出了數(shù)據(jù)流查詢的分布式操作[5]。張冬冬等人提出了一種新的數(shù)據(jù)流傳輸方式,有效地減少網(wǎng)絡(luò)中分布式數(shù)據(jù)流的傳輸量[9]。傅鸝等人建立了基于數(shù)據(jù)流驅(qū)動(dòng)的數(shù)據(jù)流連續(xù)查詢模型,設(shè)計(jì)并使用查詢算子在查詢鏈中的有序組合來(lái)構(gòu)造出各種復(fù)雜的連續(xù)查詢語(yǔ)句[7]。李建中等人提出利用多元線性回歸方法來(lái)預(yù)測(cè)具有線性關(guān)系的數(shù)據(jù)流的未來(lái)聚集值,但如果數(shù)據(jù)不具有線性關(guān)系,該模型誤差就會(huì)增大[10]。
以上的數(shù)據(jù)流聚集查詢相關(guān)算法采用近似聚集、壓縮數(shù)據(jù)流等技術(shù)來(lái)提高查詢速度。由于數(shù)據(jù)流的“流”性和隨機(jī)性,使得流量的變化具有突發(fā)性,然而,商業(yè)活動(dòng)中,普遍要求能夠?qū)崟r(shí)地檢索面向數(shù)據(jù)流的聚集查詢結(jié)果,并獲得更高的準(zhǔn)確率。
1 數(shù)據(jù)流壓縮
1.1 相關(guān)問(wèn)題描述
數(shù)據(jù)流是一個(gè)以數(shù)據(jù)到達(dá)時(shí)間為戳的數(shù)據(jù)序列。流數(shù)據(jù)的聚集查詢分為預(yù)定義查詢(Predefined Query)和即席查詢(Real-time Query)兩類。預(yù)定義查詢主要針對(duì)數(shù)據(jù)流后續(xù)到來(lái)的數(shù)據(jù)計(jì)算查詢結(jié)果;而即席查詢是針對(duì)數(shù)據(jù)流中流過(guò)的所有數(shù)據(jù)。數(shù)據(jù)流源源不斷地流入系統(tǒng),因此無(wú)法將所有數(shù)據(jù)流保存起來(lái),為了獲得更為準(zhǔn)確的即時(shí)查詢結(jié)果,在聚集查詢中,需要對(duì)數(shù)據(jù)流進(jìn)行壓縮。由于數(shù)據(jù)流動(dòng)態(tài)振蕩流動(dòng),面向數(shù)據(jù)流的數(shù)據(jù)流聚集查詢系統(tǒng)無(wú)法存儲(chǔ)所有流數(shù)據(jù),而用戶有查詢分析過(guò)去與未來(lái)流數(shù)據(jù)的需求,因此需要不斷地壓縮數(shù)據(jù)流,來(lái)滿足用戶需求。壓縮后的數(shù)據(jù)流結(jié)構(gòu)應(yīng)該是簡(jiǎn)單的,方便為用戶提供各類流數(shù)據(jù)聚集查詢,并能夠最大程度地反映原始流數(shù)據(jù)。壓縮后的數(shù)據(jù)流結(jié)構(gòu)是對(duì)壓縮后的數(shù)據(jù)流的靜態(tài)特征的描述,它描述數(shù)據(jù)的內(nèi)容和流數(shù)據(jù)之間的相互關(guān)系。
由于數(shù)據(jù)流連續(xù)無(wú)限地流動(dòng),數(shù)據(jù)流具有時(shí)間特征,因此可以在時(shí)間維度上壓縮數(shù)據(jù)流。本文采用基于對(duì)數(shù)尺度的時(shí)間傾斜框架模型[8]來(lái)壓縮數(shù)據(jù)流。面向數(shù)據(jù)流壓縮算法以增量的方式對(duì)壓縮數(shù)據(jù)流進(jìn)行更新,從而提高數(shù)據(jù)流的壓縮速度,滿足數(shù)據(jù)流聚集查詢的實(shí)時(shí)性要求。用戶會(huì)根據(jù)需求向系統(tǒng)提出多種聚集查詢,這要求壓縮數(shù)據(jù)流盡可能地反映原數(shù)據(jù)流的信息。隨著時(shí)間的流逝,流過(guò)的流數(shù)據(jù)被不斷地壓縮,歷史流數(shù)據(jù)被不斷地拋棄。
1.2 相關(guān)定義
定義1. 設(shè)PT為時(shí)間分區(qū)長(zhǎng)度。構(gòu)造壓縮桶Buckcets(BuckcetsID=0…n),壓縮桶有三個(gè)抽屜drawer(drawerID=0…2),每個(gè)抽屜存放流數(shù)據(jù)的時(shí)間長(zhǎng)度為2 BuckcetsID×PT。壓縮桶的結(jié)構(gòu)如圖1所示。其中每個(gè)桶的2號(hào)抽屜是臨時(shí)存儲(chǔ)單元。如果0號(hào)抽屜是空的,則同一個(gè)桶的1號(hào)抽屜也空。
設(shè)i(i=0…n)為壓縮桶的編號(hào), i號(hào)桶中的抽屜存儲(chǔ)流數(shù)據(jù)的時(shí)間長(zhǎng)度為2i×PT。每個(gè)壓縮桶的第0號(hào)與第1號(hào)抽屜存放流數(shù)據(jù),2號(hào)抽屜是臨時(shí)存儲(chǔ)空間,只有當(dāng)這個(gè)桶中的第0號(hào)與第1號(hào)抽屜非空,此時(shí)只能將新流入的流數(shù)據(jù)臨時(shí)存放到2號(hào)抽屜,系統(tǒng)合并此桶的第0號(hào)與第1號(hào)抽屜,并推入下一桶后,新流入到2號(hào)抽屜的流數(shù)據(jù)被轉(zhuǎn)移到同一桶的0號(hào)抽屜。例如,第0號(hào)桶流入第3個(gè)PT時(shí)間長(zhǎng)度的流數(shù)據(jù),而第0號(hào)桶的第0號(hào)與第1號(hào)抽屜已經(jīng)分別存儲(chǔ)了第1個(gè)和第2個(gè)PT時(shí)間長(zhǎng)度,系統(tǒng)壓縮第0號(hào)桶的第0號(hào)與第1號(hào)抽屜,并將流數(shù)據(jù)推入第1號(hào)桶的第0號(hào)抽屜后,第3個(gè)PT時(shí)間長(zhǎng)度的流數(shù)據(jù)才可以流入第0號(hào)桶的第0號(hào)抽屜。也就是說(shuō),桶號(hào)為i的流數(shù)據(jù)來(lái)源于桶號(hào)為i-1的桶,系統(tǒng)壓縮第i-1號(hào)桶的第0號(hào)與第1號(hào)抽屜,并將流數(shù)據(jù)推入第i號(hào)桶的第0號(hào)或第1號(hào)抽屜。壓縮桶間的數(shù)據(jù)壓縮與流動(dòng)示意圖如圖2所示。
圖1壓縮桶的結(jié)構(gòu)
圖2壓縮桶間的數(shù)據(jù)壓縮與流動(dòng)示意圖
引理保存流數(shù)據(jù)的最大時(shí)間長(zhǎng)度為L(zhǎng)ongTime,MaxBCount為保存LongTime時(shí)長(zhǎng)的流數(shù)據(jù)所需壓縮桶的數(shù)量。則
⑴
證明:設(shè)m個(gè)桶最多可以存儲(chǔ)流數(shù)據(jù)的時(shí)間長(zhǎng)度為MaxT(m),則
MaxT=(2×20+2×21+…+2×2m) ×PT
所以MaxT(m)=(2m+1-2)×PT
設(shè)m-1個(gè)桶最多可以存儲(chǔ)流數(shù)據(jù)的時(shí)間長(zhǎng)度為MaxT(m-1),則
當(dāng)時(shí)間長(zhǎng)度LongTime滿足:
MaxT(m-1)<LongTime≤MaxT(m)
則存儲(chǔ)時(shí)間長(zhǎng)度LongTime的流數(shù)據(jù)至少需要m個(gè)桶。所以:
證畢。
1.3 數(shù)據(jù)流壓縮算法
以商業(yè)零售實(shí)際業(yè)務(wù)數(shù)據(jù)流為例,本文將探索針對(duì)數(shù)據(jù)流的聚集查詢與壓縮方法。商業(yè)零售數(shù)據(jù)流結(jié)構(gòu)如下:sale(ProductID,OrderQty),sale是超市商業(yè)零售數(shù)據(jù)流,ProductID表示產(chǎn)品編號(hào),OrderQty表示訂貨量。用戶根據(jù)需求提交各類查詢,并請(qǐng)求實(shí)時(shí)獲得各類查詢結(jié)果。例如,系統(tǒng)根據(jù)用戶提交的產(chǎn)品號(hào) ProductID,選擇相關(guān)產(chǎn)品進(jìn)行壓縮。
定義初始數(shù)據(jù)流結(jié)構(gòu):
Datasourse(timestamp;productID;orderqty),timestamp記錄了流數(shù)
據(jù)到達(dá)的時(shí)間點(diǎn)。
定義壓縮后數(shù)據(jù)流的數(shù)據(jù)結(jié)構(gòu):
Compresssourse(starttime; productID; maxorderqty;minorderqty;sumorderqty; countorderqty),starttime表示壓縮的初
始時(shí)間; maxorderqty表示訂貨量的最大值;minorderqty表示訂
貨量的最小值;sumorderqty表示訂貨量的總和; countorderqty表
示訂貨次數(shù)。
算法1:數(shù)據(jù)流壓縮算法
輸入:初始數(shù)據(jù)流。
輸出:經(jīng)過(guò)壓縮后的數(shù)據(jù)流存儲(chǔ)在桶中,每個(gè)抽屜存儲(chǔ)壓縮后的數(shù)據(jù)流。
定義桶的數(shù)據(jù)結(jié)構(gòu):
public struct buckets
{public compresssourse drawer0;
public compresssourse drawer1;
public compresssourse drawer2;}
根據(jù)需存儲(chǔ)的最大時(shí)間長(zhǎng)度,計(jì)算需要的桶數(shù)MAXBcount;
定義桶DataS:
buckets[] DataS = new buckets[MAXBcount];
初始化桶中的所有抽屜;定義記錄時(shí)間長(zhǎng)度的變量feng;定義時(shí)間分區(qū)PT;
While(true)
{根據(jù)用戶提交查詢的產(chǎn)品號(hào)ProductID 獲取原始數(shù)據(jù)流;
獲得產(chǎn)生數(shù)據(jù)流的當(dāng)前時(shí)間;
if(接收的是第一個(gè)流數(shù)據(jù))
{壓縮后直接推入0號(hào)桶2號(hào)抽屜,它的starttime為被推入流數(shù)據(jù)的
timestamp。接著進(jìn)入下一循環(huán)等待下一個(gè)流數(shù)據(jù);}
計(jì)算新流入的流數(shù)據(jù)的timestamp與0號(hào)桶2號(hào)抽屜的starttime相隔時(shí)間feng:
if (是同一個(gè)時(shí)間分區(qū)feng < PT)
{壓縮同一時(shí)間分區(qū)內(nèi)的數(shù)據(jù)到0號(hào)桶2號(hào)抽屜;
回到循環(huán)開(kāi)頭,繼續(xù)讀下一個(gè)數(shù)據(jù);
continue;}
else
{ 記錄當(dāng)前桶號(hào)碼;
while (DBcount < MAXBcount)
{if (桶0號(hào)抽屜有空)
{將桶2號(hào)抽屜的數(shù)據(jù)移到桶0號(hào)抽屜;
break;}
else
{if (桶1號(hào)抽屜有空)
{將桶2號(hào)抽屜的數(shù)據(jù)移到桶1號(hào)抽屜;
break; }
else
{if (不是最后一桶)
{將該桶的0號(hào)與1號(hào)抽屜合并后放入下一桶中的2號(hào)抽屜;
該桶的0號(hào)與1號(hào)抽屜變空;}
else
{丟棄該桶的0號(hào)與1號(hào)抽屜;}
合并后,該桶0號(hào)抽屜空出來(lái),放入該桶2號(hào)抽屜的流數(shù)據(jù); }}}
if (0號(hào)桶2號(hào)抽屜空))
{將新讀入的數(shù)據(jù)放入0桶2號(hào)抽屜;
重新設(shè)置starttime;}
else {break;}}}
2 獲得壓縮桶狀態(tài)的數(shù)學(xué)方法
當(dāng)用戶向系統(tǒng)提出面向數(shù)據(jù)流的查詢請(qǐng)求時(shí),系統(tǒng)首先判斷流數(shù)據(jù)被壓縮到哪些桶中,而壓縮流數(shù)據(jù)存儲(chǔ)了最大值、總和等聚集值,使得用戶獲得聚集值變得非常方便。
在壓縮過(guò)程的任意時(shí)刻,用戶均可能提出獲得流數(shù)據(jù)的聚集值,這要求系統(tǒng)能夠迅速判斷各個(gè)桶的狀態(tài),也就是每個(gè)桶中的0號(hào)抽屜或1號(hào)抽屜是否存儲(chǔ)了壓縮數(shù)據(jù)。
假設(shè)j為最后流入桶中的時(shí)間分區(qū)流數(shù)據(jù),求每個(gè)桶中含有數(shù)據(jù)的抽屜數(shù)。存儲(chǔ)第j個(gè)時(shí)間分區(qū),需要BCount個(gè)桶。則:
,⑵
如果,BCount大于MaxBCount,則從MaxBCount+1到BCount號(hào)桶的流數(shù)據(jù)被丟棄。所以,
,j∈N+。
則,
,⑶
其中ai的取值僅為0或1,表示第i個(gè)桶中有ai+1個(gè)抽屜有流數(shù)據(jù)。ai=0表示0號(hào)抽屜存儲(chǔ)了壓縮流數(shù)據(jù),ai=1表示0號(hào)與1號(hào)抽屜存儲(chǔ)了壓縮流數(shù)據(jù)。
例如j=33,表示持續(xù)流入數(shù)據(jù)流的時(shí)間長(zhǎng)度為33×PT個(gè)時(shí)間長(zhǎng)度。根據(jù)公式⑵,此時(shí)需要的桶數(shù)為5。根據(jù)公式⑶,得到33-25+1=2。則2=0×20+1×21+0×21+0×21+0×21,由此,我們可以得到壓縮桶的狀態(tài)為0號(hào)桶、2號(hào)桶、3號(hào)桶、4號(hào)桶的0號(hào)抽屜存儲(chǔ)了壓縮數(shù)據(jù),1號(hào)桶的0號(hào)與1號(hào)抽屜存儲(chǔ)了壓縮數(shù)據(jù)。
3 結(jié)束語(yǔ)
本文提出了在時(shí)間維度上壓縮數(shù)據(jù)流的方法:不斷流入壓縮桶的流數(shù)據(jù)被不斷地以2為底的對(duì)數(shù)尺度進(jìn)行壓縮。實(shí)驗(yàn)表明,壓縮桶結(jié)構(gòu)在滿足了壓縮數(shù)據(jù)的存儲(chǔ)需求的同時(shí),大大減少了存儲(chǔ)空間,桶中的壓縮數(shù)據(jù)能夠隨著時(shí)間不斷地更新,基于時(shí)間傾斜的數(shù)據(jù)流壓縮算法能夠提高數(shù)據(jù)流的壓縮速度。能夠滿足數(shù)據(jù)流聚集查詢的實(shí)時(shí)性要求,也能夠提高數(shù)據(jù)流動(dòng)態(tài)聚集查詢的效率及靈活性。
參考文獻(xiàn):
[1] Dobra A,Garofalakis M,Gehrke J,et a1.Processing Complex Aggregate Queries over Data Streams[C].Proceedings of the 2002ACM SIGMOD International Conference on Management of Data,M acIison.W isconsin.2002.
[2] Gilbert A C,Kotidis M uthukrishnan S M ,et a1.Surfing Wavelets on Streams: One—pass Summaries for Approximate Aggregate Queries[C] .Proceedings of the 27th International conference on Very Large Data Bases.2001
[3] Madden S R,Franklin M J,Hellerstein J M ,et a1.TAG :A Tiny Aggregation Service for Ad—hoc Sensor Networks[C] .Proc.of the 5thSymp.on Operating Systems Design and Implementation,Boston,USA 2002.
[4] Madden S R.Szewczyk R.Franklin M J.et a1.Supporting Aggregate Queries Over Ad—hoc Wireless Sensor Networks[C].Proceedings of the Workshop on Mobile Computing and Systems Applications.Los Alamitos:IEEE Computer Press.2002.
[5] Ahnad Y,Berg B,Cetintemel U,et a1.Distributed operation in the borealis stream processing engine[C].Proc of ACM SIGMOD Conference.Baltimore:[s.n.],2005:882~884
[6] 詹英,吳春明,王寶軍.一種與緩沖區(qū)緊耦合的環(huán)形循環(huán)滑動(dòng)窗口的數(shù)據(jù)流抽取算法[J].電子學(xué)報(bào),2011.39(4):2262~2267
[7] 傅鸝,魯先志,蔡斌.一種基于數(shù)據(jù)流驅(qū)動(dòng)的數(shù)據(jù)流連續(xù)查詢模型[J].重慶工學(xué)院學(xué)報(bào)(自然科學(xué)),2008.22(10)
[8] Jiawei Han,Micheline Kamber.Data Mining Concepts and Techniques[M].China Machine Press.
[9] 張冬冬,李建中,王偉平,等.分布式復(fù)式數(shù)據(jù)流的處理[J].計(jì)算機(jī)研究與發(fā)展,2004.41(10):1780~1785
[10] Li Jian -zhong,Guo Long-jiang,Zhang Dong-dong,et a1.Processing algorithms or predictive aggregate queries over data streams[J].Journal of Software,2005.16(7):1251~1261