您的位置:首页 > 其它

ReentrantReadWriteLock之写锁

2018-04-03 10:44 369 查看
ReentrantLock实现了标准的互斥操作,也就是说在某一时刻只有有一个线程持有锁。ReentrantLock采用这种独占的保守锁直接,在一定程度上减低了吞吐量。在这种情况下任何的“读/读”、“读/写”、“写/写”操作都不能同时发生。然而在实际的场景中我们就会遇到这种情况:有些资源并发的访问中,它大部分时间都是执行读操作,写操作比较少,但是读操作并不影响数据的一致性,如果在进行读操作时采用独占的锁机制,这样势必会大大降低吞吐量。所以如果能够做到读写分离,那就非常完美了。ReadWriteLock, 维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有 writer,读取锁可以由多个 reader 线程同时保持。写入锁是独占的。对于ReadWriteLock而言,一个资源能够被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程。也就是说读写锁使用的场合是一个共享资源被大量读取操作,而只有少量的写操作(修改数据)。如下:
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
ReadWriteLock为一个接口,他定义了两个方法readLock、writeLock,从方法名我们就可以看出这两个方法是干嘛用的。ReentrantReadWriteLock作为ReadWriteLock的实现类,在API文档中详细介绍了它的特性。(一) 公平性      1)、 非公平锁(默认) 这个和独占锁的非公平性一样,由于读线程之间没有锁竞争,所以读操作没有公平性和非公平性,写操作时,由于写操作可能立即获取到锁,所以会推迟一个或多个读操作或者写操作。因此非公平锁的吞吐量要高于公平锁。       2)、 公平锁 利用AQS的CLH队列,释放当前保持的锁(读锁或者写锁)时,优先为等待时间最长的那个写线程分配写入锁,当前前提是写线程的等待时间要比所有读线程的等待时间要长。同样一个线程持有写入锁或者有一个写线程已经在等待了,那么试图获取公平锁的(非重入)所有线程(包括读写线程)都将被阻塞,直到最先的写线程释放锁。如果读线程的等待时间比写线程的等待时间还有长,那么一旦上一个写线程释放锁,这一组读线程将获取锁。(二) 重入性      1)、 读写锁允许读线程和写线程按照请求锁的顺序重新获取读取锁或者写入锁。当然了只有写线程释放了锁,读线程才能获取重入锁。      2)、 写线程获取写入锁后可以再次获取读取锁,但是读线程获取读取锁后却不能获取写入锁。      3)、 另外读写锁最多支持65535个递归写入锁和65535个递归读取锁。(为何是65535后面介绍)(三) 锁降级      1)、 写线程获取写入锁后可以获取读取锁,然后释放写入锁,这样就从写入锁变成了读取锁,从而实现锁降级的特性。(四) 锁升级      1)、 读取锁是不能直接升级为写入锁的。因为获取一个写入锁需要释放所有读取锁,所以如果有两个读取锁视图获取写入锁而都不释放读取锁时就会发生死锁。(五) 锁获取中断      1)、 读取锁和写入锁都支持获取锁期间被中断。这个和独占锁一致。(六) 条件变量      1)、 写入锁提供了条件变量(Condition)的支持,这个和独占锁一致,但是读取锁却不允许获取条件变量,将得到一个UnsupportedOperationException异常。(七) 重入数      1)、 读取锁和写入锁的数量最大分别只能是65535(包括重入数)。(八) 监测      1)、 此类支持一些确定是保持锁还是争用锁的方法。这些方法设计用于监视系统状态,而不是同步控制。ReentrantReadWriteLock与ReentrantLock一样,其锁主体依然是Sync,它的读锁、写锁都是依靠Sync来实现的。所以ReentrantReadWriteLock实际上只有一个锁,只是在获取读取锁和写入锁的方式上不一    样而已,它的读写锁其实就是两个类:ReadLock、writeLock,这两个类都是lock实现。
/** 读锁 */
private final ReentrantReadWriteLock.ReadLock readerLock;

/** 写锁 */
private final ReentrantReadWriteLock.WriteLock writerLock;

final Sync sync;

/** 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock() {
this(false);
}

/** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

/** 返回用于写入操作的锁 */
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
/** 返回用于读取操作的锁 */
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

public static class WriteLock implements Lock, java.io.Serializable{
public void lock() {
//独占锁
sync.acquire(1);
}
/**
* 省略其余源代码
*/
}

public static class ReadLock implements Lock, java.io.Serializable {
public void lock() {
//共享锁
sync.acquireShared(1);
}
/**
* 省略其余源代码
*/
}
从上面的源代码我们可以看到WriteLock就是一个独占锁,readLock是一个共享锁,他们内部都是使用AQS的acquire、release来进行操作的。但是还是存在一些区别的。关于独占锁、共享锁,请关注前面的博客:下面LZ就ReadLock、WriteLock的获取锁(lock)、释放锁(release)进行分析。

WriteLock

lock()

                  
public void lock() {
sync.acquire(1);
}
与ReentrantLock一样,调用AQS的acquire():
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
第一步,写锁调用tryAcquire方法,该方法与ReentrantLock中的tryAcquire方法略有不同:
protected final boolean tryAcquire(int acquires) {
//当前线程
Thread current = Thread.currentThread();
//当前锁个数
int c = getState();
//写锁个数
int w = exclusiveCount(c);

//当前锁个数 != 0(是否已经有线程持有锁),线程重入
if (c != 0) {
//w == 0,表示写线程数为0
//或者独占锁不是当前线程,返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;

//超出最大范围(65535)
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//设置锁的线程数量
setState(c + acquires);
return true;
}

//是否阻塞
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;

//设置锁为当前线程所有
setExclusiveOwnerThread(current);
return true;
}
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) { // 状态不为0,表示锁被分配出去了。
// (Note: if c != 0 and w == 0 then shared count != 0)
// c != 0 and w == 0 表示分配了读锁
// w != 0 && current != getExclusiveOwnerThread() 表示其他线程获取了写锁。
if (w == 0 || current != getExclusiveOwnerThread())
return false ;
// 写锁重入
// 检测是否超过最大重入次数。
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 更新写锁重入次数,写锁在低位,直接加上 acquire 即可。
// Reentrant acquire
setState(c + acquires);
return true ;
}
// writerShouldBlock 留给子类实现,用于实现公平性策略。
// 如果允许获取写锁,则用 CAS 更新状态。
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false ; // 不允许获取锁 或 CAS 失败。
// 获取写锁超过,设置独占线程。
setExclusiveOwnerThread(current);
return true;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively()) // 是否是当前线程持有写锁
throw new IllegalMonitorStateException();
// 这里不考虑高16位是因为高16位肯定是 0。
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread( null); // 写锁完全释放,设置独占线程为null。
setState(nextc);
return free;
}

在tryAcquire()中有一个段代码int w = exclusiveCount(c);该段代码主要是获取线程的数量的,在前面的特性里面有讲过读取锁和写入锁的数量最大分别只能是65535(包括重入数)。为何是65535呢?在前面LZ也提到过独占锁ReentrantLock中有一个state,共享锁中也有一个state,其中独占锁中的state为0或者1如果有重入,则表示重入的次数,共享锁中表示的持有锁的数量。而在ReadWriteLock中则不同,由于ReadWriteLock中存在两个锁,他们之间有联系但是也有差异,所以需要有两个state来分别表示他们。于是ReentrantReadWriteLock就将state一分二位,高16位表示共享锁的数量,低16位表示独占锁的数量。2^16 – 1 = 65535。这就是前面提过的为什么读取锁和写入锁的数量最大分别只能是65535。
·   static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** 返回共享锁持有线程的数量 **/
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** 返回独占锁持有线程的数量 **/
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
这段代码可以清晰表达计算写锁、读书持有线程的数量。在上段tryAcquire方法的源代码中,主要流程如下:1、首先获取c、w。然后判断是否已经有线程持有写锁(c != 0),如果持有,则线程进行重入。若w == 0(写入锁==0)或者 current != getExclusiveOwnerThread()(锁的持有者不是当前线程),则返回false。如果写入锁的数量超出最大范围(65535),则抛出error。2、如果当且写线程数为0(那么读线程也应该为0,因为上面已经处理c!=0的情况),并且当前线程需要阻塞那么就返回失败;如果通过CAS增加写线程数失败也返回失败。3、当c ==0或者c>0,w >0,则设置锁的持有则为当前线程。

unlock()

public void unlock() {
sync.release(1);
}
unlock()调用Sync的release():
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
在release()中首先调用tryRelease方法进行尝试释放锁:
protected final boolean tryRelease(int releases) {
//若锁的持有者不是当前线程,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//写锁的新线程数
int nextc = getState() - releases;
//若写锁的新线程数为0,则将锁的持有者设置为null
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
//设置写锁的新线程数
setState(nextc);
return free;
}
写锁的释放过程还是相对而言比较简单的:首先查看当前线程是否为写锁的持有者,如果不是抛出异常。然后检查释放后写锁的线程数是否为0,如果为0则表示写锁空闲了,释放锁资源将锁的持有线程设置为null,否则释放仅仅只是一次重入锁而已,并不能将写锁的线程清空。
原文链接:http://cmsblogs.com/?p=1679
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: