本文最后更新于 270 天前,其中的信息可能已经有所发展或是发生改变。
线程回顾
创建线程的方式
- 继承 Thread 类
- 实现 Runnable 接口
创建后的线程有如下状态:
NEW:新建的线程,无任何操作
| public static void main(String[] args) { |
| Thread thread = new Thread(() -> System.out.println(Thread.currentThread().getName() + "is running")); |
| Thread.State state = thread.getState(); |
| |
| System.out.println(state); |
| } |
RUNNABLE:可执行的线程,在 JVM 中执行但是在等待操作系统的资源
| public static void main(String[] args) { |
| Thread thread = new Thread(() -> System.out.println(Thread.currentThread().getName() + "is running")); |
| thread.start(); |
| Thread.State state = thread.getState(); |
| |
| System.out.println(state); |
| } |
BOLCKED:阻塞,获取不到锁
| public static void main(String[] args) { |
| Thread thread1 = new Thread(() -> { |
| synchronized (Test.class) { |
| try { |
| Thread.sleep(10000); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| } |
| }); |
| |
| Thread thread2 = new Thread(() -> { |
| synchronized (Test.class) { |
| } |
| }); |
| thread1.start(); |
| thread2.start(); |
| |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| |
| Thread.State state = thread2.getState(); |
| System.out.println(state); |
| } |
WAITING:等待,等待其他线程进行操作,时间不确定
| public static void main(String[] args) { |
| Thread thread = new Thread(LockSupport::park); |
| thread.start(); |
| |
| try { |
| Thread.sleep(500); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| Thread.State state = thread.getState(); |
| System.out.println(state); |
| |
| |
| LockSupport.unpark(thread); |
| try { |
| Thread.sleep(500); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| state = thread.getState(); |
| System.out.println(state); |
| } |
TIMED_WAITING:等待,等待的时间是确定的
| public static void main(String[] args) { |
| Thread thread = new Thread(() -> { |
| try { |
| Thread.sleep(10000); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| }); |
| thread.start(); |
| try { |
| Thread.sleep(500); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| Thread.State state = thread.getState(); |
| System.out.println(state); |
| } |
TERMINATED:终止,线程已经运行完毕
| public static void main(String[] args) { |
| Thread thread = new Thread(() -> System.out.println(Thread.currentThread().getName() + "is running")); |
| thread.start(); |
| try { |
| |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| Thread.State state = thread.getState(); |
| System.out.println(state); |
| } |
引入线程池
上述创建线程的方式存在如下缺陷:
- 线程使用完后会被销毁,高并发场景下频繁创建和销毁线程的性能开销不可忽略
- 无法控制线程并发数量,线程过多会导致 JVM 宕机
线程池是一种池化思想,由于创建和销毁线程需要时间,以及系统资源开销,我们需要一个“管理者"来统一管理线程及任务分配,减少这些开销,解决资源不足的问题。
在主要大厂的编程规范中,不允许在应用中自行显式地创建线程,线程必须通过线程池提供。
线程池解决了什么问题
- 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗:
- 提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行:
- 方便线程并发数的管控。因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM
- 节省CPU切换线程的时间成本(需要保持当前执行线程的现场,并恢复要执行线程的现场)。
- 提供更强大的功能,延时定时线程池
线程池引发了什么问题
- 异步任务提交后,如果JVM宕机,已提交的任务会丢失,需要考虑确认机制
- 使用不合理可能导致内存溢出问题
- 参数过多,代码结构引入数据结构与算法,增加使用难度
线程池概述
线程池继承结构
- 最常用的是 ThreadPoolExecutor
- 调度用 ScheduledThreadPoolExecutor,类似 Timer 和 TimerTask。
- 任务拆分合并用 ForkJoinPool
- Executors是工具类,协助创建线程池的
线程池工作状态
-
RUNNING(运行状态):这是线程池的初始状态。
- 在此状态下,线程池接受新任务,并且也会处理等待队列中的任务。
- 线程池的线程会一直运行,直到被转换到其他状态。
-
SHUTDOWN(关闭状态):当调用线程池的shutdown()方法时,线程池会进入此状态。
- 在此状态下,线程池不接受新任务,但会继续处理等待队列中的任务。
- 等待队列中的任务处理完毕后,线程池中的线程会逐渐结束,直到所有线程结束运行。
-
STOP(停止状态):当调用线程池的shutdownNow()方法时,线程池会进入此状态。
- 在此状态下,线程池不接受新任务,同时也不处理等待队列中的任务
- 而是尝试停止所有正在执行的任务,并且回收线程池中的所有线程。
-
TIDYING(整理状态):当所有的任务已经终止,workerCount(工作线程数)为0时,线程池会进入此状态。
- 此时,会执行terminated()钩子方法,允许线程池执行一些收尾工作。
-
TERMINATED(终止状态):terminated()钩子方法执行完毕后,线程池会进入此状态。
- 在终止状态下,线程池的任务完全结束,不再有任何活动。
七个核心参数
参数名 |
描述 |
corePoolSize |
核心线程池基本大小,核心线程数 |
maximumPoolSize |
线程池最大线程数 |
keepAliveTime |
线程空闲后的存活时间 |
TimeUnit unit |
线程空闲后的存活时间单位 |
BlockingQueue workQueue |
存放任务的阻塞队列 |
ThreadFactory threadFactory |
创建线程的工厂 |
RejectedExecutionHandler handler |
当阻塞队列和最大线程池都满了之后的饱和策略 |
corePoolSize
核心线程数
- 线程池刚创建时,线程数量为0,当每次执行 execute 添加新的任务时会在线程池创建一个新的线
程,直到线程数量达到 corePoolSize 为止。
- 核心线程会一直存活,即使没有任务需要执行,当线程数小于核心线程数时,即使有线程空闲,线程
池也会优先创建新线程处理
- 设置 allowCoreThreadTimeout=true (默认false)时,核心线程超时会关闭
maximumPoolSize
最大线程数
- 当池中的线程数 >= corePoolSize ,且任务队列已满时。线程池会创建新线程来处理任务
- 当池中的线程数 = maximumPoolSize ,且任务队列已满时,线程池会拒绝处理任务而抛出异常
如果使用无界的阻塞队列,该参数不生效
BlockingQueue
阻塞队列
- 当线程池正在运行的线程数量已经达到 corePoolSize ,那么再通过 execute 添加新的任务则会被加到 workQueue 队列中
- 任务会在队列中排队等待执行,而不会立即执行
- 一般来说,这里的阻塞队列有以下几种选择:ArrayBlockingQueue , LinkedBlockingQueue , SynchronousQueue
keepAliveTime & TimeUnit
保活时间及其单位
- 当线程空闲时间达到 keepAliveTime 时,线程会退出,直到线程数量 =corePoolSize
- 如果 allowCoreThreadTimeout=true ,则会直到线程数量=0
keepAliveTime 是时间的大小,TimeUnit 是时间单位
ThreadFactory
线程工厂
- 主要用来创建线程
- 通过newThread()方法提供创建线程,该方法创建的线程都是“非守护线程”而且“线程优先级都是默认优先级”
RejectedExecutionHandler
拒绝策略
- 当线程数已经达到 maxPoolSize ,且队列已满,会拒绝新任务
- 当线程池被调用 shutdown() 后,会等待线程池里的任务执行完毕,再 shutdown 。如果在调用shutdown() 和线程池真正 shutdown 之间提交任务,会拒绝新任务
- 当拒绝处理任务时线程池会调用 rejectedExecutionHandler 来处理这个任务。
如果没有设置默认是 AbortPolicy ,另外在 ThreadPoolExecutor 类有几个内部实现类来处理这类情况:
- ThreadPoolExecutor.AbortPolicy :丢弃任务并抛出 RejectedExecutionException 异常。
- ThreadPoolExecutor.CallerRunsPolicy :由调用线程处理该任务
- ThreadPoolExecutor.DiscardPolicy :也是丢弃任务,但是不抛出异常
- ThreadPoolExecutor.DiscardOldestPolicy :丢弃队列最前面的任务,然后重新尝试执行任务
另外实现 RejectedExecutionHandler 接口即可实现自定义的拒绝策略
线程池逻辑结构
线程池的编程模式下,任务是提交给整个线程池,而不是直接提交给某个线程,线程池在拿到任务后,就在内部协调空闲的线程,如果有,则将任务交给某个空闲的华线程。
一个线程同时只能执行一个任务,但可以同时向一个线程池提交多个任务。
当一个任务被提交,线程池会进行如下工作:
-
首先判断当前的核心线程数量如果小于核心线程数,创建一个核心线程并执行任务
-
如果大于核心线程数,则尝试将其放入等待队列,如果队列没有满则放入队列等待执行
-
如果队列已满,则判断非核心线程数的数量+核心线程数是否小于最大线程数量
-
小于:则创建一个非核心线程并执行任务(并不会取队列中的任务)
-
大于:执行拒绝策略
线程池线程数设置
虽然使用线程池的好处很多,但是如果其线程数配置得不合理,不仅可能达不到预期效果,反而可能降低应用的性能。
因此按照任务类型分类,对不同的任务类型确定不同的线程数量。
任务类型分类
- IO密集型任务:
- 此类任务主要是执行 IO 操作。由于执行 lO 操作的时间较长,导致 CPU 的利用率不高,这类任务 CPU 常处于空闲状态。
- Netty 的 IO 读写操作为此类任务的典型例子
- CPU 密集型任务:
- 此类任务主要是执行计算任务。由于响应时间很快,CPU 一直在运行,这种任务 CPU 的利用率很高。
- 例如设计加密解密算法等大量需要 CPU 运算的场景
- 混合型任务:
- 此类任务既要执行逻辑计算,又要进行 IO 操作(如 RPC 调用、数据库访问)相对来说,由于执行 IO 操作的耗时较长(一次网络往返往往在数百毫秒级别),这类任务的 CPU 利用率也不是太高。
- Web 服务器的 HTTP 请求处理操作为此类任务的典型例子
确定任务线程数
-
IO 密集型任务:
- 由于 IO 密集型任务的CPU使用率较低,导致线程空余时间很多,因此通常需要开 CPU 核心数两倍的线程
- Netty 的 IO 处理任务就是典型的 IO 密集型任务,所以,Netty 的 Reactor 实现类(定制版的线程池)的 IO 处理线程数默认正好为 CPU 核数的两倍
-
CPU密集型任务:
- CPU 密集型的任务并行的任务越多,花在任务切换的时间就越多,CPU 执行任务的效率就越低,一般开等于 CPU 的核心数的线程数量
- 比如 4 个核心的 CPU,通过 4 个线程并行地执行 4 个 CPU 密集型任务,此时的效率是最高的。但是如果线程数远远超出 CPU 核心数量,就需要频繁地切换线程,线程上下文切换时需要消耗时间,反而会使得任务效率下降。
-
混合型任务:
-
业界有一个比较成熟的估算公式,具体如下:
-
| 最佳线程数 = ((线程等待时间 + 线程 CPU 时间) / 线程 CPU 时间) * CPU核数 |
-
通过公式可以看出:等待时间所占的比例越高,需要的线程就越多,CPU 耗时所占的比例越高,需要的线程就越少
-
比如在 Web 服务器处理 HTTP 请求时,假设平均线程 CPU 运行时间为 100 毫秒,而线程等待时间(比如包括 DB 操作、RPC 操作作、缓存操作等)为 900 毫秒,如果 CPU 核数为 8 那么根据上面这个公式,估算如下:(900 毫秒+100 毫秒) / 100 毫秒 * 8 = 10 * 8 = 80
,最好开 80 个线程
关键源码剖析
属性 & 构造方法
对于 ThreadPoolExecutor 有几个关键的属性,这里需要先大致了解:
| public class ThreadPoolExecutor extends AbstractExecutorService { |
| |
| |
| private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); |
| |
| |
| private final BlockingQueue<Runnable> workQueue; |
| |
| |
| private final HashSet<Worker> workers = new HashSet<>(); |
| |
| |
| private final ReentrantLock mainLock = new ReentrantLock(); |
| |
| |
| private final Condition termination = mainLock.newCondition(); |
| |
| |
| private volatile ThreadFactory threadFactory; |
| |
| |
| private volatile RejectedExecutionHandler handler; |
| |
| |
| private volatile long keepAliveTime; |
| |
| |
| private volatile boolean allowCoreThreadTimeOut; |
| |
| |
| private volatile int corePoolSize; |
| |
| |
| private volatile int maximumPoolSize; |
| |
| ...... |
| } |
对于其构造方法如下:
| public ThreadPoolExecutor(int corePoolSize, |
| int maximumPoolSize, |
| long keepAliveTime, |
| TimeUnit unit, |
| BlockingQueue<Runnable> workQueue, |
| ThreadFactory threadFactory, |
| RejectedExecutionHandler handler) { |
| if (corePoolSize < 0 || |
| maximumPoolSize <= 0 || |
| maximumPoolSize < corePoolSize || |
| keepAliveTime < 0) |
| throw new IllegalArgumentException(); |
| if (workQueue == null || threadFactory == null || handler == null) |
| throw new NullPointerException(); |
| this.corePoolSize = corePoolSize; |
| this.maximumPoolSize = maximumPoolSize; |
| this.workQueue = workQueue; |
| this.keepAliveTime = unit.toNanos(keepAliveTime); |
| this.threadFactory = threadFactory; |
| this.handler = handler; |
| } |
由此可见,当我们手动创建线程池时,线程池并不会直接工作,而是将我们手动设置的关键参数进行初始化。
execute 方法
ThreadPoolExecutor 的最基本使用方式就是通过 execute 方法提交一个 Runnable 任务,首先看图理解 execute 的执行逻辑
查看源码中的 execute 方法,这里我们分层次来看,先看最主要的外部的几个条件判断:
| public void execute(Runnable command) { |
| if (command == null) |
| throw new NullPointerException(); |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| int c = ctl.get(); |
| |
| if (workerCountOf(c) < corePoolSize) { |
| if (addWorker(command, true)) |
| return; |
| c = ctl.get(); |
| } |
| |
| if (isRunning(c) && workQueue.offer(command)) { |
| int recheck = ctl.get(); |
| if (! isRunning(recheck) && remove(command)) |
| reject(command); |
| else if (workerCountOf(recheck) == 0) |
| addWorker(null, false); |
| } |
| else if (!addWorker(command, false)) |
| reject(command); |
| } |
其实源码里面的注释解释的已经很明白了,这里的逻辑其实已经说明了一些常见的问题:
- 当提交任务时,只要核心线程数没有满,都会新建核心线程执行任务,不会使用以前创建好的空闲的核心线程去执行
- 当无法向队列提交任务时,如果可以新建非核心线程,则会新建非核心线程并执行任务,不会在创建之后从队列中取任务执行
addWorker 方法
接下来我们灌注一下 addWorker 方法:
| |
| private boolean addWorker(Runnable firstTask, boolean core) { |
| |
| retry: |
| for (int c = ctl.get();;) { |
| |
| |
| |
| |
| if (runStateAtLeast(c, SHUTDOWN) |
| && (runStateAtLeast(c, STOP) |
| || firstTask != null |
| || workQueue.isEmpty())) |
| return false; |
| |
| for (;;) { |
| |
| |
| |
| if (workerCountOf(c) |
| >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) |
| return false; |
| |
| if (compareAndIncrementWorkerCount(c)) |
| break retry; |
| |
| c = ctl.get(); |
| |
| if (runStateAtLeast(c, SHUTDOWN)) |
| continue retry; |
| |
| |
| } |
| } |
| |
| |
| |
| |
| boolean workerStarted = false; |
| |
| boolean workerAdded = false; |
| Worker w = null; |
| try { |
| |
| |
| w = new Worker(firstTask); |
| final Thread t = w.thread; |
| if (t != null) { |
| |
| final ReentrantLock mainLock = this.mainLock; |
| mainLock.lock(); |
| try { |
| int c = ctl.get(); |
| |
| |
| |
| |
| |
| if (isRunning(c) || |
| (runStateLessThan(c, STOP) && firstTask == null)) { |
| if (t.isAlive()) |
| throw new IllegalThreadStateException(); |
| |
| workers.add(w); |
| int s = workers.size(); |
| |
| if (s > largestPoolSize) |
| largestPoolSize = s; |
| |
| workerAdded = true; |
| } |
| } finally { |
| mainLock.unlock(); |
| } |
| |
| if (workerAdded) { |
| t.start(); |
| |
| workerStarted = true; |
| } |
| } |
| } finally { |
| |
| if (! workerStarted) |
| addWorkerFailed(w); |
| } |
| return workerStarted; |
| } |
| |
| |
| private void addWorkerFailed(Worker w) { |
| final ReentrantLock mainLock = this.mainLock; |
| mainLock.lock(); |
| try { |
| |
| if (w != null) |
| workers.remove(w); |
| |
| decrementWorkerCount(); |
| |
| tryTerminate(); |
| } finally { |
| mainLock.unlock(); |
| } |
| } |
先看看 addWork 的两个参数,第一个是需要提交的线程Runnable firstTask,第二个参数是 boolean 类型,表示是否为核心线程。
其中三处调用 addWork:
- 第一次,条件if (workerCountOf(c) < corePoolSize):这个很好理解,工作线程数少于核心线程数,提交任务。所以addWorker(command, true)
- 第二次,如果 workerCountOf(recheck) == 0: 如果worker的数量为0,那就 addWorker(null,false),由于之前已经把command提交到阻塞队列了workQueue.offer(command)。所以提交一个空线程,直接从阻塞队列里面取就可以了。
- 第三次,如果线程池没有RUNNING或者offer阻塞队列失败,addWorker(command,false):说明阻塞队列满了,将任务提交到非核心线程池。
Worker 对象
线程池中的每一个线程被封装成了内部的一个Worker对象,ThreadPool维护的其实就是一组Worker对象:
| private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ |
| private static final long serialVersionUID = 6138294804551838833L; |
| |
| |
| final Thread thread; |
| |
| Runnable firstTask; |
| |
| volatile long completedTasks; |
| |
| |
| Worker(Runnable firstTask) { |
| |
| setState(-1); |
| this.firstTask = firstTask; |
| |
| this.thread = getThreadFactory().newThread(this); |
| } |
| |
| |
| public void run() { |
| runWorker(this); |
| } |
| |
| |
| |
| |
| |
| |
| protected boolean isHeldExclusively() { |
| return getState() != 0; |
| } |
| |
| |
| protected boolean tryAcquire(int unused) { |
| if (compareAndSetState(0, 1)) { |
| setExclusiveOwnerThread(Thread.currentThread()); |
| return true; |
| } |
| return false; |
| } |
| |
| |
| protected boolean tryRelease(int unused) { |
| setExclusiveOwnerThread(null); |
| setState(0); |
| return true; |
| } |
| |
| |
| public void lock() { acquire(1); } |
| |
| |
| public boolean tryLock() { return tryAcquire(1); } |
| |
| |
| public void unlock() { release(1); } |
| |
| |
| public boolean isLocked() { return isHeldExclusively(); } |
| |
| |
| void interruptIfStarted() { |
| Thread t; |
| if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { |
| try { |
| t.interrupt(); |
| } catch (SecurityException ignore) { |
| } |
| } |
| } |
| } |
runWorker 方法
Worker 方法中的 run 方法实际调用的是线程池中的 runWorker 方法:
| final void runWorker(Worker w) { |
| |
| Thread wt = Thread.currentThread(); |
| |
| Runnable task = w.firstTask; |
| |
| w.firstTask = null; |
| |
| w.unlock(); |
| |
| boolean completedAbruptly = true; |
| try { |
| |
| |
| |
| |
| |
| while (task != null || (task = getTask()) != null) { |
| |
| w.lock(); |
| |
| |
| if ((runStateAtLeast(ctl.get(), STOP) || |
| (Thread.interrupted() && |
| runStateAtLeast(ctl.get(), STOP))) && |
| !wt.isInterrupted()) |
| wt.interrupt(); |
| try { |
| |
| beforeExecute(wt, task); |
| try { |
| |
| task.run(); |
| |
| afterExecute(task, null); |
| } catch (Throwable ex) { |
| |
| afterExecute(task, ex); |
| throw ex; |
| } |
| } finally { |
| |
| task = null; |
| |
| w.completedTasks++; |
| |
| w.unlock(); |
| } |
| } |
| |
| completedAbruptly = false; |
| } finally { |
| |
| |
| processWorkerExit(w, completedAbruptly); |
| } |
| } |
runWorker 方法的本质其实就是通过一个 while 循环不断地通过 getTask() 方法获取任务,在调用方法执行的时候会先获取外部传递的任务,如果没有获取到外部传递的任务则调用 getTask() 方法获取任务队列中的任务并执行
getTask 方法
| private Runnable getTask() { |
| |
| boolean timedOut = false; |
| for (;;) { |
| int c = ctl.get(); |
| int rs = runStateOf(c); |
| |
| |
| |
| |
| |
| |
| |
| |
| if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { |
| decrementWorkerCount(); |
| return null; |
| } |
| int wc = workerCountOf(c); |
| |
| |
| |
| |
| |
| boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| if ((wc > maximumPoolSize || (timed && timedOut)) |
| |
| && (wc > 1 || workQueue.isEmpty())) { |
| if (compareAndDecrementWorkerCount(c)) |
| return null; |
| |
| continue; |
| } |
| try { |
| |
| |
| Runnable r = timed ? |
| workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : |
| workQueue.take(); |
| if (r != null) |
| return r; |
| |
| timedOut = true; |
| } catch (InterruptedException retry) { |
| |
| timedOut = false; |
| } |
| } |
| } |
综上所述,线程池内部将我们提交的任务封装成了一个个 Woker 对象来管理,我们调用线程池的 execute 方法,实际上:
- execute 根据策略调用 -> addWoker 方法
- addWoker 方法调用 Worker 的 run 方法
- Worker 的 run 方法调用 runWorker 方法
- runWorker 方法不断去用 getTask 获取任务执行