Java 并发之闭锁 栅栏 信号量 交换器

记录以下 Java 中的并发工具类闭锁 栅栏 信号量和交换器的基本使用。

闭锁(CountDownLatch)

CountDownLatch 允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。闭锁是一次性对象,一旦进入终止状态,就不能被重置。
CountDownLatch 是通过一个计数器来实现的,计数器的初始值为线程的数量。
每当一个线程完成了自己的任务后,计数器的值就会减 1。当计数器值到达 0 时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

主要方法:

  1. CountDownLatch#await() 将某个线程阻塞直到计数器 count=0 才恢复执行。
  2. CountDownLatch#countDown() 将计数器 count 减 1。

使用场景:

  1. 开始执行前等待若干线程完成各自的任务或需要其他线程的结果。
  2. 计算并发执行某个任务的耗时。
  3. 死锁检测;

源码分析:

基于 AbstractQueuedSynchronizer(AQS) 实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}
}

private final Sync sync;

public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
sync.releaseShared(1);
}

}

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private static void sampleCountDownLatch() {
final CountDownLatch downLatch = new CountDownLatch(2);

ExecutorService threadPool = Executors.newFixedThreadPool(3);
threadPool.execute(() -> {
try {
//阻塞直到 count == 0
//downLatch.await();
boolean isZero = downLatch.await(1, TimeUnit.SECONDS);
if (isZero) {
System.out.println("准备工作已全部完成,可以开始工作了。");
} else {
System.out.println("准备工作未全部完成,但是已经超时。");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});

threadPool.execute(() -> {
System.out.println("线程 A 开始。" + System.currentTimeMillis());
sleep(4000);
System.out.println("线程 A 结束。" + System.currentTimeMillis());
downLatch.countDown();
});

threadPool.execute(() -> {
System.out.println("线程 B 开始。" + System.currentTimeMillis());
sleep(2000);
System.out.println("线程 B 结束。" + System.currentTimeMillis());
downLatch.countDown();
});
}

栅栏(CyclicBarrier)

用于阻塞一组线程直到某个事件发生。所有线程必须同时到达栅栏位置才能继续执行下一步操作,且能够被重复利用

源码分析:

基于 ReentrantLockCondition 实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class CyclicBarrier {
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
private final Runnable barrierCommand;

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

public int await() throws InterruptedException, BrokenBarrierException {
return dowait(false, 0L);
}

public int await(long timeout, TimeUnit unit) {
return dowait(true, unit.toNanos(timeout));
}

private int dowait(boolean timed, long nanos) {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
//还剩余多少个线程在运行
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();//执行回调
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
// 如果还有剩余线程运行,则进入死循环
for (;;) {
if (!timed)
trip.await();//没有超时,一致等待
else if (nanos > 0L)
//更新剩余时间
nanos = trip.awaitNanos(nanos);

//超时时间已到
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();//释放锁
}
}

}

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private static void sampleCyclicBarrier() {
ExecutorService threadPool = Executors.newFixedThreadPool(3);

CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println("都就绪了,开干!!!");
}
});

threadPool.execute(new Worker("[Thread-A] ", 6000, barrier));
threadPool.execute(new Worker("[Thread-B] ", 2000, barrier));
threadPool.execute(new Worker("[Thread-C] ", 3000, barrier));
}

private static class Worker implements Runnable {
private String name;
private CyclicBarrier barrier;
private long duration;

public Worker(String name, long duration, CyclicBarrier barrier) {
this.name = name;
this.duration = duration;
this.barrier = barrier;
}

@Override
public void run() {
System.out.println(name + "开始执行耗时任务。time: " + System.currentTimeMillis());
sleep(duration);
System.out.println(name + "完成耗时任务。time: " + System.currentTimeMillis());
try {
int remainCount = barrier.await();
System.out.printf(Locale.US, "%s是第%s个完成任务,耗时%s ms.%n", name, barrier.getParties() - remainCount, duration);
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}

Output:

1
2
3
4
5
6
7
8
9
10
[Thread-A] 开始执行耗时任务。time: 1612688907427
[Thread-B] 开始执行耗时任务。time: 1612688907427
[Thread-C] 开始执行耗时任务。time: 1612688907427
[Thread-B] 完成耗时任务。time: 1612688909427
[Thread-C] 完成耗时任务。time: 1612688910427
[Thread-A] 完成耗时任务。time: 1612688913428
都就绪了,开始!!!
[Thread-A] 是第3个完成任务,耗时6000 ms.
[Thread-B] 是第1个完成任务,耗时2000 ms.
[Thread-C] 是第2个完成任务,耗时3000 ms.

闭锁 CountDownLatch 和栅栏 CyclicBarrier 区别:

  1. 闭锁 CountDownLatch 做减计数,而栅栏 CyclicBarrier 则是加计数。
  2. CountDownLatch 是一次性的,CyclicBarrier 可以重用。
  3. CountDownLatch 强调一个线程等多个线程完成某件事情。CyclicBarrier 是多个线程互等,等大家都完成。
  4. CyclicBarrier 在一些场景中可以替代 CountDownLatch 实现类似的功能。

信号量(Semaphore)

信号量是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
通过许可(permits)控制操作资源的数量,同时可以指定是否公平。

使用场景:

可以用于做流量控制,特别公用资源有限的应用场景。比如数据库连接。

源码分析:

基于 AbstractQueuedSynchronizer(AQS) 实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Semaphore implements java.io.Serializable {
private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}

public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public void release() {
sync.releaseShared(1);
}
}

交换器(Exchanger)

移相器(Phaser)