线程池

Executor 与 线程池

基本接口

Java.util.concurrent 包定义了三种 executor 接口:

  • Executor,支持加载新任务的简单接口
  • ExecutorService,Executor 的子接口,增加了帮助管理任务和executor本身的生命周期的功能
  • ScheduledExecutorService,ExecutorService 的子接口,支持周期性地执行任务

Executor

Executor 接口只定义了一个方法 execute,被设计用来代替一般的创建线程惯例

// r implements Runnable
// 1. 自己控制线程
(new Thread(r)).start();
// 2. 用 executor
e.execute(r);

对于 execute 方法的实现并没有特殊要求。低级的实现只是创建一个新的线程并立即执行

但更有可能是使用一个已经存在的工作线程(worker Threads)去执行 r,或者将 r 放在一个执行队列中等待工作线程有空的时候再执行

ExecutorService

ExecutorService 接口提供了另一个相似的 submit 方法,但比 execute 更加通用

和 execute 一样,submit 接受 Runnable 对象,但也接受 Callable 对象,Callable 允许任务执行后返回一个值

Submit 方法返回 Future 对象,Future 对象被用来接收 Callable 返回的值,并管理 Callable 和 Runnable 对象所代表的任务

ScheduledExecutorService

ScheduledExecutorService 接口为它的父类 ExecutorService 的行为提供计划,允许在执行 Runnable 和 Callable 任务之前停顿一段时间

接口定义了 scheduleAtFixedRate 和 scheduleWithFixedDelay,这两个方法以特定的时间间隔重复地执行特定任务

ThreadPoolExecutor 的构造参数

构造函数

ThreadPoolExecutor(
    int corePoolSize,                  // 核心线程数
    int maximumPoolSize,               // 最大线程数
    long keepAliveTime,                // 空闲线程存活时间:空闲线程即超过核心线程数的工作线程
    TimeUnit unit,                     // keepAliveTime的单位
    BlockingQueue<Runnable> workQueue, // 线程池内部的等待队列:工作线程数等于核心线程数时,新任务放到等待队列中
    ThreadFactory threadFactory,       // 线程工厂,创建新线程的方式
    RejectedExecutionHandler handler   // 饱和策略:工作线程数等于最大线程数,且等待队列已满,处理新任务
);

等待队列

系统资源是有限的,任务的处理速度总有可能比不上任务的提交速度,因此,可以为ThreadPoolExecutor提供一个阻塞队列来保存因线程不足而等待的Runnable任务,这就是BlockingQueue

JDK为BlockingQueue提供了几种实现方式,常用的有:

  • ArrayBlockingQueue:数组结构的阻塞队列
  • LinkedBlockingQueue:链表结构的阻塞队列
  • PriorityBlockingQueue:有优先级的阻塞队列
  • SynchronousQueue:不会存储元素的阻塞队列

newFixedThreadPool 和 newSingleThreadExecutor 在默认情况下使用一个无界的 LinkedBlockingQueue

newCachedThreadPool 使用的 SynchronousQueue 十分有趣,看名称是个队列,但它却不能存储元素。要将一个任务放进队列,必须有另一个线程去接收这个任务,一个进就有一个出,队列不会存储任何东西。因此,SynchronousQueue 是一种移交机制,不能算是队列。newCachedThreadPool 生成的是一个没有上限的线程池,理论上提交多少任务都可以,使用 SynchronousQueue 作为等待队列正合适

饱和策略

当有界的等待队列满了之后,就需要用到饱和策略去处理,ThreadPoolExecutor的饱和策略通过传入RejectedExecutionHandler来实现。如果没有为构造函数传入,将会使用默认的defaultHandler。

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
    }
}

ThreadFactory

每当线程池需要创建一个新线程,都是通过线程工厂获取。如果不为ThreadPoolExecutor设定一个线程工厂,就会使用默认的 defaultThreadFactory:

public static ThreadFactory defaultThreadFactory() {
    return new DefaultThreadFactory();
}

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
        Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
            poolNumber.getAndIncrement() +
            "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

平时打印线程池里线程的name时,会输出形如pool-1-thread-1之类的名称,就是在这里设置的。这个默认的线程工厂,创建的线程是普通的非守护线程,如果需要定制,实现ThreadFactory后传给ThreadPoolExecutor即可

ThreadPoolExecutor 类

Fields

  • AtomicInteger ctl runState 是线程池运行状态,workerCount 是工作线程的数量,ctl 用一个32位的int来同时保存 runState 和 workerCount,其中高3位是 runState,其余29位是 workerCount,代码中会反复使用 runStateOf() 和 workerCountOf() 来获取 runState 和 workerCount

    // ctl
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

    // Packing and unpacking ctl
    private static int runStateOf(int c) { return c & ~CAPACITY; }
    private static int workerCountOf(int c) { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

  • BlockingQueue workQueue 等待队列

  • HashSet workers 工作线程

Worker

线程池是由Worker类负责执行任务,Worker继承了AbstractQueuedSynchronizer,引出了Java并发框架的核心AQS。

AbstractQueuedSynchronizer,简称AQS,是Java并发包里一系列同步工具的基础实现,原理是根据状态位来控制线程的入队阻塞、出队唤醒来处理同步。

Worker利用AQS的功能实现对独占线程变量的设置,这是一个需要同步的过程

execute()

public void execute(Runnable command) {
    // 参数检验
    if (command == null)
        throw new NullPointerException();
    // 获取当前线程池状态
    int c = ctl.get();
    // 如果当前工作线程数 < 核心线程数
    if (workerCountOf(c) < corePoolSize) {
        // 新建工作线程(worker),若成功则返回
        if (addWorker(command, true))
            return;
        // 若新建失败,更新线程池状态
        c = ctl.get();
    }
    // 在等待队列中新建线程成功
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 若线程池不处于RUNNING状态则删掉刚刚新建的线程并handle掉
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果此时没有工作线程(worker),就添加个空的工作线程(worker)
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 若之前在等待队列中新建线程失败,就新建为工作线程(worker)
    else if (!addWorker(command, false))
        // 如果新建工作线程(worker)失败就handle掉
        reject(command);
}

addWorker()

// 用 lock 和 CAS 更新数据
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

runWorker()

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 从workQueue里拿task
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 保证了线程池在STOP状态下线程是中断的,非STOP状态下线程没有被中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

工厂类 Executors

FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

FixedThreadPool 的优点是”优雅的缓冲”

考虑一个web应用服务,它的每一个线程只处理一个HTTP请求。如果这个应用简单地为每一个新来的请求创建一个新的处理线程,那么,当请求数量足够多时,线程占用的资源总和将超过系统的承受能力,服务器会因此忽然停止对所有请求的应答(常见的内存溢出)

而在使用固定大小线程池后,即使请求数量超出工作线程能够处理的请求上限,但是新来的HTTP请求会被暂时存放在消息队列中,当出现空闲的工作线程后,这些HTTP请求就会得到及时的处理

SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

newSingleThreadExecutor是线程数量固定为1的newFixedThreadPool版本,保证池内的任务串行

CachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

newCachedThreadPool 生成一个会缓存的线程池,线程数量可以从0到 Integer.MAX_VALUE,超时时间为1分钟。线程池用起来的效果是:如果有空闲线程,会复用线程;如果没有空闲线程,会新建线程;如果线程空闲超过1分钟,将会被回收

关闭

使用完 ExecutorService 之后你应该将其关闭,以使其中的线程不再运行。

比如,如果你的应用是通过一个 main() 方法启动的,之后 main 方法退出了你的应用,如果你的应用有一个活动的 ExexutorService 它将还会保持运行。ExecutorService 里的活动线程阻止了 JVM 的关闭。

要终止 ExecutorService 里的线程你需要调用 ExecutorService 的 shutdown() 方法。ExecutorService 并不会立即关闭,但它将不再接受新的任务,而且一旦所有线程都完成了当前任务的时候,ExecutorService 将会关闭。在 shutdown() 被调用之前所有提交给 ExecutorService 的任务都被执行。如果你想要立即关闭 ExecutorService,你可以调用 shutdownNow() 方法。这样会立即尝试停止所有执行中的任务,并忽略掉那些已提交但尚未开始处理的任务。无法担保执行任务的正确执行。可能它们被停止了,也可能已经执行结束


参考资料

分析Java线程池的创建 - 简书

分析Java线程池执行原理 - 简书