您的位置:首页 > 编程语言 > Java开发

[Java 并发]Java多线程同步优化(三)

2016-05-26 16:44 489 查看
上次说到Java可以用最基本的
wait()
,
notifyAll()
,
synchronized
实现最基本的线程同步,那么今天看看其他的实现方法。

从Linux说起

互斥量

什么事互斥量:一个mutex,只有、持有这个mutex才可以去操作mutex对于的资源,但是这个非常容易引起死锁,所以就会有
pthread_mutex_trylock
函数的使用(是非阻塞的加锁函数,若加锁失败,则立即返回EBUSY。),来终止死锁四个条件的环路等待条件。下面2个函数就是具体的使用

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);


信号量

信号量机制,
sem_wait(sem_t *sem)
首先将对应的sem资源的数目减一,然后再去判断是否可用,如果可用就去使用,如果不行的话,就把自己加入到sem对应的阻塞队列里面去。然后
sem_post(sem_t *sem)
先对对应的sem资源的数目加一(即释放资源),然后唤醒其对应的阻塞队列。所以这个机制和Java原生态的进程同步相似。(补充一点:在操作系统的课上我们学到还有一个AND型信号量,用于多个资源的同时申请,也就是把若干个
wait()
语句当作一个原子操作一起执行)

sem_wait(sem_t *sem);
sem_post(sem_t *sem);


管道通信,在2个进程(一般是父进程和
fork()
出来的子进程)进行单项的数据通信。

消息队列,在Android里面有个
Handler + Looper + MessageQueue
实现多线程之间的通信。原理如下:生产者生产放在消息队列里面,然后消费者在消息队列里面取出来进行消费。

生产者-消费者队列

Java.util.concurrent.BlockingQueue
就提供了这样一个接口,给我们封装了消息队列,生产者与消费者持有同一个阻塞队列的引用,就可以对队列进行操作。

首先我们看看代码:

首先定义一个消息队列:用来传输消息。

package lesson_3;

import java.util.concurrent.LinkedBlockingQueue;

/**
* Created on 2016/5/21.
*
* @author 王启航
* @version 1.0
*/
public class BeanQueue extends LinkedBlockingQueue<Integer> {
}


然后消费者,生产者:

package lesson_3;

/**
* Created on 2016/5/21.
*
* @author 王启航
* @version 1.0
*/
public class Consumer implements Runnable {

BeanQueue mBeanQueue;

public Consumer(BeanQueue mBeanQueue) {
this.mBeanQueue = mBeanQueue;
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
Thread.sleep(2000);
System.out.println("Get-> " + mBeanQueue.take()); //出队列
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


package lesson_3;

/**
* Created on 2016/5/21.
*
* @author 王启航
* @version 1.0
*/
public class Producer implements Runnable {

BeanQueue mBeanQueue;
static int count = 0;

public Producer(BeanQueue mBeanQueue) {
this.mBeanQueue = mBeanQueue;
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
Thread.sleep(1000);
System.out.println("Put-> " + count);
mBeanQueue.put(count++); //入队列
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


然后Main:

package lesson_3;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Created on 2016/5/21.
*
* @author 王启航
* @version 1.0
*/
public class Main {
public static void main(String args[]) {
BeanQueue mBeanQueue = new BeanQueue();
ExecutorService service = Executors.newCachedThreadPool();
service.execute(new Producer(mBeanQueue));
service.execute(new Consumer(mBeanQueue));
}
}


运行一段时间,可以看到如下结果:

Put-> 0
Get-> 0
Put-> 1
Put-> 2
Get-> 1
Put-> 3
Put-> 4
Get-> 2
Put-> 5
Put-> 6
Get-> 3
Put-> 7
Put-> 8
Get-> 4
Put-> 9


下面来进行分析:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {


继承了
AbstractQueue
,所以可以用
put(),take()
来进行入队列和出队列。关于队列的方法可以参考这里

这样,相对于显式调用
wait() + notifyAll()
几个方法,每个类只和
BlockingQueue
进行交互,降低了类与类之间的耦合。

Java.util.concurrent
的其他并发工具类

CountDownLatch
:Latch是阀门的意思。这个类可以设置一个初始值,每调用一次
countdown()
这个值就会减一。直到0的时候才会调用
await()
,也就是上面说的信号量机制。

下面看看一个实例:

package lesson_4;

import java.util.concurrent.CountDownLatch;

/**
* Created on 2016/5/21.
*
* @author 王启航
* @version 1.0
*/
public class TaskPortion implements Runnable {

CountDownLatch mLatch;

public TaskPortion(CountDownLatch mLatch) {
this.mLatch = mLatch;
}

@Override
public void run() {
while (!Thread.interrupted()) {
try {
Thread.sleep(1000);
System.out.println("Consume-");
mLatch.countDown();

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}


package lesson_4;

import java.util.concurrent.CountDownLatch;

/**
* Created on 2016/5/21.
*
* @author 王启航
* @version 1.0
*/
public class WaitingTask implements Runnable {

CountDownLatch mLatch;

public WaitingTask(CountDownLatch mLatch) {
this.mLatch = mLatch;
}

@Override
public void run() {
try {
mLatch.await();
System.out.println("WaitingTask-");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


package lesson_4;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Created on 2016/5/21.
*
* @author 王启航
* @version 1.0
*/
public class Main {
public static void main(String args[]) {
CountDownLatch mLatch = new CountDownLatch(5);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(new TaskPortion(mLatch));
service.execute(new WaitingTask(mLatch));
}
}


结果如下:

Consume-
Consume-
Consume-
Consume-
Consume-
WaitingTask-


过程如下:首先
TaskPortion
WaitingTask
会一起并发执行.但是在
WaitingTask
await()
的会阻塞,所以在
TaskPortion
里面的
countdown()
减到0的时候,
WaitingTask
await()
之后的方法才会被调用。

那么有没有觉得这个类特别像
join()
函数???那么这个类比
join()
有什么好处呢?

- 直接由
Executor
进行控制,,避免了直接对线程进行操作而导致的一些问题

-
join()
只可以让一个线程进行等待,而
CountDownLatch
可以让多个线程进行等待,只需持有相同的引用即可。

CyclicBarrier


可以把其看做一个公共的屏障,所有的线程到达这个屏障之后,再去执行其他的操作,

所以在
CyclicBarrier
的构造函数里面传入一个
Runnable
,为到达公共屏障之后的动作。然后一个
CyclicBarrier#await()
表示到达公共屏障。

最后还有一些
BlockingQueue
的各种实现类,大家可以自己去看JavaDoc.

一些思考

有一个问题:主线程必须等待所有(大量)的子线程执行完毕后才可以执行,这个问题有哪些解决方案:

利用
join
,每个线程都去调用
join
方法

利用
CountDownLatch
,可以参考上文

利用线程池,
ExecutorService
的一个方法
boolean awaitTermination(long timeout, TimeUnit unit)
,即阻塞主线程,等待线程池的所有线程执行完成。另外,
while(!executor.isTerminated())
也可以替代上面的
while (!executor.awaitTermination(10,TimeUnit.SECONDS))
isTerminated
是用来判断线程池是否执行完成。但是二者比较我认为还是
awaitTermination
更好,它有一个超时时间可以控制每隔多久循环一次,而不是一直在循环来消耗性能。参考

利用
Callable
+
Future
Callable
类似于
Runnable
,但是
Callable
有返回值,返回一个
Future
对象,然后再去调用
Future
里面的方法。

// Callable + Future 简单用法,,,或者使用ExecutorService#submit来执行Callable
@org.junit.Test
public void test() {
Callable<Integer> callable = () -> new Random().nextInt(100);

FutureTask<Integer> future = new FutureTask<>(callable);
new Thread(future).start();

try {
System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}


邮箱:1906362072@qq.com

本人大二,欢迎大家联系。。。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: