您的位置:首页 > 其它

第三章 Thread Synchronization Utilities(线程同步工具类)【上】

2017-08-14 17:12 357 查看
涉及内容:

控制并发获取资源
控制并发获取多份copy资源
等待多个并发事件
在公共点同步任务
运行并发多阶段任务
在阶段任务中控制阶段变化
在并发任务之间改变数据
这章节使用更加高级的同步工具类。

Semaphores: 计数器,表示多少资源可以使用

CountDownLatch:等待其他动作完成

CyclicBarrier: 在共同点同步

Phaser: 分阶段同步

Exchanger: 在两个线程中交换数据

1、控制并发获取资源

采用Semaphores,表示计数器大于0的时候有资源可用。二进制semaphores(也就是0或者1)

package com.jack;

import java.util.concurrent.Semaphore;

public class PrintQueue {
private final Semaphore semaphore;

public PrintQueue() {
super();
this.semaphore = new Semaphore(1);
}

public void printJob(Object document){
try {
semaphore.acquire();
long duration = (long)(Math.random()*10);
System.out.printf("%s:打印队列:打印一个工作持续了%d秒\n", Thread.currentThread().getName(), duration);
Thread.sleep(duration);
}catch (InterruptedException e){
e.printStackTrace();
} finally{
semaphore.release();
}
}

}


package com.jack;

public class Job implements Runnable{

private PrintQueue printQueue;

public Job(PrintQueue printQueue) {
super();
this.printQueue = printQueue;
}

@Override
public void run() {
System.out.printf("%s:开始打印一个工作\n", Thread.currentThread().getName());
printQueue.printJob(new Object());
System.out.printf("%s: 这个文档已经打印了\n", Thread.currentThread().getName());

}

}

package com.jack;

public class Main {
public static void main(String[] args) {
PrintQueue printQueue = new PrintQueue();
Thread thread[] = new Thread[10];
for (int i=0; i<10; i++){
thread[i] = new Thread(new Job(printQueue), "Thread" + i);

}
for (int i=0; i<10; i++){
thread[i].start();
}
}
}


总结:new Semaphore(1)表示有一个资源(类似这里只有一个打印机)
semaphore.acquire();表示获取semaphore计数器,然后这时候为0

semaphore.release():这时候semaphore释放,然后就是1,其他线程可以获取。

其他方法:

acquireUninterruptibly():忽略线程异常interruption

tryAcquire(): 如果这块代码有线程在用返回一个boolean值(false),否则true(可用)

semaphore模式

如:new Semaphore(1,false);(第二个参数)

non-fair mode(非公平模式):false(表示CPU指定)

fair mode(公平模式): true(等待时间最长线程)

2、控制并发获取多份相同的资源

例子:模拟三台打印机打印工作

只需要修改PrintQueue.java

package com.jack;

import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class PrintQueue {
private final Semaphore semaphore;
private boolean freePrinters[];
private Lock lockPrinters;

public PrintQueue() {
super();
this.semaphore = new Semaphore(3);
freePrinters = new boolean[3];
for (int i=0; i<3; i++){
freePrinters[i] = true;
}
lockPrinters = new ReentrantLock();
}

public void printJob(Object document){
try {
semaphore.acquire();
int assignedPrinter = getPrinter();
long duration = (long)(Math.random()*10);
System.out.printf("%s:打印队列:打印一个工作持续了%d秒\n", Thread.currentThread().getName(), duration);
Thread.sleep(duration);
freePrinters[assignedPrinter] = true;
}catch (InterruptedException e){
e.printStackTrace();
} finally{
semaphore.release();
}
}

private int getPrinter() {
int ret = -1;
try {
lockPrinters.lock();
for (int i=0; i<freePrinters.length; i++){
//true表示空闲
if(freePrinters[i]){
ret = i;
freePrinters[i] = false;
break;
}
}
} catch (Exception e){
e.printStackTrace();
} finally{
lockPrinters.unlock();
}
return ret;
}

}


日志:

