扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
本篇内容介绍了“什么是ThreadPoolExecutor”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
创新互联建站坚持“要么做到,要么别承诺”的工作理念,服务领域包括:成都做网站、网站设计、企业官网、英文网站、手机端网站、网站推广等服务,满足客户于互联网时代的兴文网站设计、移动媒体设计的需求,帮助企业找到有效的互联网解决方案。努力成为您成熟可靠的网络建设合作伙伴!
ThreadPoolExecutor是一个通过使用可能几个池线程之一来执行每个提交任务的ExecutorService,这些线程池通常通过Executors工厂方法进行配置。
ThreadPoolExecutor中的线程池处理了两个不同的问题:
1、由于减少了每个任务调用的开销,在执行大量的异步任务时它们通常提供改进的性能;
2、它们提供了边界和管理资源的一种手段,包括多线程,在执行任务集合时的消耗。
每个ThreadPoolExecutor还维护一些基本的统计数据,例如完成任务的数量。
AtomicInteger类型的ctl代表了ThreadPoolExecutor中的控制状态,它是一个复核类型的成员变量,是一个原子整数,借助高低位包装了两个概念:
(1)workerCount:线程池中当前活动的线程数量,占据ctl的低29位;
(2)runState:线程池运行状态,占据ctl的高3位,有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五种状态。
//COUNT_BITS分割32位二进制偏移量,Integer.SIZE即Integer类型长度(32),COUNT_BITS=29,高3位保存线程池的状态,低29位用来计量对象池中工作线程数 private static final int COUNT_BITS = Integer.SIZE - 3; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
AtomicInteger ctl的定义如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //异或运算符 100100|111=100111 private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池理想的最大工作线程数(上限):
//(1<获取线程池当前的工作线程数:
//通过(与)运算符,示例 11001&1111=1001,CAPACITY=0x1fffffff,所以就是ctl的值(&)CAPACITY就是只获取ctl低29位的值就是当前线程池的工作线程数 private static int workerCountOf(int c) { return c & CAPACITY; }基本变量
//用以下文中workers集合操作的锁 private final ReentrantLock mainLock = new ReentrantLock(); //用于保存任务并传递给工作线程的队列 private final BlockingQueueworkQueue; /** * Set containing all worker threads in pool. Accessed only when * holding mainLock. * 保存线程池中所有工作线程的集合,仅在获取mainLock锁权限时可操作 */ private final HashSet workers = new HashSet (); /** * Wait condition to support awaitTermination 创建线程池的线程通过调用线程池引用.awaitTermination方法中通过termination实现持有锁后释放锁挂起等待工作线程tryTerminate操作成功唤醒,或者超时自动唤醒中断失败。 */ private final Condition termination = mainLock.newCondition(); /** * Tracks largest attained pool size. Accessed only under * mainLock. 获取线程池工作集合历史最大容量,需获得锁 */ private int largestPoolSize; /** * Counter for completed tasks. Updated only on termination of * worker threads. Accessed only under mainLock. 池完成任务数,在processWorkerExit函数持锁增量更新 */ private long completedTaskCount; //用以持锁任务创建worker时创建线程的工厂类 private volatile ThreadFactory threadFactory; /** * Handler called when saturated or shutdown in execute. * 在线程非RUNNING状态或者池容量和队列容器容量满载时拒绝处理对象 */ private volatile RejectedExecutionHandler handler; /** * Timeout in nanoseconds for idle threads waiting for work. * Threads use this timeout when there are more than corePoolSize * present or if allowCoreThreadTimeOut. Otherwise they wait * forever for new work. * 空闲线程等待工作的超时时间(以纳秒为单位)。 * 当超过corePoolSize工作线程书或allowCoreThreadTimeOut为true时,线程将使用此超时。 * 否则,他们将永远等待新的工作 */ private volatile long keepAliveTime; /** * If false (default), core threads stay alive even when idle. * If true, core threads use keepAliveTime to time out waiting * for work. * 如果为false(默认值为false),则即使处于空闲状态,核心线程也会保持活动状态。 * 如果为true,则活跃线程使用keepAliveTime来超时等待工作,达到阈值就会释放线程 */ private volatile boolean allowCoreThreadTimeOut; /** * Core pool size is the minimum number of workers to keep alive * (and not allow to time out etc) unless allowCoreThreadTimeOut * is set, in which case the minimum is zero. * 除非设置allowCoreThreadTimeOut,否则核心池大小是保持活动状态(不允许超时等)的最低数量, * 在这种情况下,最小值为零 */ private volatile int corePoolSize; /** * Maximum pool size. Note that the actual maximum is internally * bounded by CAPACITY. * 线程池最大工作线程书数,受CAPACITY约束,最大不会超过CAPACITY */ private volatile int maximumPoolSize; /** * The default rejected execution handler. * 默认拒绝策略处理器 */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); 线程池状态解析:
RUNNING: Accept new tasks and process queued tasks(运行状态,接受新任务,并处理队列任务)
// 二进制位 -1=0x8000001(取反码后+1得补码)=0xfffffffe+1=0xffffffff // 右移29位后=0xe0000000即 1110 0000 0000 0000 0000 0000 0000 0000 // -536870912 private static final int RUNNING = -1 << COUNT_BITS;
SHUTDOWN: Don't accept new tasks, but process queued tasks(停止运行状态,不接受新任务,但处理队列中任务)
//SHUTDOWN=0 private static final int SHUTDOWN = 0 << COUNT_BITS;
STOP: Don't accept new tasks, don't process queued tasks,and interrupt in-progress tasks(中断线程池工作状态,不接受新任务,不处理队列中准备弹出的任务,但是会执行完现有的工作任务(前提是在修改为STOP前,弹出队列的任务已经走过线程状态判断,执行业务方法,若正好弹出准备判断线程状态,STOP扭转成功,当前任务也会被拦截))
// STOP=0x20000000=0010 0000 0000 0000 0000 0000 0000 0000 // 536870912 private static final int STOP = 1 << COUNT_BITS;
TIDYING:All tasks have terminated, workerCount is zero,the thread transitioning to state TIDYING.will run the terminated() hook method(任务处理结束状态,workcount=0,线程池运行状态修改为TIDYING,并且会执行==terminated()==钩子函数)
// TIDYING=0x40000000=0100 0000 0000 0000 0000 0000 0000 0000 // 1073741824 private static final int TIDYING = 2 << COUNT_BITS;
TERMINATED: terminated() has completed() (terminated()执行完成后,会修改成TERMINATED状态)
// TERMINATED=0x60000000=0110 0000 0000 0000 0000 0000 0000 0000 // 1610612736 private static final int TERMINATED = 3 << COUNT_BITS;线程池状态扭转过程
源码注释
/** The numerical order among these values matters, to allow * ordered comparisons. The runState monotonically increases over * time, but need not hit each state. The transitions are: * * RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps implicitly in finalize() 显式调用线程池showdown()方法,或者线程池对象不被引用,被GC回收时调用finalize()函数,finalize()函数中调用shutdown() * (RUNNING or SHUTDOWN) -> STOP * On invocation of shutdownNow() 显式调用shutdownNow()扭转状态,修改线程池中workers所有工作线程为中断状态,让接下来队列弹出的任务都跳过执行任务 * SHUTDOWN -> TIDYING * When both queue and pool are empty 工作线程全部执行完成且队列也是空,则扭转状态 * STOP -> TIDYING * When pool is empty 当没有任务时,状态扭转 * TIDYING -> TERMINATED * When the terminated() hook method has completed 执行terminated(),try{terminated()}finally{扭转状态} */获取线程状态
// Packing and unpacking ctl //~CAPACITY 连同符号位反转(即相反数-1,若不理解百度反码和补码) 得 0xe0000000 ,就是取高三位做位与计算 private static int runStateOf(int c) { return c & ~CAPACITY; }线程池方法分析
1.执行任务入口解析(流程图)
public void execute(Runnable command) { //任务非空校验 if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ //获取当前线程池计数器值 int c = ctl.get(); //判断当前工作线程池的活动线程数是否<核心线程数 if (workerCountOf(c) < corePoolSize) { //进入addWorker函数,参数true(标识创建核心线程数工作线程,该函数中会对该标识识别是当前工作数量数比较核心线程数还是最大线程数),检查是否可创建worker任务线程 if (addWorker(command, true)) return; //执行此步,意味着进入addWorker函数,资源被其他线程争夺,导致该任务没有抢到创建核心工作线程的资源,二次获取最新活动线程数 c = ctl.get(); } //检查线程池状态是否为RUNNING状态,并且任务队列是否可追加该任务 if (isRunning(c) && workQueue.offer(command)) { //重新获取线程池ctl值 int recheck = ctl.get(); //检查当前线程池状态为非RUNNING状态,且从队列容器中回滚该任务 if (! isRunning(recheck) && remove(command)) //拒绝加入任务,实则调用上文中handlder的rejectedExecution()抛出异常(默认AbortPolicy中止策略) reject(command); //获取最后一次获取的计量值,判断是否工作线程均已完成任务,因为很有可能之前在36行操作之前工作线程数已达最大线程数阈值,但是正好刚加入到队列中后,线程已全部执行完成,且释放了,所以需要创建一个空任务的worker线程用以调用runWorker中从队列中弹出任务去执行(具体查看getTask()) else if (workerCountOf(recheck) == 0) addWorker(null, false); } //第三步则意味着第二步可能池状态非RUNNING,当然如果是非RUNNING状态,在addWorker判断池状态是否可接受新非核心任务。 //也有可能是队列满载,该任务会插队尝试创建非核心工作线程,如果创建失败,会触发拒绝策略异常 else if (!addWorker(command, false)) reject(command); } //拒绝任务 final void reject(Runnable command) { //默认拒绝策略Handler-AbortPolicy handler.rejectedExecution(command, this); }流程图:
2.创建任务线程(addWorker)(流程图)
/** * firstTask:创建任务工作线程RunWorker-执行的第一个任务,也有可能是空任务(唤醒任务) * core:true(核心工作线程),false(非核心工作线程) 对应的是比较corePoolSize和maxPoolSize条件。 **/ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); //STOP、TIDYING、TERMINATED状态不接受新任务,所以直接拒绝,创建任务失败 //SHUTDOWN状态下,仅允许创建task为null的唤醒任务(前提队列中存在任务),因为队列中有任务,否则唤醒任务创建线程无意义不允许创建工作线程 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //根据core标识,对应比较阈值,首先保证不能>=(1>>29)-1,否则不允许创建工作线程 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS当前值+1替换当前值,根绝替换返回值判断是否替换成功,成功,直接跳出循环 if (compareAndIncrementWorkerCount(c)) break retry; //意味着CAS替换失败,重新取值,判断最新池状态是否还是RUNNING,RUNNNING状态则继续执行该循环体,尝试ctl+1操作 //否则直接跳入外循环,进行状态判断是否允许创建任务线程 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //此次任务工作的创建标记以及对应的线程启动标记 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //创建任务工作线程,查看下文代码的Worker源码 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; //尝试持有worker集合的权限独占锁 mainLock.lock(); try { //如果获得锁时,线程池状态非RUNNING或SHUTDOWN状态TASK不为空,则不允许该任务工作对象加入集合,也不允许线程启动 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //检查线程状态是否可启动 if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); //记录worker集合在某一刻的长度最大数,按照配置来说,也就是同时存活存货线程数最大也顶多就是MaxPoolSize if (s > largestPoolSize) largestPoolSize = s; //开启任务工作对象加入集合成功标记 workerAdded = true; } } finally { //释放worker集合的权限独占锁,因为可能同一时刻有N个任务需要创建对象加入workers集合 mainLock.unlock(); } if (workerAdded) { //加入工作集合成功,则需要启动本次工作对象的内置线程 t.start(); //工作对象线程启动成功标记 workerStarted = true; } } } finally { if (! workerStarted) //添加任务工作对象失败,看下文源码 addWorkerFailed(w); } return workerStarted; } //持MainLock锁,让workers集合移除添加失败的任务,以及上文中ctl的cas自增操作回滚,尝试中止线程,最终释放锁 private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); //递归循环,直至降一操作成功 decrementWorkerCount(); //尝试停止线程池,该处不详做介绍,在runWorker中会有介绍 tryTerminate(); } finally { mainLock.unlock(); } } //ThreadPoolExectutor内部类 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { //该任务工作对象内置线程(用来处理该工作对象中的任务,以及队列中的任务),如果是ThreadFactory是异常的,则thread一定是null final Thread thread; //该任务工作对象的初始化任务,有可能是NULL(唤醒任务) Runnable firstTask; //该工作任务执行完成的任务次数 volatile long completedTasks; /** * 构造New实例的内部变量 * state默认为-1,在runWorker中执行到持锁修改state=1,才可以触发线程中断信号,查看下文interruptIfStarted */ Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** 将处理逻辑交给ThrearunWorker */ public void run() { runWorker(this); } //判断该任务工作对象是否有线程持有锁 protected boolean isHeldExclusively() { return getState() != 0; } //线程池修改为STOP状态时,会对worker集合中的工作对象内置线程发送中断信号 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } } //上面的任务工作创建失败后的回滚工作线程数自增操作 private void decrementWorkerCount() { //可以看到是递归降一操作,循环降一操作,直至成功才退出循环 do {} while (! compareAndDecrementWorkerCount(ctl.get())); }流程图:
3.任务工作线程执行解析(runWorker)
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //获取工作初始化对象时的任务 Runnable task = w.firstTask; //然后置空,防止重复执行 w.firstTask = null; //本处并不是释放锁,只是把默认state(-1)修改为0,允许中断。 w.unlock(); boolean completedAbruptly = true; try { //如果内置任务是NULL,就会去从队列中弹出任务处理,空队列就会阻塞或者超时阻塞。 while (task != null || (task = getTask()) != null) { w.lock(); /** * 两次检查 * 第一次检查 如果是>=STOP状态 * 第二次检查 获取当前线程中断信号(该静态方法会清除中断信号)且判断是否>=STOP状态 * 前两次检查任一满足,则继续检查该线程是否中断,未中断将中断该线程 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //子实现类执行任务前的钩子函数 beforeExecute(wt, task); Throwable thrown = null; try { //执行execute传入的任务,或者execute加入到队列中的任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //子实现类执行任务后的钩子函数 afterExecute(task, thrown); } } finally { task = null; //每个工作对象内部会记录worker对应的线程处理了多少个任务(无论任务内部工作是否有异常),但是如果遇到某个任务抛出异常后,该线程就会释放 //比如队列容量80个,池工作最大工作线程数是20个,然后队列满载的情况下,极有可能每个线程在执行初始化的内置任务zhi w.completedTasks++; w.unlock(); } } //如果任务执行过程中出现异常,不会执行此步 completedAbruptly = false; } finally { //线程释放后的退出处理工作,会把此次执行任务的结果和工作对象传递给该函数。 processWorkerExit(w, completedAbruptly); } } //用于子类实现类的runWorker的任务执行前置钩子函数 protected void beforeExecute(Thread t, Runnable r) { } //用于子类实现类的runWorker的任务执行后置钩子函数 protected void afterExecute(Thread t, Runnable r) { } private void processWorkerExit(Worker w, boolean completedAbruptly) { //未完成标记,就先回滚数量,比如工作任务执行异常,或者开启核心线程超时配置,指定时间未收到队列唤醒 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //持锁,统计工作对象中完成的次数,累加到线程池的累计变量 completedTaskCount += w.completedTasks; //集合移除该工作对象 workers.remove(w); } finally { mainLock.unlock(); } //尝试中断,RUNNING状态或者SHUTDOWN状态下队列中有任务,该操作无需理会,下文有该方法详细介绍 tryTerminate(); int c = ctl.get(); /** * 1.如果是STOP状态即值以上的状态,该操作跳过 * 2.该步操作主要是该线程处理任务结果来判断,如果是异常退出,直接创建一个空任务的处理处理线程 * 3.如果正常线程处理完成释放的线程,判断是allowCoreThreadTimeOut是否是true,如果是且队列是空,则有可能是线程超时未取到任务而释放线程的,则所有线程return返回直接释放,无需创建线程,否则则查看队列中是否有任务未处理完(如果有任务则需要最少一个线程,如果是最后一个线程,需要再创建一个空任务处理线程,由队列弹出任务来自旋处理),如果是allowCoreThreadTimeOut为默认值false,判断是否超过核心线程数如果超过就直接释放线程,否则需要再创建一个空任务的处理线程 **/ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; } //创建一个非核心的空任务线程用来处理队列中的任务 addWorker(null, false); } } final void tryTerminate() { //自旋尝试停止线程池,前提是非RUNNING状态或非(SHUTDOWN状态下队列不为空)的情况之一,否则直接跳出循环 for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //如果是活跃线程数>0,就会从工作者列表中从第一个开始取,直到没有中断的工作线程,然后对该线程发送中断信号 if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //在持有锁之后,则把shutdown或者stopz状态尝试扭转为tidying状态 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); //唤醒尝试tryTerminate过程中阻塞在condition队列中的线程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // 继续自旋尝试该次操作 } }“什么是ThreadPoolExecutor”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!
分享标题:什么是ThreadPoolExecutor
网站网址:http://csdahua.cn/article/gpppgo.html
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流