国产日韩欧美一区二区三区三州_亚洲少妇熟女av_久久久久亚洲av国产精品_波多野结衣网站一区二区_亚洲欧美色片在线91_国产亚洲精品精品国产优播av_日本一区二区三区波多野结衣 _久久国产av不卡

?

Spark環(huán)境下基于子圖的異步迭代更新方法

2020-04-07 10:48董新華陳建峽
計算機工程與應(yīng)用 2020年7期
關(guān)鍵詞:子圖頂點全局

李 超,董新華,陳建峽

湖北工業(yè)大學 計算機學院,武漢430068

1 引言

隨著信息時代的快速發(fā)展,基于圖的迭代算法有著廣泛應(yīng)用[1]。例如,PageRank算法可以對網(wǎng)頁的重要性進行排序,SimRank算法可以對社交網(wǎng)絡(luò)中用戶之間的相似度進行分析。由于真實網(wǎng)絡(luò)的數(shù)據(jù)規(guī)模較大,通常采取分布式框架對圖數(shù)據(jù)進行迭代處理,基于全局同步更新機制[2](BSP)的圖處理系統(tǒng)由于實現(xiàn)簡單且易于擴展,為大規(guī)模圖數(shù)據(jù)的分析提供了便利。

受全局同步更新機制的啟發(fā)[3],Spark 環(huán)境下的Graphx[4]圖處理系統(tǒng)基于子圖對數(shù)據(jù)進行全局同步迭代更新。通過彈性分布式數(shù)據(jù)集以分區(qū)方式存放頂點和連接邊屬性,Graphx可以利用頂點的局部狀態(tài)更新頂點的全局狀態(tài),并且在迭代過程中將中間計算結(jié)果放入內(nèi)存,以此避免頻繁地I/O 訪問。但是,由于Graphx 要求子圖之間的計算任務(wù)保持全局同步,因此降低了圖的收斂速度。

針對全局同步機制收斂速度較慢的問題,研究人員提出了異步迭代更新方法[5],Zhang[6]闡述了異步迭代收斂需要滿足的條件。當?shù)惴M足該條件時,頂點的狀態(tài)更新能夠繞開同步路障。雖然異步迭代提高了算法的收斂速度,但是以頂點為中心的異步迭代需要鄰居節(jié)點頻繁跨連接邊發(fā)送消息[7-8],當鄰居節(jié)點狀態(tài)的變化對頂點狀態(tài)更新的作用不大時,會降低網(wǎng)絡(luò)的通信效率[9]。

為了解決圖迭代收斂速度較慢以及通信效率較低的問題,本文在Spark 環(huán)境下提出一種基于子圖的異步迭代更新方法,總體研究思路如下:首先,對圖的切分、全局同步和異步迭代更新等概念進行簡要介紹;其次,結(jié)合Spark 環(huán)境下圖數(shù)據(jù)存儲和更新的特點,推導(dǎo)出基于子圖的異步迭代更新條件,分別從異步消息通信和迭代更新機制等方面給出具體的研究方案,在此基礎(chǔ)上給出研究方案在分布式環(huán)境下的具體實現(xiàn);最后,通過PageRank 算法分別從圖的收斂結(jié)果、收斂速度和通信代價等方面驗證了方法有效性,并對實驗結(jié)果進行分析。實驗結(jié)果表明,本文方法不僅能夠提高算法收斂速度,同時還能降低通信開銷。

2 相關(guān)概念

2.1 圖切分

圖數(shù)據(jù)有較強的關(guān)聯(lián)性,因此圖的切分對數(shù)據(jù)的迭代更新有較大影響[10-11]。將圖1中頂點E 切分成4部分后,邊分區(qū)可以存放連接邊的完整屬性和頂點的局部狀態(tài),點分區(qū)可以存放頂點的全局狀態(tài),計算節(jié)點分別對邊分區(qū)中頂點的局部狀態(tài)和點分區(qū)中頂點的全局狀態(tài)進行迭代處理。

圖1 有向圖的點切分方法

在圖迭代過程中,連接邊上的頂點需要頻繁交互信息。由于圖1中邊分區(qū)存放了連接邊兩端的頂點,因此連接邊上的頂點可以直接在本地交互信息。對于連接邊數(shù)量多于頂點數(shù)量的真實網(wǎng)絡(luò)來說,這種切分方式可以顯著減少頂點通過連接邊跨分區(qū)發(fā)送消息的次數(shù)[11]。

2.2 全局同步更新

圖的狀態(tài)迭代更新可以用公式(1)描述:

公式(1)表明目的節(jié)點j 在第k 輪的狀態(tài)值根據(jù)連接邊上的源節(jié)點在第k-1 輪的狀態(tài)值計算得到。

在全局同步機制下,頂點的全局狀態(tài)依賴于所有邊分區(qū)內(nèi)頂點的局部狀態(tài)。當所有邊分區(qū)頂點的局部狀態(tài)全部計算完畢,頂點的全局狀態(tài)更新才能開始。異構(gòu)環(huán)境下如果連接邊分布在多個不同的邊分區(qū),那么本地計算耗時最長的邊分區(qū)將直接影響下一輪迭代開始的時間。

2.3 異步更新

為提高算法收斂速度,Zhang[6]認為對公式(1)作適當變形后,可以得到公式(2):

公式(2)表明目的節(jié)點j 在第k 輪的狀態(tài)可以通過nj個源節(jié)點在第k-1 輪的狀態(tài)計算得到。如果只考慮源節(jié)點在第k 輪與第k-1 輪的狀態(tài)變化值,令,那么公式(2)可變?yōu)楣剑?):

公式(2)和公式(3)表明異步更新能夠減少圖的收斂時間,但是分布式環(huán)境下圖的迭代對異步更新條件、消息通信和更新機制有著不同的要求,需要根據(jù)實際情況加以具體分析。本文首先給出Spark環(huán)境下基于子圖的異步迭代形式,在此基礎(chǔ)上給出消息通信模型和異步處理機制。

3 研究方案

3.1 基于子圖的異步迭代

式中的⊕算子定義為局部算子,將公式(5)帶入公式(4),可以得到公式(6):

式中的⊕算子定義為全局算子,公式(6)表明頂點的全局狀態(tài)改變值可以根據(jù)不同邊分區(qū)的局部改變值進行更新。由于不同分區(qū)的本地計算任務(wù)互不影響,因此全局算子滿足交換律。在函數(shù)fm作用下,如果局部算子也滿足交換律和分配律,那么頂點收到任一局部狀態(tài)變化值,就能立即更新頂點的全局狀態(tài)。

3.2 消息通信

分布式圖處理系統(tǒng)主要采取消息傳遞[12]和共享內(nèi)存的通信方式?;诠蚕韮?nèi)存通信的分布式圖處理系統(tǒng)一般采取分布式鎖保證數(shù)據(jù)的一致性。由于分布式環(huán)境下的計算節(jié)點有獨立的內(nèi)存地址,共享內(nèi)存的通信方式實現(xiàn)起來較為困難。基于消息傳遞的通信方式在計算節(jié)點之間發(fā)送消息,目前主要有基于Netty 的通信協(xié)議[13]、消息傳遞接口協(xié)議[14]和遠程過程調(diào)用協(xié)議。

根據(jù)Spark 環(huán)境下頂點狀態(tài)更新的特點,本文采取遠程過程調(diào)用協(xié)議實現(xiàn)分區(qū)數(shù)據(jù)集之間的異步消息通信,將每個分區(qū)數(shù)據(jù)集看作基于Actor 模型[15]的通信實體。另外,遠程過程調(diào)用不僅能夠?qū)崿F(xiàn)異步消息通信,底層通信協(xié)議還支持數(shù)據(jù)塊傳輸。由于頂點的局部狀態(tài)值分布在不同的邊分區(qū),而頂點唯一的全局狀態(tài)值存放在點分區(qū),邊分區(qū)m 與點分區(qū)n 之間存在交集。當邊分區(qū)在本地獲取本地目的節(jié)點的局部匯聚值之后,可以將本地局部匯聚的結(jié)果以數(shù)據(jù)塊Blockm->n(local(ΔSID))的形式發(fā)送給頂點所在的點分區(qū),數(shù)據(jù)塊中頂點ID 滿足式(7):

與頂點為中心的消息發(fā)送方式相比,以數(shù)據(jù)塊為單位將計算結(jié)果集中發(fā)給點分區(qū),不僅可以批量處理數(shù)據(jù)塊,同時還能提高通信效率。

3.3 更新機制

當點分區(qū)收到數(shù)據(jù)塊之后,為避免頂點狀態(tài)在異步更新過程中產(chǎn)生數(shù)據(jù)讀寫沖突,本文創(chuàng)建數(shù)據(jù)塊緩存隊列以及線程池,在頂點狀態(tài)表上設(shè)置讀寫鎖,保證頂點狀態(tài)在異步更新過程中的一致性。

圖2中,線程池分別執(zhí)行數(shù)據(jù)塊的接收(receive)、更新(update)和發(fā)送(send)任務(wù)。receive線程將收到的數(shù)據(jù)塊放入緩存隊列,update線程從緩存隊列中取出數(shù)據(jù)塊進行處理,將數(shù)據(jù)塊中頂點的局部狀態(tài)變化delta 寫進狀態(tài)表(state table),同時更新頂點的全局狀態(tài)。如果更新后的頂點作為源節(jié)點指向其他目的節(jié)點,則將最新的狀態(tài)變化值發(fā)送給該點作為源節(jié)點的邊分區(qū),以更新其指向的目的節(jié)點。另外,update線程通過設(shè)定發(fā)送閾值,將滿足條件的狀態(tài)變化值放入待發(fā)送數(shù)據(jù)塊,當數(shù)據(jù)塊達到一定規(guī)模后喚醒send 線程,由send 線程將數(shù)據(jù)塊發(fā)給邊分區(qū),以更新邊分區(qū)中源節(jié)點的狀態(tài)。當點分區(qū)內(nèi)頂點全局狀態(tài)變化值小于閾值時,終止迭代過程。

圖2 點分區(qū)和邊分區(qū)之間異步迭代更新機制

4 方法實現(xiàn)

4.1 PageRank定義

本文以PageRank[16]算法驗證基于子圖的異步迭代更新方法,PageRank的迭代計算公式為:

4.2 PageRank的異步迭代形式

Spark 環(huán)境下邊分區(qū)以三元組((srcId,srcAttr),(dstId,dstAttr),attr)的格式存儲本地連接邊的狀態(tài)值,其中srcId、dstId 是連接邊上源節(jié)點的ID 和目的節(jié)點的ID,srcAttr 是源節(jié)點在分區(qū)內(nèi)當前的狀態(tài)值,dstAttr 是目的節(jié)點的狀態(tài)值,attr是連接邊完整的屬性值,取決于源節(jié)點的出度數(shù)。

根據(jù)圖數(shù)據(jù)的特點,在連接邊上定義map 函數(shù)和reduce函數(shù),頂點的局部狀態(tài)可以直接利用數(shù)據(jù)的本地性計算獲取。與傳統(tǒng)的MapReduce 分布式計算模型[12]不同,由于采用彈性分布式數(shù)據(jù)集存放圖數(shù)據(jù),Spark環(huán)境下通過map 函數(shù)和reduce 函數(shù)實現(xiàn)的消息映射和匯聚結(jié)果不需要頻繁寫入外存,每條連接邊上的Map函數(shù)定義如下:

在邊分區(qū)內(nèi)執(zhí)行map 函數(shù)時,源節(jié)點src 以并行方式向本地目的節(jié)點集合dstIds發(fā)送消息,消息度量值是源節(jié)點當前的狀態(tài)值與該條連接邊屬性的乘積。當目的節(jié)點存在多條入度連接邊,目的節(jié)點dst 在本地局部聚合的結(jié)果是該點所有入度連接邊上消息聚合的總和,通過reduce函數(shù)得到:

Spark 環(huán)境下的局部計算可以通過公式(9)和公式(10)實現(xiàn),⊕算子是乘積算子和加法算子,滿足交換律、結(jié)合律。根據(jù)公式(6),目的節(jié)點dst 的全局狀態(tài)值是上一次更新后的全局狀態(tài)值與最新的狀態(tài)變化值直接求和:

式中,⊕是加法算子,滿足交換律和結(jié)合律,因此頂點在第k 輪的全局狀態(tài)與不同邊分區(qū)中頂點的局部狀態(tài)值到達的次序無關(guān),而通過式(9)、(10)計算得到,因此全局狀態(tài)值的更新滿足異步迭代條件。本文接下來結(jié)合公式(9)、(10)和公式(11)給出PageRank算法異步更新的具體實現(xiàn)。

4.3 分布式環(huán)境中的實現(xiàn)

根據(jù)公式(9)、(10),Spark 環(huán)境下邊分區(qū)內(nèi)部的活躍節(jié)點沿其出度連接邊給目的節(jié)點發(fā)消息,而目的節(jié)點對其每條入度連接邊上對收到的消息進行局部聚合,如算法1。

算法1在邊分區(qū)內(nèi)對目的節(jié)點的局部狀態(tài)聚合

輸入:點分區(qū)的活躍源節(jié)點集合activeSet,源節(jié)點狀態(tài)值變化的數(shù)據(jù)塊newSrcAttrblock

輸出:邊分區(qū)內(nèi)部目的節(jié)點的局部狀態(tài)信息local_aggregates,并將localBlockForVertexPartition發(fā)給點分區(qū)vertexPartition

/*遍歷本地的源節(jié)點集合*/

1.for each srcId in localSrcIds

/*如果活躍節(jié)點集合包含源節(jié)點sccId*/

2.if activeSet.contains(srcId)

/*更新邊分區(qū)三元組上活躍源節(jié)點srcId的狀態(tài)值*/

3. newSrcAttr=update(srcId,newSrcAttrblock)

4. newedgeTriplet=updateEdgeTriple(tsrcId,news rcAttr)

/*在源節(jié)點srcId的出度連接邊上給目的節(jié)點發(fā)送消息*/

5. mapFunc(newedgeTriplet=>dstMsg([dstId,msg)])

/*在目的節(jié)點dstId入度連接邊上作本地局部聚合*/

6. for each dstId,msg in dstMsg([dstId,msg)]

7. reduceFunc(local_aggregates(dstId),msg)

/*根據(jù)目的節(jié)點所在的點分區(qū)對局部聚合結(jié)果進行切分*/

8.dstBlockToVertexPartition=split(dstVidsInVertexPartition,local_aggregates)

/*將分區(qū)后的數(shù)據(jù)塊依次發(fā)送至所在的點分區(qū)*/

9.for each vertexRef in vertexPartitionRefs

10.vertexPartitionRef.send(dstBlockToVertexPartition)

算法1中,邊分區(qū)中源節(jié)點是否對其出度連接邊上的目的節(jié)點發(fā)送消息,取決于源節(jié)點是否處于活躍狀態(tài)(active)。因此,邊分區(qū)首先接收來自點分區(qū)的活躍節(jié)點集(activeSet)以及包含源節(jié)點變化值的數(shù)據(jù)塊newSrcAttrblock,并且檢查本地源節(jié)點是否在活躍節(jié)點集中。對于活躍的源節(jié)點,更新其狀態(tài)變化值得到最新的以該點為中心的三元組集合newedgeTriplet。此后,mapFunc 并行作用于更新后的以srcId 為源節(jié)點的三元組上,同時reduceFunc 對目的節(jié)點dstId 入度連接邊上的消息進行聚合,得到目的節(jié)點在本地局部聚合的結(jié)果local_aggregates。由于邊分區(qū)中的目的節(jié)點分布在不同的點分區(qū)中,按照式(7)對本地聚合結(jié)果進行切分,將切分后的結(jié)果dstBlockToVertexPartition 通過每個點分區(qū)地址引用vertexPartitionRef發(fā)送給對應(yīng)的點分區(qū)。

另一方面,點分區(qū)收到不同邊分區(qū)的數(shù)據(jù)塊,按照圖2更新節(jié)點的狀態(tài)信息,如算法2。

算法2點分區(qū)對收到的數(shù)據(jù)塊后進行處理,將更新后的源節(jié)點狀態(tài)變化值以數(shù)據(jù)塊的形式發(fā)送到所在的邊分區(qū)

輸入:點分區(qū)從邊分區(qū)收到的局部匯聚結(jié)果dstBlockTo-VertexPartition

輸出:更新后的源節(jié)點狀態(tài)變化值數(shù)據(jù)塊newSrcAttrblock

/*receive線程將邊分區(qū)發(fā)送的數(shù)據(jù)塊dstBlockToVertex-Partition放入阻塞隊列blockingQueue*/

1.blockingQueue.pu(tdstBlockToVertexPartition)

/*update線程池從阻塞隊列取出數(shù)據(jù)塊curBlock*/

2.curBlock=blockingQueue.take()

/*update 線程池遍歷數(shù)據(jù)塊中的節(jié)點vid 及其變化值delta*/

3.for each vid,delta in curBlock

/*將節(jié)點和狀態(tài)變化值寫入狀態(tài)信息表stateTable*/

4.stateTable.write(vid,delta)

/*將滿足條件的源節(jié)點放入狀態(tài)更新數(shù)據(jù)集srcDelta-ToEdgePartition,活躍節(jié)點集activeSet中*/

5.if(delta>DELTA_THRESHHOLD)

6.srcDeltaToEdgePartition.append(vid,delta)

7.activeSet.add(vid)

/*根據(jù)源節(jié)點所在的邊分區(qū)srcVidToEdgePartition 對狀態(tài)更新數(shù)據(jù)集srcDeltaToEdgePartition進行切分*/

8.newSrcAttrblock=split(srcVidToEdgePartition,srcDelta-ToEdgePartition)

/*send線程將切分后的數(shù)據(jù)塊newSrcAttrblock依次發(fā)送至所在的邊分區(qū)*/

9.for each edgePartitionRef in edgePartitionRefs

10.edgePartitionRef.send(newSrcAttrblock)

算法2中,receive線程首先將來自邊分區(qū)的數(shù)據(jù)塊放入緩存隊列,update線程從緩存隊列取出數(shù)據(jù)塊處理目的節(jié)點的變化值。為了避免對狀態(tài)表中同一頂點同時寫入狀態(tài)變化值,update線程在寫入數(shù)據(jù)之前需要首先獲取狀態(tài)表的寫鎖,在寫入數(shù)據(jù)之后檢查該頂點的狀態(tài)變化值是否超過頂點狀態(tài)變化的閾值,并將滿足條件的頂點放入待發(fā)送數(shù)據(jù)塊以及活躍頂點集activeSet中。另外,由于點分區(qū)中更新后的源節(jié)點分布在不同的邊分區(qū)中,需要對點分區(qū)內(nèi)更新后的結(jié)果進行切分,并將切分后的結(jié)果newSrcAttrblock 通過每個邊分區(qū)地址引用edgePartitionRef 依次發(fā)送給對應(yīng)的邊分區(qū)。當邊分區(qū)收到點分區(qū)的數(shù)據(jù)塊之后再次執(zhí)行算法1,并繼續(xù)執(zhí)行新一輪的局部聚合任務(wù),當邊分區(qū)內(nèi)所有源節(jié)點為非活躍狀態(tài)時,終止算法1和算法2。

5 實驗和結(jié)果分析

5.1 實驗準備

本文選取真實網(wǎng)絡(luò)樣本數(shù)據(jù)集wiki-topcats[17],該數(shù)據(jù)集共包含1 791 489個頂點,28 511 807條連接邊。為實現(xiàn)負載均衡,以哈希方式對圖數(shù)據(jù)作點切分,頂點和連接邊的狀態(tài)值分別存放在4 個點分區(qū)和4 個邊分區(qū)。通過兩組實驗驗證方法有效性:第一組實驗統(tǒng)計PageRank 在全局同步和異步更新的收斂結(jié)果;第二組實驗給出不同迭代方式下的收斂時間和通信開銷。

5.2 初始化和結(jié)束條件

在執(zhí)行迭代算法之前,首先對圖中頂點狀態(tài)進行初始化。根據(jù)公式(11),在迭代過程中需要保證第k+1輪狀態(tài)值是第k 輪狀態(tài)值與下一輪狀態(tài)變化值求和的結(jié)果,因此設(shè)定點分區(qū)內(nèi)頂點的初始值為0,邊分區(qū)內(nèi)頂點初始值1-d,邊分區(qū)內(nèi)所有頂點的狀態(tài)為激活狀態(tài)。另外,PageRank 算法中頂點狀態(tài)值在迭代過程中呈單調(diào)增長趨勢,因此采用頂點全局狀態(tài)值的總和作為收斂程度的度量值,頂點的狀態(tài)初始值以及整個圖中活躍節(jié)點的個數(shù)將影響整個圖的最終收斂結(jié)果。當d 值越小,頂點的初始狀態(tài)值越大,并且圖中活躍頂點個數(shù)越多時,圖中頂點狀態(tài)值的收斂總和越大。根據(jù)PageRank 算法的迭代公式,通常情況下設(shè)定d 值為0.8,使得孤立頁面隨機跳轉(zhuǎn)到其他頁面的概率為0.2。在異步迭代方式下,不同點分區(qū)內(nèi)頂點的全局狀態(tài)相互獨立,只要點分區(qū)內(nèi)頂點全局狀態(tài)總和的增長區(qū)間小于設(shè)定的閾值,即認為該分區(qū)的頂點達到全局收斂。當設(shè)定閾值越大,圖越容易達到收斂狀態(tài),當所有分區(qū)的頂點全部收斂,結(jié)束整個迭代過程。

5.3 收斂結(jié)果

首先,按照全局同步方式對圖數(shù)據(jù)迭代。Spark 環(huán)境下的全局同步通過reduce 算子觸發(fā)邊分區(qū)內(nèi)部的局部消息聚合任務(wù),再將聚合后的結(jié)果發(fā)送給點分區(qū)作全局同步,圖中所有頂點的狀態(tài)總和與迭代次數(shù)之間的關(guān)系如圖3所示。

圖3 全局同步迭代下頂點狀態(tài)總和

在圖3中,頂點狀態(tài)值的總和隨著迭代次數(shù)增長不斷增大。迭代前10 輪頂點狀態(tài)總和的增長速度較快,隨后增長速度減緩,迭代到22輪時,頂點狀態(tài)值的總和接近收斂狀態(tài)。

其次,異步迭代不受全局同步的限制,因此異步更新沒有迭代次數(shù)的概念,不能通過迭代次數(shù)判斷圖的收斂狀態(tài)??紤]到頂點的全局狀態(tài)更新取決于邊分區(qū)數(shù)據(jù)塊到達的時間,并且不同分區(qū)內(nèi)頂點的全局狀態(tài)之間相互獨立,因此可以統(tǒng)計不同點分區(qū)內(nèi)頂點的全局狀態(tài)總和判斷圖數(shù)據(jù)的收斂狀態(tài)。圖4 給出不同點分區(qū)內(nèi)頂點狀態(tài)總和隨著數(shù)據(jù)塊處理的變化關(guān)系。

圖4 異步迭代下各個點分區(qū)的頂點狀態(tài)總和

在異步迭代初始階段,邊分區(qū)內(nèi)所有源節(jié)點的初始狀態(tài)都為激活狀態(tài),因此邊分區(qū)內(nèi)所有源節(jié)點都能向目的節(jié)點發(fā)送消息,使得初始階段各個點分區(qū)內(nèi)頂點的全局狀態(tài)總和增長較快。另外,異步更新并不要求所有邊分區(qū)的數(shù)據(jù)塊同時到達,只要點分區(qū)收到數(shù)據(jù)塊就能立即更新部分頂點的狀態(tài),因此各個點分區(qū)的頂點狀態(tài)總和在收斂過程中出現(xiàn)不同幅度的震蕩。當4 個點分區(qū)處理完65~70個數(shù)據(jù)塊后,頂點狀態(tài)總和與全局同步迭代到22輪的狀態(tài)值接近,認為異步迭代接近收斂。

5.4 收斂速度

為比較全局同步和異步迭代的收斂速度,以圖3和圖4 的收斂值統(tǒng)計全局同步和異步迭代的收斂時間。圖5給出全局同步下reduce和collect算子在每輪迭代過程中的平均運行時間。

圖5 全局同步每輪迭代的運行時間

從圖5 可以看到,全局同步迭代輪數(shù)較少,但是由于存在路障限制,全局同步迭代需要等到所有邊分區(qū)的結(jié)果全部到達才能對頂點的狀態(tài)進行更新。每輪迭代過程中reduce 算子平均運行時間為12~18 s,collect算子平均運行時間在8~10 s。在已知圖數(shù)據(jù)收斂狀態(tài)情況下,僅考慮每輪迭代過程中reduce 算子運行的時間,并且只在最后收斂階段使用collect算子將頂點狀態(tài)匯總至driver 計算收斂結(jié)果,全局同步的平均收斂時間為335.7 s。

由于初始階段點分區(qū)需要等待邊分區(qū)局部聚合的結(jié)果,導(dǎo)致點分區(qū)發(fā)送數(shù)據(jù)塊的時間變長,因此圖6 中前后兩個階段消耗的時間比其他階段長。在對圖數(shù)據(jù)處理多次后,4個點分區(qū)的平均收斂時間分別為112.4 s、114.5 s、117.2 s、119.7 s,相同數(shù)據(jù)集下以頂點為中心異步更新的收斂時間為102.7 s。

圖6 給出4 個點分區(qū)處理數(shù)據(jù)塊的時間,起始點設(shè)為點分區(qū)收到數(shù)據(jù)塊的時間,結(jié)束點為點分區(qū)將處理后的頂點狀態(tài)變化值以數(shù)據(jù)塊的形式發(fā)送給邊分區(qū)的時間,圖中每個點分區(qū)對數(shù)據(jù)塊的接收、更新到發(fā)送時間集中在0.5~2.5 s。

圖6 異步更新下點分區(qū)對數(shù)據(jù)塊的處理時間

5.5 通信代價

全局同步的通信開銷主要由每輪迭代過程中的reduce算子產(chǎn)生,當邊分區(qū)在本地的局部聚合全部結(jié)束之后,需要對所有邊分區(qū)中具有相同索引頂點的局部狀態(tài)作全局聚合,并將結(jié)果發(fā)送給點分區(qū)。統(tǒng)計發(fā)現(xiàn)全局同步迭代方式下每輪迭代的通信開銷在125~132 MB。對圖數(shù)據(jù)進行多次全局同步迭代后,通信量均值為2 850 MB。

相比全局同步要求4 個邊分區(qū)將數(shù)據(jù)塊匯總后同時發(fā)送到點分區(qū),以子圖為中心的異步迭代不需要等待其他分區(qū)的局部聚合結(jié)果,能夠直接將邊分區(qū)聚合后的數(shù)據(jù)塊發(fā)送給點分區(qū)。分別對邊分區(qū)和點分區(qū)發(fā)送的消息量進行統(tǒng)計,結(jié)果表明邊分區(qū)給點分區(qū)發(fā)送的數(shù)據(jù)塊大小在3~4 MB,點分區(qū)給邊分區(qū)發(fā)送的數(shù)據(jù)塊大小在2~3 MB。當各個點分區(qū)接近收斂狀態(tài),產(chǎn)生的網(wǎng)絡(luò)通信量共1 950 MB?;陧旤c為中心的異步迭代通過頂點更新次數(shù)統(tǒng)計網(wǎng)絡(luò)通信開銷,統(tǒng)計發(fā)現(xiàn)以頂點為中心的異步迭代方式下每個頂點平均更新9 次達到收斂狀態(tài)。對圖數(shù)據(jù)進行多次異步迭代后,網(wǎng)絡(luò)通信量均值為2 520 MB。

根據(jù)以上分析,圖7給出了不同迭代方式下的圖迭代的收斂時間和通信開銷。從圖7可以看到,與全局同步迭代方式相比,以子圖為中心的異步迭代不僅能有效降低收斂速度同時能提高通信效率。與頂點為中心的異步迭代方式相比,基于子圖為中心的異步更新方式在收斂時間上雖略有增長,但是能夠顯著降低通信開銷。

圖7 不同迭代方式下的收斂時間和通信開銷

5.6 原因分析

圖3 表明全局同步迭代輪數(shù)較少,但是由于存在路障限制,全局同步迭代需要等到所有邊分區(qū)的結(jié)果全部到達才能對頂點的狀態(tài)進行更新。圖4 中異步更新處理的數(shù)據(jù)塊個數(shù)雖然較多,但只要迭代算法滿足異步更新條件,點分區(qū)收到任一邊分區(qū)的局部聚合結(jié)果,就能夠立即從緩存隊列中取出數(shù)據(jù)塊進行處理,因此基于子圖的異步更新方式能極大縮短圖的收斂時間。由于整個迭代過程時間較短,這也使得異步迭代產(chǎn)生的通信量遠少于全局同步產(chǎn)生的通信量。

另外,以頂點為中心的異步迭代以頂點為單位更新頂點狀態(tài),以子圖為中心的異步迭代以數(shù)據(jù)塊為單位更新頂點狀態(tài),因此以頂點為單位進行異步更新能夠更快地加速圖狀態(tài)收斂。相比較于以頂點的異步迭代在收斂時間上略有增長,基于子圖的異步迭代可以通過以下方式極大提高通信效率:

(1)大部分網(wǎng)絡(luò)拓撲服從冪律分布,網(wǎng)絡(luò)中連接邊的數(shù)量遠遠超過頂點的個數(shù),因此以頂點為對象發(fā)送消息的次數(shù)遠少于跨連接邊發(fā)送消息的次數(shù)[18]。

(2)基于子圖的劃分方式將大量頂點連接邊存放在同一分區(qū),少部分頂點的連接邊分布在不同的邊分區(qū),這種存儲方式不僅減小了消息發(fā)送的次數(shù),基于子圖為中心的異步迭代通過在邊分區(qū)內(nèi)通過聚合機制獲取頂點的局部狀態(tài)后,以批量方式集中將分區(qū)的局部聚合結(jié)果發(fā)送給點分區(qū)。

(3)單個頂點在同一邊分區(qū)中存在多條連接邊,更新后的頂點狀態(tài)發(fā)往同一邊分區(qū)后在很大程度上能夠?qū)旤c狀態(tài)信息重用,因此進一步減少了同一頂點跨越計算節(jié)點發(fā)送消息的次數(shù)。

6 結(jié)束語

Spark 環(huán)境下的Graphx 圖處理系統(tǒng)要求子圖之間的計算任務(wù)保持全局同步,因此限制了圖迭代的收斂速度[19]。根據(jù)Spark 環(huán)境下圖切分和數(shù)據(jù)存儲的特點,本文提出了一種基于子圖的異步迭代更新方法。實驗結(jié)果表明,該方法能夠有效提高圖迭代的收斂速度,同時降低網(wǎng)絡(luò)通信開銷。未來,將對方法的擴展性[20]作進一步研究。

猜你喜歡
子圖頂點全局
Cahn-Hilliard-Brinkman系統(tǒng)的全局吸引子
量子Navier-Stokes方程弱解的全局存在性
過非等腰銳角三角形頂點和垂心的圓的性質(zhì)及應(yīng)用(下)
過非等腰銳角三角形頂點和垂心的圓的性質(zhì)及應(yīng)用(上)
臨界完全圖Ramsey數(shù)
不含3K1和K1+C4為導(dǎo)出子圖的圖色數(shù)上界?
關(guān)于l-路和圖的超歐拉性
落子山東,意在全局
新思路:牽一發(fā)動全局
圖G(p,q)的生成子圖的構(gòu)造與計數(shù)
措勤县| 偏关县| 莱芜市| 张家界市| 太湖县| 葵青区| 当雄县| 白城市| 东方市| 桃园县| 额济纳旗| 文成县| 溧阳市| 陈巴尔虎旗| 巫山县| 谢通门县| 黄陵县| 若尔盖县| 防城港市| 新丰县| 开化县| 武乡县| 惠水县| 尤溪县| 拉孜县| 长兴县| 禄丰县| 鸡西市| 托里县| 上思县| 加查县| 石家庄市| 江北区| 阜城县| 义马市| 巴南区| 阿坝县| 防城港市| 曲阜市| 闻喜县| 四子王旗|