Java并发之Semaphore的源码分析
2016-10-06 23:06
489 查看
Semaphore是一个计数信号量,采用的是共享锁的方式来控制。
主要对下面2个方法进行分析:
acquire(int)来获取信号量,直到只有一个可以用或者出现中断。
release(int)用来释放信号量,将信号量数量返回给Semaphore
AQS:acquireSharedInterruptibly
tryAcquireShared(arg)
nonfairTryAcquireShared(acquires)
变量state的源码,采用了volatile的可见修饰
doAcquireSharedInterruptibly(arg)
释放信号量release(int)的源码
releaseShared(int arg)
tryReleaseShared(arg)
doReleaseShared()
一个简单实例:
package thread;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
class SDTask extends Thread
{
private Semaphore s;
public SDTask(Semaphore s,String name)
{
super(name);
this.s = s;
}
public void run()
{
try
{
System.out.println(Thread.currentThread().getName()+" 尝试获取3个信号!!!");
s.acquire(3);
System.out.println(Thread.currentThread().getName()+" 获取了3个信号!!!");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e)
{
e.printStackTrace();
}finally
{
System.out.println(Thread.currentThread().getName()+" 释放了3个信号!!!");
s.release(3);
}
}
}
public class SemaphoreDemo2
{
public static void main(String[] args)
{
Semaphore s = new Semaphore(7);
for(int i=0; i<3; i++)
{
new SDTask(s,"thread"+i).start();;
}
}
}
结果:
thread0 尝试获取3个信号!!!
thread2 尝试获取3个信号!!!
thread0 获取了3个信号!!!
thread2 获取了3个信号!!!
thread1 尝试获取3个信号!!!
thread0 释放了3个信号!!!
thread2 释放了3个信号!!!
thread1 获取了3个信号!!!
thread1 释放了3个信号!!!
总结:
Semaphore并没有采用Lock进行锁操作,使用volatile state和for循环,通过修改条件来改变线程的操作。
主要对下面2个方法进行分析:
acquire(int)来获取信号量,直到只有一个可以用或者出现中断。
release(int)用来释放信号量,将信号量数量返回给Semaphore
public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
AQS:acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //小于0表示没有获得共享锁 doAcquireSharedInterruptibly(arg); }
tryAcquireShared(arg)
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
nonfairTryAcquireShared(acquires)
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState();//获取去中的信号量数 int remaining = available - acquires;//剩余信号量数 //1.信号量数大于0,获取共享锁,并设置执行compareAndSetState(available, remaining),返回剩余信号量数 //2.信号量数小于等于0,直接返回负数 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
变量state的源码,采用了volatile的可见修饰
/** * The synchronization state. */ private volatile int state; /** * Returns the current value of synchronization state. * This operation has memory semantics of a <tt>volatile</tt> read. * @return current state value */ protected final int getState() { return state; }
doAcquireSharedInterruptibly(arg)
//没有获得共享锁 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //加入等待队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); //前面的节点等于头结点 if (p == head) { //尝试获取共享锁 int r = tryAcquireShared(arg); if (r >= 0) { //将当前节点设置为head setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 当前线程一直等待,直到获取到共享锁。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
释放信号量release(int)的源码
public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }
releaseShared(int arg)
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
tryReleaseShared(arg)
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState();//获取当前的信号量数 int next = current + releases;//将释放的信号量还给Semaphore if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next))//设置信号量 return true; } }
doReleaseShared()
private void doReleaseShared() { for (;;) { // 获取CLH队列的头节点 Node h = head; // 如果头节点不为null,并且头节点不等于tail节点。 if (h != null && h != tail) { // 获取头节点对应的线程的状态 int ws = h.waitStatus; // 如果头节点对应的线程是SIGNAL状态,则意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒。 if (ws == Node.SIGNAL) { // 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 唤醒“头节点的下一个节点所对应的线程”。 unparkSuccessor(h); } // 如果头节点对应的线程是空状态,则设置“文件点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果头节点发生变化,则继续循环。否则,退出循环。 if (h == head) // loop if head changed break; } }
一个简单实例:
package thread;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
class SDTask extends Thread
{
private Semaphore s;
public SDTask(Semaphore s,String name)
{
super(name);
this.s = s;
}
public void run()
{
try
{
System.out.println(Thread.currentThread().getName()+" 尝试获取3个信号!!!");
s.acquire(3);
System.out.println(Thread.currentThread().getName()+" 获取了3个信号!!!");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e)
{
e.printStackTrace();
}finally
{
System.out.println(Thread.currentThread().getName()+" 释放了3个信号!!!");
s.release(3);
}
}
}
public class SemaphoreDemo2
{
public static void main(String[] args)
{
Semaphore s = new Semaphore(7);
for(int i=0; i<3; i++)
{
new SDTask(s,"thread"+i).start();;
}
}
}
结果:
thread0 尝试获取3个信号!!!
thread2 尝试获取3个信号!!!
thread0 获取了3个信号!!!
thread2 获取了3个信号!!!
thread1 尝试获取3个信号!!!
thread0 释放了3个信号!!!
thread2 释放了3个信号!!!
thread1 获取了3个信号!!!
thread1 释放了3个信号!!!
总结:
Semaphore并没有采用Lock进行锁操作,使用volatile state和for循环,通过修改条件来改变线程的操作。
相关文章推荐
- Java 并发 --- Semaphore源码分析
- Java并发系列之Semaphore源码分析
- Java并发Concurrent包的锁(七)——Semaphore源码分析及使用
- Java concurrent Framework并发容器之ArrayBlockingQueue(1.6)源码分析
- java 并发 ConcurrentHashMap 与 HashTable源码分析总结
- Java多线程 -- JUC包源码分析9 -- AbstractQueuedSynchronizer深入分析-- Semaphore与CountDownLatch
- java并发编程之源码分析ThreadPoolExecutor线程池实现原理
- Java并发----ConcurrentHashMap02--源码分析
- Java concurrent Framework并发容器之ConcurrentLinkedQueue(1.6)源码分析 ??
- 聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源码分析
- Java并发编程 -- Concurrent包源码分析4 -- 各种锁与无锁
- Java并发编程:ThreadPoolExecutor类及方法源码分析
- Java高并发程序设计笔记7之并发容器及典型源码分析
- 聊聊高并发(十二)分析java.util.concurrent.atomic.AtomicStampedReference源码来看如何解决CAS的ABA问题
- Java并发编程 -- Concurrent包源码分析4 -- 各种锁与无锁
- Java concurrent Framework并发容器之ConcurrentHashMap(JDK1.5)源码分析
- Java并发-AtomicInteger源码分析
- java 并发编程实战 第五天 ThreadPoolExecutor 源码分析
- Java并发包源码学习之AQS框架(四)AbstractQueuedSynchronizer源码分析
- java并发锁ReentrantLock源码分析一 可重入支持中断锁的实现原理