您的位置:首页 > 其它

消费者和生产者问题的实现-基于线程安全的容器来和非线程安全的容器类

2017-10-27 20:36 525 查看

生产者和消费者的实现的大概思路

缓冲区的接口

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* All rights reserved.
* Created by zhaideyin on 2017/10/27.
* Description:
*/
public interface Plate {
final int PLATE_SIZE=10;
ReentrantLock reentrantLock=new ReentrantLock();
Condition notEmpty=reentrantLock.newCondition();
Condition notFull=reentrantLock.newCondition();
void takeApple();
void putApple();

}


非线程安全的容器类,以list为例

import java.util.ArrayList;
import java.util.List;

/**
* All rights reserved.
* Created by zhaideyin on 2017/10/27.
* Description:
*/
public class NoSafePlate implements Plate {

List<Integer>  plateList=new ArrayList<Integer>();
@Override
public void takeApple(){
reentrantLock.lock();
if(plateList.size()==0){
try {
System.out.println(Thread.currentThread()+"other consumer wait");
notEmpty.await();//阻塞消费进程
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
plateList.remove(0);
System.out.println(Thread.currentThread()+"consumer :"+plateList.size()+" apple");
notFull.signalAll();//通知生产线程可以放苹果
}
reentrantLock.unlock();
}
@Override
public void putApple(){
reentrantLock.lock();
if(plateList.size()==PLATE_SIZE){
try {
System.out.println(Thread.currentThread()+"other producer wait");
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
plateList.add(1);
System.out.println(Thread.currentThread()+" producer :"+plateList.size()+" apple");
notEmpty.signalAll();
}
reentrantLock.unlock();
}

}


线程安全的容器类

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* All rights reserved.
* Created by zhaideyin on 2017/10/27.
* Description:
*/
public class SafePlate implements Plate{
//一下的方法虽然生成的list是线程安全的list,也只是仅仅局限于原子操作的方法,在多线程的条件下,这个list也有可能会被修改
List<Integer> plateList= Collections.synchronizedList(new ArrayList<Integer>());
@Override
public  void takeApple(){
synchronized (plateList) {
while (plateList.size()==0){
System.out.println(Thread.currentThread()+"consumer wait");
try {
plateList.wait();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
plateList.remove(0);
System.out.println(Thread.currentThread() + "consumer : " + plateList.size());
plateList.notifyAll();
}
}
@Override
public  void putApple(){
synchronized (plateList) {
while (plateList.size()==PLATE_SIZE){
System.out.println(Thread.currentThread()+"producer wait");
try {
plateList.wait();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
plateList.add(1);
System.out.println(Thread.currentThread() + "producer : " + plateList.size());
plateList.notifyAll();
}
}

}


生产者

import java.util.Random;
/**
* All rights reserved.
* Created by zhaideyin on 2017/10/27.
* Description:
*/
public class Producer implements Runnable {
private Plate plate;

Random r=new Random(10);
public Producer(NoSafePlate noSafePlate) {
this.plate = noSafePlate;
}

public Producer(SafePlate safePlate) {
this.plate = safePlate;
}

@Override
public  void run() {
while (true) {
plate.putApple();
}
}

}


消费者

/**
* All rights reserved.
* Created by zhaideyin on 2017/10/27.
* Description:
*/
public class Consumer implements Runnable {

private  Plate plate;
public Consumer(NoSafePlate noSafePlate) {
this.plate = noSafePlate;
}
public Consumer(SafePlate safePlate) {
this.plate = safePlate;
}
@Override
public void run() {
//消费者先等待一段时间,等待生产者生产了部分之后再开始消费
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (true) {
plate.takeApple();
}
}

}


容器安全的测试类

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Copyright 2010-2020  OPPO Mobile Comm Corp., Ltd.
* All rights reserved.
* Created by zhaideyin on 2017/10/27.
* Description:
*/
public class SafePlateTest {
public static void main(String[] args) throws InterruptedException {
SafePlate safePlate=new SafePlate();
ExecutorService service= Executors.newCachedThreadPool();
for(int i=0;i<5;i++){
service.submit(new Producer(safePlate));
}
for(int i=0;i<5;i++){
service.submit(new Consumer(safePlate));
}
Thread.sleep(10*1000);
service.shutdown();

}

}


运行结果



线程不安全的容器类的测试类

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConsumerAndProducerTest {
public static void main(String[] args) throws InterruptedException{
NoSafePlate noSafePlate =new NoSafePlate();
ExecutorService service= Executors.newCachedThreadPool();
for(int i=0;i<5;i++){
service.submit(new Producer(noSafePlate));
}
for(int i=0;i<5;i++){
service.submit(new Consumer(noSafePlate));
}
Thread.sleep(10*1000);
service.shutdown();

}
}


运行结果

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: