您的位置:首页 > 其它

五、线程通信

2015-10-20 20:59 387 查看
需求:

    要有取款者和存款者,二者不断的进行存款、取款动作。当存款者存入指定账户时,取钱者立即取出这笔钱,不允许存款者连续2次存款,也不允许取款者2次连续取钱。

思路:

设置旗标标示账户中是否有存款,flag=false即为没有
当存款者存钱后,flag=true,并调用notify()或者notifyAll()来唤醒其他线程
当存款者进入线程体后如果旗标为true则wait()等待
当flag=true时取款者可以取钱,取完后flag=false,并调用notify()或者notifyAll()来唤醒其他线程
当取款者进入线程体后如果旗标为false则wait()等待

传统的线程通信

新建Account类并提供draw()和deposit()方法分别对应取钱和存款操作
/*
* Creation : 2015年10月15日
*/
package com.tan.thread.bank.notify;

public class Account {

private String accountNo;

private double balance;

private boolean flag = false; // 设置初始值为false表示么有钱

public Account(String accountNo, double balance) {
super();
this.accountNo = accountNo;
this.balance = balance;
}

public String getAccountNo() {
return accountNo;
}

public void setAccountNo(String accountNo) {
this.accountNo = accountNo;
}

public double getBalance() {
return balance;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj != null && obj.getClass() == Account.class) {
Account account = (Account) obj;
return account.getAccountNo().equals(accountNo);
}

return false;
}

@Override
public int hashCode() {
return this.accountNo.hashCode();
}

