您的位置:首页 > 编程语言 > Java开发

学习笔记六:线程间的协作与通信之并发工具类

2018-07-26 17:15 519 查看
继续 上一篇 《 学习笔记五:线程间的协作与通信

在jdk的并发包里提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类提供了在线程间交换数据的一种手段,如下主要介绍工具类的使用。

1、等待多线程完成的CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。

CountDownLatch类实际上是使用计数器的方式去控制的,当我们初始化CountDownLatch的时候传入了一个int变量这个时候在类的内部初始化一个int的变量,每当我们调用countDownt()方法的时候就使得这个变量的值减1,而对于await()方法则去判断这个int的变量的值是否为0,是则表示所有的操作都已经完成,否则继续等待。

构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。计数器的值必须大于等于0,只是等于0的时候,计数器就是零,调用await方法时不会阻塞当前线程;这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值;一个线程调用countDown方法happen-before,另外一个线程调用await方法。

与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

示例:boss等待所有员工来开会,当所有人员都到齐之后,boss宣布开始会议!!!

package com.black.example.mutilThread;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* Created by 10250H on 2018/7/26.
*/
public class CountDownLatchDemo {
//声明countDownLatch 变量,初始化线程数量(内部计数器)
static CountDownLatch countDownLatch = new CountDownLatch(2);

public static void main(String[] args) {
new Thread(new MyRunner(countDownLatch, "小李", 2000)).start();
new Thread(new MyRunner(countDownLatch, "小张", 4000)).start();
new Thread(new MyRunner(countDownLatch, "小王", 5000)).start();

try {
System.out.println("等待员工到来开会。。。。。。。");
//注意这里是await。主线程将会一直等待在这里,当所有线程都执行 countDownLatch.countDown();之后当前线程才会继续执行
countDownLatch.await();
startMeeting("老板");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private static void startMeeting(String name) {
System.out.println(name + "说:人齐了。会议开始!!");
}

static class MyRunner implements Runnable {
CountDownLatch countDownLatch;
String name;
int time;

public MyRunner(CountDownLatch countDownLatch, String name, int time) {
this.countDownLatch = countDownLatch;
this.name = name;
this.time = time;
}

@Override
public void run() {
try {
System.out.println(name + " 开始出发去公司。");
TimeUnit.SECONDS.sleep(1);
System.out.println(name + " 终于到会议室!!!");
countDownLatch.countDown();
System.out.println(name + " 准备好了!!");
} catch (Exception e) {
e.printStackTrace();
}

}
}

}

运行结果:注意,至于是谁先到会议室,每次运行结果都会不一样。因为主线程和子线程的调用时由CPU决定的



如果某个人缺席会议,我们不能让主线程一直等待,所以可以使用另外一个带指定时间的await方法-await(long time,TimeUtil unit)的那个带指定时间后,就好不再阻塞当前线程。jion也有类似的方法。

2、同步屏障CyclicBarrier

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier),它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

CyclicBarrier默认构造方法时CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达屏障了,然后当前线程被阻塞。 示例代码如下:

package com.black.example.mutilThread;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

/**
* Created by 10250H on 2018/7/26.
*/
public class CyclicBarrierDemo1 {
static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

public static void main(String[] args) {
new Thread(new MyThread(1)).start();
new Thread(new MyThread(2)).start();
}

static class MyThread implements Runnable{
private int counter;
public MyThread(int counter){
this.counter = counter;
}
@Override
public void run() {
try {
System.out.println("当前值输出:"+counter);
TimeUnit.SECONDS.sleep(1);
cyclicBarrier.await();
System.out.println("counter="+counter+",线程="+Thread.currentThread().getName()+",继续执行");
} catch (InterruptedException e) {
e.printStackTrace();
}catch (BrokenBarrierException e) {
e.printStackTrace();
}

}
}
}

运行结果:因为主线程和子线程的调用时由CPU决定的,两个线程都有可能先执行,所以会产生不同的结果





如果把new CyclicBarrier(2) 修改为new CyclicBarrier(3),则主线程和子线程会永远等待,不会继续执行,因为第三个还没有到达屏障,所以之前到达屏障的两个线程都不会继续执行。

CyclicBarrier还提供了高级构造函数CyclicBarrier(int parties,Runnable barrier-Action),用于在现场到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景,示例代码如下:

package com.black.example.mutilThread;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierDemo2 {
static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,new MyThread(3));

public static void main(String[] args){
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("通用计数当前值:1");
cyclicBarrier.await();
System.out.println("默认执行通用线程2");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("通用计数当前值:2");
cyclicBarrier.await();
System.out.println("默认执行通用线程2");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
}

static class MyThread implements Runnable{
private int counter;
public MyThread(int counter){
this.counter = counter;
}
@Override
public void run() {
try {
System.out.println("优先执行:"+counter);
TimeUnit.SECONDS.sleep(1);
System.out.println("counter="+counter+",线程="+Thread.currentThread().getName()+",继续执行");
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}
}

运行结果:当两个线程都到达屏障后,优先执行对象MyThread 的任务。



CyclicBarrier的应用场景

可以用于多线程计算数据,最后合并计算结果的场景。例如:用一个Excel保存用户的所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水。最后再用barrierAction总结出整个Excel的日均银行流水,示例代码如下:

package com.black.example.mutilThread;

import java.util.Map;
import java.util.concurrent.*;

public class BankWaterService implements Runnable {
//创建4个屏障,处理完之后执行当前类的run方法
private CyclicBarrier cyclicBarrier = new CyclicBarrier(4,this);
//假设只有4个sheet,所以启动4个线程
private Executor executor = Executors.newFixedThreadPool(4);
//保存每个sheet计算出的银行流水结果
private ConcurrentHashMap<String,Integer> sheetCountMap = new ConcurrentHashMap<String,Integer>();

private void count(){
for (int i=0;i<4;i++){
executor.execute(new Runnable() {
@Override
public void run() {
//1:计算当前sheet的银行流水数据,计算代码省略....伪代码如下:
sheetCountMap.put(Thread.currentThread().getName(),1);
//计算完成,插入屏障
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}

@Override
public void run() {
int result=0;
//汇总每个sheet的计算结果
for (Map.Entry<String,Integer> entry:sheetCountMap.entrySet()){
result+=entry.getValue();
}
//输出结果
sheetCountMap.put("result",result);
System.out.println("最终结果:"+result);
}

public static void main(String[] args) {
BankWaterService bankWaterService = new BankWaterService();
bankWaterService.count();
}
}

运行结果:最终结果:4

3、CyclicBarrier与CountDownLatch的区别

CountDownLatch计数器只能使用一次,而CyclicBarrier[b]计数器可以使用reset()方法重置。[/b]

[b]所以CyclicBarrier可以处理更为复杂的业务场景,如:计算发生错误,可以重置计数器,并让线程重新执行一次。[/b]

CyclicBarrier其他用法及源代码如下,如:

isBroken() :阻塞的线程是否被中断,返回值boolean类型

getNumberWaiting() :获取Cyclic-Barrier阻塞的线程数量

4、控制并发线程数的Semaphore

Semaphore(信号量)用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用公共资源。

应用场景

Semaphore可以用于流量控制,特别是公用资源有限的应用场景,比如数据库链接。

假如有个需求,要读几万个文件的数据,因为是IO密集型任务,我们可以启动几十个线程并发读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库链接保存数据,否则会报错无法获取数据库链接。这时可以使用Semaphore来做流量控制,代码示例如下:

package com.black.example.mutilThread;

import java.util.concurrent.*;

public class SemaphoreDemo {
private static final int THREAD_COUNT=30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore semaphore = new Semaphore(10);//10个并发

public static void main(String[] args) {
for (int i=0;i<THREAD_COUNT;i++){
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"-----请求资源");
//请求获取资源,如果有空闲资源则会立即获取,进入临界区,否则将会等待,一直等待到获取到临界区资源
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"----获取资源,保存数据!");
TimeUnit.SECONDS.sleep(1);
semaphore.release();//释放资源
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}

}

运行结果:

pool-1-thread-4-----请求资源
pool-1-thread-2-----请求资源
pool-1-thread-4----获取资源,保存数据!
pool-1-thread-1-----请求资源
pool-1-thread-5-----请求资源
pool-1-thread-3-----请求资源
pool-1-thread-5----获取资源,保存数据!
pool-1-thread-8-----请求资源
pool-1-thread-1----获取资源,保存数据!
pool-1-thread-9-----请求资源
pool-1-thread-2----获取资源,保存数据!
pool-1-thread-7-----请求资源
pool-1-thread-9----获取资源,保存数据!
pool-1-thread-10-----请求资源
pool-1-thread-11-----请求资源
pool-1-thread-8----获取资源,保存数据!
pool-1-thread-6-----请求资源
pool-1-thread-3----获取资源,保存数据!
pool-1-thread-13-----请求资源
pool-1-thread-11----获取资源,保存数据!
pool-1-thread-10----获取资源,保存数据!
pool-1-thread-12-----请求资源
pool-1-thread-7----获取资源,保存数据!
pool-1-thread-15-----请求资源
pool-1-thread-14-----请求资源
pool-1-thread-16-----请求资源
pool-1-thread-17-----请求资源
pool-1-thread-18-----请求资源
pool-1-thread-19-----请求资源
pool-1-thread-21-----请求资源
pool-1-thread-20-----请求资源
pool-1-thread-22-----请求资源
pool-1-thread-23-----请求资源
pool-1-thread-24-----请求资源
pool-1-thread-25-----请求资源
pool-1-thread-27-----请求资源
pool-1-thread-26-----请求资源
pool-1-thread-28-----请求资源
pool-1-thread-29-----请求资源
pool-1-thread-30-----请求资源
pool-1-thread-6----获取资源,保存数据!
pool-1-thread-13----获取资源,保存数据!
pool-1-thread-12----获取资源,保存数据!
pool-1-thread-15----获取资源,保存数据!
pool-1-thread-16----获取资源,保存数据!
pool-1-thread-17----获取资源,保存数据!
pool-1-thread-14----获取资源,保存数据!
pool-1-thread-21----获取资源,保存数据!
pool-1-thread-19----获取资源,保存数据!
pool-1-thread-18----获取资源,保存数据!
pool-1-thread-20----获取资源,保存数据!
pool-1-thread-22----获取资源,保存数据!
pool-1-thread-23----获取资源,保存数据!
pool-1-thread-26----获取资源,保存数据!
pool-1-thread-27----获取资源,保存数据!
pool-1-thread-25----获取资源,保存数据!
pool-1-thread-24----获取资源,保存数据!
pool-1-thread-30----获取资源,保存数据!
pool-1-thread-29----获取资源,保存数据!
pool-1-thread-28----获取资源,保存数据!

Semaphore简单用法如下:

//构造方法,可用的许可证数量,默认使用非公平锁的方式创建
public Semaphore(int permits){...}
//尝试获取许可证
public boolean tryAcquire(){...}
//尝试获取许可证,在指定时间内若获取不到则返回
public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException{...}
//返回信号量中当前可用的许可证数
public int availablePermits(){...}
//返回正在等待获取许可证的线程数(估计值)
public final int getQueueLength() {...}
//查询是否有任何线程等待获取许可证
public final boolean hasQueuedThreads(){...}
//减少reduction个许可证,这个方法在使用的子类中很有用跟踪不可用资源的信号量
protected void reducePermits(int reduction) {...}
//返回所有等待获取许可证的线程集合
protected Collection<Thread> getQueuedThreads(){...}


上一篇:学习笔记五:线程间的协作与通信

下一篇:待续....
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  JDK Count_Down