学习笔记 05 --- JUC锁
2017-01-13 09:55
302 查看
学习笔记 05 --- JUC锁
LockSupport:
LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。
LockSupport通过unsafe函数中的接口来实现阻塞和解除阻塞的,AQS和其他的lock都会使用到这个基础类。
LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程(park()---获取许可,unpark()---释放许可),而且park()和unpark()不会遇到“Thread.suspend
和 Thread.resume所可能引发的死锁”问题。因为park() 和 unpark()有许可的存在;调用 park() 的线程和另一个试图将其 unpark() 的线程之间的竞争将保持活性。
注:
park()/unpark()和wait()/notify()的区别是后者让线程阻塞和解除阻塞前,必须通过synchronized获取对象的同步锁。
unpark函数可以先于park调用。比如线程B调用unpark函数,给线程A发了一个“许可”,那么当线程A调用park时,它发现已经有“许可”了,那么它会马上再继续运行。
LockSupport不可重入,如果一个线程连续2次调用 LockSupport .park(),那么该线程一定会一直阻塞下去。
LockSupport 只能阻塞当前线程,但是可以唤醒任意线程。
LockSupport 的park和Object的wait一样可以相应中断。但是线程如果因为调用park而阻塞的话,能够响应中断请求(中断状态被设置成true),但是不会抛出InterruptedException
。
*******************************************************************************************************************************
Thread.interrupt()方法不会中断一个正在运行的线程。这一方法实际上完成的是,在线程受到阻塞时抛出一个中断信号,这样线程就得以退出阻塞的状态。更确切的说,如果线程被Object.wait, Thread.join和Thread.sleep三种方法之一阻塞,那么,它将接收到一个中断异常(InterruptedException),从而提早地终结被阻塞状态。
*******************************************************************************************************************************
LockSupport的源码如下所示:
ReentrantLock---互斥锁:
ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。“锁”是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待);ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。
ReentrantLock 与 synchronized的区别:
1、与synchronized相比,ReentrantLock提供了更多,更加全面的功能,具备更强的扩展性。例如:时间锁等候,可中断锁等候,锁投票。
2、ReentrantLock还提供了条件Condition,对线程的等待、唤醒操作更加详细和灵活,所以在多个条件变量和高度竞争锁的地方,ReentrantLock更加适合。
3、ReentrantLock提供了可轮询的锁请求。它会尝试着去获取锁,如果成功则继续,否则可以等到下次运行时处理,而synchronized则一旦进入锁请求要么成功要么阻塞,所以相比synchronized而言,ReentrantLock会不容易产生死锁些。
4、ReentrantLock支持更加灵活的同步代码块,但是使用synchronized时,只能在同一个synchronized块结构中获取和释放。注:ReentrantLock的锁释放一定要在finally中处理,否则可能会产生严重的后果。
5、ReentrantLock支持中断处理,且性能较synchronized会好些。
6、ReentrantLock持有的锁是需要手动通过unlock去关闭的。
ReentrantLock实现了Lock接口,内部使用static类继承AQS实现独占式的api来实现具体功能,使用AQS的state来表示锁可重入次数。对于公平和非公平的定义是通过对AbstractQueuedSynchronizer的扩展加以实现的,也就是在tryAcquire的实现上做了不同的控制,ReentrantLock中有3个内部类,分别是Sync、FairSync和NonfairSync,类结构如下所示:
ReentrantLock-->Lock
NonfairSync/FairSync-->Sync-->AbstractQueuedSynchronizer-->AbstractOwnableSynchronizer
NonfairSync/FairSync-->Sync是ReentrantLock的三个内部类
Node是AbstractQueuedSynchronizer的内部类
内部源码部分如下:
Sync是一个继承AQS的抽象类,使用独占锁,复写了tryRelease方法。tryAcquire方法由它的两个FairSync(公平锁)和NonfairSync(非公平锁)实现。
ReentrantLock根据传入构造方法的布尔型参数实例化出Sync的实现类FairSync和NonfairSync,分别表示公平的Sync和非公平的Sync,非公平会先直接尝试cas修改,不成功再去排队,就是插队,而公平锁就是老老实实请求排队操作。
ReentrantLock非公平锁:
假设线程1调用了ReentrantLock的lock()方法,那么线程1将会独占锁,整个调用链十分简单:
第一个获取锁的线程就做了两件事情:
1、设置AbstractQueuedSynchronizer的state为1
2、设置AbstractOwnableSynchronizer的thread为当前线程
那么这两步做完之后就表示线程1独占了锁。然后线程2也要尝试获取同一个锁,在线程1没有释放锁的情况下必然是行不通的,所以线程2就要阻塞。那么,线程2如何被阻塞?看下线程2的方法调用链,这就比较复杂了:
lock()
lock()在ReentrantLock.java的NonfairSync类中实现,它的源码如下:
说明:
lock()会先通过compareAndSet(0, 1)来判断“锁”是不是空闲状态。是的话,“当前线程”直接获取“锁”;否则的话,调用acquire(1)获取锁。
(01) compareAndSetState()是CAS函数,它的作用是比较并设置当前锁的状态。若锁的状态值为0,则设置锁的状态值为1。
(02) setExclusiveOwnerThread(Thread.currentThread())的作用是,设置“当前线程”为“锁”的持有者。
“公平锁”和“非公平锁”关于lock()的对比
acquire()
acquire()在AQS中实现的,它的源码如下:
(01) “当前线程”首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列依次排序,然后获取锁。
(02) “当前线程”尝试失败的情况下,会先通过addWaiter(Node.EXCLUSIVE)来将“当前线程”加入到"CLH队列(非阻塞的FIFO队列)"末尾。
(03) 然后,调用acquireQueued()获取锁。在acquireQueued()中,当前线程会等待它在“CLH队列”中前面的所有线程执行并释放锁之后,才能获取锁并返回。如果“当前线程”在休眠等待过程中被中断过,则调用selfInterrupt()来自己产生一个中断。
非公平锁的tryAcquire()在ReentrantLock.java的NonfairSync类中实现,源码如下:
nonfairTryAcquire()在ReentrantLock.java的Sync类中实现,源码如下:
说明:
根据代码,我们可以分析出,tryAcquire()的作用就是尝试去获取锁。
(01) 如果“锁”没有被任何线程拥有,则通过CAS函数设置“锁”的状态为acquires,同时,设置“当前线程”为锁的持有者,然后返回true。
(02) 如果“锁”的持有者已经是当前线程,则将更新锁的状态即可。
(03) 如果不术语上面的两种情况,则认为尝试失败。
“公平锁”和“非公平锁”关于tryAcquire()的对比
先创建一个当前线程的Node,模式为独占模式(因为传入的mode是一个NULL),再判断一下队列上有没有节点,没有就创建一个队列,因此走enq方法:
上面这段代码可以用下图很好的标示出来:
每一步都用图表示出来了,由于线程2所在的Node是第一个要等待的Node,因此FIFO队列上肯定没有内容,tail为null,走的就是第4行~第10行的代码逻辑。这里用了CAS设置头Node,当然有可能线程2设置头Node的时候CPU切换了,线程3已经把头Node设置好了形成了上图所示的一个队列,这时线程2再循环一次获取tail,由于tail是volatile的,所以对线程2可见,线程2看见tail不为null,就走到了13行的else里面去往尾Node后面添加自身。整个过程下来,形成了一个双向队列。最后走AQS的acquireQueued(node,
1):
此时再做判断,由于线程2是双向队列的真正的第一个Node(前面还有一个h),所以第5行~第10行再次判断一下线程2能不能获取锁(可能这段时间内线程1已经执行完了把锁释放了,state从1变为了0),如果还是不行,先调用AQS的shouldParkAfterFailedAcquire(p, node)方法:
这个waitStatus是h的waitStatus,很明显是0,所以此时把h的waitStatus设置为Noed.SIGNAL即-1并返回false。既然返回了false,上面的acquireQueued的11行if自然不成立,再走一次for循环,还是先尝试获取锁,不成功,继续走shouldParkAfterFailedAcquire,此时waitStatus为-1,小于0,走第三行的判断,返回true。然后走acquireQueued的11行if的第二个判断条件parkAndCheckInterrupt:
最后一步,调用LockSupport的park方法阻塞住了当前的线程。
简化版的步骤:(非公平锁的核心)
基于CAS尝试将state(锁数量)从0设置为1
A、如果设置成功,设置当前线程为独占锁的线程;
B、如果设置失败,还会再获取一次锁数量,
B1、如果锁数量为0,再基于CAS尝试将state(锁数量)从0设置为1一次,如果设置成功,设置当前线程为独占锁的线程;
B2、如果锁数量不为0或者上边的尝试又失败了,查看当前线程是不是已经是独占锁的线程了,如果是,则将当前的锁数量+1;如果不是,则将该线程封装在一个Node内,并加入到等待队列中去。等待被其前一个线程节点唤醒。
ReentrantLock公平锁:
lock()
lock()在ReentrantLock.java的FairSync类中实现,它的源码如下:
说明:“当前线程”实际上是通过acquire(1)获取锁的。
这里说明一下“1”的含义,它是设置“锁的状态”的参数。对于“独占锁”而言,锁处于可获取状态时,它的状态值是0;锁被线程初次获取到了,它的状态值就变成了1。
由于ReentrantLock(公平锁/非公平锁)是可重入锁,所以“独占锁”可以被单个线程多此获取,每获取1次就将锁的状态+1。也就是说,初次获取锁时,通过acquire(1)将锁的状态值设为1;再次获取锁时,将锁的状态值设为2;依次类推...这就是为什么获取锁时,传入的参数是1的原因了。
可重入就是指锁可以被单个线程多次获取。
acquire()
acquire()在AQS中实现的,它的源码如下:
(01) “当前线程”首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待(前面还有可能有需要线程在等待该锁)。
(02) “当前线程”尝试失败的情况下,先通过addWaiter(Node.EXCLUSIVE)来将“当前线程”加入到"CLH队列(非阻塞的FIFO队列)"末尾。CLH队列就是线程等待队列。
(03) 再执行完addWaiter(Node.EXCLUSIVE)之后,会调用acquireQueued()来获取锁。由于此时ReentrantLock是公平锁,它会根据公平性原则来获取锁。
(04) “当前线程”在执行acquireQueued()时,会进入到CLH队列中休眠等待,直到获取锁了才返回!如果“当前线程”在休眠等待过程中被中断过,acquireQueued会返回true,此时"当前线程"会调用selfInterrupt()来自己给自己产生一个中断。
tryAcquire()
公平锁的tryAcquire()在ReentrantLock.java的FairSync类中实现,源码如下:
说明:根据代码,我们可以分析出,tryAcquire()的作用就是尝试去获取锁。注意,这里只是尝试!
尝试成功的话,返回true;尝试失败的话,返回false,后续再通过其它办法来获取该锁。
addWaiter()
addWaiter()在AQS中实现,源码如下:
说明:对于“公平锁”而言,addWaiter(Node.EXCLUSIVE)会首先创建一个Node节点,节点的类型是“独占锁”(Node.EXCLUSIVE)类型。然后,再将该节点添加到CLH队列的末尾。
acquireQueued()
acquireQueued()在AQS中实现,源码如下:
说明:acquireQueued()的目的是从队列中获取锁。
selfInterrupt()
selfInterrupt()是AQS中实现,源码如下:
说明:selfInterrupt()的代码很简单,就是“当前线程”自己产生一个中断。但是,为什么需要这么做呢?
这必须结合acquireQueued()进行分析。如果在acquireQueued()中,当前线程被中断过,则执行selfInterrupt();否则不会执行。
在acquireQueued()中,即使是线程在阻塞状态被中断唤醒而获取到cpu执行权利;但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 该线程再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒;线程才会获取锁,然后“真正执行起来”!
也就是说,在该线程“成功获取锁并真正执行起来”之前,它的中断会被忽略并且中断标记会被清除! 因为在parkAndCheckInterrupt()中,我们线程的中断状态时调用了Thread.interrupted()。该函数不同于Thread的isInterrupted()函数,isInterrupted()仅仅返回中断状态,而interrupted()在返回当前中断状态之后,还会清除中断状态。 正因为之前的中断状态被清除了,所以这里需要调用selfInterrupt()重新产生一个中断!
总结
再回过头看看acquire()函数,它最终的目的是获取锁!
(01) 先是通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,再通过acquireQueued()获取锁。
(02) 尝试失败的情况下,会先通过addWaiter()来将“当前线程”加入到"CLH队列"末尾;然后调用acquireQueued(),在CLH队列中排序等待获取锁,在此过程中,线程处于休眠状态。直到获取锁了才返回。 如果在休眠等待过程中被中断过,则调用selfInterrupt()来自己产生一个中断。
简化版的步骤:(公平锁的核心)
获取一次锁数量,
B1、如果锁数量为0,如果当前线程是等待队列中的头节点,基于CAS尝试将state(锁数量)从0设置为1一次,如果设置成功,设置当前线程为独占锁的线程;
B2、如果锁数量不为0或者当前线程不是等待队列中的头节点或者上边的尝试又失败了,查看当前线程是不是已经是独占锁的线程了,如果是,则将当前的锁数量+1;如果不是,则将该线程封装在一个Node内,并加入到等待队列中去。等待被其前一个线程节点唤醒。
不会尝试CAS操作去插队,而是获取不到锁就去排队。方法和非公平锁类似,不同的地方是FairSync内部类中的tryAcquire方法:
从源码中可以看出,上面红色代码的地方是公平锁多出来的代码,通过对公平锁的了解和代码的分析,我认为这个判断是判断当前线程是否为等待时间最长的线程,也就是说队列里面是否有其他线程再等待,如果有其他线程再等待则当前线程不获取锁,返回继续阻塞。这也就是FIFO原理先进来的进程先执行------这个理解不知道是否准确,如果不准确希望大家可以指正出来,在评论中知会我。谢谢!
基于JDK1.7的话
红色部分代码是:!hasQueuedPredecessors()
ReentrantLock释放锁:
unlock()在ReentrantLock.java中实现的,源码如下:
说明:
unlock()是解锁函数,它是通过AQS的release()函数来实现的。
在这里,“1”的含义和“获取锁的函数acquire(1)的含义”一样,它是设置“释放锁的状态”的参数。由于“公平锁”是可重入的,所以对于同一个线程,每释放锁一次,锁的状态-1。
release()
release()在AQS中实现的,源码如下:
说明:
release()会先调用tryRelease()来尝试释放当前线程锁持有的锁。成功的话,则唤醒后继等待线程,并返回true。否则,直接返回false。
tryRelease()
tryRelease()在ReentrantLock.java的Sync类中实现,源码如下:
首先,只有当c==0的时候才会让free=true,这和上面一个线程多次调用lock方法累加state是对应的,调用了多少次的lock()方法自然必须调用同样次数的unlock()方法才行,这样才把一个锁给全部解开。
当一条线程对同一个ReentrantLock全部解锁之后,AQS的state自然就是0了,AbstractOwnableSynchronizer的exclusiveOwnerThread将被设置为null,这样就表示没有线程占有锁,方法返回true。代码继续往下走,上面的release方法的第四行,h不为null成立,h的waitStatus为-1,不等于0也成立,所以走第5行的unparkSuccessor方法:
unparkSuccessor()
在release()中“当前线程”释放锁成功的话,会唤醒当前线程的后继线程。
根据CLH队列的FIFO规则,“当前线程”(即已经获取锁的线程)肯定是head;如果CLH队列非空的话,则唤醒锁的下一个等待线程。
下面看看unparkSuccessor()的源码,它在AQS中实现。
s即h的下一个Node,这个Node里面的线程就是线程2,由于这个Node不等于null,所以走21行,线程2被unPark了,得以运行。有一个很重要的问题是:锁被解了怎样保证整个FIFO队列减少一个Node呢?这是一个很巧妙的设计,又回到了AQS的acquireQueued方法了:
被阻塞的线程2是被阻塞在第12行,注意这里并没有return语句,也就是说,阻塞完成线程2依然会进行for循环。然后,阻塞完成了,线程2所在的Node的前驱Node是p,线程2尝试tryAcquire,成功,然后线程2就成为了head节点了,把p的next设置为null,这样原头Node里面的所有对象都不指向任何块内存空间,h属于栈内存的内容,方法结束被自动回收,这样随着方法的调用完毕,原头Node也没有任何的引用指向它了,这样它就被GC自动回收了。此时,遇到一个return语句,acquireQueued方法结束,后面的Node也是一样的原理。
这里有一个细节,看一下setHead方法:
setHead方法里面的前驱Node是Null,也没有线程,那么为什么不用一个在等待的线程作为Head Node呢?
因为一个线程随时有可能因为中断而取消,而取消的话,Node自然就要被GC了,那GC前必然要把头Node的后继Node变为一个新的头而且要应对多种情况,这样就很麻烦。用一个没有thread的Node作为头,相当于起了一个引导作用,因为head没有线程,自然也不会被取消。
再看一下上面unparkSuccessor的14行~20行,就是为了防止head的下一个node被取消的情况,这样,就从尾到头遍历,找出离head最近的一个node,对这个node进行unPark操作。
简化版的步骤:(释放锁的步骤)
1)获取当前的锁数量,然后用这个锁数量减去解锁的数量(这里为1),最后得出结果c
2)判断当前线程是不是独占锁的线程,如果不是,抛出异常
3)如果c==0,说明锁被成功释放,将当前的独占线程置为null,锁数量置为0,返回true
4)如果c!=0,说明释放锁失败,锁数量置为c,返回false
5)如果锁被释放成功的话,唤醒距离头节点最近的一个非取消的节点
Condition:
Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。
Condition的函数列表:
Condition和ReentrantLock联合使用的示例:
Condition更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。
*******************************未完待续***********************************
该文为本人学习的笔记,方便以后自己查阅。参考网上各大帖子,取其精华整合自己的理解而成。如有不对的地方,请多多指正!自勉!共勉!
************************************************************************
LockSupport:
LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。
LockSupport通过unsafe函数中的接口来实现阻塞和解除阻塞的,AQS和其他的lock都会使用到这个基础类。
LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程(park()---获取许可,unpark()---释放许可),而且park()和unpark()不会遇到“Thread.suspend
和 Thread.resume所可能引发的死锁”问题。因为park() 和 unpark()有许可的存在;调用 park() 的线程和另一个试图将其 unpark() 的线程之间的竞争将保持活性。
注:
park()/unpark()和wait()/notify()的区别是后者让线程阻塞和解除阻塞前,必须通过synchronized获取对象的同步锁。
unpark函数可以先于park调用。比如线程B调用unpark函数,给线程A发了一个“许可”,那么当线程A调用park时,它发现已经有“许可”了,那么它会马上再继续运行。
LockSupport不可重入,如果一个线程连续2次调用 LockSupport .park(),那么该线程一定会一直阻塞下去。
LockSupport 只能阻塞当前线程,但是可以唤醒任意线程。
LockSupport 的park和Object的wait一样可以相应中断。但是线程如果因为调用park而阻塞的话,能够响应中断请求(中断状态被设置成true),但是不会抛出InterruptedException
。
*******************************************************************************************************************************
Thread.interrupt()方法不会中断一个正在运行的线程。这一方法实际上完成的是,在线程受到阻塞时抛出一个中断信号,这样线程就得以退出阻塞的状态。更确切的说,如果线程被Object.wait, Thread.join和Thread.sleep三种方法之一阻塞,那么,它将接收到一个中断异常(InterruptedException),从而提早地终结被阻塞状态。
*******************************************************************************************************************************
LockSupport的源码如下所示:
/* * @(#)LockSupport.java 1.12 06/03/30 * * Copyright 2006 Sun Microsystems, Inc. All rights reserved. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */ package java.util.concurrent.locks; import java.util.concurrent.*; import sun.misc.Unsafe; /** * Basic thread blocking primitives for creating locks and other * synchronization classes. * * <p>This class associates, with each thread that uses it, a permit * (in the sense of the {@link java.util.concurrent.Semaphore * Semaphore} class). A call to {@code park} will return immediately * if the permit is available, consuming it in the process; otherwise * it <em>may</em> block. A call to {@code unpark} makes the permit * available, if it was not already available. (Unlike with Semaphores * though, permits do not accumulate. There is at most one.) * * <p>Methods {@code park} and {@code unpark} provide efficient * means of blocking and unblocking threads that do not encounter the * problems that cause the deprecated methods {@code Thread.suspend} * and {@code Thread.resume} to be unusable for such purposes: Races * between one thread invoking {@code park} and another thread trying * to {@code unpark} it will preserve liveness, due to the * permit. Additionally, {@code park} will return if the caller's * thread was interrupted, and timeout versions are supported. The * {@code park} method may also return at any other time, for "no * reason", so in general must be invoked within a loop that rechecks * conditions upon return. In this sense {@code park} serves as an * optimization of a "busy wait" that does not waste as much time * spinning, but must be paired with an {@code unpark} to be * effective. * * <p>The three forms of {@code park} each also support a * {@code blocker} object parameter. This object is recorded while * the thread is blocked to permit monitoring and diagnostic tools to * identify the reasons that threads are blocked. (Such tools may * access blockers using method {@link #getBlocker}.) The use of these * forms rather than the original forms without this parameter is * strongly encouraged. The normal argument to supply as a * {@code blocker} within a lock implementation is {@code this}. * * <p>These methods are designed to be used as tools for creating * higher-level synchronization utilities, and are not in themselves * useful for most concurrency control applications. The {@code park} * method is designed for use only in constructions of the form: * <pre>while (!canProceed()) { ... LockSupport.park(this); }</pre> * where neither {@code canProceed} nor any other actions prior to the * call to {@code park} entail locking or blocking. Because only one * permit is associated with each thread, any intermediary uses of * {@code park} could interfere with its intended effects. * * <p><b>Sample Usage.</b> Here is a sketch of a first-in-first-out * non-reentrant lock class: * <pre>{@code * class FIFOMutex { * private final AtomicBoolean locked = new AtomicBoolean(false); * private final Queue<Thread> waiters * = new ConcurrentLinkedQueue<Thread>(); * * public void lock() { * boolean wasInterrupted = false; * Thread current = Thread.currentThread(); * waiters.add(current); * * // Block while not first in queue or cannot acquire lock * while (waiters.peek() != current || * !locked.compareAndSet(false, true)) { * LockSupport.park(this); * if (Thread.interrupted()) // ignore interrupts while waiting * wasInterrupted = true; * } * * waiters.remove(); * if (wasInterrupted) // reassert interrupt status on exit * current.interrupt(); * } * * public void unlock() { * locked.set(false); * LockSupport.unpark(waiters.peek()); * } * }}</pre> */ public class LockSupport { private LockSupport() {} // Cannot be instantiated. // Hotspot implementation via intrinsics API private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long parkBlockerOffset; static { try { parkBlockerOffset = unsafe.objectFieldOffset (java.lang.Thread.class.getDeclaredField("parkBlocker")); } catch (Exception ex) { throw new Error(ex); } } private static void setBlocker(Thread t, Object arg) { // Even though volatile, hotspot doesn't need a write barrier here. unsafe.putObject(t, parkBlockerOffset, arg); } /** * Makes available the permit for the given thread, if it * was not already available. If the thread was blocked on * {@code park} then it will unblock. Otherwise, its next call * to {@code park} is guaranteed not to block. This operation * is not guaranteed to have any effect at all if the given * thread has not been started. * * @param thread the thread to unpark, or {@code null}, in which case * this operation has no effect */ // 如果给定线程的许可尚不可用,则使其可用。 public static void unpark(Thread thread) { if (thread != null) unsafe.unpark(thread); } /** * Disables the current thread for thread scheduling purposes unless the * permit is available. * * <p>If the permit is available then it is consumed and the call returns * immediately; otherwise * the current thread becomes disabled for thread scheduling * purposes and lies dormant until one of three things happens: * * <ul> * <li>Some other thread invokes {@link #unpark unpark} with the * current thread as the target; or * * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * * <li>The call spuriously (that is, for no reason) returns. * </ul> * * <p>This method does <em>not</em> report which of these caused the * method to return. Callers should re-check the conditions which caused * the thread to park in the first place. Callers may also determine, * for example, the interrupt status of the thread upon return. * * @param blocker the synchronization object responsible for this * thread parking * @since 1.6 */ // 为了线程调度,在许可可用之前禁用当前线程。 public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, 0L); setBlocker(t, null); } /** * Disables the current thread for thread scheduling purposes, for up to * the specified waiting time, unless the permit is available. * * <p>If the permit is available then it is consumed and the call * returns immediately; otherwise the current thread becomes disabled * for thread scheduling purposes and lies dormant until one of four * things happens: * * <ul> * <li>Some other thread invokes {@link #unpark unpark} with the * current thread as the target; or * * <li>Some other thread {@linkplain Thread#interrupt interrupts} the current * thread; or * * <li>The specified waiting time elapses; or * * <li>The call spuriously (that is, for no reason) returns. * </ul> * * <p>This method does <em>not</em> report which of these caused the * method to return. Callers should re-check the conditions which caused * the thread to park in the first place. Callers may also determine, * for example, the interrupt status of the thread, or the elapsed time * upon return. * * @param blocker the synchronization object responsible for this * thread parking * @param nanos the maximum number of nanoseconds to wait * @since 1.6 */ // 为了线程调度,在许可可用前禁用当前线程,并最多等待指定的等待时间。 public static void parkNanos(Object blocker, long nanos) { if (nanos > 0) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, nanos); setBlocker(t, null); } } /** * Disables the current thread for thread scheduling purposes, until * the specified deadline, unless the permit is available. * * <p>If the permit is available then it is consumed and the call * returns immediately; otherwise the current thread becomes disabled * for thread scheduling purposes and lies dormant until one of four * things happens: * * <ul> * <li>Some other thread invokes {@link #unpark unpark} with the * current thread as the target; or * * <li>Some other thread {@linkplain Thread#interrupt interrupts} the * current thread; or * * <li>The specified deadline passes; or * * <li>The call spuriously (that is, for no reason) returns. * </ul> * * <p>This method does <em>not</em> report which of these caused the * method to return. Callers should re-check the conditions which caused * the thread to park in the first place. Callers may also determine, * for example, the interrupt status of the thread, or the current time * upon return. * * @param blocker the synchronization object responsible for this * thread parking * @param deadline the absolute time, in milliseconds from the Epoch, * to wait until * @since 1.6 */ // 为了线程调度,在指定的时限前禁用当前线程,除非许可可用。 public static void parkUntil(Object blocker, long deadline) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(true, deadline); setBlocker(t, null); } /** * Returns the blocker object supplied to the most recent * invocation of a park method that has not yet unblocked, or null * if not blocked. The value returned is just a momentary * snapshot -- the thread may have since unblocked or blocked on a * different blocker object. * * @return the blocker * @since 1.6 */ // 返回提供给最近一次尚未解除阻塞的 park 方法调用的 blocker 对象,如果该调用不受阻塞,则返回 null。 public static Object getBlocker(Thread t) { return unsafe.getObjectVolatile(t, parkBlockerOffset); } /** * Disables the current thread for thread scheduling purposes unless the * permit is available. * * <p>If the permit is available then it is consumed and the call * returns immediately; otherwise the current thread becomes disabled * for thread scheduling purposes and lies dormant until one of three * things happens: * * <ul> * * <li>Some other thread invokes {@link #unpark unpark} with the * current thread as the target; or * * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * * <li>The call spuriously (that is, for no reason) returns. * </ul> * * <p>This method does <em>not</em> report which of these caused the * method to return. Callers should re-check the conditions which caused * the thread to park in the first place. Callers may also determine, * for example, the interrupt status of the thread upon return. */ // 为了线程调度,禁用当前线程,除非许可可用。 public static void park() { unsafe.park(false, 0L); } /** * Disables the current thread for thread scheduling purposes, for up to * the specified waiting time, unless the permit is available. * * & 20000 lt;p>If the permit is available then it is consumed and the call * returns immediately; otherwise the current thread becomes disabled * for thread scheduling purposes and lies dormant until one of four * things happens: * * <ul> * <li>Some other thread invokes {@link #unpark unpark} with the * current thread as the target; or * * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * * <li>The specified waiting time elapses; or * * <li>The call spuriously (that is, for no reason) returns. * </ul> * * <p>This method does <em>not</em> report which of these caused the * method to return. Callers should re-check the conditions which caused * the thread to park in the first place. Callers may also determine, * for example, the interrupt status of the thread, or the elapsed time * upon return. * * @param nanos the maximum number of nanoseconds to wait */ // 为了线程调度禁用当前线程,最多等待指定的等待时间,除非许可可用。 public static void parkNanos(long nanos) { if (nanos > 0) unsafe.park(false, nanos); } /** * Disables the current thread for thread scheduling purposes, until * the specified deadline, unless the permit is available. * * <p>If the permit is available then it is consumed and the call * returns immediately; otherwise the current thread becomes disabled * for thread scheduling purposes and lies dormant until one of four * things happens: * * <ul> * <li>Some other thread invokes {@link #unpark unpark} with the * current thread as the target; or * * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * * <li>The specified deadline passes; or * * <li>The call spuriously (that is, for no reason) returns. * </ul> * * <p>This method does <em>not</em> report which of these caused the * method to return. Callers should re-check the conditions which caused * the thread to park in the first place. Callers may also determine, * for example, the interrupt status of the thread, or the current time * upon return. * * @param deadline the absolute time, in milliseconds from the Epoch, * to wait until */ // 为了线程调度,在指定的时限前禁用当前线程,除非许可可用。 public static void parkUntil(long deadline) { unsafe.park(true, deadline); } }
ReentrantLock---互斥锁:
ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。“锁”是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待);ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。
ReentrantLock 与 synchronized的区别:
1、与synchronized相比,ReentrantLock提供了更多,更加全面的功能,具备更强的扩展性。例如:时间锁等候,可中断锁等候,锁投票。
2、ReentrantLock还提供了条件Condition,对线程的等待、唤醒操作更加详细和灵活,所以在多个条件变量和高度竞争锁的地方,ReentrantLock更加适合。
3、ReentrantLock提供了可轮询的锁请求。它会尝试着去获取锁,如果成功则继续,否则可以等到下次运行时处理,而synchronized则一旦进入锁请求要么成功要么阻塞,所以相比synchronized而言,ReentrantLock会不容易产生死锁些。
4、ReentrantLock支持更加灵活的同步代码块,但是使用synchronized时,只能在同一个synchronized块结构中获取和释放。注:ReentrantLock的锁释放一定要在finally中处理,否则可能会产生严重的后果。
5、ReentrantLock支持中断处理,且性能较synchronized会好些。
6、ReentrantLock持有的锁是需要手动通过unlock去关闭的。
ReentrantLock实现了Lock接口,内部使用static类继承AQS实现独占式的api来实现具体功能,使用AQS的state来表示锁可重入次数。对于公平和非公平的定义是通过对AbstractQueuedSynchronizer的扩展加以实现的,也就是在tryAcquire的实现上做了不同的控制,ReentrantLock中有3个内部类,分别是Sync、FairSync和NonfairSync,类结构如下所示:
ReentrantLock-->Lock
NonfairSync/FairSync-->Sync-->AbstractQueuedSynchronizer-->AbstractOwnableSynchronizer
NonfairSync/FairSync-->Sync是ReentrantLock的三个内部类
Node是AbstractQueuedSynchronizer的内部类
内部源码部分如下:
<span style="font-size:18px;">abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); /** 非公平锁的acquire */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //判断state是否被占用 if (c == 0) { //没有被占用,直接cas占用,成功的话就设置当前线程为占用线程 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果state不为0,因为是可重入锁,需要判断是不是自己占用的,如果是累加state值 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //acquire失败,AQS等待队列排队 return false; } //release的时候也需要判断是不是当前线程。因为可重入,所以可以lock多次,release的时候就要release多次 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } /**AbstractOwnableSynchronizer.exclusiveOwnerThread 判断是否为当前占用lock的线程*/ protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } /**lock.newCondition每次直接new一个AQS的conditionObject维护一个条件队列*/ final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** * Reconstitutes this lock instance from a stream. * @param s the stream */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } }</span>
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /**非公平锁进来就cas,成功就设置独占线程,不成功再去Acquire排队,这就是公平不公平的区分*/ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } /**直接使用父类中notFairAcquire*/ protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /**公平锁的tryAcquire*/ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //锁还在并且AQS没有其他等待节点,cas设置,然后再设置独占线程 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //因为是可重入锁,state不为0看是不是自己占用了,如果是更新state值 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } //判断队列没有其他等待节点 public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
// 创建一个 ReentrantLock ,默认是“非公平锁”。 ReentrantLock() // 创建策略是fair的 ReentrantLock。fair为true表示是公平锁,fair为false表示是非公平锁。 ReentrantLock(boolean fair) // 查询当前线程保持此锁的次数。 int getHoldCount() // 返回目前拥有此锁的线程,如果此锁不被任何线程拥有,则返回 null。 protected Thread getOwner() // 返回一个 collection,它包含可能正等待获取此锁的线程。 protected Collection<Thread> getQueuedThreads() // 返回正等待获取此锁的线程估计数。 int getQueueLength() // 返回一个 collection,它包含可能正在等待与此锁相关给定条件的那些线程。 protected Collection<Thread> getWaitingThreads(Condition condition) // 返回等待与此锁相关的给定条件的线程估计数。 int getWaitQueueLength(Condition condition) // 查询给定线程是否正在等待获取此锁。 boolean hasQueuedThread(Thread thread) // 查询是否有些线程正在等待获取此锁。 boolean hasQueuedThreads() // 查询是否有些线程正在等待与此锁有关的给定条件。 boolean hasWaiters(Condition condition) // 如果是“公平锁”返回true,否则返回false。 boolean isFair() // 查询当前线程是否保持此锁。 boolean isHeldByCurrentThread() // 查询此锁是否由任意线程保持。 boolean isLocked() // 获取锁。 void lock() // 如果当前线程未被中断,则获取锁。 void lockInterruptibly() // 返回用来与此 Lock 实例一起使用的 Condition 实例。 Condition newCondition() // 仅在调用时锁未被另一个线程保持的情况下,才获取该锁。 boolean tryLock() // 如果锁在给定等待时间内没有被另一个线程保持,且当前线程未被中断,则获取该锁。 boolean tryLock(long timeout, TimeUnit unit) // 试图释放此锁。 void unlock()
Sync是一个继承AQS的抽象类,使用独占锁,复写了tryRelease方法。tryAcquire方法由它的两个FairSync(公平锁)和NonfairSync(非公平锁)实现。
ReentrantLock根据传入构造方法的布尔型参数实例化出Sync的实现类FairSync和NonfairSync,分别表示公平的Sync和非公平的Sync,非公平会先直接尝试cas修改,不成功再去排队,就是插队,而公平锁就是老老实实请求排队操作。
ReentrantLock非公平锁:
假设线程1调用了ReentrantLock的lock()方法,那么线程1将会独占锁,整个调用链十分简单:
第一个获取锁的线程就做了两件事情:
1、设置AbstractQueuedSynchronizer的state为1
2、设置AbstractOwnableSynchronizer的thread为当前线程
那么这两步做完之后就表示线程1独占了锁。然后线程2也要尝试获取同一个锁,在线程1没有释放锁的情况下必然是行不通的,所以线程2就要阻塞。那么,线程2如何被阻塞?看下线程2的方法调用链,这就比较复杂了:
lock()
lock()在ReentrantLock.java的NonfairSync类中实现,它的源码如下:
1 final void lock() { 2 if (compareAndSetState(0, 1)) 3 setExclusiveOwnerThread(Thread.currentThread()); 4 else 5 acquire(1); 6 }
说明:
lock()会先通过compareAndSet(0, 1)来判断“锁”是不是空闲状态。是的话,“当前线程”直接获取“锁”;否则的话,调用acquire(1)获取锁。
(01) compareAndSetState()是CAS函数,它的作用是比较并设置当前锁的状态。若锁的状态值为0,则设置锁的状态值为1。
(02) setExclusiveOwnerThread(Thread.currentThread())的作用是,设置“当前线程”为“锁”的持有者。
“公平锁”和“非公平锁”关于lock()的对比
公平锁 -- 公平锁的lock()函数,会直接调用acquire(1)。 非公平锁 -- 非公平锁会先判断当前锁的状态是不是空闲,是的话,就不排队,而是直接获取锁。
acquire()
acquire()在AQS中实现的,它的源码如下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
(01) “当前线程”首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列依次排序,然后获取锁。
(02) “当前线程”尝试失败的情况下,会先通过addWaiter(Node.EXCLUSIVE)来将“当前线程”加入到"CLH队列(非阻塞的FIFO队列)"末尾。
(03) 然后,调用acquireQueued()获取锁。在acquireQueued()中,当前线程会等待它在“CLH队列”中前面的所有线程执行并释放锁之后,才能获取锁并返回。如果“当前线程”在休眠等待过程中被中断过,则调用selfInterrupt()来自己产生一个中断。
非公平锁的tryAcquire()在ReentrantLock.java的NonfairSync类中实现,源码如下:
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
nonfairTryAcquire()在ReentrantLock.java的Sync类中实现,源码如下:
final boolean nonfairTryAcquire(int acquires) { // 获取“当前线程” final Thread current = Thread.currentThread(); // 获取“锁”的状态 int c = getState(); // c=0意味着“锁没有被任何线程锁拥有” if (c == 0) { // 若“锁没有被任何线程锁拥有”,则通过CAS函数设置“锁”的状态为acquires。 // 同时,设置“当前线程”为锁的持有者。 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 如果“锁”的持有者已经是“当前线程”, // 则将更新锁的状态。 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
说明:
根据代码,我们可以分析出,tryAcquire()的作用就是尝试去获取锁。
(01) 如果“锁”没有被任何线程拥有,则通过CAS函数设置“锁”的状态为acquires,同时,设置“当前线程”为锁的持有者,然后返回true。
(02) 如果“锁”的持有者已经是当前线程,则将更新锁的状态即可。
(03) 如果不术语上面的两种情况,则认为尝试失败。
“公平锁”和“非公平锁”关于tryAcquire()的对比
公平锁和非公平锁,它们尝试获取锁的方式不同。 公平锁在尝试获取锁时,即使“锁”没有被任何线程锁持有,它也会判断自己是不是CLH等待队列的表头;是的话,才获取锁。 而非公平锁在尝试获取锁时,如果“锁”没有被任何线程持有,则不管它在CLH队列的何处,它都直接获取锁。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
先创建一个当前线程的Node,模式为独占模式(因为传入的mode是一个NULL),再判断一下队列上有没有节点,没有就创建一个队列,因此走enq方法:
1 private Node enq(final Node node) { 2 for (;;) { 3 Node t = tail; 4 if (t == null) { // Must initialize 5 Node h = new Node(); // Dummy header 6 h.next = node; 7 node.prev = h; 8 if (compareAndSetHead(h)) { 9 tail = node; 10 return h; 11 } 12 } 13 else { 14 node.prev = t; 15 if (compareAndSetTail(t, node)) { 16 t.next = node; 17 return t; 18 } 19 } 20 } 21 }
上面这段代码可以用下图很好的标示出来:
每一步都用图表示出来了,由于线程2所在的Node是第一个要等待的Node,因此FIFO队列上肯定没有内容,tail为null,走的就是第4行~第10行的代码逻辑。这里用了CAS设置头Node,当然有可能线程2设置头Node的时候CPU切换了,线程3已经把头Node设置好了形成了上图所示的一个队列,这时线程2再循环一次获取tail,由于tail是volatile的,所以对线程2可见,线程2看见tail不为null,就走到了13行的else里面去往尾Node后面添加自身。整个过程下来,形成了一个双向队列。最后走AQS的acquireQueued(node,
1):
1 final boolean acquireQueued(final Node node, int arg) { 2 try { 3 boolean interrupted = false; 4 for (;;) { 5 final Node p = node.predecessor(); 6 if (p == head && tryAcquire(arg)) { 7 setHead(node); 8 p.next = null; // help GC 9 return interrupted; 10 } 11 if (shouldParkAfterFailedAcquire(p, node) && 12 parkAndCheckInterrupt()) 13 interrupted = true; 14 } 15 } catch (RuntimeException ex) { 16 cancelAcquire(node); 17 throw ex; 18 } 19 }
此时再做判断,由于线程2是双向队列的真正的第一个Node(前面还有一个h),所以第5行~第10行再次判断一下线程2能不能获取锁(可能这段时间内线程1已经执行完了把锁释放了,state从1变为了0),如果还是不行,先调用AQS的shouldParkAfterFailedAcquire(p, node)方法:
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 2 int s = pred.waitStatus; 3 if (s < 0) 4 /* 5 * This node has already set status asking a release 6 * to signal it, so it can safely park 7 */ 8 return true; 9 if (s > 0) { 10 /* 11 * Predecessor was cancelled. Skip over predecessors and 12 * indicate retry. 13 */ 14 do { 15 node.prev = pred = pred.prev; 16 } while (pred.waitStatus > 0); 17 pred.next = node; 18 } 19 else 20 /* 21 * Indicate that we need a signal, but don't park yet. Caller 22 * will need to retry to make sure it cannot acquire before 23 * parking. 24 */ 25 compareAndSetWaitStatus(pred, 0, Node.SIGNAL); 26 return false; 27 }
这个waitStatus是h的waitStatus,很明显是0,所以此时把h的waitStatus设置为Noed.SIGNAL即-1并返回false。既然返回了false,上面的acquireQueued的11行if自然不成立,再走一次for循环,还是先尝试获取锁,不成功,继续走shouldParkAfterFailedAcquire,此时waitStatus为-1,小于0,走第三行的判断,返回true。然后走acquireQueued的11行if的第二个判断条件parkAndCheckInterrupt:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, 0L); setBlocker(t, null); }
最后一步,调用LockSupport的park方法阻塞住了当前的线程。
简化版的步骤:(非公平锁的核心)
基于CAS尝试将state(锁数量)从0设置为1
A、如果设置成功,设置当前线程为独占锁的线程;
B、如果设置失败,还会再获取一次锁数量,
B1、如果锁数量为0,再基于CAS尝试将state(锁数量)从0设置为1一次,如果设置成功,设置当前线程为独占锁的线程;
B2、如果锁数量不为0或者上边的尝试又失败了,查看当前线程是不是已经是独占锁的线程了,如果是,则将当前的锁数量+1;如果不是,则将该线程封装在一个Node内,并加入到等待队列中去。等待被其前一个线程节点唤醒。
ReentrantLock公平锁:
lock()
lock()在ReentrantLock.java的FairSync类中实现,它的源码如下:
final void lock() { acquire(1); }
说明:“当前线程”实际上是通过acquire(1)获取锁的。
这里说明一下“1”的含义,它是设置“锁的状态”的参数。对于“独占锁”而言,锁处于可获取状态时,它的状态值是0;锁被线程初次获取到了,它的状态值就变成了1。
由于ReentrantLock(公平锁/非公平锁)是可重入锁,所以“独占锁”可以被单个线程多此获取,每获取1次就将锁的状态+1。也就是说,初次获取锁时,通过acquire(1)将锁的状态值设为1;再次获取锁时,将锁的状态值设为2;依次类推...这就是为什么获取锁时,传入的参数是1的原因了。
可重入就是指锁可以被单个线程多次获取。
acquire()
acquire()在AQS中实现的,它的源码如下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
(01) “当前线程”首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待(前面还有可能有需要线程在等待该锁)。
(02) “当前线程”尝试失败的情况下,先通过addWaiter(Node.EXCLUSIVE)来将“当前线程”加入到"CLH队列(非阻塞的FIFO队列)"末尾。CLH队列就是线程等待队列。
(03) 再执行完addWaiter(Node.EXCLUSIVE)之后,会调用acquireQueued()来获取锁。由于此时ReentrantLock是公平锁,它会根据公平性原则来获取锁。
(04) “当前线程”在执行acquireQueued()时,会进入到CLH队列中休眠等待,直到获取锁了才返回!如果“当前线程”在休眠等待过程中被中断过,acquireQueued会返回true,此时"当前线程"会调用selfInterrupt()来自己给自己产生一个中断。
tryAcquire()
公平锁的tryAcquire()在ReentrantLock.java的FairSync类中实现,源码如下:
protected final boolean tryAcquire(int acquires) { // 获取“当前线程” final Thread current = Thread.currentThread(); // 获取“独占锁”的状态 int c = getState(); // c=0意味着“锁没有被任何线程锁拥有”, if (c == 0) { // 若“锁没有被任何线程锁拥有”, // 则判断“当前线程”是不是CLH队列中的第一个线程线程, // 若是的话,则获取该锁,设置锁的状态,并切设置锁的拥有者为“当前线程”。 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 如果“独占锁”的拥有者已经为“当前线程”, // 则将更新锁的状态。 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
说明:根据代码,我们可以分析出,tryAcquire()的作用就是尝试去获取锁。注意,这里只是尝试!
尝试成功的话,返回true;尝试失败的话,返回false,后续再通过其它办法来获取该锁。
addWaiter()
addWaiter()在AQS中实现,源码如下:
private Node addWaiter(Node mode) { // 新建一个Node节点,节点对应的线程是“当前线程”,“当前线程”的锁的模型是mode。 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 若CLH队列不为空,则将“当前线程”添加到CLH队列末尾 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 若CLH队列为空,则调用enq()新建CLH队列,然后再将“当前线程”添加到CLH队列中。 enq(node); return node; }
说明:对于“公平锁”而言,addWaiter(Node.EXCLUSIVE)会首先创建一个Node节点,节点的类型是“独占锁”(Node.EXCLUSIVE)类型。然后,再将该节点添加到CLH队列的末尾。
acquireQueued()
acquireQueued()在AQS中实现,源码如下:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // interrupted表示在CLH队列的调度中, // “当前线程”在休眠时,有没有被中断过。 boolean interrupted = false; for (;;) { // 获取上一个节点。 // node是“当前线程”对应的节点,这里就意味着“获取上一个等待锁的线程”。 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
说明:acquireQueued()的目的是从队列中获取锁。
selfInterrupt()
selfInterrupt()是AQS中实现,源码如下:
private static void selfInterrupt() { Thread.currentThread().interrupt(); }
说明:selfInterrupt()的代码很简单,就是“当前线程”自己产生一个中断。但是,为什么需要这么做呢?
这必须结合acquireQueued()进行分析。如果在acquireQueued()中,当前线程被中断过,则执行selfInterrupt();否则不会执行。
在acquireQueued()中,即使是线程在阻塞状态被中断唤醒而获取到cpu执行权利;但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 该线程再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒;线程才会获取锁,然后“真正执行起来”!
也就是说,在该线程“成功获取锁并真正执行起来”之前,它的中断会被忽略并且中断标记会被清除! 因为在parkAndCheckInterrupt()中,我们线程的中断状态时调用了Thread.interrupted()。该函数不同于Thread的isInterrupted()函数,isInterrupted()仅仅返回中断状态,而interrupted()在返回当前中断状态之后,还会清除中断状态。 正因为之前的中断状态被清除了,所以这里需要调用selfInterrupt()重新产生一个中断!
总结
再回过头看看acquire()函数,它最终的目的是获取锁!
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
(01) 先是通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,再通过acquireQueued()获取锁。
(02) 尝试失败的情况下,会先通过addWaiter()来将“当前线程”加入到"CLH队列"末尾;然后调用acquireQueued(),在CLH队列中排序等待获取锁,在此过程中,线程处于休眠状态。直到获取锁了才返回。 如果在休眠等待过程中被中断过,则调用selfInterrupt()来自己产生一个中断。
简化版的步骤:(公平锁的核心)
获取一次锁数量,
B1、如果锁数量为0,如果当前线程是等待队列中的头节点,基于CAS尝试将state(锁数量)从0设置为1一次,如果设置成功,设置当前线程为独占锁的线程;
B2、如果锁数量不为0或者当前线程不是等待队列中的头节点或者上边的尝试又失败了,查看当前线程是不是已经是独占锁的线程了,如果是,则将当前的锁数量+1;如果不是,则将该线程封装在一个Node内,并加入到等待队列中去。等待被其前一个线程节点唤醒。
不会尝试CAS操作去插队,而是获取不到锁就去排队。方法和非公平锁类似,不同的地方是FairSync内部类中的tryAcquire方法:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (isFirst(current) && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
从源码中可以看出,上面红色代码的地方是公平锁多出来的代码,通过对公平锁的了解和代码的分析,我认为这个判断是判断当前线程是否为等待时间最长的线程,也就是说队列里面是否有其他线程再等待,如果有其他线程再等待则当前线程不获取锁,返回继续阻塞。这也就是FIFO原理先进来的进程先执行------这个理解不知道是否准确,如果不准确希望大家可以指正出来,在评论中知会我。谢谢!
基于JDK1.7的话
红色部分代码是:!hasQueuedPredecessors()
ReentrantLock释放锁:
unlock()在ReentrantLock.java中实现的,源码如下:
public void unlock() { sync.release(1); }
说明:
unlock()是解锁函数,它是通过AQS的release()函数来实现的。
在这里,“1”的含义和“获取锁的函数acquire(1)的含义”一样,它是设置“释放锁的状态”的参数。由于“公平锁”是可重入的,所以对于同一个线程,每释放锁一次,锁的状态-1。
release()
release()在AQS中实现的,源码如下:
1 public final boolean release(int arg) { 2 if (tryRelease(arg)) { 3 Node h = head; 4 if (h != null && h.waitStatus != 0) 5 unparkSuccessor(h); 6 return true; 7 } 8 return false; 9 }
说明:
release()会先调用tryRelease()来尝试释放当前线程锁持有的锁。成功的话,则唤醒后继等待线程,并返回true。否则,直接返回false。
tryRelease()
tryRelease()在ReentrantLock.java的Sync类中实现,源码如下:
protected final boolean tryRelease(int releases) { // c是本次释放锁之后的状态 int c = getState() - releases; // 如果“当前线程”不是“锁的持有者”,则抛出异常! if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 如果“锁”已经被当前线程彻底释放,则设置“锁”的持有者为null,即锁是可获取状态。 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 设置当前线程的锁的状态。 setState(c); return free; }
首先,只有当c==0的时候才会让free=true,这和上面一个线程多次调用lock方法累加state是对应的,调用了多少次的lock()方法自然必须调用同样次数的unlock()方法才行,这样才把一个锁给全部解开。
当一条线程对同一个ReentrantLock全部解锁之后,AQS的state自然就是0了,AbstractOwnableSynchronizer的exclusiveOwnerThread将被设置为null,这样就表示没有线程占有锁,方法返回true。代码继续往下走,上面的release方法的第四行,h不为null成立,h的waitStatus为-1,不等于0也成立,所以走第5行的unparkSuccessor方法:
unparkSuccessor()
在release()中“当前线程”释放锁成功的话,会唤醒当前线程的后继线程。
根据CLH队列的FIFO规则,“当前线程”(即已经获取锁的线程)肯定是head;如果CLH队列非空的话,则唤醒锁的下一个等待线程。
下面看看unparkSuccessor()的源码,它在AQS中实现。
private void unparkSuccessor(Node node) { // 获取当前线程的状态 int ws = node.waitStatus; // 如果状态<0,则设置状态=0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //获取当前节点的“有效的后继节点”,无效的话,则通过for循环进行获取。 // 这里的有效,是指“后继节点对应的线程状态<=0” Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 唤醒“后继节点对应的线程” if (s != null) LockSupport.unpark(s.thread); }
s即h的下一个Node,这个Node里面的线程就是线程2,由于这个Node不等于null,所以走21行,线程2被unPark了,得以运行。有一个很重要的问题是:锁被解了怎样保证整个FIFO队列减少一个Node呢?这是一个很巧妙的设计,又回到了AQS的acquireQueued方法了:
1 final boolean acquireQueued(final Node node, int arg) { 2 try { 3 boolean interrupted = false; 4 for (;;) { 5 final Node p = node.predecessor(); 6 if (p == head && tryAcquire(arg)) { 7 setHead(node); 8 p.next = null; // help GC 9 return interrupted; 10 } 11 if (shouldParkAfterFailedAcquire(p, node) && 12 parkAndCheckInterrupt()) 13 interrupted = true; 14 } 15 } catch (RuntimeException ex) { 16 cancelAcquire(node); 17 throw ex; 18 } 19 }
被阻塞的线程2是被阻塞在第12行,注意这里并没有return语句,也就是说,阻塞完成线程2依然会进行for循环。然后,阻塞完成了,线程2所在的Node的前驱Node是p,线程2尝试tryAcquire,成功,然后线程2就成为了head节点了,把p的next设置为null,这样原头Node里面的所有对象都不指向任何块内存空间,h属于栈内存的内容,方法结束被自动回收,这样随着方法的调用完毕,原头Node也没有任何的引用指向它了,这样它就被GC自动回收了。此时,遇到一个return语句,acquireQueued方法结束,后面的Node也是一样的原理。
这里有一个细节,看一下setHead方法:
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
setHead方法里面的前驱Node是Null,也没有线程,那么为什么不用一个在等待的线程作为Head Node呢?
因为一个线程随时有可能因为中断而取消,而取消的话,Node自然就要被GC了,那GC前必然要把头Node的后继Node变为一个新的头而且要应对多种情况,这样就很麻烦。用一个没有thread的Node作为头,相当于起了一个引导作用,因为head没有线程,自然也不会被取消。
再看一下上面unparkSuccessor的14行~20行,就是为了防止head的下一个node被取消的情况,这样,就从尾到头遍历,找出离head最近的一个node,对这个node进行unPark操作。
简化版的步骤:(释放锁的步骤)
1)获取当前的锁数量,然后用这个锁数量减去解锁的数量(这里为1),最后得出结果c
2)判断当前线程是不是独占锁的线程,如果不是,抛出异常
3)如果c==0,说明锁被成功释放,将当前的独占线程置为null,锁数量置为0,返回true
4)如果c!=0,说明释放锁失败,锁数量置为c,返回false
5)如果锁被释放成功的话,唤醒距离头节点最近的一个非取消的节点
Condition:
Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。
Condition的函数列表:
// 造成当前线程在接到信号或被中断之前一直处于等待状态。 void await() // 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。 boolean await(long time, TimeUnit unit) // 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。 long awaitNanos(long nanosTimeout) // 造成当前线程在接到信号之前一直处于等待状态。 void awaitUninterruptibly() // 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。 boolean awaitUntil(Date deadline) // 唤醒一个等待线程。 void signal() // 唤醒所有等待线程。 void signalAll()
Condition和ReentrantLock联合使用的示例:
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[5]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); //获取锁 try { // 如果“缓冲已满”,则等待;直到“缓冲”不是满的,才将x添加到缓冲中。 while (count == items.length) notFull.await(); // 将x添加到缓冲中 items[putptr] = x; // 将“put统计数putptr+1”;如果“缓冲已满”,则设putptr为0。 if (++putptr == items.length) putptr = 0; // 将“缓冲”数量+1 ++count; // 唤醒take线程,因为take线程通过notEmpty.await()等待 notEmpty.signal(); // 打印写入的数据 System.out.println(Thread.currentThread().getName() + " put "+ (Integer)x); } finally { lock.unlock(); // 释放锁 } } public Object take() throws InterruptedException { lock.lock(); //获取锁 try { // 如果“缓冲为空”,则等待;直到“缓冲”不为空,才将x从缓冲中取出。 while (count == 0) notEmpty.await(); // 将x从缓冲中取出 Object x = items[takeptr]; // 将“take统计数takeptr+1”;如果“缓冲为空”,则设takeptr为0。 if (++takeptr == items.length) takeptr = 0; // 将“缓冲”数量-1 --count; // 唤醒put线程,因为put线程通过notFull.await()等待 notFull.signal(); // 打印取出的数据 System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x); return x; } finally { lock.unlock(); // 释放锁 } } } public class ConditionTest2 { private static BoundedBuffer bb = new BoundedBuffer(); public static void main(String[] args) { // 启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9); // 启动10个“读线程”,从BoundedBuffer中不断的读数据。 for (int i=0; i<10; i++) { new PutThread("p"+i, i).start(); new TakeThread("t"+i).start(); } } static class PutThread extends Thread { private int num; public PutThread(String name, int num) { super(name); this.num = num; } public void run() { try { Thread.sleep(1); // 线程休眠1ms bb.put(num); // 向BoundedBuffer中写入数据 } catch (InterruptedException e) { } } } static class TakeThread extends Thread { public TakeThread(String name) { super(name); } public void run() { try { Thread.sleep(10); // 线程休眠1ms Integer num = (Integer)bb.take(); // 从BoundedBuffer中取出数据 } catch (InterruptedException e) { } } } }
Condition更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。
*******************************未完待续***********************************
该文为本人学习的笔记,方便以后自己查阅。参考网上各大帖子,取其精华整合自己的理解而成。如有不对的地方,请多多指正!自勉!共勉!
************************************************************************
相关文章推荐
- 学习笔记12—JAVA高级视频05_网络编程
- 学习笔记—C语言基础篇05
- JUC锁-05之 非公平锁
- Duilib学习笔记《05》— 消息响应处理
- Duilib学习笔记《05》— 消息响应处理
- 《机电传动控制》学习笔记05-1
- 《30天自制操作系统》05_day_学习笔记
- 《机电传动控制》学习笔记05-2
- 《Effective C++》学习笔记条款05了解C++默默编写并调用哪些函数
- android开发之this.finish()的使用 分类: android 学习笔记 2015-07-18 19:05 30人阅读 评论(0) 收藏
- 《深入理解java虚拟机》学习笔记05--HotSpot中对象存活判读算法和垃圾收集算法的实现
- 《Effective C++ 》学习笔记——条款05
- java io系列05之 ObjectInputStream 和 ObjectOutputStream
- Object-C学习笔记——内存管理
- mybatis-05-高级映射
- 戴文的Linux内核专题:05 配置内核 (1)
- 扒扒数据库长长知识(下载资源组合看)之05 (多行函数_分组数据_分组过滤)
- LINUX_C编程实战-第九章《信号》学习笔记
- 马士兵Struts2学习笔记
- 多线程“基础篇”05之 线程等待与唤醒