并发编程的艺术-线程池的原理与分析


并发编程的艺术-线程池的原理与分析

简介

在阿里编码规范中,使用线程需要通过线程池的方式,线程的频繁创建与销毁,也会大量占用CPU的资源,这时候可能不仅没有达到提升程序性能的目的,反而影响执行效率,所以,这时候就需要这篇博文的主角-线程池

使用

在java中,我们可以使用两种方式使用线程池

  • 自行使用new ThreadPoolExecutor()创建线程池对象, ThreadPoolExecutor是线程池的具体实现类
  • 使用以及给予的线程池工厂Executors来进行创建线程池。

通过 Executors工厂方法,我们通常可以创建以下四种线程池,

  • 创建一个固有线程数量的线程池
Executors.newFixedThreadPool(1);
  • 创建只有一个线程的线程池
Executors.newSingleThreadExecutor();
  • 创建一个可缓存的线程池,该线程池无数量上限,线程创建后会缓存60秒,超过将释放
Executors.newCachedThreadPool()
  • 创建一个有固定线程数量的线程池,可以延期执行,以及按照周期反复执行。
Executors.newScheduledThreadPool(1)

虽然Executors提供了便捷的方式去创建线程池,但是,这种创建线程池的方式,是完全不推荐的,根据阿里编码规范中提及:

线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

其实上面四种最后都是通过ThreadPoolExecutor的方式去实现,只不过为我们简化了创建过程,根据new的不同对象,去传递对应的构造函数,但是这样参数就会变得不可控。

ThreadPoolExecutor的简介与设想

先来看看ThreadPoolExecutor的完整构造函数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
        null :
    AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize: 核心线程数。
  • maximumPoolSize: 最大线程数。
  • keepAliveTime: 线程存活时间。
  • unit: 线程存活时间单位。
  • workQueue: 阻塞队列,用来存放待处理的任务。
  • threadFactory: 线程工厂,用来创建线程池中的工作线程。
  • handler: 拒绝策略,当线程池处于满负荷状态时,无法处理后续进来的任务,便会采用拒绝策略。

在我们去了解这些参数具体实现之前,先设想下下线程池的原理,这问题首要点是Thread中的run()方法运行结束后就会销毁,线程池是如何让它停止等待并不去主动销毁的呢? 其实,进入到线程池的线程,在空闲状态时是阻塞状态,这样它既不释放销毁,也不用去消耗CPU资源。既然阻塞是不是需要一个阻塞队列呢? 首先我们创建个简单的线程池Demo:

 public static void main(String[] args) {
     ExecutorService executor = Executors.newSingleThreadExecutor();
     executor.execute(new TestTask());
     executor.shutdown();
 }
static class TestTask implements Runnable{
    @Override
    public void run() {
        System.out.println("线程执行了");
    }
}

我们看到,提交给线程池的操作是通过execute()方法执行的,我们如果把整个线程池的操作理解为生产者与消费者,那么execute()就是生产者。

线程池猜想

跟据生产者消费者模型,生产者提供任务到阻塞队列,消费者从阻塞队列中获取任务,线程池的原理是不是这样的呢? 其实是正确的,BlockQuened是通过take()方法从阻塞对列中获取任务,如果队列中没有元素,则工作中的线程会阻塞在take()方法中,而生产者插入队列的方法为offer()方法,当然最后的拒绝策略也是防止队列积压过多的任务来准备的。

线程池的生命周期

线程池的整个生命周期有5种状态,分别是

  • RUNNING: 运行
  • SHUTDOWN: 关闭
  • STOP: 停止
  • TIDYING: 过渡
  • TERMINATED: 终止

线程池的生命周期

运行状态即为线程池正常运转状态,此状态可以接受新任务,也可以处理阻塞队列中的任务。当运行状态的线程池调用shutdown()时,线程池会处于关闭状态,此时的线程池不再接受新的任务指令,但是可以继续处理阻塞队列的任务。如果线程池调用shutdownNow()方法,线程池就会处于停止状态,此时的线程池不接受新任务指令,也不去处理阻塞队列中的任务,同时还回去尝试终止正在运行的任务。当任务终止或者执行完毕后,这时候线程池会进入过度状态,并尝试调用terminated()方法,此方法执行完以后,线程池正式进入终止状态。

线程池源码分析

execute

我们从线程池的入口execute()方法了解线程池的:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

