线程同步工具-CyclicBarrier
2017-06-06 14:40
483 查看
CyclicBarrier可以允许一个或者多个线程在一个确定的点同步数据,他的使用上和CountDownLatch类似,但是有趣的是你可以传递一个外加的 Runnable 对象作为初始参数,并且当全部线程都到达同一个点时,CyclicBarrier类 会把这个对象当做线程来执行,这在使用分区处理编程的时候能很好的发挥其并发性能
比如我们有一个数据量较大的二维整数数组,要找到某个整数在其中出现的次数。
例子中解决的问题比较简单。我们有一个很大的随机的整数矩阵,然后你想知道这矩阵里面某个数字出现的次数。为了更好的执行,我们使用了 divide 和 conquer 技术。我们 divide 矩阵成5个子集,然后在每个子集里使用一个线程来查找数字。这些线程是 Searcher 类的对象。
如我们之前提到的,CyclicBarrier 类有一个内部计数器控制到达同步点的线程数量。每次线程到达同步点,它调用 await() 方法告知 CyclicBarrier 对象到达同步点了。CyclicBarrier 把线程放入睡眠状态直到全部的线程都到达他们的同步点。
当全部的线程都到达他们的同步点,CyclicBarrier 对象叫醒全部正在 await() 方法中等待的线程们,然后,选择性的,为CyclicBarrier的构造函数 传递的 Runnable 对象(例子里,是 Grouper 对象)创建新的线程执行外加任务。
比如我们有一个数据量较大的二维整数数组,要找到某个整数在其中出现的次数。
生成一个二维矩阵 public class MatrixMock { private int data[][]; public MatrixMock(int rows , int widths) { Random random = new Random(47); data = new int[rows][widths]; for (int i = 0; i < rows; i++) { for (int j = 0; j < widths; j++) { data[i][j] = random.nextInt(10); } } } public int[] getRows(int row){ return data[row]; } }
//此类用来装每个线程的查询结果 public class Results { private List<Integer> resultList; public Results() { this.resultList = new ArrayList<>(); } public List<Integer> getResultList() { return resultList; } public void setResult(int data){ this.resultList.add(data); } }
//查询工具类 public class Searcher implements Runnable { private MatrixMock mock; private int startRows; private int endRows; private int seacherNumber; private Results results; private final CyclicBarrier cyclicBarrier; public Searcher(MatrixMock mock, int startRows, int endRows, int seacherNumber, Results results,CyclicBarrier cyclicBarrier) { this.mock = mock; this.startRows = startRows; this.endRows = endRows; this.seacherNumber = seacherNumber; this.results = results; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { int count = 0; System.out.printf("%s : Processing lines from %d to %d .\n",Thread.currentThread().getName(),startRows,endRows); for(int i = startRows ; i < endRows ; i++){ int row[] = mock.getRows(i); for (int j = 0; j < row.length; j++) { if(row[j]==seacherNumber){ count++; } } } results.setResult(count); System.out.printf("%s: Lines processed.\n",Thread. currentThread().getName()); try { //计算完成以后等待 cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
//聚合类 将查询结果聚合 此类在CyclicBarrier唤醒线程后执行 public class Grouper implements Runnable { private Results results; private int count; public Grouper(Results results) { this.results = results; } @Override public void run() { for (int i = 0; i < results.getResultList().size(); i++) { count += results.getResultList().get(i); } } public int getCount(){ return count; } }
//测试类 public class Main { public static void main(String[] args) { final int ROWS = 10000; final int NUMBERS = 1000; final int SEARCH = 5; final int PARTICIPANTS = 5; final int LINES_PARTICIPANT = 2000; MatrixMock matrixMock = new MatrixMock(ROWS,NUMBERS); Results results = new Results(); Grouper grouper = new Grouper(results); CyclicBarrier cyclicBarrier = new CyclicBarrier(PARTICIPANTS,grouper); Thread[] threads = new Thread[PARTICIPANTS]; for (int i = 0; i < PARTICIPANTS; i++) { threads[i] = new Thread(new Searcher(matrixMock, i * LINES_PARTICIPANT, (i + 1) * LINES_PARTICIPANT, SEARCH, results, cyclicBarrier)); threads[i].start(); } for (int i = 0; i < PARTICIPANTS; i++) { try { threads[i].join(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.printf("final result : "+grouper.getCount()); } }
例子中解决的问题比较简单。我们有一个很大的随机的整数矩阵,然后你想知道这矩阵里面某个数字出现的次数。为了更好的执行,我们使用了 divide 和 conquer 技术。我们 divide 矩阵成5个子集,然后在每个子集里使用一个线程来查找数字。这些线程是 Searcher 类的对象。
如我们之前提到的,CyclicBarrier 类有一个内部计数器控制到达同步点的线程数量。每次线程到达同步点,它调用 await() 方法告知 CyclicBarrier 对象到达同步点了。CyclicBarrier 把线程放入睡眠状态直到全部的线程都到达他们的同步点。
当全部的线程都到达他们的同步点,CyclicBarrier 对象叫醒全部正在 await() 方法中等待的线程们,然后,选择性的,为CyclicBarrier的构造函数 传递的 Runnable 对象(例子里,是 Grouper 对象)创建新的线程执行外加任务。
相关文章推荐
- Java八个并发学习——线程同步工具CyclicBarrier
- CyclicBarrier线程同步工具
- 线程并发工具--CyclicBarrier
- 【java并发】线程同步工具CyclicBarrier的使用
- 线程同步工具——CountDownLatch
- 线程同步工具类——CyclicBarrier
- 线程同步工具-Semaphore
- 线程同步工具之CyclicBarrier
- Java并发学习之十九——线程同步工具之Phaser
- 线程同步工具——Exchanger
- 多线程条件通行工具——CyclicBarrier
- 线程同步工具-CountDownLatch
- 线程同步工具之CountDownLatch
- 线程同步工具之CountDownLatch
- 【java并发】线程同步工具Semaphore的使用
- 线程同步工具之CountDownLatch
- Java并发学习之十七——线程同步工具之CountDownLatch
- Java并发-多线程的线程同步工具类(12)
- 多线程编程入门(15):线程同步工具之Semaphore(信号量)