【java并发】juc高级锁机制探讨
2015-07-16 09:42
691 查看
最近在看一些juc相关的设计和源码,接上文:【java并发】基于JUC CAS原理,自己实现简单独占锁
本文探讨一下juc里面提供的一些高级锁机制和基本原理。
ReentrantLock:和synchronized具有差不多的语义,独占锁,同时只有一个线程能获得锁。
ReentrantReadWriteLock:读写锁。读允许共享,写独占。适用于读频繁的场景。
CountDownLatch:闭锁。使用场景类似比赛鸣枪,在没有鸣枪之前所有的运动员(线程)都必须等待。说白了就是使用于一个或多个线程等待某一个条件成立了,触发运行。
Semaphore:信号量。使用场景类似通行证,通行证数量有限,拿到证的才能通行。
CyclicBarrier:周期障碍。语义上和CountDownLatch有点类似,只有几个线程打到一个共同的状态之后,触发后续动作继续。不同在于1.达到共同状态后,可以指定一个后续触发的线程对象。2.周期性意味着可以周期运行。
FutureTask:带有返回值的异步执行。
至于各种锁机制使用场景这里不赘述,后面还有机会加一些例子。
以上不同的锁机制和使用场景,不管我们我们叫锁、闭锁、信号量等等。抽象之后都有一种共同的语义:
多线程并发的执行,之间通过某种共享状态来同步,只有当状态满足xxxx条件,才能触发线程执行xxxx。
这个共同的语义可以称之为同步器。可以认为以上所有的锁机制都可以基于同步器定制来实现的。
如果要实现一个特定场景的锁来同步线程的执行,其实并不难,如上文【java并发】基于JUC CAS原理,自己实现简单独占锁。而juc里的思想是将这些场景抽象出来的语义通过统一的同步框架来支持。
juc里所有的这些锁机制都是基于AQS(AbstractQueuedSynchronizer)框架上构建的。下面简单介绍下AQS(AbstractQueuedSynchronizer)。 可以参考Doug Lea的论文The java.util.concurrent Synchronizer Framework
一个同步器至少需要包含两个功能:
1. 获取同步状态
如果允许,则获取锁,如果不允许就阻塞线程,直到同步状态允许获取。
2. 释放同步状态
修改同步状态,并且唤醒等待线程。
根据作者论文,aqs同步机制同时考虑了如下需求:
1. 独占锁和共享锁两种机制。
2. 线程阻塞后,如果需要取消,需要支持中断。
3. 线程阻塞后,如果有超时要求,应该支持超时后中断的机制。
实现涉及基本技术原理
1. 状态位
提供volatile变量 state; 用于同步线程之间的共享状态。通过CAS和volatile保证其原子性和可见性。对应源码里的定义:
Java代码
/**
* 同步状态
*/
private volatile int state;
/**
*cas
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
2. 线程阻塞和唤醒
有别于wait和notiry。这里利用jdk1.5开始提供的LockSupport.park()和LockSupport.unpark()的本地方法实现,实现线程的阻塞和唤醒。
3. 阻塞线程节点队列 CHL Node queue。
根据论文里描述,AQS里将阻塞线程封装到一个内部类Node里。并维护一个CHL Node FIFO队列。CHL队列是一个非阻塞的FIFO队列,也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和CAS保证节点插入和移除的原子性。实现无锁且快速的插入。关于非阻塞算法可以参考
Java 理论与实践: 非阻塞算法简介。CHL队列对应代码如下:
Java代码
/**
* CHL头节点
*/
rivate transient volatile Node head;
/**
* CHL尾节点
*/
private transient volatile Node tail;
Node节点是对Thread的一个封装,结构大概如下:
Java代码
static final class Node {
/** 代表线程已经被取消*/
static final int CANCELLED = 1;
/** 代表后续节点需要唤醒 */
static final int SIGNAL = -1;
/** 代表线程在等待某一条件/
static final int CONDITION = -2;
/** 标记是共享模式*/
static final Node SHARED = new Node();
/** 标记是独占模式*/
static final Node EXCLUSIVE = null;
/**
* 状态位 ,分别可以使CANCELLED、SINGNAL、CONDITION、0
*/
volatile int waitStatus;
/**
* 前置节点
*/
volatile Node prev;
/**
* 后续节点
*/
volatile Node next;
/**
* 节点代表的线程
*/
volatile Thread thread;
/**
*连接到等待condition的下一个节点
*/
Node nextWaiter;
}
AQS源码
AQS实现了一个同步器的基本结构,下面以独占锁和非独占锁区分来看看AQS的几个主要方法:
独占模式
独占获取:tryAcquire本身不会阻塞线程,如果返回true成功就继续,如果返回false那么就阻塞线程并加入阻塞队列。
Java代码
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//获取失败,则加入等待队列
selfInterrupt();
}
独占且可中断模式获取:支持中断取消
Java代码
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
独占且支持超时模式获取:带有超时时间,如果经过超时时间则会退出。
Java代码
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
独占模式释放:释放成功会唤醒后续节点
Java代码
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
共享模式
共享模式获取
Java代码
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
可中断模式共享获取
Java代码
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
共享模式带定时获取
Java代码
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
共享锁释放
Java代码
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
注意以上框架只定义了一个同步器的基本结构框架,的基本方法里依赖的tryAcquire、tryRelease、tryAcquireShared 、tryReleaseShared四个方法在AQS里没有实现,这四个方法不会涉及线程阻塞,而是由各自不同的使用场景根据情况来定制:
Java代码
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
从以上源码可以看出AQS实现基本的功能:
AQS虽然实现了acquire,和release方法是可能阻塞的,但是里面调用的tryAcquire和tryRelease是由子类来定制的且是不阻塞的可。以认为同步状态的维护、获取、释放动作是由子类实现的功能,而动作成功与否的后续行为时有AQS框架来实现。所以可以认为同步器实现了一下功能:
1.同步器基本范式、结构
2.状态获取、释放成功或失败的后续行为,如线程的阻塞、唤醒机制
3.线程阻塞队列的维护
状态获取、释放动作本身是由子类来定义的。
还有以下一些私有方法,用于辅助完成以上的功能:
final boolean acquireQueued(final Nodenode, int arg) :申请队列
private Node enq(final Node node) :入队
private Node addWaiter(Node mode) :以mode创建创建节点,并加入到队列
private void unparkSuccessor(Node node) : 唤醒节点的后续节点,如果存在的话。
private void doReleaseShared() :释放共享锁
private void setHeadAndPropagate(Node node,int propagate):设置头,并且如果是共享模式且propagate大于0,则唤醒后续节点。
private void cancelAcquire(Node node) :取消正在获取的节点
private static void selfInterrupt():自我中断
private final booleanparkAndCheckInterrupt() :park并判断线程是否中断
从源码可以看出AQS实现基本的功能:
1.同步器基本范式、结构
2.线程的阻塞、唤醒机制
3.线程阻塞队列的维护
AQS虽然实现了acquire,和release方法,但是里面调用的tryAcquire和tryRelease是由子类来定制的。可以认为同步状态的维护、获取、释放动作是由子类实现的功能,而动作成功与否的后续行为时有AQS框架来实现。
ReentrantLock原理
由于同步器里已经定义了基本的结构,包括获取、释放、和阻塞队列维护和管理等。ReentrantLock是一个独占互斥锁,里只需要实现TryAcquire、TryRelease等方法,告诉同步器是否获取和释放状态成功。其他的后续行为都由AQS框架完成。由于ReentrantLock是一个可重入的独占锁,所以同步器状态可以直接根据是否==0来判断是否可用。
ReentrantLock主要提供lock和unlcok两个方法。
而lock和unlock正是基于AQS的一个子类同步器来实现。里面sync同步器有两种实现,一种是公平锁,一种是非公平锁。默认是非公平锁,看看非公平锁实现tryAcquire
Java代码
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//如果状态位为0,那么尝试获取
if (compareAndSetState(0, acquires)) {//基于CAS获取和修改状态
setExclusiveOwnerThread(current);//成功则设置当前线程为独占执行线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//当前线程已是执行线程
int nextc = c + acquires;//累加
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;//其他情况下代表获取失败
再看看公平锁的tryAcquire
Java代码
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (isFirst(current) &&
compareAndSetState(0, acquires)) {//判断是否是第一个线程,是的话才尝试获取锁
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
可以看看 非公平锁的不会根据FIFO,而公平锁会判断是否是第一个线程,根据FIFO来执行。
本文探讨一下juc里面提供的一些高级锁机制和基本原理。
JUC高级锁机制简介
Juc提供了高级锁的一些特性和应用,如:ReentrantLock:和synchronized具有差不多的语义,独占锁,同时只有一个线程能获得锁。
ReentrantReadWriteLock:读写锁。读允许共享,写独占。适用于读频繁的场景。
CountDownLatch:闭锁。使用场景类似比赛鸣枪,在没有鸣枪之前所有的运动员(线程)都必须等待。说白了就是使用于一个或多个线程等待某一个条件成立了,触发运行。
Semaphore:信号量。使用场景类似通行证,通行证数量有限,拿到证的才能通行。
CyclicBarrier:周期障碍。语义上和CountDownLatch有点类似,只有几个线程打到一个共同的状态之后,触发后续动作继续。不同在于1.达到共同状态后,可以指定一个后续触发的线程对象。2.周期性意味着可以周期运行。
FutureTask:带有返回值的异步执行。
至于各种锁机制使用场景这里不赘述,后面还有机会加一些例子。
以上不同的锁机制和使用场景,不管我们我们叫锁、闭锁、信号量等等。抽象之后都有一种共同的语义:
多线程并发的执行,之间通过某种共享状态来同步,只有当状态满足xxxx条件,才能触发线程执行xxxx。
这个共同的语义可以称之为同步器。可以认为以上所有的锁机制都可以基于同步器定制来实现的。
如果要实现一个特定场景的锁来同步线程的执行,其实并不难,如上文【java并发】基于JUC CAS原理,自己实现简单独占锁。而juc里的思想是将这些场景抽象出来的语义通过统一的同步框架来支持。
juc里所有的这些锁机制都是基于AQS(AbstractQueuedSynchronizer)框架上构建的。下面简单介绍下AQS(AbstractQueuedSynchronizer)。 可以参考Doug Lea的论文The java.util.concurrent Synchronizer Framework
AQS框架
AbstractQueuedSynchronizer是一个抽象类,里面定义了同步器的基本框架,实现了基本的结构功能。只留有状态条件的维护由具体同步器根据具体场景来定制,如上面提到的ReentrantLock、RetrantReadWriteLock和CountDownLatch等等。一个同步器至少需要包含两个功能:
1. 获取同步状态
如果允许,则获取锁,如果不允许就阻塞线程,直到同步状态允许获取。
2. 释放同步状态
修改同步状态,并且唤醒等待线程。
根据作者论文,aqs同步机制同时考虑了如下需求:
1. 独占锁和共享锁两种机制。
2. 线程阻塞后,如果需要取消,需要支持中断。
3. 线程阻塞后,如果有超时要求,应该支持超时后中断的机制。
实现涉及基本技术原理
1. 状态位
提供volatile变量 state; 用于同步线程之间的共享状态。通过CAS和volatile保证其原子性和可见性。对应源码里的定义:
Java代码
/**
* 同步状态
*/
private volatile int state;
/**
*cas
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
2. 线程阻塞和唤醒
有别于wait和notiry。这里利用jdk1.5开始提供的LockSupport.park()和LockSupport.unpark()的本地方法实现,实现线程的阻塞和唤醒。
3. 阻塞线程节点队列 CHL Node queue。
根据论文里描述,AQS里将阻塞线程封装到一个内部类Node里。并维护一个CHL Node FIFO队列。CHL队列是一个非阻塞的FIFO队列,也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和CAS保证节点插入和移除的原子性。实现无锁且快速的插入。关于非阻塞算法可以参考
Java 理论与实践: 非阻塞算法简介。CHL队列对应代码如下:
Java代码
/**
* CHL头节点
*/
rivate transient volatile Node head;
/**
* CHL尾节点
*/
private transient volatile Node tail;
Node节点是对Thread的一个封装,结构大概如下:
Java代码
static final class Node {
/** 代表线程已经被取消*/
static final int CANCELLED = 1;
/** 代表后续节点需要唤醒 */
static final int SIGNAL = -1;
/** 代表线程在等待某一条件/
static final int CONDITION = -2;
/** 标记是共享模式*/
static final Node SHARED = new Node();
/** 标记是独占模式*/
static final Node EXCLUSIVE = null;
/**
* 状态位 ,分别可以使CANCELLED、SINGNAL、CONDITION、0
*/
volatile int waitStatus;
/**
* 前置节点
*/
volatile Node prev;
/**
* 后续节点
*/
volatile Node next;
/**
* 节点代表的线程
*/
volatile Thread thread;
/**
*连接到等待condition的下一个节点
*/
Node nextWaiter;
}
AQS源码
AQS实现了一个同步器的基本结构,下面以独占锁和非独占锁区分来看看AQS的几个主要方法:
独占模式
独占获取:tryAcquire本身不会阻塞线程,如果返回true成功就继续,如果返回false那么就阻塞线程并加入阻塞队列。
Java代码
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//获取失败,则加入等待队列
selfInterrupt();
}
独占且可中断模式获取:支持中断取消
Java代码
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
独占且支持超时模式获取:带有超时时间,如果经过超时时间则会退出。
Java代码
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
独占模式释放:释放成功会唤醒后续节点
Java代码
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
共享模式
共享模式获取
Java代码
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
可中断模式共享获取
Java代码
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
共享模式带定时获取
Java代码
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
共享锁释放
Java代码
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
注意以上框架只定义了一个同步器的基本结构框架,的基本方法里依赖的tryAcquire、tryRelease、tryAcquireShared 、tryReleaseShared四个方法在AQS里没有实现,这四个方法不会涉及线程阻塞,而是由各自不同的使用场景根据情况来定制:
Java代码
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
从以上源码可以看出AQS实现基本的功能:
AQS虽然实现了acquire,和release方法是可能阻塞的,但是里面调用的tryAcquire和tryRelease是由子类来定制的且是不阻塞的可。以认为同步状态的维护、获取、释放动作是由子类实现的功能,而动作成功与否的后续行为时有AQS框架来实现。所以可以认为同步器实现了一下功能:
1.同步器基本范式、结构
2.状态获取、释放成功或失败的后续行为,如线程的阻塞、唤醒机制
3.线程阻塞队列的维护
状态获取、释放动作本身是由子类来定义的。
还有以下一些私有方法,用于辅助完成以上的功能:
final boolean acquireQueued(final Nodenode, int arg) :申请队列
private Node enq(final Node node) :入队
private Node addWaiter(Node mode) :以mode创建创建节点,并加入到队列
private void unparkSuccessor(Node node) : 唤醒节点的后续节点,如果存在的话。
private void doReleaseShared() :释放共享锁
private void setHeadAndPropagate(Node node,int propagate):设置头,并且如果是共享模式且propagate大于0,则唤醒后续节点。
private void cancelAcquire(Node node) :取消正在获取的节点
private static void selfInterrupt():自我中断
private final booleanparkAndCheckInterrupt() :park并判断线程是否中断
从源码可以看出AQS实现基本的功能:
1.同步器基本范式、结构
2.线程的阻塞、唤醒机制
3.线程阻塞队列的维护
AQS虽然实现了acquire,和release方法,但是里面调用的tryAcquire和tryRelease是由子类来定制的。可以认为同步状态的维护、获取、释放动作是由子类实现的功能,而动作成功与否的后续行为时有AQS框架来实现。
ReentrantLock原理
有了AQS基础,下面来看ReentrantLock的基本原理:ReentrantLock原理
由于同步器里已经定义了基本的结构,包括获取、释放、和阻塞队列维护和管理等。ReentrantLock是一个独占互斥锁,里只需要实现TryAcquire、TryRelease等方法,告诉同步器是否获取和释放状态成功。其他的后续行为都由AQS框架完成。由于ReentrantLock是一个可重入的独占锁,所以同步器状态可以直接根据是否==0来判断是否可用。
ReentrantLock主要提供lock和unlcok两个方法。
而lock和unlock正是基于AQS的一个子类同步器来实现。里面sync同步器有两种实现,一种是公平锁,一种是非公平锁。默认是非公平锁,看看非公平锁实现tryAcquire
Java代码
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//如果状态位为0,那么尝试获取
if (compareAndSetState(0, acquires)) {//基于CAS获取和修改状态
setExclusiveOwnerThread(current);//成功则设置当前线程为独占执行线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//当前线程已是执行线程
int nextc = c + acquires;//累加
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;//其他情况下代表获取失败
再看看公平锁的tryAcquire
Java代码
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (isFirst(current) &&
compareAndSetState(0, acquires)) {//判断是否是第一个线程,是的话才尝试获取锁
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
可以看看 非公平锁的不会根据FIFO,而公平锁会判断是否是第一个线程,根据FIFO来执行。
相关文章推荐
- JAVA NIO 中的 zerocopy 技术提高IO性能
- eclipse中在线安装FindBugs
- java反射
- 学习笔记之Myeclipse学习笔记(一)_“invalid Regular Expression Options”问题解决
- Java实现读取Excel
- 【MyBatis框架】mybatis和spring整合
- SpringMvc+Spring同时扫描出现问题。
- "could not create the java virtual machine
- java根据url获取完整域名
- java随机字符补充版
- myeclipse快捷键使用
- eclipse 生成javadoc文档
- java常用的7大排序算法汇总
- eclipse 高亮设置内容
- Spring常见注解的使用
- eclipse workspace设置
- eclipse 插件的安装与删除教程
- java解析apk安装包信息
- eclipse 快捷键介绍
- 【java工具类】MD5加密