java concurrent 包中并发类的Demo
2015-07-30 17:35
531 查看
并非纯原创,如有雷同纯属巧合。
![](http://img.blog.csdn.net/20150730173524066?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
Atomic(原子操作):
tools(并发工具):
CountDownLatch:
CyclicBarrier:
Exchange:
Semaphore:
Executor(线程池):
Queue(线程安全队列):
线程不安全队列就是 LinkedList
Lock(线程锁):
Atomic(原子操作):
package com.gaode.concurrent; import java.util.concurrent.atomic.AtomicInteger; /** * i++ 或者 i-- 原子操作的问题 */ public class TestAtomic extends Thread { //private static volatile int count = 0; private static AtomicInteger count = new AtomicInteger(0); private static final int times = Integer.MAX_VALUE; public static void main(String[] args) throws InterruptedException { long curTime = System.nanoTime(); Thread decThread = new DecThread(); decThread.start(); // 使用run()来运行结果为0,原因是单线程执行不会有线程安全问题 // new DecThread().run(); System.out.println("Start thread: " + Thread.currentThread() + " i++"); for (int i = 0; i < times; i++) { //count++; count.incrementAndGet(); } System.out.println("End thread: " + Thread.currentThread() + " i--"); decThread.join(); long duration = System.nanoTime() - curTime; /** * 如果 count 是 int 就没法保证 count 的结果为 0 */ System.out.println("Result: " + count); System.out.format("Duration: %.2fs\n", duration / 1.0e9); } private static class DecThread extends Thread { @Override public void run() { System.out.println("Start thread: " + Thread.currentThread() + " i--"); for (int i = 0; i < times; i++) { //count--; count.decrementAndGet(); } System.out.println("End thread: " + Thread.currentThread() + " i--"); } } }
tools(并发工具):
CountDownLatch:
package com.gaode.concurrent; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; /** * CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。 * 比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch * 来实现这种功能了。 */ public class TestCountDownLatch { static int i = 6; static CountDownLatch countDownLatch = new CountDownLatch(i); public static void main(String[] args) { S s = new S(); s.start(); for (int i = 0; i < 8; i++) { T t = new T(); t.start(); } } static class T extends Thread { @Override public void run() { try { Thread.sleep(1000); /** * 将计数器减为0 */ countDownLatch.countDown(); long count = countDownLatch.getCount(); System.out.println(count); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("所有线程处理完毕,继续处理其他任务..."); } } static class S extends Thread { @Override public void run() { try { Thread.sleep(1000); /** * 当countDownLatch计数减为0时才会执行 */ countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("必须等待count为0的时候才会执行"); } } }
CyclicBarrier:
package com.gaode.concurrent; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * 一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。 * 在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。 * 因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。CyclicBarrier 支持 * 一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前), * 该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。 */ public class TestCyclicBarrier { /** * 没有当前个数的线程不会往下执行 */ static int i = 5; static CyclicBarrier barrier = new CyclicBarrier(i); public static void main(String[] args) { for (int i = 0; i < 5; i++) { T t = new T(); t.start(); } } static class T extends Thread { @Override public void run() { try { Thread.sleep(1000); barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println("所有线程处理完毕,继续处理其他任务..."); } } }
Exchange:
package com.gaode.concurrent; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 两个线程之间交互数据,多了没用只会有前两个线程交换其他线程执行都不会执行 */ public class TestExchange { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final Exchanger exchanger = new Exchanger(); service.execute(new Runnable() { @Override public void run() { try { String a = "A"; String b = (String) exchanger.exchange(a); System.out.println("线程 1 交换数据 a => ? :" + a + "=>" + b); } catch (Exception e) { e.printStackTrace(); } } }); service.execute(new Runnable() { @Override public void run() { try { String c = "C"; String b = (String) exchanger.exchange(c); System.out.println("线程 2 交换数据 c => ? :" + c + "=>" + b); } catch (Exception e) { e.printStackTrace(); } } }); service.execute(new Runnable() { @Override public void run() { try { String d = "D"; String b = (String) exchanger.exchange(d); System.out.println("线程 3 交换数据 d => ? :" + d + "=>" + b); } catch (Exception e) { e.printStackTrace(); } } }); } }
Semaphore:
package com.gaode.concurrent; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * 使用一个计数器,用来限制线程的同时访问数量 PS:抢购神器 */ public class TestSemaphore { static Semaphore semaphore = new Semaphore(10); public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); for (int i = 0; i < 100; i++) { Thread thread = new Thread() { @Override public void run() { try { System.out.println("获得一个计数器"); semaphore.acquire(); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } semaphore.release(); System.out.println("释放一个计数器"); } }; service.execute(thread); } } }
Executor(线程池):
package com.gaode.concurrent; import java.util.LinkedList; import java.util.List; import java.util.concurrent.*; /** * 传说中的线程池而已,使用线程最好用他 */ public class TestExecutors extends Thread { ExecutorService service = Executors.newFixedThreadPool(1000); class Cl { public String getClassString() { return "抗压类啊"; } } public void execute() throws Exception { List<Callable<String>> callables = new LinkedList<>(); final int rnd = (int) (Math.random() * 100); for (int i = 0; i < 1000; i++) { Callable callable = new Callable() { @Override public String call() throws Exception { Cl cl = new Cl(); return cl.getClassString(); } }; callables.add(callable); } try { List<Future<String>> results = service.invokeAll(callables); String name = null; for (Future<String> future : results) { String cname = future.get(); if (name == null) { name = cname.intern(); } if (name != cname) { throw new Exception("报错啦!怪我咯!"); } } service.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } @Override public void run() { try { this.execute(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { int i = 10000; do { Thread thread = new TestExecutors(); thread.start(); thread.join(); i--; } while (i < 0); System.out.println("太狠了 1w个线程"); } }
Queue(线程安全队列):
线程不安全队列就是 LinkedList
package com.gaode.concurrent; import java.util.*; import java.util.concurrent.*; public class TestQueue { int count = 0; static Queue queue = new LinkedBlockingQueue<>(); static final int times = 100000; public void push() { for (int i = 0; i < times; i++) { queue.add(count++); } } public void get() { for (int i = 0; i < times; i++) { int c = (int) queue.remove(); System.out.println("Next:" + c); } } public static void main(String args[]) throws InterruptedException { ExecutorService service = Executors.newFixedThreadPool(1000); List<Callable<String>> callables = new LinkedList<>(); for (int i = 0; i < 1000; i++) { Callable callable = new Callable() { @Override public String call() throws Exception { TestQueue testQueue = new TestQueue(); int var = (int) (Math.random() * 10); if (var >= 5) { testQueue.push(); } else { testQueue.get(); } return null; } }; callables.add(callable); } service.invokeAll(callables); System.out.println("完成了"); } }
Lock(线程锁):
package com.gaode.concurrent; import java.util.LinkedList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * 生产者和消费者的问题,线程锁问题 */ public class TestReentrantLock { int count = 10; LinkedList<String> list = new LinkedList<>(); ReentrantLock reentrantLock = new ReentrantLock(); /** * 生产锁 */ Condition pushLock = reentrantLock.newCondition(); /** * 消费锁 */ Condition getLock = reentrantLock.newCondition(); public void push(String stg) { reentrantLock.lock(); /** * 最好用try 避免Exception 死锁 */ try { while (list.size() >= count) { System.out.println("List 已满!"); pushLock.await(); } list.add(stg); getLock.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { reentrantLock.unlock(); } } public String get() { reentrantLock.lock(); String stg = null; try { while (list.size() <= 0) { System.out.println("List 为空!"); getLock.await(); } stg = list.removeFirst(); pushLock.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { reentrantLock.unlock(); } return stg; } public static void main(String args[]) { ExecutorService service = Executors.newFixedThreadPool(1000); List<Callable<String>> callables = new LinkedList<>(); for (int i = 0; i < 1000; i++) { Callable callable = new Callable() { @Override public String call() throws Exception { TestReentrantLock lock = new TestReentrantLock(); int var = (int) (Math.random() * 10); if (var >= 5) { for (int i = 0; i < 100; i++) { lock.push(i + ""); } } else { for (int i = 0; i < 100; i++) { String is = lock.get(); System.out.println(is); } } if (lock.list.size() < 0 || lock.list.size() > 100) { throw new Exception("报错说明线程锁没有控制住!"); } return null; } }; callables.add(callable); } try { List<Future<String>> results = service.invokeAll(callables); service.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } } }
相关文章推荐
- JavaWeb学习总结——文件上传和下载
- java并发编程之volatile详解
- java里split(" {1,}")什么意思啊?
- Eclipse设置
- 《深入浅出struts2》--第十一章 持久层-DAO模式方式
- Eclipse安装aptana
- 关于java.lang.VerifyError的另外一种错误原因
- JAVA——各类型所占字节数
- Spring整合hibernate4:事务管理
- 图说Java —— 理解Java机制最受欢迎的8幅图
- Java回顾之Spring基础
- Java时间格式验证
- Java--数组
- Java复习第三天
- Java调用JNI的基本步骤(实现两数加法)
- Java学习笔记--------为什么不能这样用???
- spring中IOC是什么意思 个人觉得解释的非常有意思
- spring中IOC是什么意思 个人觉得解释的非常有意思
- spring中IOC是什么意思 个人觉得解释的非常有意思
- java通过当前日期得到本周的起止日期