线程池使用FutureTask的时候如果拒绝策略设置为了DiscardPolicy和DiscardOldestPolicy并且在被拒绝的任务的Future对象上调用无参get方法那么调用线程会一直被阻塞。
问题复现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public static void main (String[] args) throws Exception { ThreadPoolExecutor executorService = new ThreadPoolExecutor (1 , 1 , 1L , TimeUnit.MINUTES, new ArrayBlockingQueue <>(1 ), new ThreadPoolExecutor .DiscardPolicy()); Future futureOne = executorService.submit(new Runnable () { @Override public void run () { System.out.println("start runable one" ); try { Thread.sleep(5000 ); } catch (InterruptedException e) { e.printStackTrace(); } } }); Future futureTwo = executorService.submit(new Runnable () { @Override public void run () { System.out.println("start runable two" ); } }); Future futureThree = null ; try { futureThree = executorService.submit(new Runnable () { @Override public void run () { System.out.println("start runable three" ); } }); } catch (Exception e) { e.printStackTrace(); System.out.println(e.getLocalizedMessage()); } System.out.println("task one finish " + futureOne.get()); System.out.println("task two finish " + futureTwo.get()); System.out.println("task three finish " + (futureThree == null ? null : futureThree.get())); executorService.shutdown(); }
运行结果:
1 2 3 4 5 start runable one task one finish null start runable two task two finish null
创建了一个单线程并且队列元素个数为1的线程池,并且拒绝策略设置为了DiscardPolicy;先提交任务one,这个任务会使用唯一的一个线程进行执行,任务在打印 start runable one后会阻塞该线程5s;再向线程池提交了一个任务two,这时候会把任务two放入到阻塞队列;提交任务three时,由于队列已经满了则会触发拒绝策略丢弃任务three。
从运行结果看在任务one阻塞的5s内,主线程执行到了代码(5)等待任务one执行完毕,当任务one执行完毕后代码(5)返回,主线程打印出task one finish null。之后线程池的唯一线程会去队列里面取出任务two并执行所以输出start runable two,然后代码(6)会返回,这时候主线程输出task two finish null,然后执行代码(7)等待任务three执行完毕,从执行结果看代码(7)会一直阻塞不会返回。
至此问题产生,如果把拒绝策略修改为DiscardOldestPolicy也会存在有一个任务的get方法一直阻塞只是现在是任务two被阻塞:
1 2 3 4 start runable one task one finish null start runable three
但是如果拒绝策略设置为默认的AbortPolicy则会抛出RejectedExecutionException并正常返回。
问题分析
要分析这个问题需要看下线程池的submit方法里面做了什么,submit方法代码如下:
1 2 3 4 5 6 public Future<?> submit(Runnable task) { if (task == null ) throw new NullPointerException (); RunnableFuture<Void> ftask = newTaskFor(task, null ); execute(ftask); return ftask; }
newTaskFor()把Runnable转为FutureTask对象,FutureTask实现RunnableFuture接口,继续跟execute:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public void execute (Runnable command) { int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
1 2 3 final void reject (Runnable command) { handler.rejectedExecution(command, this ); }
再来看下拒绝策略DiscardPolicy的代码:
1 2 3 4 5 public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { } }
这里rejectedExecution方法里面什么都没做,所以代码(4)调用submit后会返回一个future对象,即FutureTask:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class FutureTask <V> implements RunnableFuture <V> { private volatile int state; private static final int NEW = 0 ; private static final int COMPLETING = 1 ; private static final int NORMAL = 2 ; private static final int EXCEPTIONAL = 3 ; private static final int CANCELLED = 4 ; private static final int INTERRUPTING = 5 ; private static final int INTERRUPTED = 6 ;
state标识FutureTask的状态,初始状态是New。因此使用DiscardPolicy策略提交后返回了一个状态为NEW的FutureTask对象。
那么下面就需要看下当调用future的无参get方法时候当future变为什么状态时候才会返回:
1 2 3 4 5 6 public V get () throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false , 0L ); return report(s); }
1 2 3 4 5 6 7 8 9 private V report (int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException (); throw new ExecutionException ((Throwable)x); }
也就是说当future的状态 >COMPLETING 时候调用get方法才会返回,而明显DiscardPolicy策略在拒绝元素的时候并没有设置该future的状态,后面也没有其他机会可以设置该future的状态,所以future的状态一直是NEW,所以一直不会返回,同理DiscardOldestPolicy策略也是这样的问题,最老的任务被淘汰时候没有设置被淘汰任务对于future的状态,也会导致一直不会返回。
那么默认的AbortPolicy策略为啥没问题那?来看AbortPolicy策略代码:
1 2 3 4 5 6 7 8 public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException ("Task " + r.toString() + " rejected from " + e.toString()); } }
看代码就应该明白了吧。
所以当使用Future的时候,尽量使用带超时时间的get方法,这样即使使用了DiscardPolicy拒绝策略也不至于一直等待,等待超时时间到了会自动返回的,如果非要使用不带参数的get方法则可以重写DiscardPolicy的拒绝策略。在执行策略时候设置该Future的状态大于COMPLETING即可,但是查看FutureTask提供的方法发现只有cancel方法是public的,并且可以设置FutureTask的状态大于COMPLETING,重写拒绝策略具体代码可以如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 public static void main (String[] args) throws Exception { ThreadPoolExecutor executorService = new ThreadPoolExecutor (1 , 1 , 1L , TimeUnit.MINUTES, new ArrayBlockingQueue <>(1 ), new ThreadPoolExecutor .DiscardPolicy() { public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { if (r != null && r instanceof FutureTask) { ((FutureTask) r).cancel(true ); } } } } ); Future futureOne = executorService.submit(new Runnable () { @Override public void run () { System.out.println("start runable one" ); try { Thread.sleep(5000 ); } catch (InterruptedException e) { e.printStackTrace(); } } }); Future futureTwo = executorService.submit(new Runnable () { @Override public void run () { System.out.println("start runable two" ); } }); Future futureThree = null ; try { futureThree = executorService.submit(new Runnable () { @Override public void run () { System.out.println("start runable three" ); } }); } catch (Exception e) { e.printStackTrace(); System.out.println(e.getLocalizedMessage()); } try { System.out.println("task one finish " + futureOne.get()); System.out.println("task two finish " + futureTwo.get()); System.out.println("task three finish " + (futureThree == null ? null : futureThree.get())); } catch (Exception e) { e.printStackTrace(); } executorService.shutdown(); }
使用这个策略时候,由于report方法中对cancel的任务上会抛出CancellationException异常,所以在get()时使用try-catch捕获异常。运行后发现程序能正常退出。
转载自:http://ifeve.com/%E7%BA%BF%E7%A8%8B%E6%B1%A0%E4%BD%BF%E7%94%A8futuretask%E6%97%B6%E5%80%99%E9%9C%80%E8%A6%81%E6%B3%A8%E6%84%8F%E7%9A%84%E4%B8%80%E7%82%B9%E4%BA%8B/