柴昱含 李道全
摘要:針對(duì)詐騙短信近年來出現(xiàn)的主叫號(hào)碼多變、被叫號(hào)碼具有隨機(jī)性、短信內(nèi)容難以識(shí)別等新特性,在綜合分析數(shù)據(jù)時(shí),需要實(shí)時(shí)的處理海量的數(shù)據(jù),而現(xiàn)有數(shù)據(jù)并發(fā)量大、實(shí)時(shí)性要求高等特點(diǎn),所以采用什么樣的技術(shù)去處理這樣的數(shù)據(jù)十分關(guān)鍵。針對(duì)以上問題,對(duì)實(shí)時(shí)處理技術(shù)進(jìn)行調(diào)研。由于Storm在海量數(shù)據(jù)實(shí)時(shí)處理方面具有很好的特性,對(duì)Storm進(jìn)行相關(guān)調(diào)研。
關(guān)鍵詞:Storm;詐騙短信;大數(shù)據(jù);Topology;實(shí)時(shí)流
中圖分類號(hào):TP311 文獻(xiàn)標(biāo)識(shí)碼:A 文章編號(hào):1009-3044(2014)16-3768-06
Abstract: In recent years, in the area of SMS scam, the calling numbers are highly changeable, the called numbers are of great randomicity, and the contents are unidentifiable. With these new features, an enormously great amount of data should be processed simultaneously when analyzing relevant data. Due to the great amount and the simultaneous nature, it is critical that what technique should be adopted to process these data. For the above issues, simultaneously processing technique is studied. As Storm boasts of brilliant capabilities in simultaneous mass data processing, in this paper Storm is investigated.
Key words: Storm ;SMS scam;mass dada;Topology;real-time streaming
實(shí)時(shí)數(shù)據(jù)流應(yīng)用的一個(gè)普通模式是對(duì)輸入數(shù)據(jù)進(jìn)行滾動(dòng)計(jì)數(shù),也被稱為滑動(dòng)窗口分析.對(duì)于滾動(dòng)計(jì)數(shù)的一個(gè)典型應(yīng)用是在一個(gè)用戶社區(qū)內(nèi)分析熱門話題-例如在Twitter-當(dāng)一個(gè)話題已經(jīng)在一個(gè)給定的時(shí)間窗口內(nèi)排名前N位時(shí),則其已經(jīng)是一個(gè)熱門話題.本篇文章將介紹如何基于分布式和可擴(kuò)展的實(shí)時(shí)流數(shù)據(jù)處理平臺(tái)Storm實(shí)現(xiàn)這個(gè)算法.相同的代碼也能使用在其它領(lǐng)域例如安全監(jiān)控。
1 Storm簡介
1.1 Storm基本概念
Storm是一個(gè)分布式的實(shí)時(shí)處理系統(tǒng),由主節(jié)點(diǎn)和從節(jié)點(diǎn)構(gòu)成。其中,主節(jié)點(diǎn)只有一個(gè),并運(yùn)行名為“Nimbus”的守護(hù)進(jìn)程;從節(jié)點(diǎn)有多個(gè),每個(gè)工作節(jié)點(diǎn)都運(yùn)行一個(gè)名為“Supervisor”的守護(hù)進(jìn)程?!癗imbus”進(jìn)程用于分配代碼、布置任務(wù)及故障檢測,“Supervisor”進(jìn)程用于監(jiān)聽工作,開始并終止工作進(jìn)程。Nimbus和Supervisor都能快速失敗,而且是無狀態(tài)的,這樣一來它們就變得十分健壯。主節(jié)點(diǎn)和從節(jié)點(diǎn)通過ZooKeeper來進(jìn)行交互,主節(jié)點(diǎn)通過ZooKeeper來發(fā)布指令,從節(jié)點(diǎn)從ZooKeeper讀取指令并執(zhí)行。ZooKeeper用于管理集群中的不同組件,ZeroMQ是內(nèi)部消息系統(tǒng)。
Storm的基本元素包括Topology、Stream、Spout、Bolt等。
Topology[2]:一個(gè)計(jì)算任務(wù)被稱為一個(gè)Topology,由多個(gè)Spout和Bolt組成。Topology任務(wù)一旦提交將會(huì)一直運(yùn)行,除非主動(dòng)停止任務(wù),如圖2。
Stream:即數(shù)據(jù)流,是Storm中對(duì)數(shù)據(jù)的抽象,它是時(shí)間上無界的tuple元組序列。在Topology中,Spout是Stream的發(fā)射器,從特定數(shù)據(jù)源獲取數(shù)據(jù)發(fā)射Stream;一個(gè)Bolt可以接收多個(gè)Stream作為輸入,然后對(duì)數(shù)據(jù)進(jìn)行加工處理, Bolt還可以發(fā)射新的Stream給下級(jí)Bolt進(jìn)行處理。
Spout:消息源,可從外部獲取數(shù)據(jù)并將獲取的數(shù)據(jù)作為消息源提交給Topology。Spout包括可靠消息源和不可靠消息源兩類,可靠消息源將會(huì)對(duì)沒有被成功處理的tuple進(jìn)行重發(fā),不可靠消息源不會(huì)重發(fā)。
Bolt:消息處理單元,可以執(zhí)行過濾, 聚合, 查詢等操作。
Topology[3]定義代碼示例:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(“spout”, new KafkaSpout(),2);
builder.setBolt(“smsBolt”, new SMSParerBolt (), 8).shuffleGrouping(“spout”);
builder.setBolt(“filterBolt”, new FilterCallingNumberBolt (), 2)
. shuffleGrouping (“smsBolt”). setNumTasks(4);
通過TopologyBuilder的setSpout()方法設(shè)置Spout,示例中”spout”為Spout的別名,KafkaSpout為Spout的一個(gè)實(shí)現(xiàn)類,后面的數(shù)字2為并發(fā)線程數(shù)。通過setBolt()方法設(shè)置Bolt,實(shí)例中分別設(shè)置了別名為”smsBolt”、線程并發(fā)度為8的SMSParerBolt和別名為”filterBolt”、線程并發(fā)度為2的FilterCallingNumberBolt,setBolt后的shuffleGrouping為流分組策略,setNumTasks為該Bolt對(duì)應(yīng)Task數(shù)。關(guān)于并發(fā)度、流分組策略、Task數(shù)的概念將在之后小節(jié)介紹。endprint
1.2 Storm流分組策略
流分組策略(StreamGrouping)[4],用于設(shè)置Bolt的Task間數(shù)據(jù)的分配策略,包括以下幾類:
Shuffle Grouping:隨機(jī)分組,隨機(jī)派發(fā)stream里面的tuple,保證每個(gè)bolt接收到的tuple數(shù)目大致相同。
Fields Grouping:按字段分組,比如按userid來分組, 具有同樣userid的tuple會(huì)被分到相同的Bolts里的一個(gè)task, 而不同的userid則會(huì)被分配到不同的bolts里的task。
All Grouping:廣播發(fā)送,對(duì)于每一個(gè)tuple,所有的bolts都會(huì)收到。
Global Grouping:全局分組,這個(gè)tuple被分配到storm中的一個(gè)bolt的其中一個(gè)task。再具體一點(diǎn)就是分配給id值最低的那個(gè)task。
Non Grouping:不分組,這個(gè)分組的意思是說stream不關(guān)心到底誰會(huì)收到它的tuple。目前這種分組和Shuffle grouping是一樣的效果, 有一點(diǎn)不同的是storm會(huì)把這個(gè)bolt放到這個(gè)bolt的訂閱者同一個(gè)線程里面去執(zhí)行。
Direct Grouping:直接分組,這是一種比較特別的分組方法,用這種分組意味著消息的發(fā)送者指定由消息接收者的哪個(gè)task處理這個(gè)消息。 只有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發(fā)射。消息處理者可以通過TopologyContext來獲取處理它的消息的task的id (OutputCollector.emit方法也會(huì)返回task的id)。
Local or shuffle grouping:如果目標(biāo)bolt有一個(gè)或者多個(gè)task在同一個(gè)工作進(jìn)程中,tuple將會(huì)被隨機(jī)發(fā)生給這些tasks。否則,和普通的Shuffle Grouping行為一致。
2 基于Storm的滑動(dòng)窗口實(shí)現(xiàn)
2.1 熱門話題和滑動(dòng)窗口
首先,解釋一下什么是熱門話題,以便于我們有一個(gè)共識(shí)。
2.1.1 熱門話題
一個(gè)單詞、短語或話題相比其它的標(biāo)簽有更多地被標(biāo)記的概率則被稱為熱門話題。成為熱門話題一方面來自于用戶的一致關(guān)注,另一方面因?yàn)槟承┨厥馐录鹑藗兊年P(guān)注。這些話題有助于用戶了解當(dāng)前世界正在發(fā)生什么。
換一句話,它說明了在一個(gè)用戶社區(qū)內(nèi)“熱點(diǎn)是什么”。通常,你會(huì)對(duì)一個(gè)給定時(shí)間范圍內(nèi)的熱門話題感興趣,例如,在過去五分鐘內(nèi)或一天內(nèi)最流行的話題。所以,對(duì)于“熱點(diǎn)是什么”更準(zhǔn)確的描述應(yīng)該是“今天的熱點(diǎn)是什么”或“這周的熱點(diǎn)是什么”。
在這篇文章中,我們假設(shè)我們有一個(gè)系統(tǒng),這個(gè)系統(tǒng)使用TwitterAPI去獲取最新的數(shù)據(jù)。更進(jìn)一步假設(shè),我們有一種機(jī)制可以從Twitter的消息中以單詞的形式標(biāo)識(shí)主題。例如,我們可以選擇用一個(gè)簡單的模式匹配算法處理主題標(biāo)簽#。
我們?cè)O(shè)計(jì)我們的系統(tǒng),如果一個(gè)主題A比主題B更多次數(shù)被提到,我們則認(rèn)為主題A比主題B更流行。這意味著我們只需要去統(tǒng)計(jì)Twitter中主題被引用的次數(shù)。
對(duì)于本文我們不關(guān)心這些主題是怎樣從用戶內(nèi)容或用戶活動(dòng)中衍生出來的,只要知道這些衍生出來的主題用文本的方式表示就行了。然后,Storm的拓?fù)溥壿嫊?huì)通過滾動(dòng)計(jì)算和排名計(jì)算分析出輸入數(shù)據(jù)的實(shí)時(shí)熱門話題。前期關(guān)注去對(duì)一定時(shí)間范圍內(nèi)用戶輸入數(shù)據(jù)的過濾,后期關(guān)注與對(duì)熱門話題的排名。
我們期望Storm拓?fù)溥壿嫸ㄆ诘漠a(chǎn)生TopN熱門話題,就像下面的輸出,t0-t2是不同的時(shí)間點(diǎn):
Rank @ t0 ——> t1 ——> t2 —————————————————————— 1. java (33) ruby (41) scala (32) 2. php (30) scala (28) python (29) 3. scala (21) java (27) ruby (24) 4. ruby (16) python (21) java (21) 5. python (15) php (14) erlang (18)
在這個(gè)例子中我們可以看出“scala”已經(jīng)成為最熱門話題。
2.1.2 滑動(dòng)窗口
在之前的背景介紹中,我想要說明的是滑動(dòng)窗口即滾動(dòng)計(jì)算。一張圖片勝過千言萬語:
在上述例子中,對(duì)滑動(dòng)窗口內(nèi)的數(shù)據(jù)求和。
一個(gè)公式可能是很好的解釋:
從大小到時(shí)間:如果我們假設(shè)窗口每5分鐘前進(jìn)一次,則輸入數(shù)據(jù)中每個(gè)塊內(nèi)的數(shù)字代表過去相同時(shí)間間隔內(nèi)收集的數(shù)據(jù)量。在本例中,窗口大小是N*m分鐘。簡單的說,如果N=1和m=5,則我們的滑動(dòng)窗口算法沒一分鐘提交過去五分鐘的數(shù)據(jù)。
現(xiàn)在我們已經(jīng)介紹過了熱門話題和時(shí)間窗口,我們最后來談?wù)勅绾瓮ㄟ^代碼實(shí)現(xiàn)。
2.2 滑動(dòng)窗口的實(shí)現(xiàn)
2.2.1 實(shí)現(xiàn)數(shù)據(jù)結(jié)構(gòu)
接下來我們介紹一下核心數(shù)據(jù)結(jié)構(gòu)。正如你所看到的,一個(gè)有趣的特性是這些數(shù)據(jù)結(jié)構(gòu)與Storm的內(nèi)部特性是完全獨(dú)立的。我們的Storm bolts將要使用它們,當(dāng)然,數(shù)據(jù)結(jié)構(gòu)對(duì)于Storm沒有依賴。
計(jì)數(shù)所使用的類:SlotBasedCounter, SlidingWindowCounter
排名所使用的類:Rankings, Rankable, RankableObjectWithFields
另一個(gè)顯著的改善是代碼刪除了不必的代碼并且使用線程相關(guān)代碼,例如同步或手動(dòng)啟動(dòng)后臺(tái)線程。并且數(shù)據(jù)結(jié)構(gòu)也不與系統(tǒng)時(shí)間交互。消除直接調(diào)用系統(tǒng)時(shí)間并且手動(dòng)開啟后臺(tái)線程是得新代碼比老代碼更簡單和容易測試。endprint
// such code from the old RollingCountObjects bolt is not needed anymore
long delta = millisPerBucket(_numBuckets) - (System.currentTimeMillis() % millisPerBucket(_numBuckets));
Utils.sleep(delta);
SlotBasedCounter
SlotBasedCounter類提供了對(duì)于對(duì)象計(jì)數(shù)的功能。用于計(jì)數(shù)的Slots的數(shù)量是固定的。當(dāng)前類提供了四個(gè)公共方法:
public void incrementCount(T obj, int slot);
public void wipeSlot(int slot):
public long getCount(T obj, int slot)
public Map
例子:
SlotBasedCounter counter = new SlotBasedCounter
Object trackMe = ...;
int currentSlot = 0;
counter.incrementCount(trackMe, currentSlot);
long counts = counter.getCount(trackMe, currentSlot);
Map
SlotBasedCounter內(nèi)部使用Map
在上面的例子中SlotBasedCounter有5個(gè)slot用于計(jì)數(shù)。
SlotBasedCounter是我們可以使用的一個(gè)比較原始的類,它是滑動(dòng)計(jì)數(shù)窗口的一個(gè)組成部分,接下來我們繼續(xù)進(jìn)行介紹。
2.2.2 SlidingWindowCounter
SlidingWindowCounter類提供了滾動(dòng)計(jì)數(shù)的功能。它的計(jì)數(shù)功能基于SlotBasedCounter類。滑動(dòng)窗口的大小與SlidingWindowCounter實(shí)例的slot的數(shù)量是相等的。RollingCountBolt使用它對(duì)輸入的tuple進(jìn)行計(jì)數(shù)。
這個(gè)類提供了2個(gè)方法:
public void incrementCount(T obj);
Map
讀者可能驚奇的發(fā)現(xiàn)我們的滑動(dòng)窗口與時(shí)間沒有什么關(guān)聯(lián),因?yàn)橥ǔ;瑒?dòng)窗口是基于時(shí)間的。在我們的例子中,窗口不隨著時(shí)間前進(jìn),除了調(diào)用getCountsThenAdvanceWindow方法。這意味著SlidingWindowCounter的行為就像一個(gè)環(huán)形緩沖區(qū),從一個(gè)窗口前進(jìn)到下一個(gè)窗口。
請(qǐng)注意,例子中是一個(gè)8個(gè)slot的滑動(dòng)窗口,其中每個(gè)slot只顯示了一個(gè)計(jì)數(shù)器。實(shí)際上有多個(gè)計(jì)數(shù)器對(duì)對(duì)象進(jìn)行跟蹤。
下面是一個(gè)圖解,展示了SlidingWindowCounter多次迭代的效果:
2.2.3 Rankings and Rankable
Rankings類用于一定數(shù)量的排名,例如前十名。它依據(jù)對(duì)象的自然順序從大到小進(jìn)行排序。這個(gè)類由AbstractRankerBolt使用,它的bolt用于跟蹤過去一段時(shí)間內(nèi)對(duì)象的當(dāng)前排名情況。
這個(gè)類有5個(gè)方法:
public void updateWith(Rankable r);
public void updateWith(Rankings other);
public List
public int maxSize();
public int size();
無論什么時(shí)候你更新排名的時(shí)候,它都會(huì)丟棄低于topN的數(shù)據(jù),N是排名的最大數(shù)量。
正常情況下我們的排序是依據(jù)對(duì)象的自然次序。在我們的特殊例子中,我創(chuàng)建了一個(gè)Rankable接口,這個(gè)接口實(shí)現(xiàn)了Comparable。實(shí)際情況中,你可以傳遞一個(gè)Rankable對(duì)象到Rankings類中,這樣排序條件將會(huì)更新。
Rankings topTen = new Rankings(10);
Rankable C = ...;
topTen.updateWith(r);
List
實(shí)現(xiàn)了Rankable的具體類是RankableObjectWithFields。IntermediateRankingsBolt類通過工廠方法將輸入數(shù)據(jù)創(chuàng)建為Rankable對(duì)象。
@Override
void updateRankingsWithTuple(Tuple tuple) {
客服熱線:400-656-5456??客服專線:010-56265043??電子郵箱:longyuankf@126.com
電信與信息服務(wù)業(yè)務(wù)經(jīng)營許可證:京icp證060024號(hào)
Dragonsource.com Inc. All Rights Reserved
Rankable rankable = RankableObjectWithFields.from(tuple); super.getRankings().updateWith(rankable);
}
仔細(xì)看一下Rankings, Rankable 和RankableObjectWithFields 。如果你自己不得不去實(shí)現(xiàn)這些類并且你是一個(gè)有經(jīng)驗(yàn)的工程師,那么你一定會(huì)實(shí)現(xiàn)equals() 和 hashCode()方法。
2.2.4 實(shí)現(xiàn)Rolling Top Words 拓?fù)溥壿?/p>
實(shí)現(xiàn)Rolling Top Words 拓?fù)溥壿?/p>
在閱讀這一部分的時(shí)候,“words”代表了我們所設(shè)想的系統(tǒng)中用戶提到的主題。
Rolling Top Words拓?fù)溥壿嬘蒚estWordSpout, RollingCountBolt, IntermediateRankingsBolt 和TotalRankingsBolt組成
滑動(dòng)窗口的大小和提交頻率只是個(gè)例子,在我們例子中有一個(gè)5分鐘的滑動(dòng)窗口并且每分鐘提交一次。
主要工作如下:
1)拓?fù)溥壿嫷牡谝粚覶estWordSpout模擬輸入數(shù)據(jù)-用戶提到的主題。
2)第二層RollingCountBolt對(duì)輸入數(shù)據(jù)進(jìn)行計(jì)數(shù)
3)第三層IntermediateRankingsBolt對(duì)數(shù)據(jù)進(jìn)行排名
4)最后,TotalRankingsBolt匯聚數(shù)據(jù),輸出總排名。
拓?fù)溥壿嫶a如下:
builder.setSpout(spoutId, new TestWordSpout(), 2);
builder.setBolt(counterId, new RollingCountBolt(9, 3), 3) .fieldsGrouping(spoutId, new Fields("word"));
builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 2) .fieldsGrouping(counterId, new Fields("obj"));
builder.setBolt(totalRankerId,new TotalRankingsBolt(TOP_N)) .globalGrouping(intermediateRankerId);
參考文獻(xiàn):
[1] 張春麟.手機(jī)垃圾短信過濾平臺(tái)的分析與應(yīng)用[D].北京:北京郵電大學(xué),2010.
[2] 互聯(lián)網(wǎng)文檔資源.storm-0.8.2源碼分析之topology啟動(dòng)[DB/OL].http://blog.csdn.net/chlaws/article/details/10562035,2013.
[3] 互聯(lián)網(wǎng)文檔資源.Storm-源碼分析-Component,Executor,Task之間關(guān)系[DB/OL].http://www.cnblogs.com/fxjwind/,2013.
[4] 互聯(lián)網(wǎng)文檔資源.Tutorial[DB/OL].https://github.com/nathanmarz/storm/wiki/Tutorial,2012.
[5] 互聯(lián)網(wǎng)文檔資源.Tutorial[DB/OL].http://storm.incubator.apache.org/documentation/Tutorial.html,2012.
[6] 互聯(lián)網(wǎng)文檔資源.使用Storm實(shí)現(xiàn)實(shí)時(shí)大數(shù)據(jù)分析[DB/OL].http://www.csdn.net/article/2012-12-24/2813117-storm-realtime-big-data-analysis,2012.
[7] 互聯(lián)網(wǎng)文檔資源.Storm快速理解[DB/OL].http://blog.csdn.net/colorant/article/details/8256039,2012.
[8] 互聯(lián)網(wǎng)文檔資源.徐明明.Twitter Storm: Transactional Topolgoy簡介[DB/OL].http://xumingming.sinaapp.com/736/twitter-storm-transactional-topolgoy/,2012.
[9] 互聯(lián)網(wǎng)文檔資源.Storm之trident聚合操作介紹[DB/OL].http://blog.sina.com.cn/s/blog_6ff05a2c0101k6xj.html,2013.
[10] 互聯(lián)網(wǎng)文檔資源.[翻譯][Trident] Storm Trident教程[DB/OL].http://blog.csdn.net/derekjiang/article/details/9126185,2013.endprint
Rankable rankable = RankableObjectWithFields.from(tuple); super.getRankings().updateWith(rankable);
}
仔細(xì)看一下Rankings, Rankable 和RankableObjectWithFields 。如果你自己不得不去實(shí)現(xiàn)這些類并且你是一個(gè)有經(jīng)驗(yàn)的工程師,那么你一定會(huì)實(shí)現(xiàn)equals() 和 hashCode()方法。
2.2.4 實(shí)現(xiàn)Rolling Top Words 拓?fù)溥壿?/p>
實(shí)現(xiàn)Rolling Top Words 拓?fù)溥壿?/p>
在閱讀這一部分的時(shí)候,“words”代表了我們所設(shè)想的系統(tǒng)中用戶提到的主題。
Rolling Top Words拓?fù)溥壿嬘蒚estWordSpout, RollingCountBolt, IntermediateRankingsBolt 和TotalRankingsBolt組成
滑動(dòng)窗口的大小和提交頻率只是個(gè)例子,在我們例子中有一個(gè)5分鐘的滑動(dòng)窗口并且每分鐘提交一次。
主要工作如下:
1)拓?fù)溥壿嫷牡谝粚覶estWordSpout模擬輸入數(shù)據(jù)-用戶提到的主題。
2)第二層RollingCountBolt對(duì)輸入數(shù)據(jù)進(jìn)行計(jì)數(shù)
3)第三層IntermediateRankingsBolt對(duì)數(shù)據(jù)進(jìn)行排名
4)最后,TotalRankingsBolt匯聚數(shù)據(jù),輸出總排名。
拓?fù)溥壿嫶a如下:
builder.setSpout(spoutId, new TestWordSpout(), 2);
builder.setBolt(counterId, new RollingCountBolt(9, 3), 3) .fieldsGrouping(spoutId, new Fields("word"));
builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 2) .fieldsGrouping(counterId, new Fields("obj"));
builder.setBolt(totalRankerId,new TotalRankingsBolt(TOP_N)) .globalGrouping(intermediateRankerId);
參考文獻(xiàn):
[1] 張春麟.手機(jī)垃圾短信過濾平臺(tái)的分析與應(yīng)用[D].北京:北京郵電大學(xué),2010.
[2] 互聯(lián)網(wǎng)文檔資源.storm-0.8.2源碼分析之topology啟動(dòng)[DB/OL].http://blog.csdn.net/chlaws/article/details/10562035,2013.
[3] 互聯(lián)網(wǎng)文檔資源.Storm-源碼分析-Component,Executor,Task之間關(guān)系[DB/OL].http://www.cnblogs.com/fxjwind/,2013.
[4] 互聯(lián)網(wǎng)文檔資源.Tutorial[DB/OL].https://github.com/nathanmarz/storm/wiki/Tutorial,2012.
[5] 互聯(lián)網(wǎng)文檔資源.Tutorial[DB/OL].http://storm.incubator.apache.org/documentation/Tutorial.html,2012.
[6] 互聯(lián)網(wǎng)文檔資源.使用Storm實(shí)現(xiàn)實(shí)時(shí)大數(shù)據(jù)分析[DB/OL].http://www.csdn.net/article/2012-12-24/2813117-storm-realtime-big-data-analysis,2012.
[7] 互聯(lián)網(wǎng)文檔資源.Storm快速理解[DB/OL].http://blog.csdn.net/colorant/article/details/8256039,2012.
[8] 互聯(lián)網(wǎng)文檔資源.徐明明.Twitter Storm: Transactional Topolgoy簡介[DB/OL].http://xumingming.sinaapp.com/736/twitter-storm-transactional-topolgoy/,2012.
[9] 互聯(lián)網(wǎng)文檔資源.Storm之trident聚合操作介紹[DB/OL].http://blog.sina.com.cn/s/blog_6ff05a2c0101k6xj.html,2013.
[10] 互聯(lián)網(wǎng)文檔資源.[翻譯][Trident] Storm Trident教程[DB/OL].http://blog.csdn.net/derekjiang/article/details/9126185,2013.endprint
Rankable rankable = RankableObjectWithFields.from(tuple); super.getRankings().updateWith(rankable);
}
仔細(xì)看一下Rankings, Rankable 和RankableObjectWithFields 。如果你自己不得不去實(shí)現(xiàn)這些類并且你是一個(gè)有經(jīng)驗(yàn)的工程師,那么你一定會(huì)實(shí)現(xiàn)equals() 和 hashCode()方法。
2.2.4 實(shí)現(xiàn)Rolling Top Words 拓?fù)溥壿?/p>
實(shí)現(xiàn)Rolling Top Words 拓?fù)溥壿?/p>
在閱讀這一部分的時(shí)候,“words”代表了我們所設(shè)想的系統(tǒng)中用戶提到的主題。
Rolling Top Words拓?fù)溥壿嬘蒚estWordSpout, RollingCountBolt, IntermediateRankingsBolt 和TotalRankingsBolt組成
滑動(dòng)窗口的大小和提交頻率只是個(gè)例子,在我們例子中有一個(gè)5分鐘的滑動(dòng)窗口并且每分鐘提交一次。
主要工作如下:
1)拓?fù)溥壿嫷牡谝粚覶estWordSpout模擬輸入數(shù)據(jù)-用戶提到的主題。
2)第二層RollingCountBolt對(duì)輸入數(shù)據(jù)進(jìn)行計(jì)數(shù)
3)第三層IntermediateRankingsBolt對(duì)數(shù)據(jù)進(jìn)行排名
4)最后,TotalRankingsBolt匯聚數(shù)據(jù),輸出總排名。
拓?fù)溥壿嫶a如下:
builder.setSpout(spoutId, new TestWordSpout(), 2);
builder.setBolt(counterId, new RollingCountBolt(9, 3), 3) .fieldsGrouping(spoutId, new Fields("word"));
builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 2) .fieldsGrouping(counterId, new Fields("obj"));
builder.setBolt(totalRankerId,new TotalRankingsBolt(TOP_N)) .globalGrouping(intermediateRankerId);
參考文獻(xiàn):
[1] 張春麟.手機(jī)垃圾短信過濾平臺(tái)的分析與應(yīng)用[D].北京:北京郵電大學(xué),2010.
[2] 互聯(lián)網(wǎng)文檔資源.storm-0.8.2源碼分析之topology啟動(dòng)[DB/OL].http://blog.csdn.net/chlaws/article/details/10562035,2013.
[3] 互聯(lián)網(wǎng)文檔資源.Storm-源碼分析-Component,Executor,Task之間關(guān)系[DB/OL].http://www.cnblogs.com/fxjwind/,2013.
[4] 互聯(lián)網(wǎng)文檔資源.Tutorial[DB/OL].https://github.com/nathanmarz/storm/wiki/Tutorial,2012.
[5] 互聯(lián)網(wǎng)文檔資源.Tutorial[DB/OL].http://storm.incubator.apache.org/documentation/Tutorial.html,2012.
[6] 互聯(lián)網(wǎng)文檔資源.使用Storm實(shí)現(xiàn)實(shí)時(shí)大數(shù)據(jù)分析[DB/OL].http://www.csdn.net/article/2012-12-24/2813117-storm-realtime-big-data-analysis,2012.
[7] 互聯(lián)網(wǎng)文檔資源.Storm快速理解[DB/OL].http://blog.csdn.net/colorant/article/details/8256039,2012.
[8] 互聯(lián)網(wǎng)文檔資源.徐明明.Twitter Storm: Transactional Topolgoy簡介[DB/OL].http://xumingming.sinaapp.com/736/twitter-storm-transactional-topolgoy/,2012.
[9] 互聯(lián)網(wǎng)文檔資源.Storm之trident聚合操作介紹[DB/OL].http://blog.sina.com.cn/s/blog_6ff05a2c0101k6xj.html,2013.
[10] 互聯(lián)網(wǎng)文檔資源.[翻譯][Trident] Storm Trident教程[DB/OL].http://blog.csdn.net/derekjiang/article/details/9126185,2013.endprint