华拓科技网
您的当前位置:首页JDK17 线程池 ThreadPoolExecutor

JDK17 线程池 ThreadPoolExecutor

来源:华拓科技网

线程池

线程池将创建线程和使用线程解耦。优点是

ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                         int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
                         int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    // 以上是参数校验
    this.corePoolSize = corePoolSize;  // 核心线程数量
    this.maximumPoolSize = maximumPoolSize; // 最大线程数量
    this.workQueue = workQueue; // 任务队列
    this.keepAliveTime = unit.toNanos(keepAliveTime); // 非核心线程的存活时间
    this.threadFactory = threadFactory; // 线程工厂类
    this.handler = handler; // 拒绝策略执行器
}

corePoolSize是核心线程数,默认核心线程不会被销毁。
maximumPoolSize 大等于corePoolSize,如果核心线程不足以任务并且任务队列已满,那么会新建非核心线程处理任务。如果线程数达到最大线程数,那么不执行任务,而是执行拒绝策略。非核心线程会被销毁,它们如果在keepAliveTime 存活时间内没有收到任务则会被销毁。
workQueue 任务队列存放工作线程还没来得及处理的任务。
threadFactory 是新建线程的工厂类,定义线程的名称,优先级以及是否是守护线程。默认用Executors.DefaultThreadFactory
拒绝策略执行器是线程池无法接收新线程(可能原因1. 线程池已被关闭 2. 工作线程数已经是最大线程且任务队列满了)时执行的方法。默认用ThreadPoolExecutor#AbortPolicy.

上面2个方法都是构造器方法,第一个构造器方法,调用者需要指定四个参数,使用默认的线程工厂类和拒绝策略。第二个构造器方法,调用者可以指定所有参数。

状态

线程池从建立到终止的状态由ctl表示。它的类型是AtomicInteger AtomicInteger 类实现原子性操作int值。
ctl的前3位是线程池状态,有5种,RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATEDctl的后29位是工作线程数量。
RUNNING=1,表示线程池正常运行。调用shutdown()方法后从RUNNING变为SHUTDOWN =0表示不接收新任务,但会执行完已有任务。调用shutdownNow()方法后从RUNNING变为STOP=1表示不接收新任务,且不执行已有任务。等待工作线程全结束了,且任务队列空了,从SHUTDOWN/STOP变为TIDYING=2表示线程池已清空。执行完钩子方法terminated()后从TIDYING变为TERMINATED =3表示线程池已终止。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 线程池状态
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // 000(29个1)
private static final int RUNNING    = -1 << COUNT_BITS; // 111(29个0)
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 0
private static final int STOP       =  1 << COUNT_BITS; // 001(29个0)
private static final int TIDYING    =  2 << COUNT_BITS; // 010(29个0)
private static final int TERMINATED =  3 << COUNT_BITS; // 010(29个0)
// 从`ctl`获取线程池状态
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
// 从`ctl`获取工作线程数
private static int workerCountOf(int c)  { return c & COUNT_MASK; }
// 将线程池状态和工作线程数组合成`ctl`
private static int ctlOf(int rs, int wc) { return rs | wc; }

向线程池添加任务 execute

public void execute(Runnable command) { // command是线程池接收的任务
    if (command == null) // 参数校验
        throw new NullPointerException();
    int c = ctl.get(); // 线程池状态
    if (workerCountOf(c) < corePoolSize) { // 如果工作线程数少于核心线程数
        if (addWorker(command, true)) // 增加核心工作线程
            return;
        c = ctl.get();
    }
    // 运行至此,当前线程数不少于核心线程数
    // isRunning()返回线程池是否是RUNNING状态,是则true
    // workQueue.offer(command)) 将任务阻塞地加入队列
    if (isRunning(c) && workQueue.offer(command)) {
    	// 运行至此,任务已进入阻塞队列
        int recheck = ctl.get(); 
        // 如果线程池状态不是RUNNING,取消任务,执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果工作线程数是0,启动一个非核心工作线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 运行至此,没有成功加入阻塞队列
    // 添加非核心线程执行任务
    else if (!addWorker(command, false))
        reject(command);
}

addWorker(command, true)表示线程池增加核心工作线程,addWorker(command, false)表示增加非核心线程。execute()方法的流程图如下

Worker

想理解addWorker()方法,得先理解Worker类。线程池的线程可以分为2类。一类是调用线程池,执行executor.execute(), executor.shutdown()等操作,称为线程池线程。一类是执行任务,从任务队列拉取任务,超过核心线程数且超时则自我销毁等操作,称为工作线程。

Worker封装Thread类型变量和Runnable类型变量。this.thread = getThreadFactory().newThread(this);表示thread是工作线程,执行Worker类的run()方法,后面addWork()方法会执行worker.start()真正开启工作线程。firstTask表示线程池任务。

Worker类继承AbstractQueuedSynchronizer是为了实现不可重入独占锁, stateWorker类的状态,表示当前工作线程是否正在执行线程池任务。工作线程除了执行线程池任务以外,还会执行从任务队列获取任务等操作。为了防止工作线程执行任务期间线程池线程操纵工作线程(比如设置中断位)干扰任务的正常执行,因此加锁。

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
{
    final Thread thread; // 工作线程
    Runnable firstTask; // 线程池任务

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
	public void run() {
     runWorker(this);
    }
}  

runWorker(Worker w)定义工作线程声明周期内的操作,这个方法体里只有task.run();表示工作线程执行客户端发送给线程池的任务,其余的源码表示工作线程的其他操作。三层try-catch,第一层表示工作线程从任务队列拉取任务。第二层表示工作线程执行环境任务beforeExecute(), afterExecute()。第三层表示工作线程执行任务。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread(); // 当前线程是工作线程,不是调用线程池的客户端线程
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    // 第一层try结束代表当前工作线程要被销毁
    try {
    	// task != null 表示新建工作线程时就分配任务
    	// getTask() 表示阻塞地从任务队列拉取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 至此,工作线程获得不可重入独占锁
            // (runStateAtLeast(ctl.get(), STOP)表示线程池的状态是`STOP,TIDYING,TERMINATED`,线程池要求中断任务,中断任务是指设置中断位为true,而不是强制终止任务
            // 这个if语句的语义是:清除当前线程的中断标记,不干扰执行任务期间的中断状态,除非如果线程池要求中断任务。
            // 尽可能获取最新线程池状态,因此执行2次(runStateAtLeast(ctl.get(), STOP)
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            // 运行至此,正常情况下线程中断位为false,线程池状态为`STOP,TIDYING,TERMINATED`时中断位为true
            // 第二层try表示工作线程执行本次任务
            try {
                beforeExecute(wt, task); // 钩子方法,所有任务公用一个方法体
                // 第三层try表示执行客户端提交给线程池的任务
                // `run()`方法不会开启一个新线程
                try {
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        // 运行至此,代表任务没有抛出异常
        completedAbruptly = false;
    } finally {
        // 运行至此,completedAbruptly =true表示任务`task.run()`抛出异常。
        // 运行至此,工作线程没有获取到任务,如果不符合线程池要求,则销毁这个工作线程
        processWorkerExit(w, completedAbruptly);
    }
    // 至此,工作线程执行完`run()`方法中所有内容,终止该线程
}

工作线程的终结方法processWorkerExit()。尝试销毁工作线程,并且尝试终止线程池。
decrementWorkerCount()方法就是将状态ctl-1,表示工作线程减1.如果任务没有抛出异常,工作线程会在getTask()方法里执行该方法。
mainLock是线程池的重入独占锁,保证多个线程调用线程池时线程安全。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks; // 线程池完成的任务数+=当前工作线程完成的任务数
            workers.remove(w); // 当前工作线程被移出集合,意味着工作线程被销毁
        } finally {
            mainLock.unlock();
        }

        tryTerminate(); // 当前线程已被销毁,是否意味着线程池进入非`RUNNING`状态。

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) { // true表示:线程池的状态是`RUNNING, SHUTDOWN`,继续执行线程池已有任务
            if (!completedAbruptly) { // 如果任务没有抛出异常
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 线程池最小线程数
                if (min == 0 && ! workQueue.isEmpty()) // 如果最小线程数为0,并且仍要执行任务
                    min = 1; // 最小线程为1
                if (workerCountOf(c) >= min) // 当前工作线程满足执行任务的要求
                    return; // replacement not needed
            }
            // 如果任务抛出异常或者当前工作线程不满足执行任务的要求,新建一个非核心线程,不布置任务
            addWorker(null, false);
        }
    }

tryTerminate()方法尝试让线程池进入TERMINATED状态。interruptIdleWorkers()方法将中断一个空闲的工作线程。

