国产日韩欧美一区二区三区三州_亚洲少妇熟女av_久久久久亚洲av国产精品_波多野结衣网站一区二区_亚洲欧美色片在线91_国产亚洲精品精品国产优播av_日本一区二区三区波多野结衣 _久久国产av不卡

?

采用ScheduledThreadPoolExecutor執(zhí)行定時重試任務(wù)時內(nèi)存溢出的分析及解決

2016-05-14 11:20:05余志堅姜春志
科技資訊 2016年7期
關(guān)鍵詞:入隊線程調(diào)用

余志堅 姜春志

摘 要:開發(fā)JavaWeb項目中發(fā)現(xiàn)服務(wù)之間的調(diào)用存在超時情況,由于涉及的處理邏輯全部是異步,引入定時重試的機制,重試工具選擇了JDK自帶的ScheduledThreadPoolExecutor。當(dāng)A服務(wù)依賴B服務(wù),B服務(wù)由于在業(yè)務(wù)高峰期處理能力降低,導(dǎo)致大量A服務(wù)過來的請求超時,A加入了超時重試機制,間隔時間根據(jù)重試次數(shù)的多少來決定,次數(shù)越多,兩次重試之間間隔的時間越多,此時的業(yè)務(wù)高峰也會給A帶來大量請求,大量的超時會導(dǎo)致重試隊列迅速堆積,直到內(nèi)存溢出。該文從線程池工作機制、ScheduledThreadPoolExecutor實例的創(chuàng)建,獲取重試任務(wù)的過程以及提交任務(wù)的過程角度分析,并通過源代碼的剖析和測試工具MyEclipse進行演示測試內(nèi)存泄露的情況,得出避免內(nèi)存泄露的解決方案。

關(guān)鍵詞:ScheduledThreadPoolExecutor 線程池 內(nèi)存溢出

中圖分類號:TP3 文獻標(biāo)識碼:A 文章編號:1672-3791(2016)03(a)-0015-03

1 ScheduledThreadPoolExecutor實例的創(chuàng)建過程及線程池工作機制

1.1 ScheduledThreadPoolExecutor實例的創(chuàng)建過程

重試工具選擇了JDK自帶的ScheduledThreadPoolExecutor。ScheduledThreadPoolExecutor實例的創(chuàng)建過程如下:ScheduledThreadPoolExecutor實例的創(chuàng)建過程如下:(1)獲取當(dāng)前機器上處理器的數(shù)量;(2)使用Google的ThreadFactoryBuiler創(chuàng)建指定格式名稱的線程,以方便查看問題;(3)有需要被拒絕的任務(wù)時,拋出異常;(4)創(chuàng)建定時任務(wù)池;打開MyEclipse工具顯示相對的代碼:int corePoolSize=Runtime.getRuntime().availableProcessors();

ThreadFactory tf=new ThreadFactoryBuilder().setNameFormat("FailureRetryTask-pool-%d").build();

RejectedExecutionHandler handler=new ThreadPoolExecutor.AbortPolicy();

ScheduledThreadPoolExecutor taskService=new ScheduletThreadPooExecutor(corePoolSize,tf,handler);

線程池就是多個線程在一個隊列中取任務(wù)執(zhí)行,提交的任務(wù)會被放入隊列中等待線程執(zhí)行,故隊列要設(shè)置一個大小。線程池同樣會根據(jù)任務(wù)繁忙程度來動態(tài)調(diào)整連接數(shù),空閑時保持最小連接數(shù),繁忙時增加連接,但不會超過上限,具有伸縮性,線程的創(chuàng)建和銷毀也需要消耗系統(tǒng)資源,線程的連接重用就可以避免這部分損失,具有重用性。

1.2 線程池工作機制

線程獲取任務(wù)的策略就是如果當(dāng)前線程池運行狀態(tài)正常,則阻塞等待任務(wù),否則直接返回或等待有限時間后返回。線程池中線程的主要任務(wù)就是獲取任務(wù),然后執(zhí)行,然后再去獲取任務(wù),如此循環(huán),這就實現(xiàn)了線程池中線程的可重用。

Worker封裝了任務(wù),同時創(chuàng)建了新的線程,并被添加到集合workers中,這個workers其實就是最核心的線程池。通過run方法實現(xiàn)重用。private final HashSet workers=new HashSet();

