线上真实案例,多次调用线程池 ThreadPoolExecutor 的 invokeAll() 方法进行数据统计时任务被拒绝,故事从此开始。
本文重在讲述问题的产生、抽象、寻找解决方法的过程,并结合源码对原因进行抽丝剥茧般的分析。bug 千千万万,唯有合理的逻辑推理思维才能让这些 bug 显露原形。
问题起源与抽象 先来看一段简单的代码,定义一个核心线程数5、有界队列5的线程池,然后创建10个任务丢进去执行2次。
按照以前对线程池执行逻辑的理解,创建的10个线程,会先交给核心线程去执行,5个核心线程满了之后,存放到队列中,刚好存储剩下的5个,按理说10个任务都会正常执行完毕。本次只测试固定大小的线程池。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public class InvokeAllTest { private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5 , 5 , 60 * 1000 , TimeUnit.SECONDS, new ArrayBlockingQueue<>(5 ), new MyThreadFactory()); public static void main (String[] args) { List<Callable<Void>> tasks = new ArrayList<>(); for (int i = 0 ; i < 10 ; i++) { tasks.add(new InvokeAllThread()); } System.out.println("第一次任务执行前的executor: " + executor); try { executor.invokeAll(tasks); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第一次任务执行完毕后的executor: " + executor); System.out.println("==============第一次任务执行完毕,开始第二次任务============" ); try { Thread.sleep(1000 ); executor.invokeAll(tasks); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第二次任务执行完毕后的executor:" + executor); } static class InvokeAllThread implements Callable <Void > { @Override public Void call () throws Exception { System.out.println(Thread.currentThread().getName()); return null ; } } static class MyThreadFactory implements ThreadFactory { private AtomicInteger threadNum = new AtomicInteger(1 ); @Override public Thread newThread (Runnable r) { Thread thread = new Thread(r, String.valueOf(threadNum.getAndIncrement())); if (thread.getPriority() != Thread.NORM_PRIORITY) { thread.setPriority(Thread.NORM_PRIORITY); } return thread; } }
运行程序后发现,第一次调用 invokeAll 正常执行,第二次调用报错。多次执行结果相同。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 第一次任务执行前的executorjava.util.concurrent.ThreadPoolExecutor@30f39991[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 1 2 3 4 4 5 3 2 3 3 第一次任务执行完毕后的executorjava.util.concurrent.ThreadPoolExecutor@30f39991[Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 10] ==============第一次任务执行完毕,开始第二次任务============ 2 4 5 2 1 Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3a71f4dd rejected from java.util.concurrent.ThreadPoolExecutor@30f39991[Running, pool size = 5, active threads = 2, queued tasks = 0, completed tasks = 13] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at java.util.concurrent.AbstractExecutorService.invokeAll(AbstractExecutorService.java:238) at com.aaron.hp.thread.pool.InvokeAllTest.main(InvokeAllTest.java:36)
问题排查与猜测 既然程序出现异常,就该调用 debug 模式进行排查,并遵循”大胆猜测,小心求证”的态度,去解决这个问题。
猜测一:invokeAll 在异步执行后会不会同步等待线程执行完毕获取最终结果 由于 invokeAll 封装的太好,之前只知道最后会同步等待才能获取返回值。那么现在就需要去证实这个概念。
进入 invokeAll 方法后,发现调用了f.get()
,那么毫无疑问,这个猜测可以排除掉了。
其实从执行过程的输出内容也可以看出,两次调用 invokeAll 的执行顺序和界限(打印语句) 非常明显。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null ) throw new NullPointerException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false ; try { for (Callable<T> t : tasks) { RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } for (int i = 0 , size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { try { f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true ; return futures; } finally { if (!done) for (int i = 0 , size = futures.size(); i < size; i++) futures.get(i).cancel(true ); } }
猜测二:队列里面可能存在第一次调用 invokeAll 执行了但没有删掉的任务,所以才会导致第二次放入队列失败 由于未阅读源码,猜测只有当创建的任务执行完毕并且销毁之后,才会从队列中真正移除。
那么就需要查看入队列和出队列的时机。查看 invokeAll 方法中的 execute(f)
方法。
查看 ThreadPoolExecutor 类下的 execute 方法源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
此时就会发现入队列的操作在workQueue.offer(command)
处完成,而我们提交的任务是由一个叫 Worker 类的实例来执行,addWorker(command, true)
创建 Worker 实例。
那么我们就分别进去这两个方法来看下源码:
矮油黑人问号脸。。没想到这个 ThreadPoolExecutor 类的 addWorker 这么长,给核心代码写个注释重点关注,扫一眼然后去看 offer 方法(英文注释是源码中自带的)。前面都是校验,创建核心线程处为new Worker(firstTask)
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
接着是 ArrayBlockingQueue 类的 offer 方法,在 enqueue(e)
处进入队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public boolean offer (E e) { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lock(); try { if (count == items.length) return false ; else { enqueue(e); return true ; } } finally { lock.unlock(); } }
此时我们先来调试一波,看看入队列时这些方法的执行情况,在三个 if 处分别设置断点,在 addWorker 和 offer 方法靠前的未知打断点,确定是否会进入。
第一次调用 invokeAll:addWorker 进入5次,offer 方法进入5次。
第二次调用 invokeAll:addWorker 进入0次,offer 方法进入10次(可能是5-10次)。
那么发现了新的问题:程序居然没报错!正常执行完成!这不科学!
带着疑惑,重新 debug,居然还没报错!难道之前的异常是偶然吗?
以最快速度连按 F9 debug了几次,有时候报错。。
重新运行 run 了几次,次次报错。。
怀疑人生了。。
此时墨菲定律 在我头脑中回响,”偶然事件存在必然的因素”。那么大胆猜测,这个原因极有可能是队列消费速度较慢导致的,去查看消费部分的源码。由于 worker 也是一个线程,那么肯定有类似的 run 方法:
查看 ThreadPoolExecutor 类 的 Worker 这个内部类,找到 run()
方法:
1 2 3 public void run () { runWorker(this ); }
而 run 方法调用的是 ThreadPoolExecutor 类里的 runWorker(this)
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
查看 ThreadPoolExecutor 类下的 getTask()
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
看到这里,猜测二也就不攻自破,出队列后任务才会被执行,所以某个任务出队列后,执行成功与否与队列再无瓜葛。(注意这个说法只针对默认代码,如果自定义了拒绝策略是可以将被 interrupt 的线程重新塞回队列里的)
两次猜测失败后的总结
队列是异步消费的,但入队是同步进行的,如果队列的容量不足以承载要存入队列的任务数,就会被拒绝。(虽然是 ArrayBlockQueue 的特性,但这是通过 debug 以及 run 后观察到的)
第一次 addWorker 方法执行了5次,offer 执行了5次;第二次则是 0 次,10 次。刚才忽略了这个细节,那么需要重新找到相应的源码阅读。
任务从队列中移除与任务是否执行完毕无关,先移除,后执行。
我们创建的任务,是由 worker 核心线程去调用任务的 run 方法来同步执行的,而不是调用任务实例的 start 去异步执行,这也就是为什么 invokeAll 可以获取到返回值的原因所在。
备注: 这里有点绕,任务实例指的是我们最开始在 for 循环中创建的10个tasks new InvokeAllThread()
,为什么继承了 Callable 明明改写的是 call()
方法,但却有 run()
方法可以被调用呢?这是因为在 invokeAll()
方法执行execute()
方法前,通过RunnableFuture<T> f = newTaskFor(t);
进行了包装。
复查源码,真相大白 查看 ThreadPoolExecutor 类下的 execute() 方法,创建 worker 前的判断如下:
1 if (workerCountOf(c) < corePoolSize) { ...}
第一次调用 invokeAll 时,线程池中的核心线程 worker 数为0,小于 corePoolSize,所以前5次会创建 worker 核心线程并返回,此时随着 worker 的创建,我们创建的10个任务中的5个也会随着 worker 的创建作为 firstTask 属性被传进去。后5个任务则被放入 queue 中。
第二次调用 invokeAll 时,线程池中的核心数已经是5,所以10个任务都会被放入 queue 中异步消费,但是我们的 queue 的容量为5。如果消费速度快于入队速度(debug),那么10个任务会正常执行。但是入队速度太快的话(run),前5个肯定可以入队,后面的5个几乎都会被拒绝。
问题解决方案
对于固定大小的线程池,我们要按照实际情况设置 queue 和 worker 的数量。根据任务类型(IO/CPU)以及机器配置(CPU 核数等)设置 worker 核心线程数;而根据我们的任务多少来设定 queue 的大小,而不是 queue + worker 的总数。
重写拒绝策略,将被丢弃的任务重新 put 回队列中去,put 是阻塞的。
参考 ThreadPoolExecutor源码分析及阻塞提交任务方法
Thread的中断机制(interrupt)