楊寧 黃婷婷
摘 要: 隨著數(shù)據(jù)規(guī)模的不斷增大,傳統(tǒng)的關(guān)系型數(shù)據(jù)庫(kù)方法已經(jīng)無(wú)法滿(mǎn)足大數(shù)據(jù)量的數(shù)據(jù)查詢(xún)需求,而基于Hadoop 平臺(tái)的 Hive 數(shù)據(jù)倉(cāng)庫(kù)為海量數(shù)據(jù)分析提供了方便的操作。隨著實(shí)時(shí)查詢(xún)需求的增加,基于Spark的Hive操作得到了很好的應(yīng)用。文章主要介紹了Hive on Spark的整合步驟以及與Hadoop運(yùn)行模式的比較。對(duì) MovieLens 數(shù)據(jù)集的實(shí)驗(yàn)測(cè)試顯示,新模式的執(zhí)行速度提高了17.42-46.35倍,這對(duì)進(jìn)一步了解Hive的運(yùn)行機(jī)制及海量數(shù)據(jù)的實(shí)時(shí)分析具有重要的意義。
關(guān)鍵詞: Hadoop; Hive; Spark; 海量數(shù)據(jù); 實(shí)時(shí)分析
中圖分類(lèi)號(hào):TP399 文獻(xiàn)標(biāo)志碼:A 文章編號(hào):1006-8228(2018)11-31-05
Abstract: With the increasing size of the acquired data, the traditional relational database method can no longer meet the data query requirements of such large data volume, but the Hive data warehouse based on Hadoop platform provides convenient operation for massive data analysis. And with the increase in real-time query requirements, Spark-based Hive operations have been well applied. In this paper, Hive and Spark are integrated, and the integration steps of Hive on Spark and the comparison with the traditional running structure are introduced. The experimental test on MovieLens dataset shows that the execution speed of the new mode has increased by 17.42-46.35 times. This is of great significance for further understanding of Hive's operating mechanism and real-time analysis of massive data.
Key words: Hadoop; Hive; Spark; massive data; real-time analysis
0 引言
隨著數(shù)據(jù)分析需求的不斷增加,實(shí)時(shí)性查詢(xún)?cè)絹?lái)越重要,但是基于Hadoop的Hive查詢(xún),耗時(shí)過(guò)長(zhǎng),這在實(shí)時(shí)性要求比較嚴(yán)格的業(yè)務(wù)中是無(wú)法使用的。Spark的使用,給Hive的實(shí)時(shí)查詢(xún)提供了可能,如何更好的將Spark與Hive進(jìn)行整合,從而得到更高的查詢(xún)速度,是以后需要進(jìn)一步研究的方向。
1 Hive
1.1 Hive的背景
Hive起源于Facebook。原因是Facebook有著大量的用戶(hù)數(shù)據(jù)[1]需要進(jìn)行處理。而Hadoop[2]是一個(gè)由MapReduce[3]模塊實(shí)現(xiàn)的大數(shù)據(jù)處理工具,主要的應(yīng)用場(chǎng)景是在構(gòu)建數(shù)據(jù)倉(cāng)庫(kù)時(shí),對(duì)數(shù)據(jù)執(zhí)行抽取、轉(zhuǎn)換和裝載操作[4]。但是,由于MapReduce程序?qū)τ谄渌Z(yǔ)言開(kāi)發(fā)者來(lái)說(shuō)相對(duì)麻煩。所以,F(xiàn)acebook研發(fā)了Hive,這將sql語(yǔ)句在Hadoop上執(zhí)行成為了可能,達(dá)到了提高查詢(xún)效率的目的。
1.2 Hive的模型
Hive運(yùn)行時(shí),將SQL語(yǔ)句進(jìn)行解釋、編譯、優(yōu)化并生成執(zhí)行任務(wù),默認(rèn)情況下會(huì)將查詢(xún)語(yǔ)句轉(zhuǎn)化為MapReduce任務(wù)進(jìn)而執(zhí)行。在基于Spark的架構(gòu)中,將轉(zhuǎn)化為抽象的RDD,然后對(duì)相應(yīng)的RDD再進(jìn)行相關(guān)的處理。Hive 中的主要數(shù)據(jù)模型如下:表(Table)、外部表(External Table)、分區(qū)(Partition)、桶(Bucket)[5]。Hive中包含的主要組件如下:
Driver組件:主要有Compiler、Optimizer、Executor,可以將Hive語(yǔ)句進(jìn)行編譯、解析、優(yōu)化,進(jìn)而轉(zhuǎn)化為相應(yīng)的任務(wù)并提交給計(jì)算引擎進(jìn)行處理。
MetaStore組件:存儲(chǔ)著Hive的元數(shù)據(jù)信息,主要為關(guān)系型數(shù)據(jù)庫(kù)。
用戶(hù)接口:用于訪(fǎng)問(wèn)Hive。
2 Hive on Hadoop
2.1 Hadoop的背景
Google在2004年提出了最原始的分布式計(jì)算架構(gòu)模型[6]:MapReduce,該模型主要用于大規(guī)模數(shù)據(jù)的并行處理。MapReduce模型主要分為Map和Reduce過(guò)程,主要的原理是將大規(guī)模數(shù)據(jù)處理作業(yè)拆分成多個(gè)可獨(dú)立運(yùn)行的Map任務(wù),然后傳輸?shù)蕉鄠€(gè)處理機(jī)上進(jìn)行分布處理,最后通過(guò)Reduce任務(wù)混洗合并,從而產(chǎn)生最終的輸出文件。盡管MapReduce模型比較好的考慮了數(shù)據(jù)存儲(chǔ)、調(diào)度、容錯(cuò)管理、負(fù)載均衡[7]等問(wèn)題。但是它也存在不足,如占用過(guò)多的網(wǎng)絡(luò)資源、磁盤(pán)讀寫(xiě)耗費(fèi)時(shí)間、異步性差等問(wèn)題。
2.2 Hadoop的四大組件
Hadoop為可靠、可擴(kuò)展的分布式開(kāi)源軟件。
Hadoop的四個(gè)組件如下:
Hadoop Common:支持其他Hadoop模塊的程序。
HDFS:分布式文件系統(tǒng),提供訪(fǎng)問(wèn)應(yīng)用程序的數(shù)據(jù)。
Hadoop YARN:作業(yè)調(diào)度和集群資源管理的框架[8]。
Hadoop MapReduce:基于YARN的大型數(shù)據(jù)集并行處理系統(tǒng)。
2.3 Hive on Hadoop運(yùn)行機(jī)制
Hive的客戶(hù)端書(shū)寫(xiě)hql語(yǔ)句發(fā)起任務(wù)請(qǐng)求,然后將hql語(yǔ)句轉(zhuǎn)化為mapreduce任務(wù),通過(guò)資源管理器yarn,分發(fā)到各個(gè)節(jié)點(diǎn)上進(jìn)行數(shù)據(jù)處理。這種運(yùn)行模式的目的是使客戶(hù)端主要集中進(jìn)行查詢(xún)語(yǔ)句的書(shū)寫(xiě),而不用過(guò)多的關(guān)注底層的開(kāi)發(fā)。具體執(zhí)行流程如圖1所示。
2.4 Hive on Hadoop的應(yīng)用
由于Hadoop具有較高的延遲,而且在作業(yè)提交和調(diào)度的時(shí)候,需要大量的額外開(kāi)銷(xiāo)。所以,這種模式無(wú)法滿(mǎn)足大數(shù)據(jù)集的低延遲查詢(xún)。因此,該模式最佳使用場(chǎng)合是大數(shù)據(jù)集的離線(xiàn)批處理作業(yè),例如,網(wǎng)絡(luò)日志的離線(xiàn)分析。
3 Hive on Spark
3.1 Spark的背景介紹
Apache Spark[9]是基于內(nèi)存計(jì)算的用于大規(guī)模數(shù)據(jù)處理的分析引擎。Spark中的核心抽象概念就是彈性分布式數(shù)據(jù)集RDD(resilient distributed datasets)[10],該數(shù)據(jù)集為只讀型可恢復(fù)數(shù)據(jù)集。用戶(hù)可以利用 Spark中的轉(zhuǎn)換(transformation)和動(dòng)作(action)操作對(duì)其進(jìn)行處理,這其中也包括RDD的持久化操作,我們可以利用緩存的方式將其保存在內(nèi)存[11]中不被回收。
RDD通過(guò)血統(tǒng)(lineage)關(guān)系來(lái)完成容錯(cuò):主要的原理是丟失的RDD有足夠的信息知道自己的父RDD,從而可以通過(guò)再次計(jì)算的方式從父RDD得到丟失的RDD。
3.2 Spark的四大特性
Spark具有四大特性如下:
快速性:相比較于Hadoop,官網(wǎng)給出的運(yùn)行速度是提高了100倍。因?yàn)镾park使用DAG[12]調(diào)度程序、查詢(xún)優(yōu)化程序和物理執(zhí)行引擎,所以實(shí)現(xiàn)批量數(shù)據(jù)和流式數(shù)據(jù)處理的高性能。
易用性:支持使用Java,Scala,Python,R和SQL等語(yǔ)言進(jìn)行快速編寫(xiě)應(yīng)用程序。
高可用性:Spark提供了很多庫(kù),包括SQL、DataFrame、MLlib[13]、GraphX[14]和Spark Streaming[15]。我們可以在同一個(gè)應(yīng)用程序中組合使用這些庫(kù)。
跨平臺(tái)性:Spark可以運(yùn)行在Hadoop、Mesos、或者Kubernetes中;可以從HDFS、HBase、Hive和其他數(shù)百個(gè)數(shù)據(jù)源中訪(fǎng)問(wèn)數(shù)據(jù)。
3.3 Hive on Spark的運(yùn)行機(jī)制
我們?cè)赟park平臺(tái)運(yùn)行Hive時(shí),有遠(yuǎn)程和本地兩種方式。Hive on Spark主要的設(shè)計(jì)思路是,盡可能重用Hive邏輯計(jì)算層面的功能。在運(yùn)行生成物理計(jì)劃開(kāi)始時(shí),就提供一整套針對(duì)Spark的實(shí)現(xiàn),目的是使Hive的查詢(xún)可以作為Spark任務(wù)來(lái)執(zhí)行。
設(shè)計(jì)原則如下:
⑴ 盡量保持Hive源碼的完整性:主要為了不影響Hive目前對(duì)MapReduce和Tez的支持;
⑵ 利用Hive語(yǔ)句:主要指使用Hive的執(zhí)行語(yǔ)句對(duì)數(shù)據(jù)進(jìn)行操作,使主要的計(jì)算邏輯仍由Hive提供;
⑶ 對(duì)Spark具有良好的松耦合性:使用中可以直接利用命令進(jìn)行計(jì)算引擎的切換。
圖2是一個(gè)關(guān)于兩表join的hive操作執(zhí)行過(guò)程,具體的處理過(guò)程如下:
這個(gè)join查詢(xún)?cè)谶M(jìn)行邏輯計(jì)劃過(guò)程中生成了兩個(gè)MapWork和一個(gè)ReduceWork。TS讀取表記錄,F(xiàn)IL進(jìn)行過(guò)濾;RS對(duì)數(shù)據(jù)進(jìn)行分發(fā)和排序,JOIN算子對(duì)RS分組排序后的數(shù)據(jù)進(jìn)行join運(yùn)算,最后通過(guò)FS算子輸出結(jié)果。
在執(zhí)行SparkTask時(shí),將各個(gè)MapWork和ReduceWork包裝成函數(shù)應(yīng)用到RDD上,RDD主要由Hive表生成。對(duì)于存在依賴(lài)關(guān)系的Work之間,需要調(diào)用Shuffle操作并進(jìn)行stage的相應(yīng)劃分。圖2右為RDD的具體執(zhí)行過(guò)程,首先通過(guò)Union操作,然后執(zhí)行Shuffle操作,最后得到相應(yīng)的RDD,foreachAsync的作用是將任務(wù)提交到Spark引擎上進(jìn)行處理。
3.4 hive on spark的應(yīng)用
如今,數(shù)據(jù)的來(lái)源和特性不斷改變,傳統(tǒng)的處理方式已不再適用,并且當(dāng)使用過(guò)程中碰到迭代操作時(shí),基于MapReduce的Hive查詢(xún)根本無(wú)法滿(mǎn)足快速處理的要求。但是,對(duì)于實(shí)時(shí)查詢(xún)業(yè)務(wù),基于Spark的大數(shù)據(jù)分析工具Hive有著突出的表現(xiàn),特別是對(duì)于一些復(fù)雜的操作,如迭代操作。
4 相關(guān)工作
4.1 Hive on Spark的集群搭建準(zhǔn)備
Spark的編譯。要使用Hive on Spark,所用的Spark版本必須不包含Hive的相關(guān)jar包。需要下載Spark源碼進(jìn)行重新編譯。
我們這里用的Spark源碼是從官網(wǎng)下載的spark-1.6.2的源碼包。編譯前請(qǐng)確保已經(jīng)安裝JDK、Maven和Scala,Maven為3.3.3及以上版本,并配置環(huán)境變量。進(jìn)入到源碼根目錄下,利用make-distribution.sh命令進(jìn)行編譯,注意Hive和Spark的版本號(hào)要匹配。
4.2 Hive on Spark的搭建
本次實(shí)驗(yàn)中,主要搭建了三臺(tái)虛擬機(jī),其中Hive只需安裝在其中一臺(tái)機(jī)上,啟動(dòng)Hive時(shí),注意將MySQL驅(qū)動(dòng)包上傳到Hive的lib目錄下;然后,在Hive的機(jī)器上,將Spark的lib目錄下的assembly包拷貝到Hive的lib目錄下,目的是執(zhí)行Hive操作就不需要再手動(dòng)啟動(dòng)Spark。初始化數(shù)據(jù)庫(kù),啟動(dòng)Hive。至此,安裝結(jié)束,進(jìn)行實(shí)驗(yàn)測(cè)試。
5 兩種模式在具體查詢(xún)分析中的比較
5.1 影評(píng)案例的測(cè)試
主要使用了三張表movies.dat,ratings.dat,users.dat,我們主要對(duì)兩張表以及三張表的join操作進(jìn)行了測(cè)試,具體操作如表1,表2所示。
5.2 實(shí)驗(yàn)環(huán)境
實(shí)驗(yàn)采用在虛擬機(jī)建立3臺(tái)機(jī)器測(cè)試,配置如下,電腦硬件:(英特爾)Intel(R) Core(TM) i5-3210M CPU@2.50GHz(2500 Mh),內(nèi)存8.0GB,操作系統(tǒng)是 Microsoft Windows 7旗艦版(64位/Service Pack 1)。
三臺(tái)虛擬機(jī)的信息具體如表3。
三張表movies.dat,ratings.dat,users.dat的數(shù)據(jù)量分別為3883行數(shù)據(jù),1000209行數(shù)據(jù),6040行數(shù)據(jù)。測(cè)試結(jié)果如表4,表5所示。對(duì)于hadoop的具體執(zhí)行過(guò)程如表6所示。
5.3 性能比較與總結(jié)
我們通過(guò)具體的案例分析,將結(jié)果用圖表進(jìn)行顯示,每次運(yùn)行的時(shí)間單位為秒,具體如圖3、圖4所示。
6 實(shí)驗(yàn)總結(jié)和期望
實(shí)驗(yàn)中,我們看到利用Spark作為計(jì)算引擎比MapReduce的執(zhí)行速度快了17.421-46.347倍。基于Hadoop的執(zhí)行過(guò)程具體如表6所示,總體運(yùn)行時(shí)間都比較長(zhǎng)。但是當(dāng)基于Spark引擎運(yùn)行時(shí),每條語(yǔ)句的執(zhí)行時(shí)間都明顯降低了,特別是執(zhí)行sql6語(yǔ)句的時(shí)候,時(shí)間減少的更加明顯。
通過(guò)實(shí)驗(yàn),基于Spark的Hive語(yǔ)句執(zhí)行的效果明顯好于Hadoop,特別是對(duì)于復(fù)雜的查詢(xún)語(yǔ)句,如產(chǎn)生多個(gè)map和reduce過(guò)程的語(yǔ)句,Spark的表現(xiàn)更加突出。Spark的應(yīng)用使Hive的實(shí)時(shí)查詢(xún)成為了可能。這也對(duì)海量數(shù)據(jù)的實(shí)時(shí)分析具有重要的意義。
參考文獻(xiàn)(References):
[1] 李學(xué)龍,龔海剛.大數(shù)據(jù)系統(tǒng)綜述[J].中國(guó)科學(xué):信息科學(xué),2015.45(1):1-44
[2] 陸嘉恒.Hadoop實(shí)戰(zhàn)[M].機(jī)械工業(yè)出版社,2011.
[3] 宋杰.MapReduce大數(shù)據(jù)處理平臺(tái)與算法研究進(jìn)展[J].軟件學(xué)報(bào),2017.28(3).
[4] El-Sappagh S H A, Hendawi A M A, Bastawissy A H E. A proposed modelfor data warehouse ETL processes[J]. Journal of King Saud University Computer & Information Sciences,2011.23(2):91-104
[5] Dean J,Ghemawat S. MapReduce:simplified data process-ing on largeclusters[J].Communications of the ACM,2008.51(1):107-113
[6] 董西成.Hadoop技術(shù)內(nèi)幕深入解析MapReduce架構(gòu)設(shè)計(jì)與實(shí)現(xiàn)原理[M].機(jī)械工業(yè)出版,2013.
[7] 陳林,Hadoop異構(gòu)集群下的負(fù)載均衡算法研究[J].現(xiàn)代計(jì)算機(jī),2018.5:60-62
[8] 方宸.基于YARN網(wǎng)絡(luò)數(shù)據(jù)分析系統(tǒng)實(shí)現(xiàn)與應(yīng)用研究[D].華中科技大學(xué),2014.
[9] 高彥杰.Spark大數(shù)據(jù)處理[M].機(jī)械工業(yè)出版社,2014.
[10] Zaharia M, Chowdhury M, Das T, et al. Resilient distributed datasets: a faulttolerant abstraction for in-memory cluster computing [C]// Proc of Conference on Networked Systems Design and Implementation.[S. l. ]:USENIX Association, 2012:2
[11] Han Z, Zhang Y. Spark: A Big Data Processing Platform Based on MemoryComputing [C]// Proc of International Symposium on Parallel Architectures.[S. l.]:IEEE Press,2015:172-176
[12] 袁景凌,熊盛武,饒文碧.Spark案例與實(shí)驗(yàn)教程[M].武漢大學(xué)出版社,2017.
[13] B Yavuz, B Yavuz, B Yavuz, E Sparks, D Liu.MLlib: machine learning in apache spark[J]. Journal of Machine Learning Research,2016.17(1):1235-1241
[14] 孫海.Spark的圖計(jì)算框架:GraphX[J],現(xiàn)代計(jì)算機(jī)(專(zhuān)業(yè)版),2017.9:120-122,127
[15] 陸世鵬,基于Spark Streaming的海量日志實(shí)時(shí)處理系統(tǒng)的設(shè)計(jì)[J].電子產(chǎn)品可靠性與環(huán)境試驗(yàn),2017.35(5).