余勝輝,李玲娟
(南京郵電大學(xué) 計(jì)算機(jī)學(xué)院,江蘇 南京 210023)
大數(shù)據(jù)時(shí)代的來(lái)臨導(dǎo)致行業(yè)領(lǐng)域產(chǎn)生的信息數(shù)據(jù)呈爆炸式的增長(zhǎng)。對(duì)海量數(shù)據(jù)的挖掘以及利用成為了當(dāng)下最熱門(mén)的重點(diǎn)研究課題之一。聚類方法是數(shù)據(jù)挖掘領(lǐng)域中最為重要的方法之一,是一種無(wú)監(jiān)督的機(jī)器學(xué)習(xí)算法,能夠?qū)?shù)據(jù)對(duì)象中具有相同屬性的數(shù)據(jù)劃分為若干個(gè)子集,每個(gè)子集形成一個(gè)簇,同一個(gè)簇中的數(shù)據(jù)具有相似的特征,不同簇中的數(shù)據(jù)相異[1]。層次聚類算法是聚類算法中最常見(jiàn)的一類方法,“類內(nèi)的點(diǎn)都足夠近,類間的點(diǎn)都足夠遠(yuǎn)”[2]是其最重要的判斷標(biāo)準(zhǔn)。CURE算法是一種典型的層次聚類算法,該算法簡(jiǎn)單、速度快,對(duì)大數(shù)據(jù)集有較高的聚類效率和可伸縮性,時(shí)間復(fù)雜度近于線性,適合對(duì)大規(guī)模數(shù)據(jù)集進(jìn)行挖掘[3-4]。Spark[5-6]是專門(mén)為大規(guī)模數(shù)據(jù)處理設(shè)計(jì)的快速通用計(jì)算引擎,是以Hadoop MapReduce為基礎(chǔ)框架的一個(gè)新的開(kāi)源平臺(tái)。Spark支持交互式計(jì)算和復(fù)雜算法,速度很快;具有很高的通用性;支持多種資源管理器。為了進(jìn)一步提升CURE層次聚類算法的效率和可伸縮性,文中基于Spark平臺(tái)對(duì)其進(jìn)行并行化,并通過(guò)對(duì)不同的數(shù)據(jù)集的聚類實(shí)驗(yàn)來(lái)證明該并行化方案的有效性。
CURE是一種基于取樣和代表點(diǎn)的層次聚類算法,它采用迭代的方式,自底向上地合并兩個(gè)距離最近的簇[7]。CURE算法在傳統(tǒng)的劃分聚類算法受異常數(shù)據(jù)比較敏感這一特點(diǎn)上能夠得到很好的解決。CURE算法在計(jì)算過(guò)程中將每一類當(dāng)成一個(gè)點(diǎn),用這個(gè)點(diǎn)代表其中一個(gè)類型的數(shù)據(jù),然后計(jì)算一個(gè)類與另一個(gè)類之間的距離,最相似的兩個(gè)點(diǎn)進(jìn)行合并,不需要計(jì)算類內(nèi)每個(gè)數(shù)據(jù)點(diǎn)之間的距離,這樣計(jì)算有利于減少運(yùn)算量,提高效率。但此算法的復(fù)雜度較高,運(yùn)行時(shí)所消耗的資源較多。CURE算法的具體特征及思想主要體現(xiàn)在如下幾個(gè)方面[8-9]:
(1)最初,每一個(gè)對(duì)象就是一個(gè)獨(dú)立的類,然后從最相似的對(duì)象開(kāi)始進(jìn)行合并。
(2)為處理大數(shù)據(jù)集,采用了隨機(jī)抽樣和分割手段。采用抽樣的方法可降低數(shù)據(jù)量,提高算法效率。一般能得到比較好的聚類結(jié)果。分割手段,即樣本分割為幾個(gè)部分,然后針對(duì)各個(gè)部分中的對(duì)象分別進(jìn)行局部聚類,形成子類。再針對(duì)子類進(jìn)行聚類,形成新的類。
(3)CURE算法由分散的若干對(duì)象按收縮因子移向其所在類的中心之后來(lái)代表該類。由于CURE算法采用多個(gè)對(duì)象代表一個(gè)類,并通過(guò)收縮因子類調(diào)節(jié)類的形狀,因此能夠處理非球形的對(duì)象分布。
(4)消除異常值可以分兩個(gè)階段進(jìn)行,第一個(gè)階段的工作,是將聚類過(guò)程中增長(zhǎng)非常緩慢的類作為異常值除去,第二個(gè)階段的工作(聚類基本結(jié)束的時(shí)候)是將數(shù)目明顯少的類作為異常值除去。
CURE的具體操作流程如下[10]:
(1)從原始數(shù)據(jù)中隨機(jī)抽取樣本,得數(shù)據(jù)集S。
(2)對(duì)S進(jìn)行分區(qū),對(duì)每個(gè)分區(qū)分別進(jìn)行聚類。
(3)如果一個(gè)類增長(zhǎng)很慢,說(shuō)明它是噪聲要去除。
(4)將局部的類進(jìn)行聚類。
(5)對(duì)原始數(shù)據(jù)進(jìn)行標(biāo)記。
Spark是為了提高計(jì)算速度、易用性和復(fù)雜分析而構(gòu)建的大數(shù)據(jù)處理框架。為了提高Spark處理大量數(shù)據(jù)的實(shí)時(shí)性,因此計(jì)算都是基于內(nèi)存計(jì)算的,Spark的集群一般都是部署在很多廉價(jià)的硬件上的,這種部署對(duì)容錯(cuò)性和可伸縮性都能得到保障。
(1)Spark與傳統(tǒng)大數(shù)據(jù)平臺(tái)的對(duì)比。
在Spark之前已有Hadoop和Strom等大數(shù)據(jù)平臺(tái),這些大數(shù)據(jù)平臺(tái)都是基于MapReduce[11]技術(shù),與之相比Spark具有十分明顯的優(yōu)勢(shì)[12]:
①Spark底層通過(guò)開(kāi)辟了線程池,利用了線程池能夠復(fù)用線程,不必反復(fù)創(chuàng)建銷毀線程的優(yōu)勢(shì),能夠減少資源,大大減少了任務(wù)調(diào)度的開(kāi)銷。
②Spark中的RDD可以理解為一個(gè)集合,可以在上面進(jìn)行計(jì)算,傳統(tǒng)數(shù)據(jù)模型的容錯(cuò),可伸縮性等特點(diǎn)RDD都具有,也可將數(shù)據(jù)緩存,方便后續(xù)的重復(fù)使用。
③Spark在每一輪計(jì)算過(guò)程中都會(huì)得到一個(gè)中間結(jié)果,因此可以將每一輪的輸出結(jié)果在內(nèi)存緩存,在后續(xù)的運(yùn)算需要時(shí),可以直接從緩存中對(duì)數(shù)據(jù)進(jìn)行讀取,代替了從HDFS中讀取數(shù)據(jù),從而避免了大量耗時(shí)的磁盤(pán)I/O操作,直接從內(nèi)存里進(jìn)行讀取,對(duì)運(yùn)行速度有很大的提升[13]。
(2)Spark的組成。
Spark主要包含以下幾個(gè)部分[5]:
①Spark Core:這是Spark最為核心的一個(gè)部分,是一個(gè)通用分布式數(shù)據(jù)引擎,它自成一個(gè)體系,可適應(yīng)在任何商用服務(wù)器集群上。對(duì)Spark一些基礎(chǔ)功能進(jìn)行了定義,尤其是RDD的API,操作及這兩者的動(dòng)作。
②Spark SQL:是提供在大數(shù)據(jù)上的SQL查詢語(yǔ)句,這一功能擴(kuò)充了SQL算子,豐富了Spark的算子和功能。
③Spark Streaming:利用Spark Core的快速調(diào)度能力來(lái)執(zhí)行流數(shù)據(jù)。它以最小批次獲取數(shù)據(jù),并對(duì)批次上的數(shù)據(jù)執(zhí)行RDD轉(zhuǎn)化,其核心是采用微批處理引擎,支持多種數(shù)據(jù)源的導(dǎo)入,可根據(jù)程序配置的時(shí)間,將數(shù)據(jù)打成一個(gè)RDD,發(fā)送給Spark Core進(jìn)行處理。
④MLib:一個(gè)常用算法庫(kù),包含了幾種常見(jiàn)的機(jī)器學(xué)習(xí)算法。
(3)Spark的架構(gòu)。
與傳統(tǒng)經(jīng)典的分布式計(jì)算架構(gòu)模型類似,Spark沿用了傳統(tǒng)的架構(gòu)模式:Master-Slave模型[14],如圖1所示,Master對(duì)應(yīng)ClusterManger進(jìn)程的節(jié)點(diǎn),能夠控制整個(gè)集群的運(yùn)行,是集群中最不可缺少的部分。Slave對(duì)應(yīng)著Worker進(jìn)程的節(jié)點(diǎn),它的作用是對(duì)現(xiàn)在的形態(tài)向上匯報(bào)和接受特定的命令和向下進(jìn)行分發(fā)具體的任務(wù),Executor是執(zhí)行任務(wù)的具體部分進(jìn)行運(yùn)算,Client是與用戶進(jìn)行交互的界面提供了可視化的功能,同時(shí)也負(fù)責(zé)任務(wù)的提交。Driver相當(dāng)于處理中心,進(jìn)行對(duì)應(yīng)用的執(zhí)行。當(dāng)新建一個(gè)任務(wù)被提交上來(lái)之后,Driver節(jié)點(diǎn)會(huì)創(chuàng)建一個(gè)SparkContext,由SparkContext向資源管理器(ClusterManger)申請(qǐng)資源,資源分配完畢后Spark會(huì)啟動(dòng)Worker上負(fù)責(zé)執(zhí)行具體任務(wù)的進(jìn)程Executor,并把任務(wù)分給Executor,計(jì)算完成后Worker會(huì)將結(jié)果發(fā)回Driver,然后釋放資源[15]。這就是一次具體的任務(wù)提交處理流程。
圖1 Spark架構(gòu)
基于上述的分析,CURE算法具有適合處理海量數(shù)據(jù)集,需要對(duì)數(shù)據(jù)進(jìn)行分區(qū)處理,以及需要對(duì)數(shù)據(jù)進(jìn)行反復(fù)迭代等特點(diǎn),這與Spark大數(shù)據(jù)計(jì)算框架RDD的特性十分相契合。文中正好通過(guò)此特性,實(shí)現(xiàn)CURE算法基于Spark的并行化。
CURE算法基于Spark的并行化步驟如下:
(1)Spark的配置與數(shù)據(jù)源的讀取。
構(gòu)建Spark Application的運(yùn)行環(huán)境后,配置文件會(huì)被Spark的驅(qū)動(dòng)器程序自行加載,首先會(huì)生成一個(gè)SparkConf對(duì)象,之后通過(guò)該對(duì)象又會(huì)自動(dòng)生成一個(gè)新的SparkContext,Exctuor資源就是通過(guò)SparkContext向資源管理器注冊(cè)并申請(qǐng)運(yùn)行的。通過(guò)上述的分析,得知Spark計(jì)算流程實(shí)際上是將待處理的數(shù)據(jù)集讀取后轉(zhuǎn)換成為源RDD,然后通過(guò)Spark Core提供的transformation算子,對(duì)該RDD進(jìn)行轉(zhuǎn)換來(lái)獲取新的RDD,最后調(diào)用Action操作求得結(jié)果值。使用HDFS文件創(chuàng)建RDD是最常用的方式,可以針對(duì)HDFS上存儲(chǔ)的大數(shù)據(jù),進(jìn)行離線批處理操作。在RDD計(jì)算過(guò)程中,每個(gè)分區(qū)都會(huì)運(yùn)行一個(gè)task,所以RDD的分區(qū)數(shù)目決定了總的task數(shù)目,在Spark集群資源有限的條件下,需要對(duì)RDD進(jìn)行合理的分區(qū),分區(qū)太多或太少都會(huì)導(dǎo)致計(jì)算效率的降低。
(2)CURE算法并行化執(zhí)行的流程。
Step1:對(duì)原始數(shù)據(jù)進(jìn)行分區(qū),隨機(jī)分成N個(gè)數(shù)據(jù)片,將數(shù)據(jù)讀入HDFS轉(zhuǎn)化成DataFrame。相比于RDD,DataFram能提供更為詳細(xì)的結(jié)構(gòu)信息,使得Spark SQL可以清楚地知道數(shù)據(jù)的詳細(xì)信息。DataFrame相比RDD不僅提供了更豐富的算子,最重要的是可提升執(zhí)行效率。
Step2:采用收縮因子的方法為每個(gè)數(shù)據(jù)塊選取代表點(diǎn),通過(guò)mapPartiotions對(duì)N個(gè)數(shù)據(jù)片進(jìn)行MapReduce聚類計(jì)算,計(jì)算出中心點(diǎn)。為了提升聚類結(jié)果的準(zhǔn)確度有兩種刪除異常點(diǎn)的操作:①如果出現(xiàn)兩次及以上的聚類計(jì)算則會(huì)統(tǒng)計(jì)該類的增長(zhǎng)速度,發(fā)現(xiàn)增長(zhǎng)速度慢的類則當(dāng)作噪聲去除;②當(dāng)類內(nèi)的數(shù)目低于某一個(gè)閾值時(shí)也會(huì)進(jìn)行刪除,因?yàn)橛锌赡芮『幂^近的幾個(gè)異常點(diǎn)在同一個(gè)類中。
Step3:對(duì)上一步聚類產(chǎn)生的類,通過(guò)拉格朗日距離來(lái)計(jì)算它們之間的中心點(diǎn)距離,并將距離最小的兩個(gè)類合并,合并之后得到一個(gè)新的類。
Step4:對(duì)新的類重新進(jìn)行MapReduce計(jì)算,得到新類的中心點(diǎn)以及代表點(diǎn)。
Step5:判斷聚類后的個(gè)數(shù)是否滿足設(shè)定的聚類數(shù),如果不滿足則繼續(xù)重復(fù)Step2,如果滿足則得到最終聚類結(jié)果。
CURE算法基于Spark的并行化流程如圖2所示。
圖2 CURE算法并行化流程
CURE算法本身在不確定的數(shù)據(jù)上的計(jì)算時(shí)間消耗可能會(huì)非常大,在上述執(zhí)行過(guò)程中通過(guò)Spark平臺(tái)MapReduce以及緩存特性,執(zhí)行器會(huì)將需要緩存的數(shù)據(jù)緩存到內(nèi)存中,調(diào)用驅(qū)動(dòng)器節(jié)點(diǎn)對(duì)需要處理的數(shù)據(jù)進(jìn)行再次計(jì)算,當(dāng)達(dá)到終止條件時(shí)會(huì)將計(jì)算好的結(jié)果存儲(chǔ)在HDFS中。Spark的核心就是RDD是分發(fā)到不同executer節(jié)點(diǎn)上進(jìn)行并行計(jì)算,并將中間結(jié)果緩存到內(nèi)存中,對(duì)需要大量迭代重復(fù)計(jì)算的CURE算法在時(shí)間效率上有很大的提升。
為了驗(yàn)證和分析基于Spark的并行化CURE算法聚類結(jié)果的準(zhǔn)確性與時(shí)間效率,設(shè)計(jì)了以下實(shí)驗(yàn),分別在單機(jī)和Spark上對(duì)CURE算法對(duì)相同大小的實(shí)驗(yàn)數(shù)據(jù)集進(jìn)行聚類操作,比較對(duì)多種數(shù)據(jù)集在不同運(yùn)行模式下的聚類準(zhǔn)確率和時(shí)間效率。
(1)實(shí)驗(yàn)環(huán)境和實(shí)驗(yàn)數(shù)據(jù)。
實(shí)驗(yàn)環(huán)境如下:共搭建了包括1臺(tái)驅(qū)動(dòng)器節(jié)點(diǎn),2臺(tái)執(zhí)行器節(jié)點(diǎn)的Spark。節(jié)點(diǎn)的CPU是Intel CORE i5-7440H,每個(gè)節(jié)點(diǎn)配有2個(gè)處理器,硬盤(pán)數(shù)據(jù)讀寫(xiě)速度為600.00 MB/s,驅(qū)動(dòng)器節(jié)點(diǎn)的內(nèi)存大小為6 G,執(zhí)行器節(jié)點(diǎn)的內(nèi)存大小為4 G。操作系統(tǒng)是centos6.5;Java版本是JDK1.8.0_144;Spark版本為1.6.0;Scala版本為2.11.0。實(shí)驗(yàn)分別采用了UCI實(shí)驗(yàn)室提供的Bag of Words數(shù)據(jù)集,該數(shù)據(jù)集包含五個(gè)文字集合,八百萬(wàn)個(gè)實(shí)例數(shù);YouTube MuLtiview Video Games Dataset數(shù)據(jù)集,該數(shù)據(jù)集包含大約120 000個(gè)實(shí)例,每個(gè)實(shí)例由13種要素類型描述;Plants數(shù)據(jù)集,該數(shù)據(jù)集包含了70個(gè)屬性,22 632個(gè)實(shí)例數(shù)。
(2)聚類準(zhǔn)確率對(duì)比實(shí)驗(yàn)。
通過(guò)CURE算法對(duì)上述的數(shù)據(jù)集進(jìn)行聚類,聚類準(zhǔn)確率如表1表示。
表1 不同數(shù)據(jù)集上CURE算法的聚類準(zhǔn)確率
由表1可以看出,對(duì)于三個(gè)數(shù)據(jù)集,基于Spark的并行CURE算法相對(duì)于傳統(tǒng)單機(jī)運(yùn)行模式的CURE算法的聚類準(zhǔn)確率略微有所提高。可以看出分布式數(shù)據(jù)處理對(duì)結(jié)果的精準(zhǔn)度和穩(wěn)定性有較好的保障。
(3)時(shí)間效率對(duì)比實(shí)驗(yàn)的聚類時(shí)耗。
圖3 單機(jī)和基于Spark平臺(tái)的CURE算法聚類時(shí)間對(duì)比
由圖3可知,在開(kāi)始階段數(shù)據(jù)量比較少,兩種不同環(huán)境的CURE聚類運(yùn)行所消耗的時(shí)間相差不大,因?yàn)镾park啟動(dòng)消耗了大量的時(shí)間,在數(shù)據(jù)量較少的情況下無(wú)法體現(xiàn)其在時(shí)間方面的優(yōu)勢(shì)。伴隨著數(shù)據(jù)量增大,基于Spark的并行化計(jì)算所消耗的時(shí)間明顯少于傳統(tǒng)的單機(jī)計(jì)算方式。因此,在處理大量數(shù)據(jù)的情況下Spark的并行化計(jì)算具有良好的時(shí)效性。
主要研究了CURE算法基于Spark平臺(tái)的并行化實(shí)現(xiàn)方案,給出了通過(guò)將數(shù)據(jù)集分區(qū)后分發(fā)給多個(gè)子節(jié)點(diǎn)和進(jìn)行RDD轉(zhuǎn)換來(lái)實(shí)現(xiàn)CURE算法的并行化步驟;通過(guò)在兩種運(yùn)行模式和三種公開(kāi)數(shù)據(jù)集上的聚類實(shí)驗(yàn),驗(yàn)證了CURE算法基于Spark的并行化運(yùn)行比傳統(tǒng)的單機(jī)運(yùn)行的聚類準(zhǔn)確率略有提升,且隨著聚類數(shù)據(jù)量的增加,基于Spark平臺(tái)的并行化CURE算法展示了更好的時(shí)效性。