您的位置:首页 > 编程语言 > Java开发

Java并发之(3):锁

2016-02-04 15:04 555 查看
锁是并发编程中的重要概念,用来控制多个线程对同一资源的并发访问,在支持并发的编程语言中都有体现,比如c++ python等。本文主要讲解Java中的锁,或者说是重入锁。之所以这么说是因为在Java中,锁主要就是指重入锁。 java中的锁分为两大类:一种是synchronized内置锁,另一种是显式的Lock锁。在Java中,Lock接口的主要实现是重入锁ReentrantLock,而内置锁也是可重入锁。

这两种锁的简单比较如下:

synchronized内置锁和ReentrantLock都是可重入锁。 synchronized就不是可中断锁,而Lock是可中断锁。 synchronized 和Lock都是不公平锁。 Lock可以设置为公平锁。

在nts 生产代码中,并没有使用显式Lock,而是大量地使用了synchronized关键字。本文主要包括这两种锁的实现、主要的方法和使用。其中在讲解使用时,会结合在jdk中的使用以及在nts中的使用等方式来讲解。

outline:
1 java 的内置锁(synchronized关键字)的使用
2 ReentrantLock的实现、主要方法和使用

[b]3 Condition的实现、主要方法和使用[/b]

4 ReentrantReadWriteLock的实现、主要方法和使用

1 [b]java 的内置锁(synchronized关键字)的使用:[/b]

synchronized 关键字有两种用法:synchronized 方法和 synchronized 块。其中synchronized方法又分为静态方法和实例方法两种。synchronized块又可以分为普通对象块,this对象块和class对象块。由于可以针对任意代码块,且可任意指定上锁的对象,故synchronized块的灵活性较高。

当一个线程获取了对应的内置锁,并执行该代码块时,其他线程便只能一直等待获取锁的线程释放锁,而获取锁的线程只会在两种情况下释放锁:

  1)获取锁的线程执行完了该代码块,然后释放对锁的占有;

  2)线程执行发生异常,此时JVM会让线程自动释放锁。

1.1 synchronized 方法:

// in push.Latch
public synchronized void enter(final long timeout) throws InterruptedException //通过在方法声明中加入 synchronized关键字来声明 synchronized 方法
{
num++;
if (num < total)
{
if (0L < timeout)
{
wait(timeout); // here InterruptedException may be thrown
}
}
else
{
notifyAll(); // must in a synchronized method
}
}


1.2 static synchronized 方法

static synchronized方法是对class的类对象加锁,synchronized方法是对class的当前object加锁。

// in dca.access.IconStore
public static synchronized IconStore getInstance()  // 性能较差的singleton模式
{
try
{
if (null == instance)
{
instance = new IconStore();
}
}
catch (final NotesException e)
{
XLog.error("Could not create icon store object", e);
}

return instance;
}


1.3 普通对象上的synchronized 块

//nts.util.Events
private static final Map<String, SignalValues> events = new HashMap<String, SignalValues>()
...
public Object clone()
{
Events s = null;

try
{
   synchronized (events) // on plain object
{
s = (Events) super.clone();
}
       }
catch (final CloneNotSupportedException e)
{
XLog.error(e);
}
return s;
}


1.4 类对象上的synchronized 块

private static TravelerSocketFactory getInstance() // in util.TravelerSocketFactory
{
if (instance == null)
{
synchronized (TravelerSocketFactory.class) // double-checking lock
          {
if (instance == null)
{
instance = new TravelerSocketFactory();
}
}
}
return instance;
}


1.5 this 对象上[b]synchronized 块[/b]

// in push.Latch
public boolean start(final Object obj)
{
final long timeStart = System.currentTimeMillis();
boolean rv = false;
Barrier b = null;
synchronized (this) // 当前对象
       {
if (!latched.containsKey(obj))
{
b = new Barrier("Latch:" + name + "_Obj:" + obj, 1);
latched.put(obj, b);
rv = true;
}
}
XLog.exiting("name=" + name, "obj=" + obj, "barrier=" + b, "rv=" + rv, ("Elapsed time="
+ (System.currentTimeMillis() - timeStart) + "ms"));
return rv;
}


1.6 Java内置锁的可重入性

下面这段code可以证明对象关联monitor上的锁是重入锁。如果锁具备可重入性,则称作为可重入锁。一个线程不能获取其他线程所拥有的锁,但是它可以获取它已经拥有的锁。 允许一个线程多次获取同一个锁,使得重入同步成为可能。考虑以下场景:同步代码(位于同步块或者同步方法中的代码)直接或者间接地调用某个方法,而该方法 同样包含同步代码(上述两组代码使用同样的锁)。如果没有重入同步,同步代码将不得不使用额外的预警机制来避免一个线程因为自己阻塞自己而死锁。可重入性 在我看来实际上表明了锁的分配机制:基于线程的分配,而不是基于方法调用的分配。

class Test {
public static void main(String[] args) {
Test t = new Test();
synchronized(t) {
synchronized(t) {
System.out.println("made it!");
}
}
}
}


程序的输出结果为:

made it!


View Code
在JICP中举了一个例子,是两个synchronized方法。其中一个调用了另外一个。如果不是可重入锁,则会死锁。

2 jdk中的显式Lock

java.util.concurrent包是java5的一个重大改进,java.util.concurrent包提供了多种线程间同步和通信的机制,比如Executors, Queues, Timing, Synchronizers和Concurrent Collections等。其中Synchronizers包含了五种:Semaphore信号量,CounDownLatch倒计时锁存器,CyclicBarrier循环栅栏,Phaser和Exchanger。 另外java.util.concurrent包还包含了两个子包:java.util.concurrent.Atomics和java.util.concurrent.Locks。

2.1 Lock接口的方法及使用
Lock接口一共定义了以下6个不同的方法:其中lock()、lockInterruptibly(), TryLock()、和tryLock(long time, TimeUnit unit))是用来获取锁的。unLock()方法是用来释放锁的。newCondition() 用来生成新的Condition对象。(跟Lock用来替代Synchronized类似,Condition用来替代object monitor上的wait()/notify()方法。) 下面分别来介绍这些方法的使用:

2.1.1 void lock()

Lock接口获取锁的方式有4中,首先lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。(跟synchronized一样,可以使用synchronized代替lock。)

// java.util.concurrent.CyclicBarrier
public boolean isBroken() {
final ReentrantLock lock = this.lock; // this.lock is a feild in CyclicBarrier, this is a optimize method which is heavily used by doug lea
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}


lock()的不正确使用方法示例:

public class Test {
private ArrayList<Integer> arrayList = new ArrayList<Integer>();
public static void main(String[] args)  {
final Test test = new Test();

new Thread(){
public void run() {
test.insert(Thread.currentThread());
};
}.start();

new Thread(){
public void run() {
test.insert(Thread.currentThread());
};
}.start();
}

public void insert(Thread thread) {
Lock lock = new ReentrantLock();    //注意这个地方
lock.lock();
try {
System.out.println(thread.getName()+"得到了锁");
for(int i=0;i<5;i++) {
arrayList.add(i);
}
} catch (Exception e) {
// TODO: handle exception
}finally {
System.out.println(thread.getName()+"释放了锁");
lock.unlock();
}
}
}


2.1.2 boolean tryLock()方法

在当前时间(被调用时)查询锁是否可用。如果不可用则立即返回。

// java.util.concurrent.ForkJoinTask
static final void helpExpungeStaleExceptions() {
final ReentrantLock lock = exceptionTableLock;
if (lock.tryLock()) {
try {
expungeStaleExceptions();
} finally {
lock.unlock();
}
}
}


2.1.3 void lockInterruptibly() throws InterruptedException;

这个方法涉及到一个概念,可中断锁。顾名思义,就是可以响应中断的锁。如果某一线程A正在执行锁中的代码,另一线程B正在等待获取该锁,可能由于等待时间过长,线程B不想等待了,想先处理其他事情,我们可以让它中断自己或者在别的线程中中断它,这种就是可中断锁。单独调用interrupt()方法不能中断正在运行过程中的线程,只能中断阻塞过程中的线程。用synchronized修饰的话,当一个线程处于等待 某个锁的状态,是无法被中断的,只有一直等待下去。Lock可以让等待锁的线程响应中断,而synchronized却不行,使用 synchronized时,等待的线程会一直等待下去,不能够响应中断(在threadA中调用threadB.interrupt();)。

// java.util.concurrent.ArrayBlockingQueue
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // can be interrupted here
try {
while (count == items.length)
notFull.await(); // notFull is a Condition of lock.
enqueue(e);
} finally {
lock.unlock();
}
}


2.1.4 boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
在jdk和nts生产代码中没有该方法的使用。

2.1.5 Condition newCondition();

用来返回一个Condition接口的对象,Condition对象的使用见下一节。

2.1.6 void unlock();

采用synchronized不需要用户去手动释放 锁,当synchronized方法或者 synchronized代码块执行完之后,系统会自动让线程释放对锁的占用;而Lock则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出 现死锁现象。 synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过unLock() 去释放锁,则很可能造成死锁现象,因此使用Lock时需要在finally块中释放锁。

1  public boolean addAll(Collection<? extends E> c) {  // java.util.concurrent.CopyOnWriteArrayList
2         Object[] cs = (c.getClass() == CopyOnWriteArrayList.class) ?
3             ((CopyOnWriteArrayList<?>)c).getArray() : c.toArray();
4         if (cs.length == 0)
5             return false;
6         final ReentrantLock lock = this.lock;
7         lock.lock();
8         try {
9             Object[] elements = getArray();
10             int len = elements.length;
11             if (len == 0 && cs.getClass() == Object[].class)
12                 setArray(cs);
13             else {
14                 Object[] newElements = Arrays.copyOf(elements, len + cs.length);
15                 System.arraycopy(cs, 0, newElements, len, cs.length);
16                 setArray(newElements);
17             }
18             return true;
19         } finally {
20             lock.unlock();
21         }
22     }


2.2 ReentrantLock的实现

ReentrantLock内部有一个Sync内部类,它是抽象类AQS(abstractQueuedSychronizer)的子类,它又有两个子类NonFairSync和FairSync。
ReentrantLock的上述方法都是通过Sync内部类来实现的。

newCondition():可见ReentrantLock的实现基本完全基于AQS,ReentrantLock返回的Condition对象,是它的AQS内部类的内部类ConditionObject的实例(在下一节会详述):

1     public Condition newCondition() {
2         return sync.newCondition();
3     }


lock(): 包裹了sync内部类的lock方法。

1    public void lock() {
2         sync.lock();
3     }


tryLock():

1     public boolean tryLock(long timeout, TimeUnit unit)
2             throws InterruptedException {
3         return sync.tryAcquireNanos(1, unit.toNanos(timeout));
4     }


lockInterruptibly():

1     public void lockInterruptibly() throws InterruptedException {
2         sync.acquireInterruptibly(1);
3     }


unLock():

1   public void unlock() {
2         sync.release(1);
3     }


下面再来看一下ReentrantLock如何实现公平锁和非公平锁(公平锁:当持有该锁的线程释放掉公平锁的时候,等待该锁的其他线程获得该锁的机会均等,不存在某个线程多次获得该锁,而其他某个线程一直处在饥饿状态的情况。)

FairSync:

1         protected final boolean tryAcquire(int acquires) {
2             final Thread current = Thread.currentThread();
3             int c = getState();
4             if (c == 0) {
5                 if (!hasQueuedPredecessors() &&  // 如果处于等待队列的最前面
6                     compareAndSetState(0, acquires)) {
7                     setExclusiveOwnerThread(current);
8                     return true;
9                 }
10             }
11             else if (current == getExclusiveOwnerThread()) {
12                 int nextc = c + acquires;
13                 if (nextc < 0)
14                     throw new Error("Maximum lock count exceeded");
15                 setState(nextc);
16                 return true;
17             }
18             return false;
19         }
20     }


NonFairSync():

1         final boolean nonfairTryAcquire(int acquires) {
2             final Thread current = Thread.currentThread();
3             int c = getState();
4             if (c == 0) {
5                 if (compareAndSetState(0, acquires)) { // 直接开抢,谁抢着谁执行
6                     setExclusiveOwnerThread(current);
7                     return true;
8                 }
9             }
10             else if (current == getExclusiveOwnerThread()) {
11                 int nextc = c + acquires;
12                 if (nextc < 0) // overflow
13                     throw new Error("Maximum lock count exceeded");
14                 setState(nextc);
15                 return true;
16             }
17             return false;
18         }


公平锁和非公平锁可以通过ReentrantLock的构造函数来实现,默认构造函数用来构造NonFairLock, 而带参数的public ReentrantLock(boolean fair) 方法可以选择构造NonFairLock还是FairLock。

1  public ReentrantLock() {
2         sync = new NonfairSync();
3     }


带参数构造函数:

1  public ReentrantLock(boolean fair) {
2         sync = fair ? new FairSync() : new NonfairSync();
3     }


ReentrantLock 还提供了几个查询方法:getHoldCount,isHoldByCurrentThread,isLocked, isFair, getOwner, hasQueuedThreads, getQueueLength, 等等,不再赘述。

3 [b]Condition的实现、主要方法和使用[/b]

Lock与Condition的组合,可以用来替代Synchronized&wait/notify的组合。

[b]3.1 实现[/b]

如前所述,Condition的实现也离不开AQS。

返回函数:


1   public Condition newCondition() { // in java.util.concurrent.ReentrantLock
2         return sync.newCondition();
3     }


sync是什么呢? 在 ReentrantLock类中定义了一个抽象class Sync(抽象类AQS的子类),它又有两个子类FairSync和NonFairSync。上面代码中的sync就是NonFairSync的对象。

1 abstract static class Sync extends AbstractQueuedSynchronizer {
2       ...
3       final ConditionObject newCondition() {
4             return new ConditionObject();
5        }
6        ...
7 }


在java中实现了 Conditon接口的类只有两 个:AbstractQueuedSynchronizer.ConditionObject和 AbstractLongQueuedSynchronized.ConditionObject。

3.2 方法

Condition接口通过提供下列方法,提供了与Object.notify()/Object.notifyAll()类似的功能:

//可以被中断,无期限等待。
void await() throws InterruptedException;
//无期限等待,不可中断。
void awaitUninterruptibly();
//等待特定时间,超时则返回;可中断。返回值是返回时距离超时所剩时间。
long awaitNanos(long nanosTimeout) throws InterruptedException;
//等待特定时间,超时则返回;可中断。超时返回,返回值为false。
boolean await(long time, TimeUnit unit) throws InterruptedException;
//等待到特定时间点,超时则返回;可中断。超时返回,返回值为false。
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();


3.3 CyclicBarrier中的使用

下面以java.util.concurrent包下的CyclicBarrier和BlockingQueue(接口)为例,来看一下这些方法的使用,首先看一下CyclicBarrier中的:

NewCondition():

1 // in CyclicBarrier
2 /** The lock for guarding barrier entry */
3     private final ReentrantLock lock = new ReentrantLock();
4     /** Condition to wait on until tripped */
5     private final Condition trip = lock.newCondition();


signalAll():

1     private void nextGeneration() {
// signal completion of last generation
        trip.signalAll();
         // set up next generation
count = parties;
generation = new Generation();
}


await()&awaitNanos(long):

private int dowait(boolean timed, long nanos) // CyclicBarrier
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
         lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

int index = --count; // count is how many thread will be barriered
             if (index == 0) {  // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
                        command.run();
                     ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
                 try {
if (!timed)
                        trip.await();
                     else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
                 } catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
     }


3.4 ArrayBlockingQueue中的使用:

[b]newCondition():[/b]

可以为一个重入锁设置多个Condition对象

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
         notEmpty = lock.newCondition();
         notFull =  lock.newCondition();
     }


signal():

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
 while (count == items.length)
notFull.await();
enqueue(e);
         } finally {
lock.unlock();
}
}


3.5 LinkedBlockingQueue中的使用

new:

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();


signal():

public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
         try {
while (count.get() == capacity) {
                notFull.await();
             }
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
                notFull.signal();
         } finally {
putLock.unlock();
}
         if (c == 0)
signalNotEmpty();
}


4 读写锁 ReadWriteLock 接口

ReadWriteLock接口只提供两个方法定义,其中readLock()方法返回一个ReentrantLock.ReadLock类的对象作为 readLock,另一个writeLock()方法返回一个ReentrantLock.WriteLock类的对象作为writeLock。

当没有写线程时,readLock可以被多个读线程同时持有(互斥锁:在任一时刻只允许一个线程持有,其他线程将被阻塞。 与之对应的是共享锁(S锁,share)又称读锁。)。而写锁则是互斥的: 一些锁允许对共享资源的并发访问,比如一个readWriteLock中的读锁。

如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。

4.1 示例:

import java.util.concurrent.*;
import java.util.Date;
import java.util.concurrent.locks.*;
import java.text.*;

public class ReentrantReadWriteLockTest {
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
private Lock readLock = readWriteLock.readLock();
private Lock writeLock = readWriteLock.writeLock();

public void read() {
readLock.lock();
try {
System.out.println(format.format(new Date()) + "---read---");
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
}

public void write() {
writeLock.lock();
try {
System.out.println(format.format(new Date()) + "---write---");
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}

public static class MyThread extends Thread {
private ReentrantReadWriteLockTest reentrantReadWriteLockTest;
private String methodName;

public MyThread(ReentrantReadWriteLockTest reentrantReadWriteLockTest, String methodName) {
super();
this.reentrantReadWriteLockTest = reentrantReadWriteLockTest;
this.methodName = methodName;
}

@Override
public void run() {
if ("read".equalsIgnoreCase(methodName))
reentrantReadWriteLockTest.read();
else
reentrantReadWriteLockTest.write();
}
}

public static void main(String[] args) {
ReentrantReadWriteLockTest test = new ReentrantReadWriteLockTest();
Thread t1 = new MyThread(test, "write"); // replace write with read and check what will happen?
Thread t2 = new MyThread(test, "read");
t1.start();
t2.start();
}
}


4.2 使用

读写锁的典型应用场景为对容器的读写。但是,java的容器框架提供了各种各样的免锁或并发容器,比如最简单的并发容器Collections.synchronizedMap()等等。

在免锁容器中,CopyOnWriteArrayList对读操作不会block,对写操作会block;ConcurrentHashMap 类将 Map 的存储空间分为若干块,每块拥有自己的锁,大大减少了多个线程争夺同一个锁的情况:
对读操作不会block 即使是对写操作,我们也不需要对整个Map对象加锁,从而避免在整个对象上block。因此读写锁的应用并不是特别广泛。

下面来看一下jdk中使用ReentantreadWriteLock的例子javax.swing.plaf.nimbus.ImageCache :

// Lock for concurrent access to map
private ReadWriteLock lock = new ReentrantReadWriteLock();


flush:

为什么flush方法加读锁?

public void flush() {
        lock.readLock().lock();
         try {
map.clear();
} finally {
lock.readLock().unlock();
}
}


getImage:

public Image getImage(GraphicsConfiguration config, int w, int h, Object... args) {
        lock.readLock().lock();
         try {
PixelCountSoftReference ref = map.get(hash(config, w, h, args));
// check reference has not been lost and the key truly matches, in case of false positive hash match
if (ref != null && ref.equals(config,w, h, args)) {
return ref.get();
} else {
return null;
}
} finally {
lock.readLock().unlock();
}
}


setImage:

public boolean setImage(Image image, GraphicsConfiguration config, int w, int h, Object... args) {
if (!isImageCachable(w, h)) return false;
int hash = hash(config, w, h, args);
        lock.writeLock().lock();
         try {
//...
} finally {
lock.writeLock().unlock();
}
}


问题: 与ConcurrentHashMap相比,读写锁的优势在哪里?

4.3 ReentrantReadWireteLock的实现:
内部同样是定义了NonFair和Fair的synchronizer,同样是继承自AQS。

5 进一步问题:

自旋锁:
基于他这种原理,等待的时候,并不释放cpu时间片,相比synchronized wait()操作,减小了释放,重新获取的消耗。 该自旋锁适用于,当前线程竞争不强烈的时候使用。

进一步解释synchronized和显式Lock的区别

在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。 ??

6 参考文献: http://www.cnblogs.com/dolphin0520/p/3923167.html http://blog.csdn.net/HEYUTAO007/article/details/6422412
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: