[Java 并发]Java多线程同步优化(三)
2016-05-26 16:44
489 查看
上次说到Java可以用最基本的
什么事互斥量:一个mutex,只有、持有这个mutex才可以去操作mutex对于的资源,但是这个非常容易引起死锁,所以就会有
信号量
信号量机制,
管道通信,在2个进程(一般是父进程和
消息队列,在Android里面有个
首先我们看看代码:
首先定义一个消息队列:用来传输消息。
然后消费者,生产者:
然后Main:
运行一段时间,可以看到如下结果:
下面来进行分析:
继承了
这样,相对于显式调用
下面看看一个实例:
结果如下:
过程如下:首先
那么有没有觉得这个类特别像
- 直接由
-
可以把其看做一个公共的屏障,所有的线程到达这个屏障之后,再去执行其他的操作,
所以在
最后还有一些
利用
利用
利用线程池,
利用
邮箱:1906362072@qq.com
本人大二,欢迎大家联系。。。
wait(),
notifyAll(),
synchronized实现最基本的线程同步,那么今天看看其他的实现方法。
从Linux说起
互斥量什么事互斥量:一个mutex,只有、持有这个mutex才可以去操作mutex对于的资源,但是这个非常容易引起死锁,所以就会有
pthread_mutex_trylock函数的使用(是非阻塞的加锁函数,若加锁失败,则立即返回EBUSY。),来终止死锁四个条件的环路等待条件。下面2个函数就是具体的使用
int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_trylock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex);
信号量
信号量机制,
sem_wait(sem_t *sem)首先将对应的sem资源的数目减一,然后再去判断是否可用,如果可用就去使用,如果不行的话,就把自己加入到sem对应的阻塞队列里面去。然后
sem_post(sem_t *sem)先对对应的sem资源的数目加一(即释放资源),然后唤醒其对应的阻塞队列。所以这个机制和Java原生态的进程同步相似。(补充一点:在操作系统的课上我们学到还有一个AND型信号量,用于多个资源的同时申请,也就是把若干个
wait()语句当作一个原子操作一起执行)
sem_wait(sem_t *sem); sem_post(sem_t *sem);
管道通信,在2个进程(一般是父进程和
fork()出来的子进程)进行单项的数据通信。
消息队列,在Android里面有个
Handler + Looper + MessageQueue实现多线程之间的通信。原理如下:生产者生产放在消息队列里面,然后消费者在消息队列里面取出来进行消费。
生产者-消费者队列
在Java.util.concurrent.BlockingQueue就提供了这样一个接口,给我们封装了消息队列,生产者与消费者持有同一个阻塞队列的引用,就可以对队列进行操作。
首先我们看看代码:
首先定义一个消息队列:用来传输消息。
package lesson_3; import java.util.concurrent.LinkedBlockingQueue; /** * Created on 2016/5/21. * * @author 王启航 * @version 1.0 */ public class BeanQueue extends LinkedBlockingQueue<Integer> { }
然后消费者,生产者:
package lesson_3; /** * Created on 2016/5/21. * * @author 王启航 * @version 1.0 */ public class Consumer implements Runnable { BeanQueue mBeanQueue; public Consumer(BeanQueue mBeanQueue) { this.mBeanQueue = mBeanQueue; } @Override public void run() { try { while (!Thread.interrupted()) { Thread.sleep(2000); System.out.println("Get-> " + mBeanQueue.take()); //出队列 } } catch (InterruptedException e) { e.printStackTrace(); } } }
package lesson_3; /** * Created on 2016/5/21. * * @author 王启航 * @version 1.0 */ public class Producer implements Runnable { BeanQueue mBeanQueue; static int count = 0; public Producer(BeanQueue mBeanQueue) { this.mBeanQueue = mBeanQueue; } @Override public void run() { try { while (!Thread.interrupted()) { Thread.sleep(1000); System.out.println("Put-> " + count); mBeanQueue.put(count++); //入队列 } } catch (InterruptedException e) { e.printStackTrace(); } } }
然后Main:
package lesson_3; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created on 2016/5/21. * * @author 王启航 * @version 1.0 */ public class Main { public static void main(String args[]) { BeanQueue mBeanQueue = new BeanQueue(); ExecutorService service = Executors.newCachedThreadPool(); service.execute(new Producer(mBeanQueue)); service.execute(new Consumer(mBeanQueue)); } }
运行一段时间,可以看到如下结果:
Put-> 0 Get-> 0 Put-> 1 Put-> 2 Get-> 1 Put-> 3 Put-> 4 Get-> 2 Put-> 5 Put-> 6 Get-> 3 Put-> 7 Put-> 8 Get-> 4 Put-> 9
下面来进行分析:
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
继承了
AbstractQueue,所以可以用
put(),take()来进行入队列和出队列。关于队列的方法可以参考这里
这样,相对于显式调用
wait() + notifyAll()几个方法,每个类只和
BlockingQueue进行交互,降低了类与类之间的耦合。
Java.util.concurrent
的其他并发工具类
CountDownLatch:Latch是阀门的意思。这个类可以设置一个初始值,每调用一次
countdown()这个值就会减一。直到0的时候才会调用
await(),也就是上面说的信号量机制。
下面看看一个实例:
package lesson_4; import java.util.concurrent.CountDownLatch; /** * Created on 2016/5/21. * * @author 王启航 * @version 1.0 */ public class TaskPortion implements Runnable { CountDownLatch mLatch; public TaskPortion(CountDownLatch mLatch) { this.mLatch = mLatch; } @Override public void run() { while (!Thread.interrupted()) { try { Thread.sleep(1000); System.out.println("Consume-"); mLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
package lesson_4; import java.util.concurrent.CountDownLatch; /** * Created on 2016/5/21. * * @author 王启航 * @version 1.0 */ public class WaitingTask implements Runnable { CountDownLatch mLatch; public WaitingTask(CountDownLatch mLatch) { this.mLatch = mLatch; } @Override public void run() { try { mLatch.await(); System.out.println("WaitingTask-"); } catch (InterruptedException e) { e.printStackTrace(); } } }
package lesson_4; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created on 2016/5/21. * * @author 王启航 * @version 1.0 */ public class Main { public static void main(String args[]) { CountDownLatch mLatch = new CountDownLatch(5); ExecutorService service = Executors.newCachedThreadPool(); service.execute(new TaskPortion(mLatch)); service.execute(new WaitingTask(mLatch)); } }
结果如下:
Consume- Consume- Consume- Consume- Consume- WaitingTask-
过程如下:首先
TaskPortion与
WaitingTask会一起并发执行.但是在
WaitingTask的
await()的会阻塞,所以在
TaskPortion里面的
countdown()减到0的时候,
WaitingTask的
await()之后的方法才会被调用。
那么有没有觉得这个类特别像
join()函数???那么这个类比
join()有什么好处呢?
- 直接由
Executor进行控制,,避免了直接对线程进行操作而导致的一些问题
-
join()只可以让一个线程进行等待,而
CountDownLatch可以让多个线程进行等待,只需持有相同的引用即可。
CyclicBarrier
可以把其看做一个公共的屏障,所有的线程到达这个屏障之后,再去执行其他的操作,
所以在
CyclicBarrier的构造函数里面传入一个
Runnable,为到达公共屏障之后的动作。然后一个
CyclicBarrier#await()表示到达公共屏障。
最后还有一些
BlockingQueue的各种实现类,大家可以自己去看JavaDoc.
一些思考
有一个问题:主线程必须等待所有(大量)的子线程执行完毕后才可以执行,这个问题有哪些解决方案:利用
join,每个线程都去调用
join方法
利用
CountDownLatch,可以参考上文
利用线程池,
ExecutorService的一个方法
boolean awaitTermination(long timeout, TimeUnit unit),即阻塞主线程,等待线程池的所有线程执行完成。另外,
while(!executor.isTerminated())也可以替代上面的
while (!executor.awaitTermination(10,TimeUnit.SECONDS)),
isTerminated是用来判断线程池是否执行完成。但是二者比较我认为还是
awaitTermination更好,它有一个超时时间可以控制每隔多久循环一次,而不是一直在循环来消耗性能。参考
利用
Callable+
Future,
Callable类似于
Runnable,但是
Callable有返回值,返回一个
Future对象,然后再去调用
Future里面的方法。
// Callable + Future 简单用法,,,或者使用ExecutorService#submit来执行Callable @org.junit.Test public void test() { Callable<Integer> callable = () -> new Random().nextInt(100); FutureTask<Integer> future = new FutureTask<>(callable); new Thread(future).start(); try { System.out.println(future.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }
邮箱:1906362072@qq.com
本人大二,欢迎大家联系。。。
相关文章推荐
- 杨辉三角形
- Spring3.2.11与Quartz2.2.1整合时内存泄漏问题解决
- 一起来学Android Studio:(四)导入eclipse项目
- javaWeb 文件上传功能
- Java中@Override的作用
- spring事务传播行为种类
- 方法一 Java如何判断线程池所有任务是否执行完毕
- Java中异常的总结
- MyEclipse8.5破解方法
- eclipse快捷键以及使用技巧大全
- springmvc注解式事务手动回滚
- Javascirpt中创建对象的几种方式
- Java发送邮件工具类
- JVM内存模型图解
- Java---静态代理VS动态代理
- Spring Security(20)——整合Cas
- SpringMVC的Interceptor
- Spring Security(19)——对Acl的支持
- java.lang.NoClassDefFoundError: org/hibernate/engine/SessionFactoryImplementor
- java责任链模式1