首先ctl是一个存储了线程状态与线程数量的原子变量,使用了int的位运算来存储,int类型占用4个字节也就是32位,它通过高3位来存储线程状态,低29位存储线程数量。

知道了ctl,他会判断当前工作线程数量是否小于核心线程数量,如果返回true,则调用addWorker()方法创建线程并启动,如果false,则说明核心线程已经初始化完毕,无需再创建。此时就会尝试将任务交给阻塞队列,如果连阻塞队列都无法在添加,说明阻塞队列也满了,这是就回去尝试通过addWorker()方法创建非核心线程,如果这还是失败,则直接调用reject()方法执行拒绝策略。

addWorker

addWorker()方法在上文中反复被调用,说明它是线程池的核心方法,它一共接收两个参数,第一个参数为需要执行的任务,第二个就是是否为核心线程,源码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
// A-------------------------------------
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
// B-------------------------------------
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
    // C-------------------------------------
}

阅读这篇源码前我们先看一下线程池的各个状态期间的表示

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

知道了这个,我们再去看源码A部分上面, if条件可以划分为两部,第一部分rs >= SHUTDOWN ,根据对照很好理解,如果线程池状态不是在RUNNING状态,说明线程池起码被调用了shutdown(),此时现在的线程池已经不再去接收新的任务,第二部分的

! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())

如果线程池处于关闭状态,并且没有任务传递过来,但是队列不等于空,这三个条件同时满足是允许创建线程,也是就是创建一个空线程,否则是不被允许的

再来看A-B之间的代码,首先CAPACITY是线程池的最大容量,wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize) 所以,这段代码可表示为,如果当前工作的线程,已经超过线程池的最大承载量,或者超过了指定的阈值,则会返回false,否则将通过CAS的方式更新工作线程的数量。

最后是B-C的部分,首先通过new Worker方法对待执行的任务进行初始化,然后使用lock()方法进行全局加锁,然后根据判断当前线程池是否是运行状态,或者是否是创建空任务来决定是否调用start()方法来启动工作线程,这里的start()启动的不是我们任务中的run()方法,而是new 的Worker对象的,因为new Worker()也实现了Runnable接口,部分源码如下:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    //前面省略
    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;
    
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }
	//后续省略
}

他其实最后调用的是runWorker()方法。

runWorker

不难猜到,runWorker方法执行的最终结果肯定是运行我们的任务,那么具体是怎么操作的呢?部分源码如下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                  //省略
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

源码中可以看到,只要task任务不为空,循环将一直执行下去,其中getTask()方法就是从阻塞队列中获取任务的逻辑,如果有任务,后续就是判断线程状态是否需要调用wt.interrupt()来中断县城了,最后调用task.run()来执行我们真正需要执行的任务。

getTask

getTask是从阻塞队列获取任务的方法,源码如下:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

首先我们看第一个判断条件rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()) 总体来说就是线程池状态处于stop状态,或者队列为空并且线程状态处于非运行状态的情况下,则回收线程并返回空,不再提供任务。

第二个判断条件为(wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())大致意思是,如果工作线程大于最大线程池数量,则回收当前线程, 第二个是如果是否配置超时配置并且当前线程已经超时, 前两个有一个成立,后面如果工作线程大于1个或者队列为空两者满足其中一个,则回收当前线程。

最后就是空闲等待的判断,以及队列人物的提取逻辑,这里会通过捕获InterruptedException异常的方法,唤醒阻塞在take()方法上的线程,然后再次循环来判断是否当前线程要被回收。

线程池的设置

我们根据业务功能可划分为两种:

  • I/O密集型:就是线程频繁需要和磁盘或者远程网络通信,这种场景中磁盘的耗时和网络通信的耗时较大,意味着线程处于阻塞期间,不会占用CPU资源,所以线程数量设置超过CPU核心数并不会造成问题。

  • CPU密集型:就是对CPU的利用率较高的场景,比如循环、递归、逻辑运算等,这种情况下线程数量设置越少,就越能减少CPU的上下文频繁切换。

根据这两种情况我们根据下面两个计算公式来设计线程池的数量

  • CPU密集型,线程池大小设置为 cpu核心数+1。
  • IO密集型,线程池大小设置为 2倍的CPU核心数+1。

理论仅供参考,根据实际业务来调整

结语

线程池大致分析就到这里,其实好像没有想象中的复杂,点个赞呗


文章作者: TimeRoar
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 TimeRoar !
评论
  目录