王珣
(陜西學(xué)前師范學(xué)院 教學(xué)設(shè)備與實(shí)驗(yàn)室管理處, 西安 710061)
基于Spark平臺(tái)的大數(shù)據(jù)挖掘技術(shù)研究
王珣
(陜西學(xué)前師范學(xué)院 教學(xué)設(shè)備與實(shí)驗(yàn)室管理處, 西安 710061)
大數(shù)據(jù)具備數(shù)據(jù)量大、富于多樣性的特點(diǎn)。因此在大數(shù)據(jù)分析方面,無(wú)論是對(duì)處理速度還是實(shí)時(shí)性都具有較高的要求。數(shù)據(jù)挖掘技術(shù)是從海量數(shù)據(jù)里采用某種建模算法,用來(lái)尋找隱藏在數(shù)據(jù)背后的信息,從而讓大數(shù)據(jù)產(chǎn)生更大的價(jià)值。Spark框架是一個(gè)針對(duì)超大數(shù)據(jù)集合的低延遲的集群分布式計(jì)算系統(tǒng)。本文基于該框架,對(duì)大數(shù)據(jù)挖掘技術(shù)進(jìn)行了具體研究,首先完成了基于Yarn部署上Spark集群搭建,然后提出并實(shí)現(xiàn)了并行Apriori算法,該算法成功補(bǔ)充了Spark MLlib分布式機(jī)器學(xué)習(xí)庫(kù)中所缺乏的關(guān)聯(lián)分析問(wèn)題的分布式算法。
大數(shù)據(jù); 數(shù)據(jù)挖掘; Spark
如今我國(guó)已進(jìn)入大數(shù)據(jù)時(shí)代,每年都會(huì)產(chǎn)生海量的數(shù)據(jù)。預(yù)計(jì)到2020年,我們每年產(chǎn)生的數(shù)據(jù)總量將超過(guò)8.5ZB[1],涉及到金融,互聯(lián)網(wǎng),醫(yī)療等各個(gè)領(lǐng)域,這必然對(duì)大數(shù)據(jù)的挖掘和分析提出了更嚴(yán)峻的挑戰(zhàn)。因此,在大數(shù)據(jù)時(shí)代,要進(jìn)行各行各業(yè)、各種應(yīng)用場(chǎng)景的天量數(shù)據(jù)挖掘,必然需要一個(gè)效率高、結(jié)果準(zhǔn)確的計(jì)算平臺(tái)來(lái)進(jìn)行處理。
Spark由加州大學(xué)伯克利分校的AMP實(shí)驗(yàn)室出品,它是一個(gè)開(kāi)源的計(jì)算框架,該框架可用于處理大數(shù)據(jù)的高性能分布式并行計(jì)算。Spark的主要優(yōu)點(diǎn)在于:支持Python、Java等多語(yǔ)言編程,使用方便;可在大數(shù)據(jù)集上進(jìn)行多種復(fù)雜查詢(xún);兼容性強(qiáng),可以兼容Yarn,Mesos等多個(gè)框架;處理快速且結(jié)果精確。目前,Spark已經(jīng)被應(yīng)用或?qū)⒁粦?yīng)用到國(guó)內(nèi)外很多公司的各類(lèi)應(yīng)用場(chǎng)景中。MLlib,作為Spark平臺(tái)中的分布式機(jī)器學(xué)習(xí)庫(kù),承擔(dān)著對(duì)機(jī)器學(xué)習(xí)算法實(shí)現(xiàn)的功能。它在經(jīng)歷過(guò)歷次擴(kuò)充改進(jìn)后,正逐步完善。然而,在數(shù)據(jù)挖掘和分布式機(jī)器學(xué)習(xí)方面,傳統(tǒng)的Spark MLlib庫(kù)仍有一些缺陷,欠缺關(guān)于關(guān)聯(lián)分析這類(lèi)的算法內(nèi)容,這給在需要應(yīng)用關(guān)聯(lián)分析處理眾多應(yīng)用場(chǎng)景的大數(shù)據(jù)時(shí)帶來(lái)諸多不便,因此,我們有必要對(duì)相應(yīng)的算法進(jìn)行改進(jìn)、擴(kuò)充和定制,使其能夠更加適用于數(shù)據(jù)挖掘和分布式機(jī)器學(xué)習(xí)技術(shù)。
一般來(lái)講,Spark是采用集群模式應(yīng)用于實(shí)際生產(chǎn)場(chǎng)景中的,因此構(gòu)建好Spark分布式集群是基于Spark進(jìn)行大數(shù)據(jù)挖掘技術(shù)的研究與實(shí)現(xiàn)的關(guān)鍵。Spark開(kāi)發(fā)環(huán)境及其分布式集群的構(gòu)建,主要包括以下幾個(gè)方面:
1.1 硬件系統(tǒng)要求
為了保證良好的運(yùn)行性和兼容性,所有構(gòu)建Spark分布式集群所用的物理主機(jī)均采用Linux 操作系統(tǒng)。本文采用的測(cè)試環(huán)境,由搭建在1臺(tái)主機(jī)上的3臺(tái)虛擬機(jī)組成。在此基礎(chǔ)上搭建Spark分布式集群,包括2個(gè)Worker節(jié)點(diǎn)和1個(gè)Master節(jié)點(diǎn)。Master節(jié)點(diǎn)作為單機(jī)編寫(xiě)和調(diào)試Spark分布式應(yīng)用程序的機(jī)器,配置必須高于Worker節(jié)點(diǎn)。Master節(jié)點(diǎn)的機(jī)器配置為4G內(nèi)存和4核處理器,Worker節(jié)點(diǎn)機(jī)器配置為2G內(nèi)存和2核處理器。各節(jié)點(diǎn)硬盤(pán)為基于PCIE[2]的SSD固態(tài)硬盤(pán),這種硬盤(pán)讀寫(xiě)速度快,可以有效提高工作及運(yùn)行效率。上述集群構(gòu)成形式,既可以減少Spark集群運(yùn)行成本,降低環(huán)境構(gòu)建失敗概率,又可以根據(jù)需要隨時(shí)對(duì)節(jié)點(diǎn)數(shù)量進(jìn)行增減。
1.2 構(gòu)造分布式Spark集群
本文選取Spark版本為Spark1.1。此版本下構(gòu)造分布式Spark集群,首先需要安裝Scala語(yǔ)言,然后將每臺(tái)虛擬機(jī)上的slaves文件內(nèi)容修改為集群上每個(gè)Worker節(jié)點(diǎn)的主機(jī)名,并修改集群每個(gè)節(jié)點(diǎn)的Spark安裝目錄下的Spark-env.sh文件;接著配置系統(tǒng)的jdk環(huán)境變量,修改系統(tǒng)Scala的安裝路徑為SCALA_HOME;集群中Master節(jié)點(diǎn)的主機(jī)名或IP地址采用SPARK_MASTER_IP的屬性值,其他項(xiàng)默認(rèn);最后,確保該集群中的所有節(jié)點(diǎn)的Spark-evn.sh文件和slaves文件的內(nèi)容完全相同。以上配置完成后便可通過(guò)jps命令查看集群的啟動(dòng)情況。
1.3 配置Spark的IDE開(kāi)發(fā)環(huán)境
IDEA作為Scala語(yǔ)言開(kāi)發(fā)環(huán)境,是良好支持Scala的IDE,故選擇其為Spark應(yīng)用程序的編程和開(kāi)發(fā)環(huán)境。但為了避免IDEA在使用過(guò)程中產(chǎn)生的過(guò)量緩存文件過(guò)量占用和消耗I/O資源,選擇SSD固態(tài)硬盤(pán)存儲(chǔ)文件以提高性能。
馬戴一生羈旅,東游江浙,南極瀟湘吳越,西至汧隴,北抵幽燕大漠,跋山涉水,足跡甚遠(yuǎn),嘗盡仕途坎坷的悲苦辛酸,因此他的羈旅之作除了對(duì)山水的描摹和懷古傷今之愁以外,也有懷鄉(xiāng)思?xì)w的深切悲痛,以及和著血淚的生活體驗(yàn),這些詩(shī)歌不僅是其內(nèi)心情感的集中寫(xiě)照,也是其政治命運(yùn)的真實(shí)反映,在飽含深情的描繪中,呈現(xiàn)出了既同于中晚唐詩(shī)人寫(xiě)作的共性——精于五律,格律嚴(yán)整的藝術(shù)成就,也展現(xiàn)了其羈旅行役詩(shī)獨(dú)特的典雅、清奇的藝術(shù)風(fēng)格。
IDEA配置完成后,即可以開(kāi)始進(jìn)行Spark程序測(cè)試。
2.1 Apriori算法概念和核心步驟
Apriori算法是一種挖掘關(guān)聯(lián)規(guī)則的頻繁項(xiàng)集算法,Apriori算法多次掃描交易數(shù)據(jù)庫(kù),每次利用候選頻繁集產(chǎn)生頻繁集。它的主要步驟可分為定義最小支持度,篩選所有頻繁項(xiàng)集和根據(jù)置信度產(chǎn)生關(guān)聯(lián)規(guī)則。
2.2 Apriori算法基于Spark的分布式實(shí)現(xiàn)
Apriori算法基于Spark分布式集群的基本流程圖,如圖1所示。
圖1 分布式Apriori算法的實(shí)現(xiàn)流程圖
算法的主要思路為:
(1) 產(chǎn)生頻繁1項(xiàng)集L1。將事務(wù)集T以RDD
(1) 得到頻繁1項(xiàng)集F1并保存,以下為該步驟核心代碼:
valfim1=transactions.flatMap(line=>line).Map((,1)).ReduceByKey(_+_).Filter(_._2>minSupport)savefim1(fim1,output+”result-1”,sum)defsavefim1(fim:RDD[(string,int)],path:string,count:double):Unit={fim.map(line=>{line._1+“:%.2f”.format(line._2/count)}).Coalesce(1,true).SaveAsTextFile(path)}
(2) 頻繁1項(xiàng)集L1自連接產(chǎn)生C1,以C1作為對(duì)比,對(duì)數(shù)據(jù)庫(kù)進(jìn)行掃描以產(chǎn)生fim2,將fim2保存下來(lái),以下為該步驟核心代碼。
var(trans,newfim)=LItofim2(fimI.rnap(_._1).collect,transactions,minSupport)save(newfim,output+"result-2",sum)defL1tofim2(L1:Array[String],trans:RDD[(String,Int)],minSupport:Double):RDD[(List[String],Int)]={valL1c=Ll.sizevalcitems=scala.collection.mutable.ArrayBuffer[List[String]]()for(i<-0untilLlc){ for(j<-i+1untilLlc){ Citems+=List(L1(i),L1(j)).sortWith(_<_) }}valbccFI=sc.broadcast(citems)valtemp1=transflatMap(linc=>{) vartmp=scala.collection.mutable.Set[(List[String],Int)]() for(citem<-bccFLvalue){ valtc=isContain(line._l.split("").toSet,citem.toSet) if(tc=1){ tmp+=citem一>line._2 }}tmp})valnewfim=temp1.ReduceByKey(_+_).Filter(_._2>minSupport).cachebccFIunpersist()returnnewfim
(3) 循環(huán)產(chǎn)生3項(xiàng)集到8項(xiàng)集。以下為核心代碼。
varfimk=newfim.collectfor(k<一3to8){ valtemp=mine(fimk.map(_._)1),trans,minSup-port) save(temp_2,output+"result-"+k,sum) fimk=temp.2.collect trans=temp._1}defisFrequent(orderitems:List[String],Lmap:scala.col-lection.mutable.Map[...]):Boolean valoCc=orderitems.sizefor(i<-0tooCc一3){ val11=orderitems.slice(0,i) val12}rderitems.slice(i+1,oCc) valkey=11.foldRight(12){(n:String,l2:List[String])=>n::12} valkeyl=key.slice(0,key.size-1)valvalues=Lmap.get(keyl)match{caseSome(n)=>ncaseNone=>List()if(!(values.exists(_=orderitems(oCc-1)))){returnfalse }}returntrue}defcombine(line,List[String]):scala.collection.mutable.ArrayBuffer[List[String]]={valCitems=scala.collection.mutable.ArrayBuffer[List[String]]()valtarray=line.2toArrayvaltc=tarray.sizefor(i<-0untiltc)foro(i<-1+1untiltc)} citems+=(tarray(j):aarray(i):aine._1).sortWith(_<_)}}citemsdefisContain():Int={varcontain=Iset.find(item=>{if(!trans.contains(item)){contain=0true}elseFalse})Contain}
3.1 實(shí)驗(yàn)環(huán)境與條件
分布式Apriori算法的測(cè)試環(huán)境為由前面搭建好的Spark on Yarn集群。單機(jī)Apriori算法的測(cè)試環(huán)境為該集群中的Master節(jié)點(diǎn)。本文以chess標(biāo)準(zhǔn)數(shù)據(jù)集[3]作為待測(cè)數(shù)據(jù)集,每一個(gè)候選集的編號(hào)為該數(shù)據(jù)集每一行的第一個(gè)數(shù)字,最小支持度選為85%,頻繁項(xiàng)集K設(shè)為8,然后將所設(shè)計(jì)的算法打包并以包的形式傳到Spark上集群上運(yùn)行,以進(jìn)行數(shù)據(jù)挖掘。算法運(yùn)行過(guò)程中需要依次輸入數(shù)據(jù)集路徑和輸出文件夾路徑。數(shù)據(jù)集路徑用于輸入數(shù)據(jù)的存儲(chǔ)和管理,自身存放于HDFS上的data文件夾下;輸出文件夾路徑用于存放需要輸入各項(xiàng)頻繁項(xiàng)集的結(jié)果,共包含K個(gè)文件夾。每個(gè)文件中的內(nèi)容的格式都為“項(xiàng)集:置信度”。
3.2 實(shí)驗(yàn)結(jié)果分析
輸出結(jié)果存放于result-1至result-8這8個(gè)文件中,集群中Worker節(jié)點(diǎn)都打開(kāi)的時(shí)候,程序總體運(yùn)行時(shí)間為74 s。每個(gè)文件中的項(xiàng)集數(shù)依次為:984、690、517、358、177、105、32和15個(gè),如圖2所示。
圖2 各文件中頻繁項(xiàng)集數(shù)
當(dāng)最多只有一個(gè)Worker節(jié)點(diǎn)工作,其它條件不變時(shí),測(cè)試結(jié)果,如圖3所示。
圖3 不同數(shù)量節(jié)點(diǎn)運(yùn)行時(shí)間
從圖3中可以看出,第1類(lèi):當(dāng)Spark集群中只有一個(gè)Master節(jié)點(diǎn)和一個(gè)Worker節(jié)點(diǎn)時(shí),節(jié)點(diǎn)運(yùn)行所消耗的時(shí)間為108s;第2類(lèi):當(dāng)Spark集群中的兩個(gè)Worker節(jié)點(diǎn)同時(shí)運(yùn)行時(shí),所花費(fèi)的時(shí)間為60 s;第3類(lèi):?jiǎn)螜C(jī)模式下,也就是當(dāng)Spark集群只打開(kāi)Master節(jié)點(diǎn),兩個(gè)Worker節(jié)點(diǎn)都被關(guān)閉時(shí),算法運(yùn)行所消耗的時(shí)間為195 s??梢钥闯?,不同模式下的分布式并行Apriori算法運(yùn)行具有較大的差異。在算法運(yùn)行過(guò)程中會(huì)產(chǎn)生大量的候選集,頻繁與HDFS進(jìn)行交互,導(dǎo)致時(shí)間的消耗。另外圖中第4類(lèi)情況反映的是當(dāng)Master節(jié)點(diǎn)運(yùn)行java編寫(xiě)的單機(jī)Apriori算法時(shí)的運(yùn)行情況,所消耗時(shí)間長(zhǎng)達(dá)759 s。
對(duì)于Apriori算法處理相同數(shù)據(jù)集時(shí),Spark集群中的所有節(jié)點(diǎn)在都打開(kāi)的情況下所消耗的時(shí)間遠(yuǎn)遠(yuǎn)少于單機(jī)模式或只有一個(gè)Worker節(jié)點(diǎn)和Master節(jié)點(diǎn)打開(kāi)時(shí)所花費(fèi)的時(shí)間,主要原因在于集群中的工作節(jié)點(diǎn)越多使得集群總體配置越高,處理速度自然也就越快。同時(shí)Spark支持可伸縮計(jì)算的特性也很多提高了原有的大數(shù)據(jù)集的效率。另外,我們也還發(fā)現(xiàn),不同的編程語(yǔ)言也對(duì)算法運(yùn)行結(jié)果有著很大的差別。這是因?yàn)镾park框架還支持內(nèi)存計(jì)算,部分算法被放入內(nèi)存中計(jì)算,使得Apriori算法的效率在原有的基礎(chǔ)上得到極大提高,這也正是Spark框架的優(yōu)勢(shì)之一。但Spark集群運(yùn)行分布式并行Apriori算法一般更適用于處理較大規(guī)模型的數(shù)據(jù)集,在處理小型數(shù)據(jù)集時(shí),Spark集群運(yùn)行分布式并行Apriori算法的效率要比單機(jī)模式下低。原因在于Spark集群在處理數(shù)據(jù)集時(shí),需要頻繁地與HFDS交互,對(duì)數(shù)據(jù)進(jìn)行RDD分塊和封裝,還有DAG備份恢復(fù)等一系列工作。所以Spark集群模式更適用于較大型數(shù)據(jù)集情況。
本文對(duì)基于Spark的大數(shù)據(jù)挖掘技術(shù)進(jìn)行了研究,并提出了基于Spark平臺(tái)分布式Apriori算法,有效彌補(bǔ)了MLlib化中的不足,即缺少的關(guān)聯(lián)分析類(lèi)算法,該算法可以應(yīng)用到關(guān)聯(lián)分析大規(guī)模數(shù)據(jù)的場(chǎng)景當(dāng)中。本文首先搭建起Spark on yarn的分布式Spark生產(chǎn)測(cè)試環(huán)境,即由3個(gè)以上節(jié)點(diǎn)構(gòu)成的集群,然后再在所搭建好的集群上對(duì)文中算法進(jìn)行了實(shí)驗(yàn)。實(shí)驗(yàn)以經(jīng)典算法Apriori為測(cè)試算法,測(cè)試對(duì)象為GB級(jí)別的大數(shù)據(jù)集,采用了Scala語(yǔ)言和Spark RDD的分布式算子分別對(duì)其進(jìn)行編碼并運(yùn)行,同時(shí)還比較了其與Apriori算法在運(yùn)行Java語(yǔ)言所編寫(xiě)的單機(jī)模式下運(yùn)行結(jié)果及效率。
[1] 中國(guó)產(chǎn)業(yè)調(diào)研網(wǎng).2015年中國(guó)大數(shù)據(jù)行業(yè)現(xiàn)狀研究分析與市場(chǎng)前景預(yù)測(cè)報(bào)告[EB/OL].http://www.cir.cn/2015-01/DaShuJuHangYeYanJiuFenXi219.html,2017-01-13.
[2] 道客巴巴.基于Spark的大數(shù)據(jù)挖掘技術(shù)的研究與實(shí)現(xiàn)[EB/OL].http://www.doc88.com/p-7758265704891.html,2017-01-14.
[3] 道客巴巴.基于Spark的大數(shù)據(jù)挖掘技術(shù)的研究與實(shí)現(xiàn)[EB/OL].http://www.doc88.com/p-7758265704891.html,2017-01-14.
Research on Technology of Big Data Mining Based on Spark
Wang Xun
(Section of Teaching Equipment & Lab Management, Shaanxi Xueqian Normal University, Xi’an 710061, China)
Because big data have the characteristics of large amount of data and rich diversity, it must be demanding large data analysis both in processing speed and real-time requirements. Data mining technology is to use some modeling algorithm from massive data, to look for hidden information behind the data, so that big data can produce greater value. Spark framework is a low latency cluster distributed computing system for super large data sets. Based on the framework, this paper studies the big data mining technology. This paper designs and implements the Yarn deployment on the Spark cluster firstly, and then proposes and implements parallel Apriori algorithm. This algorithm successfully adds to the distributed algorithm of association analysis by the lack of Spark MLlib distributed machine learning repository.
Big data; Data mining; Spark
王珣(1982-),男,漢中人,工程師,經(jīng)濟(jì)學(xué)碩士,研究方向:信息管理與信息技術(shù)。
1007-757X(2017)06-0064-03
文獻(xiàn)標(biāo)志碼:
2017.02.05)