文/胡慶亮 王珊珊 高亮
高校智慧校園建設旨在改變師生與學校資源、環(huán)境的交互方式,開展以人為本的個性化服務,進而建立智能開放的教育環(huán)境和便利舒適的生活環(huán)境。消息中心服務可以將各類業(yè)務過程中產生的消息進行集中管理與收發(fā),師生用戶可以方便、及時、準確的獲知個人所關注的各類業(yè)務狀態(tài),實現(xiàn)了學校消息的一站式與個性化推送。因此,消息中心成為高校智慧校園建設的一項重要內容。
消息中心的實現(xiàn)依賴于高效可靠的消息隊列中間件(簡稱消息中間件),它可以通過消息傳遞和消息排隊模型,在分布式環(huán)境下提供應用解耦、彈性伸縮、冗余存儲、流量削峰、異步通信、數據同步等功能。目前,應用比較廣泛的消息中間件包括:RabbitMQ、ActiveMQ、Kafka、RocketMQ,其中RabbitMQ 是使用Erlang 語言開發(fā)的開源消息隊列系統(tǒng),基于AMQP 協(xié)議實現(xiàn),該協(xié)議面向消息、隊列和路由(包括點對點和發(fā)布/訂閱),強調可靠性與安全性,主要應用于對數據一致性、穩(wěn)定性和可靠性要求很高的場景,此外RabbitMQ 還有高可用性、高易用性等優(yōu)點。結合高校的統(tǒng)一消息服務特點(對數據一致性、穩(wěn)定性和可靠性要求很高,并發(fā)量、吞吐量要求一般),考慮采用RabbitMQ來構建高校智慧校園消息中心。本文主要對基于RabbitMQ 構建的智慧校園消息中心的設計方案與實現(xiàn)進行闡述。
RabbitMQ 起源于金融系統(tǒng),用于在分布式系統(tǒng)中存儲轉發(fā)消息,具有易用性、擴展性、高可用性等優(yōu)勢,其內部結構如圖1 所示。
圖1 RabbitMQ 內部結構
1. Message:消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優(yōu)先權)、delivery-mode(指出該消息可能需要持久性存儲)等。
2.Publisher:消息的生產者,也是一個向交換器發(fā)布消息的客戶端應用程序。
3.Exchange:交換器,用來接收生產者發(fā)送的消息并將這些消息路由給服務器中的隊列。
4.Binding:綁定,是消息隊列和交換器之間的關聯(lián)。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規(guī)則,所以可以將交換器理解成一個由綁定構成的路由表。
5.Queue:消息隊列,用來保存消息直到發(fā)送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列,消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
6.Connection:網絡連接,比如一個TCP 連接。
7.Channel:信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP 連接內地虛擬連接,AMQP命令都是通過信道發(fā)出去的,不管是發(fā)布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統(tǒng)來說建立和銷毀TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條TCP連接。
8.Consumer:消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
9.Virtual Host:虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環(huán)境的獨立服務器域。每個vhost 本質上就是一個mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost是AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的vhost 是“/”。
10.Broker:表示消息隊列服務器實體。
基于RabbitMQ 的智慧校園消息中心包括:“消息匯聚層”和“消息下發(fā)層”,消息匯聚層完成業(yè)務消息的統(tǒng)一匯集與存儲,消息下發(fā)層則以方便、有效的途徑(服務門戶、短信消息、微信消息等)將消息下發(fā)給師生用戶。具體架構如圖2 所示。
圖2 基于RabbitMQ 的智慧校園消息中心架構
1.消息匯聚層
在消息匯聚層,對于RabbitMQ 而言,生產者是消息接口API,業(yè)務系統(tǒng)通過調用消息接口API 將消息數據放入消息隊列;消費者的職責則由后臺輪詢程序完成。在該架構下,完整的消息集成流程如下:
(1)業(yè)務系統(tǒng)調用消息接口API;
(2)消息接口API 被調用后,首先將消息數據落地到數據庫表,消息記錄的初始推送狀態(tài)設置為“pushstatus=0”,然后以Confirm 方式將消息發(fā)送給RabbitMQ;
(3) 消 息 接 口API 在 接 收 到RabbitMQ返回的Confirm消息確認成功后,更新消息記錄的推送狀態(tài)“pushstatus=1”。
(4)輪詢程序從RabbitMQ 隊列讀取消息,調用消息匯聚中心接口將消息寫入消息匯聚中心數據庫表。
其中,步驟三是RabbitMQ 的發(fā)送確認過程。在此過程中,可能出現(xiàn)網絡閃斷、MQ Broker 端異常等情況,導致回送消息失敗或者異常,因此需要發(fā)送方(生產者)對消息進行可靠性投遞,以保障消息不丟失。為此專門設計了輪詢機制,設置定時任務,每5 分鐘讀取一次中間狀態(tài)的消息(消息可以設置一個超時時間,比如超時1 分鐘且“pushstatus=0”,也就是1 分鐘的時間窗口內沒有被確認的消息,才會被定時任務拉取出來),然后將中間狀態(tài)的消息重新發(fā)送到MQ,稱之為“Retry send 機制”。輪詢程序的另外一個功能是定時比較源頭與消息匯聚中心的數據差異,將差異數據再次寫入消息匯聚中心(相比重新投遞的定時任務,此任務的時間窗口應設置的較大,如一天內未成功寫入的消息;執(zhí)行時間間隔也比較長,如一個小時),稱之為“Rewrite 機制”。因此本方案除了利用RabbitMQ 自身的可靠性機制(包括隊列持久化、發(fā)送確認)之外,“Retry send 機制”與“Rewrite 機制”作為額外的保障措施,提供了更高的可靠性。
2.消息下發(fā)層
在消息下發(fā)層,通過調用各類發(fā)送渠道,包括校園服務門戶(PC 與移動)、短信平臺、微信平臺、郵件平臺等,將消息方便及時的推送給師生用戶。
下文主要針對消息匯聚層中的生產者(消息接口API)和消費者(輪詢程序)的實現(xiàn)過程進行闡述。消息下發(fā)層以調用第三方程序接口為主,不是本方案的核心內容,故不再贅述。
1.生產者
通過分析實際的應用場景,定義了兩種消息類型:提醒與待辦,提醒是業(yè)務系統(tǒng)發(fā)送給用戶的提示消息,具有“已讀”、“未讀”屬性;待辦則是需要用戶辦理的一類特殊提醒,具有“未辦理”、“已辦理”屬性?;谙㈩愋偷亩x,對于提醒,API 提供了“提醒生成”與“提醒已讀”兩個操作;對于待辦,API 提供了“待辦生成”與“待辦消除”兩個操作。由于提醒在程序實現(xiàn)上與待辦類似,所以下文僅描述待辦API 的實現(xiàn)過程。
(1)待辦生成API
待辦生成API 程序在功能上主要實現(xiàn)了待辦消息數據的落地以及將待辦數據放入RabbitMQ 消息隊列并更新推送狀態(tài)。主要程序實現(xiàn)(java 代碼)如下:
/*
*待辦數據寫入數據庫表
*/try
{
String sql_insert = "insert into " + schema + ".TMP_TODOSERVICE(SEQ_ID, APP_ID, REFNO, MESSAGE_TYPE_CODE, TARGET_TYPE, TARGET_IDS, CONTENT,
URL, DO_STEP, CREATETIME, PUSHSTATUS, DOFLAG,PUSHSTATUS_2) " + "values(" + schema + ".SEQ_TMP.
NEXTVAL, " + app_id + ", '" + refno + "', '" + message_type_code + "', '" + target_type + "', '" + target_ids + "', '" + content+ "', '" + url + "', '" + do_step + "', sysdate, -1, 1, -1)";
st = conn.createStatement();
st.execute(sql_insert);
} catch (Exception e) {
e.printStackTrace();
int i = 2;
return i;
}
/*
*RabbitMQ 生產者,將待辦數據放入
RabbitMQ 隊列并更新推送狀態(tài)
*/
JSONObject joTodo = new JSONObject();
joTodo.put("datatype", "push");
joTodo.put("app_id", Long.valueOf(app_id));
joTodo.put("app_key", app_key);
joTodo.put("refno", refno);
joTodo.put("message_type_code", message_type_code);
joTodo.put("target_type", target_type);
joTodo.put("target_ids", target_ids);
joTodo.put("content", content);
joTodo.put("url", url);
joTodo.put("do_step", do_step);
Boolean result = AMQPClientUtil.NewTask("task_queue_todo", joTodo.toString());
(2)待辦消除API
消除待辦API 程序在功能上主要實現(xiàn)了待辦完成數據的落地(修改已寫入數據庫的待辦的完成狀態(tài))、將待辦數據放入RabbitMQ 消息隊列并更新推送狀態(tài)。主
要程序實現(xiàn)(java 代碼)如下:
/*
*待辦完成數據寫入數據庫表
*/
try
{
String sql_update = "update " + schema + ".TMP_TODOSERVICE set DOFLAG=0, DONETIME=sysdate " +"where APP_ID=" + app_id + " and REFNO='" + refno + "'";
st = conn.createStatement();
st.execute(sql_update);
} catch (Exception e) {
e.printStackTrace();
int i = 2;
return i;
}
/*
*RabbitMQ 生產者,將待辦完成數據放入RabbitMQ 隊列并更新推送狀態(tài)
*/JSONObject joTodo_complete = new JSONObject();
joTodo_complete.put("datatype", "complete");
joTodo_complete.put("app_id", Long.valueOf(app_id));
joTodo_complete.put("app_key", app_key);
joTodo_complete.put("refno", refno);
Boolean result = AMQPClientUtil.NewTask("task_queue_todo", oTodo_complete.toString());
2.消費者
作為消費者的輪詢程序實現(xiàn)的功能包括:(1)讀取消息隊列中的待辦數據并調用消息匯聚中心接口,將數據寫入消息匯聚中心數據庫表;(2)“Retry Send”功能,定時拉取推送MQ 失敗的消息,重新發(fā)送給RabbitMQ;(3)“Rewrite”功能,定時比較源頭與消息匯聚中心數據的差異(消息落地數據庫表與消息匯聚中心數據庫表),調用消息匯聚中心接口將差異數據重新寫入。主要程序實現(xiàn)(java 代碼)如下:
/*
*RabbitMQ 消費者,從RabbitMQ 隊
列獲取待辦數據并寫入消息匯聚中心
*/try {
JSONObject jsonObj = JSONObject.fromObject(message);
long app_id = Long.parseLong(jsonObj.get("app_id").toString());
String app_key = jsonObj.get("app_key").toString();
String refno = jsonObj.get("refno").toString();
if (StringUtils.isEmpty(app_key)) {
app_key = (String)appKeyMap.get(Long.valueOf(app_id));
}
if((null != jsonObj.get("datatype")) && ("push".equals(jsonObj.
get("datatype").toString()))) {
String message_type_code = jsonObj.get("message_type_
code").toString();
String target_type = jsonObj.get("target_type").toString();
String target_ids = jsonObj.get("target_ids").toString();
String content = jsonObj.get("content").toString();
String url = jsonObj.get("url").toString();
String do_step = jsonObj.get("do_step").toString();
表1 驗證結果對比
ret = TodoIServiceUtil.todoServicePush(portalServ
erUrl, app_id, app_key, refno, message_type_code,
target_type, target_ids, content, url, do_step);//調用
消息匯聚中心接口
} else if ((null != jsonObj.get("datatype")) && ("complete".
equals(jsonObj.get("datatype").toString())))
{
ret = TodoIServiceUtil.todoServiceComplete(portalServerUrl,
app_id, app_key, refno);//調用消息匯聚中心接口
}
}
/*
*定時Retry send
*/
try
{
retrySendTodoPushFailureFromDb();//重發(fā)推送mq 失敗的
未完成待辦
retrySendTodoStateFailureFromDb();//重發(fā)推送mq 失敗的
已完成待辦
}
/*
*定時Rewrite
*/
try
{
matchTodoPushFailureFromDb();//重寫未寫入消息匯聚中心的待辦
matchTodoCompleteFailureFromDb();//重寫完成狀態(tài)不一致的待辦
}
在方案驗證環(huán)節(jié),對系統(tǒng)可靠性(消息接收成功率)與及時性(消息的平均延遲時間(毫秒))進行了測試與考察,定義如下:
2.第i 條消息的延遲=第i 條消息的數據庫寫入時間-第i 條消息的發(fā)送時間
(n 為接收消息總數)
同時將“Retry Send 機制”的觸發(fā)時間設定為5 分鐘,時間窗口設定為1 分鐘,“Rewrite 機制”的觸發(fā)時間設定為10 分鐘,時間窗口設定為5 分鐘。java 程序循環(huán)調用待辦推送接口API,分別發(fā)送待辦2000條、5000 條、10000 條。在每條消息發(fā)出時,記錄其發(fā)送時間,并和數據庫記錄生成時間做比較,得到每條消息的延遲時間。三次驗證結果分別統(tǒng)計見表1。
驗證過程并沒有考慮程序本身執(zhí)行時間以及網絡延遲的影響,可見隨著消息發(fā)送的增加,消息的平均延遲時間差別并不大;另外,在驗證過程中,遇到了RabbitMQ 因網絡連接超時等情況而發(fā)送失敗的情況,但方案中“Retry send 機制”與“Rewrite 機制”保證了消息仍然被準確接收,驗證了方案的可靠性。
驗證過程未包含RabbitMQ 的吞吐量測試,有資料表明,RabbitMQ 吞吐量可達到5.95w/s,在消息持久化場景下,吞吐量也能達到2.6w/s 左右。這也說明AMQP 協(xié)議為了保證消息的可靠性在吞吐量上做了一定程度的取舍。
基于RabbitMQ 的智慧校園消息中心方案已在上海財經大學的一站式校園服務門戶中投入使用,經過三年多的運行,目前消息中心已實現(xiàn)了面向教職工的75 種消息和面向學生的48 種消息的匯聚與下發(fā)。同時支持用戶個性化設置消息接收渠道(門戶站內信、手機短信以及微信消息)和業(yè)務消息類型,方便師生用戶及時、準確的接收個人所關注的業(yè)務狀態(tài)變更提醒,大大提升了業(yè)務辦理效率以及用戶使用體驗,取得了很好的應用效果。
本文描述了一種基于RabbitMQ 的智慧校園消息中心設計方案以及主要的程序實現(xiàn)。方案在利用RabbitMQ 自身可靠性機制的基礎上,增加了“Retry Send 機制”與“Rewrite 機制”,提高了消息接收的整體可靠性。測試驗證結果與實際應用成效表明,該方案可以很好的滿足高校中的消息集成需求,為高校智慧校園建設提供大力支撐。