徐 達,曾 樂,王英杰
(國家氣象信息中心,北京 100081)
氣象綜合業(yè)務(wù)實時監(jiān)控系統(tǒng)—“天鏡”[1]是國家氣象信息中心為建設(shè)統(tǒng)一數(shù)據(jù)環(huán)境、整合分散獨立的監(jiān)視業(yè)務(wù)建立的通用、綜合、高效的集約化監(jiān)視平臺?!疤扃R”能夠為全國氣象部門在收集、分發(fā)、入庫、數(shù)據(jù)同步各個環(huán)節(jié)提供實時觀測數(shù)據(jù)和產(chǎn)品的數(shù)據(jù)全流程監(jiān)視服務(wù)。目前“天鏡”每小時接收處理氣象業(yè)務(wù)監(jiān)視全流程[2]數(shù)據(jù)記錄達到3千萬條,累計接入的數(shù)據(jù)資料超過400種,為了使目前的數(shù)據(jù)全流程監(jiān)視業(yè)務(wù)可以更高效地在大數(shù)據(jù)計算和分布式存儲架構(gòu)上運行,需要對目前海量監(jiān)視數(shù)據(jù)的處理中加大對計算策略和存儲策略的研究力度。
Spark[3]是對海量數(shù)據(jù)計算處理的重要工具和手段,是基于彈性分布式數(shù)據(jù)集(RDD)的數(shù)據(jù)結(jié)構(gòu),具有數(shù)據(jù)流模型特點。RDD將數(shù)據(jù)保留在內(nèi)存中,且允許用戶程序多次查詢,降低了對磁盤和網(wǎng)絡(luò)的開銷,適用于在線計算和迭代計算?!疤扃R”系統(tǒng)使用Spark計算全流程數(shù)據(jù),并按全國、省、市縣維度的統(tǒng)計指標進行匯聚。氣象資料的接入和監(jiān)視環(huán)節(jié)的擴展使得需要計算和處理的監(jiān)視信息激增,使得Spark運行作業(yè)時間變長,這對于滿足時效性要求而言需要縮短計算任務(wù)的運行時間,一種方式是從Spark集群框架和配置參數(shù)進行修改和優(yōu)化,另一種方式則是通過對程序代碼進行改動,采用最優(yōu)的計算策略來提升計算效率。
2017年10月,中國氣象局批復(fù)了由國家氣象信息中心牽頭,國家級各業(yè)務(wù)單位共同參與建設(shè)的氣象綜合業(yè)務(wù)實時監(jiān)控系統(tǒng)(一期)項目。該項目旨在建立技術(shù)先進的監(jiān)控系統(tǒng)技術(shù)框架,實現(xiàn)綜合監(jiān)視和告警運維核心功能,建立規(guī)范的監(jiān)控信息采集接口,監(jiān)視范圍橫向覆蓋氣象資料現(xiàn)有數(shù)據(jù)流程各環(huán)節(jié),縱向覆蓋信息系統(tǒng)從網(wǎng)絡(luò)及安全、服務(wù)器、存儲、中間件、應(yīng)用軟件運行狀態(tài)。氣象綜合業(yè)務(wù)實時監(jiān)控系統(tǒng)(一期)計劃2018年底建成后,將完成氣象綜合業(yè)務(wù)實時監(jiān)控的基礎(chǔ)框架,建立系統(tǒng)的硬件平臺和技術(shù)平臺,從技術(shù)上解決了原MCP系統(tǒng)面臨的性能瓶頸問題,建立規(guī)范化的監(jiān)視信息采集接口,實現(xiàn)監(jiān)視告警的核心功能,實現(xiàn)國家級基于CIMISS數(shù)據(jù)環(huán)境的資料數(shù)據(jù)流程的收集、分發(fā)、解碼入庫、接口服務(wù)等環(huán)節(jié)的監(jiān)視,以及CMACast衛(wèi)星廣播系統(tǒng)、部際系統(tǒng)等系統(tǒng)的監(jiān)視。但是隨著監(jiān)視信息不斷增長,現(xiàn)有的運行環(huán)境在處理計算上會有延遲,尤其是在中國地面分鐘級資料的實時監(jiān)視上會出現(xiàn)頁面為0的情況[4]。
國外氣象行業(yè)的監(jiān)視系統(tǒng)也是主要圍繞著數(shù)據(jù)傳輸網(wǎng)絡(luò)、數(shù)據(jù)收集生成、數(shù)據(jù)質(zhì)量、觀測設(shè)備狀態(tài)進行監(jiān)控,如美國國家海洋和大氣管理局(NOAA)建設(shè)了觀測系統(tǒng)監(jiān)控中心(OSMC)實時監(jiān)測全球海洋觀測系統(tǒng)的性能[5],歐洲中期天氣預(yù)報中心(ECMWF)通過常規(guī)觀測告警系統(tǒng)檢測數(shù)據(jù)可用性和質(zhì)量問題[6],美國國家環(huán)境預(yù)報中心(NCEP)的實時數(shù)據(jù)監(jiān)測系統(tǒng)(RTDMS)主要監(jiān)測數(shù)據(jù)的數(shù)量和時效性[7]。國外的數(shù)據(jù)監(jiān)視系統(tǒng)是基于傳統(tǒng)的數(shù)據(jù)資料文件入庫,并對該文件資料進行質(zhì)量評估后,繪制該類觀測資料的打點時序圖,對資料進行分類監(jiān)視。ECMWF和NOAA更加側(cè)重資料到報后的質(zhì)量情況,通過設(shè)計測試的數(shù)值預(yù)報模式來校驗到報的觀測資料是否合格,通過地圖打點的方式提供數(shù)據(jù)服務(wù),并用顏色來區(qū)分該類資料的數(shù)據(jù)質(zhì)量情況。
圍繞《全國氣象發(fā)展“十三五”規(guī)劃》提出的“智慧氣象”發(fā)展目標,氣象業(yè)務(wù)在實施現(xiàn)代化、信息化、集約化、標準化的進程中,都需要監(jiān)控系統(tǒng)來保障業(yè)務(wù)的高效穩(wěn)定運行。但是,各氣象業(yè)務(wù)的現(xiàn)有監(jiān)控系統(tǒng)都是獨立開發(fā)和運維,監(jiān)控系統(tǒng)分散且數(shù)量龐大,運行維護人力成本高;各監(jiān)控系統(tǒng)僅監(jiān)控業(yè)務(wù)流程中的獨立環(huán)節(jié),上下游監(jiān)控信息無法共享,缺乏對業(yè)務(wù)全流程的總體監(jiān)控,出現(xiàn)故障時準確定位故障位置困難、分析故障原因不及時,導(dǎo)致業(yè)務(wù)監(jiān)控運維效率低。因此,急需實現(xiàn)對觀測、信息、預(yù)報預(yù)測、公共服務(wù)及政務(wù)的全流程、全要素、全過程的一體化監(jiān)控和運維,以提升氣象業(yè)務(wù)運行管理的質(zhì)量和效率。2016年底,按照中國氣象局統(tǒng)一部署,由預(yù)報司牽頭組織與協(xié)調(diào),觀測司配合,信息中心作為實施技術(shù)組組長單位,協(xié)同各成員單位上下一心,通力合作,共同推動氣象綜合業(yè)務(wù)實時監(jiān)控系統(tǒng)建設(shè),樹立和打造氣象綜合業(yè)務(wù)監(jiān)控品牌——“天鏡”[8]。
氣象全流程監(jiān)控實現(xiàn)對數(shù)據(jù)從收集、分發(fā)、入庫、數(shù)據(jù)同步到應(yīng)用的全流程、全生命周期監(jiān)控。在收集環(huán)節(jié)由國內(nèi)氣象傳輸系統(tǒng)(CTS)收到氣象資料后,經(jīng)過文件打包處理后,把文件分發(fā)給業(yè)務(wù)系統(tǒng)和用戶。在入庫環(huán)節(jié)中解碼入庫程序按照氣象要素、時次等條件進行拆解,按照存儲規(guī)則錄入不同的數(shù)據(jù)庫中。為了提供氣象資料查詢服務(wù),需要將解碼后的數(shù)據(jù)在不同類型庫中進行同步。在氣象資料全流程監(jiān)視設(shè)計中需要對收集、分發(fā)、入庫、同步環(huán)節(jié)進行監(jiān)視。全流程實時指標見表1,計算依賴于節(jié)目表信息和總控配置信息,節(jié)目表信息用來指定該類氣象資料資料是否為考核資料,總控配置信息主要包含:資料業(yè)務(wù)時次配置信息、單站的單環(huán)節(jié)的單時次及時配置信息、統(tǒng)計規(guī)則(時次、時次截日、時次截小時、小時、日)、各個環(huán)節(jié)之間的關(guān)聯(lián)關(guān)系、文件級資料的應(yīng)收數(shù)、檢測告警開始時間、需要告警指標、告警持續(xù)時間等相關(guān)配置。
表1 全流程實時計算收集環(huán)節(jié)核心指標
Spark是一種快速、通用、可擴展的大數(shù)據(jù)分析引擎,2009年誕生于加州大學(xué)伯克利分校AMPLab,2010年開源,2013年6月成為Apache孵化項目。目前Spark生態(tài)系統(tǒng)已經(jīng)發(fā)展成為一個包含多個子項目的集合,包含SparkSQL、Spark Streaming、GraphX、MLlib、等子項目。
Spark是基于內(nèi)存計算的大數(shù)據(jù)并行計算框架,與Hadoop的MapReduce相比,Spark基于內(nèi)存的運算速度更快,同時保證了高容錯性和高可伸縮性,Spark實現(xiàn)了高效的DAG執(zhí)行引擎,從而可以通過內(nèi)存來高效處理數(shù)據(jù)流[9-10]。
在“天鏡”中,Spark的體系架構(gòu)如圖1所示?!疤扃R”采用Standlone模式部署Spark集群,通過Zookeeper,一個開源的分布式應(yīng)用程序協(xié)調(diào)服務(wù)軟件進行集群管理,在Spark集群上創(chuàng)建常駐的SparkSession即常駐的Driver進程用于交互Spark程序,SparkSession中包含開源的ActorSystem,一套開源的用于設(shè)計跨處理器和網(wǎng)絡(luò)的可擴展彈性系統(tǒng)。服務(wù)端的ActorSystem向Zookeeper注冊自身的地址。在外部調(diào)度任務(wù)模塊的驅(qū)動下,將獲取服務(wù)端的Actor-System地址,隨機選擇其中一個地址,提交SprakSQL任務(wù),SparkSQL任務(wù)提交成功后,會把任務(wù)和接收提交的ActorSystem信息注冊到Zookeeper,用于后續(xù)查看SparkSQL任務(wù)狀態(tài)和取消任務(wù)。
圖1 “天鏡”中Spark的體系架構(gòu)
全流程各環(huán)節(jié)監(jiān)視信息通過接口網(wǎng)關(guān)進入后至高速緩沖通道,一路數(shù)據(jù)直接入庫進行持久化,一路數(shù)據(jù)進行標準化構(gòu)建和數(shù)據(jù)清洗形成中間結(jié)果表(見表2)。
表2 臺站級資料預(yù)處理后中間結(jié)果表
根據(jù)總控配置表的業(yè)務(wù)頻次(cron表達式[0 0 0/1 * * ? ]、統(tǒng)計規(guī)則[時次、時次截小時、時次截日、小時、日])信息計算出業(yè)務(wù)時次,并生成一個sparkSQL文件存入到HDFS中,提交給spark計算,計算考核指標的SparkSQL語句如下:
1.--考核應(yīng)收
2.sum(coalesce(CO_CHECK_TD,0))AS CO_CHECK_TD,
3.--考核及時收
4.sum( casewhen CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END) AS CO_CHECK_INTIME_ACTUAL,
5.--考核逾期收
6.sum( casewhen CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) AS CO_CHECK_LATETIME_ACTUAL,
7.--考核實收數(shù)
8.(sum( casewhen CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END)) AS CO_CHECK_ACTUAL,
9.--考核缺收數(shù)
10.(sum(coalesce(CO_CHECK_TD,0)) - (sum( casewhen CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END))) AS CO_CHECK_LOC,
11.--考核及時率
12.(sum( casewhen CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END)/sum(coalesce(CO_CHECK_TD,2147483646))) AS CO_CHECK_INTIME_RATE,
13.--考核到報率
14.((sum( casewhen CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END))/sum(coalesce(CO_CHECK_TD,2147483646))) AS CO_CHECK_RATE.
“天鏡”系統(tǒng)部署在36臺IntelX86物理服務(wù)器上(見圖2),其中5臺服務(wù)器用于部署網(wǎng)關(guān)模塊(gateway),數(shù)據(jù)預(yù)處理模塊(standardizer)主要負責(zé)接收監(jiān)視信息的收集和全流程中間結(jié)果(指標詳情)的處理,3臺服務(wù)器用于部署消息中間件(kafka)集群,用于數(shù)據(jù)的高速緩存,避免因數(shù)據(jù)量過大導(dǎo)致后端數(shù)據(jù)庫寫入壓力過大。18臺服務(wù)器部署分布式日志數(shù)據(jù)庫用于對監(jiān)視信息的原始指標,中間結(jié)果,最終計算指標進行存儲。用于計算的Spark集群(版本2.3.1)[11]部署在5臺CPU 24核,內(nèi)存256G,3.2TSAS磁盤,操作系統(tǒng)為Centos7.3服務(wù)器上。
圖2 “天鏡”-氣象數(shù)據(jù)全流程系統(tǒng)架構(gòu)
基于Spark計算引擎對氣象全流程監(jiān)視信息進行實時處理,作業(yè)調(diào)度任務(wù)每分鐘執(zhí)行一次,按照臺站級氣象資料(StationDiStaticJob)和文件級氣象資料(FileDiStaticJob)分為兩個計算任務(wù)。隨著接入的氣象資料種類越來越多,每分鐘處理的監(jiān)視信息也呈幾何級增長,執(zhí)行的Spark任務(wù)的耗時在20分鐘以上,導(dǎo)致氣象全流程監(jiān)視界面中氣象區(qū)域站資料無法及時顯示。與此同時,運維人員發(fā)現(xiàn)Spark集群中有個別節(jié)點的負載特別高,這種情況是因為數(shù)據(jù)源單個spark input read數(shù)據(jù)量過大,或者單個task相對于其他task spark input read較大的情況,導(dǎo)致的讀取數(shù)據(jù)源明顯不均勻[12]。因此盡量使用可切割的文本存儲,生成盡量多的task進行并行計算,可以從數(shù)據(jù)源避免傾斜,并從源頭增大并行度[13]。通過觀察Spark任務(wù)管理頁面可以看到已完成的計算任務(wù)資源使用和耗時情況,如表3所示,正常計算任務(wù)需要分配計算資源10核,內(nèi)存5 GB。
表3 優(yōu)化前Spark任務(wù)運行監(jiān)視結(jié)果
進行Spark計算任務(wù)的優(yōu)化的目的,是為了充分利用硬件本身的性能,最大限度地提升Spark中Executor的執(zhí)行效率[14-17]。依據(jù)氣象全流程監(jiān)視界面資料展示情況,拆分為地面資料、海洋資料、高空資料、輻射資料、農(nóng)業(yè)與生態(tài)資料、大氣成分、雷達數(shù)據(jù)、衛(wèi)星數(shù)據(jù)、氣象服務(wù)產(chǎn)品、數(shù)值預(yù)報產(chǎn)品共10類資料,每類資料又分為考核資料和非考核資料。相較于優(yōu)化前,雖然增加了SparkSQL模板的復(fù)雜度,但是提升了氣象考核資料的計算效率,該文以傳輸環(huán)節(jié)考核資料為例,新增的SparkSQL模板如下:
1.base.sql.co.checks=sum( casewhen CHECK = '1' then coalesce(CO_TD,0) ELSE 0 END) AS CO_CHECK_TD, sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END) AS CO_CHECK_INTIME_ACTUAL, sum( case when CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) AS CO_CHECK_LATETIME_ACTUAL, (sum( case when CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END)) AS CO_CHECK_ACTUAL, (sum( case when CHECK = '1' then coalesce(CO_TD,0) ELSE 0 END) - (sum( case when CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END))) AS CO_CHECK_LOC,(sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END)/coalesce(sum( case when CHECK = '1' then coalesce(CO_TD,0) ELSE 0 END), 2147483646)) AS CO_CHECK_INTIME_RATE,((sum( case when CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END))/coalesce(sum( case when CHECK = '1' then coalesce(CO_TD,0) ELSE 0 END), 2147483646)) AS CO_CHECK_RATE
在向Spark進行任務(wù)提交時,客戶端處理程序需要將氣象資料按照上述分類進行拆解,核心代碼如下:
1.…
2.for (TabMcmConfig config : tabOminCmCcSubsystem Allconfigs) {
3.// 1、資料編碼
4.String ctsCode = config.getcCtsCode();
5.String sodCode = config.getcSodCode();
6.//資料大類
7.String dataClass = config.getDataClass();
8.String ctsSodCode = ctsCode.concat(":").concat(sodCode);
9.//文件級還是站點級
10.String dataSourceType = config.getcDataSource();
11.if ("1".equals(dataSourceType)) {
12.if(!fileDiComputeEnabled) {
13.continue;
14.}
15.dataSourceType = "file";
16.}else {
17.dataSourceType = "station";
18.}
19.…
此段代碼通過獲取總控配置后對每類氣象資料進行分類,分類后生成的計算任務(wù)與生成的SparkSQL模板匹配,從而完成計算任務(wù)拆解,單個SparkSQL只計算一類考核資料或者一類非考核資料。
該文采用自動化測試的方法,由于對程序代碼結(jié)構(gòu)進行了修改和微調(diào),因此需要對優(yōu)化后的全流程指標計算結(jié)果正確性進行驗證。正確性可以根據(jù)監(jiān)視頁面中資料的統(tǒng)計指標和系統(tǒng)告警進行判斷,如圖3所示,可以通過查看Spark作業(yè)任務(wù)日志進行驗證,如表4所示。該文展示的全流程監(jiān)視界面與優(yōu)化前資料監(jiān)視統(tǒng)計指標計算結(jié)果一致,并且中國地面分鐘降水?dāng)?shù)據(jù)在一級界面中可以顯示正常。優(yōu)化后單個計算任務(wù)的計算時間控制2分鐘以內(nèi)。
圖3 氣象綜合業(yè)務(wù)實時監(jiān)控系統(tǒng)—“天鏡”全流程監(jiān)視界面
表4 優(yōu)化后Spark任務(wù)運行監(jiān)視結(jié)果
通過拆分計算任務(wù),生成盡可能多的task增加Spark計算并行度,成功將氣象全流程計算框架優(yōu)化并業(yè)務(wù)運行,如表5所示,獲得了10倍的加速效果,提高了程序的運行效率。但是“天鏡”系統(tǒng)在處理大數(shù)據(jù)計算時還是有瓶頸,原因是地面區(qū)域站氣象資料會產(chǎn)生大量重復(fù)數(shù)據(jù),要能夠高效處理海量的監(jiān)視數(shù)據(jù),除了對計算任務(wù)拆分,還需要對計算任務(wù)設(shè)置優(yōu)先級,針對核心資料優(yōu)先分配計算資源計算,這就需要業(yè)務(wù)人員對資料的監(jiān)視等級進行配置,同時要熟悉Spark資源分配機制,在此基礎(chǔ)上來做系統(tǒng)優(yōu)化,能夠較好地提升優(yōu)化效果。
表5 “天鏡”全流程Spark計算任務(wù)優(yōu)化前后運行時間