Thread0:开始打印一个工作
Thread6:开始打印一个工作
Thread7:开始打印一个工作
Thread9:开始打印一个工作
Thread8:开始打印一个工作
Thread5:开始打印一个工作
Thread1:开始打印一个工作
Thread4:开始打印一个工作
Thread0:打印队列:打印一个工作持续了4秒
Thread6:打印队列:打印一个工作持续了7秒
Thread3:开始打印一个工作
Thread2:开始打印一个工作
Thread7:打印队列:打印一个工作持续了8秒
Thread0: 这个文档已经打印了
Thread9:打印队列:打印一个工作持续了3秒
Thread9: 这个文档已经打印了
Thread5:打印队列:打印一个工作持续了0秒
Thread8:打印队列:打印一个工作持续了0秒
Thread6: 这个文档已经打印了
Thread4:打印队列:打印一个工作持续了9秒
Thread8: 这个文档已经打印了
Thread1:打印队列:打印一个工作持续了8秒
Thread7: 这个文档已经打印了
Thread5: 这个文档已经打印了
Thread3:打印队列:打印一个工作持续了7秒
Thread3: 这个文档已经打印了
Thread2:打印队列:打印一个工作持续了4秒
Thread1: 这个文档已经打印了
Thread4: 这个文档已经打印了
Thread2: 这个文档已经打印了


总结:

1、它创建一个ReentrantLock(),锁定获取可用的打印机,而freePrinters[] 数组即使判断三台打印的状态。

3、等待多线程并发事件

例子:采用CountDownLatch模拟视频会议

package com.jack;

import java.util.concurrent.CountDownLatch;

public class Videoconference implements Runnable{

private final CountDownLatch controller;
public Videoconference(int number){
controller = new CountDownLatch(number);
}
public void arrive(String name){
System.out.printf("%s 已经到达了。\n", name);
controller.countDown();
System.out.printf("电话会议:正在等待%d参与者。\n", controller.getCount());
}
@Override
public void run() {
System.out.printf("电话会议:与会人员数量:%d \n", controller.getCount());

try{
controller.await();
System.out.printf("电话会议:所有参与已经到达了\n");
System.out.printf("电话会议:让我们开始吧。。。\n");
}catch (InterruptedException e){
e.printStackTrace();
}
}

}
package com.jack;

import java.util.concurrent.TimeUnit;

public class Participant implements Runnable{

private Videoconference conference;
private String name;

public Participant(Videoconference conference, String name) {
super();
this.conference = conference;
this.name = name;
}

@Override
public void run() {
long duration = (long)(Math.random()*10);
try{
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e){
e.printStackTrace();
}
conference.arrive(name);
}

}
package com.jack;

public class Main {
public static void main(String[] args) {
Videoconference conference = new Videoconference(10);
Thread threadConference = new Thread(conference);
threadConference.start();
for(int i=0; i<10; i++){
Participant p = new Participant(conference, "与会人员  " + i);
Thread t = new Thread(p);
t.start();
}
}

}


日志:
电话会议:与会人员数量:10
与会人员  7 已经到达了。
电话会议:正在等待9参与者。
与会人员  2 已经到达了。
电话会议:正在等待8参与者。
与会人员  8 已经到达了。
电话会议:正在等待7参与者。
与会人员  4 已经到达了。
电话会议:正在等待6参与者。
与会人员  1 已经到达了。
电话会议:正在等待5参与者。
与会人员  6 已经到达了。
电话会议:正在等待4参与者。
与会人员  9 已经到达了。
电话会议:正在等待3参与者。
与会人员  3 已经到达了。
与会人员  0 已经到达了。
电话会议:正在等待1参与者。
电话会议:正在等待2参与者。
与会人员  5 已经到达了。
电话会议:正在等待0参与者。
电话会议:所有参与已经到达了
电话会议:让我们开始吧。。。


总结:

1、开启两种线程,一种是视频会议线程,一种是与会人员,每一个与会人员就是一个线程。
2、CountDownLatch 会计算与会人员,直到为0,否则一直等待。(wait()方法,完成一次减一 countDown())

CountDownLatch与其他同步的不同点

1、CountDownLatch机制并不用于保护共享资源或代码块。它用于同步一个或者多个线程执行多任务的顺序。
2、一旦CountDownLatch到达0,所有调用的它的方法都没有效果。

4、在公共点同步任务

采用CyclicBarrier类似CountDownLatch, 它就像约个地方吃饭,总会有先到和后到的,当然先到等待后到的,直到所有人到齐。

例子:计算矩阵,计算子集合并结果



package com.jack;

import java.util.Random;

/**
* 模拟矩阵
* @author Administrator
*
*/
public class MatrixMock {

private int data[][];

public MatrixMock(int size, int length, int number){
int counter = 0;
data = new int[size][length];
Random  random = new Random();
for (int i=0; i<size; i++){
for (int j=0; j<length; j++){
data[i][j] = random.nextInt(10);
if(data[i][j]==number){
counter++;
}
}
}
System.out.printf("Mock:在生成数据为%d的数量%d  \n",  number,counter);

}

public int[] getRow(int row){
if((row>=0) && (row<data.length)){
return data[row];
}
return null;

}
}

package com.jack;

/**
* 查找的结果
* @author Administrator
*
*/
public class Results {

private int data[];

public Results(int size){
data = new int[size];
}
public void setData(int position, int value){
data[position] = value;
}

public int[] getData(){
return data;
}
}

package com.jack;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
* 执行搜索的线程
* @author Administrator
*
*/
public class Searcher implements Runnable {

private int firstRow;
private int lastRow;
private MatrixMock mock;

private Results results;
private int number;
private final CyclicBarrier barrier;

public Searcher(int firstRow, int lastRow, MatrixMock mock, Results results, int number, CyclicBarrier barrier) {
super();
this.firstRow = firstRow;
this.lastRow = lastRow;
this.mock = mock;
this.results = results;
this.number = number;
this.barrier = barrier;
}

@Override
public void run() {
int counter;
System.out.printf("%s:处理行数从%d到%d.\n",
Thread.currentThread().getName(),firstRow, lastRow);
for(int i=firstRow; i<lastRow; i++){
int row[] = mock.getRow(i);
counter=0;
for(int j=0; j<row.length; j++){
if(row[j]==number) {
counter++;
}
}
results.setData(i, counter);
}
System.out.printf("%s:行数处理完成.\n", Thread.currentThread().getName());
try {
//直到所有线程执行完毕,自动创建Grouper执行
barrier.await();
}catch (InterruptedException e){
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}

}

package com.jack;

/**
* 统计结果
* @author Administrator
*
*/
public class Grouper implements Runnable{

private Results results;

public Grouper(Results results) {
super();
this.results = results;
}

@Override
public void run() {
int finalResult =0;
System.out.printf("组合:处理结果。。。\n");
int data[] = results.getData();
for(int number:data){
finalResult += number;
}
System.out.printf("组:总结果:%d.\n", finalResult);

}

}

package com.jack;

import java.util.concurrent.CyclicBarrier;

public class Main {
public static void main(String[] args) {
final int ROWS = 10000;
final int NUMBERS = 1000;
final int SEARCH=5;
final int PARTICIPANTS = 5;
final int LINES_PARTICIPANT =2000;

//表示搜索5出现次数
MatrixMock mock = new MatrixMock(ROWS, NUMBERS, SEARCH);
//总共的行数结果
Results results = new Results(ROWS);
//将每行结果进行相加
Grouper grouper = new Grouper(results);
//拦截点就是grouper,在所有线程执行完,再执行grouper
CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS,grouper);
Searcher searchers[] = new Searcher[PARTICIPANTS];
for (int i=0; i<PARTICIPANTS; i++){
searchers[i] = new Searcher(i*LINES_PARTICIPANT,
(i*LINES_PARTICIPANT) + LINES_PARTICIPANT, mock, results,
SEARCH, barrier);
Thread thread = new Thread(searchers[i]);
thread.start();
}

System.out.printf("Main : 主线程已经完成了\n");
}

}


日志:

Mock:在生成数据为5的数量1000513
组合:处理结果。。。
组:总结果:0.
Thread-1:处理行数从0到2000.
Thread-5:处理行数从8000到10000.
Main : 主线程已经完成了
Thread-4:处理行数从6000到8000.
Thread-3:处理行数从4000到6000.
Thread-2:处理行数从2000到4000.
Thread-5:行数处理完成.
Thread-4:行数处理完成.
Thread-2:行数处理完成.
Thread-3:行数处理完成.
Thread-1:行数处理完成.
组合:处理结果。。。
组:总结果:1000513.


与CountDownLatch不同的是CyclicBarrier可以重置初始化状态(reset()方法)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: