网站内容建设平面设计,seo建站系统,重庆seo扣费,闻喜网站建设1.CountDownLatch
1.1 什么是CountDownLatch
CountDownLatch是一个同步工具类#xff0c;用来协调多个线程之间的同步#xff0c;或者说起到线程之间的通信#xff08;而不是用作互斥的作用#xff09;。
CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之…1.CountDownLatch
1.1 什么是CountDownLatch
CountDownLatch是一个同步工具类用来协调多个线程之间的同步或者说起到线程之间的通信而不是用作互斥的作用。
CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后计数器的值就会减一。当计数器的值为0时表示所有的线程都已经完成一些任务然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。
1.2 CountDownLatch与join
使用join同样可以达到线程同步的效果但是调用thread.join() 方法必须等thread 执行完毕当前线程才能继续往下执行而CountDownLatch通过计数器提供了更灵活的控制只要检测到计数器为0当前线程就可以往下执行而不用管相应的thread是否执行完毕。
而且如果我们使用线程池的话就没有办法直接调用线程的join方法了。所有另一方面来说CountDownLatch要比join更加灵活。
1.3 CountDownLatch的基本使用
public class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {CountDownLatch latch new CountDownLatch(5);for (int i0; i5; i) {new Thread(new Runnable() {Overridepublic void run() {System.out.println(Thread.currentThread().getName() 运行);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {latch.countDown();}}}).start();}System.out.println(等待子线程运行结束);latch.await();System.out.println(子线程运行结束);}
}这里主线程会阻塞在latch.await()直到CountDownLatch技术为0。
1.4 CountDownLatch原理剖析
CountDownLatch类图 从上面我们可以直到Sync继承了AQS类CountDownLatch又持有一个成员变量Sync所有我们可以直到CountDownLatch是基于AQS实现的。
通过构造方法我们又可以得知CountDownLatch计数器的值赋给了AQS的state变量。
public CountDownLatch(int count) {if (count 0) throw new IllegalArgumentException(count 0);this.sync new Sync(count);
}Sync(int count) {setState(count);
}await()
当线程调用await方法以后当前线程会被阻塞直到下面情况之一时才会返回
当所有的线程都调用了CountDownLatch的countDown方法后也就是计数器为0时其他线程调用了当前线程的interrupt()方法中断了当前线程当前线程会抛出异常然后返回
// CountDownLatch的await()实现
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}// AQS中获取共享资源可被中断的方法
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 判断当前线程是否已被中断如果是则抛出异常if (Thread.interrupted())throw new InterruptedException();// state不为0意味着CountDownLatch还没有减到0// 则执行AQS的doAcquireSharedInterruptibly方法让当前线程进入AQS队列if (tryAcquireShared(arg) 0)doAcquireSharedInterruptibly(arg);
}// CountDownLatch中的Sync中tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {// 当前state如果为0则返回1否则返回-1return (getState() 0) ? 1 : -1;
}由上述代码我们可以知道线程获取资源时可以被中断并且获取的是共享资源。
名为await的方法还有一个不过多个参数也就是指定时间后调用await(long timeout, TimeUnit unit)的线程会超时而返回false如果是正常返回的那么返回值就为true。
public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}countDown()
线程调用该方法以后计数器的值会递减递减后如果计数器的值为0那么就会唤醒所有因调用await方法而阻塞的线程否则什么都不做。
// CountDownLatch中的countDown方法
public void countDown() {sync.releaseShared(1);
}// AQS中的方法
public final boolean releaseShared(int arg) {// 调用Sync实现的tryReleaseShared方法if (tryReleaseShared(arg)) {// AQS中释放资源的方法doReleaseShared();return true;}return false;
}// Sync中重写的tryReleaseShared方法
protected boolean tryReleaseShared(int releases) {// 循环进行CAS操作将state做减一操作失败则一直重试for (;;) {// 获得当前的state变量int c getState();// 如果state已经等于0那么直接返回falseif (c 0)return false;// 将state-1int nextc c-1;// CAS操作修改state成功以后判断state是否已经为0为0则返回true// 再接下来就会调用AQS中的doReleaseShared方法释放资源if (compareAndSetState(c, nextc))return nextc 0;}
}2.CyclicBarrier
2.1 什么是CyclicBarrier
从字面理解CyclicBarrier就是回环屏障的意思它可以让一组线程达到一个状态后再同时执行。
回环的意思是在所有线程执行完毕以后会重置CyclicBarrier的状态使它可以被重用屏障的意思是线程调用await方法后都会被阻塞这个阻塞点就被称为屏障等到所有屏障都调用了await方法后线程们就会冲破屏障继续向下运行
2.2 CyclicBarrier的基本使用
public class CycleBarrierTest {private static CyclicBarrier cyclicBarrier new CyclicBarrier(2, new Runnable() {// 当计数器为0时立即执行Overridepublic void run() {System.out.println(汇总线程 Thread.currentThread().getName() 任务合并。);}});public static void main(String[] args) {ExecutorService executorService Executors.newFixedThreadPool(2);// 将线程A添加到线程池executorService.submit(new Runnable() {Overridepublic void run() {try {System.out.println(线程A Thread.currentThread().getName() 执行任务。);System.out.println(线程A到达屏障点);cyclicBarrier.await();System.out.println(线程A退出屏障点);} catch (Exception e) {e.printStackTrace();}}});// 将线程B添加到线程池executorService.submit(new Runnable() {Overridepublic void run() {try {System.out.println(线程B Thread.currentThread().getName() 执行任务。);System.out.println(线程B到达屏障点);cyclicBarrier.await();System.out.println(线程B退出屏障点);} catch (Exception e) {e.printStackTrace();}}});// 调用线程池的shutdown方法关闭线程池// 该方法会使线程池从RUNNING状态转变为SHUTDOWN状态// SHUTDOWN状态意味着不再接收新的任务但是会对任务队列中的任务进行处理executorService.shutdown();}
}执行结果为
线程Apool-1-thread-1执行任务。
线程A到达屏障点
线程Bpool-1-thread-2执行任务。
线程B到达屏障点
汇总线程pool-1-thread-2 任务合并。
线程B退出屏障点
线程A退出屏障点上面的例子说明了多个线程之间是相互等待的假如计数器值为N那么随后调用 await 方法的 N–1 个线程都会因为到达屏障点而被阻塞当第 N 个线程调用 await 后计 数器值为 0 了这时候第 N 个线程才会发出通知唤醒前面的 N–1 个线程。也就是当全部 线程都到达屏障点时才能一块继续向下执行。不过这个例子并没有体现出可重用性不过这个其实也很好理解就是可以反复使用感兴趣的同学可以自己去了解一下。
2.3 CyclicBarrier原理剖析
CyclicBarrier类图 由类图可知CyclicBarrier基于独占锁实现本质还是基于AQS实现的。 Generation内部有一个变量broken用来记录当前屏障是否被打破因为内部使用重入锁保证了线程安全所以该属性不需要使用volatile修饰 parties用来记录线程个数意味着parties个线程调用await方法以后才会“冲破屏障” count一开始等于parties每当有线程调用await方法就会减一当count为零就意味着所有线程到达了屏障点
使用两个变量的原因就是为了达成CyclicBarrier的复用性当count计数为0以后会将parties重新赋值给count从而进行复用。
public CyclicBarrier(int parties, Runnable barrierAction) {if (parties 0) throw new IllegalArgumentException();this.parties parties;this.count parties;this.barrierCommand barrierAction;
}同时通过构造函数我们也可以直到我们可以传递一个任务而这个任务的执行时机是当所有的线程都到达屏障点以后。
await()
当线程调用await方法被阻塞直到满足以下条件之一时就会返回
parties个线程调用了await方法也就是所有线程都到达了屏障点其他线程调用了该线程的interrupt方法中断了该线程与当前屏障点关联的Generation对象的broken标志被设置为true时
public int await() throws InterruptedException, BrokenBarrierException {try {// 调用dowait方法阻塞当前线程第一个参数为false表示第二个参数不生效return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}
}public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {// 指定timeout后会自动返回return dowait(true, unit.toNanos(timeout));
}dowait()
该方法实现了CyclicBarrier的核心功能。
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {// 获得锁并进行加锁final ReentrantLock lock this.lock;lock.lock();try {final Generation g generation;if (g.broken)throw new BrokenBarrierException();if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}// 如果index0就说明所有的线程都到达了屏障点此时开始执行初始化传递的任务barrierActionint index --count;if (index 0) { boolean ranAction false;try {final Runnable command barrierCommand;// 执行任务if (command ! null)command.run();ranAction true;// 激活其他因调用await阻塞的线程并且重置了CyclicBarriernextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}// 如果index!0for (;;) {try {// 没有设置超时时间的操作if (!timed)trip.await();// 设置了超时时间的操作else if (nanos 0L)nanos trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g generation ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g ! generation)return index;if (timed nanos 0L) {breakBarrier();throw new TimeoutException();}}} finally {// 释放锁lock.unlock();}
}// 看到这里你应该就明白了为什么CyclicBarrier可以被复用
private void nextGeneration() {// 唤醒条件队列中的阻塞线程trip.signalAll();// 重置CyclicBarriercount parties;generation new Generation();
}3.CountDownLatch与CyclicBarrier
CountDownLatch计数器不能重置CyclicBarrier可以重置循环利用。CountDownLatch是基于AQS的共享模式实现的CyclicBarrier是基于ReentrantLock和Condition实现的。两者最大的区别是进行下一步动作的动作实施者是不一样的。这里的“动作实施者”有两种一种是主线程即执行main函数另一种是执行任务的其他线程后面叫这种线程为“其他线程”区分于主线程。对于CountDownLatch当计数为0的时候下一步的动作实施者是main函数对于CyclicBarrier下一步动作实施者是“其他线程”。