线程池将创建线程和使用线程解耦。优点是
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// 以上是参数校验
this.corePoolSize = corePoolSize; // 核心线程数量
this.maximumPoolSize = maximumPoolSize; // 最大线程数量
this.workQueue = workQueue; // 任务队列
this.keepAliveTime = unit.toNanos(keepAliveTime); // 非核心线程的存活时间
this.threadFactory = threadFactory; // 线程工厂类
this.handler = handler; // 拒绝策略执行器
}
corePoolSize是核心线程数,默认核心线程不会被销毁。
maximumPoolSize 大等于corePoolSize,如果核心线程不足以任务并且任务队列已满,那么会新建非核心线程处理任务。如果线程数达到最大线程数,那么不执行任务,而是执行拒绝策略。非核心线程会被销毁,它们如果在keepAliveTime 存活时间内没有收到任务则会被销毁。
workQueue 任务队列存放工作线程还没来得及处理的任务。
threadFactory 是新建线程的工厂类,定义线程的名称,优先级以及是否是守护线程。默认用Executors.DefaultThreadFactory。
拒绝策略执行器是线程池无法接收新线程(可能原因1. 线程池已被关闭 2. 工作线程数已经是最大线程且任务队列满了)时执行的方法。默认用ThreadPoolExecutor#AbortPolicy.
上面2个方法都是构造器方法,第一个构造器方法,调用者需要指定四个参数,使用默认的线程工厂类和拒绝策略。第二个构造器方法,调用者可以指定所有参数。
线程池从建立到终止的状态由ctl表示。它的类型是AtomicInteger ,AtomicInteger 类实现原子性操作int值。
ctl的前3位是线程池状态,有5种,RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED。ctl的后29位是工作线程数量。
RUNNING=1,表示线程池正常运行。调用shutdown()方法后从RUNNING变为SHUTDOWN =0表示不接收新任务,但会执行完已有任务。调用shutdownNow()方法后从RUNNING变为STOP=1表示不接收新任务,且不执行已有任务。等待工作线程全结束了,且任务队列空了,从SHUTDOWN/STOP变为TIDYING=2表示线程池已清空。执行完钩子方法terminated()后从TIDYING变为TERMINATED =3表示线程池已终止。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 线程池状态
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // 000(29个1)
private static final int RUNNING = -1 << COUNT_BITS; // 111(29个0)
private static final int SHUTDOWN = 0 << COUNT_BITS; // 0
private static final int STOP = 1 << COUNT_BITS; // 001(29个0)
private static final int TIDYING = 2 << COUNT_BITS; // 010(29个0)
private static final int TERMINATED = 3 << COUNT_BITS; // 010(29个0)
// 从`ctl`获取线程池状态
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
// 从`ctl`获取工作线程数
private static int workerCountOf(int c) { return c & COUNT_MASK; }
// 将线程池状态和工作线程数组合成`ctl`
private static int ctlOf(int rs, int wc) { return rs | wc; }
public void execute(Runnable command) { // command是线程池接收的任务
if (command == null) // 参数校验
throw new NullPointerException();
int c = ctl.get(); // 线程池状态
if (workerCountOf(c) < corePoolSize) { // 如果工作线程数少于核心线程数
if (addWorker(command, true)) // 增加核心工作线程
return;
c = ctl.get();
}
// 运行至此,当前线程数不少于核心线程数
// isRunning()返回线程池是否是RUNNING状态,是则true
// workQueue.offer(command)) 将任务阻塞地加入队列
if (isRunning(c) && workQueue.offer(command)) {
// 运行至此,任务已进入阻塞队列
int recheck = ctl.get();
// 如果线程池状态不是RUNNING,取消任务,执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果工作线程数是0,启动一个非核心工作线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 运行至此,没有成功加入阻塞队列
// 添加非核心线程执行任务
else if (!addWorker(command, false))
reject(command);
}
addWorker(command, true)表示线程池增加核心工作线程,addWorker(command, false)表示增加非核心线程。execute()方法的流程图如下
想理解addWorker()方法,得先理解Worker类。线程池的线程可以分为2类。一类是调用线程池,执行executor.execute(), executor.shutdown()等操作,称为线程池线程。一类是执行任务,从任务队列拉取任务,超过核心线程数且超时则自我销毁等操作,称为工作线程。
Worker封装Thread类型变量和Runnable类型变量。this.thread = getThreadFactory().newThread(this);表示thread是工作线程,执行Worker类的run()方法,后面addWork()方法会执行worker.start()真正开启工作线程。firstTask表示线程池任务。
Worker类继承AbstractQueuedSynchronizer是为了实现不可重入独占锁, state是Worker类的状态,表示当前工作线程是否正在执行线程池任务。工作线程除了执行线程池任务以外,还会执行从任务队列获取任务等操作。为了防止工作线程执行任务期间线程池线程操纵工作线程(比如设置中断位)干扰任务的正常执行,因此加锁。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread; // 工作线程
Runnable firstTask; // 线程池任务
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
runWorker(Worker w)定义工作线程声明周期内的操作,这个方法体里只有task.run();表示工作线程执行客户端发送给线程池的任务,其余的源码表示工作线程的其他操作。三层try-catch,第一层表示工作线程从任务队列拉取任务。第二层表示工作线程执行环境任务beforeExecute(), afterExecute()。第三层表示工作线程执行任务。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); // 当前线程是工作线程,不是调用线程池的客户端线程
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
// 第一层try结束代表当前工作线程要被销毁
try {
// task != null 表示新建工作线程时就分配任务
// getTask() 表示阻塞地从任务队列拉取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 至此,工作线程获得不可重入独占锁
// (runStateAtLeast(ctl.get(), STOP)表示线程池的状态是`STOP,TIDYING,TERMINATED`,线程池要求中断任务,中断任务是指设置中断位为true,而不是强制终止任务
// 这个if语句的语义是:清除当前线程的中断标记,不干扰执行任务期间的中断状态,除非如果线程池要求中断任务。
// 尽可能获取最新线程池状态,因此执行2次(runStateAtLeast(ctl.get(), STOP)
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 运行至此,正常情况下线程中断位为false,线程池状态为`STOP,TIDYING,TERMINATED`时中断位为true
// 第二层try表示工作线程执行本次任务
try {
beforeExecute(wt, task); // 钩子方法,所有任务公用一个方法体
// 第三层try表示执行客户端提交给线程池的任务
// `run()`方法不会开启一个新线程
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// 运行至此,代表任务没有抛出异常
completedAbruptly = false;
} finally {
// 运行至此,completedAbruptly =true表示任务`task.run()`抛出异常。
// 运行至此,工作线程没有获取到任务,如果不符合线程池要求,则销毁这个工作线程
processWorkerExit(w, completedAbruptly);
}
// 至此,工作线程执行完`run()`方法中所有内容,终止该线程
}
工作线程的终结方法processWorkerExit()。尝试销毁工作线程,并且尝试终止线程池。
decrementWorkerCount()方法就是将状态ctl-1,表示工作线程减1.如果任务没有抛出异常,工作线程会在getTask()方法里执行该方法。
mainLock是线程池的重入独占锁,保证多个线程调用线程池时线程安全。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks; // 线程池完成的任务数+=当前工作线程完成的任务数
workers.remove(w); // 当前工作线程被移出集合,意味着工作线程被销毁
} finally {
mainLock.unlock();
}
tryTerminate(); // 当前线程已被销毁,是否意味着线程池进入非`RUNNING`状态。
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // true表示:线程池的状态是`RUNNING, SHUTDOWN`,继续执行线程池已有任务
if (!completedAbruptly) { // 如果任务没有抛出异常
int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 线程池最小线程数
if (min == 0 && ! workQueue.isEmpty()) // 如果最小线程数为0,并且仍要执行任务
min = 1; // 最小线程为1
if (workerCountOf(c) >= min) // 当前工作线程满足执行任务的要求
return; // replacement not needed
}
// 如果任务抛出异常或者当前工作线程不满足执行任务的要求,新建一个非核心线程,不布置任务
addWorker(null, false);
}
}
tryTerminate()方法尝试让线程池进入TERMINATED状态。interruptIdleWorkers()方法将中断一个空闲的工作线程。
final void tryTerminate() {
for (;;) { // 自旋
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
// 运行至此,线程池状态要么是`RUNNING`,要么是`TIDYING, TERMINATED`,要么是`SHUTDOWN`并且任务队列为空
// `return;`表示第一种情况和第三种情况下线程池还不能进入终止状态,第二种情况表示不用重复执行下述操作
return;
// 运行至此,线程池状态不是`RUNNING, TIDYING, TERMINATED`。线程池的状态可能是
// 1. `SHUTDOWN`并且任务队列为空
// 2. `STOP`,无所谓队列空不空,因为工作线程不执行队列中任务
// 第一种情况下可能有非常多工作线程阻塞在`getTask()`方法的`workQueue.take()`/`workQueue.poll()`方法。设置工作线程的标记位后,工作线程会跳出阻塞抛出`InterruptedException`异常。为了防止大规模抛出异常,造成系统不稳定,因此一个一个中断线程。
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE); // 中断一个工作线程
return;
}
// 至此,`SHUTDOWN`满足任务队列为空并且工作线程数为0,`STOP`满足工作线程数为0
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 获取线程池的重入独占锁
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // true则表示当前线程池状态已经被修改为`TIDYING`,并且工作线程数是0 只有1个线程能返回true
try {
terminated(); // 钩子方法,线程池终止方法
} finally {
ctl.set(ctlOf(TERMINATED, 0));
// 至此,终止线程池
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
Worker类定义了工作线程的内容。但是创建工作线程是线程池调用addWorker()方法完成的。retry循环尝试获取新增线程授权,获得授权后新增工作线程。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN) // 表示线程池非`RUNNING`
&& (runStateAtLeast(c, STOP) // 表示线程池为`STOP, TIDYING, TERMINATED`
|| firstTask != null // 任务不为空
|| workQueue.isEmpty())) // 任务队列为空
return false;
// 运行至此,没有`return false`,可能情况
// 1. 线程池状态为`RUNNING`
// 2. 线程池状态为`SHUTDOWN`,并且以下条件满足1个
// 1. 任务为空,表示不接收新任务
// 2. 任务队列非空,表示仍然执行任队列里的任务
for (;;) { // 自旋
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false; // 工作线程数已经达到阈值了,不创建新线程
if (compareAndIncrementWorkerCount(c))
break retry; // 成功使状态`ctl`+1,获得创建新线程许可,跳出循环
// 至此,没有获得许可,继续尝试
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry; // 如果线程池状态不是`RUNNING`,那么重新进入retry循环。如果是`RUNNING`,这个for循环自旋就行。
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false; // 工作线程是否执行`start()`方法,即新建线程
boolean workerAdded = false; // 工作线程是否被添加进入工作线程集合
Worker w = null;
try {
w = new Worker(firstTask); // 新建工作线程对象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 获取线程池独占锁,为了原子性更新`workers`,`largestPoolSize`。
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 再次判断线程池状态,可能情况
// 1. `RUNNING`
// 2. `SHUTDOWN`并且任务为空
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException(); // 线程状态已被篡改,抛出异常
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) { // 如果工作线程进入集合,表示线程池已经收录这个`Worker`对象,则可以启动线程
t.start(); // 创建新线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
如果没有新建工作线程,则执行回退。
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w); // 线程集合删除本工作线程
decrementWorkerCount(); // 减少工作线程数
tryTerminate(); // 检查线程池是否已经准备进入终止状态
} finally {
mainLock.unlock();
}
}
JDK17提供四种拒绝策略。可以自定义拒绝策略,定义RejectedExecutionHandler 接口的实现类,重写rejectedExecution()方法。ThreadPoolExecutor类的默认拒绝策略是AbortPolicy。
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
// AbortPolicy 直接抛出异常
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
// DiscardPolicy丢弃,不做任何处理
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
// DiscardOldestPolicy尝试丢弃任务队列的头部线程
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
// CallerRunsPolicy尝试使用线程池对象的线程处理任务,也就是谁提交任务,谁处理任务。run()方法不会新建线程对象,而是直接本线程执行任务内容
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
shutdown()与shutdownNow()都使线程池不再接收新任务,区别是
shutdown()会执行完工作线程正在执行的任务,和任务队列的任务。因此是安全的。shutdownNow()会移出任务队列的任务。更危险的是,它会给工作正在执行的任务标记中断。public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 权限检查
advanceRunState(SHUTDOWN); // 将线程池状态改为`SHUTDWON`
interruptIdleWorkers(); // 对所有空闲线程标记中断
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate(); // 尝试终止线程池
}
// 更改线程池状态的前3位,后29位不动,方法文档规定`targetState`的值只能是`SHUTDOWN`或者`STOP`
private void advanceRunState(int targetState) {
// assert targetState == SHUTDOWN || targetState == STOP;
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
// 返回任务队列的任务
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 权限检查
advanceRunState(STOP); // 状态改为STOP
interruptWorkers(); // 中断所有线程
tasks = drainQueue(); // 清空任务队列,并且将任务保存给tasks
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
// assert mainLock.isHeldByCurrentThread();
for (Worker w : workers)
w.interruptIfStarted();
}
void interruptIfStarted() {
Thread t;
// `getState() >= 0`=`true`表示工作线程正在执行任务,此时标记的中断位会影响任务的执行,因此说不安全
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
因篇幅问题不能全部显示,请点此查看更多更全内容