Java 线程池

  1. 线程的创建和销毁都需要时间,当有大量的线程创建和销毁时,那么这些时间的消耗则比较明显,会导致性能问题。
  2. 大量的线程创建、执行和销毁是非常耗 cpu 和内存的(每个线程需要大约 1MB 内存),这样将直接影响系统的吞吐量,导致性能下降,如果内存资源占用的比较多,还很可能造成 OOM。
  3. 大量的线程的创建和销毁很容易导致 GC 频繁的执行,从而发生内存抖动现象,而发生了内存抖动,对于移动端来说,最大的影响就是造成界面卡顿。

针对上述所描述的问题,解决的办法是重用已有的线程,减少线程的创建。 于是出现了线程池的概念。

线程池的本质是对任务或线程的管理。

线程池执行流程

java_thread_pool_process

  1. 如果没有达到核心线程数量,创建一个核心线程来执行任务;
  2. 如果超过了核心线程数,任务会被添加到任务队列等待执行;
  3. 如果任务队列已满,而且线程数量没有达到设定的最大值,启动一个非核心线程来执行任务。
  4. 如果线程数量达到了设定的最大值,就会执行拒绝策略,调用 RejectedExecutionHandler#rejectedExecution() 方法来通知调用者。

Java-ThreadPool-Execute-Process

创建不同类型的线程池

java.util.concurrent.Executors 工具类提供了 5 种类型线程池。

FixedThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);
}
  1. 创建一个固定线程数量的线程池,全部是核心线程;
  2. 使用 LinkedBlockingQueue,任务队列容量没有限制,按 FIFO 方式处理任务任务;

优点:

  • 当核心线程空闲时,能更快的响应外界请求;
  • 任务队列没有限制,不会触发拒绝策略;

缺点:

  • 线程不会被回收,不活动时浪费资源;
  • 当核心线程都活动时,任务不能立即执行,直到核心线程空闲;

该线程池可以控制线程的最大并发数,适用于长时间执行的任务;

FixedThreadPool

SingleThreadExecutor

1
2
3
4
5
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
  1. 线程池中只有一个线程,所有任务都交给该线程串行执行;
  2. 任务队列使用 LinkedBlockingQueue

优点:

  • 当核心线程空闲时,能更快的响应外界请求;
  • 串行执行,不需要处理线程同步的问题;
  • 任务队列没有限制;

缺点:

  • 并发量高时性能低;

适合并发量低,耗时短的任务;

SingleThreadExecutor

CachedThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
  1. 无核心线程,最大线程数为 Integer.MAX_VALUE,相当于无限制。线程空闲时间为 60s;
  2. 任务队列使用 SynchronousQueue,内部没有任何容量的阻塞队列,相当于空集合。任务提交后立即执行;
  3. 当线程池中的线程都处于活动状态的时候,会创建一个新的线程来处理任务。有可用的空闲线程时会重用线程;

优点:

  • 任务不用等待,会创建新的线程执行;
  • 适用于并发量高的情况;
  • 线程空闲超过 60s 后会被回收,长时间运行不消耗资源;

缺点:

  • 提交大量任务时,会出现内存抖动的情况;

适合大量且耗时较少的任务;

CachedThreadPool

ScheduledThreadPoolExecutor

1
2
3
4
5
6
7
8
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
  1. 可指定核心线程数,非核心线程数无限制,有 OOM 的风险的。非核心线程数空闲是会立即回收;
  2. 可定时执行或周期执行任务的线程池。并且当非核心线程处于空闲状态时就会立即被回收;
  3. 任务队列使用 DelayedWorkQueue

适合执行定时任务和固定周期的重复任务。

优点:

  • 可自定义核心线程数;
  • 可执行定时任务以及有固定周期的重复任务;

缺点:

  • 超过核心线程时,后创建的任务不会执行,且不会调用拒绝策略;

ScheduleThreadPool

ForkJoinPool

创建 ForkJoin 框架中用到的 ForkJoinPool 线程池。ForkJoinPool 和 ThreadPoolExecutor 都是 AbstractExecutorService 的实现类。

1
2
3
4
5
6
7
/**
* Creates a work-stealing thread pool using all available processors as its target parallelism level.
*/
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
}

阻塞队列

阻塞队列用于存放线程池未能及时处理执行的任务,既解耦了任务的提交与执行,又能起到一个缓冲的作用。

阻塞队列

1. ArrayBlockingQueue

基于数组实现的有界阻塞队列,创建的时候需要指定容量。此类型的队列按照 FIFO(先进先出)的规则对元素进行排序。

ArrayBlockingQueue

2. LinkedBlockingQueue

基于链表实现阻塞队列,默认大小为 Integer.MAX_VALUE。按照 FIFO(先进先出)的规则对元素进行排序。

LinkedBlockingQueue

3. SynchronousQueue

一个不存储元素的阻塞队列。每一个 put 操作必须阻塞等待其他线程的 take 操作,take 操作也必须等待其他线程的 put 操作。

SynchronousQueue

4. PriorityBlockingQueue

一个基于数组利用堆结构实现优先级效果的无界队列,默认自然序排序,可以实现 Comparator 接口自定义排序规则。

PriorityBlockingQueue

5. DelayedWorkQueue

一个实现了优先级队列功能且实现了延迟获取的无界队列,在创建元素时,可以指定多久多久才能在队列中获取当前元素。只有延时期满了后才能从队列中获取元素。

DelayQueue

ExecutorService

java_thread_pool_exectuor

ExecutorService 是一个接口,从意义上来说可以叫做线程池的服务,因为它提供了众多接口来控制线程池中的线程。
ThreadPoolExecutor 是其实现类,它封装了一系列的 api 使得它具有线程池的特性,其中包括工作队列、核心线程数、最大线程数等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public interface Executor {
void execute(Runnable command);
}

public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
//...
}

public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

public Future<?> submit(Runnable task) {
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
}

public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}

参数说明:
corePoolSize:线程池中的核心线程数量。
maximumPoolSize:线程池中的最大线程数量。
keepAliveTime:多余的空闲线程在超过 keepAliveTime 时间内没有任务的话则被销毁。如果设置 allowCoreThreadTimeOut(true) 则超时策略同样作用于核心线程池。
unit: keepAliveTime 的单位,常用的如:TimeUnit.SECONDS。
workQueue:任务队列(BlockingQueue),主要用来存储已经提交但未被执行的任务,不同的线程池采用的排队策略不一样。
threadFactory:线程工厂,用来创建线程池中的线程,通常用默认的即可。
handler:通常叫做拒绝策略。默认是 AbortPolicy,抛出异常。

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ThreadPoolExecutor extends AbstractExecutorService {
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
}

ThreadPoolExecutor 的运行状态有 5 种,分别为:

运行状态 状态描述
RUNNING 能接受新提交的任务,并且也能处理阻塞队列中的任务。
SHUTDOWN 关闭状态,不能再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。
STOP 不能接受新任务,也不能处理队列中的任务,会中断正在处理任务的线程。
TIDYING 所有的任务都已终止了,workerCount(有效线程数)为 0。
TERMINATED 在 terminated() 方法执行完后进入该状态。

线程池生命周期转换

拒绝策略

当提交线程时,如果核心线程数达到最大值,并且缓冲队列已满,并且达到了最大线程数,就会触发拒绝策略。

JDK 中内置的 4 种线程池拒绝策略:

  1. CallerRunsPolicy
  2. AbortPolicy
  3. DiscardPolicy
  4. DiscardOldestPolicy
1
2
3
4
5
6
7
8
9
10
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

1. CallerRunsPolicy:

1
2
3
4
5
6
7
8
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

功能:当触发拒绝策略时,只要线程池没有关闭,就由提交任务的当前线程处理。
使用场景:一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭,也就是提交的任务一定会被运行,但是由于是调用者线程自己执行的,当多次提交任务时,就会阻塞后续任务执行,性能和效率自然就慢了。

2. AbortPolicy:

1
2
3
4
5
6
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
}
}

功能:当触发拒绝策略时,直接抛出拒绝执行的异常,中止策略的意思也就是打断当前执行流程。
使用场景:这个就没有特殊的场景了,但是一点要正确处理抛出的异常。

3. DiscardPolicy:

1
2
3
4
5
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

功能:直丢弃这个任务,不触发任何动作。
使用场景:如果提交的任务无关紧要可以使用它。因为它就是个空实现,会悄无声息的吞噬你的的任务。

4. DiscardOldestPolicy:

1
2
3
4
5
6
7
8
9
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

功能:如果线程池未关闭,就弹出队列头部的元素,然后尝试执行该任务。
使用场景:这个策略还是会丢弃任务,丢弃时也是毫无声息,但是特点是丢弃的是首个未执行的任务,而且是待执行优先级最高的任务。

下面介绍几种自定义的拒绝策略。

1. 打印日志,记录当前线程信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* https://github.com/sanshengshui/dubbo/blob/master/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/AbortPolicyWithReport.java
* AbortPolicyWithReport实现自ThreadPoolExecutor.AbortPolicy,拒绝策略实现类,
* 打印JStack,分析线程状态。
*/
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
/**
* 打印告警日志
*/
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
// 打印 JStack,分析线程状态。
dumpJStack();
//抛出 RejectedExecutionException 异常
throw new RejectedExecutionException(msg);
}

}

2. 创建新线程执行任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
NewThreadRunsPolicy() {
super();
}

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
final Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
}
}

3. 重新添加任务到阻塞队列,超时重试策略:

1
2
3
4
5
6
7
8
9
10
11
12
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
try {
executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
}

throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
}
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class RejectedExecutionHandlerChain implements RejectedExecutionHandler {
private final RejectedExecutionHandler[] handlerChain;

public static RejectedExecutionHandler build(List<RejectedExecutionHandler> chain) {
Objects.requireNonNull(chain, "handlerChain");
RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]);
return new RejectedExecutionHandlerChain(handlerChain);
}

private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) {
this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain");
}

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) {
rejectedExecutionHandler.rejectedExecution(r, executor);
}
}
}

定义了一个拒绝策略链,包装了一个拒绝策略列表,当触发拒绝策略时,会将策略链中的 rejectedExecution 依次执行一遍。

核心线程复用机制

一个线程只能启动一次,run() 方法执行完毕也就意味着线程结束。要想达到线程复用,在 run() 方法中死循环检查是否任务队列是否为空,不为空则取出任务执行,为空则 wait(),直到任务队列不为空。

源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
//小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//创建核心线程执行任务
if (addWorker(command, true))
return;
c = ctl.get();
}
//线程池是运行状态 && 添加任务到队列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程池没有运行,移除掉这个任务,执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 没有线程,创建线程执行任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}// 线程池停止或任务队列满的情况
else if (!addWorker(command, false))//创建非核心线程
// 说明线程池已经关闭或达到最大线程数,执行拒绝策略
reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
Worker w = new Worker(firstTask);
final Thread t = w.thread;

int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
if (workerAdded) {
t.start();//启动线程
workerStarted = true;
}
return workerStarted;
}

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 以当前 Worker 创建线程,所以线程启动后会调用 Worker#run() 方法
this.thread = getThreadFactory().newThread(this);
}

public void run() {
// 该方法返回后,当前线程执行完毕,线程回收
runWorker(this);
}
}

final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
try{
// getTask() 从队列中获取任务,只有队列不为空,方法就不会返回
// getTask() 返回 null,线程就结束了
while (task != null || (task = getTask()) != null) {
w.lock();
try {
beforeExecute(wt, task);
try{
task.run();
} finally{
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
} finally{
processWorkerExit() 方法
}
}

// 循环从队列中获取任务
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// SHUTDOWN 状态下,队列不为空仍可执行任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();// AQS 操作将当前线程数量减一
return null;//返回 null,销毁线程
}

//获取线程池中线程的数量
int wc = workerCountOf(c);

//允许核心线程回收或当前线程数 > 核心线程数,则当前线程允许超时回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

/*
1. 原则上线程池数量不可能大于 maximumPoolSize,但可能并发操作时 maximumPoolSize
设置比原来的小,就出现了当前工作线程大于最大线程的情况。这时就需要线程超时回收,
以维持线程池最大线程小于maximumPoolSize
2. timed && timedOut == true 表示当前线程需要进行超时控制 && 从队列中获取任务超时了
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

核心代码如下:
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

只要 timed 为 false,这个 workQueue.take() 就一直阻塞,也就保证了线程不会被销毁。timed 的值又是通过 allowCoreThreadTimeOut 和正在运行的线程数量是否大于 coreSize 控制的。

其他

1. 核心线程能被回收吗?

核心线程默认不会被回收。但是可以调用 allowCoreThreadTimeOut() 让核心线程可以被回收。

2. 核心线程数设置多少合适?

需要针对具体情况而具体处理,不同的任务类别应采用不同规模的线程池,任务类别可划分为 CPU 密集型任务、IO 密集型任务和混合型任务。(N 代表 CPU 个数)

  • IO 型任务(比如读取数据库、文件读写以及网络通信),这些任务不会占据很多 cpu 的资源但是会比较耗时。线程数设置为 N x 2 + 1,充分利用 CPU 资源。
  • CPU 密集型任务(比如大量计算、解压、压缩等操作),这些任务会占据大量的 cpu。此时设置线程数为 N +1。

通常核心线程数设为 N + 1,而最大线程数设为 N x 2 + 1。

获取 CPU 数量的方法为 Runtime.getRuntime().availableProcessors()

3. shutdown()和 shutdownNow()的区别

  1. shutdown() 方法会等待正在执行的任务先完成后再关闭。
  2. shutdownNow() 方法则是阻止正在任务队列中等待任务的启动,并试图停止当前正在执行的任务。

4. 核心线程和非核心线程是如何区分的?

核心线程和非核心线程是一个抽象概念,只是用于更好的表述线程池的运行逻辑,实际上都对应操作系统的 osThread,都是重量级线程。

在新增 Worker 的时候,通过一个 boolean 表达是核心线程还是非核心线程,本质上两者没有什么不同。

5. 线程池执行的用户任务抛出异常会怎样?

  1. ThreadPoolExecutor.execute() 提交的任务如果出现了异常,会抛出异常,输出异常信息。移除异常的 Worker,并重新创建一个 Worker 线程。
    这里就可能存在一个风险,如果用户任务大量的抛出异常,可能会导出线程资源频繁的销毁、创建。
    源码是在 ThreadPoolExecutor#runWorker() 方法中。
  2. ThreadPoolExecutor.submit() 提交的用户任务,会包装成一个 FutureTask。出现异常时不会抛出异常,只是保存了异常信息,不显示日志,但是在调用 Future.get() 时会将异常信息重新抛出。并且异常是在用户线程中抛出的,因此不影响线程池中的线程。
    源码是在 FutureTask#run() 方法中。
  3. ScheduledThreadPoolExecutor.schedule() 提交的任务
    • 对于非周期性任务,同 ThreadPoolExecutor.submit() 不抛异常,也无日志。
    • 对于周期性任务,任何一次调度出现异常,都会导致后续无法调用。
      在使用这个线程池时,我们应当保证用户任务不会抛出异常到执行线程,避免任务调度失败和异常排查困难等问题。
      源码位置是 ScheduledThreadPoolExecutor.ScheduledFutureTask#run()

参考

[1] Thread pools and work queues(IBM)
[2] Android 性能优化之使用线程池处理异步任务
[3] Java 线程池实现原理及其在美团业务中的实践
[4] DynamicTp - 美团轻量级动态线程池