您的位置:首页 > 编程语言 > Java开发

java中的并发工具类

2017-03-01 11:18 369 查看

概述

CountDownLatch:允许一个或多个线程 等待其他线程完成操作

CyclicBarrier:当一组线程达到同步点时被阻塞,直到最后一个线程到达同步点时,所有被拦截的线程才会继续运行

Semaphore:信号量,用来控制同时访问特定资源的线程数量,它通过协调各线程以保证合理的使用公共资源

Exchanger:用于进行线程间的数据交换

CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程执行完成。CountDownLatch的一个非常典型的应用场景是:有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行。

构造函数:

public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}


CountDownLatch的构造函数可以传入一个int类型的参数N,表示需要等待N个点完成。调用CountDownLatch的countDown()方法时,N会减1。CountDownLatch的await方法会阻塞当前线程,直到N变为0.

CountDownLatch实例:

/**
* 十个人分成两组比赛,每组5个,当发令枪响时,开始比赛
* @author Administrator
*
*/
public class CountDownLatchDemo {
private static final int GROUP_SIZE=5;
private static final Random RANDOM = new Random();
private static void processOneGroup(final String groupName){
//等待所以选手准备就绪
final CountDownLatch preCountDownLatch = new CountDownLatch(GROUP_SIZE);
//等待比赛开始
final CountDownLatch startCountDownLatch = new CountDownLatch(1);
//等待比赛结束
final CountDownLatch endCountDownLatch = new CountDownLatch(GROUP_SIZE);

System.out.println(groupName+",比赛开始:");

for (int i = 0; i < GROUP_SIZE; i++) {
new Thread(String.valueOf(i)){
public void run() {
preCountDownLatch.countDown();//准备完毕
System.out.println("第"+GROUP_SIZE+"组,第"+this.getName()+"号选手,已准备完毕!~");
try {
//等待裁判发出开始指令
startCountDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

try {
//运行一个随机时间,表示选手实际的比赛时间
Thread.sleep(Math.abs(RANDOM.nextInt())%1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//完成比赛
endCountDownLatch.countDown();
};
}.start();
}

try {
preCountDownLatch.await();//等待所有的选手准备完毕
} catch (InterruptedException e) {
e.printStackTrace();
}

startCountDownLatch.countDown();//开始比赛

try {
endCountDownLatch.await();//等待所有的选手结束比赛
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(groupName+" 比赛结束!");
}
public static void main(String[] args) {
processOneGroup("分组1");
processOneGroup("分组2");
}

}


输出结果:

分组1,比赛开始:

第5组,第0号选手,已准备完毕!~

第5组,第2号选手,已准备完毕!~

第5组,第1号选手,已准备完毕!~

第5组,第3号选手,已准备完毕!~

第5组,第4号选手,已准备完毕!~

分组1 比赛结束!

分组2,比赛开始:

第5组,第0号选手,已准备完毕!~

第5组,第1号选手,已准备完毕!~

第5组,第2号选手,已准备完毕!~

第5组,第3号选手,已准备完毕!~

第5组,第4号选手,已准备完毕!~

分组2 比赛结束!

countDown方法的实现如下:

/**
* 减少锁存器的计数,如果计数达到零,则释放所有等待的线程。
* 如果当前计数大于零,那么它被递减。如果新计数为零,则所有等待的线程被重新启用用于线程调度目的。如果当前计数等于零,则什么也不发生。
*/
public void countDown() {
sync.releaseShared(1);
}


public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}


protected boolean tryReleaseShared(int releases) {
// 递减计数; 直到信号转换为零时
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}


/**
* 确保释放传播,即使有其他进行中的获取/释放。 如果它需要信号,则以通常的方式尝试解除头的处理器。 但如果不是,状态设置为PROPAGATE以确保在释放后,传播继续。 另外,我们必须循环,以便在我们这样做的时候添加一个新的节点。 此外,与unparkSuccessor的其他用法不同,我们需要知道CAS是否重置状态失败,如果这样重新检查。
*/
private void doReleaseShared() {

for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;            // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;                // loop on failed CAS
}
if (h == head)                   // loop if head changed
break;
}
}


CountDownLatch与Thread.join()的区别;

调用thread.join() 方法必须等thread 执行完毕,当前线程才能继续往下执行,而CountDownLatch通过计数器提供了更灵活的控制,只要检测到计数器为0当前线程就可以往下执行而不用管相应的thread是否执行完毕.

CyclicBarrier

CyclicBarrier可以让一组线程到达一个同步点时被阻塞,直到最后一个线程到达同步点时,所有被阻塞的线程才会继续运行。

默认构造方法:parties表示阻塞在同步点的线程数。

public CyclicBarrier(int parties) {
this(parties, null);
}


public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;//表示仍在等待的线程数,只要一个线程到达同步点,count就减1,直到0为止
this.barrierCommand = barrierAction;//当线程全部到达同步点时,优先执行barrierAction
}


每个线程调用await方法,告诉CyclicBarrier已经到达同步点,然后当前线程被阻塞.

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。

CyclicBarrier实例如下:

/**
* 用一个Excel保存用户所有银行流水,每个Sheet保存一个账号近一年的每笔银行
* 流水,现在需要统计用户的日军银行流水,先用多线程处理每个sheet,都执行完
* 之后,得到每个sheet的日均银行流水,最后用barrierAction技术整个Excel的
* 日均银行流水
* @author Administrator
*
*/
public class CyclicBarrierDemo implements Runnable{
//
private CyclicBarrier cyclicBarrier = new CyclicBarrier(4,this);
//假设只有4个sheet,启动4个线程
private Executor executor = Executors.newFixedThreadPool(4);
//保存每个sheet计算结果
private ConcurrentHashMap<String,Integer> sheetBankWaterCount = new ConcurrentHashMap<>();

private void count(){
for (int i = 0; i < 4; i++) {

executor.execute(new Runnable() {

@Override
public void run() {

//业务操作,计算sheet的银行流水数据,假设结果为1,代码省略
sheetBankWaterCount.put(Thread.currentThread().getName(), 1);

try {
cyclicBarrier.await();
} catch (Exception e) {

}
}
});

}
}
//汇总操作
@Override
public void run() {
int result = 0;
//汇总每个sheet计算的结果
for (Entry<String,Integer>  entry : sheetBankWaterCount.entrySet()) {
result+=entry.getValue();
}
//将结果输出
System.out.println(result);
}
public static void main(String[] args) {
CyclicBarrierDemo demo = new CyclicBarrierDemo();
demo.count();
}

}


结果输出:

4

CyclicBarrier和CountDownLatch的区别:

CountDownLatch到达计算器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,所有CyclicBarrier可以使用更复杂的业务场景。

CyclicBarrier还提供了getNumberWaiting方法可以获得阻塞的线程数量。isBroken可以获得阻塞的线程是否被中断

需要注意的是,如果CyclicBarrier的parriers数比时间的线程数大时,那么这组线程将会被一直阻塞

Semaphore

Semaphore是用来控制同时访问特定资源 的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

Semaphore可以用于做流量控制,特别是公有资源有限的应用场景,比如数据库链接。

Semaphore的构造方法Semaphore(int permits)接受一个整型的数组,表示最大的并发数。线程使用Semaphore的acquire()方法获得一个许可,使用之后调用release()方法归还许可。

Semaphore示例:

/**
* 假设有10个线程要访问某个资源,但是我们的并发量控制为3
* @author Administrator
*
*/
public class SemaphoreDemo {
private static final int THREAD_COUNT=10;

private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

private static Semaphore semaphore = new Semaphore(3);

public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {

@Override
public void run() {
try {
semaphore.acquire();
System.out.println("线程"+Thread.currentThread().getName()+"获得访问资源的权利!");
Thread.sleep(TimeUnit.SECONDS.toSeconds(5));
System.out.println("线程"+Thread.currentThread().getName()+"访问结束!");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}

}
});
}
threadPool.shutdown();
}
}


输出结果如下:

线程pool-1-thread-1获得访问资源的权利!

线程pool-1-thread-2获得访问资源的权利!

线程pool-1-thread-3获得访问资源的权利!

线程pool-1-thread-2访问结束!

线程pool-1-thread-3访问结束!

线程pool-1-thread-4获得访问资源的权利!

线程pool-1-thread-1访问结束!

线程pool-1-thread-5获得访问资源的权利!

线程pool-1-thread-6获得访问资源的权利!

线程pool-1-thread-5访问结束!

线程pool-1-thread-6访问结束!

线程pool-1-thread-4访问结束!

线程pool-1-thread-8获得访问资源的权利!

线程pool-1-thread-9获得访问资源的权利!

线程pool-1-thread-7获得访问资源的权利!

线程pool-1-thread-7访问结束!

线程pool-1-thread-10获得访问资源的权利!

线程pool-1-thread-8访问结束!

线程pool-1-thread-9访问结束!

线程pool-1-thread-10访问结束!

通过打印结果我们可以看出,当已经有三个线程获取了资源访问的权利后,后面的线程只能等这三个线程中的一个释放了访问权利,才能接着访问资源。

Exchanger

Exchanger用于线程间的协作,进行数据交互。它提供一个同步点,在这个同步点,两个线程可用交互彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,他会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将生产出来的数据传递给对方。

Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果。Exchanger也可以用于校对工作,比如我们需要将纸质银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗,两人进行录入,录入到Excel后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否一致。

代码实例:

public class ExchangerDemo {
/**
*
* 生产者
*
*/
static class Producer implements Runnable{
private List<String> producerBuffer;
private final Exchanger<List<String>> exchanger;
public Producer(List<String> producerBuffer,Exchanger<List<String>> exchanger){
this.producerBuffer=producerBuffer;
this.exchanger=exchanger;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
producerBuffer.add("生产者"+i);
System.out.println("第"+i+"次生产者在等待与消费者交换数据");
try {
producerBuffer=exchanger.exchange(producerBuffer);
System.out.println("第"+i+"次生产者与消费者交换后的数据为:"+producerBuffer.get(i));
} catch (InterruptedException e) {
e.printStackTrace();
}

}

}

}
/**
*消费者
*/
static class Consumer implements Runnable{
private List<String> consumerBuffer;
private final Exchanger<List<String>> exchanger;

public Consumer(List<String> consumerBuffer,Exchanger<List<String>> exchanger) {
this.consumerBuffer=consumerBuffer;
this.exchanger=exchanger;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
consumerBuffer.add("消费者"+i);
try {
System.out.println("第"+i+"次消费者正在等待与生产者交换数据!");
consumerBuffer=exchanger.exchange(consumerBuffer);
System.out.println("第"+i+"次消费者交换后的数据为:"+consumerBuffer.get(i));

} catch (InterruptedException e) {
e.printStackTrace();
}

}
}

}
public static void main(String[] args) {

List<String> producerBuffer = new ArrayList<>();

List<String> consumerBuffer = new ArrayList<>();

Exchanger<List<String>> exchanger = new Exchanger<>();

ExchangerDemo.Producer  producer = new Producer(producerBuffer, exchanger);

ExchangerDemo.Consumer consumer = new Consumer(consumerBuffer, exchanger);

Thread producerThread = new Thread(producer);

Thread consumerThread = new Thread(consumer);

producerThread.start();

consumerThread.start();

}
}


输出结果为

第0次生产者在等待与消费者交换数据

第0次消费者正在等待与生产者交换数据!

第0次消费者交换后的数据为:生产者0

第0次生产者与消费者交换后的数据为:消费者0

第1次消费者正在等待与生产者交换数据!

第1次生产者在等待与消费者交换数据

第1次消费者交换后的数据为:生产者1

第1次生产者与消费者交换后的数据为:消费者1

第2次消费者正在等待与生产者交换数据!

第2次生产者在等待与消费者交换数据

第2次生产者与消费者交换后的数据为:消费者2

第2次消费者交换后的数据为:生产者2

第3次生产者在等待与消费者交换数据

第3次消费者正在等待与生产者交换数据!

第3次生产者与消费者交换后的数据为:消费者3

第3次消费者交换后的数据为:生产者3

第4次生产者在等待与消费者交换数据

第4次消费者正在等待与生产者交换数据!

第4次消费者交换后的数据为:生产者4

第4次生产者与消费者交换后的数据为:消费者4

如果两个线程,有一个线程没有调用exchange()方法,那么则会一直等待,因此,为避免一直等待,可以使用exchange(V x,long timeout,TimeUnit unit)设置等待时长
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息