publicvoidexecute(Runnable command) { if (command == null) thrownewNullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ intc= ctl.get(); // 获取状态表示 if (workerCountOf(c) < corePoolSize) { // 1. 如果当前线程数小于核心线程数,直接新建线程执行任务 if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // 2. 如果核心线程数已满,且是运行状态并且队列未满,添加任务至队列 intrecheck= ctl.get(); if (! isRunning(recheck) && remove(command)) // 再次检查运行状态,如果不是运行状态就从队列中删除任务,删除成功后执行拒绝策略,因为此时线程池状态不是RUNNING reject(command); elseif (workerCountOf(recheck) == 0) // 如果当前线程数为 0,而我们又刚刚添加了一个任务,就新建一个空任务的线程,它会去轮询任务队列执行刚刚新增的任务 addWorker(null, false); } elseif (!addWorker(command, false)) // 添加失败,执行拒绝策略 reject(command); }
/** * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker * must exit because of any of: * 1. There are more than maximumPoolSize workers (due to * a call to setMaximumPoolSize). * 2. The pool is stopped. * 3. The pool is shutdown and the queue is empty. * 4. This worker timed out waiting for a task, and timed-out * workers are subject to termination (that is, * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) * both before and after the timed wait, and if the queue is * non-empty, this worker is not the last thread in the pool. * * @return task, or null if the worker must exit, in which case * workerCount is decremented */ private Runnable getTask() { booleantimedOut=false; // 最近一次从阻塞队列中获取任务是否超时?
for (;;) { intc= ctl.get(); intrs= runStateOf(c);
// Check if queue empty only if necessary. // 为true的情况: // 1. 线程池为非RUNNING状态 且线程池正在停止 // 2. 线程池状态为非RUNNING状态 且阻塞队列为空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); // 将 workCount 减 1 returnnull; }
/** * Class Worker mainly maintains interrupt control state for * threads running tasks, along with other minor bookkeeping. * This class opportunistically extends AbstractQueuedSynchronizer * to simplify acquiring and releasing a lock surrounding each * task execution. This protects against interrupts that are * intended to wake up a worker thread waiting for a task from * instead interrupting a task being run. We implement a simple * non-reentrant mutual exclusion lock rather than use * ReentrantLock because we do not want worker tasks to be able to * reacquire the lock when they invoke pool control methods like * setCorePoolSize. Additionally, to suppress interrupts until * the thread actually starts running tasks, we initialize lock * state to a negative value, and clear it upon start (in * runWorker). */ privatefinalclassWorker extendsAbstractQueuedSynchronizer implementsRunnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ privatestaticfinallongserialVersionUID=6138294804551838833L;
/** Thread this worker is running in (the thread held by this worker).  Null if factory fails. */
final Thread thread;
/** Initial task to run, i.e. the first task this worker executes.  Possibly null. */
Runnable firstTask;
/** Per-thread task counter: the number of tasks this worker has completed. */
volatile long completedTasks;
/**
 * Creates with given first task and thread from ThreadFactory.
 *
 * @param firstTask the first task (null if none)
 */
Worker(Runnable firstTask) {
    // Start with state -1 so that a newly created worker whose thread has
    // not begun running yet is not interrupted.
    setState(-1);
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */ publicvoidrun() { runWorker(this); }
// Lock methods
//
// The value 0 represents the unlocked state (the worker thread is idle).
// The value 1 represents the locked state (the worker thread is busy).
/** * Main worker run loop. Repeatedly gets tasks from queue and * executes them, while coping with a number of issues: * * 1. We may start out with an initial task, in which case we * don't need to get the first one. Otherwise, as long as pool is * running, we get tasks from getTask. If it returns null then the * worker exits due to changed pool state or configuration * parameters. Other exits result from exception throws in * external code, in which case completedAbruptly holds, which * usually leads processWorkerExit to replace this thread. * * 2. Before running any task, the lock is acquired to prevent * other pool interrupts while the task is executing, and then we * ensure that unless pool is stopping, this thread does not have * its interrupt set. * * 3. Each task run is preceded by a call to beforeExecute, which * might throw an exception, in which case we cause thread to die * (breaking loop with completedAbruptly true) without processing * the task. * * 4. Assuming beforeExecute completes normally, we run the task, * gathering any of its thrown exceptions to send to afterExecute. * We separately handle RuntimeException, Error (both of which the * specs guarantee that we trap) and arbitrary Throwables. * Because we cannot rethrow Throwables within Runnable.run, we * wrap them within Errors on the way out (to the thread's * UncaughtExceptionHandler). Any thrown exception also * conservatively causes thread to die. * * 5. After task.run completes, we call afterExecute, which may * also throw an exception, which will also cause thread to * die. According to JLS Sec 14.20, this exception is the one that * will be in effect even if task.run throws. * * The net effect of the exception mechanics is that afterExecute * and the thread's UncaughtExceptionHandler have as accurate * information as we can provide about any problems encountered by * user code. 
* * @param w the worker */ finalvoidrunWorker(Worker w) { Threadwt= Thread.currentThread(); Runnabletask= w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts 设置为允许中断 booleancompletedAbruptly=true; // 异常退出标志 try { while (task != null || (task = getTask()) != null) { // getTask轮询阻塞队列 w.lock(); // 加锁 /* * 3个判断: * 1、runStateAtLeast(ctl.get(), STOP)为真说明当前状态大于等于STOP 此时需要给他一个中断信号 * 2、wt.isInterrupted()查看当前是否设置中断状态如果为false则说明为设置中断状态 * 3、Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) 获取当前中断状态且清除中断状态 * 这个判断为真的话说明当前被设置了中断状态(有可能是线程池执行的业务代码设置的,然后重置了)且当前状态变成了大于等于STOP的状态了 * * 判断为真的两种情况: * 1、如果当前线程大于等于STOP 且未设置中断状态 整个判断为true 第一个runStateAtLeast(ctl.get(), STOP)为true !wt.isInterrupted()为true * 2、第一次判断的时候不大于STOP 且当前设置了中断状态(Thread.interrupted()把中断状态又刷新了) 且设置完了之后线程池状态大于等于STOP了 * Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) 为true !wt.isInterrupted()为true * */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); // 设置中断 try { beforeExecute(wt, task); Throwablethrown=null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; thrownewError(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); // 释放独占锁 } } completedAbruptly = false; // 若while循环中抛出异常这句就不会被执行,表示为异常退出循环 } finally { processWorkerExit(w, completedAbruptly); } }
/** * Performs cleanup and bookkeeping for a dying worker. Called * only from worker threads. Unless completedAbruptly is set, * assumes that workerCount has already been adjusted to account * for exit. This method removes thread from worker set, and * possibly terminates the pool or replaces the worker if either * it exited due to user task exception or if fewer than * corePoolSize workers are running or queue is non-empty but * there are no workers. * * @param w the worker * @param completedAbruptly if the worker died due to user exception */ privatevoidprocessWorkerExit(Worker w, boolean completedAbruptly) { // 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1; // 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。 if (completedAbruptly) decrementWorkerCount();