許德心,李玲娟
(南京郵電大學(xué) 計算機(jī)學(xué)院,江蘇 南京 210023)
隨著信息技術(shù)的迅猛發(fā)展,數(shù)據(jù)量急劇增加。如何從海量的隨機(jī)數(shù)據(jù)中挖掘出有價值的信息,已成為一個必須面對的課題,數(shù)據(jù)挖掘技術(shù)由此誕生。數(shù)據(jù)挖掘是從大量的、不完全的、有噪聲、模糊的、隨機(jī)的實際應(yīng)用數(shù)據(jù)中,提取隱含在其中的、人們事先不知道但又潛在有用的信息和知識的過程[1-2]。而關(guān)聯(lián)規(guī)則挖掘則是數(shù)據(jù)挖掘中一個非常重要的研究課題。關(guān)聯(lián)規(guī)則挖掘的主要任務(wù)分為兩項:從大量數(shù)據(jù)中找到頻繁項集,根據(jù)頻繁項集提取有價值的強(qiáng)關(guān)聯(lián)規(guī)則。其中Apriori算法是其典型代表。該算法簡單準(zhǔn)確,但是面對急速增長的數(shù)據(jù)量,該算法對強(qiáng)關(guān)聯(lián)規(guī)則的提取效率有待提高[3-5]。
Spark作為新一代的大數(shù)據(jù)運算框架,將數(shù)據(jù)載入內(nèi)存,之后的迭代計算可以直接使用內(nèi)存中的中間結(jié)果作運算,避免了從磁盤中頻繁讀取數(shù)據(jù),擁有更高的執(zhí)行效率,很適合運行迭代運算較多的數(shù)據(jù)挖掘與機(jī)器學(xué)習(xí)算法。基于此,文中研究了Apriori算法在Spark平臺上的并行化方案,以提高算法提取強(qiáng)關(guān)聯(lián)規(guī)則的準(zhǔn)確性與實效性,并設(shè)計了實驗對該方案的使用效果進(jìn)行了檢驗。
關(guān)聯(lián)規(guī)則一般描述:項集→項集,如X→Y。
支持度(support):表示X,Y同時出現(xiàn)的概率。關(guān)聯(lián)規(guī)則X→Y的支持度可表示為:
(1)
置信度(confidence):表示在X出現(xiàn)的情況下Y也出現(xiàn)的概率。
(2)
其中,δ(X)=|ti|X?ti,ti∈T|,是項集X出現(xiàn)的次數(shù),ti表示某個事務(wù)的標(biāo)識TID,T表示事務(wù)的集合。
頻繁項集:是支持度大于最小支持度閾值的項集。
強(qiáng)關(guān)聯(lián)規(guī)則:頻繁項集中置信度大于最小置信度閾值的關(guān)聯(lián)規(guī)則。
Apriori算法利用逐層搜索迭代的方法找出項集之間的關(guān)系,最終形成規(guī)則。這個過程由連接與剪枝組成,為了降低連接與剪枝的時間復(fù)雜度,需借助以下定理:
定理1:如果一個項集是頻繁的,則其所有的子集也一定是頻繁的。
定理2:如果一個項集是非頻繁的,那么其所有的超集也一定是非頻繁的。
Apriori算法步驟如下:
Step1:設(shè)定最小支持度s和最小置信度c。
Step2:根據(jù)數(shù)據(jù)集產(chǎn)生出候選1-項集,若其支持度大于等于最小支持度,那么它就是頻繁1-項集。
Step3:頻繁1-項集通過連接產(chǎn)生候選2-項集,再通過候選2-項集獲得頻繁2-項集。
Step4:不斷迭代產(chǎn)生下一級候選項集,直到不再產(chǎn)生新的候選項集為止。
Step5:由頻繁項集生成關(guān)聯(lián)規(guī)則。具體過程是:根據(jù)頻繁項集Fk,找出可以出現(xiàn)在規(guī)則右部大小為m的元素列表Hm,如果頻繁項集可以移除大小為m的子集,且對于規(guī)則Fk-Hk→Hm的置信度大于最小置信度閾值,則該條規(guī)則為強(qiáng)關(guān)聯(lián)規(guī)則[6-7]。在這個過程中,同樣可以借助以下定理進(jìn)行剪枝。
下面給出Apriori算法產(chǎn)生頻繁項集及由頻繁項集生成關(guān)聯(lián)規(guī)則的偽代碼,其中Ck為候選K-項集的集合,F(xiàn)k為頻繁K-項集的集合。
產(chǎn)生頻繁項集:
1.K=1
2.Fk={i|i∈I∧δ{i}≥n*minsup|}
//找出頻繁一項集
3.Repeat
4.K=K+1
5.ti
//由頻繁項集生成下一級候選項集
6.for each transactiont∈Tdo
7.Ct=subset(Ck,t)
//確定所有屬于t的候選項集
8.for each candidate itemsetsc∈Ct
9.c∈Ct
//項集c出現(xiàn)的次數(shù)+1
10.end for
11.end for
Fk={c|c∈Ck∧δ(c)≥N*minsup|}
//獲取頻繁K-項集
12.直到頻繁K-項集為空
13.將所有的頻繁項集取并集
生成關(guān)聯(lián)規(guī)則:
1.k=|fk| //頻繁項集的大小
2.m=|Hm| //可以出現(xiàn)在規(guī)則右部元素列表的大小
3.ifk>m+1 then //如果頻繁項集可以移除大小為m的子集
4.Hm+1=apriori-gen(Hm)
//使用apriori-gen函數(shù)生成無重復(fù)組合
5.for eachhm+1∈Hm+1do //對每個元素進(jìn)行處理
6.conf=δ(fk)/δ(fk-hm+1) //得出它們的置信度
7.if conf≥minconf then //如果大于最小置信度閾值
8.output the rule (fk-hm+1)→hm+1//輸出關(guān)聯(lián)規(guī)則
9.else deletehm+1fromHm+1
10.end if
11.end for
12.call ap-genrules(fk,Hm+1)
13.end if
定理3:如果規(guī)則X→Y不滿足置信度閾值,那么對于X的子集X',規(guī)則X'→Y也不滿足置信度閾值。
從Apriori算法的執(zhí)行過程可以看出該算法多次進(jìn)行循環(huán),產(chǎn)生了大量的候選項集,需要多次掃描數(shù)據(jù)庫,這就需要很大的I/O負(fù)載。為此,借助Spark平臺設(shè)計相應(yīng)的并行化方案來解決這個問題。
Spark是由美國UC Berkeley的AMP實驗室開發(fā)的基于內(nèi)存計算的大數(shù)據(jù)并行計算框架,它是MapReduce的拓展,可用于構(gòu)建大型的、低延遲的數(shù)據(jù)分析應(yīng)用程序,性能優(yōu)于Hadoop。Spark的設(shè)計遵循“一個軟件棧滿足不同應(yīng)用場景”的理念,逐漸形成了一套完整的生態(tài)系統(tǒng),具有一系列優(yōu)勢。
(1)運行速度快。Spark使用先進(jìn)的DAG(directed acyclic graph,有向無環(huán)圖)執(zhí)行引擎,以支持循環(huán)數(shù)據(jù)流與內(nèi)存計算,基于內(nèi)存的執(zhí)行速度可比Hadoop MapReduce快上百倍,基于磁盤的執(zhí)行速度也能快十倍;
(2)容易使用。Spark支持Scala、Java、Python和R語言進(jìn)行編程,簡潔的API設(shè)計有助于用戶輕松構(gòu)建并行程序,并且可以通過Spark Shell進(jìn)行交互式編程;
(3)通用性。Spark提供了完整而強(qiáng)大的技術(shù)棧,包括SQL查詢、流式計算、機(jī)器學(xué)習(xí)和圖算法組件,這些組件可以無縫整合在同一個應(yīng)用中,足以應(yīng)對復(fù)雜的計算;
(4)運行模式多樣。Spark可運行于獨立的集群模式中,或者運行于Hadoop中,也可運行于Amazon EC2等云環(huán)境中,并且可以訪問HDFS、Cassandra、HBase、Hive等多種數(shù)據(jù)源。
Spark的核心是彈性分布式數(shù)據(jù)集(resilient distributed dataset,RDD)。一個RDD本質(zhì)上是一個只讀的分區(qū)記錄集合,每個RDD可以分成多個分區(qū),并且不同分區(qū)可以被保存到集群中不同的節(jié)點上,從而可以在各節(jié)點進(jìn)行并行計算。RDD提供兩種類型的操作,分別為行動(Action)和轉(zhuǎn)換(Transformation),前者用于計算并指定輸出形式,后者指定RDD之間的依賴關(guān)系。Transformation操作都是延時執(zhí)行的,它只是邏輯操作,不會進(jìn)行具體計算,只有通過Action操作才會將RDD放入內(nèi)存中求得結(jié)果值[8]。在RDD的設(shè)計中,數(shù)據(jù)只讀,不可修改,如果需要修改數(shù)據(jù),必須從父RDD轉(zhuǎn)換到子RDD,由此在不同RDD之間建立了血緣關(guān)系。所以,RDD通過RDD父子血緣關(guān)系重新計算得到丟失的分區(qū)來實現(xiàn)容錯,無需回滾整個系統(tǒng),避免了數(shù)據(jù)復(fù)制的高開銷。此外,數(shù)據(jù)在內(nèi)存中的多個RDD操作之間進(jìn)行傳遞,避免了讀寫磁盤的開銷。
Spark的運行架構(gòu)如圖1所示。
Spark運行架構(gòu)包含集群資源管理器、運行作業(yè)任務(wù)的工作節(jié)點(Worker)、每個應(yīng)用的任務(wù)控制節(jié)點(Driver)和每個工作節(jié)點負(fù)責(zé)具體任務(wù)的執(zhí)行進(jìn)程(Executor)。當(dāng)一個任務(wù)被用戶提交時,Driver節(jié)點會創(chuàng)建一個SparkContext,由SparkContext向資源管理器(Cluster Manager)申請資源;資源分配完畢后,Spark會啟動Worker上負(fù)責(zé)執(zhí)行具體任務(wù)的進(jìn)程Executor,并會將任務(wù)分發(fā)給Executor;計算完成后,Worker會將結(jié)果發(fā)回Driver,然后釋放相關(guān)資源。Spark的Executor利用多線程來執(zhí)行具體任務(wù),減少任務(wù)的啟動開銷;其中有一個BlockManager存儲模塊,會將內(nèi)存和磁盤共同作為存儲設(shè)備,當(dāng)進(jìn)行多輪迭代計算時(Apriori算法是典型的例子),可以將中間結(jié)果存到這個存儲模塊里,下次需要時,就可以直接讀該存儲模塊里的數(shù)據(jù),而不需要對HDFS等文件系統(tǒng)讀寫,從而大大減少了I/O的開銷[9-10]。
由以上分析可知,Spark平臺提供的機(jī)制與Apriori算法多次迭代計算的需求十分契合。
文中基于Spark的特性和“分而治之”的思想,通過把事務(wù)數(shù)據(jù)集分發(fā)給多個子節(jié)點和RDD轉(zhuǎn)換來設(shè)計Apriori算法基于Spark的并行化方案,用局部查找頻繁項集、剪枝代替全局操作,避免全局查找產(chǎn)生的內(nèi)存負(fù)擔(dān)。
Apriori算法的并行化步驟如下:
(1)Spark的配置與數(shù)據(jù)源的讀取。
首先將原始的交易數(shù)據(jù)存放在分布式文件系統(tǒng)HDFS上,并將其轉(zhuǎn)化為壓縮矩陣。此時Spark驅(qū)動程序會讀取相關(guān)的配置文件生成SparkConf對象,接著創(chuàng)建SparkContext對象用來連接訪問Spark集群,進(jìn)一步采用textFile算子掃描HDFS上的壓縮矩陣,根據(jù)壓縮矩陣數(shù)據(jù)創(chuàng)建RDD。如前文大數(shù)據(jù)編程模型所述,Spark并行框架計算流程實際上是通過數(shù)據(jù)集轉(zhuǎn)化為待處理RDD,然后根據(jù)待處理RDD進(jìn)行一系列的Transformation操作得到新的RDD,最后調(diào)用Action操作求得結(jié)果值。由于RDD是Spark大數(shù)據(jù)框架的最大特點,也是其高運行效率的重要原因,因此在進(jìn)行計算任務(wù)時,RDD的數(shù)量要與Spark集群為程序分配的計算資源相匹配,否則會導(dǎo)致Spark框架計算效率降低,程序并發(fā)度下降。
(2)Apriori算法的并行化。
基于Spark平臺的Apriori算法分為兩部分:頻繁項集生成和關(guān)聯(lián)規(guī)則提取[11-13]。其中頻繁項集的生成使用MapReduce思想來實現(xiàn),通過Map操作計算候選項集的局部支持度計數(shù),Reduce計算候選項集的全局支持度計數(shù),具體流程如圖2所示。
圖2 Apriori算法基于Spark的并行化流程
Apriori算法并行化實現(xiàn)的關(guān)鍵是迭代調(diào)用Transformation和Action操作,每次迭代中利用上一次的迭代結(jié)果進(jìn)行求解[14]。為了實現(xiàn)并行化,每個Worker節(jié)點通過多線程方式對其RDD分區(qū)中的數(shù)據(jù)使用Apriori算法計算。首先從分布式文件系統(tǒng)中獲取數(shù)據(jù)并創(chuàng)建彈性分布式數(shù)據(jù)集RDD_1,接著在單機(jī)的情況下統(tǒng)計項的個數(shù)來計算頻繁1-項集RDD_2,判斷候選2-項集是否存在,若存在,則對數(shù)據(jù)集使用map和cache操作,將數(shù)據(jù)集緩存到內(nèi)存中,同時進(jìn)入頻繁K-項集循環(huán);在循環(huán)中,根據(jù)Worker節(jié)點上的頻繁1-項集使用reduceByKey算子過濾數(shù)據(jù)集,生成局部候選K-項集RDD_3,并求出局部候選K-項集的支持度計數(shù);RDD_4進(jìn)行連接生成全局支持度計數(shù),產(chǎn)生頻繁K-項集,再判斷候選K+1項集是否存在,重復(fù)迭代,直到候選K+1項集不存在,則頻繁K-項集生成完畢。根據(jù)頻繁項集提取關(guān)聯(lián)規(guī)則,其中置信度大于最小置信度閾值的關(guān)聯(lián)規(guī)則為強(qiáng)關(guān)聯(lián)規(guī)則[15]。
(1)實驗環(huán)境與數(shù)據(jù)。
為了驗證基于Spark的并行化Apriori算法的執(zhí)行效率,設(shè)計了相關(guān)實驗。實驗環(huán)境包含四個節(jié)點,1個Master節(jié)點,3個Worker節(jié)點。CPU版本為Intel Core i7-4710HQ,每個CPU都擁有4個752.3 MB/s的處理單元。Master節(jié)點與Worker節(jié)點內(nèi)存均為4 G。Spark版本為1.6.1;Spark運行的操作系統(tǒng)為CentOS6.5;Java版本為JDK1.8.0_144;Scala版本為2.11.0。
實驗采用數(shù)據(jù)生成器生成的數(shù)據(jù)集,大小為4 MB,模擬了網(wǎng)上商城共10 W次交易記錄,將該數(shù)據(jù)集分為2 W、4 W、6 W、8 W、10 W進(jìn)行多次實驗,以確保實驗結(jié)果的準(zhǔn)確性。這些數(shù)據(jù)集中的強(qiáng)關(guān)聯(lián)規(guī)則數(shù)量分別是:3 150,5 718,9 463,12 620,15 780。
使用的軟件為IDEA+Spark,用Scala語言進(jìn)行算法實現(xiàn)。
(2)實驗結(jié)果與分析。
對數(shù)據(jù)進(jìn)行采樣,將支持度置為0.8,做了兩組實驗,統(tǒng)計運行時間。
實驗一:對比在單機(jī)Apriori算法和集群上并行化Apriori算法的執(zhí)行速度,以及兩者對強(qiáng)關(guān)聯(lián)規(guī)則挖掘的準(zhǔn)確性。結(jié)果分別如圖3和表1所示。
由實驗結(jié)果可以看出,Spark集群上Apriori算法實施挖掘所需的時間明顯少于單機(jī)所需的時間;同時,集群上挖掘出的強(qiáng)關(guān)聯(lián)規(guī)則與單機(jī)上挖掘出的強(qiáng)關(guān)聯(lián)規(guī)則保持一致,說明該并行化方案在保證準(zhǔn)確性的前提下,具有良好的時效性。
圖3 單機(jī)和Spark集群上的時間效率對比
數(shù)據(jù)量單機(jī)強(qiáng)關(guān)聯(lián)規(guī)則數(shù)集群強(qiáng)關(guān)聯(lián)規(guī)則數(shù)2 W3 1503 1504 W5 7185 7186 W9 4639 4638 W12 62012 62010 W15 78015 780
實驗二:測試并行化Apriori算法運行時間隨節(jié)點增加的變化情況。用了10 W條數(shù)據(jù)集,分別測量節(jié)點數(shù)為1、2、3、4時的Apriori算法產(chǎn)生強(qiáng)關(guān)聯(lián)規(guī)則的時間,結(jié)果如圖4所示。
圖4 不同節(jié)點數(shù)下并行化Apriori算法的執(zhí)行時間
由圖4可知,隨著節(jié)點個數(shù)增多,算法執(zhí)行時間不斷縮短。另外,得出的強(qiáng)關(guān)聯(lián)規(guī)則數(shù)是15 780,因此節(jié)點的個數(shù)并不影響算法的準(zhǔn)確率。
以提高關(guān)聯(lián)規(guī)則挖掘的時效性為目標(biāo),設(shè)計了一種Apriori算法基于Spark集群的并行化方案,并用數(shù)據(jù)生成器生成的數(shù)據(jù)集對算法的執(zhí)行效率進(jìn)行了驗證。實驗結(jié)果表明,基于Spark的并行化Apriori算法在面對大量數(shù)據(jù)的情況下,具有時間效率上的明顯優(yōu)勢,并且隨著執(zhí)行節(jié)點的增加,并行化效果更好。