并发编程的艺术-线程池的原理与分析
简介
在阿里编码规范中,使用线程需要通过线程池的方式,线程的频繁创建与销毁,也会大量占用CPU的资源,这时候可能不仅没有达到提升程序性能的目的,反而影响执行效率,所以,这时候就需要这篇博文的主角-线程池
使用
在java中,我们可以使用两种方式使用线程池
- 自行使用
new ThreadPoolExecutor()
创建线程池对象, ThreadPoolExecutor是线程池的具体实现类 - 使用以及给予的线程池工厂Executors来进行创建线程池。
通过 Executors工厂方法,我们通常可以创建以下四种线程池,
- 创建一个固有线程数量的线程池
Executors.newFixedThreadPool(1);
- 创建只有一个线程的线程池
Executors.newSingleThreadExecutor();
- 创建一个可缓存的线程池,该线程池无数量上限,线程创建后会缓存60秒,超过将释放
Executors.newCachedThreadPool()
- 创建一个有固定线程数量的线程池,可以延期执行,以及按照周期反复执行。
Executors.newScheduledThreadPool(1)
虽然Executors提供了便捷的方式去创建线程池,但是,这种创建线程池的方式,是完全不推荐的,根据阿里编码规范中提及:
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
其实上面四种最后都是通过ThreadPoolExecutor的方式去实现,只不过为我们简化了创建过程,根据new的不同对象,去传递对应的构造函数,但是这样参数就会变得不可控。
ThreadPoolExecutor的简介与设想
先来看看ThreadPoolExecutor的完整构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize: 核心线程数。
- maximumPoolSize: 最大线程数。
- keepAliveTime: 线程存活时间。
- unit: 线程存活时间单位。
- workQueue: 阻塞队列,用来存放待处理的任务。
- threadFactory: 线程工厂,用来创建线程池中的工作线程。
- handler: 拒绝策略,当线程池处于满负荷状态时,无法处理后续进来的任务,便会采用拒绝策略。
在我们去了解这些参数具体实现之前,先设想下下线程池的原理,这问题首要点是Thread中的run()
方法运行结束后就会销毁,线程池是如何让它停止等待并不去主动销毁的呢? 其实,进入到线程池的线程,在空闲状态时是阻塞状态,这样它既不释放销毁,也不用去消耗CPU资源。既然阻塞是不是需要一个阻塞队列呢? 首先我们创建个简单的线程池Demo:
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(new TestTask());
executor.shutdown();
}
static class TestTask implements Runnable{
@Override
public void run() {
System.out.println("线程执行了");
}
}
我们看到,提交给线程池的操作是通过execute()
方法执行的,我们如果把整个线程池的操作理解为生产者与消费者,那么execute()
就是生产者。
跟据生产者消费者模型,生产者提供任务到阻塞队列,消费者从阻塞队列中获取任务,线程池的原理是不是这样的呢? 其实是正确的,BlockQuened是通过take()
方法从阻塞对列中获取任务,如果队列中没有元素,则工作中的线程会阻塞在take()
方法中,而生产者插入队列的方法为offer()
方法,当然最后的拒绝策略也是防止队列积压过多的任务来准备的。
线程池的生命周期
线程池的整个生命周期有5种状态,分别是
- RUNNING: 运行
- SHUTDOWN: 关闭
- STOP: 停止
- TIDYING: 过渡
- TERMINATED: 终止
运行状态即为线程池正常运转状态,此状态可以接受新任务,也可以处理阻塞队列中的任务。当运行状态的线程池调用shutdown()
时,线程池会处于关闭状态,此时的线程池不再接受新的任务指令,但是可以继续处理阻塞队列的任务。如果线程池调用shutdownNow()
方法,线程池就会处于停止状态,此时的线程池不接受新任务指令,也不去处理阻塞队列中的任务,同时还回去尝试终止正在运行的任务。当任务终止或者执行完毕后,这时候线程池会进入过度状态,并尝试调用terminated()
方法,此方法执行完以后,线程池正式进入终止状态。
线程池源码分析
execute
我们从线程池的入口execute()
方法了解线程池的:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
首先ctl是一个存储了线程状态与线程数量的原子变量,使用了int的位运算来存储,int类型占用4个字节也就是32位,它通过高3位来存储线程状态,低29位存储线程数量。
知道了ctl,他会判断当前工作线程数量是否小于核心线程数量,如果返回true,则调用addWorker()
方法创建线程并启动,如果false,则说明核心线程已经初始化完毕,无需再创建。此时就会尝试将任务交给阻塞队列,如果连阻塞队列都无法在添加,说明阻塞队列也满了,这是就回去尝试通过addWorker()
方法创建非核心线程,如果这还是失败,则直接调用reject()
方法执行拒绝策略。
addWorker
addWorker()
方法在上文中反复被调用,说明它是线程池的核心方法,它一共接收两个参数,第一个参数为需要执行的任务,第二个就是是否为核心线程,源码如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// A-------------------------------------
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
// B-------------------------------------
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
// C-------------------------------------
}
阅读这篇源码前我们先看一下线程池的各个状态期间的表示
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
知道了这个,我们再去看源码A部分上面, if条件可以划分为两部,第一部分rs >= SHUTDOWN
,根据对照很好理解,如果线程池状态不是在RUNNING状态,说明线程池起码被调用了shutdown()
,此时现在的线程池已经不再去接收新的任务,第二部分的
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
如果线程池处于关闭状态,并且没有任务传递过来,但是队列不等于空,这三个条件同时满足是允许创建线程,也是就是创建一个空线程,否则是不被允许的
再来看A-B之间的代码,首先CAPACITY
是线程池的最大容量,wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize)
所以,这段代码可表示为,如果当前工作的线程,已经超过线程池的最大承载量,或者超过了指定的阈值,则会返回false,否则将通过CAS的方式更新工作线程的数量。
最后是B-C的部分,首先通过new Worker
方法对待执行的任务进行初始化,然后使用lock()
方法进行全局加锁,然后根据判断当前线程池是否是运行状态,或者是否是创建空任务来决定是否调用start()
方法来启动工作线程,这里的start()
启动的不是我们任务中的run()
方法,而是new 的Worker对象的,因为new Worker()
也实现了Runnable接口,部分源码如下:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
//前面省略
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
//后续省略
}
他其实最后调用的是runWorker()
方法。
runWorker
不难猜到,runWorker
方法执行的最终结果肯定是运行我们的任务,那么具体是怎么操作的呢?部分源码如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
//省略
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
源码中可以看到,只要task任务不为空,循环将一直执行下去,其中getTask()
方法就是从阻塞队列中获取任务的逻辑,如果有任务,后续就是判断线程状态是否需要调用wt.interrupt()
来中断县城了,最后调用task.run()
来执行我们真正需要执行的任务。
getTask
getTask是从阻塞队列获取任务的方法,源码如下:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
首先我们看第一个判断条件rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())
总体来说就是线程池状态处于stop状态,或者队列为空并且线程状态处于非运行状态的情况下,则回收线程并返回空,不再提供任务。
第二个判断条件为(wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())
大致意思是,如果工作线程大于最大线程池数量,则回收当前线程, 第二个是如果是否配置超时配置并且当前线程已经超时, 前两个有一个成立,后面如果工作线程大于1个或者队列为空两者满足其中一个,则回收当前线程。
最后就是空闲等待的判断,以及队列人物的提取逻辑,这里会通过捕获InterruptedException
异常的方法,唤醒阻塞在take()方法上的线程,然后再次循环来判断是否当前线程要被回收。
线程池的设置
我们根据业务功能可划分为两种:
-
I/O密集型:就是线程频繁需要和磁盘或者远程网络通信,这种场景中磁盘的耗时和网络通信的耗时较大,意味着线程处于阻塞期间,不会占用CPU资源,所以线程数量设置超过CPU核心数并不会造成问题。
-
CPU密集型:就是对CPU的利用率较高的场景,比如循环、递归、逻辑运算等,这种情况下线程数量设置越少,就越能减少CPU的上下文频繁切换。
根据这两种情况我们根据下面两个计算公式来设计线程池的数量
- CPU密集型,线程池大小设置为 cpu核心数+1。
- IO密集型,线程池大小设置为 2倍的CPU核心数+1。
理论仅供参考,根据实际业务来调整
结语
线程池大致分析就到这里,其实好像没有想象中的复杂,点个赞呗