final void tryTerminate() {
   for (;;) { // 自旋
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
            // 运行至此,线程池状态要么是`RUNNING`,要么是`TIDYING, TERMINATED`,要么是`SHUTDOWN`并且任务队列为空
            // `return;`表示第一种情况和第三种情况下线程池还不能进入终止状态,第二种情况表示不用重复执行下述操作
            return;
        // 运行至此,线程池状态不是`RUNNING, TIDYING, TERMINATED`。线程池的状态可能是
        //    1. `SHUTDOWN`并且任务队列为空
        //    2. `STOP`,无所谓队列空不空,因为工作线程不执行队列中任务
        // 第一种情况下可能有非常多工作线程阻塞在`getTask()`方法的`workQueue.take()`/`workQueue.poll()`方法。设置工作线程的标记位后,工作线程会跳出阻塞抛出`InterruptedException`异常。为了防止大规模抛出异常,造成系统不稳定,因此一个一个中断线程。
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE); // 中断一个工作线程
            return;
        }
	
		// 至此,`SHUTDOWN`满足任务队列为空并且工作线程数为0,`STOP`满足工作线程数为0
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock(); // 获取线程池的重入独占锁
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // true则表示当前线程池状态已经被修改为`TIDYING`,并且工作线程数是0 只有1个线程能返回true
                try {
                    terminated(); // 钩子方法,线程池终止方法
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 至此,终止线程池
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

线程池新建工作线程 addWorker

Worker类定义了工作线程的内容。但是创建工作线程是线程池调用addWorker()方法完成的。retry循环尝试获取新增线程授权,获得授权后新增工作线程。

private boolean addWorker(Runnable firstTask, boolean core) {
   retry:
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN) // 表示线程池非`RUNNING`
            && (runStateAtLeast(c, STOP) // 表示线程池为`STOP, TIDYING, TERMINATED`
                || firstTask != null // 任务不为空
                || workQueue.isEmpty())) // 任务队列为空
            return false;
		// 运行至此,没有`return false`,可能情况
		//     1. 线程池状态为`RUNNING`
		//     2. 线程池状态为`SHUTDOWN`,并且以下条件满足1个
		//          1. 任务为空,表示不接收新任务
		//          2. 任务队列非空,表示仍然执行任队列里的任务
        for (;;) { // 自旋
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false; // 工作线程数已经达到阈值了,不创建新线程
            if (compareAndIncrementWorkerCount(c)) 
                break retry; // 成功使状态`ctl`+1,获得创建新线程许可,跳出循环
            // 至此,没有获得许可,继续尝试
            c = ctl.get();  // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry; // 如果线程池状态不是`RUNNING`,那么重新进入retry循环。如果是`RUNNING`,这个for循环自旋就行。
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false; // 工作线程是否执行`start()`方法,即新建线程
    boolean workerAdded = false; // 工作线程是否被添加进入工作线程集合
    Worker w = null;
    try {
        w = new Worker(firstTask); // 新建工作线程对象
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock(); // 获取线程池独占锁,为了原子性更新`workers`,`largestPoolSize`。
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();

                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    // 再次判断线程池状态,可能情况
                    //    1. `RUNNING`
                    //    2. `SHUTDOWN`并且任务为空
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException(); // 线程状态已被篡改,抛出异常
                    workers.add(w);
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) { // 如果工作线程进入集合,表示线程池已经收录这个`Worker`对象,则可以启动线程
                t.start(); // 创建新线程
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

如果没有新建工作线程,则执行回退。

private void addWorkerFailed(Worker w) {
   final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w); // 线程集合删除本工作线程
        decrementWorkerCount(); // 减少工作线程数
        tryTerminate(); // 检查线程池是否已经准备进入终止状态
    } finally {
        mainLock.unlock();
    }
}

拒绝策略

JDK17提供四种拒绝策略。可以自定义拒绝策略,定义RejectedExecutionHandler 接口的实现类,重写rejectedExecution()方法。ThreadPoolExecutor类的默认拒绝策略是AbortPolicy

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
// AbortPolicy 直接抛出异常
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
// DiscardPolicy丢弃,不做任何处理
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
// DiscardOldestPolicy尝试丢弃任务队列的头部线程
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
// CallerRunsPolicy尝试使用线程池对象的线程处理任务,也就是谁提交任务,谁处理任务。run()方法不会新建线程对象,而是直接本线程执行任务内容
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

终止线程池

shutdown()shutdownNow()都使线程池不再接收新任务,区别是

  1. shutdown()会执行完工作线程正在执行的任务,和任务队列的任务。因此是安全的。
  2. shutdownNow()会移出任务队列的任务。更危险的是,它会给工作正在执行的任务标记中断。

shutdown()

public void shutdown() {
   final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess(); // 权限检查
        advanceRunState(SHUTDOWN); // 将线程池状态改为`SHUTDWON`
        interruptIdleWorkers(); // 对所有空闲线程标记中断
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate(); // 尝试终止线程池
}
// 更改线程池状态的前3位,后29位不动,方法文档规定`targetState`的值只能是`SHUTDOWN`或者`STOP`
private void advanceRunState(int targetState) {
    // assert targetState == SHUTDOWN || targetState == STOP;
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

shutdownNow()

// 返回任务队列的任务
public List<Runnable> shutdownNow() {
   List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess(); // 权限检查
        advanceRunState(STOP); // 状态改为STOP
        interruptWorkers(); // 中断所有线程
        tasks = drainQueue(); // 清空任务队列,并且将任务保存给tasks
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
private void interruptWorkers() {
   // assert mainLock.isHeldByCurrentThread();
    for (Worker w : workers)
        w.interruptIfStarted();
}
void interruptIfStarted() {
    Thread t;
    // `getState() >= 0`=`true`表示工作线程正在执行任务,此时标记的中断位会影响任务的执行,因此说不安全
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

因篇幅问题不能全部显示,请点此查看更多更全内容