余志堅 姜春志
摘 要:開發(fā)JavaWeb項目中發(fā)現(xiàn)服務(wù)之間的調(diào)用存在超時情況,由于涉及的處理邏輯全部是異步,引入定時重試的機制,重試工具選擇了JDK自帶的ScheduledThreadPoolExecutor。當(dāng)A服務(wù)依賴B服務(wù),B服務(wù)由于在業(yè)務(wù)高峰期處理能力降低,導(dǎo)致大量A服務(wù)過來的請求超時,A加入了超時重試機制,間隔時間根據(jù)重試次數(shù)的多少來決定,次數(shù)越多,兩次重試之間間隔的時間越多,此時的業(yè)務(wù)高峰也會給A帶來大量請求,大量的超時會導(dǎo)致重試隊列迅速堆積,直到內(nèi)存溢出。該文從線程池工作機制、ScheduledThreadPoolExecutor實例的創(chuàng)建,獲取重試任務(wù)的過程以及提交任務(wù)的過程角度分析,并通過源代碼的剖析和測試工具MyEclipse進行演示測試內(nèi)存泄露的情況,得出避免內(nèi)存泄露的解決方案。
關(guān)鍵詞:ScheduledThreadPoolExecutor 線程池 內(nèi)存溢出
中圖分類號:TP3 文獻標(biāo)識碼:A 文章編號:1672-3791(2016)03(a)-0015-03
1 ScheduledThreadPoolExecutor實例的創(chuàng)建過程及線程池工作機制
1.1 ScheduledThreadPoolExecutor實例的創(chuàng)建過程
重試工具選擇了JDK自帶的ScheduledThreadPoolExecutor。ScheduledThreadPoolExecutor實例的創(chuàng)建過程如下:ScheduledThreadPoolExecutor實例的創(chuàng)建過程如下:(1)獲取當(dāng)前機器上處理器的數(shù)量;(2)使用Google的ThreadFactoryBuiler創(chuàng)建指定格式名稱的線程,以方便查看問題;(3)有需要被拒絕的任務(wù)時,拋出異常;(4)創(chuàng)建定時任務(wù)池;打開MyEclipse工具顯示相對的代碼:int corePoolSize=Runtime.getRuntime().availableProcessors();
ThreadFactory tf=new ThreadFactoryBuilder().setNameFormat("FailureRetryTask-pool-%d").build();
RejectedExecutionHandler handler=new ThreadPoolExecutor.AbortPolicy();
ScheduledThreadPoolExecutor taskService=new ScheduletThreadPooExecutor(corePoolSize,tf,handler);
線程池就是多個線程在一個隊列中取任務(wù)執(zhí)行,提交的任務(wù)會被放入隊列中等待線程執(zhí)行,故隊列要設(shè)置一個大小。線程池同樣會根據(jù)任務(wù)繁忙程度來動態(tài)調(diào)整連接數(shù),空閑時保持最小連接數(shù),繁忙時增加連接,但不會超過上限,具有伸縮性,線程的創(chuàng)建和銷毀也需要消耗系統(tǒng)資源,線程的連接重用就可以避免這部分損失,具有重用性。
1.2 線程池工作機制
線程獲取任務(wù)的策略就是如果當(dāng)前線程池運行狀態(tài)正常,則阻塞等待任務(wù),否則直接返回或等待有限時間后返回。線程池中線程的主要任務(wù)就是獲取任務(wù),然后執(zhí)行,然后再去獲取任務(wù),如此循環(huán),這就實現(xiàn)了線程池中線程的可重用。
Worker封裝了任務(wù),同時創(chuàng)建了新的線程,并被添加到集合workers中,這個workers其實就是最核心的線程池。通過run方法實現(xiàn)重用。private final HashSet
public void run(){
try{
Runnable task=firstTask;
firstTask=null;
while(task!=null||(task=getTask())!=null){
runTask(task);
task=null;}}
finally{
workerDone(this);
}
}
Runnable getTask(){
for(;;){
try{
int state=runState;
if(state>SHUTDOWN){return null;}
Runnable r;
if(state==SHUTDOWN){r=workQueue.poll();}
else if(poolSize>corePoolSize||allowCoreThreadTimeOut){
r=workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS);
}else{r=workQueue.take();}
if(r!=null){return r;}
if(workerCanExit()){
if(runState>=SHUTDOWN){interruptIdleWorkers();}
return null;
}
}
catch(InterruptedException ie){}
}
}
private boolean workerCanExit(){
final RenntrantLock mainLock=this.mainLock;
mainLock.lock();
boolean canExit; try{canExit=runState>=STOP||workQueue.isEmpty()||(allowCoreThreadTimeOut&&poolSize>Math.max(1,corePoolSize));
}finally{mainLock.unLock();}
return canExit;
}
如果此時線程池運行狀態(tài)是終止(runState >= STOP),或者隊列為空,或者允許核心線程超時并且線程池中線程數(shù)量大于最小線程數(shù)量,那么方法將返回true。再回到getTask方法,調(diào)用workerCanExit方法的前提是沒有獲取到任務(wù),根據(jù)上邊獲取任務(wù)的過程,這幾個條件都有可能成立,所以此時getTask方法可以返回null,上層Worker的run方法從while循環(huán)重返回,整個線程結(jié)束,這就實現(xiàn)了線程池的可伸縮。
2 ScheduledThreadPoolExecutor獲取任務(wù)的過程
在getTask()中,描述了整個獲取任務(wù)的過程,如果線程池運行狀態(tài)已經(jīng)是SHUTDOWN了,調(diào)用非阻塞方法poll,因為如果當(dāng)前有任務(wù),那么可以獲取到任務(wù)并返回,如果沒有任務(wù),也沒有必要阻塞在隊列上等待任務(wù),因為已經(jīng)SHUTDOWN,后續(xù)不會再有任務(wù)進入。
如果當(dāng)前線程數(shù)大于最小線程數(shù),或者核心線程也可以做超時處理,意味著如果獲取不到任務(wù)就可以銷毀一部分線程了,所以poll方法設(shè)置了等待時間,超時后立即返回。
另一種情況是線程池還在運行狀態(tài),并且當(dāng)前線程數(shù)不大于最小線程數(shù),同時也不允許最小線程數(shù)以內(nèi)的線程超時,這個時候線程就要調(diào)用阻塞方法take,等待任務(wù)進入隊列以后才返回。
3 ScheduledThreadPoolExecutor提交任務(wù)的執(zhí)行過程
ScheduledThreadPoolExecutor提交任務(wù)的執(zhí)行過程,首先提交任務(wù):taskService .schedule(new Runnable(){public void run(){}},1,TimeUnit,DAYS);
ScheduledThreadPoolExecutor通過schedule方法提交定時任務(wù),schedule方法源碼如下:
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit){
if(command==null||unit==null){throw new NullPointerException();}
if(delay<0){delay=0;}
RunnableScheduledFuture<?> t=decorateTask(command,new ScheduledFutureTask
delayedExecute(t);
return t;
}
提交的任務(wù)會被封裝成ScheduledFutureTask類型對象。
分析delayedExecute方法:private void delayedExecute(Runnable command){
if(isShutDown()){reject(command);return;}
if(getPoolSize() super.getQueue().add(command); } 如果線程的運行狀態(tài)不是RUNNNING或者入隊列沒有成功,則采用線程池的構(gòu)造方法中設(shè)置的拒絕策略來處理任務(wù)。 如果當(dāng)前線程池中的線程數(shù)量poolSize小于線程池核心線程的數(shù)量corePoolSize,執(zhí)行prestartCoreThread(),該方法會創(chuàng)建一個新的線程來執(zhí)行任務(wù),如果prestartCoreThread()創(chuàng)建的新線程執(zhí)行任務(wù)失敗或者當(dāng)前線程池中的線程數(shù)量poolSize大于等于線程池核心線程數(shù)量corePoolSize,當(dāng)若線程池的運行狀態(tài)是RUNNING并且入隊成功,由于在多線程環(huán)境下,狀態(tài)隨時可能會改變,此時線程池的運行狀態(tài)runState不是RUNNING或者線程池中沒有可用的線程(poolSize==0),要確保進入的任務(wù)被執(zhí)行處理,線程池在初始化完成以后是空的,并沒有線程,如果在服務(wù)器中使用線程池,服務(wù)重啟后有大量請求進入,則要同時創(chuàng)建多個線程,而且創(chuàng)建過程是加鎖同步的,會導(dǎo)致一定的競爭,解決辦法就是線程池初始化時調(diào)用prestartAllCoreThreads方法啟動核心線程數(shù)量的線程,這樣就能在線程池中的線程就緒以后才開始接收請求。 通過getQueue方法獲取任務(wù)隊列,并且調(diào)用add方法向隊列中添加任務(wù),dq的定義: private final DelayQueue public boolean add(Runnable x){return dq.add((RunnableScheduleFuture)x);} 可以看出dq是阻塞隊列,線程池中的線程都是在隊列中取數(shù)據(jù),ScheduledThreadPoolExecutor中的構(gòu)造方法里的隊列的實現(xiàn)使用鏈表結(jié)構(gòu)的阻塞隊列,add方法內(nèi)部調(diào)用offer方法,offer源碼如下:public boolean offer(E e){
final ReentrantLock lock=this.lock();
lock.lock();
try{
E first=q.peek();
q.offer(e);
if(first==null||e.compareTo(first)<0){
available.singalAll();
return true; }
}finally{lock.unlock();}}
這方法需要在多線程環(huán)境下同步執(zhí)行,會用到鎖Lock。鎖實現(xiàn)的大概原理如下。
Lock實現(xiàn)鎖的方式是通過排他性更新共享變量,更新成功的線程繼續(xù)執(zhí)行,沒有更新成功的線程將會被阻塞。Lock的共享變量state在可重入鎖中可以用來表示一個線程調(diào)用了幾次lock方法,也就是有幾次獲取鎖的行為。Lock的功能實現(xiàn)是通過內(nèi)部聚合了抽象隊列同步器(AQS),同步器有公平和非公平之分。非公平同步器對于新來的線程會嘗試獲取,不成功以后才會進入等待隊列,而公平同步器則會首先判斷是否排隊。AQS中會保存獲取鎖的當(dāng)先線程的引用。如果一次性嘗試獲取鎖不成功,則線程會進入隊列,循環(huán)嘗試獲取鎖。
peek方法會獲取隊列的第一個元素,只是獲取,并沒有出隊列。接著調(diào)用優(yōu)先級隊列PriorityQueue類型變量q的offer方法將隊列入隊,優(yōu)先級隊列會對任務(wù)進行排序,距離執(zhí)行時間越近,位置越靠前。下邊的if判斷可以這樣理解,first是在當(dāng)前任務(wù)入隊之前獲取的,也就是隊列中原有的第一個任務(wù),compareTo的這段比較是說當(dāng)前任務(wù)的執(zhí)行時間比隊列中第一個任務(wù)執(zhí)行時間還要早,如果first是null,那么當(dāng)前任務(wù)入隊后將是第一個元素,如果當(dāng)前任務(wù)的執(zhí)行時間比隊列中第一個任務(wù)的執(zhí)行時間早,那么當(dāng)前入隊后也將是第一個元素,只要這兩個條件有一個成立了,這個if的判斷條件就為true,就要執(zhí)行Condition類型的available變量的signalAll方法,喚醒等待的線程工作。
4 隊列的大小判斷
隊列的大小是決定內(nèi)存溢出最直觀的因素,首先來看看優(yōu)先級隊列PriorityQueue的offer方法:public boolean offer(E e){
if(e==null){throw new NullPointerException();}
modCount++;
int i=size;
if(i>=queue.length){grow(i+1);}
size=i+1;
if(i==0){queue[0]=e;}
else{siftUp(i,e);}
return true;
上述代碼表示如果隊列中元素的個數(shù)(size)大于等于隊列的長度,將要通過grow方法擴容,如下:private void grow(int minCapacity){
if(minCapacity<0){throw new OutOfMemoryError();}
int oldCapacity=queue.length;
int newCapacity=((oldCapacity<64)?((oldCapacity+1)*2):((oldCapacity/2)*3));
if(newCapacity<0){newCapacity=Integer.MAX_VALUE;}
if(newCapacity queue=Arrays.copyof(queue,newCapacity); } 若隊列容量小于64,那就在原有基礎(chǔ)上加1然后擴大2倍,這種情況絕對不會造成內(nèi)存的溢出問題。如果大于等于64呢?直接擴容一半,然后將值賦給一個int型變量,當(dāng)某種情況如果超過int類型的最大值了,JDK的處理是賦值成Integer的MAX_VALUE為2147483647,也就是最大的隊列長度是2 G多,如果一個對象的大小按照50個字節(jié)來算,將會占用100 G的內(nèi)存必定溢出。 5 模擬內(nèi)存溢出代碼測試 當(dāng)業(yè)務(wù)高峰給服務(wù)器帶來大量請求,大量的超時會導(dǎo)致重試隊列迅速堆積,直到內(nèi)存溢出,下面就通過代碼來測試一下:模擬大量的添加任務(wù),并且任務(wù)在調(diào)度隊列中堆積,推遲一天執(zhí)行。 while(true){ taskService.schedule(new Runnable(){public void run(){}},1,TimeUnit.DAYS); } 虛擬機啟動參數(shù)-: -Xms32M -Xmx32M -Xmn10M-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=d:/ 運行輸出: java.lang.OutOfMemoryError:Java heap space Dumping head to d:/\java_pid12884.hprof... Heap dump file created [44940425 bytes in 0.618 secs] 內(nèi)存溢出了,來看看內(nèi)存快照(見表1)。 6 解決方案及措施 編譯好的java程序需要運行在JVM中,而JVM為程序提供并管理所需要的內(nèi)存空間,JVM自帶一個用于回收沒有任何引用指向的對象的線程機制(垃圾回收器),但針對于ScheduledThreadPoolExecutor提交的任務(wù)會被封裝成ScheduledFutureTask類型對象且每個對象中又有Sync成員變量。解決的辦法可以是手動判斷隊列的大小,通過taskService.getQueue().size()方法,通過Jmap內(nèi)存分析工具估算每個對象的大小,Jmap是一個可以輸出所有內(nèi)存中對象的工具,甚至可以將JVM 中的heap,以二進制輸出成文本。打印出某個Java進程內(nèi)存內(nèi)的所有‘對象的情況,結(jié)合能夠為隊列分配的內(nèi)存大小,計算出隊列容納任務(wù)的最大數(shù)量,以避免內(nèi)存溢出。 參考文獻 [1] 逯昌浩.淺析多核處理器條件下的Java編程[J].中國科技信息,2009(12):128,130. [2] 張復(fù)興,曾新洲.擴展線程池模型及性能分析[J].計算技術(shù)與自動化,2007(4):110-112. [3] (美)Bruce Eckel.Java編程思想[M].陳昊鵬,譯.北京:機械工業(yè)出版社,2007.