您的位置:首页 > 编程语言 > Java开发

Java并发之Semaphore的源码分析

2016-10-06 23:06 489 查看
Semaphore是一个计数信号量,采用的是共享锁的方式来控制。

主要对下面2个方法进行分析:

acquire(int)来获取信号量,直到只有一个可以用或者出现中断。

release(int)用来释放信号量,将信号量数量返回给Semaphore

public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}


AQS:acquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //小于0表示没有获得共享锁
doAcquireSharedInterruptibly(arg);
}


tryAcquireShared(arg)

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}


nonfairTryAcquireShared(acquires)

final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();//获取去中的信号量数
int remaining = available - acquires;//剩余信号量数
//1.信号量数大于0,获取共享锁,并设置执行compareAndSetState(available, remaining),返回剩余信号量数
//2.信号量数小于等于0,直接返回负数
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}


变量state的源码,采用了volatile的可见修饰

/**
* The synchronization state.
*/
private volatile int state;

/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a <tt>volatile</tt> read.
* @return current state value
*/
protected final int getState() {
return state;
}


doAcquireSharedInterruptibly(arg)

//没有获得共享锁
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//加入等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//前面的节点等于头结点
if (p == head) {
//尝试获取共享锁
int r = tryAcquireShared(arg);
if (r >= 0) {
//将当前节点设置为head
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 当前线程一直等待,直到获取到共享锁。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}


释放信号量release(int)的源码

public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}


releaseShared(int arg)

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}


tryReleaseShared(arg)

protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();//获取当前的信号量数
int next = current + releases;//将释放的信号量还给Semaphore
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))//设置信号量
return true;
}
}


doReleaseShared()

private void doReleaseShared() {
for (;;) {
// 获取CLH队列的头节点
Node h = head;
// 如果头节点不为null,并且头节点不等于tail节点。
if (h != null && h != tail) {
// 获取头节点对应的线程的状态
int ws = h.waitStatus;
// 如果头节点对应的线程是SIGNAL状态,则意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒。
if (ws == Node.SIGNAL) {
// 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒“头节点的下一个节点所对应的线程”。
unparkSuccessor(h);
}
// 如果头节点对应的线程是空状态,则设置“文件点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;                // loop on failed CAS
}
// 如果头节点发生变化,则继续循环。否则,退出循环。
if (h == head)                   // loop if head changed
break;
}
}


一个简单实例:

package thread;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class SDTask extends Thread
{

private Semaphore s;

public SDTask(Semaphore s,String name)
{
super(name);
this.s = s;
}
public void run()
{
try
{
System.out.println(Thread.currentThread().getName()+" 尝试获取3个信号!!!");
s.acquire(3);
System.out.println(Thread.currentThread().getName()+" 获取了3个信号!!!");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e)
{
e.printStackTrace();
}finally
{
System.out.println(Thread.currentThread().getName()+" 释放了3个信号!!!");
s.release(3);
}
}
}
public class SemaphoreDemo2
{
public static void main(String[] args)
{
Semaphore s = new Semaphore(7);
for(int i=0; i<3; i++)
{
new SDTask(s,"thread"+i).start();;
}
}
}

结果:
thread0 尝试获取3个信号!!!

thread2 尝试获取3个信号!!!

thread0 获取了3个信号!!!

thread2 获取了3个信号!!!

thread1 尝试获取3个信号!!!

thread0 释放了3个信号!!!

thread2 释放了3个信号!!!

thread1 获取了3个信号!!!

thread1 释放了3个信号!!!

总结:

Semaphore并没有采用Lock进行锁操作,使用volatile state和for循环,通过修改条件来改变线程的操作。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: