AQS的应用-FutureTask | CountDownLatch | ReentrantLock
2013-04-09 22:36
357 查看
AbstractQueuedSynchronizer是Java并发包的基础,依托AQS所实现的CHL队列,JDK实现了很多实用的工具
1. FutureTask
2. CountDownLatch
3. ReentrantLock
4. ReentrantReadWriteLock
5. Semaphore
上面这几个类都是对AQS的具体应用,都是利用了AQS中的共享状态变量state的不同值来表示所需的业务含义,并利用AQS提供的阻塞队列完成多线程协同。
FutureTask为AQS中的共享状态变量state定义了三种业务状态:
ReentrantLock可重入锁可以对锁进行多次的获取与释放,而获取的次数也是依托于AQS的状态变量state而实现。
1. FutureTask
2. CountDownLatch
3. ReentrantLock
4. ReentrantReadWriteLock
5. Semaphore
上面这几个类都是对AQS的具体应用,都是利用了AQS中的共享状态变量state的不同值来表示所需的业务含义,并利用AQS提供的阻塞队列完成多线程协同。
FutureTask为AQS中的共享状态变量state定义了三种业务状态:
private final class Sync extends AbstractQueuedSynchronizer { private static final int RUNNING = 1; // 表示任务正在执行中,尚未完成 private static final int RAN = 2; // 表示任务已经完成,可以获取结果 private static final int CANCELLED = 4; // 表示任务已经被取消 // 具体任务 private final Callable<V> callable; // 任务执行结果 private V result; // 执行任务的线程 private volatile Thread runner; // 执行任务,该方法被FutureTask的run()方法调用 void innerRun() { // 任务开始执行前,将任务状态变更为"正在执行" if (!compareAndSetState(0, RUNNING)) return; try { // 设置当前执行任务的线程 runner = Thread.currentThread(); // 如果状态变更成功则执行任务,否则释放任务 if (getState() == RUNNING) innerSet(callable.call()); else releaseShared(0); } catch (Throwable ex) { innerSetException(ex); } } // 判断任务是否执行完成,state为RAN或者CANCELLED boolean innerIsDone() { return ranOrCancelled(getState()) && runner == null; } private boolean ranOrCancelled(int state) { return (state & (RAN | CANCELLED)) != 0; } // 获取结果,如果任务没有结束,则线程进入阻塞队列 V innerGet() throws InterruptedException, ExecutionException { acquireSharedInterruptibly(0); if (getState() == CANCELLED) throw new CancellationException(); if (exception != null) throw new ExecutionException(exception); return result; } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取共享资源,AQS的子类负责实现 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // acquireSharedInterruptibly最终会调用FutureTask中的Sync所实现的tryAcquireShared, // 在此方法中传入的参数被忽略,仅仅靠判断任务是否完成来决定是否可以获取"锁"(这里其实没有锁的概念),如果状态没有改变, // 则会进入方法doAcquireSharedInterruptibly,将调用试图获取任务结果的线程放入AQS的阻塞队列 protected int tryAcquireShared(int ignore) { return innerIsDone()? 1 : -1; } // 而另一个get方法则可以让客户端指定阻塞的时间,指定时间到达后线程将被唤醒,而此时任务可能还没有结束,返回结果自然为空 V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException { if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException(); if (getState() == CANCELLED) throw new CancellationException(); if (exception != null) throw new ExecutionException(exception); return result; } // 不想等待任务执行了,直接中断阻塞线程 boolean innerCancel(boolean mayInterruptIfRunning) { for (;;) { int s = getState(); if (ranOrCancelled(s)) return false; if (compareAndSetState(s, CANCELLED)) break; } if (mayInterruptIfRunning) { Thread r = runner; if (r != null) r.interrupt(); } releaseShared(0); done(); return true; } }CountDownLatch顾名思义是一个门闩,其中两个核心方法是await()和countDown()可以用于多线程间的工作协调,例如JDK说明文档中的例子
class Driver2 { void main() throws InterruptedException { // 定义N个门闩,每个工作线程分配一个,当线程所属的工作完成后, // 打开门闩,所有的工作线程都完成工作后,所有的门闩被打开 CountDownLatch doneSignal = new CountDownLatch(N); // JDK自带并发框架 Executor e = ... // 创建N个工作线程 for (int i = 0; i < N; ++i) e.execute(new WorkerRunnable(doneSignal, i)); // 等待所有的工作线程执行完成后线程唤醒继续执行 doneSignal.await(); } } // 工作线程 class WorkerRunnable implements Runnable { private final CountDownLatch doneSignal; public void run() { try { // 执行具体工作 doWork(i); // 打开门闩 doneSignal.countDown(); } catch (InterruptedException ex) {} } void doWork() { ... } }CountDownLatch实现门闩的机制就是利用的AQS的状态变量state实现。
ReentrantLock可重入锁可以对锁进行多次的获取与释放,而获取的次数也是依托于AQS的状态变量state而实现。
class Foo { private final ReentrantLock lock = new ReentrantLock(); // ... public void m() { lock.lock(); try { try { lock.lock() } finally { lock.unlock(); } } finally { lock.unlock(); } } }Semaphore用于对重要资源的守护,例如线程池。
class Pool { // 线程池中的线程资源数 private static final int MAX_AVAILABLE = 100; private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); public Object getItem() throws InterruptedException { // 有效资源数减一,如果没有可用资源,则线程进去AQS的阻塞队列 available.acquire(); return getNextAvailableItem(); } public void putItem(Object x) { if (markAsUnused(x)) // 有效资源数加一,如果AQS阻塞队列中有线程等待资源则阻塞,则唤醒线程 available.release(); } }资源的数量也是对应于AQS中的state变量。
相关文章推荐
- CountDownLatch+AQS
- Java多线程/并发24、Countdownlatch应用以及与CyclicBarrier的区别
- Java CountDownLatch应用
- 闭锁:CountDownLatch和CyclicBarrie、FutureTask
- Java并发(6)- CountDownLatch、Semaphore与AQS
- Java多线程知识点整理(CyclicBarrier、CountDownLatch、Callable、Future和FutureTask)
- Java多线程:Callable、ExecutorService、CountDownLatch、Future和FutureTask
- Java CountDownLatch应用
- 根据AQS推测CountDownLatch及源码分析
- AbstractQueuedSynchronizer在工具类ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier中的应用
- AbstractQueuedSynchronizer在工具类Semaphore、CountDownLatch、ReentrantLock中的应用和CyclicBarrier
- Java多线程之---用 CountDownLatch 说明 AQS 的实现原理
- concurrent-7-AQS-CountDownLatch,CyclicBarrier
- Synchronizer 闭锁(CountDownLatch,FutureTask ) 信号量(Semaphore) 关卡(CyclicBarrier) 知识点总结(java并发编程实践读书笔记三)
- Java多线程与并发库高级应用之倒计时计数器CountDownLatch
- Java并发31:CountDownLatch(下)--两种应用场景
- [置顶] java 并发编程实战书籍学习 第五章,CountDownLatch,FutureTask,CyclicBarrier,Semaphore学习
- Java CountDownLatch应用
- java多线程之CountdownLatch简单应用
- 同步器CountDownLatch的应用例子