public void run(){

try{

Runnable task=firstTask;

firstTask=null;

while(task!=null||(task=getTask())!=null){

runTask(task);

task=null;}}

finally{

workerDone(this);

}

}

Runnable getTask(){

for(;;){

try{

int state=runState;

if(state>SHUTDOWN){return null;}

Runnable r;

if(state==SHUTDOWN){r=workQueue.poll();}

else if(poolSize>corePoolSize||allowCoreThreadTimeOut){

r=workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS);

}else{r=workQueue.take();}

if(r!=null){return r;}

if(workerCanExit()){

if(runState>=SHUTDOWN){interruptIdleWorkers();}

return null;

}

}

catch(InterruptedException ie){}

}

}

private boolean workerCanExit(){

final RenntrantLock mainLock=this.mainLock;

mainLock.lock();

boolean canExit; try{canExit=runState>=STOP||workQueue.isEmpty()||(allowCoreThreadTimeOut&&poolSize>Math.max(1,corePoolSize));

}finally{mainLock.unLock();}

return canExit;

}

如果此時線程池運行狀態(tài)是終止(runState >= STOP),或者隊列為空,或者允許核心線程超時并且線程池中線程數(shù)量大于最小線程數(shù)量,那么方法將返回true。再回到getTask方法,調(diào)用workerCanExit方法的前提是沒有獲取到任務(wù),根據(jù)上邊獲取任務(wù)的過程,這幾個條件都有可能成立,所以此時getTask方法可以返回null,上層Worker的run方法從while循環(huán)重返回,整個線程結(jié)束,這就實現(xiàn)了線程池的可伸縮。

2 ScheduledThreadPoolExecutor獲取任務(wù)的過程

在getTask()中,描述了整個獲取任務(wù)的過程,如果線程池運行狀態(tài)已經(jīng)是SHUTDOWN了,調(diào)用非阻塞方法poll,因為如果當(dāng)前有任務(wù),那么可以獲取到任務(wù)并返回,如果沒有任務(wù),也沒有必要阻塞在隊列上等待任務(wù),因為已經(jīng)SHUTDOWN,后續(xù)不會再有任務(wù)進入。

如果當(dāng)前線程數(shù)大于最小線程數(shù),或者核心線程也可以做超時處理,意味著如果獲取不到任務(wù)就可以銷毀一部分線程了,所以poll方法設(shè)置了等待時間,超時后立即返回。

另一種情況是線程池還在運行狀態(tài),并且當(dāng)前線程數(shù)不大于最小線程數(shù),同時也不允許最小線程數(shù)以內(nèi)的線程超時,這個時候線程就要調(diào)用阻塞方法take,等待任務(wù)進入隊列以后才返回。

3 ScheduledThreadPoolExecutor提交任務(wù)的執(zhí)行過程

ScheduledThreadPoolExecutor提交任務(wù)的執(zhí)行過程,首先提交任務(wù):taskService .schedule(new Runnable(){public void run(){}},1,TimeUnit,DAYS);

ScheduledThreadPoolExecutor通過schedule方法提交定時任務(wù),schedule方法源碼如下:

public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit){

if(command==null||unit==null){throw new NullPointerException();}

if(delay<0){delay=0;}

RunnableScheduledFuture<?> t=decorateTask(command,new ScheduledFutureTask(command,null,triggerTime));

delayedExecute(t);

return t;

}

提交的任務(wù)會被封裝成ScheduledFutureTask類型對象。

分析delayedExecute方法:private void delayedExecute(Runnable command){

if(isShutDown()){reject(command);return;}

if(getPoolSize()

super.getQueue().add(command);

}

如果線程的運行狀態(tài)不是RUNNNING或者入隊列沒有成功,則采用線程池的構(gòu)造方法中設(shè)置的拒絕策略來處理任務(wù)。

如果當(dāng)前線程池中的線程數(shù)量poolSize小于線程池核心線程的數(shù)量corePoolSize,執(zhí)行prestartCoreThread(),該方法會創(chuàng)建一個新的線程來執(zhí)行任務(wù),如果prestartCoreThread()創(chuàng)建的新線程執(zhí)行任務(wù)失敗或者當(dāng)前線程池中的線程數(shù)量poolSize大于等于線程池核心線程數(shù)量corePoolSize,當(dāng)若線程池的運行狀態(tài)是RUNNING并且入隊成功,由于在多線程環(huán)境下,狀態(tài)隨時可能會改變,此時線程池的運行狀態(tài)runState不是RUNNING或者線程池中沒有可用的線程(poolSize==0),要確保進入的任務(wù)被執(zhí)行處理,線程池在初始化完成以后是空的,并沒有線程,如果在服務(wù)器中使用線程池,服務(wù)重啟后有大量請求進入,則要同時創(chuàng)建多個線程,而且創(chuàng)建過程是加鎖同步的,會導(dǎo)致一定的競爭,解決辦法就是線程池初始化時調(diào)用prestartAllCoreThreads方法啟動核心線程數(shù)量的線程,這樣就能在線程池中的線程就緒以后才開始接收請求。

通過getQueue方法獲取任務(wù)隊列,并且調(diào)用add方法向隊列中添加任務(wù),dq的定義:

private final DelayQueue dq=new DelayQueue();

public boolean add(Runnable x){return dq.add((RunnableScheduleFuture)x);}

可以看出dq是阻塞隊列,線程池中的線程都是在隊列中取數(shù)據(jù),ScheduledThreadPoolExecutor中的構(gòu)造方法里的隊列的實現(xiàn)使用鏈表結(jié)構(gòu)的阻塞隊列,add方法內(nèi)部調(diào)用offer方法,offer源碼如下:public boolean offer(E e){

final ReentrantLock lock=this.lock();

lock.lock();

try{

E first=q.peek();

q.offer(e);

if(first==null||e.compareTo(first)<0){

available.singalAll();

return true; }

}finally{lock.unlock();}}

這方法需要在多線程環(huán)境下同步執(zhí)行,會用到鎖Lock。鎖實現(xiàn)的大概原理如下。

Lock實現(xiàn)鎖的方式是通過排他性更新共享變量,更新成功的線程繼續(xù)執(zhí)行,沒有更新成功的線程將會被阻塞。Lock的共享變量state在可重入鎖中可以用來表示一個線程調(diào)用了幾次lock方法,也就是有幾次獲取鎖的行為。Lock的功能實現(xiàn)是通過內(nèi)部聚合了抽象隊列同步器(AQS),同步器有公平和非公平之分。非公平同步器對于新來的線程會嘗試獲取,不成功以后才會進入等待隊列,而公平同步器則會首先判斷是否排隊。AQS中會保存獲取鎖的當(dāng)先線程的引用。如果一次性嘗試獲取鎖不成功,則線程會進入隊列,循環(huán)嘗試獲取鎖。

peek方法會獲取隊列的第一個元素,只是獲取,并沒有出隊列。接著調(diào)用優(yōu)先級隊列PriorityQueue類型變量q的offer方法將隊列入隊,優(yōu)先級隊列會對任務(wù)進行排序,距離執(zhí)行時間越近,位置越靠前。下邊的if判斷可以這樣理解,first是在當(dāng)前任務(wù)入隊之前獲取的,也就是隊列中原有的第一個任務(wù),compareTo的這段比較是說當(dāng)前任務(wù)的執(zhí)行時間比隊列中第一個任務(wù)執(zhí)行時間還要早,如果first是null,那么當(dāng)前任務(wù)入隊后將是第一個元素,如果當(dāng)前任務(wù)的執(zhí)行時間比隊列中第一個任務(wù)的執(zhí)行時間早,那么當(dāng)前入隊后也將是第一個元素,只要這兩個條件有一個成立了,這個if的判斷條件就為true,就要執(zhí)行Condition類型的available變量的signalAll方法,喚醒等待的線程工作。

4 隊列的大小判斷

隊列的大小是決定內(nèi)存溢出最直觀的因素,首先來看看優(yōu)先級隊列PriorityQueue的offer方法:public boolean offer(E e){

if(e==null){throw new NullPointerException();}

modCount++;

int i=size;

if(i>=queue.length){grow(i+1);}

size=i+1;

if(i==0){queue[0]=e;}

else{siftUp(i,e);}

return true;

上述代碼表示如果隊列中元素的個數(shù)(size)大于等于隊列的長度,將要通過grow方法擴容,如下:private void grow(int minCapacity){

if(minCapacity<0){throw new OutOfMemoryError();}

int oldCapacity=queue.length;

int newCapacity=((oldCapacity<64)?((oldCapacity+1)*2):((oldCapacity/2)*3));

if(newCapacity<0){newCapacity=Integer.MAX_VALUE;}

if(newCapacity

queue=Arrays.copyof(queue,newCapacity);

}

若隊列容量小于64,那就在原有基礎(chǔ)上加1然后擴大2倍,這種情況絕對不會造成內(nèi)存的溢出問題。如果大于等于64呢?直接擴容一半,然后將值賦給一個int型變量,當(dāng)某種情況如果超過int類型的最大值了,JDK的處理是賦值成Integer的MAX_VALUE為2147483647,也就是最大的隊列長度是2 G多,如果一個對象的大小按照50個字節(jié)來算,將會占用100 G的內(nèi)存必定溢出。

5 模擬內(nèi)存溢出代碼測試

當(dāng)業(yè)務(wù)高峰給服務(wù)器帶來大量請求,大量的超時會導(dǎo)致重試隊列迅速堆積,直到內(nèi)存溢出,下面就通過代碼來測試一下:模擬大量的添加任務(wù),并且任務(wù)在調(diào)度隊列中堆積,推遲一天執(zhí)行。

while(true){

taskService.schedule(new Runnable(){public void run(){}},1,TimeUnit.DAYS);

}

虛擬機啟動參數(shù)-:

-Xms32M -Xmx32M -Xmn10M-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=d:/

運行輸出:

java.lang.OutOfMemoryError:Java heap space Dumping head to d:/\java_pid12884.hprof...

Heap dump file created [44940425 bytes in 0.618 secs]

內(nèi)存溢出了,來看看內(nèi)存快照(見表1)。

6 解決方案及措施

編譯好的java程序需要運行在JVM中,而JVM為程序提供并管理所需要的內(nèi)存空間,JVM自帶一個用于回收沒有任何引用指向的對象的線程機制(垃圾回收器),但針對于ScheduledThreadPoolExecutor提交的任務(wù)會被封裝成ScheduledFutureTask類型對象且每個對象中又有Sync成員變量。解決的辦法可以是手動判斷隊列的大小,通過taskService.getQueue().size()方法,通過Jmap內(nèi)存分析工具估算每個對象的大小,Jmap是一個可以輸出所有內(nèi)存中對象的工具,甚至可以將JVM 中的heap,以二進制輸出成文本。打印出某個Java進程內(nèi)存內(nèi)的所有‘對象的情況,結(jié)合能夠為隊列分配的內(nèi)存大小,計算出隊列容納任務(wù)的最大數(shù)量,以避免內(nèi)存溢出。

參考文獻

[1] 逯昌浩.淺析多核處理器條件下的Java編程[J].中國科技信息,2009(12):128,130.

[2] 張復(fù)興,曾新洲.擴展線程池模型及性能分析[J].計算技術(shù)與自動化,2007(4):110-112.

[3] (美)Bruce Eckel.Java編程思想[M].陳昊鵬,譯.北京:機械工業(yè)出版社,2007.

猜你喜歡
入隊線程調(diào)用
今天我入隊——入隊儀式
少先隊活動(2022年5期)2022-06-06 03:45:02
1+1我們這樣學(xué)隊章:我們的入隊誓詞
少先隊活動(2020年7期)2020-08-14 01:17:50
核電項目物項調(diào)用管理的應(yīng)用研究
LabWindows/CVI下基于ActiveX技術(shù)的Excel調(diào)用
今天我入隊了
入隊風(fēng)波
淺談linux多線程協(xié)作
基于系統(tǒng)調(diào)用的惡意軟件檢測技術(shù)研究
利用RFC技術(shù)實現(xiàn)SAP系統(tǒng)接口通信
Linux線程實現(xiàn)技術(shù)研究
织金县| 黄梅县| 西吉县| 夏河县| 广平县| 南城县| 尉犁县| 扶风县| 都江堰市| 南阳市| 曲阳县| 中宁县| 奉化市| 乌苏市| 陵川县| 西峡县| 双柏县| 岳西县| 安陆市| 个旧市| 丹东市| 乌海市| 靖江市| 嵊州市| 喀喇沁旗| 延寿县| 乐平市| 依兰县| 大埔县| 绍兴县| 左贡县| 博爱县| 舒城县| 富平县| 卢氏县| 图木舒克市| 蓝田县| 兴和县| 安溪县| 大庆市| 枣庄市|