第三章 Thread Synchronization Utilities(线程同步工具类)【上】
2017-08-14 17:12
357 查看
涉及内容:
控制并发获取资源
控制并发获取多份copy资源
等待多个并发事件
在公共点同步任务
运行并发多阶段任务
在阶段任务中控制阶段变化
在并发任务之间改变数据
这章节使用更加高级的同步工具类。
Semaphores: 计数器,表示多少资源可以使用
CountDownLatch:等待其他动作完成
CyclicBarrier: 在共同点同步
Phaser: 分阶段同步
Exchanger: 在两个线程中交换数据
总结:new Semaphore(1)表示有一个资源(类似这里只有一个打印机)
semaphore.acquire();表示获取semaphore计数器,然后这时候为0
semaphore.release():这时候semaphore释放,然后就是1,其他线程可以获取。
其他方法:
acquireUninterruptibly():忽略线程异常interruption
tryAcquire(): 如果这块代码有线程在用返回一个boolean值(false),否则true(可用)
semaphore模式
如:new Semaphore(1,false);(第二个参数)
non-fair mode(非公平模式):false(表示CPU指定)
fair mode(公平模式): true(等待时间最长线程)
只需要修改PrintQueue.java
日志:
总结:
1、它创建一个ReentrantLock(),锁定获取可用的打印机,而freePrinters[] 数组即使判断三台打印的状态。
日志:
总结:
1、开启两种线程,一种是视频会议线程,一种是与会人员,每一个与会人员就是一个线程。
2、CountDownLatch 会计算与会人员,直到为0,否则一直等待。(wait()方法,完成一次减一 countDown())
CountDownLatch与其他同步的不同点
1、CountDownLatch机制并不用于保护共享资源或代码块。它用于同步一个或者多个线程执行多任务的顺序。
2、一旦CountDownLatch到达0,所有调用的它的方法都没有效果。
例子:计算矩阵,计算子集合并结果
日志:
与CountDownLatch不同的是CyclicBarrier可以重置初始化状态(reset()方法)
控制并发获取资源
控制并发获取多份copy资源
等待多个并发事件
在公共点同步任务
运行并发多阶段任务
在阶段任务中控制阶段变化
在并发任务之间改变数据
这章节使用更加高级的同步工具类。
Semaphores: 计数器,表示多少资源可以使用
CountDownLatch:等待其他动作完成
CyclicBarrier: 在共同点同步
Phaser: 分阶段同步
Exchanger: 在两个线程中交换数据
1、控制并发获取资源
采用Semaphores,表示计数器大于0的时候有资源可用。二进制semaphores(也就是0或者1)package com.jack; import java.util.concurrent.Semaphore; public class PrintQueue { private final Semaphore semaphore; public PrintQueue() { super(); this.semaphore = new Semaphore(1); } public void printJob(Object document){ try { semaphore.acquire(); long duration = (long)(Math.random()*10); System.out.printf("%s:打印队列:打印一个工作持续了%d秒\n", Thread.currentThread().getName(), duration); Thread.sleep(duration); }catch (InterruptedException e){ e.printStackTrace(); } finally{ semaphore.release(); } } }
package com.jack; public class Job implements Runnable{ private PrintQueue printQueue; public Job(PrintQueue printQueue) { super(); this.printQueue = printQueue; } @Override public void run() { System.out.printf("%s:开始打印一个工作\n", Thread.currentThread().getName()); printQueue.printJob(new Object()); System.out.printf("%s: 这个文档已经打印了\n", Thread.currentThread().getName()); } }
package com.jack; public class Main { public static void main(String[] args) { PrintQueue printQueue = new PrintQueue(); Thread thread[] = new Thread[10]; for (int i=0; i<10; i++){ thread[i] = new Thread(new Job(printQueue), "Thread" + i); } for (int i=0; i<10; i++){ thread[i].start(); } } }
总结:new Semaphore(1)表示有一个资源(类似这里只有一个打印机)
semaphore.acquire();表示获取semaphore计数器,然后这时候为0
semaphore.release():这时候semaphore释放,然后就是1,其他线程可以获取。
其他方法:
acquireUninterruptibly():忽略线程异常interruption
tryAcquire(): 如果这块代码有线程在用返回一个boolean值(false),否则true(可用)
semaphore模式
如:new Semaphore(1,false);(第二个参数)
non-fair mode(非公平模式):false(表示CPU指定)
fair mode(公平模式): true(等待时间最长线程)
2、控制并发获取多份相同的资源
例子:模拟三台打印机打印工作只需要修改PrintQueue.java
package com.jack; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class PrintQueue { private final Semaphore semaphore; private boolean freePrinters[]; private Lock lockPrinters; public PrintQueue() { super(); this.semaphore = new Semaphore(3); freePrinters = new boolean[3]; for (int i=0; i<3; i++){ freePrinters[i] = true; } lockPrinters = new ReentrantLock(); } public void printJob(Object document){ try { semaphore.acquire(); int assignedPrinter = getPrinter(); long duration = (long)(Math.random()*10); System.out.printf("%s:打印队列:打印一个工作持续了%d秒\n", Thread.currentThread().getName(), duration); Thread.sleep(duration); freePrinters[assignedPrinter] = true; }catch (InterruptedException e){ e.printStackTrace(); } finally{ semaphore.release(); } } private int getPrinter() { int ret = -1; try { lockPrinters.lock(); for (int i=0; i<freePrinters.length; i++){ //true表示空闲 if(freePrinters[i]){ ret = i; freePrinters[i] = false; break; } } } catch (Exception e){ e.printStackTrace(); } finally{ lockPrinters.unlock(); } return ret; } }
日志:
Thread0:开始打印一个工作 Thread6:开始打印一个工作 Thread7:开始打印一个工作 Thread9:开始打印一个工作 Thread8:开始打印一个工作 Thread5:开始打印一个工作 Thread1:开始打印一个工作 Thread4:开始打印一个工作 Thread0:打印队列:打印一个工作持续了4秒 Thread6:打印队列:打印一个工作持续了7秒 Thread3:开始打印一个工作 Thread2:开始打印一个工作 Thread7:打印队列:打印一个工作持续了8秒 Thread0: 这个文档已经打印了 Thread9:打印队列:打印一个工作持续了3秒 Thread9: 这个文档已经打印了 Thread5:打印队列:打印一个工作持续了0秒 Thread8:打印队列:打印一个工作持续了0秒 Thread6: 这个文档已经打印了 Thread4:打印队列:打印一个工作持续了9秒 Thread8: 这个文档已经打印了 Thread1:打印队列:打印一个工作持续了8秒 Thread7: 这个文档已经打印了 Thread5: 这个文档已经打印了 Thread3:打印队列:打印一个工作持续了7秒 Thread3: 这个文档已经打印了 Thread2:打印队列:打印一个工作持续了4秒 Thread1: 这个文档已经打印了 Thread4: 这个文档已经打印了 Thread2: 这个文档已经打印了
总结:
1、它创建一个ReentrantLock(),锁定获取可用的打印机,而freePrinters[] 数组即使判断三台打印的状态。
3、等待多线程并发事件
例子:采用CountDownLatch模拟视频会议package com.jack; import java.util.concurrent.CountDownLatch; public class Videoconference implements Runnable{ private final CountDownLatch controller; public Videoconference(int number){ controller = new CountDownLatch(number); } public void arrive(String name){ System.out.printf("%s 已经到达了。\n", name); controller.countDown(); System.out.printf("电话会议:正在等待%d参与者。\n", controller.getCount()); } @Override public void run() { System.out.printf("电话会议:与会人员数量:%d \n", controller.getCount()); try{ controller.await(); System.out.printf("电话会议:所有参与已经到达了\n"); System.out.printf("电话会议:让我们开始吧。。。\n"); }catch (InterruptedException e){ e.printStackTrace(); } } }
package com.jack; import java.util.concurrent.TimeUnit; public class Participant implements Runnable{ private Videoconference conference; private String name; public Participant(Videoconference conference, String name) { super(); this.conference = conference; this.name = name; } @Override public void run() { long duration = (long)(Math.random()*10); try{ TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e){ e.printStackTrace(); } conference.arrive(name); } }
package com.jack; public class Main { public static void main(String[] args) { Videoconference conference = new Videoconference(10); Thread threadConference = new Thread(conference); threadConference.start(); for(int i=0; i<10; i++){ Participant p = new Participant(conference, "与会人员 " + i); Thread t = new Thread(p); t.start(); } } }
日志:
电话会议:与会人员数量:10 与会人员 7 已经到达了。 电话会议:正在等待9参与者。 与会人员 2 已经到达了。 电话会议:正在等待8参与者。 与会人员 8 已经到达了。 电话会议:正在等待7参与者。 与会人员 4 已经到达了。 电话会议:正在等待6参与者。 与会人员 1 已经到达了。 电话会议:正在等待5参与者。 与会人员 6 已经到达了。 电话会议:正在等待4参与者。 与会人员 9 已经到达了。 电话会议:正在等待3参与者。 与会人员 3 已经到达了。 与会人员 0 已经到达了。 电话会议:正在等待1参与者。 电话会议:正在等待2参与者。 与会人员 5 已经到达了。 电话会议:正在等待0参与者。 电话会议:所有参与已经到达了 电话会议:让我们开始吧。。。
总结:
1、开启两种线程,一种是视频会议线程,一种是与会人员,每一个与会人员就是一个线程。
2、CountDownLatch 会计算与会人员,直到为0,否则一直等待。(wait()方法,完成一次减一 countDown())
CountDownLatch与其他同步的不同点
1、CountDownLatch机制并不用于保护共享资源或代码块。它用于同步一个或者多个线程执行多任务的顺序。
2、一旦CountDownLatch到达0,所有调用的它的方法都没有效果。
4、在公共点同步任务
采用CyclicBarrier类似CountDownLatch, 它就像约个地方吃饭,总会有先到和后到的,当然先到等待后到的,直到所有人到齐。例子:计算矩阵,计算子集合并结果
package com.jack; import java.util.Random; /** * 模拟矩阵 * @author Administrator * */ public class MatrixMock { private int data[][]; public MatrixMock(int size, int length, int number){ int counter = 0; data = new int[size][length]; Random random = new Random(); for (int i=0; i<size; i++){ for (int j=0; j<length; j++){ data[i][j] = random.nextInt(10); if(data[i][j]==number){ counter++; } } } System.out.printf("Mock:在生成数据为%d的数量%d \n", number,counter); } public int[] getRow(int row){ if((row>=0) && (row<data.length)){ return data[row]; } return null; } }
package com.jack; /** * 查找的结果 * @author Administrator * */ public class Results { private int data[]; public Results(int size){ data = new int[size]; } public void setData(int position, int value){ data[position] = value; } public int[] getData(){ return data; } }
package com.jack; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * 执行搜索的线程 * @author Administrator * */ public class Searcher implements Runnable { private int firstRow; private int lastRow; private MatrixMock mock; private Results results; private int number; private final CyclicBarrier barrier; public Searcher(int firstRow, int lastRow, MatrixMock mock, Results results, int number, CyclicBarrier barrier) { super(); this.firstRow = firstRow; this.lastRow = lastRow; this.mock = mock; this.results = results; this.number = number; this.barrier = barrier; } @Override public void run() { int counter; System.out.printf("%s:处理行数从%d到%d.\n", Thread.currentThread().getName(),firstRow, lastRow); for(int i=firstRow; i<lastRow; i++){ int row[] = mock.getRow(i); counter=0; for(int j=0; j<row.length; j++){ if(row[j]==number) { counter++; } } results.setData(i, counter); } System.out.printf("%s:行数处理完成.\n", Thread.currentThread().getName()); try { //直到所有线程执行完毕,自动创建Grouper执行 barrier.await(); }catch (InterruptedException e){ e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
package com.jack; /** * 统计结果 * @author Administrator * */ public class Grouper implements Runnable{ private Results results; public Grouper(Results results) { super(); this.results = results; } @Override public void run() { int finalResult =0; System.out.printf("组合:处理结果。。。\n"); int data[] = results.getData(); for(int number:data){ finalResult += number; } System.out.printf("组:总结果:%d.\n", finalResult); } }
package com.jack; import java.util.concurrent.CyclicBarrier; public class Main { public static void main(String[] args) { final int ROWS = 10000; final int NUMBERS = 1000; final int SEARCH=5; final int PARTICIPANTS = 5; final int LINES_PARTICIPANT =2000; //表示搜索5出现次数 MatrixMock mock = new MatrixMock(ROWS, NUMBERS, SEARCH); //总共的行数结果 Results results = new Results(ROWS); //将每行结果进行相加 Grouper grouper = new Grouper(results); //拦截点就是grouper,在所有线程执行完,再执行grouper CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS,grouper); Searcher searchers[] = new Searcher[PARTICIPANTS]; for (int i=0; i<PARTICIPANTS; i++){ searchers[i] = new Searcher(i*LINES_PARTICIPANT, (i*LINES_PARTICIPANT) + LINES_PARTICIPANT, mock, results, SEARCH, barrier); Thread thread = new Thread(searchers[i]); thread.start(); } System.out.printf("Main : 主线程已经完成了\n"); } }
日志:
Mock:在生成数据为5的数量1000513 组合:处理结果。。。 组:总结果:0. Thread-1:处理行数从0到2000. Thread-5:处理行数从8000到10000. Main : 主线程已经完成了 Thread-4:处理行数从6000到8000. Thread-3:处理行数从4000到6000. Thread-2:处理行数从2000到4000. Thread-5:行数处理完成. Thread-4:行数处理完成. Thread-2:行数处理完成. Thread-3:行数处理完成. Thread-1:行数处理完成. 组合:处理结果。。。 组:总结果:1000513.
与CountDownLatch不同的是CyclicBarrier可以重置初始化状态(reset()方法)
相关文章推荐
- 第三章 Thread Synchronization Utilities(线程同步工具类)【下】
- [笔记][Java7并发编程实战手册]第三章-线程同步辅助类-3.1概要
- 第三章 线程安全的DateFormat工具类
- 线程同步工具类--CyclicBarrier
- 线程同步工具类——CyclicBarrier
- 线程同步工具类
- Java7并发编程指南——第三章:线程同步辅助类
- Java线程知识__其他几种线程同步的工具类的使用(CyclicBarrier,CountDownLatch,Exchanger)
- 线程同步工具类——CountDownLatch
- 线程同步工具类
- 线程同步工具类之栅栏
- 线程同步工具类之信号量(Semaphore)
- 第三章 线程安全的DateFormat工具类
- 第三章线程同步辅助类
- java Collections工具类的 一些功能(查找,最值,乱序,逆序,排序,交换,替换,让集合线程同步)
- Java核心知识点学习----线程同步工具类,CyclicBarrier学习
- 第三章:多线程技术——线程优先级与线程同步
- 线程同步工具类,CyclicBarrier学习
- java并发实战手册第三章(线程同步辅助类介绍)
- 【Android 工具类】常用工具类(方法)大全