您的位置:首页 > 其它

Thread详解13:ReentrantLock的用法(一)

2016-03-31 12:35 513 查看
Java里面提供了比synchronized更加灵活丰富的锁机制,它们有一个共同的接口Lock,我们先来学习这个接口,了解其协议和功能。下面是JDK文档,总结得非常精炼,包含的知识点非常多,所以一开始可能看不懂,不过没关系,后面一点点弄懂。

public interface Lock

Lock 实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作。此实现允许更灵活的结构,可以具有差别很大的属性,可以支持多个相关的 Condition 对象。

锁是控制多个线程对共享资源进行访问的工具。通常,锁提供了对共享资源的独占访问。一次只能有一个线程获得锁,对共享资源的所有访问都需要首先获得锁。不过,某些锁可能允许对共享资源并发访问,如 ReadWriteLock 的读取锁。

虽然 synchronized 方法和语句的范围机制使得使用监视器锁编程方便了很多,而且还帮助避免了很多涉及到锁的常见编程错误,但有时也需要以更为灵活的方式使用锁。例如,某些遍历并发访问的数据结果的算法要求使用 “hand-over-hand” 或 “chain locking”:获取节点 A 的锁,然后再获取节点 B 的锁,然后释放 A 并获取 C,然后释放 B 并获取 D,依此类推。Lock 接口的实现允许锁在不同的作用范围内获取和释放,并允许以任何顺序获取和释放多个锁,从而支持使用这种技术。

随着灵活性的增加,也带来了更多的责任。不使用块结构锁就失去了使用 synchronized 方法和语句时会出现的锁自动释放功能。在大多数情况下,应该使用以下语句:

Lock l = ...;
l.lock();
try {
// access the resource protected by this lock
} finally {
l.unlock();
}


锁定和取消锁定出现在不同作用范围中时,必须谨慎地确保保持锁定时所执行的所有代码用 try-finally 或 try-catch 加以保护,以确保在必要时释放锁。

Lock 实现提供了使用 synchronized 方法和语句所没有的其他功能,包括提供了一个非块结构的获取锁尝试 (tryLock())、一个获取可中断锁的尝试 (lockInterruptibly()) 和一个获取超时失效锁的尝试 (tryLock(long, TimeUnit))。

Lock 类还可以提供与隐式监视器锁完全不同的行为和语义,如保证排序、非重入用法或死锁检测。如果某个实现提供了这样特殊的语义,则该实现必须对这些语义加以记录。

注意,Lock 实例只是普通的对象,其本身可以在 synchronized 语句中作为目标使用。获取 Lock 实例的监视器锁与调用该实例的任何 lock() 方法没有特别的关系。为了避免混淆,建议除了在其自身的实现中之外,决不要以这种方式使用 Lock 实例。除非另有说明,否则为任何参数传递 null 值都将导致抛出 NullPointerException。



1 使用ReentrantLock进行同步

一个可重入的互斥锁 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大。

ReentrantLock 将由最近成功获得锁,并且还没有释放该锁的线程所拥有。当锁没有被另一个线程所拥有时,调用 lock 的线程将成功获取该锁并返回。如果当前线程已经拥有该锁,此方法将立即返回。可以使用 isHeldByCurrentThread() 和 getHoldCount() 方法来检查此情况是否发生。

建议总是 立即实践,使用 lock 块来调用 try,在之前/之后的构造中,最典型的代码如下:

class X {
private final ReentrantLock lock = new ReentrantLock();
// ...

public void m() {
lock.lock();  // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}


除了实现 Lock 接口,此类还定义了 isLocked 和 getLockQueueLength 方法,以及一些相关的 protected 访问方法,这些方法对检测和监视可能很有用。

此锁最多支持同一个线程发起的 2147483648 个递归锁。试图超过此限制会导致由锁方法抛出的 Error。

首先,我们先不管它有多牛逼,我们先使用它来代替synchronized实现常规的同步,也就是串行化,然后调用其中的一些方法看一看是什么效果:

Service.java

package testReentrantLock;

import java.util.concurrent.locks.ReentrantLock;

public class Service {
private ReentrantLock lock = new ReentrantLock();

public void testMethod() {
lock.lock();
try {
for (int i = 0; i < 3; i++) {
System.out.println("******  " + Thread.currentThread().getName() + " is printing " + i + "  ******");
// 查询当前线程保持此锁的次数
int holdCount = lock.getHoldCount();

// 返回正等待获取此锁的线程估计数
int queuedLength = lock.getQueueLength();

// 如果此锁的公平设置为 true,则返回 true
boolean isFair = lock.isFair();

System.out.printf("---holdCount: %d;\n---queuedLength:%d;\n---isFair: %s\n\n", holdCount, queuedLength,
isFair);

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}
}

}


Thread1.java

package testReentrantLock;

public class Thread1 extends Thread {
private Service service;

public Thread1(Service service, String name) {
super(name);
this.service = service;
}

@Override
public void run() {
super.run();
service.testMethod();
}

public static void main(String[] args) {
Service service = new Service();
Thread1 tA = new Thread1(service, "Thread-A");
Thread1 tB = new Thread1(service, "Thread-B");
Thread1 tC = new Thread1(service, "Thread-C");
tA.start();
tB.start();
tC.start();
}

}


输出

******  Thread-A is printing 0  ******
---holdCount: 1;
---queuedLength:2;
---isFair: false

******  Thread-A is printing 1  ******
---holdCount: 1;
---queuedLength:2;
---isFair: false

******  Thread-A is printing 2  ******
---holdCount: 1;
---queuedLength:2;
---isFair: false

******  Thread-B is printing 0  ******
---holdCount: 1;
---queuedLength:1;
---isFair: false

******  Thread-B is printing 1  ******
---holdCount: 1;
---queuedLength:1;
---isFair: false

******  Thread-B is printing 2  ******
---holdCount: 1;
---queuedLength:1;
---isFair: false

******  Thread-C is printing 0  ******
---holdCount: 1;
---queuedLength:0;
---isFair: false

******  Thread-C is printing 1  ******
---holdCount: 1;
---queuedLength:0;
---isFair: false

******  Thread-C is printing 2  ******
---holdCount: 1;
---queuedLength:0;
---isFair: false


我稍微啰嗦地解释一下getHoldCount,它返回的是查询当前线程保存此lock的个数,也就是在此线程代码内,代用lock.lock() 的次数。一般一个线程内每个需要同步的代码块就会使用锁定嘛:

lock.lock();  // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}


2 使用Condition实现等待/通知



Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。

条件(也称为条件队列 或条件变量)为线程提供了一个含义,以便在某个状态条件现在可能为 true 的另一个线程通知它之前,一直挂起该线程(即让其“等待”)。因为访问此共享状态信息发生在不同的线程中,所以它必须受保护,因此要将某种形式的锁与该条件相关联。等待提供一个条件的主要属性是:以原子方式 释放相关的锁,并挂起当前线程,就像 Object.wait 做的那样。Condition 实例实质上被绑定到一个锁上。要为特定 Lock 实例获得 Condition 实例,请使用其 newCondition() 方法。

看完下面的这个例子你就会使用Condition了。

BoundedBuffer.java

package testReentrantLock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer {
final ReentrantLock lock = new ReentrantLock();
// notFull 才能put
final Condition notFull = lock.newCondition();
// notEmpty 才能take
final Condition notEmpty = lock.newCondition();

final int[] items = new int[2];
int putptr, takeptr, count;

public void put(int x) throws InterruptedException {
// 每次put之前线程得获得这个锁才行
lock.lock();
try {
// 如果是full,则让这个企图put的线程等待
while (count == items.length) {
System.out.printf("----FULL---- The buffer is full!  %s has to wait.\n",
Thread.currentThread().getName());
notFull.await();
}

// 每次只要put成功,则通知一下 notEmpty,如果存在等待take的线程,则唤醒一个让它取
items[putptr] = x;
if (++putptr == items.length)
putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}

public int take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
System.out.printf("----EMPTY---- The buffer is empty!  %s has to wait.\n",
Thread.currentThread().getName());
notEmpty.await();
}
// 每次take成功,则通知 notFull,如果有等待put的线程,则让它放
int x = items[takeptr];
if (++takeptr == items.length)
takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}


BufferThread.java

package testReentrantLock;

public class BufferThread extends Thread {

private BoundedBuffer boundedBuffer = new BoundedBuffer();
private String name;

public BufferThread(BoundedBuffer boundedBuffer, String name) {
super(name);
this.boundedBuffer = boundedBuffer;
this.name = name;
}

@Override
public void run() {
super.run();
System.out.println(Thread.currentThread().getName() + " is running!");
if (name.startsWith("PUT")) {
for (int i = 1; i < 4; i++) {
try {
boundedBuffer.put(i);
System.out.printf("--PUT-- %s has put %d into the buffer.\n", Thread.currentThread().getName(), i);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} else if (name.startsWith("TAKE")) {
for (int i = 1; i < 4; i++) {
try {
int value = boundedBuffer.take();
System.out.printf("--TAK-- %s has took %d from the buffer.\n", Thread.currentThread().getName(),
value);
Thread.sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
}

public static void main(String[] args) {
BoundedBuffer boundedBuffer = new BoundedBuffer();
// 创建3个put线程,每个往Buffer里put 3次
BufferThread put1 = new BufferThread(boundedBuffer, "PUT1");
BufferThread put2 = new BufferThread(boundedBuffer, "PUT2");
BufferThread put3 = new BufferThread(boundedBuffer, "PUT3");

// 创建2个take线程,每个从Buffer里take 3次
BufferThread take1 = new BufferThread(boundedBuffer, "TAKE1");
BufferThread take2 = new BufferThread(boundedBuffer, "TAKE2");

put1.start();
put2.start();
put3.start();
take1.start();
take2.start();
}

}


输出

PUT2 is running!
TAKE1 is running!
TAKE2 is running!
----EMPTY---- The buffer is empty!  TAKE2 has to wait.
PUT1 is running!
PUT3 is running!
--PUT-- PUT3 has put 1 into the buffer.
--PUT-- PUT2 has put 1 into the buffer.
--TAK-- TAKE1 has took 1 from the buffer.
--TAK-- TAKE2 has took 1 from the buffer.
--PUT-- PUT1 has put 1 into the buffer.
--TAK-- TAKE2 has took 1 from the buffer.
----EMPTY---- The buffer is empty!  TAKE1 has to wait.
--PUT-- PUT3 has put 2 into the buffer.
----FULL---- The buffer is full!  PUT2 has to wait.
--PUT-- PUT1 has put 2 into the buffer.
--PUT-- PUT2 has put 2 into the buffer.
--TAK-- TAKE1 has took 2 from the buffer.
--TAK-- TAKE2 has took 2 from the buffer.
--TAK-- TAKE1 has took 2 from the buffer.
--PUT-- PUT3 has put 3 into the buffer.
--PUT-- PUT1 has put 3 into the buffer.
----FULL---- The buffer is full!  PUT2 has to wait.


使用Condition的优越性就在于,它把等待的线程分类了,利用同一个lock创建不同的Condition,你想把等待的线程分成几类你就创建多少个Condition就好了。在特定条件下,唤醒不同类别的等待线程,多么方便。如果这样说你还是不明白Condition的优越性,那么看看同样的功能使用synchronized编写是怎么样的:

对BoundedBuffer.java的改写:

package testReentrantLock;

public class BoundedBufferSyn {

final int[] items = new int[2];
int putptr, takeptr, count;

synchronized public void put(int x) throws InterruptedException {
// 如果是full,则让这个企图put的线程等待
while (count == items.length) {
System.out.printf("----FULL---- The buffer is full!  %s has to wait.\n", Thread.currentThread().getName());
// 这里的wait和Condition的await在功能上没有什么区别,重点在唤醒
wait();
}

// 每次只要put成功,则通知一下 notEmpty,如果存在等待take的线程,则唤醒一个让它取
items[putptr] = x;
if (++putptr == items.length)
putptr = 0;
++count;

// 唤醒所有等待线程,让它们再去抢一次锁,而无法只通知特性的线程
notifyAll();
}

synchronized public int take() throws InterruptedException {
while (count == 0) {
System.out.printf("----EMPTY---- The buffer is empty!  %s has to wait.\n",
Thread.currentThread().getName());
wait();
}
// 每次take成功,则通知 notFull,如果有等待put的线程,则让它放
int x = items[takeptr];
if (++takeptr == items.length)
takeptr = 0;
--count;
notifyAll();
return x;
}
}


3 公平锁

ReentrantLock的公平锁是个啥? 先来看看JDK文档的解释:

此类的构造方法接受一个可选的公平 参数。当设置为 true 时,在多个线程的争用下,这些锁【倾向于】将访问权授予等待时间最长的线程。否则此锁将无法保证任何特定访问顺序。与采用默认设置(使用不公平锁)相比,使用公平锁的程序在许多线程访问时【表现为很低的总体吞吐量(即速度很慢,常常极其慢)】,但是在获得锁和保证锁分配的均衡性时差异较小。

【不过要注意的是,公平锁不能保证线程调度的公平性。】因此,使用公平锁的众多线程中的一员可能获得多倍的成功机会,这种情况发生在其他活动线程没有被处理并且目前并未持有锁时。还要注意的是,未定时的 tryLock 方法并没有使用公平设置。因为即使其他线程正在等待,只要该锁是可用的,此方法就可以获得成功。

关于公平锁,我觉得文档已经解释的非常清楚了,我就不编写示例代码了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息