您的位置:首页 > 其它

zthread学习 实例十 线程间的协助(二)

2011-05-31 19:44 260 查看
例一、生产者-消费者

一个任务制作烤面包(生产者),一个任务给烤面包抹黄油(消费者),还有一个任务是往抹好黄油的烤面包上抹果酱(消费者)。

代码如下:

#include "stdafx.h"
#include "zthread/FastMutex.h"
#include "zthread/CountedPtr.h"
#include "zthread/Runnable.h"
#include "zthread/Condition.h"
#include <iostream>
using namespace ZThread;
using namespace std;

class Jammer : public Runnable
{
public:
Jammer() : butteredToastReady(Lock), jammed(0), isGotButteredToast(false){}

void moreButteredToastReady()
{
Guard<Mutex> g(Lock);

isGotButteredToast = true;
butteredToastReady.signal();
}

void run()
{
try
{
while (!Thread::interrupted())
{
{
Guard<Mutex> g(Lock);
while (isGotButteredToast == false)
butteredToastReady.wait();
++jammed;
}

cout << "Putting jam on toast : " <<jammed <<endl;

{
Guard<Mutex> g(Lock);

isGotButteredToast = false;

}
}
}
catch (Interrupted_Exception& e)
{
cout << " Jarry : "<< e.what() <<endl;
}
}
private:
Mutex		Lock;
Condition	butteredToastReady;
bool		isGotButteredToast;

int			jammed;
};
class Butterer : public Runnable
{
public:
Butterer(CountedPtr<Jammer>& j) : toastReady(Lock), jammer(j), Buttered(0), isGotToast(false){}

void moreToastReady()
{
Guard<Mutex> g(Lock);
isGotToast = true;
toastReady.signal();
}

void run()
{
try
{
while (!Thread::interrupted())
{
{
Guard<Mutex> g(Lock);

while (!isGotToast)
toastReady.wait();

++Buttered;
}
cout << "Buttering toast :" << Buttered <<endl;
jammer->moreButteredToastReady();

{
Guard<Mutex> g(Lock);
isGotToast = false;
}
}
}
catch (Interrupted_Exception& e)
{
cout << " Jarry " << e.what() << endl;
}
}

private:
Mutex				Lock;
Condition			toastReady;
CountedPtr<Jammer>	jammer;
bool				isGotToast;
int					Buttered;
};
class Toast : public Runnable
{
public:
Toast(CountedPtr<Butterer>& aButterer) : butterer(aButterer), toasted(0) {}

void run()
{
try
{
while (!Thread::interrupted())
{
Thread::sleep(1000);

cout << "A new Toast" << ++toasted <<endl;

butterer->moreToastReady();
}
}
catch (Interrupted_Exception& e)
{
cerr << "Jarry " << e.what() <<endl;
}
}
private:
CountedPtr<Butterer> butterer;
int toasted;
};

int _tmain(int argc, _TCHAR* argv[])
{
try
{
CountedPtr<Jammer>		jammer(new Jammer);
CountedPtr<Butterer> 	butterer(new Butterer(jammer));
CountedPtr<Toast>		toast(new Toast(butterer));

ThreadedExecutor executor;
executor.execute(toast);
executor.execute(jammer);
executor.execute(butterer);
cin.get();
executor.interrupt();
}
catch (Synchronization_Exception& e)
{
cerr <<" Jarry Main " << e.what() <<endl;
}

return 0;
}


线程处理问题常常基于需要对任务进行串行化处理。使用队列可以采用同步的方式访问其内部元素,这样可以解决很多线程处理问题,下面基于STL中的deque实现的队列:

#ifndef QUEUE_H
#define QUEUE_H
#include "zthread/Runnable.h"
#include "zthread/PoolExecutor.h"

template<class T>
class TQueue
{
//锁子
ZThread::Mutex lock;
//线程协作基类
ZThread::Condition cond;
//线程队列元素
std::deque<T> data;
public:
TQueue() : cond(lock) {}
void put(T item)
{
ZThread::Guard<ZThread::Mutex> g(lock);
data.push_back(item);
cond.signal();
}
T get()
{
ZThread::Guard<ZThread::Mutex> g(lock);
while(data.empty())
cond.wait();
T returnVal = data.front();
data.pop_front();
return returnVal;
}
};
#endif


1、加入同步以确保在同一时刻不会有两个线程添加对象。

2、加入wait()和signal()在队列空时,挂起线程,并在有多个元素可用时恢复执行。

二、广播

signal()函数唤醒了一个正在等待Condition对象的线程。然而,也许会有多个线程在等待某个相同的条件对象,在这种情况下需要使用broadcast()(与调用线程的个数的signal()效果相同)而不是signal()把这些线程唤醒。

使用广播的例子:每辆Car将在几个阶段内装配完成,本例中将看到这样一个阶段:底盘制作好之后的这段时间里可以 同时 安装发动机、驱动传动装置和车轮(广播)。本例中综合中前面中所有关于线程的知识。

#include <iostream>
#include <string>
#include "zthread/Thread.h"
#include "zthread/Mutex.h"
#include "zthread/Guard.h"
#include "zthread/Condition.h"
#include "zthread/ThreadedExecutor.h"
#include "TQueue.h"
using namespace ZThread;
using namespace std;
//driveTrain 驱动传动
class Car
{
int		id;
bool	engine, driveTrain, wheels;
public:
Car(int idn) : id(idn), engine(false),
driveTrain(false), wheels(false) {}
// Empty Car object:
Car() : id(-1), engine(false),
driveTrain(false), wheels(false) {}
// Unsynchronized -- assumes atomic bool operations:
int getId() { return id; }
void addEngine() { engine = true; }
bool engineInstalled() { return engine; }
void addDriveTrain() { driveTrain = true; }
bool driveTrainInstalled() { return driveTrain; }
void addWheels() { wheels = true; }
bool wheelsInstalled() { return wheels; }
friend ostream& operator<<(ostream& os, const Car& c)
{
return os << "Car " << c.id << " ["
<< " engine: " << c.engine
<< " driveTrain: " << c.driveTrain
<< " wheels: " << c.wheels << " ]";
}
};
typedef CountedPtr< TQueue<Car> > CarQueue;
class ChassisBuilder : public Runnable
{
CarQueue	carQueue;
int			counter;
public:
ChassisBuilder(CarQueue& cq) : carQueue(cq),counter(0) {}
void run()
{
try
{
while(!Thread::interrupted())
{
Thread::sleep(100);
// Make chassis:
Car c(counter++);
cout << c << endl;
// Insert into queue
carQueue->put(c);
}
}
catch(Interrupted_Exception&) { /* Exit */ }
cout << "ChassisBuilder off" << endl;
}
};
class Cradle
{
Car			c; // Holds current car being worked on
bool		occupied;
Mutex		workLock, readyLock;
Condition	workCondition, readyCondition;
bool		engineBotHired, wheelBotHired, driveTrainBotHired;
public:
Cradle()
: workCondition(workLock), readyCondition(readyLock)
{
occupied			= false;
engineBotHired		= true;
wheelBotHired		= true;
driveTrainBotHired	= true;
}
//向底盘生产线中加入car对象
//并修改底盘生产线的状态(被占用)
void insertCar(Car chassis)
{
c = chassis;
occupied = true;
}
//从底盘生产线中取出对象
//修正状态为未被占用,并返回car对象
Car getCar()
{
// Can only extract car once
if(!occupied)
{
cerr << "No Car in Cradle for getCar()" << endl;
return Car(); // "Null" Car object
}
occupied = false;
return c;
}
// Access car while in cradle:
//重载指向成员符号
//可以使用(*Cradle)直接调用
Car* operator->() { return &c; }
// Allow robots to offer services to this cradle:
void offerEngineBotServices()
{
Guard<Mutex> g(workLock);
while(engineBotHired)
workCondition.wait();
engineBotHired = true; // Accept the job
}
void offerWheelBotServices()
{
Guard<Mutex> g(workLock);
while(wheelBotHired)
workCondition.wait();
//这个wait被挂起,直到收到了startWork()的广播
//在startWork()中,雇佣的状态会被修改,因此下一步的操作是修正状态
wheelBotHired = true; // Accept the job
}
void offerDriveTrainBotServices()
{
Guard<Mutex> g(workLock);
while(driveTrainBotHired)
workCondition.wait();
driveTrainBotHired = true; // Accept the job
}
// Tell waiting robots that work is ready:
void startWork()
{
Guard<Mutex> g(workLock);
engineBotHired		= false;
wheelBotHired		= false;
driveTrainBotHired	= false;
workCondition.broadcast(); //等价于3个signal()
//workCondition.signal();
//workCondition.signal();
//workCondition.signal();
}
// Each robot reports when their job is done:
void taskFinished()
{
Guard<Mutex> g(readyLock);
//Guard<Mutex> g(workLock);
readyCondition.signal();
}
// Director waits until all jobs are done:
void waitUntilWorkFinished()
{
Guard<Mutex> g(readyLock);
while(!(c.engineInstalled() && c.driveTrainInstalled() && c.wheelsInstalled()))
readyCondition.wait();
}
};

typedef CountedPtr<Cradle> CradlePtr;
class Director : public Runnable
{
CarQueue	chassisQueue, finishingQueue;
CradlePtr	cradle;
public:
Director(CarQueue& cq, CarQueue& fq, CradlePtr cr)
: chassisQueue(cq), finishingQueue(fq), cradle(cr) {}
void run()
{
try
{
while(!Thread::interrupted())
{
// Blocks until chassis is available:
cradle->insertCar(chassisQueue->get());
// Notify robots car is ready for work
cradle->startWork();
// Wait until work completes
cradle->waitUntilWorkFinished();
// Put car into queue for further work
finishingQueue->put(cradle->getCar());
}
}
catch(Interrupted_Exception&) { /* Exit */ }
cout << "Director off" << endl;
}
};

class EngineRobot : public Runnable
{
CradlePtr cradle;
public:
EngineRobot(CradlePtr cr) : cradle(cr) {}
void run()
{
try
{
while(!Thread::interrupted())
{
// Blocks until job is offered/accepted:
cradle->offerEngineBotServices();
cout << "Installing engine" << endl;
(*cradle)->addEngine();			//隐式的调用了operator->()还是隐式的类型转换?
cradle->taskFinished();
}
}
catch(Interrupted_Exception&) { /* Exit */ }
cout << "EngineRobot off" << endl;
}
};

class DriveTrainRobot : public Runnable
{
CradlePtr cradle;
public:
DriveTrainRobot(CradlePtr cr) : cradle(cr) {}
void run()
{
try
{
while(!Thread::interrupted())
{
// Blocks until job is offered/accepted:
cradle->offerDriveTrainBotServices();
cout << "Installing DriveTrain" << endl;
(*cradle.operator ->())->addDriveTrain();
cradle->taskFinished();
}
}
catch(Interrupted_Exception&) { /* Exit */ }
cout << "DriveTrainRobot off" << endl;
}
};
class WheelRobot : public Runnable
{
CradlePtr cradle;
public:
WheelRobot(CradlePtr cr) : cradle(cr) {}
void run()
{
try
{
while(!Thread::interrupted())
{
// Blocks until job is offered/accepted:
cradle->offerWheelBotServices();
cout << "Installing Wheels" << endl;
(*cradle)->addWheels();
cradle->taskFinished();
}
}
catch(Interrupted_Exception&) { /* Exit */ }
cout << "WheelRobot off" << endl;
}
};

class Reporter : public Runnable
{
CarQueue carQueue;
public:
Reporter(CarQueue& cq) : carQueue(cq) {}
void run()
{
try
{
while(!Thread::interrupted())
{
cout << carQueue->get() << endl;
}
}
catch(Interrupted_Exception&) { /* Exit */ }
cout << "Reporter off" << endl;
}
};

int main() {
cout << "Press <Enter> to quit" << endl;
try {
CarQueue chassisQueue(new TQueue<Car>),  finishingQueue(new TQueue<Car>);
CradlePtr cradle(new Cradle);

ThreadedExecutor assemblyLine;
assemblyLine.execute(new EngineRobot(cradle));
assemblyLine.execute(new DriveTrainRobot(cradle));
assemblyLine.execute(new WheelRobot(cradle));
assemblyLine.execute(new Director(chassisQueue, finishingQueue, cradle));
assemblyLine.execute(new Reporter(finishingQueue));
// Start everything running by producing chassis:
assemblyLine.execute(new ChassisBuilder(chassisQueue));
cin.get();
assemblyLine.interrupt();
} catch(Synchronization_Exception& e) {
cerr << e.what() << endl;
}
} ///:~
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: