您的位置:首页 > 其它

关于生产者消费者排序问题

2013-01-10 14:56 162 查看
首先说一个题目,以这个题目来分析生产者消费者问题以及涉及到得io流于排序

题目:生产者从输入文件source.txt中读取字符串信息并放入缓冲区,消费者从缓冲区中获取信息、按题中要求的排序规则排序,将排序后的字符串输出到结果输出文件result.txt中。

要求:

(1) 排序要求:45,3,12,887,35,454,排列以后是887,3,454,12,45,35(就是以一大一小排列)

(2)       生产者线程从文件中读取字符串数字并存入缓冲区,缓冲区(Buffer)中允许的最大数字数为5个。
(3)       消费者线程从缓冲区中读取信息,并按题中要求的排序规则排序。最后请在文件中输出排序的结果并结束线程。例如以上例子的排序后结果为:
887,3, 454, 12, 45, 35
看完这个题目我们来分析一下
首先这个题目可以分为三部分:
        1、生产者和消费者模型;
           2、文件io流
        3、数组的排序
先看生产者消费者模型:
    生产者消费者,顾名思义就是你来生产我来消费,但有个前提就是有东西让我来消费,也就是说先生产后消费。这个地方就有几种方式来实现了:
          1、wait()方法和notify()方法;
          2、await()和signal()也就是Condition类;
          3、就是阻塞队列BlcokingQueue,put(Object o)和take();
       下面就把这三种方法都写一个小实例
      wait()方法和notify()方法:
package com.isoftstone.Thread;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class Pro_Con_test {

private  List<Integer> kep = new ArrayList<Integer>(5);
private static final int MAX = 5;

/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
final Pro_Con_test ptt =  new Pro_Con_test();
new Thread(new Runnable(){
@Override
public void run() {
// TODO Auto-generated method stub
while(true){
ptt.put();
}
}
}).start();

new Thread(new Runnable(){
@Override
public void run() {
// TODO Auto-generated method stub
while(true){
ptt.pop();
}
}
}).start();
}

public synchronized void put()
{
try {
if(MAX == kep.size())
{
System.out.println("kep is full!");
this.wait();
}
Integer i = new Random().nextInt();
System.out.println("produce : " + i);
kep.add(i);
Thread.sleep(300);
this.notify();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

public synchronized void pop()
{
try {
if(0 == kep.size())
{
System.out.println("kep is empty");
this.wait();
}
Integer i = kep.remove(kep.size() - 1);
System.out.println("consume : " + i);
this.notify();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}


await()和signal()也就是Condition类
 
package com.isoftstone.Thread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Pro_Cus {

private final Lock lock = new ReentrantLock();
private final Condition full = lock.newCondition();
private final Condition empty = lock.newCondition();
private List<Object> kep = new ArrayList<Object>(10);
private static final int MAX = 10;

private void start()
{
Thread pro = new Thread(new Producer());
Thread cus = new Thread(new Customer());
pro.setName("Producer");
cus.setName("Customer");
pro.start();
cus.start();
}
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
Pro_Cus pc = new Pro_Cus();
pc.start();

}

class Producer implements Runnable
{
@Override
public void run()
{
while(true)
{
lock.lock();
try
{
if(MAX == kep.size())
{
System.out.println("kep is full!!!!!! Please wait!!");
full.await();
}
Object o = new Object();
if(kep.add(o))
{
System.out.println("Producer is :" + o);
empty.signal();
}

} catch (Exception e)
{
// TODO: handle exception
e.printStackTrace();
}
finally
{
lock.unlock();
}
}
}
}

class Customer implements Runnable
{

@Override
public void run() {
// TODO Auto-generated method stub
while(true)
{
lock.lock();
try
{
if(0 == kep.size())
{
System.out.println("kep is empty!!!!!! Please wait");
empty.await();
}
Object o = kep.remove(kep.size() - 1);
System.out.println("Customer is :" + o);
full.signal();
}
catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
finally
{
lock.unlock();
}
}
}

}

}


阻塞队列BlcokingQueue,put(Object o)和take()
package com.isoftstone.Thread;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Pro_Con_Queue {

/**
* @param args
*/
public static void main(String[] args) {
final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(5);
// TODO Auto-generated method stub
new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
while (true) {
try {
Integer i = (int) new Random().nextInt();
System.out.println("Produce : " + i);
queue.put(i);
//Thread.sleep(300);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
while (true) {
try {
Integer i = queue.take();
System.out.println("Consume : " + i);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}).start();

}

}


这就是三种方式的生产者消费者模型的实现,第一种也就是最常用的就不细说了。
第二种也是我最喜欢用的Condition条件阻塞,他是jdk1.5以后新增加的,实现了线程的等待与唤醒与Object类中wait()和notify()相对应,不过我感觉Condition更直接明了更符合我们的逻辑,大大简化了代码的复杂度,更通俗易懂。个人介意用这一种
第三中队列阻塞,就相当于一个加锁的篮子一样,也可以指定篮子的大小,只有篮子有东西的时候才能去取东西,用起来也很方便,根据具体情况具体使用,不过他配合线程池的使用是一大特色。
 



接下来说说文件io流
        文件io流在平时用处是很大的,简单的说就是从文件中读取有用的信息,把信息记录在文件中,自然也就分了两个大类:输入流和输出流(读出流和写入流),这两者的分类是以程序的角度出发的,而从写和读的内容格式来说他又分成了字节流和字符流,他们的读取很原始,一般都要加一点修饰,就像没加修饰以前使用小水管来用的很慢也很损坏磁盘,修饰以后就相当于用了一个带嘴的小桶,先把他装满在读取。同时java也封装了很多功能性的io流,很方便。、
 输入流: IntputStream    Reader
 输出流:  OutputStream  Writer
注意文件流资源的释放,一般建议在finally中显示释放
看一个小例子:
package com.isoftstone.io;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;

public class IOTest
{

/**
* @param args
*/
public static void main(String[] args)
{
// TODO Auto-generated method stub
File rfile = new File("d:\\soucer.txt");
readFile(rfile);
File wfile = new File("d:\\result.txt");
writeFile(wfile);

}

public static void readFile(File file)
{
//File file = new File("d:\\soucer.txt");
BufferedReader br = null;
String line = "";
try
{
br = new BufferedReader(new FileReader(file));
while((line = br.readLine()) != null)
{
System.out.println(line);
}
}
catch (FileNotFoundException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
finally
{
try
{
if(br != null)
{
br.close();
br = null;
}
} catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

public static void writeFile(File file)
{
//File file = new File("d:\\result.txt");
FileOutputStream fos = null;
try
{
fos = new FileOutputStream(file);
String str = "hehehhehehe";
fos.write(str.getBytes());
}
catch (FileNotFoundException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
finally
{
try
{
if(fos != null)
{
fos.close();
fos = null;
}
} catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

}


排序就不说了,也就是升序降序交替排序(有好的算法给我们学习一下!谢谢)
现在回到原题中:
        准备知识充分了就应该组合了,这个题中应该有一个缓冲区来共享信息
 下面就是我写的程序请大神指点:
 
package com.isoftstone.test;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ProduceConsumeSort
{
/**
* @param args
*/
public static void main(String[] args)
{

Share share = new Share();
File in = new File("D:\\1\\source.txt");
File out = new File("D:\\1\\result.txt");
Produce reader = new Produce(share, in);
Consume writer = new Consume(share, out);
reader.start();
writer.start();

}
}

//缓冲共享区
class Share
{
private List<String> contents = new ArrayList<String>(5);
private boolean isEnd = false;

public boolean isEnd()
{
return isEnd;
}

public void setEnd(boolean isEnd)
{
this.isEnd = isEnd;
}

/**
* 取出缓冲中的数据
* @return String 取出的数据
*/
public synchronized String get()
{
while (!isEnd && contents.size() == 0)
{
try
{
wait();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
String line = null;
synchronized (contents)
{
if (contents.size() == 0)
{
return null;
}
line = contents.remove(0);
}
notify();
return line;
}

/**
* 在缓冲区中放入数据
* @param value(String) 不能为空,要放入缓冲的数据
*/
public synchronized void put(String value)
{
while (contents.size() >= 5)
{
try
{
wait();
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
notify();
contents.add(value);
}
}

/**
* 生产者线程,控制从指定文件中读取数据并放入缓冲区
* @author wWX161568
*
*/
class Produce extends Thread
{
private Share shared;
private File file;

public Produce(Share shared, File file)
{
this.shared = shared;
this.file = file;
}

public File getFile()
{
return file;
}

public void setFile(File file)
{
this.file = file;
}

@Override
public void run()
{
BufferedReader reader = null;
try {
reader = new BufferedReader(new FileReader(file));
while (true)
{
String line = reader.readLine();
if (line == null)
{
shared.setEnd(true);
break;
}
System.out.println("read line : " + line);
shared.put(line);
}
} catch (FileNotFoundException e)
{
e.printStackTrace();
} catch (IOException e)
{
e.printStackTrace();
} finally
{
if (reader != null) {
try
{
reader.close();
} catch (IOException e)
{
e.printStackTrace();
}
}
}
}
}

/**
* 消费者线程,控制从缓冲区中取出数据,并排序后写入指定的文件
* @author wWX161568
*
*/
class Consume extends Thread
{
private Share shared;
private File file;

public Consume(Share shared, File file)
{
this.shared = shared;
this.file = file;
}

public File getFile()
{
return file;
}

public void setFile(File file)
{
this.file = file;
}

@Override
public void run()
{
BufferedWriter writer = null;
try
{
writer = new BufferedWriter(new FileWriter(file));
while (!shared.isEnd())
{
String line = shared.get();
System.out.println("write line : " + line);
writer.write(line + "\n");
}
while (true)
{
String line = shared.get();
if (line == null) {
System.out.println("已经写完了....");
break;
}
String[] arr = line.split(",");
int[] nums = new int[arr.length];
for (int i = 0; i < arr.length; i++)
{
String str = arr[i];
int num = Integer.parseInt(str);
nums[i] = num;

}
int[] numsNew = new Sort().sort(nums);
String s = "";
for (int a : numsNew)
{

s += a + ",";
}
s = s.substring(0, s.length() - 1);
// System.out.println("排序后的:" + s);

System.out.println("write line : " + line + "\n 排序后的:" + s);

writer.write(s);
}
writer.flush();
} catch (IOException e)
{
e.printStackTrace();
}
finally
{
if (writer != null)
{
try
{
writer.close();
} catch (IOException e)
{
e.printStackTrace();
}
}
}
}
}

/**
* 排序的类,排序是以最大的最小的,第二大的,第二小的一次排序
* @author wWX161568
*
*/
class Sort
{
public int[] sort(int[] arr)
{
boolean flag = true;
for (int i = 0; i < arr.length; i++)
{
if (flag)
{
//将剩余没有排序的数字升序排列
for (int j = arr.length - 1; j > i; j--)
{
if (arr[j] > arr[j - 1])
{
int temp = arr[j];
arr[j] = arr[j - 1];
arr[j - 1] = temp;
}
}
}
else
{
//将剩余没有排序的数字降序排列
for (int j = arr.length - 1; j > i; j--)
{
if (arr[j] <= arr[j - 1])
{
int temp = arr[j];
arr[j] = arr[j - 1];
arr[j - 1] = temp;
}
}
}
flag = !flag;
}
return arr;

}

}


 
 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  io流 Thread