趙潤(rùn)發(fā),婁淵勝+,葉 楓,石 宏
(1.河海大學(xué) 計(jì)算機(jī)與信息學(xué)院,江蘇 南京 211100; 2.南京廣廈軟件有限公司 工業(yè)大數(shù)據(jù)開(kāi)發(fā)部,江蘇 南京 210000)
針對(duì)工業(yè)大數(shù)據(jù)平臺(tái)的研究[1-4],文獻(xiàn)[5]采用Dubbo與NoSQL構(gòu)建了工業(yè)領(lǐng)域大數(shù)據(jù)平臺(tái),為工業(yè)領(lǐng)域不斷增長(zhǎng)的數(shù)據(jù)提供了解決辦法。文獻(xiàn)[6]提出了一個(gè)面向工業(yè)的數(shù)據(jù)處理系統(tǒng),其以Spark為框架,選取MySQL和HDFS為存儲(chǔ)介質(zhì),實(shí)現(xiàn)了工業(yè)數(shù)據(jù)的快速分析。文獻(xiàn)[7]將物聯(lián)網(wǎng)與大數(shù)據(jù)相結(jié)合,構(gòu)建了一個(gè)工廠能耗分析平臺(tái),實(shí)現(xiàn)了能耗數(shù)據(jù)的查詢以及數(shù)據(jù)的分析。
目前工業(yè)大數(shù)據(jù)平臺(tái)已得到廣泛研究,但工業(yè)大數(shù)據(jù)平臺(tái)技術(shù)架構(gòu)不同,差異性較大,再者對(duì)于很多工業(yè)大數(shù)據(jù)平臺(tái)而言,其數(shù)據(jù)處理效率較低,預(yù)警時(shí)間較長(zhǎng)。針對(duì)上述問(wèn)題,本文研究了一種基于Flink的工業(yè)大數(shù)據(jù)平臺(tái),主要貢獻(xiàn)如下:
(1)采用Kafka和Flink進(jìn)行集成,對(duì)數(shù)據(jù)進(jìn)行傳輸和處理,并將處理過(guò)的數(shù)據(jù)按照類(lèi)型存儲(chǔ)至數(shù)據(jù)池中;
(2)利用Flink對(duì)工業(yè)大數(shù)據(jù)進(jìn)行預(yù)處理,提高平臺(tái)運(yùn)行的準(zhǔn)確性;
(3)采用多種大數(shù)據(jù)技術(shù),實(shí)現(xiàn)工業(yè)大數(shù)據(jù)平臺(tái)的數(shù)據(jù)查詢以及預(yù)警功能,且相對(duì)于典型大數(shù)據(jù)平臺(tái)而言,速度更快、效率更高。
Apache Flink是一個(gè)分布式處理框架,可在無(wú)邊界和有邊界數(shù)據(jù)流上進(jìn)行計(jì)算[8]。Flink不僅能運(yùn)行在YARN、Mesos等資源管理框架上,而且能在單獨(dú)集群中運(yùn)行,適用于具有不可靠數(shù)據(jù)源、海量數(shù)據(jù)處理等場(chǎng)景。此平臺(tái)采用Flink的最主要原因是:工業(yè)大數(shù)據(jù)類(lèi)型雜,既包括流數(shù)據(jù),又包括批數(shù)據(jù),而Flink兩者都可以處理。它適用的主要場(chǎng)景是流數(shù)據(jù)方面的,而批數(shù)據(jù)是“特殊的流數(shù)據(jù)”,所有任務(wù)都可以當(dāng)成流來(lái)處理[8],并且數(shù)據(jù)處理延遲性較低。其架構(gòu)[8]如圖1所示。
對(duì)于流數(shù)據(jù)應(yīng)用來(lái)說(shuō),F(xiàn)link提供DataStream API。對(duì)于批數(shù)據(jù)處理應(yīng)用來(lái)說(shuō),提供DataSet API。它支持Java和Scala語(yǔ)言,同時(shí)支持Kafka的輸入數(shù)據(jù)和ElasticSearch、MySQL、InfluxDB多種數(shù)據(jù)庫(kù)。Flink同時(shí)具有高度靈活的窗口操作,包括time、count等窗口操作,如:每隔多久發(fā)送數(shù)據(jù)至客戶端、每次發(fā)送數(shù)據(jù)的個(gè)數(shù)等,十分適用于工業(yè)場(chǎng)景。
圖1 Flink架構(gòu)
Kafka是一個(gè)基于Zookeeper的分布式消息系統(tǒng),它具有高吞吐、低延遲、可靠性好、容錯(cuò)能力強(qiáng)的良好特性[9]。低延遲體現(xiàn)在Kafka每秒能夠處理巨量信息且延遲很低,只有幾毫秒,適用于工業(yè)生產(chǎn)過(guò)程中海量數(shù)據(jù)的處理;高吞吐率體現(xiàn)在即使應(yīng)用在廉價(jià)的商用機(jī)器上,Kafka也能進(jìn)行每秒100 K消息的傳輸。Kafka也較為可靠,傳輸?shù)臄?shù)據(jù)可以在本地磁盤(pán)持久保存,同時(shí)數(shù)據(jù)會(huì)自動(dòng)進(jìn)行備份,數(shù)據(jù)丟失后仍可找到數(shù)據(jù)。Kafka容錯(cuò)性較好,集群中節(jié)點(diǎn)是允許失敗的(如副本數(shù)量為n,則n-1個(gè)節(jié)點(diǎn)是允許失敗的)[9]。此平臺(tái)選用Kafka消息隊(duì)列能夠更好地解耦,也增強(qiáng)了平臺(tái)的擴(kuò)展性,即使企業(yè)數(shù)據(jù)發(fā)生改變,不需要改變代碼和調(diào)節(jié)參數(shù)就可以輕松實(shí)現(xiàn)用戶要求。同時(shí)也保證了數(shù)據(jù)傳送的順序性和安全性。
Grafana是一個(gè)可靠性較好的可視化和度量分析工具。它具有靈活和快捷的客戶端圖表,有多種可視化指標(biāo)和面板插件,官方庫(kù)里有圖表、折線圖、文本文檔等豐富的儀表盤(pán)插件;它支持多種數(shù)據(jù)庫(kù)如:MySQL、InfluxDB、Prometheus、OpenTSDB、Elasticsearch和KairosDB等等;Grafana可通過(guò)直觀的可視化方式進(jìn)行預(yù)警并發(fā)送通知,當(dāng)獲得的數(shù)據(jù)大于用戶設(shè)定的閾值時(shí)通知Slack、DingDing、Email等;并且數(shù)據(jù)源不同,但仍可以使用在同一圖表中,數(shù)據(jù)源的來(lái)源可以根據(jù)每個(gè)查詢決定,也可以自定義數(shù)據(jù)源;Grafana同時(shí)具有豐富的注釋圖,注釋圖表能顯示不同數(shù)據(jù)源的豐富事件,當(dāng)鼠標(biāo)停留在圖表時(shí),會(huì)以全面的標(biāo)記來(lái)顯示出元數(shù)據(jù)。
InfluxDB是一個(gè)用于處理海量數(shù)據(jù)寫(xiě)入與數(shù)據(jù)查詢的時(shí)間序列數(shù)據(jù)庫(kù),應(yīng)用于有大量時(shí)間戳數(shù)據(jù)的場(chǎng)景下,例如DevOps(過(guò)程、方法、系統(tǒng))監(jiān)控,物聯(lián)網(wǎng)工業(yè)數(shù)據(jù)實(shí)時(shí)分析等。它是分布式擴(kuò)展的,不依賴外部任何條件。它還可以對(duì)ETL進(jìn)行后臺(tái)處理并實(shí)時(shí)監(jiān)控預(yù)警。它有類(lèi)似SQL的查詢語(yǔ)言,可輕松方便查詢到需要的數(shù)據(jù)。不僅如此,InfluxDB連續(xù)查詢自動(dòng)計(jì)算聚合數(shù)據(jù),大大提高了頻繁查詢的效率。本平臺(tái)中的數(shù)據(jù)量較大,時(shí)間戳數(shù)據(jù)較多,因此InfluxDB是工業(yè)大數(shù)據(jù)存儲(chǔ)的絕佳選擇。
本平臺(tái)旨在實(shí)現(xiàn)一個(gè)能滿足對(duì)工業(yè)大數(shù)據(jù)進(jìn)行存儲(chǔ)、集成、分析的平臺(tái),能夠?yàn)槠髽I(yè)多種業(yè)務(wù)提高指導(dǎo)和決策支持。其架構(gòu)如圖2所示,其主要分為5個(gè)部分,包括:數(shù)據(jù)源模塊、消息隊(duì)列模塊、數(shù)據(jù)存儲(chǔ)模塊、數(shù)據(jù)處理模塊、可視化模塊。
此平臺(tái)的數(shù)據(jù)源主要分為兩種,一種是靜態(tài)系統(tǒng)數(shù)據(jù),第二種是實(shí)時(shí)流數(shù)據(jù)。數(shù)據(jù)源獲取的方式主要如下:
(1)靜態(tài)系統(tǒng)數(shù)據(jù)一般是由公司專(zhuān)門(mén)人員去收集,如設(shè)備生產(chǎn)日期、企業(yè)名稱(chēng)、設(shè)備編號(hào)等,這些數(shù)據(jù)以特定的形式整理形成一個(gè)Excel表格,能夠直接使用;
(2)大多數(shù)的企業(yè)獲取數(shù)據(jù)的方式都是通過(guò)各種傳感器,傳感器獲取到的設(shè)備的狀態(tài)、運(yùn)行時(shí)間等實(shí)時(shí)數(shù)據(jù),然后將這些數(shù)據(jù)發(fā)送給此平臺(tái)的處理系統(tǒng);
(3)企業(yè)的很多數(shù)據(jù)會(huì)分布在不同地區(qū)的不同公司,所以這時(shí)候它們通常會(huì)以日志的形式存在,而Flume是一個(gè)很好的日志收集工具[10]。這個(gè)工具能夠?qū)⑦@些日志文件識(shí)別出來(lái),并整理收集在一起,并發(fā)往此工業(yè)大數(shù)據(jù)平臺(tái);
(4)工業(yè)生產(chǎn)中會(huì)產(chǎn)生很多業(yè)務(wù)靜態(tài)數(shù)據(jù),但它們的格式可能不是我們所需要的,此時(shí)可以使用Sqoop數(shù)據(jù)源轉(zhuǎn)換工具,將它們轉(zhuǎn)換為我們所需要的格式,然后再將這些數(shù)據(jù)發(fā)送給工業(yè)大數(shù)據(jù)平臺(tái)。
圖2 體系架構(gòu)
消息隊(duì)列主要指數(shù)據(jù)在傳輸過(guò)程中保存數(shù)據(jù)的一個(gè)容器。工業(yè)大數(shù)據(jù)類(lèi)型多,數(shù)據(jù)量大,面對(duì)此場(chǎng)景使用消息隊(duì)列是一個(gè)極佳的選擇,因?yàn)橄㈥?duì)列能夠極大地降低系統(tǒng)響應(yīng)時(shí)間、提高系統(tǒng)穩(wěn)定性,同時(shí)保證數(shù)據(jù)傳輸?shù)捻樞蛐院桶踩?,最重要的是?shí)現(xiàn)數(shù)據(jù)的異步化,并起到解耦的作用。
此模塊選用Kafka作為消息隊(duì)列系統(tǒng),利用Flink將數(shù)據(jù)源模塊中的實(shí)時(shí)數(shù)據(jù)和批數(shù)據(jù)都暫存至消息隊(duì)列中。Flink作為生產(chǎn)者,會(huì)源源不斷地生產(chǎn)出消息,然后發(fā)送給消息隊(duì)列Kafka中,而Kafka就成為了消費(fèi)者,它會(huì)不斷地從Flink中獲取到消息,從而對(duì)這些數(shù)據(jù)進(jìn)行進(jìn)一步處理。
本模塊主要采用Flink來(lái)處理實(shí)時(shí)大數(shù)據(jù)和離線批數(shù)據(jù)。根據(jù)數(shù)據(jù)類(lèi)型,將此模塊又分為實(shí)時(shí)數(shù)據(jù)處理模塊和批數(shù)據(jù)處理模塊。Flink能夠同時(shí)支持批處理與流處理任務(wù),它包含兩種預(yù)先定義的函數(shù):DataStream API和DataSet API。DataStream API 包括reduce、aggregations、filter等方法。DataSet API包括distinct、Hash-Partition、window等方法。
批數(shù)據(jù)處理模塊中,此平臺(tái)會(huì)利用aggregations中的sum()、min()、max()方法對(duì)批數(shù)據(jù)進(jìn)行統(tǒng)計(jì),求出工業(yè)數(shù)據(jù)的最大值、最小值、總和等,并在前端顯示出來(lái)。
流數(shù)據(jù)處理模塊主要是對(duì)數(shù)據(jù)進(jìn)行預(yù)處理。在工業(yè)大數(shù)據(jù)的實(shí)際生產(chǎn)過(guò)程中,由于人工失誤或者數(shù)據(jù)采集設(shè)備因生產(chǎn)環(huán)境惡劣會(huì)導(dǎo)致收集到的數(shù)據(jù)不準(zhǔn)確,這些數(shù)據(jù)如果直接存入到數(shù)據(jù)庫(kù)中不僅會(huì)降低大數(shù)據(jù)平臺(tái)查詢數(shù)據(jù)的準(zhǔn)確性,而且會(huì)大大降低平臺(tái)的運(yùn)行效率。此模塊主要利用Flink來(lái)去除實(shí)際業(yè)務(wù)處理中的無(wú)效數(shù)據(jù)、重復(fù)數(shù)據(jù)以及缺失率較高的數(shù)據(jù),其預(yù)處理的流程如圖3所示。
圖3 預(yù)處理流程
數(shù)據(jù)預(yù)處理方法具體步驟如下:
(1)首先利用Flink從Kafka中獲取到數(shù)據(jù),然后通過(guò)Flink自帶的RocksDB狀態(tài)后端去重方式對(duì)工業(yè)大數(shù)據(jù)進(jìn)行去重。我們需要開(kāi)啟RocksDB狀態(tài)后端并對(duì)其參數(shù)進(jìn)行配置,如數(shù)據(jù)過(guò)期的時(shí)間、是否過(guò)期的數(shù)據(jù)能再次被訪問(wèn)等,接著注冊(cè)Flink定時(shí)器。我們也可以利用Flink的TTL機(jī)制,打開(kāi)RocksDB狀態(tài)后端的TTL compaction filter,這樣能在后臺(tái)實(shí)現(xiàn)重復(fù)數(shù)據(jù)的自動(dòng)刪除。在處理重復(fù)數(shù)據(jù)時(shí),如果數(shù)據(jù)的key(如事件ID)對(duì)應(yīng)的狀態(tài)不存在,說(shuō)明此數(shù)據(jù)沒(méi)有出現(xiàn)過(guò),可以更新?tīng)顟B(tài)并且輸出數(shù)據(jù)。反之,說(shuō)明此數(shù)據(jù)已經(jīng)出現(xiàn)過(guò),RocksDB就會(huì)將其自動(dòng)刪除。同時(shí)我們可利用Flink SQL提供的distinct去重方法來(lái)統(tǒng)計(jì)重復(fù)數(shù)據(jù)的明細(xì)結(jié)果;
(2)然后對(duì)實(shí)際生產(chǎn)過(guò)程中的無(wú)效數(shù)據(jù)進(jìn)行刪除。這里利用FlinkDataStream API的Evictor()方法對(duì)WindowFunction前后的數(shù)據(jù)進(jìn)行處理。Evictor()方法包括Count-Evictor、DeltaEvictor和TimeEvictor以及自定義的Evictor。CountEvictor是在窗口中設(shè)置保持的數(shù)據(jù)數(shù)量,如:evictor(CountEvictor.of(10000)),意思是窗口中最大的數(shù)據(jù)量為10 000,若大于10 000條,則剔除。在實(shí)際生產(chǎn)過(guò)程中也會(huì)產(chǎn)生很多已過(guò)時(shí)的無(wú)效數(shù)據(jù),其不僅會(huì)影響平臺(tái)數(shù)據(jù)查詢的正確性,而且會(huì)增加平臺(tái)的資源消耗,進(jìn)而影響執(zhí)行效率,而Flink 自帶的TimeEvictor方法能將最新時(shí)間的數(shù)據(jù)篩選出來(lái),去除過(guò)時(shí)的數(shù)據(jù)。其主要將當(dāng)前窗口中最新元素的時(shí)間減去時(shí)間間隔,然后將小于該結(jié)果的數(shù)據(jù)全部剔除。DeltaEvictor方法通過(guò)定義DeltaFunction和指定threshold(閾值),計(jì)算出窗口間數(shù)據(jù)的Delt大小,如果超過(guò)了閾值則將當(dāng)前數(shù)據(jù)元素刪除,這樣可以去除那些因?yàn)闄C(jī)器故障或者外部原因產(chǎn)生的差別較大的無(wú)效數(shù)據(jù)。同時(shí)也可以根據(jù)用戶的需求自定義Evictor方法來(lái)去除那些無(wú)效數(shù)據(jù);
(3)利用步驟(1)中distinct去重方式的Distinct-Accumulator 與CountAccumulator方法統(tǒng)計(jì)單條數(shù)據(jù)value值的數(shù)量,DistinctAccumulator()內(nèi)部包含一個(gè)map結(jié)構(gòu),key包含的是一條數(shù)據(jù)的屬性值,而value則是屬性值出現(xiàn)的次數(shù)。若缺少的value值過(guò)多(大于50%),直接刪除缺失數(shù)據(jù)的記錄。反之認(rèn)定數(shù)據(jù)為有效數(shù)據(jù);
(4)原數(shù)據(jù)經(jīng)過(guò)預(yù)處理后得到新數(shù)據(jù),將這些數(shù)據(jù)存儲(chǔ)至數(shù)據(jù)池中。
通過(guò)一系列的數(shù)據(jù)預(yù)處理,可以有效防止臟數(shù)據(jù)影響平臺(tái)的正常運(yùn)行。
工業(yè)大數(shù)據(jù)異構(gòu)性較強(qiáng),數(shù)據(jù)類(lèi)型較為復(fù)雜,這些數(shù)據(jù)通常以不同形式存儲(chǔ)在不同的數(shù)據(jù)庫(kù)或者數(shù)據(jù)管理系統(tǒng)中,所以管理起來(lái)較為麻煩,因此企業(yè)急需一個(gè)平臺(tái)對(duì)數(shù)據(jù)進(jìn)行統(tǒng)一管理。而此平臺(tái)的數(shù)據(jù)源主要分為兩類(lèi),一類(lèi)是實(shí)時(shí)數(shù)據(jù),另一類(lèi)是工業(yè)批數(shù)據(jù),為了方便管理使用,此平臺(tái)建立一個(gè)數(shù)據(jù)池來(lái)存儲(chǔ)數(shù)據(jù)。實(shí)時(shí)數(shù)據(jù)存放至InfluxDB數(shù)據(jù)庫(kù)中,設(shè)備狀態(tài)、設(shè)備離線事件、設(shè)備事件等信息,靜態(tài)系統(tǒng)數(shù)據(jù)存放至MySQL數(shù)據(jù)庫(kù)中,如:企業(yè)設(shè)備、企業(yè)名稱(chēng)、地址等信息。
數(shù)據(jù)存儲(chǔ)的過(guò)程如下:首先平臺(tái)先判斷獲取到的數(shù)據(jù)的類(lèi)型,若是工業(yè)批數(shù)據(jù)會(huì)利用SQL-query去取出數(shù)據(jù)連接的URL、用戶名和密碼,然后加載SQL-JDBC去連接實(shí)例,并執(zhí)行查詢;若是工業(yè)實(shí)時(shí)數(shù)據(jù)會(huì)先加載NoSQL-query,然后讀取NoSQL連接類(lèi),讀取InfluxDB數(shù)據(jù)庫(kù)自帶配置文件,從而連接實(shí)例,并執(zhí)行查詢。
無(wú)論是聯(lián)機(jī)事務(wù)處理(OLTP),還是聯(lián)機(jī)分析處理(OLAP),都是為了用戶更好地更直觀地獲取到處理到的數(shù)據(jù)結(jié)果,因此考慮一個(gè)與用戶交互性好的前端工具是十分必要的。
本平臺(tái)采用開(kāi)源的Grafana作為可視化分析工具,它不僅支持多種數(shù)據(jù)庫(kù),如IoTDB、MySQL、InfluxDB等,還支持多種數(shù)據(jù)的展示方式,如折線圖、圖表等,以更直觀的形式顯示出數(shù)據(jù),用戶按照各自需求可快速獲取到數(shù)據(jù)且不需要關(guān)心后臺(tái)的具體運(yùn)行過(guò)程。同時(shí)可以對(duì)工業(yè)設(shè)備進(jìn)行預(yù)警,它通過(guò)Slack、DingDing、Email等方式通知企業(yè)數(shù)據(jù)已達(dá)到闕值,從而實(shí)現(xiàn)設(shè)備數(shù)據(jù)的準(zhǔn)確預(yù)警。
首先,數(shù)據(jù)源模塊可采用Flume收集工業(yè)生產(chǎn)過(guò)程中產(chǎn)生的日志,或直接從傳感器中獲取到數(shù)據(jù),并由專(zhuān)門(mén)人員將這些數(shù)據(jù)整理為Excel格式。其次,利用Flink將整理好的數(shù)據(jù)發(fā)送至Kafka消息隊(duì)列中,保證數(shù)據(jù)傳輸?shù)陌踩院晚樞蛐浴H缓罄肍link獲取到暫存至Kafka中的數(shù)據(jù)并對(duì)其進(jìn)行預(yù)處理,去除重復(fù)數(shù)據(jù)、缺失率較高的數(shù)據(jù)、無(wú)效數(shù)據(jù)等,處理好后將其存儲(chǔ)至不同的數(shù)據(jù)庫(kù)中,批數(shù)據(jù)存儲(chǔ)至MySQL中,流數(shù)據(jù)存儲(chǔ)至InfluxDB中。而Flink貫穿整個(gè)運(yùn)行過(guò)程,對(duì)于MySQL中的數(shù)據(jù)可采用DataSet API,InfluxDB中的數(shù)據(jù)采用DataStream API。可視化分析模塊使用Grafana組件,實(shí)現(xiàn)不同類(lèi)型數(shù)據(jù)的增刪改查,同時(shí)也可以對(duì)企業(yè)數(shù)據(jù)進(jìn)行監(jiān)測(cè),若大于預(yù)定的值可通過(guò)郵件的形式進(jìn)行預(yù)警。
此平臺(tái)集群的硬件環(huán)境包含3臺(tái)物理機(jī),一個(gè)為主節(jié)點(diǎn),其余兩個(gè)為子節(jié)點(diǎn),其域名分別為Master、Slave1、Slave2,3臺(tái)機(jī)器均使用8 G內(nèi)存以及1 T的硬盤(pán),使用的操作系統(tǒng)為Centos6.4 64位。Flink集群選擇1.9.3版本。Flink的master進(jìn)程 jobManager放在Slave1中。修改好的配置文件放置在其它節(jié)點(diǎn),并在Slave2的Flink_HOME/conf/slaves目錄下添加 Master、Slave1、Slvae2,這樣可以通過(guò)主節(jié)點(diǎn)免密登錄啟動(dòng)其它的副節(jié)點(diǎn)啟動(dòng)。Kafka應(yīng)注意與Zookeeper 版本之間的兼容性,所以此平臺(tái)選擇了Kafka 2.2.0和Zookeeper3.4.10。前端工具Grafana選擇版本Grafana-6.7.2,此平臺(tái)選用的數(shù)據(jù)庫(kù)為MySQL-5.5和InfluxDB-1.7.3。
本文的實(shí)驗(yàn)數(shù)據(jù)來(lái)自經(jīng)過(guò)數(shù)據(jù)脫敏后的3000家企業(yè)基本信息,10 000余臺(tái)設(shè)備連續(xù)3個(gè)月的運(yùn)行數(shù)據(jù),脫敏簡(jiǎn)要過(guò)程如下:利用Java代碼定義數(shù)據(jù)脫敏的工具類(lèi),涉及到具體公司名時(shí),用*替代。同時(shí)企業(yè)id、設(shè)備id、事件id值重新編號(hào),從而保護(hù)數(shù)據(jù)的安全性。本實(shí)驗(yàn)的采用的數(shù)據(jù)主要分為兩種類(lèi)型,一種是“企業(yè)名單”、“企業(yè)設(shè)備信息”、“測(cè)點(diǎn)名稱(chēng)”等批數(shù)據(jù),此類(lèi)數(shù)據(jù)量為87 000條,描述的是3000家企業(yè)的一些基本信息,如公司名、所處地區(qū)等;另一種是“設(shè)備實(shí)時(shí)事件統(tǒng)計(jì)”、“設(shè)備狀態(tài)時(shí)長(zhǎng)統(tǒng)計(jì)”等實(shí)時(shí)數(shù)據(jù),此類(lèi)數(shù)據(jù)量較大為1 250 000條,描述的是設(shè)備的實(shí)時(shí)狀態(tài)信息,如在某個(gè)時(shí)間段企業(yè)的狀態(tài)等。批數(shù)據(jù)以“企業(yè)名單”為例,包括5個(gè)字段值:企業(yè)id、客戶名稱(chēng)、地址名稱(chēng)、省市區(qū)、公司名,部分?jǐn)?shù)據(jù)見(jiàn)表1。(注:表中只展示了部分?jǐn)?shù)據(jù)集中的某個(gè)表,并不是全部數(shù)據(jù))。
表1 企業(yè)名單
實(shí)時(shí)數(shù)據(jù)以“設(shè)備實(shí)時(shí)事件”數(shù)據(jù)為例,包括4個(gè)字段值:事件id、發(fā)生時(shí)間、設(shè)備id、事件,部分?jǐn)?shù)據(jù)見(jiàn)表2。
表2 設(shè)備實(shí)時(shí)事件
為了驗(yàn)證基于Flink的大數(shù)據(jù)平臺(tái)的有效性,對(duì)此平臺(tái)的各個(gè)模塊進(jìn)行了測(cè)試。
系統(tǒng)實(shí)現(xiàn)具體過(guò)程如下:首先將數(shù)據(jù)源中的批數(shù)據(jù)和實(shí)時(shí)數(shù)據(jù)導(dǎo)入到Kafka消息隊(duì)列中。批數(shù)據(jù)的數(shù)據(jù)量較小,導(dǎo)入時(shí)間較短,耗時(shí)3 min 10 s成功將“企業(yè)名單”的信息發(fā)送至Kafka中。而“設(shè)備實(shí)時(shí)事件統(tǒng)計(jì)”實(shí)時(shí)數(shù)據(jù)量較大,耗時(shí)較長(zhǎng),耗時(shí)17 min成功將實(shí)時(shí)數(shù)據(jù)導(dǎo)入到Kafka中。然后利用Flink讀取Kafka數(shù)據(jù)并進(jìn)行預(yù)處理后寫(xiě)入到MySQL與InfluxDB中,預(yù)處理后的重復(fù)數(shù)據(jù)篩選結(jié)果見(jiàn)表3(以實(shí)時(shí)數(shù)據(jù)“設(shè)備實(shí)時(shí)事件”為例)。
表3 重復(fù)數(shù)據(jù)篩選結(jié)果
無(wú)效數(shù)據(jù)篩選結(jié)果見(jiàn)表4。
表4 無(wú)效數(shù)據(jù)篩選結(jié)果
空數(shù)據(jù)篩選結(jié)果見(jiàn)表5。
表5 空數(shù)據(jù)篩選結(jié)果
數(shù)據(jù)篩選后,開(kāi)發(fā)人員可利用Navicat和InfluxDBStudio可視化工具查看數(shù)據(jù),用戶顯示界面如圖4所示。
圖4 用戶界面
用戶在瀏覽器中輸入localhost:3000進(jìn)入此平臺(tái),首先填寫(xiě)數(shù)據(jù)庫(kù)的用戶名以及密碼,創(chuàng)建用戶需要的數(shù)據(jù)庫(kù),其次選擇數(shù)據(jù)庫(kù)顯示的儀表形式,有折線圖、表格、文本等形式。例如:用戶想查詢MySQL數(shù)據(jù)庫(kù)中的某個(gè)特定條件的批數(shù)據(jù)并以表格的形式輸出,用戶可在系統(tǒng)界面選擇Table并輸入:SELECT * FROM ′company_list′ WHERE 區(qū)=′天寧區(qū)′,便只查詢天寧區(qū)的公司名單,其實(shí)現(xiàn)效果如圖5所示。
圖5 公司名稱(chēng)
同理,也可以實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)的查詢,能快速地查詢到各個(gè)設(shè)備的實(shí)時(shí)狀態(tài)和對(duì)應(yīng)的發(fā)生時(shí)間,如圖6所示。
圖6 設(shè)備實(shí)時(shí)狀態(tài)表
同時(shí)系統(tǒng)界面也提供edit的方式,用戶只需要選擇操作的數(shù)據(jù)庫(kù)和限定條件,也可輕松查詢到數(shù)據(jù)。不僅如此,用戶也可以利用此平臺(tái)篩選出自己所需要的數(shù)據(jù),如查詢到相同設(shè)備號(hào)id的機(jī)器、同一時(shí)間內(nèi)機(jī)器的上線數(shù)量、統(tǒng)計(jì)一段時(shí)間內(nèi)出故障機(jī)器的數(shù)量、顯示預(yù)警的極值和結(jié)束時(shí)間等。
Grafana可以無(wú)縫定義告警在數(shù)據(jù)中的位置,可視化的定義閾值,并可以通過(guò)釘釘、E-mail等平臺(tái)獲取告警通知。這里我們選用E-mail的形式來(lái)關(guān)注實(shí)時(shí)設(shè)備狀態(tài)并獲得告警通知。首先在啟動(dòng)Grafana前配置/etc/grafana/grafana.ini開(kāi)啟smtp服務(wù),配置發(fā)送郵件的郵箱以及密碼。配置好后,通過(guò)Grafana 的Alerting功能設(shè)置發(fā)送郵件的間隔時(shí)間,實(shí)現(xiàn)對(duì)設(shè)備數(shù)據(jù)的預(yù)警(此平臺(tái)判斷機(jī)器是否出現(xiàn)故障的方式有兩種:①由于機(jī)器是24小時(shí)運(yùn)作的,所以機(jī)器會(huì)一直呈現(xiàn)在線狀態(tài),若機(jī)器離線時(shí)間過(guò)長(zhǎng)則會(huì)判定為出故障;②平臺(tái)會(huì)每隔一定時(shí)間發(fā)送機(jī)器上下線的數(shù)量給用戶,若下線的機(jī)器數(shù)量過(guò)多,則判定有機(jī)器出現(xiàn)故障),其告警如圖7所示。
圖7 告警
在文獻(xiàn)[11]中,作者介紹了當(dāng)前較為典型的Clou-dera大數(shù)據(jù)平臺(tái),其以Hadoop技術(shù)架構(gòu)為基礎(chǔ),具有穩(wěn)定的、可擴(kuò)展的企業(yè)級(jí)大數(shù)據(jù)管理平臺(tái),它提供了很多部署案例,能夠方便管理企業(yè)生產(chǎn)過(guò)程中的多種數(shù)據(jù),且具有強(qiáng)大的管理和監(jiān)控工具。其中Cloudera Manager是開(kāi)源的方便使用的一款產(chǎn)品,它提供Web用戶界面使得企業(yè)進(jìn)行數(shù)據(jù)管理時(shí)更加容易。而Shark[12]也是一個(gè)相對(duì)較新的開(kāi)源工業(yè)大數(shù)據(jù)分析平臺(tái),它是Spark的一個(gè)組件,可安裝在與Hadoop相同的集群上,是一個(gè)性能較好的分布式和容錯(cuò)內(nèi)存分析系統(tǒng),它具有數(shù)據(jù)聯(lián)合分區(qū),容錯(cuò)以及機(jī)器學(xué)習(xí)的能力,且完全兼容Hive和HiveQL,也能支持多種數(shù)據(jù)庫(kù)數(shù)據(jù)的查詢。
本平臺(tái)采用了Flink框架來(lái)構(gòu)建工業(yè)大數(shù)據(jù)平臺(tái)。首先比較Flink平臺(tái)和文獻(xiàn)[11,12]二者平臺(tái)基礎(chǔ)框架的技術(shù)特點(diǎn):Spark和Flink都是運(yùn)行在YARN上的,但Flink的性能是優(yōu)于Spark的,而Spark性能是大于Hadoop的,而且迭代的次數(shù)越多,F(xiàn)link 的優(yōu)勢(shì)越明顯。不僅如此Flink具有靈活的窗口,對(duì)于流數(shù)據(jù)處理起來(lái)更加方便,而工業(yè)生產(chǎn)下流數(shù)據(jù)偏多且較為復(fù)雜,因此Flink十分適用于工業(yè)場(chǎng)景。
其次,文獻(xiàn)[11,12]對(duì)于工業(yè)領(lǐng)域中的不同類(lèi)型的數(shù)據(jù)無(wú)明確的區(qū)分,只采用單一的數(shù)據(jù)庫(kù)存儲(chǔ)數(shù)據(jù)。而本平臺(tái)中采用一個(gè)數(shù)據(jù)池來(lái)存儲(chǔ)不同數(shù)據(jù),批數(shù)據(jù)放入MySQL據(jù)庫(kù)中,流數(shù)據(jù)放入InfluxDB數(shù)據(jù)庫(kù)中,能夠更好地區(qū)分開(kāi)不同類(lèi)型的工業(yè)大數(shù)據(jù)。再者,本平臺(tái)利用Kafka進(jìn)行數(shù)據(jù)暫時(shí)存儲(chǔ),更好地保證了數(shù)據(jù)傳輸?shù)陌踩砸约捌脚_(tái)的可擴(kuò)展性。
Flink平臺(tái)和Cloudera Manager大數(shù)據(jù)平臺(tái)、Shark大數(shù)據(jù)平臺(tái)的查詢數(shù)據(jù)效率如圖8所示,行表示數(shù)據(jù)集的數(shù)量(單位個(gè)數(shù)),列表示用戶查詢數(shù)據(jù)的響應(yīng)時(shí)間(單位ms)。
圖8 數(shù)據(jù)查詢效率
從圖8中我們可以看出:當(dāng)數(shù)據(jù)集為5000條時(shí),各個(gè)平臺(tái)的執(zhí)行效率是差不多的,基本能在幾毫秒內(nèi)響應(yīng)出來(lái)。但當(dāng)數(shù)據(jù)集數(shù)據(jù)變多時(shí),Shark平臺(tái)和Cloudera Manager平臺(tái)數(shù)據(jù)查詢時(shí)間明顯上升,執(zhí)行效率變低,而Flink平臺(tái)在處理將近60 000條數(shù)據(jù)集時(shí)也能快速響應(yīng)。
3個(gè)平臺(tái)的吞吐量方面也進(jìn)行了比較(吞吐量即單位時(shí)間內(nèi)平臺(tái)成功傳送數(shù)據(jù)的數(shù)量),比較結(jié)果如圖9所示,本次測(cè)試吞吐量的單位為:條/s。
圖9 吞吐量比較
從圖9中可以看到,當(dāng)Kafka Data的Partition為1 時(shí),此平臺(tái)的吞吐量是Cloudera Manager大數(shù)據(jù)平臺(tái)的3.2倍,是Shark平臺(tái)的將近1倍,而當(dāng)Partition數(shù)為8時(shí),此平臺(tái)吞吐量為Shark平臺(tái)的將近1倍,是Cloudera Manager大數(shù)據(jù)平臺(tái)的4.6倍??傊瓼link平臺(tái)的吞吐量是遠(yuǎn)遠(yuǎn)高于其它兩個(gè)平臺(tái)的,而吞吐量又極大地反應(yīng)了系統(tǒng)的負(fù)載能力。在工業(yè)大數(shù)據(jù)量大的情況下,F(xiàn)link平臺(tái)能夠更好地運(yùn)作。
當(dāng)數(shù)據(jù)量變大時(shí),延遲低也是一個(gè)企業(yè)需要考慮的地方,因此比較了3個(gè)平臺(tái)的延遲性。延遲性即數(shù)據(jù)從進(jìn)入系統(tǒng)到流出系統(tǒng)所用的時(shí)間,本次測(cè)試延遲的單位為:ms。其實(shí)驗(yàn)結(jié)果如圖10所示。
圖10 延遲比較
從圖10可以看到,F(xiàn)link平臺(tái)的延遲較低,即使面對(duì)200 000條的數(shù)據(jù)量,平臺(tái)也只具有21 ms的延遲,而Shark平臺(tái)的延遲幾乎是Flink平臺(tái)的2倍,而Cloudera Manager平臺(tái)是Shark平臺(tái)的兩倍,因此Flink平臺(tái)在延遲上也有較大的優(yōu)勢(shì)。
同時(shí),在平臺(tái)預(yù)警速度方面做了個(gè)對(duì)比:選用5000條實(shí)時(shí)數(shù)據(jù)在不同的平臺(tái)上運(yùn)行,比較不同平臺(tái)進(jìn)行預(yù)警并發(fā)送郵件至用戶的時(shí)間。其實(shí)驗(yàn)結(jié)果如圖11所示。
圖11 預(yù)警時(shí)間比較
從圖11中我們可以看出:5000條實(shí)時(shí)數(shù)據(jù)在此平臺(tái)進(jìn)行預(yù)警并發(fā)送郵件的速度是最快的,需要20 ms,而在Shark平臺(tái)和Cloudera Manager大數(shù)據(jù)平臺(tái)分別需要27 ms和35 ms,此工業(yè)大數(shù)據(jù)平臺(tái)預(yù)警時(shí)間更短,能夠最大地減少企業(yè)的損失。
針對(duì)工業(yè)大數(shù)據(jù)數(shù)據(jù)量大、異構(gòu)性強(qiáng)、及時(shí)性強(qiáng)的特點(diǎn),引入大數(shù)據(jù)技術(shù),提出了Flink和Kafka集成的工業(yè)大數(shù)據(jù)平臺(tái),此平臺(tái)通過(guò)集群環(huán)境能夠高效地查詢數(shù)據(jù),并能進(jìn)行設(shè)備數(shù)據(jù)的快速預(yù)警。與目前較為典型的兩款開(kāi)源大數(shù)據(jù)平臺(tái)進(jìn)行比較,實(shí)驗(yàn)結(jié)果表明,此平臺(tái)在數(shù)據(jù)查詢效率、吞吐量、延遲性以及預(yù)警速度方面都是優(yōu)于其它兩個(gè)典型的大數(shù)據(jù)平臺(tái)的,能夠滿足預(yù)計(jì)的設(shè)計(jì)目標(biāo)。不僅如此,此平臺(tái)不僅適用于工業(yè)領(lǐng)域,而且適用于所有時(shí)間序列數(shù)據(jù)多的場(chǎng)景,因此基于Flink的工業(yè)大數(shù)據(jù)平臺(tái)的研究是具有實(shí)際意義的。
在今后的工作中,還需完善此平臺(tái)的其它功能,如云平臺(tái)數(shù)據(jù)分析故障預(yù)測(cè)等。其次數(shù)據(jù)源部分的數(shù)據(jù)都是整理好的,而此平臺(tái)中并未過(guò)多介紹如何獲取數(shù)據(jù)源,因此今后還需多學(xué)習(xí)物聯(lián)網(wǎng)的知識(shí)。在企業(yè)生產(chǎn)過(guò)程中,安全性是重中之重的,雖然在傳輸過(guò)程中使用消息隊(duì)列保證數(shù)據(jù)傳輸?shù)陌踩?,但其它模塊產(chǎn)生的數(shù)據(jù)實(shí)際是不夠安全的,所以如何保障保證數(shù)據(jù)處理的安全性[13],這也是本平臺(tái)未來(lái)需要考慮的地方。