秒杀多线程第十一篇 读者写者问题(续)
2014-11-12 17:50
330 查看
java实现:
本问题的关键是读者写者之间的同步问题,尤其使用java来操作。
1.等待读者,使用CountDownLatch mReaderLatch, 但是CountDownLatch只能使用一次,所以需要每次都new 一个。
或者可以考虑使用semaphore代替,但是semaphore需要acquire(READ_THREAD_SIZE)才能等待所有读者线程结束。
2.等待写入操作。使用semaphore来控制, mWriteSema.release(READ_THREAD_SIZE);表示读者都可以同时操作。
关键代码如下:
详细代码如下:
本问题的关键是读者写者之间的同步问题,尤其使用java来操作。
1.等待读者,使用CountDownLatch mReaderLatch, 但是CountDownLatch只能使用一次,所以需要每次都new 一个。
或者可以考虑使用semaphore代替,但是semaphore需要acquire(READ_THREAD_SIZE)才能等待所有读者线程结束。
2.等待写入操作。使用semaphore来控制, mWriteSema.release(READ_THREAD_SIZE);表示读者都可以同时操作。
关键代码如下:
public void waitReaderEnd() { //多个read End,using countdownlatch try { mReaderLatch.await(); mReaderLatch = new CountDownLatch(READ_THREAD_SIZE); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void singalWriteEnd(){ mWriteSema.release(READ_THREAD_SIZE); } public void waitWriteEnd(){ try { mWriteSema.acquire(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void singalReadEnd(){ mReaderLatch.countDown(); } public void initReadNone(){ try { mWriteSema.acquire(READ_THREAD_SIZE); for(int i=0;i<READ_THREAD_SIZE;i++) { mReaderLatch.countDown(); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
详细代码如下:
package com.multithread.readwrite;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import com.multithread.main.ExampleInterface;
import com.multithread.prosumer.ProducerThread;
/**
* 1.写者要等待所有读者读完才能继续写入。
* 2.所有读者要等待写者写完才能继续读取。
* 3.读者这件可以共享读取文件。
*
*
*
* */
public class ReaderWriterExample extends ExampleInterface {
public static final int READ_THREAD_SIZE = 4;
public static final int BUFFER_LENGTH = 100;
public List<Integer> g_productor = new ArrayList<Integer>();
public CountDownLatch mReaderLatch = new CountDownLatch(READ_THREAD_SIZE);
public Semaphore mWriteSema = new Semaphore(READ_THREAD_SIZE);
public boolean bStopFlag = false;
public CountDownLatch mLatchDown = new CountDownLatch(1+READ_THREAD_SIZE);
public CountDownLatch mLatchStart = new CountDownLatch(1+READ_THREAD_SIZE);
@Override
public void startDemo() {
// TODO Auto-generated method stub
try {
initReadNone();
bStopFlag = false;
Executor mEcecutor = Executors.newFixedThreadPool(1+READ_THREAD_SIZE);
mEcecutor.execute(new WriteThread(this,"Writer"));
for(int i=1;i<=READ_THREAD_SIZE;i++)
{
mEcecutor.execute(new ReadThread(this,"Reader"+i));
}
mLatchStart.await();
System.out.println("All Thread is runnning");
mLatchDown.await();
System.out.println("All Thread is Down");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void waitReaderEnd() { //多个read End,using countdownlatch try { mReaderLatch.await(); mReaderLatch = new CountDownLatch(READ_THREAD_SIZE); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void singalWriteEnd(){ mWriteSema.release(READ_THREAD_SIZE); } public void waitWriteEnd(){ try { mWriteSema.acquire(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void singalReadEnd(){ mReaderLatch.countDown(); } public void initReadNone(){ try { mWriteSema.acquire(READ_THREAD_SIZE); for(int i=0;i<READ_THREAD_SIZE;i++) { mReaderLatch.countDown(); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
}
package com.multithread.readwrite; public class ReadThread extends Thread { ReaderWriterExample mRwExample = null; String name = null; boolean flag = true; public ReadThread(ReaderWriterExample re,String name) { mRwExample = re; this.name = name; } @Override public void run() { mRwExample.mLatchStart.countDown(); while(flag) { //等待写入结束 mRwExample.waitWriteEnd(); //读取文件,直到本次末尾 //check 文件写入操作是否已经彻底结束,结束read线程。 int mReadLength = mRwExample.g_productor.size(); String mReadStr = ""; if(mReadLength>0) { for(Integer a:mRwExample.g_productor) { mReadStr+=String.format("%x", a); } System.out.println(name+"读取数据:"+mReadStr); } if(mRwExample.bStopFlag) { flag = false; } //通知本次读者结束。 mRwExample.singalReadEnd(); } System.out.println(name+"读取数据结束"); mRwExample.mLatchDown.countDown(); } }
package com.multithread.readwrite; public class WriteThread extends Thread { ReaderWriterExample mRwExample = null; String name = null; int iFlag = 0; public WriteThread(ReaderWriterExample re,String name) { mRwExample = re; this.name = name; } @Override public void run() { mRwExample.mLatchStart.countDown(); int index = 0; while(index<ReaderWriterExample.BUFFER_LENGTH) { //等待所有读者结束读取操作。 mRwExample.waitReaderEnd(); int mWriteLength = (int) (Math.random()*9)+1;//1-10; if(mWriteLength >(ReaderWriterExample.BUFFER_LENGTH - index)) { mWriteLength = ReaderWriterExample.BUFFER_LENGTH - index; } //写入数据到文件最后 //check 写入数据到最后。 mRwExample.g_productor.clear(); int mParam = 0; String writeline = ""; for(int i=0;i<mWriteLength;i++) { mParam = (int) (Math.random()*14)+1;//0-E mRwExample.g_productor.add(mParam); writeline+= String.format("%1$x", mParam); } index = index +mWriteLength; System.out.println(name+"写入数据:"+writeline+"\t 当前index:"+index); if(index>=ReaderWriterExample.BUFFER_LENGTH) { mRwExample.bStopFlag = true; } //通知写入操作结束,可以读取。 mRwExample.singalWriteEnd(); iFlag++; } System.out.println(name+"线程操作结束"); mRwExample.mLatchDown.countDown(); } }
相关文章推荐
- 秒杀多线程第十一篇 读者写者问题
- 秒杀多线程第十一篇 读者写者问题
- 秒杀多线程第十一篇 读者写者问题
- 秒杀多线程第十一篇---读者写者问题
- 秒杀多线程第十一篇 读者写者问题
- 秒杀多线程第十一篇 读者写者问题续 读写锁SRWLock
- 秒杀多线程第十一篇 读者写者问题
- 秒杀多线程第十一篇 读者写者问题
- 秒杀多线程第十一篇 读者写者问题
- 秒杀多线程第十一篇 读者写者问题
- 秒杀多线程第十一篇 读者写者问题
- 秒杀多线程第十一篇 读者写者问题
- 秒杀多线程第十一篇 读者写者问题
- 秒杀多线程第十一篇 读者写者问题
- 秒杀多线程第十一篇 读者写者问题
- 转---秒杀多线程第十一篇 读者写者问题
- 秒杀多线程第十一篇 读者写者问题
- 秒杀多线程第十四篇 读者写者问题继 读写锁SRWLock
- 秒杀多线程第十四篇 读者写者问题继 读写锁SRWLock
- 秒杀多线程第十四篇 读者写者问题继 读写锁SRWLock (续)