记录以下 Java 中的并发工具类闭锁 栅栏 信号量和交换器的基本使用。
闭锁(CountDownLatch) CountDownLatch 允许一个或多个线程一直等待
,直到其他线程的操作执行完后再执行。闭锁是一次性对象
,一旦进入终止状态,就不能被重置。 CountDownLatch 是通过一个计数器
来实现的,计数器的初始值为线程的数量。 每当一个线程完成了自己的任务后,计数器的值就会减 1。当计数器值到达 0 时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
主要方法:
CountDownLatch#await() 将某个线程阻塞直到计数器 count=0 才恢复执行。
CountDownLatch#countDown() 将计数器 count 减 1。
使用场景:
开始执行前等待若干线程完成各自的任务或需要其他线程的结果。
计算并发执行某个任务的耗时。
死锁检测;
源码分析:
基于 AbstractQueuedSynchronizer(AQS)
实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L ; Sync(int count) { setState(count); } } private final Sync sync; public CountDownLatch (int count) { if (count < 0 ) throw new IllegalArgumentException ("count < 0" ); this .sync = new Sync (count); } public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public boolean await (long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1 , unit.toNanos(timeout)); } public void countDown () { sync.releaseShared(1 ); } }
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private static void sampleCountDownLatch () { final CountDownLatch downLatch = new CountDownLatch (2 ); ExecutorService threadPool = Executors.newFixedThreadPool(3 ); threadPool.execute(() -> { try { boolean isZero = downLatch.await(1 , TimeUnit.SECONDS); if (isZero) { System.out.println("准备工作已全部完成,可以开始工作了。" ); } else { System.out.println("准备工作未全部完成,但是已经超时。" ); } } catch (InterruptedException e) { e.printStackTrace(); } }); threadPool.execute(() -> { System.out.println("线程 A 开始。" + System.currentTimeMillis()); sleep(4000 ); System.out.println("线程 A 结束。" + System.currentTimeMillis()); downLatch.countDown(); }); threadPool.execute(() -> { System.out.println("线程 B 开始。" + System.currentTimeMillis()); sleep(2000 ); System.out.println("线程 B 结束。" + System.currentTimeMillis()); downLatch.countDown(); }); }
栅栏(CyclicBarrier) 用于阻塞一组线程直到某个事件发生
。所有线程必须同时
到达栅栏位置才能继续执行下一步操作,且能够被重复利用
。
源码分析:
基于 ReentrantLock
和 Condition
实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 public class CyclicBarrier { private final ReentrantLock lock = new ReentrantLock (); private final Condition trip = lock.newCondition(); private final Runnable barrierCommand; public CyclicBarrier (int parties, Runnable barrierAction) { if (parties <= 0 ) throw new IllegalArgumentException (); this .parties = parties; this .count = parties; this .barrierCommand = barrierAction; } public int await () throws InterruptedException, BrokenBarrierException { return dowait(false , 0L ); } public int await (long timeout, TimeUnit unit) { return dowait(true , unit.toNanos(timeout)); } private int dowait (boolean timed, long nanos) { final ReentrantLock lock = this .lock; lock.lock(); try { int index = --count; if (index == 0 ) { boolean ranAction = false ; try { final Runnable command = barrierCommand; if (command != null ) command.run(); return 0 ; } finally { if (!ranAction) breakBarrier(); } } for (;;) { if (!timed) trip.await(); else if (nanos > 0L ) nanos = trip.awaitNanos(nanos); if (timed && nanos <= 0L ) { breakBarrier(); throw new TimeoutException (); } } } finally { lock.unlock(); } } }
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private static void sampleCyclicBarrier () { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); CyclicBarrier barrier = new CyclicBarrier (3 , new Runnable () { @Override public void run () { System.out.println("都就绪了,开干!!!" ); } }); threadPool.execute(new Worker ("[Thread-A] " , 6000 , barrier)); threadPool.execute(new Worker ("[Thread-B] " , 2000 , barrier)); threadPool.execute(new Worker ("[Thread-C] " , 3000 , barrier)); } private static class Worker implements Runnable { private String name; private CyclicBarrier barrier; private long duration; public Worker (String name, long duration, CyclicBarrier barrier) { this .name = name; this .duration = duration; this .barrier = barrier; } @Override public void run () { System.out.println(name + "开始执行耗时任务。time: " + System.currentTimeMillis()); sleep(duration); System.out.println(name + "完成耗时任务。time: " + System.currentTimeMillis()); try { int remainCount = barrier.await(); System.out.printf(Locale.US, "%s是第%s个完成任务,耗时%s ms.%n" , name, barrier.getParties() - remainCount, duration); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }
Output:
1 2 3 4 5 6 7 8 9 10 [Thread-A] 开始执行耗时任务。time: 1612688907427 [Thread-B] 开始执行耗时任务。time: 1612688907427 [Thread-C] 开始执行耗时任务。time: 1612688907427 [Thread-B] 完成耗时任务。time: 1612688909427 [Thread-C] 完成耗时任务。time: 1612688910427 [Thread-A] 完成耗时任务。time: 1612688913428 都就绪了,开始!!! [Thread-A] 是第3个完成任务,耗时6000 ms. [Thread-B] 是第1个完成任务,耗时2000 ms. [Thread-C] 是第2个完成任务,耗时3000 ms.
闭锁 CountDownLatch 和栅栏 CyclicBarrier 区别:
闭锁 CountDownLatch 做减计数,而栅栏 CyclicBarrier 则是加计数。
CountDownLatch 是一次性的,CyclicBarrier 可以重用。
CountDownLatch 强调一个线程等多个线程完成某件事情。CyclicBarrier 是多个线程互等,等大家都完成。
CyclicBarrier 在一些场景中可以替代 CountDownLatch 实现类似的功能。
信号量(Semaphore) 信号量是用来控制同时访问
特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。 通过许可(permits)
控制操作资源的数量,同时可以指定是否公平。
使用场景:
可以用于做流量控制,特别公用资源有限的应用场景。比如数据库连接。
源码分析:
基于 AbstractQueuedSynchronizer(AQS)
实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class Semaphore implements java .io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer {} static final class NonfairSync extends Sync {} static final class FairSync extends Sync {} public Semaphore (int permits) { sync = new NonfairSync (permits); } public Semaphore (int permits, boolean fair) { sync = fair ? new FairSync (permits) : new NonfairSync (permits); } public void acquire () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public void release () { sync.releaseShared(1 ); } }
交换器(Exchanger) 移相器(Phaser)