您的位置:首页 > 编程语言 > Java开发

关于Java多线程和并发运行的学习(四)——之Exchanger

2014-09-14 13:53 309 查看
Exchanger:

可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给
exchange
方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为
SynchronousQueue
的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。

可以用于实现两个人之间的数据交换,每个人在完成一定的事务后想要与对方交换数据,第一个先拿出数据的人将一直等待第二个人拿着数据到来时,才能彼此交换数据。

用法实例:以下是重点介绍的一个类,该类使用
Exchanger
在线程间交换缓冲区,因此,在需要时,填充缓冲区的线程获取一个新腾空的缓冲区,并将填满的缓冲区传递给腾空缓冲区的线程:

class FillAndEmpty {
Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
DataBuffer initialEmptyBuffer = ... a made-up type
DataBuffer initialFullBuffer = ...

class FillingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialEmptyBuffer;
try {
while (currentBuffer != null) {
addToBuffer(currentBuffer);
if (currentBuffer.isFull())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ... }
}
}

class EmptyingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialFullBuffer;
try {
while (currentBuffer != null) {
takeFromBuffer(currentBuffer);
if (currentBuffer.isEmpty())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ...}
}
}

void start() {
new Thread(new FillingLoop()).start();
new Thread(new EmptyingLoop()).start();
}
}


内存一致性效果:对于通过
Exchanger
成功交换对象的每对线程,在每个线程
exchange()
之前的操作
happen-before 从另一线程中相应的
exchange()
返回 的后续操作。

(这句话的英文原文:Memory consistency effects: For each pair of threads that successfully exchange objects via an
Exchanger
, actions prior to the
exchange()
in each thread
happen-before those subsequent to a return from the corresponding
exchange()
in the other thread. 翻译惨不忍睹,看不懂的看英文琢磨吧。也是蛮无奈的。)

在这里实现一个小案例:

package Multithreading;

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerTest {

public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Exchanger exchanger = new Exchanger();
service.execute(new Runnable() {
public void run() {
try {

String data1 = "zxx";
System.out.println("线程" + Thread.currentThread().getName()
+ "正在把数据" + data1 + "换出去");
Thread.sleep((long) (Math.random() * 10000));
/*
* 等待另一个线程到达此交换点(除非当前线程被中断),
* 然后将给定的对象传送给该线程,并接收该线程的对象。
*/
String data2 = (String) exchanger.exchange(data1);
System.out.println("线程" + Thread.currentThread().getName()
+ "换回的数据为" + data2);
} catch (Exception e) {

}
}
});
service.execute(new Runnable() {
public void run() {
try {

String data1 = "lhm";
System.out.println("线程" + Thread.currentThread().getName()
+ "正在把数据" + data1 + "换出去");
Thread.sleep((long) (Math.random() * 10000));
String data2 = (String) exchanger.exchange(data1);
System.out.println("线程" + Thread.currentThread().getName()
+ "换回的数据为" + data2);
} catch (Exception e) {

}
}
});
}
}


运行结果:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: