【Java并发】(二) 线程同步之Thread.join()、CountDownLatch、CyclicBarrier
2017-05-09 20:44
567 查看
注:转载请注明来自Nemo, http://blog.csdn.net/nemo__
-
-
-
它们最终均是调用
下面可以看看
首先,它是Thread的一个final、synchronized、非static方法,因此外部不可重写此方法;调用t.join时,必须获取线程t的同步锁,如果外部以t为对象锁,可能导致这里等待大于millis时间;
join()的实现是wait()上以isAlive()作为循环判断条件,当线程结束时,notifyAll()方法会被调用,主线程阻塞会停止。
这里使用wait()方法来阻塞主线程执行,所以如果有调用线程
因此Java建议应用不要在Thread实例上调用
代码实例:
上例中,未加入
CountDownLatch是一个同步工具,它允许一个或多个线程等待一系列其它线程事务完成。一个CountDownLatch实例在初始化时会被赋予count,调用await()方法会造成阻塞,直到通过
CountDownLatch是一个多面性的同步工具,可以被用在很多场合下。可以初始化count为1作为门闩,多个线程通过调用await()等待,一个线程中调用countDown()来激活这些等待线程;或者是CountDownLatch被初始化为N,一个线程等待,其它多个线程完成时分别调用countDown()或者一个事件被完成N次分别调用countDown()。
在分别调用countDown()的多个线程中不需等待CountDownLatch的count被减为0,对它们所在线程执行没有任何影响。
它主要有三个方法:
-
-
-
前两个是阻塞线程,countDown()每次调用,count数减1,直到为0激活线程。
内存一致性效果:在count到达0之前,那些在子线程调用countDown()前的事务happen-before等待await()执行完成后的事务。
代码实例一:
上例中显示了CountDownLatch的两种用法:
- startSignal阻止所有Worker执行,直到doSomethingElse1()完成,会通知所有Worker;
- doneSignal会等待所有Worker完成才会退出,表明Driver完成。
代码实例二:
另一个典型的用法是把某个问题分成N部分完成,每一部分在一个Runnable执行,并且这些Runnable按顺序在Executor中执行,当所有子任务完成时,会通过countDown()反馈,直到count为0主线程能够通过。通常的做法是在一个线程中调用调用countDown(),在另一个线程对应的await()方法中返回一个成功的标志。
例一:
CyclicBarrier主要有两个方法:
-
-
前者最终是调用
内存一致性效果:那些在子线程调用await()前的事务happen-before屏障事务(barrier action),屏障事务又happen-before等待await()执行完成后再执行的事务。
例二:
上例中,所有Worker线程完成matrix中的一行的处理,然后等待所有行处理完成。当所有行完成后,会执行屏障事务(barrier-action)去融合所有行。等到融合动作完成时,屏障事务线程返回,done()返回true所有Worker线程结束。
如果屏障事务不依赖于所有子事务完成,那么可通过await()返回的值来决定哪个线程可以执行屏障事务:
CyclicBarrier在同步失败时使用
Thread.join()需要拿到线程t的引用,特别在2个以上的线程时,代码就会显得不那么整洁了,而CountDownLatch/CyclicBarrier只需要一个实例即可。
尽量使用JUC并发包中的工具,而不是wait()、notify()这些容易出错的方法。应尽量使用CountDownLatch/CyclicBarrier这样的并发工具,而不是Thread.join()方法。
CyclicBarrier的功能也可以由CountDownLatch来实现。CountDownLatch更多的是在主线程中等待子任务完成,而CyclicBarrier是要各个子任务在某一时刻达到同步状态。
CountDownLatch和CyclicBarrier最主要的区别是CyclicBarrier能够重用,而CountDownLatch不行。可以通过CyclicBarrier的reset()方法来重置它的初始状态。
CyclicBarrier构造函数所接受的Runnable参数也是CountDownLatch所不具备的。
参考
http://uule.iteye.com/blog/1101994
http://developer.51cto.com/art/201403/432095.htm
http://stackoverflow.com/questions/21808814/whats-the-difference-between-cyclicbarrier-countdownlatch-and-join-in-java
一、概述
多线程编程在处理繁锁、耗时的任务时发挥了非常重要作用,线程的调度是由系统完成的,它们之间执行是没有必然的先后顺序的。然而我们有时又需要线程处理任务时有先后,有些任务完成后才能执行别的任务,这时候线程同步就要起作用了。二、Thread.join()
Thread.join()就是实现线程同步的一种,
在主线程中调用线程t的join()方法,主线程会等待线程t执行完成后,再继续执行主线程代码。它一共有三种形式:
-
public final synchronized void join(long millis)
-
public final void join()
-
public final synchronized void join(long millis, int nanos)
它们最终均是调用
join(long millis)方法,当参数millis为0时,会一直等待直到线程t执行完成;millis大于0时,主线程最多等待millis毫秒,然后主线程和线程t并行执行。
下面可以看看
join(long millis)源码,就比较清楚它是如何实现线程同步的:
/** * Waits at most {@code millis} milliseconds for this thread to * die. A timeout of {@code 0} means to wait forever. * * <p> This implementation uses a loop of {@code this.wait} calls * conditioned on {@code this.isAlive}. As a thread terminates the * {@code this.notifyAll} method is invoked. It is recommended that * applications not use {@code wait}, {@code notify}, or * {@code notifyAll} on {@code Thread} instances. */ public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
首先,它是Thread的一个final、synchronized、非static方法,因此外部不可重写此方法;调用t.join时,必须获取线程t的同步锁,如果外部以t为对象锁,可能导致这里等待大于millis时间;
join()的实现是wait()上以isAlive()作为循环判断条件,当线程结束时,notifyAll()方法会被调用,主线程阻塞会停止。
这里使用wait()方法来阻塞主线程执行,所以如果有调用线程
t.notify()或
t.notifyAll()方法,会导致这里同步失败。
因此Java建议应用不要在Thread实例上调用
wait()、
notify()、
notifyAll()。同时也不应该以Thread实例作为synchronized关键字的对象锁。
代码实例:
public class JoinRunnable implements Runnable { public static int n = 0; public void run() { for (int i = 0; i < 1000; i++) { n++; } } public static void main(String[] args) throws Exception { Runnable r = new JoinRunnable(); Thread t = new Thread(r); t.start(); //t.join(); System.out.println(n); } }
上例中,未加入
t.join();时,最后打印的结果基本不会为1000;加入
t.join();后,最后打印的结果一直为1000。可以看作是主线程中插入线程t的代码来执行一样。
三、CountDownLatch
java.util.concurrent.CountDownLatch是JUC一个常用的线程同步类,作用和 Thread.join()方法类似,可用于一组线程和另外一组线程的协作。
CountDownLatch是一个同步工具,它允许一个或多个线程等待一系列其它线程事务完成。一个CountDownLatch实例在初始化时会被赋予count,调用await()方法会造成阻塞,直到通过
countDown()使得计数变成0,这时所有通过await()阻塞的线程都会被激活。这个CountDownLatch实例是一次性的,不能通过reset重置。
CountDownLatch是一个多面性的同步工具,可以被用在很多场合下。可以初始化count为1作为门闩,多个线程通过调用await()等待,一个线程中调用countDown()来激活这些等待线程;或者是CountDownLatch被初始化为N,一个线程等待,其它多个线程完成时分别调用countDown()或者一个事件被完成N次分别调用countDown()。
在分别调用countDown()的多个线程中不需等待CountDownLatch的count被减为0,对它们所在线程执行没有任何影响。
它主要有三个方法:
-
public void await()
-
public boolean await(long timeout, TimeUnit unit)
-
public void countDown()
前两个是阻塞线程,countDown()每次调用,count数减1,直到为0激活线程。
内存一致性效果:在count到达0之前,那些在子线程调用countDown()前的事务happen-before等待await()执行完成后的事务。
代码实例一:
class Driver { // ... void main() throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(N); for (int i = 0; i < N; ++i) { // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); } doSomethingElse1(); // don't let run yet startSignal.countDown(); // let all threads proceed doSomethingElse2(); doneSignal.await(); // wait for all to finish } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } void doWork() { ... } }
上例中显示了CountDownLatch的两种用法:
- startSignal阻止所有Worker执行,直到doSomethingElse1()完成,会通知所有Worker;
- doneSignal会等待所有Worker完成才会退出,表明Driver完成。
代码实例二:
class Driver2 { // ... void main() throws InterruptedException { CountDownLatch doneSignal = new CountDownLatch(N); Executor e = ... for (int i = 0; i < N; ++i) // create and start threads e.execute(new WorkerRunnable(doneSignal, i)); doneSignal.await(); // wait for all to finish } } class WorkerRunnable implements Runnable { private final CountDownLatch doneSignal; private final int i; WorkerRunnable(CountDownLatch doneSignal, int i) { this.doneSignal = doneSignal; this.i = i; } public void run() { try { doWork(i); doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } void doWork() { ... } }
另一个典型的用法是把某个问题分成N部分完成,每一部分在一个Runnable执行,并且这些Runnable按顺序在Executor中执行,当所有子任务完成时,会通过countDown()反馈,直到count为0主线程能够通过。通常的做法是在一个线程中调用调用countDown(),在另一个线程对应的await()方法中返回一个成功的标志。
四、CyclicBarrier
CyclicBarrier一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点(common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时CyclicBarrier 很有用。因为该barrier在释放等待线程后可以重用,所以称它为*循环的*barrier。
CyclicBarrier构造函数有一个可选的Runnable参数屏障事务(barrier action),它在所有线程达到公共屏障点后执行,且在所有线程释放前。这个屏障事务可用于更新共享的状态,在任何其它事务继续执行前。
例一:
public class TestCyclicBarrier { private static final int THREAD_NUM = 5; public static class WorkerThread implements Runnable{ CyclicBarrier barrier; public WorkerThread(CyclicBarrier b){ this.barrier = b; } @Override public void run() { // TODO Auto-generated method stub try{ System.out.println("Worker's waiting"); //线程在这里等待,直到所有线程都到达barrier。 barrier.await(); System.out.println("ID:"+Thread.currentThread().getId()+" Working"); }catch(Exception e){ e.printStackTrace(); } } } /** * @param args */ public static void main(String[] args) { CyclicBarrier cb = new CyclicBarrier(THREAD_NUM, new Runnable() { //当所有线程到达barrier时执行 @Override public void run() { System.out.println("Inside Barrier"); } }); for(int i=0;i<THREAD_NUM;i++){ new Thread(new WorkerThread(cb)).start(); } } } /* 以下是输出: Worker's waiting Worker's waiting Worker's waiting Worker's waiting Worker's waiting Inside Barrier ID:12 Working ID:8 Working ID:11 Working ID:9 Working ID:10 Working */
CyclicBarrier主要有两个方法:
-
public int await()
-
public int await(long timeout, TimeUnit unit)
前者最终是调用
await(long timeout, TimeUnit unit)方法,参数timeout设置超时时长。await()有一个int返回值,标志经过此次后还剩余的屏障数。
内存一致性效果:那些在子线程调用await()前的事务happen-before屏障事务(barrier action),屏障事务又happen-before等待await()执行完成后再执行的事务。
例二:
class Solver { final int N; final float[][] data; final CyclicBarrier barrier; class Worker implements Runnable { int myRow; Worker(int row) { myRow = row; } public void run() { while (!done()) { processRow(myRow); try { barrier.await(); } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } } public Solver(float[][] matrix) { data = matrix; N = matrix.length; Runnable barrierAction = new Runnable() { public void run() { mergeRows(...); } }; barrier = new CyclicBarrier(N, barrierAction); List<Thread> threads = new ArrayList<Thread>(N); for (int i = 0; i < N; i++) { Thread thread = new Thread(new Worker(i)); threads.add(thread); thread.start(); } // wait until done for (Thread thread : threads) { thread.join(); } } }
上例中,所有Worker线程完成matrix中的一行的处理,然后等待所有行处理完成。当所有行完成后,会执行屏障事务(barrier-action)去融合所有行。等到融合动作完成时,屏障事务线程返回,done()返回true所有Worker线程结束。
如果屏障事务不依赖于所有子事务完成,那么可通过await()返回的值来决定哪个线程可以执行屏障事务:
if (barrier.await() == 0) { // log the completion of this iteration }
CyclicBarrier在同步失败时使用
all-or-none的中断模型,如果一个线程因为中断、失败、或超时,则所有等待此barrier的线程都会中断并抛出BrokenBarrierException(或者它们也抛出InterruptedException)。
五、比较不同
Thread.join()需要等待线程t结束,而CountDownLatch/CyclicBarrier不需要,它们更便利。Thread.join()需要拿到线程t的引用,特别在2个以上的线程时,代码就会显得不那么整洁了,而CountDownLatch/CyclicBarrier只需要一个实例即可。
尽量使用JUC并发包中的工具,而不是wait()、notify()这些容易出错的方法。应尽量使用CountDownLatch/CyclicBarrier这样的并发工具,而不是Thread.join()方法。
CyclicBarrier的功能也可以由CountDownLatch来实现。CountDownLatch更多的是在主线程中等待子任务完成,而CyclicBarrier是要各个子任务在某一时刻达到同步状态。
CountDownLatch和CyclicBarrier最主要的区别是CyclicBarrier能够重用,而CountDownLatch不行。可以通过CyclicBarrier的reset()方法来重置它的初始状态。
CyclicBarrier构造函数所接受的Runnable参数也是CountDownLatch所不具备的。
参考
http://uule.iteye.com/blog/1101994
http://developer.51cto.com/art/201403/432095.htm
http://stackoverflow.com/questions/21808814/whats-the-difference-between-cyclicbarrier-countdownlatch-and-join-in-java
相关文章推荐
- Java并发之 CAS + Thread.join / CountDownLatch 方式实现线程安全
- 转发---[沧海拾遗]java并发之CountDownLatch、Semaphore和CyclicBarrier
- Java并发包:CountDownLatch和CyclicBarrier
- Java中CountDownLatch、CyclicBarrier、Thread.join方法基本应用
- 深入学习java并发编程:CountDownLatch、CyclicBarrier
- JAVA并发编程--Semaphore、CountDownLatch、ReentrantLock、CyclicBarrier
- java并发编程之CountDownLatch与CyclicBarrier
- java高并发之CountDownLatch,CyclicBarrier和join
- [沧海拾遗]java并发之CountDownLatch、Semaphore和CyclicBarrier
- Java并发学习之十七——线程同步工具之CountDownLatch
- Java并发学习笔记(13)线程之间的协作(Object.wait(),notifu(),notifyAll() Thread.join())
- Java 并发专题 :闭锁 CountDownLatch 之一家人一起吃个饭
- Java并发包concurrent之CountDownLatch
- Java线程知识__其他几种线程同步的工具类的使用(CyclicBarrier,CountDownLatch,Exchanger)
- java中CountDownLatch线程同步的入门使用
- Java多线程与并发库高级应用之倒计时计数器CountDownLatch
- Java线程(十一):CountDownLatch-线程并发的发令枪
- Java_并发线程_Semaphore、CountDownLatch、CyclicBarrier、Exchanger
- Java 并发专题 :闭锁 CountDownLatch 之一家人一起吃个饭
- Java 并发编程实战学习笔记——CountDownLatch的使用