郭盛興, 王 晶, 廖建新
(1. 北京郵電大學 網絡與交換技術國家重點實驗室, 北京 100876;2. 東信北郵信息技術有限公司, 北京 100191)
中間件是一種定義于操作系統(tǒng)之上,應用程序之下的一層軟件,它能使應用之間進行跨網絡協(xié)同工作,屏蔽了操作系統(tǒng)和網絡協(xié)議的差異向應用提供通信服務[1]. 消息中間件支持在一個分布式應用環(huán)境中多種用途的消息交換. 它所提供的API將不同分布式環(huán)境很好地封裝起來,對外提供統(tǒng)一的接口,使得應用能通過統(tǒng)一接口進行開發(fā)[2].
通用消息(component packet of realtime application process management and communication,COPART-MACO)是一個抽象得比較好的消息中間件. 對不同類型的上層消息增加同樣的底層消息頭,如圖1,這種消息稱為通用消息.
使用通用消息的進程間交互采用統(tǒng)一的協(xié)議,底層采用統(tǒng)一的方式通信,軟件功能差異主要體現在高層的消息處理部分[3]. 通用消息可以滿足不同進程間通信需要,但通用消息是面向無連接的,只提供了消息尋址功能,不保證通信的可靠性,對比TCP/IP協(xié)議來說,相當于實現了IP層功能. 為滿足可靠性要求,提出了基于通用消息的持久化隊列(Ebupt Message Queue簡稱EMQ)設計,EMQ由服務端和客戶端組成,服務端提供集中式的消息接收、存儲和轉發(fā)服務,客戶端可以通過服務端發(fā)送和接收消息,客戶端和服務端之間的通信采用請求應答方式,服務端具有消息持久化功能.
圖1 通用消息的消息結構Fig.1 COPART-MACO packet encapsulation
EMQ部署結構如圖2,其中包含3個域:消息隊列服務域、生產者域、消費者域.
圖2 EMQ部署結構Fig.2 EMQ deployment structure
包含的組件說明:
1) ininit,消息隊列的守護進程,負責啟動本域內的所有其他進程,在子進程異常退出后,重新啟動子進程;
2) inaccessd,控制臺接入服務端進程,接入它之后,可以建立與本域內的所有進程的連接,輸入控制臺命令得到應答;
3) msgr,通用消息的消息分發(fā)模塊,用于轉發(fā)本域內的進程和外部進程之間的交互消息,根據實際需要,也可以有一個或者多個;
4) emqserver,EMQ服務端,持久化消息隊列的核心,負責接收、存儲和轉發(fā)消息;
5) emqclient,與EMQ服務端交互的客戶端,EMQ客戶端分為生產者和消費者,生產者發(fā)送消息給消息隊列服務端,消費者從通用消息服務端接收消息并處理.
EMQ服務端的結構如圖3:
圖3 持久化消息隊列服務端結構Fig.3 EMQ server structure
服務端底層通信鏈路層是通用消息層,EmqManager管理EMQ服務端的兩個最主要的數據結構,與客戶端的連接EmqConnection和持久化的消息隊列EmqQueue,連接與消息隊列間通過隊列名QueueName相互關連.
通用消息是面向無連接的,所以EMQ客戶端與服務端之間建立的是虛擬連接. 在通用消息的結構下,每個進程都有唯一的進程地址標識,包括三部分:域編號,功能實體編號,和進程實例編號. 一條EMQ虛擬連接對應一對進程地址標識. 連接建立的時候,客戶端請求參數中含有隊列名,服務端保存與客戶端的連接、進程地址標識與隊列的對應關系,客戶端收到服務端連接響應后保存與服務端的連接信息. 對于JMS[4]中一個客戶端進程與服務端一個隊列建立多條連接的情況,如Java編程中客戶端進程采用多線程,這時每個線程會與服務端的一個隊列有一個虛擬連接,將每一個線程作為通用消息的一個實例.
考慮通用性,持久化隊列設計成相對獨立的模塊,按照EMQ消息持久化所要求的接口能力提供操作接口. EMQ需要持久化隊列實現提供的接口主要包括:createQueue(創(chuàng)建隊列)、destroyQueue(刪除隊列)、enQueue(消息入隊)、deQueue(消息出隊)、recover(恢復隊列)等. 研究給出兩種持久化隊列可行的實現方式:
1) 基于文件和索引的實現
定義每條消息的持久化存儲結構,將通用消息整個消息的長度、消息頭和消息體及消息是否已經被“消費”的標識存儲到文件,在內存中保存每條消息在文件中的位置索引,讀取消息時直接通過索引根據存儲結構讀取消息. 此方式在消息量相對比較小的情況下是一種比較好的選擇.
2) 基于開源項目的實現
基于SQLite來實現持久化隊列的存儲,在SQLite基礎上做一層封裝,提供EMQ所需的接口. 基于SQLite實現的持久化隊列,對于比較大的數據量也能獲得比較好的性能. 除SQLite外,還可以考慮Berkeley DB、redis等其他開源項目,在其開放的API基礎上封裝為EMQ提供所需的接口即可.
生產者與消費者間核心的消息交互流程如圖4.
1) 生產者依據負荷分擔策略從幾個服務端連接中選擇一個連接,發(fā)送消息請求;
2) 服務端收到消息后,找到消費者進程地址標識對應的持久化隊列,并將消息持久化保存到隊列中;
3) 服務端保存消息后將操作結果響應給生產者;
4) 生產者依據響應結果決定消息如何處理;
5) 當消費者連接對應的隊列中有數據時,服務端將每次取出一條消息發(fā)送給消費者;
6) 消費者收到消息后首先給服務端消費消息響應;
圖4 消息交互流程Fig.4 Diagram of message flow
7) 消費者對收到的消息進行“消費”處理.
EMQ客戶端和服務端的通信多數采用請求—應答方式. 定義EMQ客戶端和EMQ服務端的消息格式如圖5.
圖5 消息格式Fig.5 Message format
1) 操作類型:用于區(qū)分對隊列的不同操作,為一字節(jié)整型;
2) 操作參數:可以為任意長度,但是由于EMQ消息使用通用消息的消息體承載,所以操作參數最大長度受通用消息一個包的最大長度限制.
EMQ客戶端與服務端之間接口消息的操作類型編碼和操作參數定義如表1.
操作結果0表示成功,非0表示失敗,表2是各種主要操作結果代碼及說明.
流量控制機制理想情況下可以指示任意時刻發(fā)送方發(fā)送消息的速率或數量,確保接受方的資源不被耗盡[5].
表1 操作類型定義Tab.1 Define of message operations
表2 操作結果代碼定義Tab.2 Result code of operation
服務端采用基于滑動窗口的流量控制機制[6],針對每條消費者連接設置一個發(fā)送窗口,當發(fā)送出去但還沒有收到響應的消息數量達到窗口大小或收到操作結果代碼為“超過接收窗口大小”的應答時,服務端延遲發(fā)送消息. 對于每條生產者連接,服務端采用接收窗口來進行流量控制,當收到一條消息,如果未發(fā)送應答的消息數量達到窗口大小時,應答“超過接收窗口大小”的操作結果代碼,發(fā)送方則延遲發(fā)送消息.
消費者采用服務端同樣的流量控制機制和錯誤超時重傳策略.
服務端與客戶端采用請求-應答機制,當應答錯誤和應答超時通過消息重傳來保證消息不被丟失,而且EMQ隊列服務端具有持久化功能,因此可以有效保證服務端與客戶端之間消息通信的服務質量.
為發(fā)揮通用消息的作為底層通信模塊的優(yōu)勢,同時彌補其在服務質量方面的不足,在通用消息基礎上,提出了一種消息隊列EMQ的設計. EMQ服務端與客戶端之間基于請求-應答方式,當請求應答錯誤和應答超時進行消息重傳,服務端對消息持久化,這些機制可以更加有效地保證消息隊列的服務質量.
在實際應用中,設計提高了通用消息的服務質量,但由于消息持久化的引入卻付出了性能的代價,面對當前各種應用中越來越大量的交互消息及實時性要求,性能又是非常重要的,所以研究更高效率持久化隊列,設計一種無論對小數據量還是大數據量消息交互均有良好的性能表現的隊列,是后續(xù)研究重要的努力方向.