营销网络的建设是什么意思,优化百度网站,宁波东方论坛,轻奢风格装修图片JUC第十四讲#xff1a;JUC锁: ReentrantReadWriteLock详解 本文是JUC第十四讲#xff1a;JUC锁 - ReentrantReadWriteLock详解。ReentrantReadWriteLock表示可重入读写锁#xff0c;ReentrantReadWriteLock中包含了两种锁#xff0c;读锁ReadLock和写锁WriteLock#xff…JUC第十四讲JUC锁: ReentrantReadWriteLock详解 本文是JUC第十四讲JUC锁 - ReentrantReadWriteLock详解。ReentrantReadWriteLock表示可重入读写锁ReentrantReadWriteLock中包含了两种锁读锁ReadLock和写锁WriteLock可以通过这两种锁实现线程间的同步。 文章目录 JUC第十四讲JUC锁: ReentrantReadWriteLock详解1、带着BAT大厂的面试问题去理解2、ReentrantReadWriteLock数据结构3、ReentrantReadWriteLock源码分析3.1、类的继承关系3.2、类的内部类3.3、内部类 - Sync类3.4、内部类 - Sync核心函数分析3.5、类的属性3.6、类的构造函数3.7、核心函数分析 4、ReentrantReadWriteLock示例5、更深入理解5.1、什么是锁升降级? 6、参考文章 1、带着BAT大厂的面试问题去理解
请带着这些问题继续后文会很大程度上帮助你更好的理解相关知识点。
为了有了ReentrantLock还需要ReentrantReadWriteLock? 实现读锁共享写锁互斥ReentrantReadWriteLock底层实现原理? ReentrantLock AQSReentrantReadWriteLock底层读写状态如何设计的? 高16位为读锁低16位为写锁读锁和写锁的最大数量是多少? 2^16本地线程计数器ThreadLocalHoldCounter是用来做什么的? 线程与对象相关联缓存计数器HoldCounter是用来做什么的? 记录锁的可重入次数写锁的获取与释放是怎么实现的?读锁的获取与释放是怎么实现的?什么是锁的升降级? RentrantReadWriteLock为什么不支持锁升级? 保证可见性
2、ReentrantReadWriteLock数据结构
ReentrantReadWriteLock底层是基于ReentrantLock和AbstractQueuedSynchronizer来实现的所以ReentrantReadWriteLock的数据结构也依托于AQS的数据结构。
3、ReentrantReadWriteLock源码分析
3.1、类的继承关系
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {}说明: 可以看到ReentrantReadWriteLock实现了ReadWriteLock接口ReadWriteLock接口定义了获取读锁和写锁的规范具体需要实现类去实现同时其还实现了Serializable接口表示可以进行序列化在源代码中可以看到 ReentrantReadWriteLock 实现了自己的序列化逻辑。
3.2、类的内部类
ReentrantReadWriteLock 有五个内部类五个内部类之间也是相互关联的。内部类的关系如下图所示。 说明: 如上图所示Sync继承自AQS、NonfairSync继承自Sync类、FairSync继承自Sync类ReadLock实现了Lock接口、WriteLock也实现了Lock接口。
3.3、内部类 - Sync类
类的继承关系
abstract static class Sync extends AbstractQueuedSynchronizer {}说明Sync抽象类继承自AQS抽象类Sync类提供了对 ReentrantReadWriteLock 的支持。
类的内部类
Sync类内部存在两个内部类分别为HoldCounter和ThreadLocalHoldCounter其中HoldCounter主要与读锁配套使用其中HoldCounter源码如下。
// 计数器
static final class HoldCounter {// 计数int count 0;// Use id, not reference, to avoid garbage retention// 获取当前线程的TID属性的值final long tid getThreadId(Thread.currentThread());
}说明: HoldCounter主要有两个属性count和tid其中count表示某个读线程重入的次数tid表示该线程的tid字段的值该字段可以用来唯一标识一个线程。ThreadLocalHoldCounter的源码如下
// 本地线程计数器
static final class ThreadLocalHoldCounterextends ThreadLocalHoldCounter {// 重写初始化方法在没有进行set的情况下获取的都是该HoldCounter值public HoldCounter initialValue() {return new HoldCounter();}
}说明: ThreadLocalHoldCounter重写了ThreadLocal的initialValue方法ThreadLocal类可以将线程与对象相关联。在没有进行set的情况下get到的均是 initialValue 方法里面生成的那个 HolderCounter 对象。
类的属性
abstract static class Sync extends AbstractQueuedSynchronizer {// 版本序列号private static final long serialVersionUID 6317671515068378041L;// 高16位为读锁低16位为写锁static final int SHARED_SHIFT 16;// 读锁单位 2^16static final int SHARED_UNIT (1 SHARED_SHIFT);// 读锁最大数量 2^16 - 1static final int MAX_COUNT (1 SHARED_SHIFT) - 1;// 写锁最大数量 2^16 - 1static final int EXCLUSIVE_MASK (1 SHARED_SHIFT) - 1;// 本地线程计数器private transient ThreadLocalHoldCounter readHolds;// 缓存的计数器private transient HoldCounter cachedHoldCounter;// 第一个读线程private transient Thread firstReader null;// 第一个读线程的计数private transient int firstReaderHoldCount;
}说明该属性中包括了读锁、写锁线程的最大量。本地线程计数器等。
类的构造函数
// 构造函数
Sync() {// 本地线程计数器readHolds new ThreadLocalHoldCounter();// 设置AQS的状态setState(getState()); // ensures visibility of readHolds
}说明在Sync的构造函数中设置了本地线程计数器和AQS的状态state。
3.4、内部类 - Sync核心函数分析
对 ReentrantReadWriteLock 对象的操作绝大多数都转发至Sync对象进行处理。下面对Sync类中的重点函数进行分析
sharedCount 函数
表示占有读锁的线程数量源码如下
static int sharedCount(int c) { return c SHARED_SHIFT; }说明直接将state右移16位就可以得到读锁的线程数量因为state的高16位表示读锁对应的低十六位表示写锁数量。
exclusiveCount函数
表示占有写锁的线程数量源码如下
static int exclusiveCount(int c) { return c EXCLUSIVE_MASK; }说明: 直接将状态state和(2^16 - 1)做与运算其等效于将state模上2^16。写锁数量由state的低十六位表示。
tryRelease函数
/*
* Note that tryRelease and tryAcquire can be called by
* Conditions. So it is possible that their arguments contain
* both read and write holds that are all released during a
* condition wait and re-established in tryAcquire.
*/protected final boolean tryRelease(int releases) {// 判断是否为独占线程if (!isHeldExclusively())throw new IllegalMonitorStateException();// 计算释放资源后的写锁的数量int nextc getState() - releases;boolean free exclusiveCount(nextc) 0; // 是否释放成功if (free)setExclusiveOwnerThread(null); // 设置独占线程为空setState(nextc); // 设置状态return free;
}说明此函数用于释放写锁资源首先会判断该线程是否为独占线程若不为独占线程则抛出异常否则计算释放资源后的写锁的数量若为0表示成功释放资源不将被占用否则表示资源还被占用。其函数流程图如下。
tryAcquire 函数
protected final boolean tryAcquire(int acquires) {/** Walkthrough:* 1. If read count nonzero or write count nonzero* and owner is a different thread, fail.* 2. If count would saturate, fail. (This can only* happen if count is already nonzero.)* 3. Otherwise, this thread is eligible for lock if* it is either a reentrant acquire or* queue policy allows it. If so, update state* and set owner.*/// 获取当前线程Thread current Thread.currentThread();// 获取状态int c getState();// 写线程数量int w exclusiveCount(c);if (c ! 0) { // 状态不为0// (Note: if c ! 0 and w 0 then shared count ! 0)if (w 0 || current ! getExclusiveOwnerThread()) // 写线程数量为0或者当前线程没有占有独占资源return false;if (w exclusiveCount(acquires) MAX_COUNT) // 判断是否超过最高写线程数量throw new Error(Maximum lock count exceeded);// Reentrant acquire// 设置AQS状态setState(c acquires);return true;}if (writerShouldBlock() ||!compareAndSetState(c, c acquires)) // 写线程是否应该被阻塞return false;// 设置独占线程setExclusiveOwnerThread(current);return true;
}说明此函数用于获取写锁首先会获取state判断是否为0若为0表示此时没有读锁线程再判断写线程是否应该被阻塞而在非公平策略下总是不会被阻塞在公平策略下会进行判断(判断同步队列中是否有等待时间更长的线程若存在则需要被阻塞否则无需阻塞)之后在设置状态state然后返回true。若state不为0则表示此时存在读锁或写锁线程若写锁线程数量为0或者当前线程为独占锁线程则返回false表示不成功否则判断写锁线程的重入次数是否大于了最大值若是则抛出异常否则设置状态state返回true表示成功。其函数流程图如下 tryReleaseShared 函数
protected final boolean tryReleaseShared(int unused) {// 获取当前线程Thread current Thread.currentThread();// 当前线程为第一个读线程if (firstReader current) {// assert firstReaderHoldCount 0;// 读线程占用的资源数为1if (firstReaderHoldCount 1) firstReader null;else // 减少占用的资源firstReaderHoldCount--;} else { // 当前线程不为第一个读线程// 获取缓存的计数器HoldCounter rh cachedHoldCounter;// 计数器为空 或者计数器的tid不为当前正在运行的线程的tidif (rh null || rh.tid ! getThreadId(current))// 获取当前线程对应的计数器rh readHolds.get();// 获取计数int count rh.count;if (count 1) { // 计数小于等于1// 移除readHolds.remove();if (count 0) // 计数小于等于0抛出异常throw unmatchedUnlockException();}// 减少计数--rh.count;}// 无限循环for (;;) {// 获取状态int c getState();// 获取状态int nextc c - SHARED_UNIT;if (compareAndSetState(c, nextc)) // 比较并进行设置// Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.return nextc 0;}
}说明此函数表示读锁线程释放锁。首先判断当前线程是否为第一个读线程firstReader若是则判断第一个读线程占有的资源数firstReaderHoldCount是否为1若是则设置第一个读线程firstReader为空否则将第一个读线程占有的资源数firstReaderHoldCount减1若当前线程不是第一个读线程那么首先会获取缓存计数器(上一个读锁线程对应的计数器 )若计数器为空或者tid不等于当前线程的tid值则获取当前线程的计数器如果计数器的计数count小于等于1则移除当前线程对应的计数器如果计数器的计数count小于等于0则抛出异常之后再减少计数即可。无论何种情况都会进入无限循环该循环可以确保成功设置状态state。其流程图如下 tryAcquireShared函数
private IllegalMonitorStateException unmatchedUnlockException() {return new IllegalMonitorStateException(attempt to unlock read lock, not locked by current thread);
}// 共享模式下获取资源
protected final int tryAcquireShared(int unused) {/** Walkthrough:* 1. If write lock held by another thread, fail.* 2. Otherwise, this thread is eligible for* lock wrt state, so ask if it should block* because of queue policy. If not, try* to grant by CASing state and updating count.* Note that step does not check for reentrant* acquires, which is postponed to full version* to avoid having to check hold count in* the more typical non-reentrant case.* 3. If step 2 fails either because thread* apparently not eligible or CAS fails or count* saturated, chain to version with full retry loop.*/// 获取当前线程Thread current Thread.currentThread();// 获取状态int c getState();if (exclusiveCount(c) ! 0 getExclusiveOwnerThread() ! current) // 写线程数不为0并且占有资源的不是当前线程return -1;// 读锁数量int r sharedCount(c);if (!readerShouldBlock() r MAX_COUNT compareAndSetState(c, c SHARED_UNIT)) { // 读线程是否应该被阻塞、并且小于最大值、并且比较设置成功if (r 0) { // 读锁数量为0// 设置第一个读线程firstReader current;// 读线程占用的资源数为1firstReaderHoldCount 1;} else if (firstReader current) { // 当前线程为第一个读线程// 占用资源数加1firstReaderHoldCount;} else { // 读锁数量不为0并且不为当前线程// 获取计数器HoldCounter rh cachedHoldCounter;if (rh null || rh.tid ! getThreadId(current)) // 计数器为空或者计数器的tid不为当前正在运行的线程的tid// 获取当前线程对应的计数器cachedHoldCounter rh readHolds.get();else if (rh.count 0) // 计数为0// 设置readHolds.set(rh);rh.count;}return 1;}return fullTryAcquireShared(current);
}说明此函数表示读锁线程获取读锁。首先判断写锁是否为0 并且当前线程不占有独占锁直接返回否则判断读线程是否需要被阻塞并且读锁数量是否小于最大值并且比较设置状态成功若当前没有读锁则设置第一个读线程firstReader和firstReaderHoldCount若当前线程线程为第一个读线程则增加firstReaderHoldCount否则将设置当前线程对应的HoldCounter对象的值。流程图如下
fullTryAcquireShared函数
final int fullTryAcquireShared(Thread current) {/** This code is in part redundant with that in* tryAcquireShared but is simpler overall by not* complicating tryAcquireShared with interactions between* retries and lazily reading hold counts.*/HoldCounter rh null;for (;;) { // 无限循环// 获取状态int c getState();if (exclusiveCount(c) ! 0) { // 写线程数量不为0if (getExclusiveOwnerThread() ! current) // 不为当前线程return -1;// else we hold the exclusive lock; blocking here// would cause deadlock.} else if (readerShouldBlock()) { // 写线程数量为0并且读线程被阻塞// Make sure were not acquiring read lock reentrantly// 当前线程为第一个读线程if (firstReader current) {// assert firstReaderHoldCount 0;} else { // 当前线程不为第一个读线程if (rh null) { // 计数器不为空// rh cachedHoldCounter;if (rh null || rh.tid ! getThreadId(current)) { // 计数器为空或者计数器的tid不为当前正在运行的线程的tidrh readHolds.get();if (rh.count 0)readHolds.remove();}}if (rh.count 0)return -1;}}if (sharedCount(c) MAX_COUNT) // 读锁数量为最大值抛出异常throw new Error(Maximum lock count exceeded);if (compareAndSetState(c, c SHARED_UNIT)) { // 比较并且设置成功if (sharedCount(c) 0) { // 读线程数量为0// 设置第一个读线程firstReader current;// firstReaderHoldCount 1;} else if (firstReader current) {firstReaderHoldCount;} else {if (rh null)rh cachedHoldCounter;if (rh null || rh.tid ! getThreadId(current))rh readHolds.get();else if (rh.count 0)readHolds.set(rh);rh.count;cachedHoldCounter rh; // cache for release}return 1;}}
}说明在 tryAcquireShared函数中如果下列三个条件不满足(读线程是否应该被阻塞、小于最大值、比较设置成功) 则会进行fullTryAcquireShared函数中它用来保证相关操作可以成功。其逻辑与tryAcquireShared逻辑类似不再累赘。
而其他内部类的操作基本上都是转化到了对Sync对象的操作在此不再累赘。
3.5、类的属性
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {// 版本序列号private static final long serialVersionUID -6992448646407690164L; // 读锁private final ReentrantReadWriteLock.ReadLock readerLock;// 写锁private final ReentrantReadWriteLock.WriteLock writerLock;// 同步队列final Sync sync;private static final sun.misc.Unsafe UNSAFE;// 线程ID的偏移地址private static final long TID_OFFSET;static {try {UNSAFE sun.misc.Unsafe.getUnsafe();Class? tk Thread.class;// 获取线程的tid字段的内存地址TID_OFFSET UNSAFE.objectFieldOffset(tk.getDeclaredField(tid));} catch (Exception e) {throw new Error(e);}}
}说明: 可以看到ReentrantReadWriteLock属性包括了一个ReentrantReadWriteLock.ReadLock对象表示读锁一个ReentrantReadWriteLock.WriteLock对象表示写锁一个Sync对象表示同步队列。
3.6、类的构造函数
ReentrantReadWriteLock()型构造函数
public ReentrantReadWriteLock() {// 默认非公平锁this(false);
}说明此构造函数会调用另外一个有参构造函数。
ReentrantReadWriteLock(boolean) 型构造函数
public ReentrantReadWriteLock(boolean fair) {// 公平策略或者是非公平策略sync fair ? new FairSync() : new NonfairSync();// 读锁readerLock new ReadLock(this);// 写锁writerLock new WriteLock(this);
}说明: 可以指定设置公平策略或者非公平策略并且该构造函数中生成了读锁与写锁两个对象。
3.7、核心函数分析
对ReentrantReadWriteLock的操作基本上都转化为了对Sync对象的操作而Sync的函数已经分析过不再累赘。
4、ReentrantReadWriteLock示例
下面给出了一个使用ReentrantReadWriteLock的示例源代码如下。
import java.util.concurrent.locks.ReentrantReadWriteLock;class ReadThread extends Thread {private ReentrantReadWriteLock rrwLock;public ReadThread(String name, ReentrantReadWriteLock rrwLock) {super(name);this.rrwLock rrwLock;}public void run() {System.out.println(Thread.currentThread().getName() trying to lock);try {rrwLock.readLock().lock();System.out.println(Thread.currentThread().getName() lock successfully);Thread.sleep(5000); } catch (InterruptedException e) {e.printStackTrace();} finally {rrwLock.readLock().unlock();System.out.println(Thread.currentThread().getName() unlock successfully);}}
}class WriteThread extends Thread {private ReentrantReadWriteLock rrwLock;public WriteThread(String name, ReentrantReadWriteLock rrwLock) {super(name);this.rrwLock rrwLock;}public void run() {System.out.println(Thread.currentThread().getName() trying to lock);try {rrwLock.writeLock().lock();System.out.println(Thread.currentThread().getName() lock successfully); } finally {rrwLock.writeLock().unlock();System.out.println(Thread.currentThread().getName() unlock successfully);}}
}public class ReentrantReadWriteLockDemo {public static void main(String[] args) {ReentrantReadWriteLock rrwLock new ReentrantReadWriteLock();ReadThread rt1 new ReadThread(rt1, rrwLock);ReadThread rt2 new ReadThread(rt2, rrwLock);WriteThread wt1 new WriteThread(wt1, rrwLock);rt1.start();rt2.start();wt1.start();}
}运行结果(某一次)
rt1 trying to lock
rt2 trying to lock
wt1 trying to lock
rt1 lock successfully
rt2 lock successfully
rt1 unlock successfully
rt2 unlock successfully
wt1 lock successfully
wt1 unlock successfully说明: 程序中生成了一个ReentrantReadWriteLock对象并且设置了两个读线程一个写线程。根据结果可能存在如下的时序图。
rt1线程执行 rrwLock.readLock().lock 操作主要的函数调用如下。 说明此时AQS的状态state为2^16 次方即表示此时读线程数量为1。
rt2线程执行 rrwLock.readLock().lock 操作主要的函数调用如下。 说明: 此时AQS的状态state为2 * 2^16次方即表示此时读线程数量为2。
wt1线程执行rrwLock.writeLock().lock操作主要的函数调用如下。 说明此时在同步队列Sync queue中存在两个结点并且wt1线程会被禁止运行。
rt1线程执行 rrwLock.readLock().unlock 操作主要的函数调用如下。 说明此时AQS的state为 2^16次方表示还有一个读线程。
rt2线程执行 rrwLock.readLock().unlock 操作主要的函数调用如下。 说明当rt2线程执行unlock操作后AQS的state为0并且wt1线程将会被unpark其获得CPU资源就可以运行。
wt1线程获得CPU资源继续运行需要恢复。由于之前 acquireQueued 函数中的 parkAndCheckInterrupt函数 中被禁止的所以恢复到 parkAndCheckInterrupt函数中主要的函数调用如下 说明最后sync queue队列中只有一个结点并且头节点尾节点均指向它AQS的state值为1表示此时有一个写线程。
wt1执行 rrwLock.writeLock().unlock 操作主要的函数调用如下。
说明此时AQS的state为0表示没有任何读线程或者写线程了。并且Sync queue结构与上一个状态的结构相同没有变化。
在项目中的使用
todo
5、更深入理解
5.1、什么是锁升降级?
锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁然后将其释放最后再获取读锁这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁再获取到读锁随后释放(先前拥有的)写锁的过程。
接下来看一个锁降级的示例。因为数据不常变化所以多个线程可以并发地进行数据处理当数据变更后如果当前线程感知到数据变化则进行数据的准备工作同时其他处理线程被阻塞直到当前线程完成数据的准备工作如代码如下所示
// update变量使用volatile修饰
public void processData() {readLock.lock();if (!update) {// 必须先释放读锁readLock.unlock();// 锁降级从写锁获取到开始writeLock.lock();try {if (!update) {// 准备数据的流程(略)update true;}readLock.lock();} finally {writeLock.unlock();}// 锁降级完成写锁降级为读锁}try {// 使用数据的流程(略)} finally {readLock.unlock();}
}上述示例中当数据发生变更后update变量(布尔类型且volatile修饰)被设置为false此时所有访问 processData() 方法的线程都能够感知到变化但只有一个线程能够获取到写锁其他线程会被阻塞在读锁和写锁的lock()方法上。当前线程获取写锁完成数据准备之后再获取读锁随后释放写锁完成锁降级。
锁降级中读锁的获取是否必要呢? 答案是必要的。主要是为了保证数据的可见性如果当前线程不获取读锁而是直接释放写锁假设此刻另一个线程(记作线程T)获取了写锁并修改了数据那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁即遵循锁降级的步骤则线程T将会被阻塞直到当前线程使用数据并释放读锁之后线程T才能获取写锁进行数据更新。
RentrantReadWriteLock不支持锁升级(把持读锁、获取写锁最后释放读锁的过程)。目的也是保证数据可见性如果读锁已被多个线程获取其中任意线程成功获取了写锁并更新了数据则其更新对其他获取到读锁的线程是不可见的。
6、参考文章
【JUC】JDK1.8源码分析之ReentrantReadWriteLockReentrantReadWriteLock