/**
* Draw.
*
* @param drawAmount the draw account
* @throws InterruptedException
*/
public synchronized void draw(double drawAmount) {
try {
if (!flag) {
wait();
} else {
System.out.println(Thread.currentThread().getName() + " 开始取钱:" + drawAmount);
balance -= drawAmount;
System.out.println("取款后账户余额为:" + balance);

flag = false; // 账户没有钱了
notifyAll();// 通知其他线程存钱
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/**
* Deposit.
*
* @param depositAmount the deposit amount
*/
public synchronized void deposit(double depositAmount) {
try {
// 如果flag为true,表示已经有钱存进去了,则存钱方法阻塞...等待取钱
if (flag) {
wait();
} else {
System.out.println(Thread.currentThread().getName() + " 开始存钱:" + depositAmount);
balance += depositAmount;
System.out.println("存钱后的账户余额为:" + balance);

flag = true; // 表示账户已经有钱
notifyAll(); // 通知其他线程取钱(包含存款线程,只不过再次执行deposit方法是flag为true会被阻塞,这时候只有执行draw方法才可以继续向下执行)

}
} catch (Exception e) {

}

}

}


取款线程,连续模拟取100次
/*
* Creation : 2015年10月17日
*/
package com.tan.thread.bank.notify;

/**
* 取款者线程循环100次存款操作
*/
public class DrawThread extends Thread {
private Account account; // 模拟用户账户
private double drawAmount; // 当前希望取的钱数

public DrawThread(String name, Account account, double drawAmount) {
super(name);
this.account = account;
this.drawAmount = drawAmount;
}

@Override
public void run() {
for (int i = 0; i < 100; i++) {
account.draw(drawAmount);
}
}
}


存款线程,连续模拟存100次
/*
* Creation : 2015年10月17日
*/
package com.tan.thread.bank.notify;

/**
* 存款者线程循环100次存款操作
*/
public class DepositThread extends Thread {
private Account account; // 模拟用户账户
private double depositAmount; // 当前希望存款钱数

public DepositThread(String name, Account account, double depositAmount) {
super(name);
this.account = account;
this.depositAmount = depositAmount;
}

@Override
public void run() {
for (int i = 0; i < 100; i++) {
account.deposit(depositAmount);
}
}
}


主测试类
/*
* Creation : 2015年10月17日
*/
package com.tan.thread.bank.notify;

/**
* 现象:可以看到存款者线程和取款者线程交替执行的情况,每当存款者存入800块钱后取款者线程立即取出这笔钱
*
* 程序最后被阻塞:因为有3个存款者线程需要300次存款操作,而只有2个取钱线程只需要200次,故而程序被阻塞。
*
* 注意:阻塞并不是死锁,对于以下情况存款者线程已经执行结束,它只是在等带取款者线程取款而已,可是取款者线程执行200次就结束了,没有人取款,它只能等待,
*     此时flag=true,即使它自己没有执行完也只能阻塞。
*
*/
public class MainTest {
public static void main(String[] args) {
Account account = new Account("0001", 0);
new DrawThread("取款者甲", account, 800).start();
new DrawThread("取款者乙", account, 800).start();
new DepositThread("存款者B", account, 800).start();
new DepositThread("存款者C", account, 800).start();
new DepositThread("存款者D", account, 800).start();
}
}
//...
//存款者B 开始存钱:800.0
//存钱后的账户余额为:800.0
//取款者乙 开始取钱:800.0
//取款后账户余额为:0.0
//存款者D 开始存钱:800.0
//存钱后的账户余额为:800.0
//取款者甲 开始取钱:800.0
//取款后账户余额为:0.0
//存款者B 开始存钱:800.0
//存钱后的账户余额为:800.0


使用Condition控制线程通信

      Condition

           |--await()          类似于wait() ,使当前线程处于等待状态

           |--signal() :       唤醒在次Lock对象上等待的单个线程

           |--signalAll() :   唤醒所有处于等待的线程

Account类
/*
* Creation : 2015年10月15日
*/
package com.tan.thread.bank.notify.condition;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 使用情况: 如果程序不适用synchronized关键字来保证同步,则系统中就不存在隐士的同步监视器,就不能使用wait,notify,notifyAll方法来进行线程通信。
* Lock-》替代了同步方法或者同步代码块
* Condition-》替代了同步监视器的功能 异同点:
* 相同:和前面的那个逻辑基本相似 不同:显示的使用了lock对象来充当同步监视器,需要使用Condition对象暂停,唤醒指定线程
*/
public class Account {
// 显示定义lock对象
private final Lock lock = new ReentrantLock();
// 获得指定的lock对象对应的condition
private final Condition condition = lock.newCondition();

// 账号
private String accountNo;
// 余额
private double balance;

private boolean flag = false; // 设置初始值为false表示么有钱

public Account() {
}

public Account(String accountNo, double balance) {
super();
this.accountNo = accountNo;
this.balance = balance;
}

public String getAccountNo() {
return accountNo;
}

public void setAccountNo(String accountNo) {
this.accountNo = accountNo;
}

public double getBalance() {
return balance;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj != null && obj.getClass() == Account.class) {
Account account = (Account) obj;
return account.getAccountNo().equals(accountNo);
}

return false;
}

@Override
public int hashCode() {
return this.accountNo.hashCode();
}

/**
* Draw.
*
* @param drawAmount the draw amount
*/
public void draw(double drawAmount) {
lock.lock();
try {
if (!flag) {
condition.await(); // ★--》类似隐式同步监视器的wait方法
} else {
System.out.println(Thread.currentThread().getName() + " 开始取钱:" + drawAmount);
balance -= drawAmount;
System.out.println("取款后账户余额为:" + balance);

flag = false; // 账户没有钱了
condition.signalAll();// ★--》类似notifyAll();

}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

/**
* Deposit.
*
* @param depositAmount the deposit amount
*/
public synchronized void deposit(double depositAmount) {
lock.lock();
try {
// 如果flag为true,表示已经有钱存进去了,则存钱方法阻塞...等待取钱
if (flag) {
condition.await(); // ★--》类似隐式同步监视器的wait方法
} else {
System.out.println(Thread.currentThread().getName() + " 开始存钱:" + depositAmount);
balance += depositAmount;
System.out.println("存钱后的账户余额为:" + balance);

flag = true; // 表示账户已经有钱
condition.signalAll();// ★--》类似notifyAll();

}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}

}

}


取钱线程
/*
* Creation : 2015年10月17日
*/
package com.tan.thread.bank.notify.condition;

/**
* 取款者线程循环100次存款操作
*/
public class DrawThread extends Thread {
private Account account; // 模拟用户账户
private double drawAmount; // 当前希望取的钱数

public DrawThread(String name, Account account, double drawAmount) {
super(name);
this.account = account;
this.drawAmount = drawAmount;
}

@Override
public void run() {
for (int i = 0; i < 100; i++) {
account.draw(drawAmount);
}
}
}


存钱线程
/*
* Creation : 2015年10月17日
*/
package com.tan.thread.bank.notify.condition;

/**
* 存款者线程循环100次存款操作
*/
public class DepositThread extends Thread {
private Account account; // 模拟用户账户
private double depositAmount; // 当前希望存款钱数

public DepositThread(String name, Account account, double depositAmount) {
super(name);
this.account = account;
this.depositAmount = depositAmount;
}

@Override
public void run() {
for (int i = 0; i < 100; i++) {
account.deposit(depositAmount);
}
}
}


主测试类
/*
* Creation : 2015年10月17日
*/
package com.tan.thread.bank.notify.condition;

public class MainTest {
public static void main(String[] args) {
Account account = new Account("0001", 0);
new DrawThread("取款者甲", account, 800).start();
new DrawThread("取款者已", account, 800).start();

new DepositThread("存款者B", account, 800).start();
new DepositThread("存款者C", account, 800).start();
new DepositThread("存款者D", account, 800).start();
}
}


结论

该测试类与上面一个的运行效果完全一样,2个程序的逻辑基本相似,只是显示的使用Lock对象来充当同步监视器,则需要使用Condition对象来暂停、唤醒指定线程。



使用阻塞队列

BlockingQeueTest
/*
* Creation : 2015年10月17日
*/
package com.tan.thread.BlockQueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
* 介绍:BlockingQueue 有很多的实现类,
* 比如:DelayQueue|LinkedBlockingQueue|ArrayBlockingQueue|SynchronousQueue|PriorityQueue|TransferQueue等
* 目的:以ArrayBlockingQueue为例来介绍阻塞队列的功能和用法
*/
public class BlockQueueTest {
public static void main(String[] args) throws InterruptedException {
// 定义一个长度为2的阻塞队列
BlockingQueue<String> bq = new ArrayBlockingQueue<String>(2);
bq.put("java");
bq.put("java");
System.out.println("队列已经满了,阻塞中...");
bq.put("java"); // 阻塞线程,放不进去了,队列已满

bq.clear();
// System.out.println("队列已经清空的清空下取也会阻塞....");
// System.out.println(bq.take()); //阻塞

// System.out.println(bq.remove());//Exception in thread "main" java.util.NoSuchElementException

// System.out.println(bq.poll()); //null

}

}


使用阻塞队列进行线程通信
/*
* Creation : 2015年10月17日
*/
package com.tan.thread.BlockQueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
* 目的:使用BlockingQueue来进行线程通信 现象:启动了三个生产者线程和一个消费者线程,本程序集合容量为1,
* 因此3个生产者线程无法连续放入元素必须等消费者取出元素后三个生产者线程中的一个才可以放入一个元素
* 结果:三个生产者线程都想向队列中放入元素,但是只要一个线程向该队列中放入元素后其他生产者线程必须等待,等待消费者线程取出里面的元素
*/
public class BlockingQueueTest2 {
public static void main(String[] args) {
// 定义一个容量为1的阻塞队列
BlockingQueue<String> bq = new ArrayBlockingQueue<String>(1);
// 启动三个生产者线程
new Producer(bq).start();
new Producer(bq).start();
new Producer(bq).start();
// 启动一个消费者线程
new Consumer(bq).start();

}
}

/**
* The Class Producer.
*/
class Producer extends Thread {
private BlockingQueue<String> bq;

public Producer(BlockingQueue<String> bq) {
this.bq = bq;
}

@Override
public void run() {
String strArr[] = new String[] { "Spring", "Struts2", "Hibernate", "AngularJs" };
for (int i = 0; i < 100; i++) {
System.out.println(getName() + " 生产者准备生产集合元素!");
try {
Thread.sleep(200);
bq.put(strArr[i % 3]); // 尝试放入元素,如果队列已经满了则线程被阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getName() + " 生产完成:" + bq);
}
}
}

/**
* The Class Consumer.
*/
class Consumer extends Thread {
private BlockingQueue<String> bq;

public Consumer(BlockingQueue<String> bq) {
this.bq = bq;
}

@Override
public void run() {
while (true) {
System.out.println(getName() + " 消费者准备消费集合元素!");
try {
Thread.sleep(200);// 休眠等待生产者生产元素
bq.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getName() + " 消费完成:" + bq);
}
}

}

// 运行结果:
// Thread-0 生产者准备生产集合元素! ----无法放入元素
// Thread-2 生产者准备生产集合元素! ----无法放入元素
// Thread-1 生产者准备生产集合元素! ----无法放入元素
// Thread-3 消费者准备消费集合元素! ----消费者说我要买书,没说买什么
// Thread-1 生产完成:[Spring] ----生产者1赶紧生产出来Spring书籍给他
// Thread-3 消费完成:[] ----消费者消费完成!

// Thread-3 消费者准备消费集合元素! ----消费者说我还要买书
// Thread-0 生产完成:[Spring] ----生产者0赶紧生产出来Spring书籍
// Thread-0 生产者准备生产集合元素! ----队列已满,不能再生产
// Thread-1 生产者准备生产集合元素! ----队列已满,不能再生产
// Thread-3 消费完成:[Struts2] ----消费者
// Thread-3 消费者准备消费集合元素!
// Thread-1 生产完成:[Struts2]
// Thread-1 生产者准备生产集合元素!
// Thread-3 消费完成:[]
// Thread-0 生产完成:[Struts2]
// Thread-0 生产者准备生产集合元素!
// Thread-3 消费者准备消费集合元素!
// Thread-3 消费完成:[]
// Thread-2 生产完成:[Spring]
// Thread-2 生产者准备生产集合元素!
// Thread-3 消费者准备消费集合元素!
// Thread-3 消费完成:[Struts2]
// Thread-3 消费者准备消费集合元素!
// Thread-2 生产完成:[Struts2]
// Thread-2 生产者准备生产集合元素!
// Thread-3 消费完成:[Hibernate]
//
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: