线程同步工具类
2016-10-21 10:16
211 查看
阅读目录
写在前面闭锁
信号量
栅栏
阻塞队列
写在前面
同步工具类主要包括闭锁(如CountDownLatch),栅栏(如CyclicBarrier),信号量(如Semaphore)和阻塞队列(如LinkedBlockingQueue)等;使用同步工具类可以协调线程的控制流;
同步工具类封装了一些状态,这些状态决定线程是继续执行还是等待,此外同步工具类还提供了修改状态的方法;
下面将简单介绍以上同步工具类;
闭锁
可以让一个线程等待一组事件发生后(不一定要线程结束)继续执行;以CountDownLatch为例,内部包含一个计数器,一开始初始化为一个整数(事件个数),发生一个事件后,调用countDown方法,计数器减1,await用于等待计数器为0后继续执行当前线程;
举个例子如下,main线程等待其它子线程的事件发生后继续执行main线程:
package concurrency; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; class TaskTest implements Runnable { private CountDownLatch latch; private int sleepTime; /** * */ public TaskTest(int sleepTime, CountDownLatch latch) { this.sleepTime = sleepTime; this.latch = latch; } /** * @see java.lang.Runnable#run() */ @Override public void run() { try { CountDownLatchTest.print(" is running。"); TimeUnit.MILLISECONDS.sleep(sleepTime); CountDownLatchTest.print(" finished。"); //计数器减减 latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } } public class CountDownLatchTest { public static void main(String[] args) { int count = 10; final CountDownLatch latch = new CountDownLatch(count); ExecutorService es = Executors.newFixedThreadPool(count); for (int i = 0; i < count; i++) { es.execute(new TaskTest((i + 1) * 1000, latch)); } try { CountDownLatchTest.print(" waiting..."); //主线程等待其它事件发生 latch.await(); //其它事件已发生,继续执行主线程 CountDownLatchTest.print(" continue。。。"); } catch (InterruptedException e) { e.printStackTrace(); } finally { es.shutdown(); } } public static void print(String str){ SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss"); System.out.println("[" + dfdate.format(new Date()) + "]" + Thread.currentThread().getName() + str); } }
结果打印如下:
信号量
控制同时执行某个指定操作的数量,常用于实现资源池,如数据库连接池,线程池...以Semaphore为例,其内部维护一组资源,可以通过构造函数指定数目,其它线程在执行的时候,可以通过acquire方法获取资源,有的话,继续执行(使用结束后释放资源),没有资源的话将阻塞直到有其它线程调用release方法释放资源;
举个例子,如下代码,十个线程竞争三个资源,一开始有三个线程可以直接运行,剩下的七个线程只能阻塞等到其它线程使用资源完毕才能执行;
package concurrency; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; public class SemaphoreTest { public static void print(String str){ SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss"); System.out.println("[" + dfdate.format(new Date()) + "]" + Thread.currentThread().getName() + str); } public static void main(String[] args) { // 线程数目 int threadCount = 10; // 资源数目 Semaphore semaphore = new Semaphore(3); ExecutorService es = Executors.newFixedThreadPool(threadCount); // 启动若干线程 for (int i = 0; i < threadCount; i++) es.execute(new ConsumeResourceTask((i + 1) * 1000, semaphore)); } } class ConsumeResourceTask implements Runnable { private Semaphore semaphore; private int sleepTime; /** * */ public ConsumeResourceTask(int sleepTime, Semaphore semaphore) { this.sleepTime = sleepTime; this.semaphore = semaphore; } public void run() { try { //获取资源 semaphore.acquire(); SemaphoreTest.print(" 占用一个资源..."); TimeUnit.MILLISECONDS.sleep(sleepTime); SemaphoreTest.print(" 资源使用结束,释放资源"); //释放资源 semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }
[10:30:11]pool-1-thread-1 占用一个资源... [10:30:11]pool-1-thread-2 占用一个资源... [10:30:11]pool-1-thread-3 占用一个资源... [10:30:12]pool-1-thread-1 资源使用结束,释放资源 [10:30:12]pool-1-thread-4 占用一个资源... [10:30:13]pool-1-thread-2 资源使用结束,释放资源 [10:30:13]pool-1-thread-5 占用一个资源... [10:30:14]pool-1-thread-3 资源使用结束,释放资源 [10:30:14]pool-1-thread-8 占用一个资源... [10:30:16]pool-1-thread-4 资源使用结束,释放资源 [10:30:16]pool-1-thread-6 占用一个资源... [10:30:18]pool-1-thread-5 资源使用结束,释放资源 [10:30:18]pool-1-thread-9 占用一个资源... [10:30:22]pool-1-thread-8 资源使用结束,释放资源 [10:30:22]pool-1-thread-7 占用一个资源... [10:30:22]pool-1-thread-6 资源使用结束,释放资源 [10:30:22]pool-1-thread-10 占用一个资源... [10:30:27]pool-1-thread-9 资源使用结束,释放资源 [10:30:29]pool-1-thread-7 资源使用结束,释放资源 [10:30:32]pool-1-thread-10 资源使用结束,释放资源
栅栏
栅栏用于等待其它线程,且会阻塞自己当前线程;所有线程必须同时到达栅栏位置后,才能继续执行;
举个例子如下:
package concurrency; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; class CyclicBarrierTaskTest implements Runnable { private CyclicBarrier cyclicBarrier; private int timeout; public CyclicBarrierTaskTest(CyclicBarrier cyclicBarrier, int timeout) { this.cyclicBarrier = cyclicBarrier; this.timeout = timeout; } @Override public void run() { TestCyclicBarrier.print(" 正在running..."); try { TimeUnit.MILLISECONDS.sleep(timeout); TestCyclicBarrier.print(" 到达栅栏处,等待其它线程到达"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } TestCyclicBarrier.print(" 所有线程到达栅栏处,继续执行各自线程任务..."); } } public class TestCyclicBarrier { public static void print(String str) { SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss"); System.out.println("[" + dfdate.format(new Date()) + "]" + Thread.currentThread().getName() + str); } public static void main(String[] args) { int count = 5; ExecutorService es = Executors.newFixedThreadPool(count); CyclicBarrier barrier = new CyclicBarrier(count, new Runnable() { @Override public void run() { TestCyclicBarrier.print(" 所有线程到达栅栏处,可以在此做一些处理..."); } }); for (int i = 0; i < count; i++) es.execute(new CyclicBarrierTaskTest(barrier, (i + 1) * 1000)); } }
[11:07:00]pool-1-thread-2 正在running... [11:07:00]pool-1-thread-1 正在running... [11:07:00]pool-1-thread-5 正在running... [11:07:00]pool-1-thread-3 正在running... [11:07:00]pool-1-thread-4 正在running... [11:07:01]pool-1-thread-1 到达栅栏处,等待其它线程到达 [11:07:02]pool-1-thread-2 到达栅栏处,等待其它线程到达 [11:07:03]pool-1-thread-3 到达栅栏处,等待其它线程到达 [11:07:04]pool-1-thread-4 到达栅栏处,等待其它线程到达 [11:07:05]pool-1-thread-5 到达栅栏处,等待其它线程到达 [11:07:05]pool-1-thread-5 所有线程到达栅栏处,可以在此做一些处理... [11:07:05]pool-1-thread-1 所有线程到达栅栏处,继续执行各自线程任务... [11:07:05]pool-1-thread-2 所有线程到达栅栏处,继续执行各自线程任务... [11:07:05]pool-1-thread-5 所有线程到达栅栏处,继续执行各自线程任务... [11:07:05]pool-1-thread-3 所有线程到达栅栏处,继续执行各自线程任务... [11:07:05]pool-1-thread-4 所有线程到达栅栏处,继续执行各自线程任务...
public
class CompliteServiceMain {
public static void main(String[] args) {
int processors = Runtime.getRuntime().availableProcessors();
ExecutorService exec = Executors.newFixedThreadPool(processors);
ExecutorCompletionService<Integer> completionService =new ExecutorCompletionService<Integer>(exec);
final CyclicBarrier c=new CyclicBarrier(processors);
final long start = System.nanoTime();
for(int i=0;i<processors*2;i++)
{
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Random random = new Random();
int ran = random.nextInt(1000);
Thread.sleep(ran);
System.out.println(Thread.currentThread().getName()+" sleep "+ran+"ms");
c.await();
System.out.println(System.nanoTime()-start);
return ran;
}
});
}
for(int i=0;i<processors*2;i++)
{
try {
Integer res = completionService.take().get();
System.out.println(res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
exec.shutdown();
}
}public class CompliteServiceMain {
public static void main(String[] args) {
int processors = Runtime.getRuntime().availableProcessors();
ExecutorService exec = Executors.newFixedThreadPool(processors);
ExecutorCompletionService<Integer> completionService =new ExecutorCompletionService<Integer>(exec);
final CyclicBarrier c=new CyclicBarrier(processors);
final long start = System.nanoTime();
for(int i=0;i<processors*2;i++)
{
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Random random = new Random();
int ran = random.nextInt(1000);
Thread.sleep(ran);
System.out.println(Thread.currentThread().getName()+" sleep "+ran+"ms");
c.await();
System.out.println(System.nanoTime()-start);
return ran;
}
});
}
for(int i=0;i<processors*2;i++)
{
try {
Integer res = completionService.take().get();
System.out.println(res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
exec.shutdown();
}
}
pool-1-thread-2 sleep 365ms pool-1-thread-1 sleep 777ms pool-1-thread-3 sleep 870ms pool-1-thread-4 sleep 859ms 988347215 988531958 988573323 988608157 859 365 777 870 pool-1-thread-3 sleep 847ms pool-1-thread-1 sleep 880ms pool-1-thread-4 sleep 897ms pool-1-thread-2 sleep 959ms 1947452526 1947466210 1947526858 847 880 1947500422 897 959
阻塞队列
阻塞队列提供了可阻塞的入队和出对操作,如果队列满了,入队操作将阻塞直到有空间可用,如果队列空了,出队操作将阻塞直到有元素可用;队列可以为有界和无界队列,无界队列不会满,因此入队操作将不会阻塞;
下面将使用阻塞队列LinkedBlockingQueue举个生产者-消费者例子,生产者每隔1秒生产1个产品,然后有6个消费者在消费产品,可以发现,每隔1秒,只有一个消费者能够获取到产品消费,其它线程只能等待...
如下代码:
package concurrency; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; //生产者 public class Producer implements Runnable { private final BlockingQueue<String> fileQueue; public Producer(BlockingQueue<String> queue) { this.fileQueue = queue; } public void run() { try { while (true) { TimeUnit.MILLISECONDS.sleep(1000); String produce = this.produce(); System.out.println(Thread.currentThread() + "生产:" + produce); fileQueue.put(produce); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } public String produce() { SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss"); return dfdate.format(new Date()); } public static void main(String[] args) { BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); for (int i = 0; i < 1; i++) { new Thread(new Producer(queue)).start(); } for (int i = 0; i < 6; i++) { new Thread(new Consumer(queue)).start(); } } } // 消费者 class Consumer implements Runnable { private final BlockingQueue<String> queue; public Consumer(BlockingQueue<String> queue) { this.queue = queue; } public void run() { try { while (true) { TimeUnit.MILLISECONDS.sleep(1000); System.out.println(Thread.currentThread() + "prepare 消费"); System.out.println(Thread.currentThread() + "starting:" + queue.take()); System.out.println(Thread.currentThread() + "end 消费"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
Thread[Thread-1,5,main]prepare 消费 Thread[Thread-3,5,main]prepare 消费 Thread[Thread-4,5,main]prepare 消费 Thread[Thread-2,5,main]prepare 消费 Thread[Thread-6,5,main]prepare 消费 Thread[Thread-5,5,main]prepare 消费 Thread[Thread-0,5,main]生产:11:36:36 Thread[Thread-1,5,main]starting:11:36:36 Thread[Thread-1,5,main]end 消费 Thread[Thread-1,5,main]prepare 消费 Thread[Thread-0,5,main]生产:11:36:37 Thread[Thread-4,5,main]starting:11:36:37 Thread[Thread-4,5,main]end 消费 Thread[Thread-4,5,main]prepare 消费 Thread[Thread-0,5,main]生产:11:36:38 Thread[Thread-3,5,main]starting:11:36:38 Thread[Thread-3,5,main]end 消费
相关文章推荐
- 线程同步工具类之栅栏
- 线程同步工具类之信号量(Semaphore)
- Java核心知识点学习----线程同步工具类,CyclicBarrier学习
- 线程同步工具类,CyclicBarrier学习
- 线程同步工具类--CyclicBarrier
- 第三章 Thread Synchronization Utilities(线程同步工具类)【上】
- 线程同步工具类
- java Collections工具类的 一些功能(查找,最值,乱序,逆序,排序,交换,替换,让集合线程同步)
- 线程同步工具类——CyclicBarrier
- 线程同步工具类——CountDownLatch
- Java线程知识__其他几种线程同步的工具类的使用(CyclicBarrier,CountDownLatch,Exchanger)
- 第三章 Thread Synchronization Utilities(线程同步工具类)【下】
- android里图片下载工具类AsyncImageLoader分析
- apache.commons.compress zip压缩解压工具类完整版(使用递归支持无限文件夹的解压缩)
- Android工具类之SharedPreferences统一管理类
- 日期工具类常用操作
- StringUtils工具类常用方法汇总(截取、去除空白、包含、查询索引)
- 线程同步(消费者与生产者)
- Java多线程:线程同步与关键字synchronized
- Android 图片加载工具类 Universal-Image-Loader 的封装和使用示例