Java Thread

进程是操作系统资源分配和调度的基本单位,而线程是处理器任务调度和执行的基本单位。
线程寄宿在进程当中,线程的生命周期直接被进程所影响,而进程的存活又和其优先级直接相关。线程的调度受时间片轮转优先级等因素影响。

多线程编程复杂原因之一在于其并行的特性

优点:

  1. 提高 CPU 使用率;
  2. 防止阻塞;
  3. 便于建模;

缺点:

  1. 数据安全问题;
  2. 死锁;
  3. 消耗内存;
  4. 线程频繁调度,切换上下文耗时;

每一个新开的线程就像扔进湖面的石子,在你忽视的远处产生涟漪。

Thread 源码分析

源码基于 jdk 1.8

类定义

1
2
3
4
5
6
/**
* A thread is a thread of execution in a program.
* The Java Virtual Machine allows an application to have multiple
* threads of execution running concurrently.
*/
public class Thread implements Runnable {}

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public Thread() {
init(null, null, "Thread-" + nextThreadNum(), 0);
}

public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}

Thread(Runnable target, AccessControlContext acc) {
init(null, target, "Thread-" + nextThreadNum(), 0, acc, false);
}

public Thread(ThreadGroup group, Runnable target) {
init(group, target, "Thread-" + nextThreadNum(), 0);
}

public Thread(String name) {
init(null, null, name, 0);
}

public Thread(ThreadGroup group, String name) {
init(group, null, name, 0);
}

public Thread(Runnable target, String name) {
init(null, target, name, 0);
}

public Thread(ThreadGroup group, Runnable target, String name) {
init(group, target, name, 0);
}

public Thread(ThreadGroup group, Runnable target, String name, long stackSize) {
init(group, target, name, stackSize);
}

构造方法会调用 init() 方法进行初始化操作。

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void init(ThreadGroup g, Runnable target, String name, long stackSize) {
init(g, target, name, stackSize, null, true);
}

private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {

// 线程名不能为空,默认是 Thread-n
if (name == null) {
throw new NullPointerException("name cannot be null");
}

this.name = name;

// 只所以叫 parent,是因为 thread 对象是从当前线程初始化的
Thread parent = currentThread();
SecurityManager security = System.getSecurityManager();
if (g == null) {
// 通过 SecurityManager 或当先线程设置 ThreadGroup,用于线程访问限制。
}

this.group = g;
this.daemon = parent.isDaemon();//守护线程
this.priority = parent.getPriority();//线程优先级
this.target = target;//设置运行的任务
setPriority(priority);// 设置优先级,不能大于线程组的最大优先级
if (inheritThreadLocals && parent.inheritableThreadLocals != null){
// 创建 ThreadLocal 对象
}
this.stackSize = stackSize;
this.stackSize = stackSize;//设置线程堆栈大小
tid = nextThreadID();//同步方法,设置线程的 id
}

线程状态

1
2
3
4
5
6
7
8
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}
  1. NEW
    线程实例化以后就进入了初始状态。
  2. RUNNABLE
    线程调度程序从可运行池中选择一个线程作为当前线程时线程所处的状态。
    READY 状态:
    • 就绪状态只是说有资格运行,调度程序没有调度就永远是就绪状态。
    • 线程调用 start() 方法进入就绪状态。
    • 当前线程 sleep() 方法结束、其他线程 join() 结束、等待用户输入完毕、某个线程拿到对象锁,线程也将进入就绪状态。
    • 线程时间片结束、调用当前线程的 yield() 方法,当前线程进入就绪状态。
    • 锁池里的线程拿到对象锁后,进入就绪状态。
  3. BLOCKED
    线程阻塞在进入 synchronized 关键字修饰的方法或代码块时的状态。
  4. WAITING
    处于这种状态的线程不会被分配 CPU 执行时间,它们要等待被显式地唤醒,否则会处于无限期等待的状态。
  5. TIMED_WAITING
    处于这种状态的线程不会被分配 CPU 执行时间,不会无限期等待被其他线程显示地唤醒,在达到一定时间后它们会自动唤醒。
  6. TERMINATED
    当线程的 run()方法完成时,或者出现异常,就认为它终止了。线程一旦终止了,就不能复生。

线程状态

常用方法

start()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private volatile int threadStatus = 0;

public synchronized void start() {
// 线程状态,0 代表新建状态,非 0 时会抛出异常,即一个 Thread 只能 start() 一次
if (threadStatus != 0)
throw new IllegalThreadStateException();

boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}

只有调用 start() 方法后,JVM 才会调度该线程。

sleep()

使当前线程休眠指定毫秒,释放 CPU 资源,不释放锁。

1
public static native void sleep(long millis) throws InterruptedException;

yield()

调用此方法的线程将放弃 CPU 资源,和其它线程重新一起争夺新的 CPU 使用权限。不释放锁。

1
public static native void yield();

join()

等待调用此方法的线程结束或超时以后,其他线程才继续运行。释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public final synchronized void join(long millis) throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;

if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (millis == 0) {
// 判断调用 myThread.join() 方法的线程对象(myThread)是否存活
while (isAlive()) {
// 阻塞当前线程,即在哪个线程调用的 Thread.join() 方法,阻塞哪个线程
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
  • join() 是同步方法,如果进入此方法,说明其他线程持有此线程对象的监视器。
  • isAlive() 是 Thread 中的方法,用来判断线程是否存活。当线程不存活时,会调用 notifyAll() 唤醒其他线程;
  • wait() 是 Object 中的方法,使当前线程等待,直到唤醒;
1
2
3
4
5
6
7
8
9
10
11
12
void JavaThread::exit(bool destroy_vm, ExitType exit_type) {
ensure_join(this);
}

static void ensure_join(JavaThread* thread) {
// Clear the native thread instance - this makes isAlive return false and allows the join()
// to complete once we've done the notify_all below
// 会让 isAlive() 方法返回 false
java_lang_Thread::set_thread(threadObj(), NULL);
// 调用了 thread.notifyAll,让 wait 返回了
lock.notify_all(thread);
}

interrupt()

中断调用 wait(),join(),sleep() 的线程,接收到 InterruptedException 异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();

synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0();// Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}

private native void interrupt0();

public boolean isInterrupted() {
return isInterrupted(false);
}

private native boolean isInterrupted(boolean ClearInterrupted);

Thread.interrupted()

中断当前线程,并清除中断标记。

1
2
3
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}

setDefaultUncaughtExceptionHandler()

给线程设置异常处理器,当线程出现异常时,回调 UncaughtExceptionHandler#uncaughtException() 方法,获取异常信息。

线程优先级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final static int MIN_PRIORITY = 1;
public final static int NORM_PRIORITY = 5;
public final static int MAX_PRIORITY = 10;

public final void setPriority(int newPriority) {
ThreadGroup g;
checkAccess();
if (newPriority > MAX_PRIORITY || newPriority < MIN_PRIORITY) {
throw new IllegalArgumentException();
}
if((g = getThreadGroup()) != null) {
if (newPriority > g.getMaxPriority()) {
newPriority = g.getMaxPriority();
}
setPriority0(priority = newPriority);
}
}

private native void setPriority0(int newPriority);

ThreadLocal

ThreadLocal(线程局部变量)为每个线程指定了一个新的对象,消除了多线程访问同一对象可能导致的数据不安全问题。

相关 Api

Runnable

1
2
3
public interface Runnable {
public abstract void run();
}

run() 方法无返回值,无异常处理。

Callable

1
2
3
public interface Callable<V> {
V call() throws Exception;
}

call() 有返回值,有异常处理。

Future

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

表示异步计算,有取消任务,判断任务是否完成和获取结果的功能。

RunnableFuture

1
2
3
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

接口,继承 Runnable,可直接使用 Thread。

FutureTask

1
public class FutureTask<V> implements RunnableFuture<V> {}

Future 实现类。可包装 Runnable 和 Callable,提交到线程池。

CompletionService

1
2
3
4
5
6
7
public interface CompletionService<V> {
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

ExecutorCompletionService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final BlockingQueue<Future<V>> completionQueue;

public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
}

CompletionService 实现类,基于线程池和阻塞队列。可管理一组任务,获取任务结果。

ThreadLocalRandom

如何停止线程

线程运行后无法停止,直到 run() 方法返回,即线程执行完毕。可以控制何时或什么条件下让线程执行完毕,避免 CPU 等资源的消耗。

  1. 信号量,只适用于不阻塞线程的情况
  2. 中断

信号量的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class StopThread extends Thread {
private volatile boolean finished = false; // ① volatile条件变量
// BlockingQueue blockingQueue = new LinkedBlockingQueue();

public void stopMe() {
finished = true; // ② 发出停止信号
}

@Override
public void run() {
while (!finished) { // ③ 检测条件变量
// do heavy work // ④业务代码
// blockingQueue.put() // 可能阻塞在此处,不会检查标志位
}
}
}

中断的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class StopThread extends Thread {

@Override
public void run() {
try {
int i = 0;
// 检查是否中断
while (!isInterrupted() && i < 100) {
System.out.println("Run");
sleep(1000);
i++;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

// 只 catch sleep() 不会结束线程
class StopThread extends Thread {

@Override
public void run() {
int i = 0;
// 检查是否中断
while (!isInterrupted() && i < 100) {
System.out.println("Run");
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
}
}
}

要使任务和线程能安全、快速、可靠地停止下来,并不是一件容易的事。Java 没有提供任何机制来安全地终止线程。但它提供了中断(Interruption),这是一种协作机制,能够使一个线程终止另一个线程的的工作。—— 『Java 并发编程实战』 第 7 章 取消与关闭 p111

导致线程阻塞的场景

  1. 线程进行了休眠:线程调用了 Thread.sleep(int n)方法,线程放弃 CPU,睡眠 n 毫秒,然后恢复运行。不释放对象的监视器。
  2. 等待获取同步锁:线程要执行一段同步代码,由于无法获得相关的同步锁进入了阻塞状态,只有获得了同步锁后才能恢复运行。
  3. 线程执行 wait() 进入阻塞状态:线程执行了一个对象的 wait()方法,进入阻塞状态,只有等到其他线程执行了该对象的 notify()或 notifyAll()方法,才可能将其唤醒。释放了对象的监视器。
  4. 等待相关资源:线程执行 I/O 操作或进行远程通信时,会因为等待相关的资源而进入阻塞状态。(例如,当线程执行 System.in.read()方法时,如果用户没有向控制台输入数据,则该线程会一直等读到了用户的输入数据才从 read()方法返回。进行远程通信时,在客户程序中,线程在以下情况可能进入阻塞状态。)

分析工具

jconsole

jstack

jstack 用于打印出给定的 java 进程 ID 或 core file 或远程调试服务的 Java 堆栈信息。

总结

  1. Thread 初始化时会被添加到指定或父线程的 ThreadGroup 中进行管理。
  2. Thread 真正启动是一个 native 函数完成的。

参考

[1] 图解 Java 多线程
[2] Java 内存模型
[3] Java 线程的 6 种状态及切换(透彻讲解)