您的位置:首页 > 其它

线程同步工具-CyclicBarrier

2017-06-06 14:40 483 查看
CyclicBarrier可以允许一个或者多个线程在一个确定的点同步数据,他的使用上和CountDownLatch类似,但是有趣的是你可以传递一个外加的 Runnable 对象作为初始参数,并且当全部线程都到达同一个点时,CyclicBarrier类 会把这个对象当做线程来执行,这在使用分区处理编程的时候能很好的发挥其并发性能

比如我们有一个数据量较大的二维整数数组,要找到某个整数在其中出现的次数。

生成一个二维矩阵

public class MatrixMock {
private int data[][];

public MatrixMock(int rows , int widths) {
Random random = new Random(47);
data = new int[rows][widths];
for (int i = 0; i < rows; i++) {
for (int j = 0; j < widths; j++) {
data[i][j] = random.nextInt(10);
}
}
}
public int[] getRows(int row){
return data[row];
}
}


//此类用来装每个线程的查询结果
public class Results {
private List<Integer> resultList;
public Results() {
this.resultList = new ArrayList<>();
}
public List<Integer> getResultList() {
return resultList;
}
public void setResult(int data){
this.resultList.add(data);
}
}


//查询工具类
public class Searcher implements Runnable {
private MatrixMock mock;
private int startRows;
private int endRows;
private int seacherNumber;
private Results results;
private final CyclicBarrier cyclicBarrier;
public Searcher(MatrixMock mock, int startRows, int endRows, int seacherNumber, Results results,CyclicBarrier cyclicBarrier) {
this.mock = mock;
this.startRows = startRows;
this.endRows = endRows;
this.seacherNumber = seacherNumber;
this.results = results;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
int count  = 0;
System.out.printf("%s : Processing lines from %d to %d .\n",Thread.currentThread().getName(),startRows,endRows);
for(int i = startRows ; i < endRows ; i++){
int row[] = mock.getRows(i);
for (int j = 0; j < row.length; j++) {
if(row[j]==seacherNumber){
count++;
}
}
}
results.setResult(count);
System.out.printf("%s: Lines processed.\n",Thread. currentThread().getName());
try {
//计算完成以后等待
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}


//聚合类 将查询结果聚合 此类在CyclicBarrier唤醒线程后执行
public class Grouper implements Runnable {
private Results results;
private int count;

public Grouper(Results results) {
this.results = results;
}

@Override
public void run() {
for (int i = 0; i < results.getResultList().size(); i++) {
count += results.getResultList().get(i);
}
}
public int getCount(){
return count;
}
}


//测试类
public class Main {
public static void main(String[] args) {
final int ROWS = 10000;
final int NUMBERS = 1000;
final int SEARCH = 5;
final int PARTICIPANTS = 5;
final int LINES_PARTICIPANT = 2000;
MatrixMock matrixMock = new MatrixMock(ROWS,NUMBERS);
Results results = new Results();
Grouper grouper = new Grouper(results);
CyclicBarrier cyclicBarrier = new CyclicBarrier(PARTICIPANTS,grouper);
Thread[] threads = new Thread[PARTICIPANTS];
for (int i = 0; i < PARTICIPANTS; i++) {
threads[i] = new Thread(new Searcher(matrixMock, i * LINES_PARTICIPANT, (i + 1) * LINES_PARTICIPANT, SEARCH, results, cyclicBarrier));
threads[i].start();

}
for (int i = 0; i < PARTICIPANTS; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.printf("final result : "+grouper.getCount());
}
}


例子中解决的问题比较简单。我们有一个很大的随机的整数矩阵,然后你想知道这矩阵里面某个数字出现的次数。为了更好的执行,我们使用了 divide 和 conquer 技术。我们 divide 矩阵成5个子集,然后在每个子集里使用一个线程来查找数字。这些线程是 Searcher 类的对象。

如我们之前提到的,CyclicBarrier 类有一个内部计数器控制到达同步点的线程数量。每次线程到达同步点,它调用 await() 方法告知 CyclicBarrier 对象到达同步点了。CyclicBarrier 把线程放入睡眠状态直到全部的线程都到达他们的同步点。

当全部的线程都到达他们的同步点,CyclicBarrier 对象叫醒全部正在 await() 方法中等待的线程们,然后,选择性的,为CyclicBarrier的构造函数 传递的 Runnable 对象(例子里,是 Grouper 对象)创建新的线程执行外加任务。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: