您的位置:首页 > 其它

秒杀多线程第十一篇 读者写者问题(续)

2014-11-12 17:50 330 查看
java实现:

本问题的关键是读者写者之间的同步问题,尤其使用java来操作。

1.等待读者,使用CountDownLatch mReaderLatch, 但是CountDownLatch只能使用一次,所以需要每次都new 一个。

或者可以考虑使用semaphore代替,但是semaphore需要acquire(READ_THREAD_SIZE)才能等待所有读者线程结束。

2.等待写入操作。使用semaphore来控制, mWriteSema.release(READ_THREAD_SIZE);表示读者都可以同时操作。

关键代码如下:

public void waitReaderEnd()
{
//多个read End,using countdownlatch
try {
mReaderLatch.await();
mReaderLatch = new CountDownLatch(READ_THREAD_SIZE);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

public void singalWriteEnd(){
mWriteSema.release(READ_THREAD_SIZE);
}

public void waitWriteEnd(){
try {
mWriteSema.acquire();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

public void singalReadEnd(){

mReaderLatch.countDown();
}

public void initReadNone(){
try {
mWriteSema.acquire(READ_THREAD_SIZE);
for(int i=0;i<READ_THREAD_SIZE;i++)
{
mReaderLatch.countDown();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}


详细代码如下:

package com.multithread.readwrite;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

import com.multithread.main.ExampleInterface;
import com.multithread.prosumer.ProducerThread;

/**
* 1.写者要等待所有读者读完才能继续写入。
* 2.所有读者要等待写者写完才能继续读取。
* 3.读者这件可以共享读取文件。
*
*
*
* */
public class ReaderWriterExample extends ExampleInterface {
public static final int READ_THREAD_SIZE = 4;
public static final int BUFFER_LENGTH = 100;

public List<Integer> g_productor = new ArrayList<Integer>();

public CountDownLatch mReaderLatch = new CountDownLatch(READ_THREAD_SIZE);
public Semaphore mWriteSema = new Semaphore(READ_THREAD_SIZE);

public boolean bStopFlag = false;
public CountDownLatch mLatchDown = new CountDownLatch(1+READ_THREAD_SIZE);
public CountDownLatch mLatchStart = new CountDownLatch(1+READ_THREAD_SIZE);
@Override
public void startDemo() {
// TODO Auto-generated method stub

try {

initReadNone();
bStopFlag = false;
Executor mEcecutor = Executors.newFixedThreadPool(1+READ_THREAD_SIZE);
mEcecutor.execute(new WriteThread(this,"Writer"));

for(int i=1;i<=READ_THREAD_SIZE;i++)
{
mEcecutor.execute(new ReadThread(this,"Reader"+i));
}

mLatchStart.await();
System.out.println("All Thread is runnning");

mLatchDown.await();

System.out.println("All Thread is Down");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

public void waitReaderEnd() { //多个read End,using countdownlatch try { mReaderLatch.await(); mReaderLatch = new CountDownLatch(READ_THREAD_SIZE); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void singalWriteEnd(){ mWriteSema.release(READ_THREAD_SIZE); } public void waitWriteEnd(){ try { mWriteSema.acquire(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void singalReadEnd(){ mReaderLatch.countDown(); } public void initReadNone(){ try { mWriteSema.acquire(READ_THREAD_SIZE); for(int i=0;i<READ_THREAD_SIZE;i++) { mReaderLatch.countDown(); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
}


package com.multithread.readwrite;

public class ReadThread extends Thread {

ReaderWriterExample mRwExample = null;
String name = null;
boolean flag = true;
public ReadThread(ReaderWriterExample re,String name)
{
mRwExample = re;
this.name = name;
}

@Override
public void run() {
mRwExample.mLatchStart.countDown();
while(flag)
{
//等待写入结束
mRwExample.waitWriteEnd();

//读取文件,直到本次末尾 //check 文件写入操作是否已经彻底结束,结束read线程。
int mReadLength = mRwExample.g_productor.size();
String mReadStr = "";
if(mReadLength>0)
{
for(Integer a:mRwExample.g_productor)
{
mReadStr+=String.format("%x", a);
}
System.out.println(name+"读取数据:"+mReadStr);
}

if(mRwExample.bStopFlag)
{
flag = false;
}

//通知本次读者结束。
mRwExample.singalReadEnd();
}

System.out.println(name+"读取数据结束");
mRwExample.mLatchDown.countDown();
}

}


package com.multithread.readwrite;

public class WriteThread extends Thread {

ReaderWriterExample mRwExample = null;
String name = null;
int iFlag = 0;

public WriteThread(ReaderWriterExample re,String name)
{
mRwExample = re;
this.name = name;
}

@Override
public void run() {
mRwExample.mLatchStart.countDown();
int index = 0;
while(index<ReaderWriterExample.BUFFER_LENGTH)
{
//等待所有读者结束读取操作。
mRwExample.waitReaderEnd();

int mWriteLength = (int) (Math.random()*9)+1;//1-10;
if(mWriteLength >(ReaderWriterExample.BUFFER_LENGTH - index))
{
mWriteLength = ReaderWriterExample.BUFFER_LENGTH - index;
}

//写入数据到文件最后  //check 写入数据到最后。
mRwExample.g_productor.clear();
int mParam = 0;
String writeline = "";
for(int i=0;i<mWriteLength;i++)
{
mParam = (int) (Math.random()*14)+1;//0-E
mRwExample.g_productor.add(mParam);
writeline+= String.format("%1$x", mParam);
}

index = index +mWriteLength;
System.out.println(name+"写入数据:"+writeline+"\t 当前index:"+index);

if(index>=ReaderWriterExample.BUFFER_LENGTH)
{
mRwExample.bStopFlag = true;
}

//通知写入操作结束,可以读取。
mRwExample.singalWriteEnd();

iFlag++;
}

System.out.println(name+"线程操作结束");
mRwExample.mLatchDown.countDown();
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: