Executor 与 线程池
基本接口
Java.util.concurrent 包定义了三种 executor 接口:
- Executor,支持加载新任务的简单接口
- ExecutorService,Executor 的子接口,增加了帮助管理任务和executor本身的生命周期的功能
- ScheduledExecutorService,ExecutorService 的子接口,支持周期性地执行任务
Executor
Executor 接口只定义了一个方法 execute,被设计用来代替一般的创建线程惯例
// r implements Runnable
// 1. 自己控制线程
(new Thread(r)).start();
// 2. 用 executor
e.execute(r);
对于 execute 方法的实现并没有特殊要求。低级的实现只是创建一个新的线程并立即执行
但更有可能是使用一个已经存在的工作线程(worker Threads)去执行 r,或者将 r 放在一个执行队列中等待工作线程有空的时候再执行
ExecutorService
ExecutorService 接口提供了另一个相似的 submit 方法,但比 execute 更加通用
和 execute 一样,submit 接受 Runnable 对象,但也接受 Callable 对象,Callable 允许任务执行后返回一个值
Submit 方法返回 Future 对象,Future 对象被用来接收 Callable 返回的值,并管理 Callable 和 Runnable 对象所代表的任务
ScheduledExecutorService
ScheduledExecutorService 接口为它的父类 ExecutorService 的行为提供计划,允许在执行 Runnable 和 Callable 任务之前停顿一段时间
接口定义了 scheduleAtFixedRate 和 scheduleWithFixedDelay,这两个方法以特定的时间间隔重复地执行特定任务
ThreadPoolExecutor 的构造参数
构造函数
ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程存活时间:空闲线程即超过核心线程数的工作线程
TimeUnit unit, // keepAliveTime的单位
BlockingQueue<Runnable> workQueue, // 线程池内部的等待队列:工作线程数等于核心线程数时,新任务放到等待队列中
ThreadFactory threadFactory, // 线程工厂,创建新线程的方式
RejectedExecutionHandler handler // 饱和策略:工作线程数等于最大线程数,且等待队列已满,处理新任务
);
等待队列
系统资源是有限的,任务的处理速度总有可能比不上任务的提交速度,因此,可以为ThreadPoolExecutor提供一个阻塞队列来保存因线程不足而等待的Runnable任务,这就是BlockingQueue
JDK为BlockingQueue提供了几种实现方式,常用的有:
- ArrayBlockingQueue:数组结构的阻塞队列
- LinkedBlockingQueue:链表结构的阻塞队列
- PriorityBlockingQueue:有优先级的阻塞队列
- SynchronousQueue:不会存储元素的阻塞队列
newFixedThreadPool 和 newSingleThreadExecutor 在默认情况下使用一个无界的 LinkedBlockingQueue
newCachedThreadPool 使用的 SynchronousQueue 十分有趣,看名称是个队列,但它却不能存储元素。要将一个任务放进队列,必须有另一个线程去接收这个任务,一个进就有一个出,队列不会存储任何东西。因此,SynchronousQueue 是一种移交机制,不能算是队列。newCachedThreadPool 生成的是一个没有上限的线程池,理论上提交多少任务都可以,使用 SynchronousQueue 作为等待队列正合适
饱和策略
当有界的等待队列满了之后,就需要用到饱和策略去处理,ThreadPoolExecutor的饱和策略通过传入RejectedExecutionHandler来实现。如果没有为构造函数传入,将会使用默认的defaultHandler。
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
}
}
ThreadFactory
每当线程池需要创建一个新线程,都是通过线程工厂获取。如果不为ThreadPoolExecutor设定一个线程工厂,就会使用默认的 defaultThreadFactory:
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
平时打印线程池里线程的name时,会输出形如pool-1-thread-1之类的名称,就是在这里设置的。这个默认的线程工厂,创建的线程是普通的非守护线程,如果需要定制,实现ThreadFactory后传给ThreadPoolExecutor即可
ThreadPoolExecutor 类
Fields
AtomicInteger ctl runState 是线程池运行状态,workerCount 是工作线程的数量,ctl 用一个32位的int来同时保存 runState 和 workerCount,其中高3位是 runState,其余29位是 workerCount,代码中会反复使用 runStateOf() 和 workerCountOf() 来获取 runState 和 workerCount
// ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }BlockingQueue
workQueue 等待队列 HashSet
workers 工作线程
Worker
线程池是由Worker类负责执行任务,Worker继承了AbstractQueuedSynchronizer,引出了Java并发框架的核心AQS。
AbstractQueuedSynchronizer,简称AQS,是Java并发包里一系列同步工具的基础实现,原理是根据状态位来控制线程的入队阻塞、出队唤醒来处理同步。
Worker利用AQS的功能实现对独占线程变量的设置,这是一个需要同步的过程
execute()
public void execute(Runnable command) {
// 参数检验
if (command == null)
throw new NullPointerException();
// 获取当前线程池状态
int c = ctl.get();
// 如果当前工作线程数 < 核心线程数
if (workerCountOf(c) < corePoolSize) {
// 新建工作线程(worker),若成功则返回
if (addWorker(command, true))
return;
// 若新建失败,更新线程池状态
c = ctl.get();
}
// 在等待队列中新建线程成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 若线程池不处于RUNNING状态则删掉刚刚新建的线程并handle掉
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果此时没有工作线程(worker),就添加个空的工作线程(worker)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 若之前在等待队列中新建线程失败,就新建为工作线程(worker)
else if (!addWorker(command, false))
// 如果新建工作线程(worker)失败就handle掉
reject(command);
}
addWorker()
// 用 lock 和 CAS 更新数据
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
runWorker()
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 从workQueue里拿task
while (task != null || (task = getTask()) != null) {
w.lock();
// 保证了线程池在STOP状态下线程是中断的,非STOP状态下线程没有被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
工厂类 Executors
FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool 的优点是”优雅的缓冲”
考虑一个web应用服务,它的每一个线程只处理一个HTTP请求。如果这个应用简单地为每一个新来的请求创建一个新的处理线程,那么,当请求数量足够多时,线程占用的资源总和将超过系统的承受能力,服务器会因此忽然停止对所有请求的应答(常见的内存溢出)
而在使用固定大小线程池后,即使请求数量超出工作线程能够处理的请求上限,但是新来的HTTP请求会被暂时存放在消息队列中,当出现空闲的工作线程后,这些HTTP请求就会得到及时的处理
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newSingleThreadExecutor是线程数量固定为1的newFixedThreadPool版本,保证池内的任务串行
CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newCachedThreadPool 生成一个会缓存的线程池,线程数量可以从0到 Integer.MAX_VALUE,超时时间为1分钟。线程池用起来的效果是:如果有空闲线程,会复用线程;如果没有空闲线程,会新建线程;如果线程空闲超过1分钟,将会被回收
关闭
使用完 ExecutorService 之后你应该将其关闭,以使其中的线程不再运行。
比如,如果你的应用是通过一个 main() 方法启动的,之后 main 方法退出了你的应用,如果你的应用有一个活动的 ExexutorService 它将还会保持运行。ExecutorService 里的活动线程阻止了 JVM 的关闭。
要终止 ExecutorService 里的线程你需要调用 ExecutorService 的 shutdown() 方法。ExecutorService 并不会立即关闭,但它将不再接受新的任务,而且一旦所有线程都完成了当前任务的时候,ExecutorService 将会关闭。在 shutdown() 被调用之前所有提交给 ExecutorService 的任务都被执行。如果你想要立即关闭 ExecutorService,你可以调用 shutdownNow() 方法。这样会立即尝试停止所有执行中的任务,并忽略掉那些已提交但尚未开始处理的任务。无法担保执行任务的正确执行。可能它们被停止了,也可能已经执行结束
参考资料
分析Java线程池的创建 - 简书
分析Java线程池执行原理 - 简书