广东省建设信息网网站,南通网站建设南通,沧州做网站哪家好,做推广哪个网站最热门同步工具类:CyclicBarrier介绍源码分析CyclicBarrier 基于ReetrantLock Condition实现。构造函数await() 函数业务场景方案一:代码实现测试截图方案二代码实现测试打印总结介绍
官方介绍: 一种同步辅助工具#xff0c;允许一组线程都等待对方到达共同的障碍点。CyclicBarrie…
同步工具类:CyclicBarrier介绍源码分析CyclicBarrier 基于ReetrantLock Condition实现。构造函数await() 函数业务场景方案一:代码实现测试截图方案二代码实现测试打印总结介绍
官方介绍: 一种同步辅助工具允许一组线程都等待对方到达共同的障碍点。CyclicBarrier在涉及固定大小的线程组的程序中非常有用这些线程组偶尔必须彼此等待。该屏障被称为循环屏障因为它可以在释放等待线程后重新使用。 CyclicBarrier支持可选的Runnable命令该命令在参与方中的最后一个线程到达后但在释放任何线程之前在每个障碍点运行一次。此屏障动作对于在任何一方继续之前更新共享状态都很有用。 通俗理解 它可以协同多个线程让多个线程在这个栅栏前等待直到所有线程都达到了这个栅栏时再一起继续执行后面的动作. 举个例子你和朋友约定在公交站汇合去公园玩。这个公交站相当于栅栏。只有你们都到了公交站才一起去公园。
源码分析
CyclicBarrier 基于ReetrantLock Condition实现。 /** The lock for guarding barrier entry *///用于线程之间互相唤醒private final ReentrantLock lock new ReentrantLock();/** Condition to wait on until tripped */private final Condition trip lock.newCondition();//总线程数private final int parties;
构造函数
可以看到不仅可以传入 参与方的总数量(即 parties)。还可以传入一个回调函数当所有的线程被唤醒时barrierAction 被执行该参数可以为空。 /*** Creates a new {code CyclicBarrier} that will trip when the* given number of parties (threads) are waiting upon it, and which* will execute the given barrier action when the barrier is tripped,* performed by the last thread entering the barrier.** param parties the number of threads that must invoke {link #await}* before the barrier is tripped* param barrierAction the command to execute when the barrier is* tripped, or {code null} if there is no action* throws IllegalArgumentException if {code parties} is less than 1*/public CyclicBarrier(int parties, Runnable barrierAction) {if (parties 0) throw new IllegalArgumentException();this.parties parties;this.count parties;this.barrierCommand barrierAction;}await() 函数
1.CyclicBarrier 是可以被重用的。 2.CyclicBarrier 会响应中断N 个线程还没有到齐如果有线程收到了中断信号所有阻塞的线程也会被唤醒。也就是 breakBarrier函数。然后count 被重置为初始值(parties),重新开始 3.构造函数传入的回调函数barrierAction 只会被最后一个线程执行一次。 public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}}/*** Main barrier code, covering the various policies.*/private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock this.lock;lock.lock();try {final Generation g generation;if (g.broken)throw new BrokenBarrierException();if (Thread.interrupted()) { //响应中断breakBarrier(); //唤醒所有阻塞的线程throw new InterruptedException();}int index --count; //每个线程调用一次await(). count 减一当count0时则唤醒其他的所有线程if (index 0) { // trippedboolean ranAction false;try {final Runnable command barrierCommand;if (command ! null)// 一起唤醒之和如果回调函数不为空还需要执行回调函数command.run();ranAction true;nextGeneration();//唤醒其他所有线程并将count值复原。//用于下一次的CyclicBarrier.这是可以复用的原因return 0;} finally {if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed out//当count0,说明 人没有到齐需要阻塞自己for (;;) {try {if (!timed)trip.await();//当阻塞自己的时候await方法会释放锁这样其他线程调用await方法时会执行--countelse if (nanos 0L)nanos trip.awaitNanos(nanos);} catch (InterruptedException ie) {//响应中断如果有线程收到了中断信号所有的阻塞线程也会被唤醒。if (g generation ! g.broken) {breakBarrier();throw ie;} else {// Were about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// belong to subsequent execution.//如果不是响应的中断说明是被 sigalAll唤醒。则自己唤醒Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g ! generation)//从阻塞中被唤醒然后返回return index;if (timed nanos 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}private void nextGeneration() {// signal completion of last generation// 唤醒所有阻塞的线程trip.signalAll();// set up next generation// 设置初始值开始下一个轮回count parties;generation new Generation();}业务场景
10 个求职者一起来公司应聘招聘方式为笔试和面试。首先需要等10个人到期后开始笔试笔试结束之后再一起参加面试。把10个人看作10个线程。如图所示:
方案一:
采用一个CyclicBarrier.重复实现两次等待
代码实现
class Solver {public static void main(String[] args) {CyclicBarrier barriernew CyclicBarrier(10);for (int i0;i10;i){//开启10个线程模拟10个求职者new Thread(new JobHunt(barrier)).start();}}
}class JobHunt implements Runnable {private CyclicBarrier cyclicBarrier;public JobHunt(CyclicBarrier cyclicBarrier) {this.cyclicBarrier cyclicBarrier;}Overridepublic void run() {//赶来公司路上doOnTheWay();//到公司后看人是否到齐如果没有到齐就阻塞// 到齐了就开始笔试try {System.out.println(Thread.currentThread().getName() 已经来公司了...);cyclicBarrier.await();doWriteExam();System.out.println(Thread.currentThread().getName() 笔试做完了....);cyclicBarrier.await();doInterview();System.out.println(Thread.currentThread().getName() 面试完啦.....);} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}/*** 模拟在路上方法*/public void doOnTheWay(){doCostTime(2000);}/*** 模拟笔试过程*/public void doWriteExam(){doCostTime(3000);}/*** 模拟面试过程*/public void doInterview(){doCostTime(5000);}private void doCostTime(int time){Random randomnew Random();try {//随机休眠时间int countrandom.nextInt(time);// System.out.println(count);Thread.sleep(count);} catch (InterruptedException e) {e.printStackTrace();}}}测试截图
从截图中我们可以看出CyclicBarrier 实现了大家一起等待直至人到齐了再去一起做笔试或者面试。
方案二
由于两次等待结束后打印的消息不一样。所以我们采用两个 CyclicBarrier。分别传入不同的 barrierAction来实现自定义的 等待结束后的打印事件。
代码实现
class Solver {public static void main(String[] args) {//将笔试等待的回调函数传入CyclicBarrier barrierOnWriteExamnew CyclicBarrier(10,new BarrierActionOnWriteExam());//将面试等待的回调函数传入CyclicBarrier barrierOnInterviewnew CyclicBarrier(10,new BarrierActionOnInterview());for (int i0;i10;i){//开启10个线程模拟10个求职者new Thread(new JobHunt(barrierOnWriteExam,barrierOnInterview)).start();}}
}class JobHunt implements Runnable {private CyclicBarrier cyclicBarrierOnWriteExam;private CyclicBarrier cyclicBarrierOnInterview;public JobHunt(CyclicBarrier cyclicBarrierOnWriteExam,CyclicBarrier cyclicBarrierOnInterview) {this.cyclicBarrierOnWriteExam cyclicBarrierOnWriteExam;this.cyclicBarrierOnInterview cyclicBarrierOnInterview;}Overridepublic void run() {//赶来公司路上doOnTheWay();//到公司后看人是否到齐如果没有到齐就阻塞// 到齐了就开始笔试try {System.out.println(Thread.currentThread().getName() 已经来公司了...);cyclicBarrierOnWriteExam.await();doWriteExam();System.out.println(Thread.currentThread().getName() 笔试做完了....);cyclicBarrierOnInterview.await();doInterview();System.out.println(Thread.currentThread().getName() 面试完啦.....);} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}/*** 模拟在路上方法*/public void doOnTheWay(){doCostTime(2000);}/*** 模拟笔试过程*/public void doWriteExam(){doCostTime(3000);}/*** 模拟面试过程*/public void doInterview(){doCostTime(5000);}private void doCostTime(int time){Random randomnew Random();try {//随机休眠时间int countrandom.nextInt(time);// System.out.println(count);Thread.sleep(count);} catch (InterruptedException e) {e.printStackTrace();}}}class BarrierActionOnWriteExam implements Runnable{Overridepublic void run() {//自定义等待完成后的回调函数System.out.println(大家人到齐了,开始笔试吧);}}class BarrierActionOnInterview implements Runnable{Overridepublic void run() {//自定义等待完成后的回调函数System.out.println(大家人到齐了,开始面试吧);}
}测试打印
通过打印结果可以看到首先是能正确实现效果。其次 是通过传入 回调事件参数给 CyclicBarrier可以很方便实现 自己的业务逻辑。
总结
虽然 CountDownLatch 和CyclicBarrier 都能实现多个线程一起等待然后一起做某些事情。 CountDownLatch 更多的是 一个主线程等待 分支线程完成。然后主线程去做其他事情。 CyclicBarrier 是 大家分别做某些事情等每个人都做完后大家再一起去做另外一件事情。 并且两者实现的 原理完全不同。 希望通过本文大家能对 CyclicBarrier 有个更加理性的认识。多敲敲小demo。看能否有优化的地方。这样才能更好的理解。 CountDownLatch 学习的地址: https://blog.csdn.net/echohuangshihuxue/article/details/129280219