当前位置: 首页 > news >正文

做网站建设的wordpress 浮动导航插件

做网站建设的,wordpress 浮动导航插件,网站数据库怎么建,网络营销是什么等综合因素促成这里续写上一章博客 Phaser新特性 #xff1a; 特性1#xff1a;动态调整线程个数 CyclicBarrier 所要同步的线程个数是在构造方法中指定的#xff0c;之后不能更改#xff0c;而 Phaser 可以在运行期间动态地 调整要同步的线程个数#xff0c;Phaser 提供了下面这些方…这里续写上一章博客 Phaser新特性 特性1动态调整线程个数 CyclicBarrier 所要同步的线程个数是在构造方法中指定的之后不能更改而 Phaser 可以在运行期间动态地 调整要同步的线程个数Phaser 提供了下面这些方法来增加、减少所要同步的线程个数 register() // 注册一个 bulkRegister(int parties) // 注册多个 arriveAndDeregister() // 解除注册//这里了解即可就不多说了public int register() {return doRegister(1);}public int bulkRegister(int parties) {if (parties 0)throw new IllegalArgumentException();if (parties 0)return getPhase();return doRegister(parties);}public int arriveAndDeregister() {return doArrive(ONE_DEREGISTER);}private static final int ONE_ARRIVAL 1;private static final int ONE_PARTY 1 PARTIES_SHIFT;private static final int ONE_DEREGISTER ONE_ARRIVAL|ONE_PARTY;private static final int EMPTY 1;private static final int PARTIES_SHIFT 16;特性2层次Phaser 多个Phaser可以组成如下图所示的树状结构可以通过在构造方法中传入父Phaser来实现 public Phaser(Phaser parent, int parties) {// ...}先简单看一下Phaser内部关于树状结构的存储如下所示 private final Phaser parent;可以发现在Phaser的内部结构中每个Phaser记录了自己的父节点但并没有记录自己的子节点列表所 以每个 Phaser 知道自己的父节点是谁但父节点并不知道自己有多少个子节点对父节点的操作是通过子节 点来实现的 树状的Phaser怎么使用呢考虑如下代码会组成下图的树状Phaser Phaser root new Phaser(2); Phaser c1 new Phaser(root, 3); Phaser c2 new Phaser(root, 2); Phaser c3 new Phaser(c1, 0);public Phaser() {this(null, 0);}//一般这个使用的最多 public Phaser(int parties) {this(null, parties);}public Phaser(Phaser parent) {this(parent, 0);}//上面最终都会操作这个这里了解即可 public Phaser(Phaser parent, int parties) {if (parties PARTIES_SHIFT ! 0)throw new IllegalArgumentException(Illegal number of parties);int phase 0;this.parent parent;if (parent ! null) {final Phaser root parent.root;this.root root;this.evenQ root.evenQ;this.oddQ root.oddQ;if (parties ! 0)phase parent.doRegister(1);}else {this.root this;this.evenQ new AtomicReferenceQNode();this.oddQ new AtomicReferenceQNode();}this.state (parties 0) ? (long)EMPTY :((long)phase PHASE_SHIFT) |((long)parties PARTIES_SHIFT) |((long)parties);}本来root有两个参与者参与者对应的参数值即线程上限然后为其加入了两个子Phaserc1c2每个子Phaser会算作1个参与者root的 参与者就变成224个c1本来有3个参与者为其加入了一个子Phaser c3参与者数量变成314个c3的参与 者初始为0后续可以通过调用register()方法加入 对于树状Phaser上的每个节点来说可以当作一个独立的Phaser来看待其运作机制和一个单独的Phaser是 一样的父Phaser并不用感知子Phaser的存在当子Phaser中注册的参与者数量大于0时会把自己向父节点注册当 子Phaser中注册的参与者数量等于0时会自动向父节点解除注册 简单来说就是父Phaser把子Phaser当作一个正常参与的线程 就即可 state变量解析 大致了解了Phaser的用法和新特性之后下面仔细剖析其实现原理Phaser没有基于AQS来实现但具备AQS的核⼼特性state变量、CAS操作、阻塞队列所以AQS是利用CAS的一般来说大多数的操作都是使用AQS但是我们说成CAS也行因为CAS是主要核心我们先从state变量说起 private volatile long state;这个64位long的的state变量被拆成4部分下图为state变量各部分 最高位一般代表最后面的一位比如100那么1就是最高位若是0则表示未同步完成若是1则表示同步完成初始最高位为0 Phaser提供了一系列的成员方法来从state中获取上图中的⼏个数字如下所示 //获取当前的轮数当前轮数同步完成返回值是一个负数最高位为1 public final int getPhase() {return (int)(root.state PHASE_SHIFT);//当前phase未完成返回值也是一个负数最高位为1}private final Phaser root; private volatile long state; private static final int PHASE_SHIFT 32; private static final int PARTIES_SHIFT 16; //在前面的构造方法中有个这个 /* 前面有int phase 0;this.state (parties 0) ? (long)EMPTY :((long)phase PHASE_SHIFT) |((long)parties PARTIES_SHIFT) |((long)parties);当对应的传递的参数是5时即parties是5时那么操作后面的即最终结果是032|516|5结果是327685那么this.state就是this.state而对应的this.root this;前提是没有父phaser否则就是final Phaser root parent.root;this.root root; 为父的root即操作父的state对于032|516|5来说优先级大于|具体的和|的作用可以参照第8章博客的说明 其中032是0516是327680因为是1010000000000000000327680然后最终结果是|5所以会加上5即327685 *///经过上面的说明我们假设是第一个phaser且参数是5那么 /* public final int getPhase() {return (int)(root.state PHASE_SHIFT);}其中就是32768532即101000000000000010132对应int来说那么结果是不变的无论是正数还是负数因为32的移动就是不变的这是固定的因为超出其存储范围的位移是没有意义的所以在移位中32这里是32因为通常是int类型相当于0那么正数或者负数移动0位自然是不变的同理那么33相当于1一般来说移动负数值不包括0而不是正数值通常也包括0之所以要写包括0因为比0大的数叫正数0本身不算正数默认返回值是0即那么移动负数值默认返回值就是0因为一般是不能移动负数的即最小是0但是我们观察可以发现他是long类型而在long中64位就是不变的32则就是移动所以对应的返回值就是0 *///综上所述返回的结果就是0public boolean isTerminated() {return root.state 0L; //当前轮数同步完成最高位为1也就是负数} public int getRegisteredParties() {return partiesOf(state); //获取总注册线程数}private static final int PARTIES_SHIFT 16; private static int partiesOf(long s) {return (int)s PARTIES_SHIFT; //先把state转成32位int不是long了再右移16位不是将值变成int哦因为不是之前的(int)(root.state PHASE_SHIFT);的}public int getUnarrivedParties() {return unarrivedOf(reconcileState()); //获取未到达的线程数}private static final int EMPTY 1; private static final int UNARRIVED_MASK 0xffff; private static int unarrivedOf(long s) {int counts (int)s;//截取低16位return (counts EMPTY) ? 0 : (counts UNARRIVED_MASK);}private long reconcileState() {final Phaser root this.root;long s state;if (root ! this) {int phase, p;// CAS to root phase with current parties, tripping unarrivedwhile ((phase (int)(root.state PHASE_SHIFT)) !(int)(s PHASE_SHIFT) !STATE.weakCompareAndSet(this, s,s (((long)phase PHASE_SHIFT) |((phase 0) ? (s COUNTS_MASK) :(((p (int)s PARTIES_SHIFT) 0) ? EMPTY :((s PARTIES_MASK) | p))))))s state;}return s;}//这些方法了解即可这里就不多说了没有注释的基本只需要了解我们再次的看一下state变量在构造方法中是如何被赋值的 public Phaser(Phaser parent, int parties) {if (parties PARTIES_SHIFT ! 0) //先操作后操作!// 如果parties数超出了最大个数即2的16次方当然也包括2的16次方抛异常throw new IllegalArgumentException(Illegal number of parties);// 初始化轮数为0int phase 0;this.parent parent;if (parent ! null) {final Phaser root parent.root;// 父节点的根节点就是自己的根节点this.root root;// 父节点的evenQ就是自己的evenQthis.evenQ root.evenQ;// 父节点的oddQ就是自己的oddQthis.oddQ root.oddQ;// 如果参与者不是0则向父节点注册自己if (parties ! 0)phase parent.doRegister(1);}else {// 如果父节点为null则自己就是root节点this.root this;// 创建奇数节点this.evenQ new AtomicReferenceQNode();// 创建偶数节点this.oddQ new AtomicReferenceQNode();}// 位或操作赋值state最高位为0表示同步未完成this.state (parties 0) ? (long)EMPTY :((long)phase PHASE_SHIFT) |((long)parties PARTIES_SHIFT) |((long)parties);}private static final int PARTIES_SHIFT 16;private static final int PHASE_SHIFT 32;private static final int EMPTY 1;当parties0时state被赋予一个EMPTY常量常量为1 当parties ! 0时并且没有超过2的16次方那么把phase值左移32位把parties左移16位然后parties也作为最低的16位3个值做或操 作赋值给state前面以及说明了流程了 阻塞与唤醒Treiber Stack 基于上述的state变量对其执行CAS操作并进行相应的阻塞与唤醒如下图所示右边的主线程会调用awaitAdvance()进行阻塞左边的arrive()会对state进行CAS的累减操作也的确与之前的CountDownLatch类似当未到达的线程数减到0时唤醒右边阻 塞的主线程 在这里阻塞使用的是一个称为Treiber Stack的数据结构而不是AQS的双向链表Treiber Stack是一个无锁 的栈它是一个单向链表出栈、入栈都在链表头部所以只需要一个head指针而不需要tail指针如下的实 现 //两个引用表示链表的头部通常可以有效的减少或者稍微避免线程冲突线程冲突是指当两个运行在不同线程的操作作用在同一个数据上就会认为是发生了线程冲突都是链表的头指针 private final AtomicReferenceQNode evenQ; private final AtomicReferenceQNode oddQ;static final class QNode implements ForkJoinPool.ManagedBlocker {final Phaser phaser;final int phase;final boolean interruptible;final boolean timed;boolean wasInterrupted;long nanos;final long deadline;//每个Node节点对应一个线程volatile Thread thread; // nulled to cancel waitQNode next; //下一个节点的引用//.. }为了减少并发冲突这里定义了2个链表也就是2个Treiber Stack当phase为奇数轮的时候阻塞线程放在oddQ里面当phase为偶数轮的时候阻塞线程放在evenQ里面代码如下所示后面会再次的说明或者给出他的 private void releaseWaiters(int phase) {QNode q; // first element of queueThread t; // its thread//这个是注意的判断只要尾部是1那么就返回1否则返回0这是的作用具体可以看第8章博客所以如果是奇数那么就是1那么结果就是oddQ反之若不是奇数那么结果就是evenQ所以说当phase为奇数轮的时候阻塞线程放在oddQ里面当phase为偶数轮的时候阻塞线程放在evenQ里面AtomicReferenceQNode head (phase 1) 0 ? evenQ : oddQ;while ((q head.get()) ! null q.phase ! (int)(root.state PHASE_SHIFT)) {if (head.compareAndSet(q, q.next) (t q.thread) ! null) {q.thread null;LockSupport.unpark(t);}}}arrive()方法分析 下面看arrive()方法是如何对state变量进行操作又是如何唤醒线程的 public int arrive() {return doArrive(ONE_ARRIVAL);//private static final int ONE_ARRIVAL 1; }//解除注册的不是解除阻塞 public int arriveAndDeregister() {return doArrive(ONE_DEREGISTER);/*private static final int ONE_DEREGISTER ONE_ARRIVAL|ONE_PARTY; //1|6553665537|最后操作因为优先级低具体优先级可以看这个博客https://blog.csdn.net/weixin_58569983/article/details/125900188同一个优先级一般就看位置了一般左边先操作private static final int ONE_ARRIVAL 1;private static final int ONE_PARTY 1 PARTIES_SHIFT; //11665536private static final int PARTIES_SHIFT 16;*/}arrive()和 arriveAndDeregister()内部调用的都是 doArrive方法 区别在于前者只是把未达到线程数减1对应的值默认就是1且基本不能改变至于其final是否可以修改可以看这个博客https://blog.csdn.net/afdafvdaa/article/details/115190755后者则把未到达线程数和下一轮的总线程数都减1我们主要看第一种下面看一下对应的doArrive方法的实现 private int doArrive(int adjust) {final Phaser root this.root;for (;;) {long s (root this) ? state : reconcileState();// private static final int PHASE_SHIFT 32;//若s是327685那么phase0若是int类型自然不变但是这里是long类型那么就是0int phase (int)(s PHASE_SHIFT);if (phase 0)return phase;int counts (int)s;// 获取未到达线程数int unarrived (counts EMPTY) ? 0 : (counts UNARRIVED_MASK); //0xffff是十进制的65535 private static final int UNARRIVED_MASK 0xffff; 即1111 1111 1111 1111一般counts是state那么结果就是counts了而若根据前面的说明若参数是5那么通常是327685655355s是327685即对应的就是其参数的数量也就是线程数量所以他说获取未到达线程数也是合理的// 如果未到达线程数小于等于0抛异常if (unarrived 0)throw new IllegalStateException(badArrive(s));// CAS操作将state的值减去adjust对应的减值操作可以认为是确定前面的未到达线程数和下一轮的总线程数的交替不是真正的减去线程数哦所以后面会继续操作减去这里就不多说了//private static final VarHandle STATE;if (STATE.compareAndSet(this, s, s-adjust)) { //设置这个设置会使得state发生改变使得减1那么s也会改变使得对应的unarrived也会减1当剩下0时他会返回true单纯的减少通常也设置成功不会变成true了因为本身类的原因具体在前面说明过了比如前面说过的因为对应注解可能使用本身类存在的某些方法或者变量即东西但大多数都是上面的说法这个地方使得进入一般来说他只能让一个线程进入然后里面的操作会使得他释放虽然这里不说明// 如果未到达线程数为1if (unarrived 1) {//private static final long PARTIES_MASK 0xffff0000L;16进制的表示方式这里需要加上l否则保存保存因为变成long需要l他本身赋值时默认是认为变成int因为单纯的赋值整数数值注意是整数值基本是这样可能有问题但基本没有默认都认为是int无论是什么形式也要注意不要超过int范围了否则会报错的除非你加上了lL那么在不超过Llong的情况下也不会报错否则就需要某些类如BigDecimal来进行计算报错了但一般不能直接的赋值16进制的除非有其他的某些类这里可以百度查看代表后面的0都是0000一般来说16进制的对应的代表如下/* A代表十进制的10二进制的1010 B代表十进制的11二进制的1011 C代表十进制的12二进制的1100 D代表十进制的13二进制的1101 E代表十进制的14二进制的1110 F代表十进制的15二进制的1111*///假设对应是5那么这里就是3276854294901760/*也就是101 00000000 0000010111111111 11111111 00000000 00000000其中 int最大数是2147483647但是总数量是429496729642949017601111111 11111111 11111111 11111111上面两个没有int只是说明即提一下而已通过使得变成1010000000000000000也就是327680了所以n是327680*/long n s PARTIES_MASK; // base of next state//private static final int PARTIES_SHIFT 16;//进行移动那么就是1010000000000000000变成1015int nextUnarrived (int)n PARTIES_SHIFT;if (root this) {if (onAdvance(phase, nextUnarrived))n | TERMINATION_BIT;else if (nextUnarrived 0)n | EMPTY;elsen | nextUnarrived; //n又变成了327685//private static final int MAX_PHASE Integer.MAX_VALUE;//Native public static final int MAX_VALUE 0x7fffffff; 结果也就是2147483647若不用数字f直接的7的话说明就是7的数对应的位置以为最大只能是9前面的A就是10所以0~f总数量就是10616一般我们包括0若是显示具体数字的话一般不包括0的即15个对于显示具体数字来说而显示具体数字就是我们低版本的平常的直觉高版本我们会认为进一位也算比如12345678910这个10我们也算否则按照进制的话是需要包括0的int nextPhase (phase 1) MAX_PHASE; //结果是11111111 11111111 11111111 11111111//也就是/*11111111 11111111 11111111 11111111结果是1所以 nextPhase1*/n | (long)nextPhase PHASE_SHIFT; //private static final int PHASE_SHIFT 32;//132结果是1因为他是int类型所以是1那么设置为0了所以说如果只有一个线程没有到达那么他自然设置为0然后解除阻塞STATE.compareAndSet(this, s, n);//解除阻塞也就是唤醒releaseWaiters(phase); //这里是主要的后面会说明}// 如果下一轮的未到达线程数为0else if (nextUnarrived 0) { // propagate deregistrationphase parent.doArrive(ONE_DEREGISTER);STATE.compareAndSet(this, s, s | EMPTY);}else// 否则调用父节点的doArrive方法传递参数1表示当前节点已完成一般代表真正的减去phase parent.doArrive(ONE_ARRIVAL);}return phase;}}}关于上面的方法有以下⼏点说明 1定义了2个常量如下 当 deregisterfalse 时代表不解除注册只是最低的16位需要减 1s-adjust因为adjONE_ARRIVAL当deregistertrue时代表解除注册32位中 的2个低16位都需要减1即前面的16位减1后面的16位减1因为adjONE_ARRIVAL|ONE_PARTY那么是什么意思呢在前面的s-adjust中可以这样的理解因为实际上如果是0000 0000 0000 0001那么这个1就是低16位的1而adjONE_ARRIVAL|ONE_PARTY代表两个这个因为他先操作了左移动16然后操作ONE_ARRIVAL|ONE_PARTY所以是低32位的两个低16位都会减1这自然是对二进制来说的比如1110减去2即减去0010那么结果就是1100即其中一个1减了即减1了只是上面的1是最低的 private static final int ONE_ARRIVAL 1;private static final int ONE_PARTY 1 PARTIES_SHIFT;private static final int PARTIES_SHIFT 16;2若以结果看若把未到达线程数减1因为有些操作会使得state改变那么最终就会使得未到达线程数发生改变减了之后如果还未到0说明其他的情况基本什么都不做因为至少到1才会减到0那么其他的情况基本直接返回那么他对应的操作结束执行后面的代码就如前面说过的latch.countDown();后面还可以操作latch.countDown();就是可以执行因为他只是操作减而已那么只要他进行了减说明操作完毕并且由于先后顺序自然使得先操作的基本不会抢到如果到0最终会做2件事情第1 重置state把state的未到达线程个数重置到总的注册的线程数中同时phase加1第2唤醒队列中的 线程 首先我们看一下唤醒方法 private void releaseWaiters(int phase) {QNode q; // first element of queueThread t; // its thread//根据phase是奇数还是偶数使用evenQ或oddQAtomicReferenceQNode head (phase 1) 0 ? evenQ : oddQ;while ((q head.get()) ! null q.phase ! (int)(root.state PHASE_SHIFT)) { //遍历栈if (head.compareAndSet(q, q.next) (t q.thread) ! null) {q.thread null;LockSupport.unpark(t);}}} 遍历整个栈只要栈当中节点的phase不等于当前Phaser的phase说明该节点不是当前轮的而是前一轮 的应该被释放并唤醒 awaitAdvance()方法分析 public int awaitAdvance(int phase) {final Phaser root this.root;//当只有一个Phaser没有树状结构时root就是thislong s (root this) ? state : reconcileState();int p (int)(s PHASE_SHIFT);if (phase 0) //phase已经结束无需阻塞直接返回return phase;if (p phase)//阻塞在phase这一轮上return root.internalAwaitAdvance(phase, null);return p;}下面的while循环中有4个分⽀ 初始的时候nodenull进入第1个分⽀进行自旋自旋次数满⾜之后会新建一个QNode节点 之后执行第3、第4个分⽀分别把该节点入栈并阻塞 private int internalAwaitAdvance(int phase, QNode node) {// assert root this;releaseWaiters(phase-1); // ensure old queue cleanboolean queued false; // true when node is enqueuedint lastUnarrived 0; // to increase spins upon changeint spins SPINS_PER_ARRIVAL;long s;int p;while ((p (int)((s state) PHASE_SHIFT)) phase) {// 不可中断模式的自旋if (node null) { // spinning in noninterruptible modeint unarrived (int)s UNARRIVED_MASK;if (unarrived ! lastUnarrived (lastUnarrived unarrived) NCPU)spins SPINS_PER_ARRIVAL;boolean interrupted Thread.interrupted();// 自旋结束建一个节点之后进入阻塞if (interrupted || --spins 0) { // need node to record intrnode new QNode(this, phase, false, false, 0L);node.wasInterrupted interrupted;}elseThread.onSpinWait();}// 从阻塞唤醒退出while循环else if (node.isReleasable()) // done or abortedbreak;else if (!queued) { // push onto queueAtomicReferenceQNode head (phase 1) 0 ? evenQ : oddQ;QNode q node.next head.get();if ((q null || q.phase phase) (int)(state PHASE_SHIFT) phase) // avoid stale enq// 节点入栈queued head.compareAndSet(q, node);}else {try {// 调用node.block()阻塞ForkJoinPool.managedBlock(node);} catch (InterruptedException cantHappen) {node.wasInterrupted true;}}}if (node ! null) {if (node.thread ! null)node.thread null; // avoid need for unpark()if (node.wasInterrupted !node.interruptible)Thread.currentThread().interrupt();if (p phase (p (int)(state PHASE_SHIFT)) phase)return abortWait(phase); // possibly clean up on abort}releaseWaiters(phase);return p;}这里调用了ForkJoinPool.managedBlock(ManagedBlocker blocker)方法⽬的是把node对应的线程阻塞ManagerdBlocker是ForkJoinPool里面的一个静态接口定义如下 public class ForkJoinPool extends AbstractExecutorService {//.. public static interface ManagedBlocker {boolean block() throws InterruptedException;boolean isReleasable(); }//.. }//而对应的node是QNode类型他是如下所以可以赋值因为子类可以指向父类 static final class QNode implements ForkJoinPool.ManagedBlocker {QNode实现了该接口实现原理还是park()如下所示之所以没有直接使用park()/unpark()来实现阻塞、唤醒而是封装了ManagedBlocker这一层主要是出于使用上的方便考虑一方面是park()可能被中断唤醒另一 方面是带超时时间的park()把这二者都封装在一起 static final class QNode implements ForkJoinPool.ManagedBlocker {final Phaser phaser;final int phase;final boolean interruptible;final boolean timed;boolean wasInterrupted;long nanos;final long deadline;volatile Thread thread; // nulled to cancel waitQNode next;QNode(Phaser phaser, int phase, boolean interruptible, boolean timed, long nanos) {this.phaser phaser;this.phase phase;this.interruptible interruptible;this.nanos nanos;this.timed timed;this.deadline timed ? System.nanoTime() nanos : 0L;thread Thread.currentThread();}public boolean isReleasable() {if (thread null)return true;if (phaser.getPhase() ! phase) {thread null;return true;}if (Thread.interrupted())wasInterrupted true;if (wasInterrupted interruptible) {thread null;return true;}if (timed (nanos 0L || (nanos deadline - System.nanoTime()) 0L)) {thread null;return true;}return false;}public boolean block() {while (!isReleasable()) {if (timed)LockSupport.parkNanos(this, nanos);elseLockSupport.park(this);}return true;} } 理解了arrive()和awaitAdvance()arriveAndAwaitAdvance()就是二者的一个组合版本这里就不说明了 Atomic类Atomic英文意思原子的所以接下来基本都是对某些数据的原子操作比如后面学习的AtomicInteger类一般代表操作int类型的原子操作其也正好也是有private volatile int value;value他刚好是int类型所以即也基本是这样说明的 AtomicInteger和AtomicLong 如下面代码所示对于一个整数的加减操作要保证线程安全需要加锁也就是加synchronized关键字 public class MyClass {private int count 0;public void synchronized increment() {count;}public void synchronized decrement() {count--;} } 但有了Concurrent包的Atomic相关的类之后synchronized关键字可以用AtomicInteger代替其性能更 好对应的代码变为 public class MyClass {private AtomicInteger count new AtomicInteger(0); //定义初始的数为0public void add() {count.getAndIncrement();}public long minus() {return count.getAndDecrement();} }/* private volatile int value;public AtomicInteger(int initialValue) {value initialValue;}public final int get() {return value;} *///一个简单的操作 package main5;import java.util.concurrent.atomic.AtomicInteger;/****/ public class MyClass extends Thread {private AtomicInteger count new AtomicInteger(0);int i 0;public void add() {i;count.getAndIncrement();}public long minus() {i--;return count.getAndDecrement();}public static class add extends Thread {MyClass m;public add(MyClass target) {m target;}Overridepublic void run() {m.add();}}public static class add1 extends Thread {MyClass m;public add1(MyClass target) {m target;}Overridepublic void run() {m.minus();}}public static void main(String[] args) throws InterruptedException {MyClass m new MyClass();new add(m).start();new add1(m).start();new add(m).start();new add1(m).start();new add(m).start();new add1(m).start();new add(m).start();new add1(m).start();new add(m).start();new add1(m).start();Thread.sleep(5000); //这里可以选择删除阻塞来多次进行测试那么一般就可以看到他们的区别了//当删除阻塞多次操作执行时会出现如下说明System.out.println(m.i); //会发生变化因为他会在中途获取或者说直接先获取了如果在前面加上了Thread.sleep(5000);那么通常始终为0但是可能会出现重排序使得操作相同的值虽然这里基本不会出现因为几率是很小的基本直接的运行非常难测试出来那么为什么也是始终为0呢这是因为对应的或者其类似的是与读操作一起的即与前面说明的而后面正好是直接的进行操作一样由于间隙的存在导致也是没有得到相同的值间隙在前面说明过了即这个地方float value myFloat;对应的解释里面System.out.println(m.count.get()); //始终为0这是因为对应的volatile存在使得前面进行修改时我不能直接的获取所以CAS只是保证原子性只能一个人进行操作但不能保证可见性重排序所以需要volatile保证解决重排序问题即一般的CAS的完整操作都需要volatile否则这里也基本不会始终为0那么有没有可能CAS前一次操作和后一次操作的缝隙中操作获取呢答不会CAS操作的始终认为是写操作底层设置的因为写的操作也是代码完成我们自然也能进行改变或者设置某些东西虽然底层一般都是native通常是C/C代码其他语言也有可能但通常是C/C的代码的也就是说如果CAS不结束使得操作完毕而不会进行阻塞或者自旋那么才能进行获取所以这里是始终为0的}} //所以对应也的确相当于加上了锁这里也能得出对应的CAS实际上也是分先后操作的即他们谁先操作CAS所以这里也要进行注意具体为什么这样可以实现前面的方式且不加锁我们看如下其对应的源码如下 public final int getAndIncrement() {return U.getAndAddInt(this, VALUE, 1);}public final int getAndDecrement() {return U.getAndAddInt(this, VALUE, -1);}//唯一的区别就是1和-1很明显我们可以认为是加1和减1那么他操作的共同数是谁呢很明显在上面可以看到是private volatile int value;所以操作的共同数是value上面中的U是Unsafe的对象 private static final jdk.internal.misc.Unsafe U jdk.internal.misc.Unsafe.getUnsafe(); //是Unsafe类的 //注意一般来说jdk.internal.misc.Unsafe并不是所有的类能够导入或者直接使用他因为他只允许特点的类使用所以当你创建项目使用他时基本上不能使用可以选择那个地方然后alt回车或者等待他自己的提示通常点击第一个进行解决即可即加上允许使用他的权限但是可能还是会运行报错因为他idea自带的解决方式可能在不同的jdk中是不行的一般jdk8开始及其以后的版本就不行了或者需要本身类的某些其他的要求因为CAS包括VarHandle即不只是Unsafe对应的注解通常是需要本身类的某些要求的具体可以百度查看后面会给出具体地址的 /* //这里是Unsafe类里面的因为jdk.internal.misc.Unsafepublic static Unsafe getUnsafe() {return theUnsafe;}//这里是Unsafe类里面的即提供的自带的对象不能改变private static final Unsafe theUnsafe new Unsafe(); */private static final long VALUE U.objectFieldOffset(AtomicInteger.class, value);private volatile int value; //基本上不会操作这个在后面会说明的因为上面的VALUE利用这个而已所以实际上定义的初始值最终操作初始化了VALUE而不是value但是我们会利用VALUE来返回value的值的所以最终还是操作value的值的设置也是如此AtomicInteger的 getAndIncrement() 方法和 getAndDecrement() 方法都调用了一个方法即U.getAndAddInt(…) 方法该方法基于CAS实现 //这里是Unsafe类里面的那么对应的方法一般也是Unsafe类里面的 HotSpotIntrinsicCandidatepublic final int getAndAddInt(Object o, long offset, int delta) {int v;do {v getIntVolatile(o, offset); //一般代表将对应的值读取出来是VALUE因为参数就是这个提取出对应的value值/*HotSpotIntrinsicCandidatepublic native int getIntVolatile(Object o, long offset);*/} while (!weakCompareAndSetInt(o, offset, v, v delta)); //这里进行delta那么-1和1就相当于减和加了而继续传递offset是为了进行比较防止发生改变然后将对应的value进行设置值因为我们能提取value的值自然也能设置其值具体看本身类本身类通常代表当前类前面也说明过了以及注解的作用当然你只要知道他能实现即可因为我们主要学习的是思想return v;} //实际上乐观锁并非没有利用锁的概念只是其底层的操作我们一般不称为锁而已就如cpu抢占一样而而实际上使用CAS效率高是因为使用的锁少所以效率高的 //最后一般来说阻塞都是自旋后面会说明的通常有两个策略一般我们都是第二种因为我们一般是多核后面会说明为什么的do-while循环直到判断条件返回true为⽌因为有个!该操作就是之前有时候提到的自旋一般称为自旋当然他里面或者条件通常会有阻塞时间的比如类似于sleep的操作即并不会始终的运行使得浪费资源 getAndAddInt 方法具有volatile的语义因为注解的存在也就是对所有线程都是同时可⻅的对应的操作可见 而 weakCompareAndSetInt 方法的实现 HotSpotIntrinsicCandidatepublic final boolean weakCompareAndSetInt(Object o, long offset,int expected,int x) {return compareAndSetInt(o, offset, expected, x);}调用了 compareAndSetInt 方法该方法的实现 public final class Unsafe {//.. HotSpotIntrinsicCandidatepublic final native boolean compareAndSetInt(Object o, long offset,int expected,int x);//.. }/* 第一个参数表示要修改哪个对象的属性值 第二个参数是该对象属性一般代表其指定的value属性在内存的偏移量 第三个参数表示期望值也就是说如果读取出来的值没有被其他线程操作过那么说明可以放心的设置值也就是返回true这是Unsafe类的作用也就是说如果设置成功返回true否则返回false这里并不是类似于就是之前说明的实际的变量设置说明实际上会是state因为他们是VarHandle类的操作而不是Unsafe类的操作即虽然都是进行比较设置但不一样基本上相当于先找到原来的值进行比较然后设置这里如果一样就进行设置然后返回true否则返回false对应的VarHandle在前面说明过了这里就不多说了虽然对应的是三个参数且只是操作this但是他是保存赋值的所以可以找到一般是注解的作用而这里利用偏移量来找到也就是说明的保存的因为之前的是VarHandle而这里是Unsafe他们是不同的 第四个参数表示要设置为的⽬标值//注意他的注解通常可能并不会像VarHandle一样利用本身类的一些东西或者也利用了只是基本都相同而已我们偏向于后者所以他的作用基本是统一的而不会发生改变比如之前的if (STATE.compareAndSet(this, s, s-adjust)) {是VarHandle的操作就使得单纯的减少通常也设置成功不是true了而是false了但是可能并不决定即可能也会影响吧如果影响了大概会说明如果没有说明可以自己去百度虽然可能找不到 *///之前的是 package java.lang.invoke;public abstract class VarHandle {// ...// CAS原子操作其他语言的操作native一般主要是用来在Java程序中调用c/c的代码也有可能是其他语言的代码public final nativeMethodHandle.PolymorphicSignatureHotSpotIntrinsicCandidateboolean compareAndSet(Object... args); //方法名称是不同的// ...}实际上我们说明的源码并不重要重要的是其中的设计思想因为源码是非常多的要将所有的类都进行学习是不现实的所以重要的是思想 悲观锁与乐观锁 对于悲观锁认为数据发⽣并发冲突的概率很大读操作之前就上锁通常比如加上synchronized关键字后面要讲的ReentrantLock都是悲观锁的典型 对于乐观锁认为数据发⽣并发冲突的概率比较小读操作之前不上锁等到写操作的时候再判断数据在此 期间是否被其他线程修改了如果被其他线程修改了就把数据重新读出来重复该过程如果没有被修改就写 回去判断数据是否被修改同时写回新值这两个操作要合成一个原子操作也就是CAS Compare And Set AtomicInteger的实现就是典型的乐观锁 我们可以看到AtomicInteger的实现就是利用Unsafe现在我们来看看Unsafe这个类 Unsafe 的CAS详解注意是Unsafe的CAS因为CAS有多种实现方式比如前面的VarHandle但本质也是一样的比如在77章博客提到的乐观锁 我们知道在前面多次的使用了CAS或者其相关的AQS现在我们来说明一下CAS Unsafe类是整个Concurrent包的基础里面所有方法都是native的如具体到上面提到的compareAndSetInt方 法即 HotSpotIntrinsicCandidatepublic final native boolean compareAndSetInt(Object o, long offset,int expected,int x);要特别说明一下第二个参数它是一个long型的整数经常被称为xxxOffset意思是某个成员变量在对应的 类中的内存偏移量该变量在内存中的位置表示该成员变量本身或者说将对应的成员变量值取出来 第二个参数的值为AtomicInteger中的属性VALUE public final int getAndDecrement() {return U.getAndAddInt(this, VALUE, -1);}private static final long VALUE U.objectFieldOffset(AtomicInteger.class, value); private static final jdk.internal.misc.Unsafe U jdk.internal.misc.Unsafe.getUnsafe(); //之所以这样写jdk.internal.misc.Unsafe是因为没有导入那么就需要导入的方式来调用了 //所以这个U是Unsafe类的 package jdk.internal.misc;import jdk.internal.HotSpotIntrinsicCandidate; import jdk.internal.vm.annotation.ForceInline;import java.lang.reflect.Field; import java.security.ProtectionDomain; public final class Unsafe {//.. }而Unsafe的 objectFieldOffset(…) 方法调用就是为了找到AtomicInteger类中value属性所在的内存偏 移量或者说其位置使得可以获取值或者设置值 objectFieldOffset 方法的实现 public long objectFieldOffset(Field f) {if (f null) {throw new NullPointerException();}return objectFieldOffset0(f);}//很明显对应的VALUE就是调用这个U.objectFieldOffset(AtomicInteger.class, value); //即这里的方法 public long objectFieldOffset(Class? c, String name) {if (c null || name null) {throw new NullPointerException();}return objectFieldOffset1(c, name);}其中objectFieldOffset1的实现为 private native long objectFieldOffset0(Field f); //这个也写上吧private native long objectFieldOffset1(Class? c, String name); //即这个是主要的操作那么他很明显可以帮你找到内存里面对应的属性的偏移量当我们返回后可以通过这个数值找到对应的value属性并进行实时的操作或者说找到具体值修改当然具体的修改和获取值也是native的比如前面的public final native boolean compareAndSetInt进行设置以及public native int getIntVolatile进行获取值因为java不能直接操作内存但C/C可以所以可以认为的确是设置了或者直接的获取值了很明显他们都是native即其他语言一般是非java代码的的操作 这里的Unsafe所有调用CAS的地方基本都会先通过这个方法把成员变量转换成一个Offset即VALUE他也正好是long类型因为对应的返回值也是long类型再次以AtomicInteger为例 private static final jdk.internal.misc.Unsafe U jdk.internal.misc.Unsafe.getUnsafe();private static final long VALUE U.objectFieldOffset(AtomicInteger.class, value); //即这个操作 从上面代码可以看到无论是Unsafe对应的对象自带的还是VALUE都是静态的也就是类级别的所有对象共用的此处的VALUE实际上就代表了value变量本身后面执行CAS操作的时候不是直接操作value而是操作VALUE使得间接的操作value的获取以及修改因为我们的value是不能直接的给其他静态资源类方法等等修改的所以利用其他静态资源进行操作获取或者修改设置值虽然大多数并不是静态的或者方法没有静态一般只有三个变量是静态的但也是一种加强保护不只是防止别人也防止自身了使得对应的一些不能进行操作或者不能直接的进行操作 private static final long serialVersionUID 6214790243416807050L;/** This class intended to be implemented using VarHandles, but there* are unresolved cyclic startup dependencies.*/private static final jdk.internal.misc.Unsafe U jdk.internal.misc.Unsafe.getUnsafe();private static final long VALUE U.objectFieldOffset(AtomicInteger.class, value); //上面三个静态private volatile int value; //一般只有这四个变量//其他方法通常都不是静态的一般都不是静态的自旋与阻塞 当一个线程拿不到锁的时候有以下两种基本的等待策略 策略1放弃CPU进入阻塞状态等待后续被唤醒再重新被操作系统调度 策略2不放弃CPU空转不断重试也就是所谓的自旋一般我们也会设置自旋的时间就如sleep设置的时间一样当然也可以设置时间到了也可以使得操作策略1通常我们都会设置比如wait虽然没有操作自旋但是可以认为是时间非常少的自旋然后时间到了操作策略1虽然并不是 很显然如果是单核的CPU只能用策略1因为如果不放弃CPU那么其他线程无法运行就如释放锁释放锁一般实际上就是释放cpu因这里给出一个容易理解的方式只有你在运行那么cpu就是占用的否则单纯的阻塞或者说没有运行那么cpu就是释放因为运行是需要cpu的那么sleep占用cpu吗答是占用的他可以认为是自旋当时间到了就不阻塞了而wait可以认为没有占用即可以认为是策略1也就无法释放 锁但对于多CPU或者多核策略2就很有用了因为没有线程切换的开销所以一般多核会选择使用第二种一般现在的电脑都是多核所以以策略2为主所以你也基本上测试不出来单核cpu的情况因为这是硬件的问题你也通常改变不了那么这里就可以得出一个结论线程切换阻塞到唤醒之间的切换导致线程切换简称阻塞唤醒切换的开销大于始终占用cpu的开销也就是无限循环所以一般自旋性能大于单纯的阻塞唤醒切换阻塞唤醒切换一种操作说明他是可以被唤醒或者阻塞的或者说被唤醒当然这是对于对应的线程数竞争不激烈的情况下说明的因为线程多点cpu还有那么若阻塞和唤醒就会操作多即不好但是若非常多的话那么阻塞和唤醒要好了因为自旋是始终占用cpu的那么cpu需要等了所以若是少的那么自旋自然可能会比阻塞切换的效率高即就是是少的自旋性能通常也是好的因为他是始终的重试的你可能说即再一定的线程下那么自旋好了因为重试的存在使得我单纯的不用切换也就是切换变成了重试所以开销少效率好即运行效率好即更快的执行获取锁完毕但是如果你一直重试并且认为他永不切换或者线程过多就阻塞唤醒好所以通常自旋会定义一个临界值synchronized一般比较大认为你没有作用或者到自旋一定次数还是操作阻塞唤醒切换这些操作吧因为无限制的重试或者线程过多自然比单纯的直接阻塞唤醒要开销大 AtomicInteger的实现就用的是自旋策略或者说对应的Unsafe的CAS就是这样的策略CAS为什么会操作一个人是因为CAS其当有人操作成功后其他的人基本都会失败而这种失败我们也称为CAS的没有获取锁的意思这个失败的意思在后面会提到的如果拿不到锁就会一直重试 注意以上两种策略并不互斥可以结合使用如果获取不到锁先自旋如果自旋还拿不到锁再阻塞策略1synchronized关键字就是这样的实现策略一般他实际上他也操作了自旋拿不到锁才阻塞会自旋一会时间这个时间通常比较大一般来说synchronized释放锁后也会操作唤醒一般是全部唤醒因为是不公平的公平的可能只会唤醒前一个或者也全部唤醒但是因为顺序的原因导致可能继续阻塞的若第一个快速的释放那么可能他会在阻塞前又操作了唤醒所以如果是后面一种情况一般我们都会自旋一下当然这基本作用不大主要是需要在选择之后主动等待对方都阻塞才往后面操作当然主要是公平的可能只会唤醒前一个 除了AtomicInteger是这样外AtomicLong也是同样的原理所以就不多说了一般现在的锁都会操作自旋并且留有一定的时间准备操作策略1通常原语操作基本都是策略1的比如wait也可能操作策略2通常lock和synchronized基本操作策略2 综上所述我们将CAS称为锁的作用也行在前面我们通常也是这样认为的因为自旋的存在所以在前面若有认为CAS也是锁那么也是正确的因为一般来说CAS都需要操作自旋来保证锁的阻塞作用前面说明的源码相关CAS基本都操作了自旋而由于CAS一般利用版本号思想77章博客有说明而CAS的版本号只是利用其他语言实现的而已因为数据库可以实现那么自然其他语言也能实现即操作了native这个其他语言通常是C/C可能还有其他的语言操作具体要看对应的配置给该方法的指向语言的操作通常是C/C而已极少或者没有其他的除了C/C的语言操作了且对应的设置通常是唯一的那么自旋自然可以操作也就能认为可以阻塞了策略1这也是之前说明的CAS可以阻塞的原因而我们一般将这种有阻塞CAS的操作称为AQS 那么为什么自旋要进行操作呢这是因为始终的无限循环对内存不好需要有个结果才是好的所以需要进行操作 AtomicBoolean和AtomicReference 为什么需要AtomicBoolean 对于int或者long型变量需要进行加减操作被多次的操作所以要加锁因为对于boolean来说int或者long进行操作的多特别是计算有多个该两个类型所以通常需要加锁但对于一个boolean类型来说true或false的赋值和取值操作基本上是非常少的通常只需要加上volatile关键字就够了为什么还需要AtomicBoolean呢 这是因为往往要实现下面这种功能或者说需要加锁的功能因为虽然通常只需要加上volatile即可但是有些情况还是需要加锁的 if (!flag) {flag true;// ...}// 或者更清晰一点的 if (flag false) {flag true; //如果没有加锁或者说乐观锁和悲观锁因为上面只是说明了只需要加上volatile那么这里会赋值两次也就是对应的两个true// ...} 上面总体来说也就是要实现 compare比较和set赋值两个操作合在一起的原子性而这也正是CAS自带提供的功能所以对应的AtomicBoolean基本也就是使用CAS操作即上面的代码就可以变 成 if (compareAndSet(false, true)) { //传递两个参数准备将false指定的变量设置为true// ... 利用CAS操作进行设置操作} public class AtomicBoolean implements java.io.Serializable {//..private static final VarHandle VALUE;//.. //具体的AtomicBoolean里面就操作如下 public final boolean compareAndSet(boolean expectedValue, boolean newValue) {return VALUE.compareAndSet(this,(expectedValue ? 1 : 0),(newValue ? 1 : 0));}//很明显也就是使用之前的VarHandle类那么就需要使用VarHandle类的对应的说明了是返回true还是false在前面的对应的VarHandle在前面说明过了这里就不多说了也进行提到过//..}//基本上都是返回true因为一般来说只有设置成功或者减少的是参数的数量或者到0那么基本返回true前面说明过了同样地AtomicReference也需要同样的功能一般我们主要说明AtomicBoolean对应的方法如下 public class AtomicReferenceV implements java.io.Serializable {//.. private static final VarHandle VALUE;//.. public final boolean compareAndSet(V expectedValue, V newValue) {return VALUE.compareAndSet(this, expectedValue, newValue);}//.. }对应的VarHandle的compareAndSet在前面说明过这里再次的提一下给出 public final nativeMethodHandle.PolymorphicSignatureHotSpotIntrinsicCandidateboolean compareAndSet(Object... args);其中以AtomicBoolean为例expect是旧的引用update为新的引用其他的基本都是如此因为是Object所以是引用 上面两种可以看到是利用了VarHandle来使得支持CAS并且支持boolean的类型实际上由于是引用所以基本上可以支持任意类型所以我们只看看Unsafe类如何⽀持boolean和double类型 在Unsafe类中也提供了三种类型的CAS操作int、long、Object也就是引用类型如下所示 //前面对应的就操作了说明过了 HotSpotIntrinsicCandidatepublic final native boolean compareAndSetInt(Object o, long offset,int expected,int x);HotSpotIntrinsicCandidatepublic final native boolean compareAndSetLong(Object o, long offset,long expected,long x); HotSpotIntrinsicCandidatepublic final native boolean compareAndSetObject(Object o, long offset,Object expected,Object x);即在jdk的实现中这三种CAS操作都是由底层实现的其他类型的CAS操作都要转换为这三种之一进行操 作 比如对应的支持的boolean和double类型 ForceInlinepublic final boolean compareAndSetDouble(Object o, long offset,double expected,double x) {//是long的对应操作return compareAndSetLong(o, offset,Double.doubleToRawLongBits(expected),Double.doubleToRawLongBits(x)); }//下面四个是一起的ForceInlinepublic final boolean compareAndSetBoolean(Object o, long offset,boolean expected,boolean x) {return compareAndSetByte(o, offset, bool2byte(expected), bool2byte(x));}HotSpotIntrinsicCandidatepublic final boolean compareAndSetByte(Object o, long offset,byte expected,byte x) {return compareAndExchangeByte(o, offset, expected, x) expected;}HotSpotIntrinsicCandidatepublic final byte compareAndExchangeByte(Object o, long offset,byte expected,byte x) {long wordOffset offset ~3;int shift (int) (offset 3) 3;if (BE) {shift 24 - shift;}int mask 0xFF shift;int maskedExpected (expected 0xFF) shift;int maskedX (x 0xFF) shift;int fullWord;do {fullWord getIntVolatile(o, wordOffset);if ((fullWord mask) ! maskedExpected)return (byte) ((fullWord mask) shift);//看这里} while (!weakCompareAndSetInt(o, wordOffset,fullWord, (fullWord ~mask) | maskedX));return expected;}HotSpotIntrinsicCandidatepublic final boolean weakCompareAndSetInt(Object o, long offset,int expected,int x) {//是int的对应操作return compareAndSetInt(o, offset, expected, x); //也的确到这里了}对应这三种CAS操作中其中的参数以compareAndSetInt为例其他的基本都是一样的 1第一个参数是要修改的对象 2第二个参数是对象的成员变量value在内存中的位置一个long型的整数前面说明过了 3第三个参数是该变量是该成员变量的旧值 4第四个参数是该变量的新值 AtomicBoolean类型如何⽀持CAS即对应的类似的compareAndSet操作虽然前面提到过了 对于用int型来代替的在入参的时候将boolean类型转换成int类型在返回值的时候将int类型转换成boolean类型如下所示 public final boolean compareAndSet(boolean expectedValue, boolean newValue) {return VALUE.compareAndSet(this,(expectedValue ? 1 : 0),(newValue ? 1 : 0));} //对应的同理AtomicReference已经也给出过了如果是double类型Unsafe类又如何⽀持呢 这依赖double类型提供的一对double类型和long类型互转的方法 public final class Double extends Number implements ComparableDouble {//..HotSpotIntrinsicCandidatepublic static native long doubleToRawLongBits(double value); //double变long//.. HotSpotIntrinsicCandidatepublic static native double longBitsToDouble(long bits); //long变double//..}而在Unsafe类中的方法实现 //具体double的支持 ForceInlinepublic final boolean compareAndSetDouble(Object o, long offset,double expected,double x) {return compareAndSetLong(o, offset,Double.doubleToRawLongBits(expected),Double.doubleToRawLongBits(x));} //可以看到使用了Double.doubleToRawLongBits将对应double变成long了即对应都是支持的比如intlongbooleandouble等等其他就不多说了由于VarHandle是引用所以通常都能支持所以上面只是说明Unsafe的情况其中对应的类基本都是因为Unsafe类或者VarHandle类使得可以支持CAS 具体Unsafe比如其他的情况 ForceInlinepublic final boolean compareAndSetFloat(Object o, long offset,float expected,float x) {//int的操作return compareAndSetInt(o, offset,Float.floatToRawIntBits(expected),Float.floatToRawIntBits(x));}其他情况看源码吧这里就不依次列出来了但是基本上都也是支持的因为其也包括Object的方法 HotSpotIntrinsicCandidatepublic final native boolean compareAndSetObject(Object o, long offset,Object expected,Object x); //但是他的底层操作比较麻烦比VarHandle要麻烦一点所以一般只是操作具体的这是因为注解是不同的 //但是他虽然麻烦一点但是其他具体操作却比VarHandle要好虽然VarHandle兼容所有而Unsafe对应的要好的确只能是对应的其中一种即他们各有优点吧AtomicStampedReference和AtomicMarkableReference ABA问题与解决办法 到⽬前为⽌CAS都是基于值来做比较的但如果另外一个线程把变量的值从A改为B再从B改回到A那么 尽管修改过两次可是在当前线程做CAS操作的时候却会因为值没变而认为数据没有被其他线程修改过这就是 所谓的ABA问题特别的再某些时候可能会重复的进行操作造成问题比如如果还是A那么进行减减等等虽然可以加锁但是CAS是不操作锁的即这里是乐观锁的问题 举例来说 小张⽋小李100块约定今天还给打到银行卡 小李的银行卡余额是0打过来之后应该是100块 小张今天还钱这个事小李知道小李还告诉了自己媳妇小张还钱了并且小李媳妇看到了马上就取出来花掉了 然后小李恰好在他媳妇取出之后检查账户一看余额还是0然后找小张要账即被还钱和被媳妇花掉是一瞬间的还没有来得及看余额 这其中小李的账户余额从0到100再从100到0小李一开始检查是0第二次检查还是0就认为小张没 还钱 实际上小李媳妇花掉了这就是ABA问题及A到B再到A其实小李可以查看账户的收⽀记录 所以我们要解决ABA问题就需要解决上面说的查看账户的收支记录所以不仅要比较值还要比较版本号77章博客那里就是这样的操作因为ABA问题是乐观锁的问题所以也属于乐观锁也就还要比较是否是原来的那条记录因为值可以一样但是可能并不是原来的那个操作了 而这正是 AtomicStampedReference一般AtomicReference和AtomicStampedReference都是操作引用也就是除了基本的对象如Integer外的引用对象或者说加上了Reference相关的基本都是操作这样的引用代表操作对象比如我们创建的对象当然实际上也包括基本的对象所以说他基本能够操作所有的类型因为他是泛型做的事情其对应的CAS方法如下 public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp) {PairV current pair;//private volatile PairV pair;returnexpectedReference current.reference expectedStamp current.stamp ((newReference current.reference newStamp current.stamp) ||casPair(current, Pair.of(newReference, newStamp)));}之前的 CAS一般只有两个重要参数这里的 CAS一般有四个重要参数后两个参数就是版本号的旧值和新值当expectedReference ! 对象当前的reference时说明该数据肯定被其他线程修改过也就会返回false因为当expectedReference 对象当前的reference时说明没有被修改但不确定是否是原来的记录了所以那么再进一步比较expectedStamp是否等于对象当前的版本 号继续的以此判断数据是否被其他线程修改过而只要修改过那么自然返回false使得不进行操作因为我的是旧数据CAS就是这样操作的即只能一人操作成功当然虽然他们失败但是可以进行阻塞或者说自旋等等操作所以失败实际上就是判断是否操作过的意思这里解释前面的而这种失败我们也称为CAS的没有获取锁的意思这个失败的意思在后面会提到的这个地方 一般来说VarHandle只会操作三个参数而Unsafe一般是2个或者4个参数如果发现compareAndSet或者xxxcompareAndSetxxx的操作那么通常说明的是VarHandle或者Unsafe但是并不绝对比如AbstractQueuedSynchronizer里面的compareAndSetState也是操作compareAndSet所以对应的xxx应该代表具体的Unsafe操作的类型名称通常固定而不是所有的xxx未知数比如前面出现的weakCompareAndSetInt他里面就操作了compareAndSetInt至此说明完毕如果对应的参数不对通常他并不是执行对应的方法他对应方法里面通常才会执行比如之前的head.compareAndSet(q, q.next)可以全局搜索使用ctrlf然后复制粘贴即可找到实际上他里面还有其他的操作最终使得操作完毕一般是三个参数从上面可以看出那么AtomicStampedReference对应的操作是使用VarHandle的因为是compareAndSet可以选择自己看一下上面的casPair方法即可 private boolean casPair(PairV cmp, PairV val) {return PAIR.compareAndSet(this, cmp, val);}private static final VarHandle PAIR;public final nativeMethodHandle.PolymorphicSignatureHotSpotIntrinsicCandidateboolean compareAndSet(Object... args);发现也的确如此 为什么没有AtomicStampedInteger或AtomictStampedLong 我们要解决Integer或者Long型变量的ABA问题但是为什么只有AtomicStampedReference而没有AtomicStampedInteger或者AtomictStampedLong呢 因为这里要同时比较数据的值和版本号而Integer型或者Long型的CAS没有办法同时比较两个变量他们优先只是考虑值的而不是具体引用于是为了进行解决所以实际上AtomicStampedReference会有其他的操作进行解决即操作把值和版本号封装成一个对象也就是其里面的Pair内部类然后通过对象引用的CAS来实现代码 如下所示 public class AtomicStampedReferenceV {//没有//..因为后面就是下面的Pair的代码 private static class PairT {final T reference;final int stamp;private Pair(T reference, int stamp) {this.reference reference; //值this.stamp stamp; //版本号是int类型}static T PairT of(T reference, int stamp) {return new PairT(reference, stamp); //得到对象}}//.. // VarHandle mechanicsprivate static final VarHandle PAIR; //使用的是VarHandle的CASstatic {try {MethodHandles.Lookup l MethodHandles.lookup();PAIR l.findVarHandle(AtomicStampedReference.class, pair,Pair.class);} catch (ReflectiveOperationException e) {throw new ExceptionInInitializerError(e);}}private boolean casPair(PairV cmp, PairV val) {return PAIR.compareAndSet(this, cmp, val);} //也没有//..因为这里就是最后的位置了 }当使用的时候在构造方法里面传入值和版本号两个参数应用程序对版本号进行累加操作然后调用上面的CAS进行总体设置具体可以参照77章博客如下所示 //对应的AtomicStampedReference里面的而不是Pair里面的public AtomicStampedReference(V initialRef, int initialStamp) {pair Pair.of(initialRef, initialStamp); //对应是静态的可以直接使用} AtomicMarkableReference AtomicMarkableReference与AtomicStampedReference原理类似只是Pair里面的版本号是boolean类型 的而不是整型的累加变量如下所示 public class AtomicMarkableReferenceV {private static class PairT {final T reference;final boolean mark;private Pair(T reference, boolean mark) {this.reference reference;this.mark mark; //这里是boolean类型而不是对应的前面的int类型}static T PairT of(T reference, boolean mark) {return new PairT(reference, mark);}}//..}因为是boolean类型只能有true、false 两个版本号所以并不能完全避免ABA问题只是降低了ABA发⽣的 概率因为可能当你修改后其他多个线程或多次的修改使得版本号还是会变回来那么自然就没有避免ABA问题了即这里的版本号会回来的而不是累加的不会回来所以通常只是针对于两个线程交替来操作的或者少量的线程但也建议不用使用因为可能一个线程可以多次的进行操作 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater 为什么需要AtomicXXXFieldUpdater 如果一个类是自己编写的则可以在编写的时候把成员变量定义或者实现操作Atomic原子类型但如果是一个已经有的类在 不能更改其源代码的情况下要想实现对其成员变量的原子操作通常就需要AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和 AtomicReferenceFieldUpdater来在外部进行操作当然我们也可以自己进行操作只是他给出了方便的类那么自然我们可以直接使用即可 通过AtomicIntegerFieldUpdater理解它们的实现原理 AtomicIntegerFieldUpdater是一个抽象类 ⾸先其构造方法是protected不能直接构造其对象必须通过它提供的一个静态方法来创建如下所示 public abstract class AtomicIntegerFieldUpdaterT {//..protected AtomicIntegerFieldUpdater() { //他不是public类型的所以通常不能直接的构造其对象}//.. }方法 newUpdater 用于创建AtomicIntegerFieldUpdater类对象 CallerSensitivepublic static U AtomicIntegerFieldUpdaterU newUpdater(ClassU tclass,String fieldName) {return new AtomicIntegerFieldUpdaterImplU(tclass, fieldName, Reflection.getCallerClass()); //不是空参的构造因为对应的AtomicIntegerFieldUpdaterImpl只有一个构造方法就是这个构造方法}newUpdater(…)静态方法传入的是要修改的类不是对象和对应的成员变量的名字内部通过反射拿到这个 类的成员变量然后包装成一个AtomicIntegerFieldUpdater对象所以这个对象表示的是类的某个成员而不 是该对象的成员变量因为我们是操作类的而不是单只对象因为类的对象可以有多个即只要操作该成员然后传递对应的对象那么都会操作一个类只能有一个Class 即若要修改某个对象的成员变量的值一般需要再传入相应的对象如下所示 public int getAndIncrement(T obj) {int prev, next;do {prev get(obj);//使用了子类的对应方法/*public final int get(T obj) {accessCheck(obj);return U.getIntVolatile(obj, offset);}*/next prev 1;} while (!compareAndSet(obj, prev, next));return prev;} //但是好像对应的对象里面也有该方法即一般我们使用的是这个public final int getAndIncrement(T obj) {return getAndAdd(obj, 1);}public final int getAndAdd(T obj, int delta) {accessCheck(obj);return U.getAndAddInt(obj, offset, delta); //这个方法在前面说明过就不多说了//这里直接操作了对应的offset实际上在进行选择时构造方法里面会生成对应的offset的这里就不给出具体位置了}public abstract class AtomicIntegerFieldUpdaterT {//..//内部类 private static final class AtomicIntegerFieldUpdaterImplTextends AtomicIntegerFieldUpdaterT { private static final Unsafe U Unsafe.getUnsafe(); //他AtomicIntegerFieldUpdater导入了import jdk.internal.misc.Unsafe;所以可以直接的使用getUnsafe来得到对应的对象前面说明过了private final long offset;//..private final Class? cclass;//..private final void accessCheck(T obj) {//private final Class? cclass;if (!cclass.isInstance(obj))throwAccessCheckException(obj);}//..public final boolean compareAndSet(T obj, int expect, int update) {accessCheck(obj);return U.compareAndSetInt(obj, offset, expect, update);}//..} }//很明显无论是父类的对应的getAndIncrement还是子类的getAndIncrement最终操作基本是一样的虽然父类多次的操作了accessCheck(obj);操作两次这是唯一的区别但总体是一样的但是由于父类指向子类所以一般我们只会使用子类的那么父类就相当于是保留扩展因为我们可以自己进行定义操作但是他既然给出了我们自然不用费力去进行自己编写并且也没有什么编写方向具体自己的编写方式可以百度这里就不多说了accecssCheck方法的作用是检查该obj是不是对应的class类型如果不是则拒绝修改CAS操作即也是服务了多个线程抛出异常从代码可以看到其 CAS 原理和 AtomicInteger 是一样的底层都调用了 Unsafe 的compareAndSetInt(…)方法 限制条件 要想使用AtomicIntegerFieldUpdater修改成员变量成员变量必须是volatile的int类型不能是Integer包装 类该限制从其构造方法中可以看到 //AtomicIntegerFieldUpdaterImpl的构造因为父类指向子类 AtomicIntegerFieldUpdaterImpl(final ClassT tclass,final String fieldName,final Class? caller) {final Field field;final int modifiers;try {field AccessController.doPrivileged(new PrivilegedExceptionActionField() {public Field run() throws NoSuchFieldException {return tclass.getDeclaredField(fieldName);}});modifiers field.getModifiers();sun.reflect.misc.ReflectUtil.ensureMemberAccess(caller, tclass, null, modifiers);ClassLoader cl tclass.getClassLoader();ClassLoader ccl caller.getClassLoader();if ((ccl ! null) (ccl ! cl) ((cl null) || !isAncestor(cl, ccl))) {sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);}} catch (PrivilegedActionException pae) {throw new RuntimeException(pae.getException());} catch (Exception ex) {throw new RuntimeException(ex);}//两个限制if (field.getType() ! int.class)throw new IllegalArgumentException(Must be integer type);if (!Modifier.isVolatile(modifiers))throw new IllegalArgumentException(Must be volatile type);// Access to protected field members is restricted to receivers only// of the accessing class, or one of its subclasses, and the// accessing class must in turn be a subclass (or package sibling)// of the protected members defining class.// If the updater refers to a protected field of a declaring class// outside the current package, the receiver argument will be// narrowed to the type of the accessing class.this.cclass (Modifier.isProtected(modifiers) tclass.isAssignableFrom(caller) !isSamePackage(tclass, caller))? caller : tclass;this.tclass tclass;this.offset U.objectFieldOffset(field); //偏移量的赋值}⾄于 AtomicLongFieldUpdater、AtomicReferenceFieldUpdater也有类似的限制条件其底层的CAS原 理也和对应的AtomicLong、AtomicReference是一样的这样我们就不多说了以后看到这样的说明那么代表类似即我不会进行多说明 AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray Concurrent包提供了AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray三个数组元素的原子 操作注意这里并不是说对整个数组的操作是原子的而是针对数组中一个元素的原子操作而言 使用方式 以AtomicIntegerArray为例其使用方式如下 public final int getAndIncrement(int i) {return (int)AA.getAndAdd(array, i, 1);}public class AtomicIntegerArray implements java.io.Serializable {//..private static final VarHandle AA MethodHandles.arrayElementVarHandle(int[].class); //使用的是VarHandle的CASprivate final int[] array; //一般情况下我们都是this来进行找到比较赋值但这里是array即是对应的//.. } 相比于AtomicInteger的getAndIncrement()方法这里只是一个传入参数数组的下标i而不是偏移量当然这是因为对应的CAS是不同的因为其是Unsafe其一般操作偏移量而这里操作VarHandle具体为什么Unsafe一般操作偏移量在前面说明过了即这个所以需要进行查找然后找到后比较地方 其他方法也与此类似相比于 AtomicInteger 的各种加减方法也都是对应的一个下标 i如下所示 public final int getAndDecrement(int i) {return (int)AA.getAndAdd(array, i, -1);} public final int getAndSet(int i, int newValue) {return (int)AA.getAndSet(array, i, newValue);}public final boolean compareAndSet(int i, int expectedValue, int newValue) {return AA.compareAndSet(array, i, expectedValue, newValue);}//对应的AtomicInteger 的对应方法 public final int getAndIncrement() {return U.getAndAddInt(this, VALUE, 1);} public final int getAndDecrement() {return U.getAndAddInt(this, VALUE, -1);}public final boolean compareAndSet(int expectedValue, int newValue) {return U.compareAndSetInt(this, VALUE, expectedValue, newValue);}//很明显只是VALUE变成i了当然对应的this在AtomicIntegerArray里是对应的数组因为我们的目标是数组实现原理 其底层的CAS方法直接调用VarHandle中native的getAndAdd方法如下所示 public final nativeMethodHandle.PolymorphicSignatureHotSpotIntrinsicCandidateObject getAndAdd(Object... args); //之前的我们一般都操作如下public final nativeMethodHandle.PolymorphicSignatureHotSpotIntrinsicCandidateboolean compareAndSet(Object... args); //但是他们都代表进行改变的意思只是操作可能不同但结果基本一样一般来说getAndAdd在数值的操作比compareAndSet好但是compareAndSet基本可以操作任何类型而getAndAdd虽然也可以操作其他类型但是对应的方面除了数值的操作外基本都比compareAndSet差即各有缺点明⽩了AtomicIntegerArray的实现原理另外两个数组的原子类实现原理与之类似这样就不多说了 Striped64与LongAdder 从JDK 8开始针对Long型的原子操作前面大致说明了AtomicLongJava又提供了LongAdder、LongAccumulator针对Double类型Java提供了DoubleAdder、DoubleAccumulatorStriped64相关的类的继承层次如下图所示 没有颜色的白色的箭头代表是其指向的子类 abstract class Striped64 extends Number {//.. } public class LongAdder extends Striped64 implements Serializable {//.. }public class LongAccumulator extends Striped64 implements Serializable {//.. } public class DoubleAdder extends Striped64 implements Serializable {//.. } public class DoubleAccumulator extends Striped64 implements Serializable {//.. } //即对应的都进行继承Striped64即是Striped64的子类没有颜色的白色的箭头代表是其指向的子类LongAdder原理 LongAdder内部最终操作的是一个volatile long型变量与AtomicLong基本是一样的即base变量和其他Cell对应的变量的和主要默认base变量因为一开始定义的是long sum base;由多个线程对这个变量进行CAS操作多个线程同时对一个变量 进行CAS操作在高并发的场景下仍不够快因为只能操作一个并且他是原子的所以会慢点如果再要提高性能该怎么做呢 把一个变量拆成多份变为多个变量也能解决单个变量的存储上限的问题当然一般超过上限通常会操作特别的操作有些类似于 ConcurrentHashMap 的分段锁每个头节点分别加锁的例子如下图所示把一 个Long型拆成一个base变量外加多个Cell每个Cell包装了一个Long型变量当多个线程并发累加的时候如果并 发度低就直接加到base变量上如果并发度高冲突大平摊到这些Cell上具体平摊看当时的算法你可以认为是平均的加上即除以4等等当然这是不好的因为有精度的问题所以通常是依次的给予即第一个操作获取数据那么到第二个那么很明显从下图中可以知道第四个之后就是第一个了而这样他们的数据基本是不会一样的因为是依次而依次的参数的值通常不同所以基本不会是一样的在最后取值的时候再把base和这 些Cell求sum运算 以LongAdder的sum()方法为例如下所示 public long sum() {//transient volatile Cell[] cells; 是Striped64里面的因为父类指向子类对象如果子类没有自然使用父类的因为是继承关系Cell[] cs cells;// transient volatile long base;是Striped64里面的long sum base;if (cs ! null) { //如果不为null那么就进行累加而不为null说明已经平摊了通常代表高并发下的作用for (Cell c : cs)if (c ! null)sum c.value; //使用下面内部类的变量因为他是c来调用的}return sum; //一般超过上限会操作特别的操作} //是Striped64里面的内部类 jdk.internal.vm.annotation.Contended static final class Cell {volatile long value;//.. }由于无论是long还是double都是64位的但因为没有double型的CAS操作所以是通过把double型转化 成long型来实现的所以上面的base和cell[]变量是位于基类Striped64当中的英文Striped意为条带也就 是分⽚ abstract class Striped64 extends Number {transient volatile Cell[] cells;transient volatile long base;jdk.internal.vm.annotation.Contended static final class Cell {// ...volatile long value;Cell(long x) { value x; }// ...}//..Striped64() {}//只有空构造并且LongAdder也只有空构造且什么都没有做即/*public LongAdder() {}*/ //.. } //既然他们都是空构造那么前面对应的cells和base一般代表sum是怎么来的实际上一开始他们自然都是空的即一个是null一个是0但是在满足一定的条件时会进行执行某些方法使得赋值或者改变所以才会有的一般一开始就会操作base或者说在一定的小并发下才会操作后续在高并发下才会操作cells最终一致性 或者说弱一致性 在sum求和方法中并没有对cells[]数组加锁也就是说一边有线程对其执行求和操作一边还有线程修改 数组里的值也就是最终一致性在某些集群中一般代表会获取旧值但是他们的读取和更新并不会使得破坏数据只是读取的可能是旧值而已当然他们是不同的副本的所以没有操作破坏数据因为单个副本的更新的原子的就如mysql一样有写锁在83章博客也说明过最终一致性的问题而不是强一致性这也类似于ConcurrentHashMap 中的 clear()方法一边执行 清空操作一边还有线程放入数据clear()方法调用完毕后再读取hash map里面可能还有元素因此LongAdder适合高并发的统计场景而不适合要对某个 Long 型变量进行严格同步的场景 这里之所以也说最终一致性 伪共享与缓存行填充 在Cell类的定义中用了一个独特的注解jdk.internal.vm.annotation.Contended有些版本可能是sun.misc.Contended这是JDK 8之后才有的背后涉及一个很重 要的优化原理伪共享与缓存行填充 jdk.internal.vm.annotation.Contended static final class Cell {//.. }//实际上只有这么点jdk.internal.vm.annotation.Contended static final class Cell {volatile long value;Cell(long x) { value x; }final boolean cas(long cmp, long val) {return VALUE.compareAndSet(this, cmp, val);}final void reset() {VALUE.setVolatile(this, 0L);}final void reset(long identity) {VALUE.setVolatile(this, identity);}final long getAndSet(long val) {return (long)VALUE.getAndSet(this, val);}// VarHandle mechanicsprivate static final VarHandle VALUE;static {try {MethodHandles.Lookup l MethodHandles.lookup();VALUE l.findVarHandle(Cell.class, value, long.class);} catch (ReflectiveOperationException e) {throw new ExceptionInInitializerError(e);}}}每个 CPU 都有自己的缓存一般来说缓存代表保留的数据比如当你重复的读取某个数据时那么会第二次开始就直接的使用缓存的而不用去找对应的地址这样就可以使得提高效率在前面也说明过初始化后或者普通的打印称为1这个初始化就是缓存的利用当然我们还是操作本来的数据的只是我们多出了缓存而已注意是先保留缓存然后将缓存数据到主内存前面也说明过了即即缓存是在写入内存之前的那么缓存与主内存进行数据交换的基本单位叫Cache Line缓存行在64位x86架 构中缓存行是64字节也就是8个Long型的大小这也意味着当缓存失效也称为更新专业名词一般我们称为失效代表原来的对应缓存直接没了所以我们称为失效要刷新到主内存的时候最少要刷新64字节 如下图所示主内存中有变量X、Y、Z假设每个变量都是一个Long型被CPU1Core0和CPU2Core1分别读入自己的缓 存放在了同一行Cache Line里面当CPU1修改了X变量它要失效整行Cache Line也就是说对应的缓存要更新了但是他确直接更新整个行而不是对应的一个操作即变量然后也就是往总线上发消息通 知CPU 2对应的Cache Line失效由于Cache Line是数据交换的基本单位无法只失效X要失效就会失效整行的Cache Line这会导致Y、Z变量的缓存也失效 虽然只修改了X变量本应该只失效更新X变量的缓存但Y、Z变量也随之失效Y、Z变量的数据没有修改本应该 很好地被 CPU1 和 CPU2 共享使得缓存存在让效率变高却没做到那么效率会变低因为要去内存找了这就是所谓的伪共享问题注意上面是没有解决的方式所以你可能会感觉到不适应看后面就好了 问题的原因是Y、Z和X变量处在了同一行Cache Line里面要解决这个问题需要用到所谓的缓存行填充 分别在X、Y、Z后面加上7个无用的Long型填充整个缓存行让X、Y、Z处在三行不同的缓存行中如下图所示 声明一个jdk.internal.vm.annotation.Contended即可实现缓存行的填充之所以这个地方要用缓存行填 充是为了不让Cell[]数组中相邻的元素落到同一个缓存行里那么由于他们都各自不是同一个行所以不会互相影响但是这样需要更多的空间即使用空间来提高效率因为缓存没了效率会变低的 LongAdder核⼼实现 上面基本都是优化的方式现在回归正题他是如何实现原子的即对应的加和减操作的原子因为我们是与对应的AtomicLong是基本一样的操作而AtomicLong与AtomicInteger也说基本一样的操作 所以下面来看LongAdder最核⼼的累加方法add(long x)自增、自减操作都是通过调用该方法实现的因为我们可以操作正数和负数使得操作自增和自减并且他不是固定像AtomicLong或者AtomicInteger一样的操作1或者-1而是自定义的 public void add(long x) { //一般来说这个参数是自定义的Cell[] cs; long b, v; int m; Cell c;if ((cs cells) ! null || !casBase(b base, b x)) { //第一次尝试boolean uncontended true;if (cs null || (m cs.length - 1) 0 ||(c cs[getProbe() m]) null ||!(uncontended c.cas(v c.value, v x))) //第二次尝试longAccumulate(x, null, uncontended);}}public void increment() {add(1L);}public void decrement() {add(-1L);}//下面都是对应父类Striped64的内容因为父类执行子类子类没有就使用父类的而子类也的确没有 final boolean casBase(long cmp, long val) {return BASE.compareAndSet(this, cmp, val);}private static final VarHandle BASE;//下面是Striped64里面的内部类Cell的内容final boolean cas(long cmp, long val) {return VALUE.compareAndSet(this, cmp, val);}private static final VarHandle VALUE;当一个线程调用add(x)的时候⾸先在对应数值为null下会尝试使用casBase把x加到base变量上如果不成功因为!所以不成功就为true即往后执行但是就不会继续执行另外一个CAS了因为cs null 为true对于||来说后面的不操作若对于的不为null那么第一次不会尝试那么考虑再用 c.cas(…)方 法尝试把 x 加到 Cell 数组的某个元素上如果还不成功最后再调用longAccumulate(…)方法所以对应的cell在null或者不为null下都有对应的CAS操作并且CAS只会有一个执行而不会重复操作 注意Cell[]数组的大小始终是2的整数次方在运行中会不断扩容每次扩容都是增长2倍上面代码中的cs[getProbe() m] 其实就是对数组的大小取模因为mcs.length–1getProbe()为该线程⽣成一个随机数 用该随机数对数组的长度取模因为数组长度是2的整数次方所以可以用操作来优化取模运算对于一个线程来说它并不在意到底是把x累加到base上面还是累加到Cell[]数组上面只要累加成功就可 以因此这里使用随机数来实现Cell的长度取模 如果对应的尝试都是不成功的通常则都会调用 longAccumulate(…)方法该方法在 Striped64 里面其中LongAccumulator也会 用到LongAdder、LongAccumulator如下所示 final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {int h;if ((h getProbe()) 0) {ThreadLocalRandom.current(); // force initializationh getProbe();wasUncontended true;}// 若是true表示最后一个slot⾮空boolean collide false; // True if last slot nonemptydone: for (;;) {Cell[] cs; Cell c; int n; long v;// 如果cells不是null且cells长度大于0if ((cs cells) ! null (n cs.length) 0) {// cells最大下标对随机数取模得到新下标// 如果此新下标处的元素是nullif ((c cs[(n - 1) h]) null) {// 自旋锁标识用于创建cells或扩容cells// 尝试添加新的Cellif (cellsBusy 0) { // Try to attach new CellCell r new Cell(x); // Optimistically create// 如果cellsBusy为0则CAS操作cellsBusy为1获取锁if (cellsBusy 0 casCellsBusy()) {try { // Recheck under lock// 获取锁之后再次检查Cell[] rs; int m, j;if ((rs cells) ! null (m rs.length) 0 rs[j (m - 1) h] null) {// 赋值成功返回rs[j] r;break done;}} finally {// 重置标志位释放锁cellsBusy 0;}// 如果slot⾮空则进入下一次循环continue; // Slot is now non-empty}}collide false;}// CAS操作失败else if (!wasUncontended) // CAS already known to fail// rehash之后继续因为是无限循环wasUncontended true; // Continue after rehashelse if (c.cas(v c.value,(fn null) ? v x : fn.applyAsLong(v, x)))break;else if (n NCPU || cells ! cs)collide false; // At max size or staleelse if (!collide)collide true;else if (cellsBusy 0 casCellsBusy()) {try {// 扩容每次都是上次的两倍长度if (cells cs) // Expand table unless stalecells Arrays.copyOf(cs, n 1);} finally {cellsBusy 0;}collide false;continue; // Retry with expanded table}h advanceProbe(h);}// 如果cells为null或者cells的长度为0则需要初始化cells数组// 此时需要加锁进行CAS操作else if (cellsBusy 0 cells cs casCellsBusy()) {try { // Initialize tableif (cells cs) {// 实例化Cell数组实例化Cell保存x值Cell[] rs new Cell[2];// h为随机数对Cells数组取模赋值新的Cell对象rs[h 1] new Cell(x);cells rs;break done;}} finally {// 释放CAS锁cellsBusy 0;}}// 如果CAS操作失败最后回到对base的操作// 判断fn是否为null如果是null则执行加操作否则执行fn提供的操作// 如果操作失败则重试for循环流程成功就退出循环// Fall back on using baseelse if (casBase(v base,(fn null) ? v x : fn.applyAsLong(v, x)))break done;}} 由于自旋的存在无限循环使得失败时进行阻塞那么当对应操作完毕后由于会解除阻塞那么通常进行添加那么也会操作扩容 面LongAccumulator LongAccumulator的原理和LongAdder类似只是功能更强大下面为两者构造方法的对比 public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity) {this.function accumulatorFunction;base this.identity identity; //手动设置初始化值}LongAdder只能进行累加操作并且初始值默认为0LongAccumulator可以自己定义一个二元操作符并且 可以传入一个初始值 FunctionalInterface public interface LongBinaryOperator {/*** Applies this operator to the given operands.** param left the first operand* param right the second operand* return the operator result*/long applyAsLong(long left, long right); //我们将这个称为二元操作符 }操作符的左值就是base变量或者Cells[]中元素的当前值右值就是add()方法传入的参数x即可以选择一个进行操作 下面是LongAccumulator的accumulate(x)方法与LongAdder的add(x)方法类似最后都是调用的Striped64的LongAccumulate(…)方法 public void accumulate(long x) {Cell[] cs; long b, v, r; int m; Cell c;if ((cs cells) ! null|| ((r function.applyAsLong(b base, x)) ! b !casBase(b, r))) { //这里利用的了function而function就是前面的this.function accumulatorFunction;由于他是我们传递的所以对应的操作我们需要自己编写但是这比较麻烦并且不知道如何动手那么一般我们不会使用这个具体如何动手可以百度查看boolean uncontended true;if (cs null|| (m cs.length - 1) 0|| (c cs[getProbe() m]) null|| !(uncontended (r function.applyAsLong(v c.value, x)) v|| c.cas(v, r)))longAccumulate(x, function, uncontended); //也是这样的调用}}唯一的差别就是LongAdder的add(x)方法调用的是casBase(b, bx)这里调用的是casBase(b, r)其中rfunction.applyAsLong(bbase, x)也就是自己再次的操作修改值可以用来满足某些条件比如什么值不能设置等等比如不能是100 DoubleAdder与DoubleAccumulator DoubleAdder 其实也是用 long 型实现的因为没有 double 类型的 CAS 方法下面是DoubleAdder的add(x)方法和LongAdder的add(x)方法基本一样只是多了long和double类型的相互转换 public void add(double x) {Cell[] cs; long b, v; int m; Cell c;if ((cs cells) ! null ||!casBase(b base,Double.doubleToRawLongBits(Double.longBitsToDouble(b) x))) {boolean uncontended true;if (cs null || (m cs.length - 1) 0 ||(c cs[getProbe() m]) null ||!(uncontended c.cas(v c.value,Double.doubleToRawLongBits(Double.longBitsToDouble(v) x))))doubleAccumulate(x, null, uncontended);}}其中的关键Double.doubleToRawLongBits(Double.longBitsToDouble(b) x)在读出来的时候它把 long 类型转换成 double 类型然后进行累加累加的结果再转换成 long 类型通过CAS写回去这是保证double的相加完整因为浮点类型和整型类型的互相操作如相加需要进行转换否则报错编译期报错自然会使得运行时或者点击运行后报错而我们操作的是double类型那么操作之前就要变成与我们的一致这样就不会使得省略一些数据虽然反过来的结果一样但也只是防止而已 DoubleAccumulate也是Striped64的成员方法和longAccumulate类似也是多了long类型和double类型的 互相转换 DoubleAccumulator和DoubleAdder的关系与LongAccumulator和LongAdder的关系类似只是多了一个 二元操作符 Lock与Condition 互斥锁 锁的可重入性 可重入锁是指当一个线程调用 object.lock()获取到锁进入临界区后再次调用object.lock()仍然可以获取 到该锁显然通常的锁都要设计成可重入的否则就会发⽣死锁 synchronized关键字就是可重入锁如下所示 在一个synchronized方法method1()里面调用另外一个synchronized方法method2()如果synchronized关键 字不可重入那么在method2()处就会发⽣阻塞这显然不可行 public void synchronized method1() {// ...method2();// ...}public void synchronized method2() {// ...} 类继承层次 在正式介绍锁的实现原理之前先看一下 Concurrent 包中的与互斥锁ReentrantLock相关类之间的继承 层次如下图所示 上面虚线的白色指向代表是其子类但是是实现接口的方式而黑色的指向代表是其内部类单纯白色实线不是虚线的指向是其子类但是是继承类的方式 Lock是一个接口其定义如下 public interface Lock {void lock();void lockInterruptibly() throws InterruptedException;boolean tryLock();boolean tryLock(long time, TimeUnit unit) throws InterruptedException;void unlock();Condition newCondition(); }常用的方法是lock()/unlock()lock()不能被中断对应的lockInterruptibly()可以被中断 ReentrantLock本身除了内部类外没有代码逻辑基本实现都在其内部类Sync中 public class ReentrantLock implements Lock, java.io.Serializable {private final Sync sync;public void lock() {sync.acquire(1); //一般来说对应的方法里面都有Sync}public void unlock() {sync.release(1);}// ...} 锁的公平性vs⾮公平性 Sync是一个抽象类它有两个子类FairSync与NonfairSync分别对应公平锁和⾮公平锁从下面的ReentrantLock构造方法可以看出会传入一个布尔类型的变量fair指定锁是公平的还是⾮公平的默认为⾮公平 的 public ReentrantLock() {sync new NonfairSync(); }public ReentrantLock(boolean fair) {sync fair ? new FairSync() : new NonfairSync(); }//在前面中的Semaphore基本与这里类似也是分为公平和不公平注意公平一般代码队列的特性而不公平一般代表栈的特性但是这些只是极少数的一般来说公平代表对应关系或者说有顺序的关系而不公平代表没有绝对的顺序或者开始有顺序但是操作时没有顺序通常lock和synchronized就是这样而之前的SynchronousQueue一个阻塞队列的说明就是队列和栈的特性 什么叫公平锁和⾮公平锁呢先举个现实⽣活中的例子一个人去⽕⻋站售票窗口买票发现现场有人排队 于是他排在队伍末尾遵循先到者优先服务的规则这叫公平如果他去了不排队直接冲到窗口买票这叫作不 公平一般来说操作了对应的公平的线程叫做公平锁而操作不公平的线程叫做不公平锁其中lock和synchronized默认是不公平的lock可以设置公平但是synchronized基本不能为了让你理解他们两个的公平说明这里给出例子在比赛跑步是总共有100米现在我们将小明放在50米小军放在0米很明显小明在前面如果是公平的那么我们在小明后面放一个墙壁可以认为是其他的方法判断由于CAS存在就算多个线程进入后续的判断中的CAS基本也只能一个成功所以该方法也相当于是CAS操作了随着小明移动那么就算小军速度再快由于墙壁的原因那么他始终不能超过小明这就是公平如果是不公平的那么没有墙壁即对应的小军可以超过小明即更快的选择冠军很明显如果是不公平的那么一般可以更快的选择冠军因为在公平的情况下快的确有可能需要等待慢的所以通常我们也说不公平效率比公平效率高因为不公平冠军基本更加容易出现之所以说基本是因为如果第一名本来是最快的那么是否公平基本都是一样的时间得到冠军但是这个只有第一名最快使得是否公平效率都一样但是其他情况不公平效率都比公平效率高所以一般来说lock和synchronized默认都是不公平的 对应到锁的例子一个新的线程来了之后看到有很多线程在排队自己排到队伍末尾有墙壁这叫公平线程来了 之后直接去抢锁没有墙壁这叫作不公平默认设置的是⾮公平锁其实是为了提高效率减少线程切换 锁实现的基本原理 Sync的父类AbstractQueuedSynchronizer经常被称作队列同步器AQS这个类⾮常重要该类的父类是AbstractOwnableSynchronizer由于大多数操作都会使用AbstractQueuedSynchronizer来进行操作阻塞队列只是大多数有些不是比如前面说明的ConcurrentLinkedQueue类所以一般我们将阻塞的队列称为AQS此处的锁具备synchronized功能既然只是具备自然我们需要使用CAS即可以阻塞一个线程为了实现一把具有阻塞或唤醒功能的锁需要⼏个核 ⼼要素 1需要一个state变量标记该锁的状态state变量⾄少有两个值0、1是至少后面会说明对state变量的操作使用CAS保 证线程安全 2需要记录当前是哪个线程持有锁 3需要底层⽀持对一个线程进行阻塞或唤醒操作 4需要有一个队列维护所有阻塞的线程这个队列也必须是线程安全的无锁队列也需要使用CAS 其中后面会在原语说明中进行CAS和synchronized的核心说明 针对要素1和2在上面两个类中有对应的体现 public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {// ...private transient Thread exclusiveOwnerThread; // 记录持有锁的线程}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {private volatile int state; // 记录锁的状态通过CAS修改state的值。// ...}//从这里开始现在我只是操作具体的位置在前面我是很认真的让每个变量进行顺序现在不这样麻烦了因为是没有必要的 state取值不仅可以是0、1还可以大于1所以前面说明的是至少就是为了⽀持锁的可重入性例如同样一个线程调用5次lockstate会变成5然后调用5次unlockstate减为0 当state0时没有线程持有锁exclusiveOwnerThreadnull 当state1时有一个线程持有锁exclusiveOwnerThread该线程 当state 1时说明该线程重入了该锁 对于要素3Unsafe类提供了阻塞或唤醒线程的一对操作原语也就是park/unpark public native void unpark(Object thread);public native void park(boolean isAbsolute, long time); 有一个LockSupport的工具类对这一对原语做了简单封装 public class LockSupport {// ...private static final Unsafe U Unsafe.getUnsafe();public static void park() {U.park(false, 0L);}public static void unpark(Thread thread) {if (thread ! null)U.unpark(thread);}//具体位置实际上是unpark在park前面但是前面也说明了比较麻烦只需要在对应的里面即可所以这里就不改变了 } 我们可以发现他是一个工具类也就是说我们也能够进行使用他一般的可以直接操作并且是原子的或者最终会操作原语的主要调用我们称为原语操作类即可以操作原语的类一般synchronized代码块主要是靠monitorenter和monitorexit这两个原语来实现同步的但是我们也知道synchronized的原语有中间操作因为他操作JVM阻塞前面有过说明但是最终还是操作系统阻塞而CAS直接操作系统阻塞所以通常CAS效率比synchronized高那么换言之lock效率比synchronized高这是因为中间操作需要时间所以synchronized比较慢点其中这个中间操作也是原语的就算不是原语由于被原语包括那么我们也称为原语操作那么很明显CAS也就是类似于原语的一种因为底层也是使用原语来完成的native我们可以看一下上面的代码可以发现对应的park和unpark都是一个native方法具体如何操作原语那就不是java该做的了具体可以学习C/C或者汇编自己去学习吧或者其他可以操作原语的语言虽然基本没有 在当前线程中调用park()该线程就会被阻塞在另外一个线程中调用unpark(Thread thread)传入一 个被阻塞的线程就可以唤醒阻塞在park()地方的线程 unpark(Thread thread)它实现了一个线程对另外一个线程的精准唤醒notify也只是唤醒某一个线程但 无法指定具体唤醒哪个线程而所谓的精准唤醒指的是对应的解锁或者唤醒只是操作最近的一个锁或者指定一个解锁一般说明的是指定 虽然lock在抢占方面对应的是可能有顺序的但是在唤醒方面可以认为是在对应的线程对象中进行操作 其中对于可重入操作可以认为内部加上一个阻塞或者说无限循环或者是某种标识认为没有释放而对应的释放只是释放该一个无限循环或者只是减少某个值当最后一个或者说释放唤醒的对应值为0了那么才会进行真正的释放一般我们只会考虑标识的操作在后面的unlock的说明中会体现其中在抢占锁后会删除对应的线程对象或者释放锁后对应的某个变量设置为null在后面的unlock会说明这个线程对象在后面会说明的 针对要素4在AQS中利用双向链表和CAS实现了一个阻塞队列如下所示 public abstract class AbstractQueuedSynchronizer {// ...static final class Node {volatile Thread thread; // 每个Node对应一个被阻塞的线程volatile Node prev;volatile Node next;// ...}private transient volatile Node head;private transient volatile Node tail;// ...} 阻塞队列是整个AQS核⼼中的核⼼如下图所示head指向双向链表头部tail指向双向链表尾部入队就是 把新的Node加到tail后面然后对tail进行CAS操作最终tail指向新加入的线程对象这里就是前面说的线程对象出队就是对head进行CAS操作然后把head向后移一个位置 初始的时候headtailNULL然后在往队列中加入阻塞的线程时会新建一个空的Node让head和tail都指向这个空Node之后在后面加入被阻塞的线程对象tail然后指向他所以当headtail的时候说明队列为空 公平与⾮公平的lock()实现差异 下面分析基于AQSReentrantLock在公平性和⾮公平性上的实现差异 //首先说明不公平的操作 static final class NonfairSync extends Sync {private static final long serialVersionUID 7316153563782823691L;protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}//从这里开始我就不给出这些方法是那些类调用的了因为没有必要反正基本是最子类的说明 ReservedStackAccessfinal boolean nonfairTryAcquire(int acquires) {//此处没有考虑队列中有没有其他线程直接考虑使用当前线程去进行获取抢占锁不排队所以不公平final Thread current Thread.currentThread();int c getState();if (c 0) {//如果state为0直接将当前线程设置为排他线程抢到的话同时设置state的值if (compareAndSetState(0, acquires)) {//一般来说只要修改了那么必然会改变int c的结果从而实现锁的操作setExclusiveOwnerThread(current); //记录持有锁的线程/*protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread thread;}private transient Thread exclusiveOwnerThread; //知道是哪个线程*/return true;}}//如果state不是0但是当前线程是排他线程那么就是可重入操作了则直接设置state即可因为他是抢到的自然只有他一个人所以不用考虑CAS操作所以下面是直接的设置else if (current getExclusiveOwnerThread()) {int nextc c acquires;if (nextc 0) // overflowthrow new Error(Maximum lock count exceeded);setState(nextc);/*直接的设置protected final void setState(int newState) {state newState;} private volatile int state;*/return true;}return false; //没有抢到返回false获取失败因为没有设置成功那么返回false自然到这里来了}//这里说明公平的操作 ReservedStackAccessprotected final boolean tryAcquire(int acquires) {final Thread current Thread.currentThread();int c getState();if (c 0) {//如果state为0且队列中没有等待的线程考虑排队即公平则设置当前线程是排他线程同时设置state的值if (!hasQueuedPredecessors() compareAndSetState(0, acquires)) { //hasQueuedPredecessors()是操作是否公平的主要代码这里就不说明了这里代表操作公平的前提而没有自然就是不公平的具体就不说明了/*protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread thread; //设置其是排他线程即抢到锁了}*/setExclusiveOwnerThread(current);return true;}}//如果排他线程是当前线程那么就是可重入了直接设置state值即可不考虑CAS操作所以下面是直接的设置else if (current getExclusiveOwnerThread()) {int nextc c acquires;if (nextc 0)throw new Error(Maximum lock count exceeded);setState(nextc);return true;}return false;}}阻塞队列与唤醒机制 下面进入锁的最为关键的部分即acquireQueued(…)方法内部一探究竟 public void lock() {sync.acquire(1);}public final void acquire(int arg) { //参数一般是1代表只有该线程操作即一个线程因为一个线程只有一个也就是1if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //Node.EXCLUSIVE默认是nullstatic final Node EXCLUSIVE null;selfInterrupt(); /*static void selfInterrupt() {Thread.currentThread().interrupt();}*/} //注意这里说明公平问题的最关键问题这里理解了就能知道公平与队列对于抢占或者释放的具体了解了首先我们要知道对应的阻塞队列是保存对应公平和不公平的队列这是主要的原因就如之前的SynchronousQueue类似是根据已有队列来操作的因为经过对应的公平和不公平那么阻塞队列就是其最终结果因为公平和不公平只是操作抢占而不操作结果所以可以认为在操作是否公平之后才会放入队列而不是以队列来决定是否公平的抢也就是谁抢到就先给谁先放入队列那么操作时他就会先释放即当然这个抢我们操作是否公平先说addWaiter(…)方法就是为当前线程⽣成一个Node然后把Node放入双向链表的尾部要注意的是这 只是把Thread对象放入了一个队列中而已线程本身并未阻塞 private Node addWaiter(Node mode) {Node node new Node(mode);/*Node(Node nextWaiter) {this.nextWaiter nextWaiter;THREAD.set(this, Thread.currentThread());}private static final VarHandle THREAD;*/for (;;) {Node oldTail tail;if (oldTail ! null) {node.setPrevRelaxed(oldTail); //设置prev的指向后面会将对应的tail到下一个节点这里是set方法一般代表指向对方的值而不是设置成对应的值p当然若说成tail的设置也可因为注解和自身类原因所以可以以固定的tail为准的所以一般CAS改变的是tail的值那么这里就是设置自己的prev为tail/*private static final VarHandle PREV;final void setPrevRelaxed(Node p) {PREV.set(this, p); //即prev的设置}*///看看是否抢到只要抢到了那么进行真正的添加因为上面只是头prev现在是尾部next//他没有操作阻塞因为没有操作自旋或者没有自己给出cpu资源一般来说CAS都是操作自旋的而不会操作给出cpu资源虽然也可以设置给出cpu资源一般synchronized是给出的而lock基本也是一样的实际上好像没有因为如果你需要等待超长的时间还不如切换好所以一般来说自旋都会有临界值当然没有也行这只是可选项比如synchronized一般就有而lock一般没有所以才说实际上好像没有if (compareAndSetTail(oldTail, node)) { //改变tail的值那么原来的oldTail就是头的指向对象oldTail.next node; //头对应的指向对象自然设置next的指向return node;}} else {initializeSyncQueue();}}}//private static final VarHandle HEAD; private final void initializeSyncQueue() {Node h;if (HEAD.compareAndSet(this, null, (h new Node())))tail h; //初始化得到了一个空的new Node()由于无限循环会继续操作当然这里也只能初始化一次所以也需要CAS总不能所有的线程都初始化吧}创建节点尝试将节点追加到队列尾部获取tail节点将tail节点的next设置为当前节点 如果tail不存在或者说为null就初始化队列创建一个空线程对象 在addWaiter(…)方法把Thread对象加入阻塞队列而之后的阻塞工作就要靠acquireQueued(…)方法完成线程一旦进 入acquireQueued(…)就会被无限期阻塞即使有其他线程调用interrupt()方法也不能将其唤醒除⾮有其他线程 释放了锁并且该线程拿到了锁才会从accquireQueued(…)返回 进入acquireQueued(…)该线程被阻塞在该方法返回的一刻就是拿到锁的那一刻也就是被唤醒的那一 刻此时会删除队列的第一个元素head指针前移1个节点那么我们看一下acquireQueued方法 final boolean acquireQueued(final Node node, int arg) {boolean interrupted false;try {for (;;) {/*final Node predecessor() {Node p prev;if (p null)throw new NullPointerException();elsereturn p;}*/final Node p node.predecessor();if (p head tryAcquire(arg)) { //前面的公平和不公平就是操作tryAcquire这也是为什么先说明tryAcquire的原因只要你没有抢到那么不会进入这里但前提必须是对应的头是head否则不会进入因为对应的结果是最终的公平和不公平的结果所以第一个就一定是获取的线程setHead(node);p.next null; // help GCreturn interrupted;}if (shouldParkAfterFailedAcquire(p, node))interrupted | parkAndCheckInterrupt(); //阻塞的前提后面会说明的基本会到这里来因为对应基本返回true}} catch (Throwable t) {cancelAcquire(node);if (interrupted)selfInterrupt();throw t;}}⾸先acquireQueued(…)方法有一个返回值表示什么意思呢虽然该方法不会中断响应但它会记录被阻 塞期间有没有其他线程向它发送过中断信号如果有则该方法会返回true否则返回false除非被其他人又操作回来了当然他之所以是记录是因为他并没有进行处理即没有结束对应的方法操作而是继续执行方法 基于这个返回值才有了下面的代码 static void selfInterrupt() {Thread.currentThread().interrupt();}当 acquireQueued(…)返回 true 时会调用 selfInterrupt()自己给自己发送中断信号也就是自己把自己的 中断标志位设为true之所以要这么做是因为自己在阻塞期间收到其他线程中断信号没有及时响应现在要进 行补偿因为我还在阻塞当中虽然对应的原语是处理中断的但是可能还会阻塞所以如果对方操作中断并且又设置回去那么实际上我操作了中断但是没有响应虽然你设置回去了但是具体的变量我已经保存了你设置回去没有用我必须要操作中断因为中断的操作是严谨的并且你在中断时再次的设置回去需要的时间比我直接得到返回需要的时间长导致你只要操作了中断我保存的就是中断的true信息而不会让你多次的操作使得又回来变成false即不会使得再次的设置回去就算之后直接操作也是一样的只要我获得了锁那么会操作这个使得对应锁内部操作进行中断这样一来如果该线程在lock代码块内部有调用sleep()之类的阻塞方法就可以抛出异常响应该中断信 号 阻塞就发⽣在下面这个方法中 private final boolean parkAndCheckInterrupt() {LockSupport.park(this); //使用了原语操作是主要调用所以这里称为原语操作return Thread.interrupted();}线程调用 park()方法自己把自己阻塞起来直到被其他线程唤醒该方法返回 park()方法返回有两种情况 1其他线程调用了unpark(Thread t) 2其他线程调用了t.interrupt()t代表对应的线程因为中断基本只能有对方自身调用这里要注意的是lock()不能响应中断但LockSupport.park()会响应中 断因为lock他并没有对应的报错信息而你的报错只是解除阻塞而已即内部操作了没有给lock带来影响所以说lock不响应中断但是他只是没有直接响应而已而是自己给内部代码进行响应但是内部代表又不是lock所以综上所述lock的确没有中断响应操作也正因为LockSupport.park()可能被中断唤醒acquireQueued(…)方法才写了一个for死循环唤醒之后如 果发现自己排在队列头部就去拿锁如果拿不到锁则再次自己阻塞自己不断重复此过程直到拿到锁简单来说就是自旋然后阻塞这个自旋就是主要的操作 被唤醒之后通过Thread.interrupted()来判断是否被中断唤醒如果是情况1会返回false如果是情况2 则返回true public final void acquire(int arg) {//若只有一个自然直接的是false因为自然他必然是抢到锁的否则要考虑添加阻塞队列然后在里面进行操作抢占锁的操作了所以里面也有tryAcquire方法前面的源码里有体现即这个acquireQueued方法里面就有if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();} unlock()实现分析 说完了lock下面分析unlock的实现unlock不区分公平还是⾮公平没有对应的tryAcquire操作即没有具体操作是否公平的操作 public void unlock() {sync.release(1);}public final boolean release(int arg) {if (tryRelease(arg)) {Node h head;if (h ! null h.waitStatus ! 0)unparkSuccessor(h);return true;}return false;}上面代码中当前线程要释放锁先调用tryRelease(arg)方法如果返回true则取出head让head获取锁 对于tryRelease方法 ReservedStackAccessprotected final boolean tryRelease(int releases) {int c getState() - releases;/*protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread; //前面设置的排他线程}*/if (Thread.currentThread() ! getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free false;if (c 0) { //真正的释放的前提对应变成true否则还没有这里考虑可重入free true;setExclusiveOwnerThread(null); //重置也就是说当前线程离开了所以实际上是在释放或者唤醒其他线程或者唤醒所有线程之前当前线程给出锁在synchronized也是如此只不过他们是一起的所以统称为释放或者唤醒而后续才会操作真正的释放}setState(c);return free;}⾸先计算当前线程释放锁后的state值如果当前线程不是排他线程则抛异常因为只有获取锁的线程才可以进行释放锁的操作此时设置state没有使用CAS因为是单线程操作 再看unparkSuccessor方法 private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws node.waitStatus;if (ws 0)node.compareAndSetWaitStatus(ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s node.next;if (s null || s.waitStatus 0) {s null;for (Node p tail; p ! node p ! null; p p.prev)if (p.waitStatus 0)s p;}if (s ! null)LockSupport.unpark(s.thread); //操作唤醒head节点的线程}release()里面做了两件事tryRelease(…)方法释放锁unparkSuccessor(…)方法唤醒队列中的后继者头节点因为第一个就是所以综上所述实际上公平和不公平只是决定了阻塞队列的顺序但是具体的顺序还是固定的所以前面也认为公平问题是对抢占问题的描述而不是对获取锁的描述所以当出现了lock或者synchronized他们是有顺序的说明那么这是对阻塞队列顺序的说明因为经历了多个线程的不规则进入导致的顺序所以如果出现说他们的抢占是随机的那么是产生阻塞队列之前的抢占如果是说有顺序的那么说明的是抢占只后的阻塞队列即这样的说明 lockInterruptibly()实现分析 上面的 lock 不能被中断这里的 lockInterruptibly可以被中断 public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1); //可中断的获取锁}public final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))doAcquireInterruptibly(arg);}这里的 acquireInterruptibly(…)也是 AQS 的模板方法里面的 tryAcquire(…)也是分别被 FairSync和NonfairSync实现 主要看doAcquireInterruptibly(…)方法 private void doAcquireInterruptibly(int arg)throws InterruptedException {final Node node addWaiter(Node.EXCLUSIVE);try {for (;;) {final Node p node.predecessor();if (p head tryAcquire(arg)) {setHead(node);p.next null; // help GCreturn;}if (shouldParkAfterFailedAcquire(p, node) parkAndCheckInterrupt())throw new InterruptedException();//收到中断信号直接抛出异常不阻塞直接返回在try里抛出那么自然操作try里面的捕获操作}} catch (Throwable t) {cancelAcquire(node);throw t;}}当parkAndCheckInterrupt()返回true的时候说明有其他线程发送中断信号直接抛出InterruptedException跳出for循环整个方法返回介绍也就没有阻塞了 tryLock()实现分析 public boolean tryLock() {return sync.nonfairTryAcquire(1); //直接调用非公平锁的对于操作}tryLock()实现基于调用⾮公平锁的tryAcquire(…)对state进行CAS操作如果操作成功就拿到锁如果操作不 成功则直接返回false也不阻塞也没有中断的处理也就只能抢一次只能不会可以再操作了即他并没有真正的操作是否公平因为他是固定的 读写锁 和互斥锁相比读写锁ReentrantReadWriteLock就是读线程和读线程之间不互斥读读不互斥读写互斥写写互斥而不是与互斥锁一样基本所有都互斥 类继承层次 ReadWriteLock是一个接口内部由两个Lock接口组成 public interface ReadWriteLock {Lock readLock();Lock writeLock(); }同理黑色指向是其内部类白色指向是其子类是继承因为不是虚线若是虚线那么就是实现 ReentrantReadWriteLock实现了该接口使用方式如下之前操作过ReentrantLock的大致使用方式所以就没有给出具体的ReentrantLock的使用方式了但这个ReentrantReadWriteLock没有使用过所以这里就给出ReentrantReadWriteLock的使用方式 ReadWriteLock readWriteLock new ReentrantReadWriteLock(); Lock readLock readWriteLock.readLock(); readLock.lock(); // 进行读取操作 readLock.unlock(); Lock writeLock readWriteLock.writeLock(); writeLock.lock(); // 进行写操作 writeLock.unlock(); 也就是说当使用 ReadWriteLock 的时候并不是直接使用而是获得其内部的读锁和写锁然后分别调lock/unlock 读写锁实现的基本原理 从表面来看ReadLock和WriteLock是两把锁实际上它只是同一把锁的两个视图而已什么叫两个视图呢 可以理解为是一把锁但是却将线程分成两类读线程和写线程读线程和写线程之间互斥一把锁要么写线程获取要 么读线程获取读线程之间不互斥写线程之间互斥 从下面的构造方法也可以看出readerLock和writerLock实际共用同一个sync对象sync对象同互斥锁一样 分为⾮公平和公平两种策略并继承自AQS public ReentrantReadWriteLock() {this(false); //一般来说锁基本都操作非公平的因为效率高 }public ReentrantReadWriteLock(boolean fair) {sync fair ? new FairSync() : new NonfairSync();readerLock new ReadLock(this);writerLock new WriteLock(this);//很明显要么都公平要么都不公平 } 同互斥锁一样读写锁也是用state变量来表示锁状态的只是state变量在这里的含义和互斥锁完全不同在 内部类Sync中对state变量进行了重新定义如下所示 abstract static class Sync extends AbstractQueuedSynchronizer {// ...static final int SHARED_SHIFT 16;static final int SHARED_UNIT (1 SHARED_SHIFT);static final int MAX_COUNT (1 SHARED_SHIFT) - 1;static final int EXCLUSIVE_MASK (1 SHARED_SHIFT) - 1;// 持有读锁的线程的重入次数static int sharedCount(int c) { return c SHARED_SHIFT; }// 持有写锁的线程的重入次数static int exclusiveCount(int c) { return c EXCLUSIVE_MASK; }//既然能获得次数自然可以根据这个次数来判断代表对应的锁是什么锁了0自然是没有的大于0自然是有的// ...} 也就是把 state 变量int类型有32位拆成两半注意并不是真正的两半只是重新定义而已或者说操作我定义的变量而不是只操作state变量或者不操作state变量当然由于是定义可能并没有使用他一般来说都会使用即可能还是操作state因为定义了并非一定使用低16位用来记录写锁但同一时间既然只能有一个线程写为什么还需要16位呢这是因为一个写线程可能多次重入例如低16位的值等于5表示一个写线程重入了5次 高16位用来读锁例如高16位的值等于5既可以表示5个读线程都拿到了该锁也可以表示一个读线程 重入了5次 为什么要把一个int类型变量拆成两半而不是用两个int型变量分别表示读锁和写锁的状态呢 这是因为无法用一次CAS 同时操作两个int变量一般只能操作一个所以用了一个int型的高16位和低16位分别表示读锁和写锁的 状态 当state0时说明既没有线程持有读锁也没有线程持有写锁当state ! 0时要么有线程持有读锁要么 有线程持有写锁两者不能同时成立因为读和写互斥这时再进一步通过sharedCount(state)和exclusiveCount(state)判断到底是读线程还是写线程持有了该锁 AQS面的两对模板方法 下面介绍在ReentrantReadWriteLock的两个内部类ReadLock和WriteLock中是如何使用state变量的 public static class ReadLock implements Lock, java.io.Serializable {// ...public void lock() {sync.acquireShared(1);}public void unlock() {sync.releaseShared(1);}// ...}public static class WriteLock implements Lock, java.io.Serializable {// ...public void lock() {sync.acquire(1);}public void unlock() {sync.release(1);}// ...} acquire/release、acquireShared/releaseShared 是AQS里面的两对模板方法互斥锁和读写锁的写锁都是基 于acquire/release模板方法来实现的读写锁的读锁是基于acquireShared/releaseShared这对模板方法来实现 的这两对模板方法的代码如下 //写锁互斥锁 public void lock() {sync.acquire(1);}public final void acquire(int arg) {if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}public void unlock() {sync.release(1);}public final boolean release(int arg) {if (tryRelease(arg)) {Node h head;if (h ! null h.waitStatus ! 0)unparkSuccessor(h);return true;}return false;}//读锁public void lock() {sync.acquireShared(1);} public final void acquireShared(int arg) {if (tryAcquireShared(arg) 0)doAcquireShared(arg);}public void unlock() {sync.releaseShared(1);}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;} 其中他们又可以操作是否公平那么我们可以将读/写、公平/⾮公平进行排列组合就有4种组合如下图所示上面的两个方法都是在Sync中实现的Sync中的两个方法又是模板方法在NonfairSync和FairSync中分别有实现最终的对应关系如下 1读锁的公平实现Sync.tryAccquireShared()FairSync中的两个重写的子方法只有其中一个主要操作读的 2读锁的⾮公平实现Sync.tryAccquireShared()NonfairSync中的两个重写的子方法只有其中一个主要操作读的 3写锁的公平实现Sync.tryAccquire()FairSync中的两个重写的子方法只有其中一个主要操作写的 4写锁的⾮公平实现Sync.tryAccquire()NonfairSync中的两个重写的子方法只有其中一个主要操作写的 白色的指向是其子类继承关系 static final class NonfairSync extends Sync {private static final long serialVersionUID -8159625535654395037L;// 写线程抢锁的时候是否应该阻塞final boolean writerShouldBlock() {// 写线程在抢锁时永远不被阻塞⾮公平锁前提是考虑了对应是否是写或者读return false;}// 读线程抢锁的时候是否应该阻塞final boolean readerShouldBlock() {// 读线程抢锁的时候当队列中第一个元素是写线程的时候要阻塞也就是说若你是写线程我就阻塞否则你也得到锁因为这里可不是互斥锁了他考虑多个人进入了考虑不公平的情况下也要考虑是否互斥//通常情况下在没有写锁时读锁可能直接阻塞或者返回默认值所以这里只会考虑阻塞的情况即抢锁的时候不是抢锁之前return apparentlyFirstQueuedIsExclusive();} }static final class FairSync extends Sync {private static final long serialVersionUID -2274990926593161451L;// 写线程抢锁的时候是否应该阻塞final boolean writerShouldBlock() {// 写线程在抢锁时如果队列中有其他线程在排队则阻塞公平锁return hasQueuedPredecessors();}// 读线程抢锁的时候是否应该阻塞final boolean readerShouldBlock() {// 读线程在抢锁之前如果队列中有其他线程在排队阻塞公平锁由于是公平只要有就阻塞所以无论对应是否互斥都可以认为是包含的return hasQueuedPredecessors(); } } //即再满足是否公平的情况下再考虑是否互斥所以以上面的公平锁为例在一开始抢的时候若你有线程那么我阻塞在阻塞队列的就是阻塞的当你释放时我直接进行释放锁都释放了自然不用考虑他们释放互斥了所以说上面的是包含以不公平锁为例由于也是阻塞队列但是对应的是可以多人进入的那么看起得到的锁是否是互斥的按照队列顺序对于公平比较容易理解不论是读锁还是写锁只要队列中有其他线程在排队排队等读锁或者排队等 写锁就不能直接去抢锁要排在队列尾部 对于⾮公平读锁和写锁的实现策略略有差异 写线程能抢锁前提是state0只有在没有其他线程持有读锁或写锁的情况下它才有机会去抢锁或者state ! 0但那个持有写锁的线程是它自己再次重入写线程是⾮公平的那么即writerShouldBlock()方法一直返 回false 对于读线程假设当前线程被读线程持有然后其他读线程还⾮公平地一直去抢可能导致写线程永远拿不到 锁所以对于读线程的⾮公平要做一些约束当发现队列的第1个元素是写线程的时候读线程也要阻塞不能 直接去抢即偏向写线程因为对写锁来说写锁和读锁都不能抢但是对于读锁来说写锁不能抢但是读锁可以所以写锁的不能抢是多个的即偏向写锁的互斥 综上所述在满足对于的阻塞队列的添加下然后操作释放时看看后面的锁是否互斥若不互斥那么他也能得到锁这是读写锁的根本操作 WriteLock公平vs⾮公平实现 写锁是排他锁实现策略类似于互斥锁 public boolean tryLock() {return sync.tryWriteLock();}ReservedStackAccessfinal boolean tryWriteLock() {Thread current Thread.currentThread();int c getState();if (c ! 0) {//当state不是0的时候如果写线程获取锁的个数是0或者写线程不是当前线程则返回抢锁失败int w exclusiveCount(c);if (w 0 || current ! getExclusiveOwnerThread())return false;if (w MAX_COUNT)throw new Error(Maximum lock count exceeded);}//只要不是上面的情况则通过CAS设置state的值如果设置成功就将排他线程设置成当前线程并返回trueif (!compareAndSetState(c, c 1))return false;setExclusiveOwnerThread(current);return true;}lock()方法 public void lock() {sync.acquire(1);}public final void acquire(int arg) {if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();} //在互斥锁部分大致讲过了虽然对应的tryAcquire方法内容不同但是他只是操作对应的写的操作而已操作是否公平这里就不在说明所以才说写锁实际上就是完全互斥也就是互斥锁的操作的tryLock方法不区分公平/⾮公平因为他基本并没有操作是否公平的方法来完成是否公平的操作 unlock()实现分析 public void unlock() {sync.release(1);}public final boolean release(int arg) {if (tryRelease(arg)) {Node h head;if (h ! null h.waitStatus ! 0)unparkSuccessor(h);return true;}return false;}//unlock()方法不区分公平/⾮公平 //与之前的自然是一样的因为是互斥锁的操作ReadLock公平vs⾮公平实现 读锁是共享锁其实现策略和排他锁有很大的差异 tryLock()实现分析 public boolean tryLock() {return sync.tryReadLock();}ReservedStackAccessfinal boolean tryReadLock() {// 获取当前线程Thread current Thread.currentThread();for (;;) {// 获取state值int c getState();// 如果是写线程占用锁在这个前提下自然需要判断当前线程是否是排他线程若是则往后走否则抢锁失败当前一般会继续循环一次的因为这里没有CAS这样才能使得他不操作阻塞因为对应的tryLock基本是不阻塞的这样的说明基本包括所有的情况比如前面说明的互斥锁写锁和这里的读锁都是这样当然由于他只是一个名称所以这里只是说明基本因为名称相同的操作也是可以阻塞的因为他是可以修改的因为既然是人写的自然可以人来修改if (exclusiveCount(c) ! 0 getExclusiveOwnerThread() ! current)return false;// 获取读锁state值int r sharedCount(c);// 如果获取锁的值达到极限则抛异常if (r MAX_COUNT)throw new Error(Maximum lock count exceeded);// 使用CAS设置读线程锁state值if (compareAndSetState(c, c SHARED_UNIT)) {// 如果r0则当前线程就是第一个读线程if (r 0) {firstReader current;// 读线程个数为1firstReaderHoldCount 1;// 如果写线程是当前线程} else if (firstReader current) {// 如果第一个读线程就是当前线程表示读线程重入读锁firstReaderHoldCount;} else {// 如果firstReader不是当前线程则从ThreadLocal中获取当前线程的读锁个数并设置当前线程持有的读锁个数HoldCounter rh cachedHoldCounter;if (rh null ||rh.tid ! LockSupport.getThreadId(current))cachedHoldCounter rh readHolds.get();else if (rh.count 0)readHolds.set(rh);rh.count;}return true;}}} //上面的了解即可public void lock() {sync.acquireShared(1);}public final void acquireShared(int arg) {if (tryAcquireShared(arg) 0) //tryAcquireShared也是操作对应的是否公平这是读doAcquireShared(arg); //这个操作在前面也说明过类似的也就是acquireQueued} //所以从上面看主要是对应操作是否公平的代码导致的根本不同当然他们是偏向的所以有四种情况unlock()实现分析 public void unlock() {sync.releaseShared(1);}//在前面我们操作过类似的只不过对应的类是不同的自然操作不同 public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}tryReleaseShared()的实现 ReservedStackAccessprotected final boolean tryReleaseShared(int unused) {Thread current Thread.currentThread();// ... 这里省略了for (;;) {int c getState();int nextc c - SHARED_UNIT;if (compareAndSetState(c, nextc))// Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.return nextc 0; //如果返回true代表都释放了说明会调用前面的doReleaseShared();来进行唤醒} } 因为读锁是共享锁多个线程会同时持有读锁所以对读锁的释放不能直接减1而是需要通过一个for循环CAS操作不断重试保证他们都是减的而不会出现某些问题因为有同步的问题这样就是防止出现虽然有间隙但是不一定是一直不相同的前面这个地方有具体说明因为可能操作时间长这是tryReleaseShared和tryRelease的根本差异所在 实际上读写锁的存在就是为了使得提高在读读的效率当然可能其他的情况效率会变低因为通常有对应的判断的所以通常来说读写锁是主要操作读读的后面也会说明写的饿死当然了互斥锁由于整体要好可能在一定的规模下我们还是会使用互斥锁因为这个时候可能读写锁的读读甚至比互斥锁要差因为可能太卡顿 Condition Condition与Lock的关系 Condition本身也是一个接口其功能和wait/notify类似如下所示 public interface Condition {void await() throws InterruptedException;boolean await(long time, TimeUnit unit) throws InterruptedException;long awaitNanos(long nanosTimeout) throws InterruptedException;void awaitUninterruptibly();boolean awaitUntil(Date deadline) throws InterruptedException;void signal();void signalAll(); }wait()/notify()必须和synchronized一起使用Condition也必须和Lock一起使用因此在Lock的接口中有 一个与Condition相关的接口 public interface Lock {void lock();void lockInterruptibly() throws InterruptedException;// 所有的Condition都是从Lock中构造出来的Condition newCondition();boolean tryLock();boolean tryLock(long time, TimeUnit unit) throws InterruptedException;void unlock(); } Condition的使用场景 以ArrayBlockingQueue为例前面说明过了如下所示为一个用数组实现的阻塞队列执行put(…)操作的时候队列满了 ⽣产者线程被阻塞执行take()操作的时候队列为空消费者线程被阻塞 public class ArrayBlockingQueueE extends AbstractQueueEimplements BlockingQueueE, java.io.Serializable {//...final Object[] items;int takeIndex;int putIndex;int count;// 一把锁两个条件final ReentrantLock lock;private final Condition notEmpty;private final Condition notFull;public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity 0)throw new IllegalArgumentException();this.items new Object[capacity];// 构造器中创建一把锁加两个条件lock new ReentrantLock(fair);// 构造器中创建一把锁加两个条件notEmpty lock.newCondition();// 构造器中创建一把锁加两个条件notFull lock.newCondition();}public void put(E e) throws InterruptedException {Objects.requireNonNull(e);final ReentrantLock lock this.lock;lock.lockInterruptibly();try {while (count items.length)// ⾮满条件阻塞队列容量已满notFull.await();enqueue(e);} finally {lock.unlock();}}private void enqueue(E e) {// assert lock.isHeldByCurrentThread();// assert lock.getHoldCount() 1;// assert items[putIndex] null;final Object[] items this.items;items[putIndex] e;if (putIndex items.length) putIndex 0;count;// put数据结束通知消费者⾮空条件notEmpty.signal();}public E take() throws InterruptedException {final ReentrantLock lock this.lock;lock.lockInterruptibly();try {while (count 0)// 阻塞于⾮空条件队列元素个数为0无法消费notEmpty.await();return dequeue();} finally {lock.unlock();}}private E dequeue() {// assert lock.isHeldByCurrentThread();// assert lock.getHoldCount() 1;// assert items[takeIndex] ! null;final Object[] items this.items;SuppressWarnings(unchecked)E e (E) items[takeIndex];items[takeIndex] null;if (takeIndex items.length) takeIndex 0;count--;if (itrs ! null)itrs.elementDequeued();// 消费成功通知⾮满条件队列中有空间可以⽣产元素了notFull.signal();return e;}// ...}具体前面也说明过了所以这里可以直接大致的过一遍 Condition实现原理 实际上Condition的使用很方便避免了wait/notify的⽣产者通知⽣产者、消费者通知消费者的问题具 体实现如下 由于Condition必须和Lock一起使用所以Condition的实现也是Lock的一部分⾸先查看互斥锁和读写锁中Condition的构造方法 public class ReentrantLock implements Lock, java.io.Serializable {// ...public Condition newCondition() {return sync.newCondition();} }public class ReentrantReadWriteLockimplements ReadWriteLock, java.io.Serializable {// ...private final ReentrantReadWriteLock.ReadLock readerLock;private final ReentrantReadWriteLock.WriteLock writerLock;// ...public static class ReadLock implements Lock, java.io.Serializable {// 读锁不⽀持Conditionpublic Condition newCondition() {// 抛异常throw new UnsupportedOperationException();//因为我不操作互斥那么就没有必要获取锁使得互斥}}public static class WriteLock implements Lock, java.io.Serializable {// ...public Condition newCondition() {return sync.newCondition();}// ...}// ...} ⾸先读写锁中的 ReadLock 是不⽀持 Condition 的读写锁的写锁或者说互斥锁就⽀持Condition虽然它们各 自调用的是自己的内部类Sync的但是内部类Sync里面的操作是创建AQS对应的内部类对象的因此上面的代码sync.newCondition最终得到了AQS里面的内部类对象即ConditionObject他是实现了Condition的 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizerimplements java.io.Serializable {public class ConditionObject implements Condition, java.io.Serializable {// Condition的所有实现都在ConditionObject类中} }abstract static class Sync extends AbstractQueuedSynchronizer {final ConditionObject newCondition() {return new ConditionObject();} } 每一个Condition对象上面都可能阻塞了多个线程因此在ConditionObject内部也有一个双向链表组成的队 列如下所示 public class ConditionObject implements Condition, java.io.Serializable {private transient Node firstWaiter;private transient Node lastWaiter; }static final class Node {volatile Node prev;volatile Node next;volatile Thread thread;Node nextWaiter; } 下面来看一下在await()/notify()方法中是如何使用这个队列的 await()实现分析 //对应的ConditionObject里面的因为是他这个对象来调用的 public final void await() throws InterruptedException {// 刚要执行await()操作时若在之前收到中断信号抛异常if (Thread.interrupted())throw new InterruptedException();// 加入Condition的等待队列Node node addConditionWaiter();// 阻塞在Condition之前必须先释放锁否则会死锁因为你没有操作完毕那么对应的锁也是没有操作完毕的即始终阻塞拿到锁的所以要提前释放int savedState fullyRelease(node); //释放int interruptMode 0;while (!isOnSyncQueue(node)) {//判断是否释放虽然他下面的是否会解除阻塞但是为了让他继续往后执行所以这里加上判断当然他操作的只是当前线程阻塞和释放所以不同线程之间基本是不会影响的所以到那时对应的代码之间不会出现问题// 阻塞当前线程LockSupport.park(this);//看看是否又中断if ((interruptMode checkInterruptWhileWaiting(node)) ! 0)break;}// 重新获取锁自然也就是在阻塞中获取记住因为与wait基本一样if (acquireQueued(node, savedState) interruptMode ! THROW_IE)interruptMode REINTERRUPT;if (node.nextWaiter ! null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode ! 0)// 被中断唤醒抛中断异常reportInterruptAfterWait(interruptMode);}关于await有⼏个关键点要说明 1线程调用 await()的时候肯定已经先拿到了锁因为要操作对应的对象必然是在lock里面所以是拿到锁的所以在 addConditionWaiter()内部对这个双向链表 的操作不需要执行CAS操作线程天⽣是安全的代码如下 private Node addConditionWaiter() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node t lastWaiter;// If lastWaiter is cancelled, clean out.if (t ! null t.waitStatus ! Node.CONDITION) {unlinkCancelledWaiters();t lastWaiter;}Node node new Node(Node.CONDITION); //操作加入if (t null)firstWaiter node;elset.nextWaiter node;lastWaiter node;return node;}2在线程执行阻塞操作之前必须先释放锁也就是fullyRelease(node)否则会发⽣死锁这个他们直接的唤醒和释放的操作和wait/notify与synchronized的配合机制基本一样 3线程从wait阻塞这里只是代表阻塞的意思而不是对应的wait方法哦中被唤醒后必须用acquireQueued(node, savedState)方法重新拿锁 4checkInterruptWhileWaiting(node)代码在park(this)代码之后是为了检测在park期间是否收到过中断 信号当线程从park中醒来时有两种可能一种是其他线程调用了unpark另一种是收到中断信号因为对应的park是可以操作中断的而这里的await()方法是可以响应中断的操作后面的判断所以当发现自己是被中断唤醒的而不是被unpark唤醒的时会 直接退出while循环await()方法也会返回 5isOnSyncQueue(node)用于判断该Node是否在AQS的同步队列里面初始的时候Node只在Condition的队列里而不在AQS的队列里但执行notity唤醒这里只是代表唤醒的意思而不是notity方法哦操作的时候会放进AQS的同步队列因为要去判断是否释放当被释放后他放入队列那么就不会继续操作阻塞了因为!的存在即!true就是false所以他可以认为是判断该线程是否被释放的主要操作即如果该队列存在说明已经释放了当然一般操作这里存在时他不只是知道释放并且也会清除释放的队列总不能一直存在吧主要看他是否这样操作了你可以看里面的源码即可自然放入队列 从上面看我们可以发现他也的确与wait和notity基本一样前面也提到过只是他可以定向的唤醒前面也说明了我们看源码也知道他是根据对应是否存在来决定的所以也就操作了定向了 当然了不能抢占已经放入队列的锁无论是否有操作公平 awaitUninterruptibly()实现分析 与await()不同awaitUninterruptibly()不会响应中断其方法的定义中不会有中断异常抛出下面分析其实 现和await()的区别 public final void awaitUninterruptibly() {Node node addConditionWaiter();int savedState fullyRelease(node);boolean interrupted false;while (!isOnSyncQueue(node)) {LockSupport.park(this);//如果线程唤醒后如果被中断过只是记录不处理即我们继续执行后面的操作即while循环if (Thread.interrupted())interrupted true;}if (acquireQueued(node, savedState) || interrupted)selfInterrupt(); //补偿操作与前面说明的补偿是一样的} 可以看出整体代码和 await()类似区别在于收到异常后不会抛出异常而是继续执行while循环 signal()实现分析 public final void signal() {// 只有持有锁的线程才有资格调用signal()方法同理对应的await也是如此if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first firstWaiter;if (first ! null)// 发起通知doSignal(first);} // 唤醒队列中的第1个线程因为队列是结果包括是否公平也是结果 private void doSignal(Node first) {do {if ( (firstWaiter first.nextWaiter) null)lastWaiter null;first.nextWaiter null;} while (!transferForSignal(first) (first firstWaiter) ! null);}final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/// 先把Node放入互斥锁的同步队列中再调用unpark方法Node p enq(node);int ws p.waitStatus;if (ws 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;} 与await()再次的继续操作一样在调用 signal()的时候必须先拿到锁否则就会抛出上面的异常是因为前面执行await()的时候把锁释放了所以若要继续操作自然要操作获得锁从而在对应阻塞的地方继续执行 然后从队列中取出firstWaiter唤醒它但在通过调用unpark唤醒它之前先用enq(node)方法把这个Node放入AQS的锁对应的阻塞队列中也正因为如此才有了await()方法里面的判断条件while( ! isOnSyncQueue(node)) 这个判断条件满⾜说明await线程不是被中断而是被unpark唤醒的唤醒所有的方法signalAll与他类似主要的区别就是执行了多次的transferForSignal方法 最后注意在定义成员变量时无论是否静态包括静态块其中静态块里面创建的变量是静态块的操作完毕自动删除若有继续利用成员变量需要按照先后顺序 还有对应的CAS我们好像只是认为他进行了操作但是并没有给出具体案例若要看看如何具体操作你可以到这里查看https://jiuaidu.com/jianzhan/1006040/虽然前面说可能需要自带的某些操作可能是得到偏移量的操作一般都会直接得到但是他这也可以是算的虽然并没有因为他也是我们写的那么对于对应说明的类来说里面也有类似的这个操作或者CAS也是这个流程比如在前面也说明过偏移量如AtomicInteger类中的这个private static final long VALUE U.objectFieldOffset(AtomicInteger.class, “value”);就得到了偏移量所以我也说某些自带的操作自然也使用了Unsafe当然对应博客里面也操作了找具体偏移量以及比较的操作当然这也是使用对应类Unsafe的了大多数的相关CAS操作是由native操作的所以就不多说了且他解决了Unsafe使用的情况即解决了前面说明的Unsafe使用报错的情况 StampedLock 为什么引入StampedLock StampedLock是在JDK8中新增的有了读写锁为什么还要引入StampedLock呢 可以看到从ReentrantLock到StampedLock并发度依次提高因为互斥的影响降低了自然可以更快的进行操作并发进入操作 另一方面因为ReentrantReadWriteLock采用的是悲观读的策略当第一个读线程拿到锁之后第二个、 第三个读线程还可以拿到锁使得写线程一直拿不到锁可能导致写线程饿死虽然通常会在其公平或⾮公平的实现 中都尽量避免这种情形好像并没有具体说明了解即可但还有可能发⽣ StampedLock引入了乐观读策略读的时候不加读锁读出来发现数据被修改了再升级为悲观读相当 于降低了读的地位把抢锁的天平往写的一方倾斜了一下避免写线程被饿死即偏向读锁的互斥 使用场景 在剖析其原理之前下面先以官方的一个例子来看一下StampedLock如何使用 package main5;import java.util.concurrent.locks.StampedLock;/****/ public class Point {private double x, y;private final StampedLock sl new StampedLock();// 假设多个线程调用该方法修改x和y的值void move(double deltaX, double deltaY) {long stamp sl.writeLock(); //写锁try {x deltaX;y deltaY;} finally {sl.unlockWrite(stamp); //释放}}// 假设多个线程调用该方法求距离double distenceFromOrigin() {// 使用乐观读long stamp sl.tryOptimisticRead();// 将共享变量拷⻉到线程栈double currentX x, currentY y;// 读期间有其他线程修改数据if (!sl.validate(stamp)) {// 读到的是脏数据丢弃// 重新使用悲观读而保证读取先后stamp sl.readLock(); //读锁try {currentX x;currentY y;} finally {sl.unlockRead(stamp); //释放}}return Math.sqrt(currentX * currentX currentY * currentY);} }如上面代码所示有一个Point类多个线程调用move()方法修改坐标还有多个线程调用 distanceFromOrigin()方法求距离 ⾸先执行move操作的时候要加写锁这个用法和ReadWriteLock的用法没有区别写操作和写操作也是 互斥的 关键在于读的时候用了一个乐观读sl.tryOptimisticRead()相当于在读之前给数据的状态做了一个快照。然后把数据拷⻉到内存里面在用之前再比对一次版本号如果版本号变了则说明在读的期间有其他 线程修改了数据读出来的数据废弃重新获取读锁关键代码就是下面这三行 // 读取之前获取数据的版本号 long stamp sl.tryOptimisticRead(); // 读将一份数据拷⻉到线程的栈内存中 double currentX x, currentY y; // 读取之后对比读之前的版本号和当前的版本号判断数据是否可用 // 根据stamp判断在读取数据和使用数据期间有没有其他线程修改数据 if (!sl.validate(stamp)) {// ...}要说明的是这三行关键代码对顺序⾮常敏感不能有重排序因为 state 变量已经是volatile所以可以禁 ⽌重排序但stamp并不是volatile的为此在validate(stamp)方法里面插入内存屏障 public boolean validate(long stamp) {VarHandle.acquireFence(); //对应的该类操作的内存屏障前面说明的是Unsafe的给出这个那么后面的操作要应用这个屏障的作用虽然这里没有说明是什么作用return (stamp SBITS) (state SBITS); }乐观读的实现原理 ⾸先StampedLock是一个读写锁因此也会像读写锁那样把一个state变量分成两半分别表示读锁和写 锁的状态同时它还需要一个数据的version但是一次CAS没有办法操作两个变量所以这个state变量本身 同时也表示了数据的version下面先分析state变量 public class StampedLock implements java.io.Serializable {private static final int LG_READERS 7;private static final long RUNIT 1L;private static final long WBIT 1L LG_READERS; // 第8位表示写锁private static final long RBITS WBIT - 1L; // 最低的7位表示读锁private static final long RFULL RBITS - 1L; // 读锁的数⽬private static final long ABITS RBITS | WBIT; // 读锁和写锁状态合二为一private static final long SBITS ~RBITS;// private static final long ORIGIN WBIT 1; // state的初始值private transient volatile long state;// ...}如下图用最低的8位表示读和写的状态其中第8位表示写锁的状态最低的7位表示读锁的状态因为写锁 只有一个bit位所以写锁是不可重入的 初始值不为0而是把WBIT 向左移动了一位也就是上面的ORIGIN 常量构造方法如下所示 public StampedLock() {state ORIGIN;}为什么state的初始值不设为0呢看对应乐观锁的实现 /* private static final int LG_READERS 7; 0000 0111private static final long RUNIT 1L; 0000 0001private static final long WBIT 1L LG_READERS; // 第8位表示写锁0000 1000 0000private static final long RBITS WBIT - 1L; // 最低的7位表示读锁 0000 0111 1111private static final long RFULL RBITS - 1L; // 读锁的数⽬0000 0111 1110private static final long ABITS RBITS | WBIT; // 读锁和写锁状态合二为一0000 1111 1111private static final long SBITS ~RBITS;1111 1000 0000private static final long ORIGIN WBIT 1; // state的初始值0001 0000 0000*/ public long tryOptimisticRead() {long s;return (((s state) WBIT) 0L) ? (s SBITS) : 0L;}public boolean validate(long stamp) {VarHandle.acquireFence();return (stamp SBITS) (state SBITS);}上面两个方法必须结合起来看当stateWBIT ! 0的时候说明有线程持有写锁比如state对应的出现多余的10001 1000 0000那么上面的tryOptimisticRead会永远返回0这样再调用validatestamp也就是validate0也会永远返回false这正是我们想要的逻辑所以当有线程持有写锁的时候validate永远返回false无论中间写线程是否释放了写锁因为无论是否释放了state回到初始值写锁state值都不为0所以这个时候validate0永远为false所以写与读不互斥 为什么上面的validate(…)方法不直接比较stampstate而要比较stampSBITSstateSBITS 呢 因为读锁和读锁是不互斥的那么可能中间会被修改所以如果stampstate那么就不是true而是false而stampSBITSstateSBITS仍然是true满足读锁和读锁不互斥 所以即使在乐观读的时候state 值被修改了但如果它改的是对应第7位validate(…)还是会返回true因为不是持有写锁了而是读锁即stateWBIT 0即的结果是0那么前面不是返回0而是返回对应的值可以观察到返回了true 另外要说明的一点是上面使用了内存屏障VarHandle.acquireFence();是因为在这行代码的下一行里面的stamp、SBITS变量不是volatile的由此可以禁⽌其和前面的currentXXcurrentYY进行重排序比如操作保证不会与写冲突这样就不用操作写锁了在一定的情况下重排序是可以替换互斥锁的因为他内部就是可以操作互斥的通过上面的分析可以发现state的设计⾮常巧妙只通过一个变量既实现了读锁、写锁的状态记录还实现 了数据的版本号的记录 悲观读/写阻塞与自旋策略实现差异 同ReadWriteLock一样StampedLock也要进行悲观的读锁和写锁操作不过它不是基于AQS实现的而是 内部重新实现了一个阻塞队列如下所示 public class StampedLock implements java.io.Serializable {// ...static final class WNode {volatile WNode prev;volatile WNode next;volatile WNode cowait;volatile Thread thread;volatile int status; // 取值0WAITING或CANCELLEDfinal int mode; // 取值RMODE或WMODEWNode(int m, WNode p) {mode m;prev p;}}// ...private transient volatile WNode whead;private transient volatile WNode wtail;// ...}这个阻塞队列和 AQS 里面的很像 刚开始的时候wheadwtailNULL然后初始化建一个空节点whead和wtail都指向这个空节点之后往 里面加入一个个读线程或写线程节点 但基于这个阻塞队列实现的锁的调度策略和AQS很不一样也就是自旋 一般来说在AQS里面当一个线程CAS state失败之后会立即加入阻塞队列只是针对AbstractQueuedSynchronizer的对应操作并且进入阻塞状态但在StampedLock中CAS state失败之后会不断自旋自旋⾜够多的次数之后如果还拿不到锁才进入 阻塞状态为此一般会根据CPU的核数定义了自旋次数的常量值如果是单核的CPU肯定不能自旋在多核情况下才采 用自旋策略 //StampedLock里面的 private static final int NCPU Runtime.getRuntime().availableProcessors();// 自旋的次数超过这个数字进入阻塞 private static final int SPINS (NCPU 1) ? 1 6 : 0;下面以写锁的加锁也就是StampedLock的writeLock()方法为例来看一下自旋的实现 public long writeLock() {long next;return ((next tryWriteLock()) ! 0L) ? next : acquireWrite(false, 0L); }public long tryWriteLock() {long s;return (((s state) ABITS) 0L) ? tryWriteLock(s) : 0L; }/* private static final int LG_READERS 7; 0000 0111private static final long RUNIT 1L; 0000 0001private static final long WBIT 1L LG_READERS; // 第8位表示写锁0000 1000 0000private static final long RBITS WBIT - 1L; // 最低的7位表示读锁 0000 0111 1111private static final long RFULL RBITS - 1L; // 读锁的数⽬0000 0111 1110private static final long ABITS RBITS | WBIT; // 读锁和写锁状态合二为一0000 1111 1111private static final long SBITS ~RBITS;1111 1000 0000private static final long ORIGIN WBIT 1; // state的初始值0001 0000 0000*/如上面代码所示当stateABITS0的时候说明既没有线程持有读锁也没有线程持有写锁此时当前线 程有资格通过CAS操作state来决定谁进入前面的一般来说只要修改了那么必然会改变int c的结果那个地方就是一个案例因为要进行抢占互斥的也基本如此而不互斥的基本不会这样操作CAS一般也是可以操作互斥的哦因为只能一人进入所以他的修改也是可以操作互斥的若操作不成功则调用acquireWrite()方法进入阻塞队列并进行自旋这个方法 是整个加锁操作的核⼼代码如下 private long acquireWrite(boolean interruptible, long deadline) {WNode node null, p;// 入列时自旋for (int spins -1;;) { // spin while enqueuinglong m, s, ns;if ((m (s state) ABITS) 0L) {if ((ns tryWriteLock(s)) ! 0L)return ns; // 自旋的时候获取到锁返回}else if (spins 0)// 计算自旋值spins (m WBIT wtail whead) ? SPINS : 0;else if (spins 0) {--spins; // 每次自旋获取锁spins值减一Thread.onSpinWait();}// 如果尾部节点是null初始化队列else if ((p wtail) null) { // initialize queueWNode hd new WNode(WMODE, null);// 头部和尾部指向一个节点if (WHEAD.weakCompareAndSet(this, null, hd))wtail hd;}else if (node null)node new WNode(WMODE, p);else if (node.prev ! p)// p节点作为前置节点node.prev p;// for循环唯一的break成功将节点node添加到队列尾部才会退出for循环总不能一直自旋吧那么就要操作放入阻塞队列然后退出循环了breakelse if (WTAIL.weakCompareAndSet(this, p, node)) {// 设置p的后置节点为nodep.next node;break;}}boolean wasInterrupted false;for (int spins -1;;) {WNode h, np, pp; int ps;if ((h whead) p) {if (spins 0)spins HEAD_SPINS;else if (spins MAX_HEAD_SPINS)spins 1;for (int k spins; k 0; --k) { // spin at headlong s, ns;if (((s state) ABITS) 0L) {if ((ns tryWriteLock(s)) ! 0L) {whead node;node.prev null;if (wasInterrupted)Thread.currentThread().interrupt();return ns;}}elseThread.onSpinWait();}}// 唤醒读取的线程else if (h ! null) { // help release stale waitersWNode c; Thread w;while ((c h.cowait) ! null) {if (WCOWAIT.weakCompareAndSet(h, c, c.cowait) (w c.thread) ! null)LockSupport.unpark(w);}}if (whead h) {if ((np node.prev) ! p) {if (np ! null)(p np).next node; // stale}else if ((ps p.status) 0)WSTATUS.compareAndSet(p, 0, WAITING);else if (ps CANCELLED) {if ((pp p.prev) ! null) {node.prev pp;pp.next node;}}else {long time; // 0 argument to park means no timeoutif (deadline 0L)time 0L;else if ((time deadline - System.nanoTime()) 0L)return cancelWaiter(node, node, false);Thread wt Thread.currentThread();node.thread wt;if (p.status 0 (p ! h || (state ABITS) ! 0L) whead h node.prev p) {if (time 0L)// 阻塞直到被唤醒LockSupport.park(this);else// 计时阻塞 LockSupport.parkNanos(this, time);}node.thread null;if (Thread.interrupted()) {if (interruptible)// 如果被中断了则取消等待return cancelWaiter(node, node, true);wasInterrupted true;}}}}}整个acquireWrite(…)方法是两个大的for循环内部实现了⾮常复杂的自旋策略在第一个大的for循环里面 ⽬的就是把该Node加入队列的尾部一边加入一边通过CAS操作尝试获得锁如果获得了整个方法就会返 回如果不能获得锁会一直自旋直到加入队列尾部 在第二个大的for循环里也就是该Node已经在队列尾部了这个时候如果发现自己刚好也在队列头部说 明队列中除了空的Head节点就是当前线程了当然此时也会再进行新一轮的自旋直到达到MAX_HEAD_SPINS次数即也会继续尝试获得锁这就是自旋的次数总不能一直自旋吧 然后进入阻塞这里有一个关键点要说明当release(…)方法释放被调用之后会唤醒队列头部的第1个元素此时会 执行第二个大的for循环里面的逻辑也就是接着for循环里面park()方法后面的代码往下执行 另外一个不同于AQS的阻塞队列的地方是在每个WNode里面有一个cowait指针用于串联起所有的读线 程例如队列尾部阻塞的是一个读线程 1现在又来了读线程 2、3那么会通过cowait指针把1、2、3串联起 来1被唤醒之后2、3也随之一起被唤醒因为读和读之间不互斥 明⽩加锁的自旋策略后下面来看锁的释放操作和读写锁的实现类似也是做了两件事情一是把state变量 置回原位二是唤醒阻塞队列中的第一个节点 ReservedStackAccesspublic void unlockWrite(long stamp) {if (state ! stamp || (stamp WBIT) 0L)throw new IllegalMonitorStateException();unlockWriteInternal(stamp);}private long unlockWriteInternal(long s) {long next; WNode h;STATE.setVolatile(this, next unlockWriteState(s));if ((h whead) ! null h.status ! 0)release(h);return next;}private void release(WNode h) {if (h ! null) {WNode q; Thread w;WSTATUS.compareAndSet(h, WAITING, 0);if ((q h.next) null || q.status CANCELLED) {for (WNode t wtail; t ! null t ! h; t t.prev)if (t.status 0)q t;}if (q ! null (w q.thread) ! null)LockSupport.unpark(w);}}//上面知识了解即可由于博文字数限制的原因请到下一篇博文学习
http://www.hkea.cn/news/14260311/

相关文章:

  • 做网站后的收获豆瓣 wordpress 插件
  • 打开网站自动跳转代码网站建设 用英语
  • 汽车4s店网站建设方案大连企业网站哪一家好
  • 诸城网站优化域名检测查询
  • 智慧团建网站官网电脑版手机网站插件代码
  • 网站诊断内容网上销售平台有哪些
  • 网站的建设方面武昌做网站公司
  • 宁波网站建设设计手机企业网站
  • wordpress建站教程阿里云一级a做爰片免费观看 安全网站
  • 雄安网建 网站建设靖江网站制作多少钱
  • 网站兼容浏览器顺德乐从网站建设
  • windows2008网站wordpress 4.9.2漏洞
  • 养殖推广网站怎么做怎么让谷歌收录我的网站
  • 青海省城乡建设厅网站首页wordpress 出名主题
  • 在网站上如何做天气预报栏开发做游戏的网站
  • 网站备案需要钱吗口碑好的镇江网站建设
  • 四川建设企业网站页面设计说明万能模板
  • 通化 网站建设电脑制作软件的工具
  • 天津网站建设哪家好3d建模师可以自学吗
  • 谷歌网站关键词优化网站开发工资淄博
  • 公司网站建设接单济南网站建设方案咨询
  • 使用c#语言建设网站优点wordpress按钮灯箱
  • 国内外公司网站差异临汾哪里有做网站的
  • 儒枫网网站建设网上可以报警备案吗
  • 深圳建设网站公百度 网站速度诊断
  • 多用户商城系统网站建设湛江seo推广公司
  • 网站config配置教程手机网站翻页效果
  • 建筑建设规范网站做盗版网站引流
  • 色一把做最好网站有没有网站找人帮忙做图
  • 泰州哪里有做网站的网络公司4000-26谷歌官方网站注册