您的位置:首页 > 其它

三个线程同步中一个纠结的问题

2013-08-05 19:39 155 查看
前几天一直没想明白,搜索我的记忆海,愣是没找到这种三个线程相交互的!

1、问题描述

目前有三个class

WriteDB他主要负责把网页的信息写到数据中(间隔5分钟),每次更新之后都会他会相应的把DataOperation中的变量isDataUpdate_设置为true;

Search他负责处理对用户执行查询信息的操作,当有用户执行Search,他会相应的把DataOperation中的变量isUserSearch_设置为true;

DataOperation这是一个中间的类,负责做类WriteDB和类Search的同步操作,会一直检查这两个变量isDataUpdate_和isUserSearch_,当他们都为false的时候,把最新在内存中的数据库写入到对应的文件中,以便能够查询最新的信息,如果其中一个为true,他都等待。

2、迷糊的那段时间

也就是前几天,一直在想,怎么能实现上面的东西!一直被WriteDB这个class和isUserSearch_(bool类型)所困扰!

还写出来类似第一版的DataOperation代码:

class DataOperation
{
public:

DataOperation()
{
running_flag = true;
isDataUpdate_ = false;
isUserSearch_ = false;
}

void Stop()
{
running_flag = false;
{
boost::recursive_mutex::scoped_lock lock(mtx);
cond.notify_one();
}
}

void EmitSignalThread()
{
while (running_flag)
{
if (isSearchEnd_)
{
break;
}

{
boost::recursive_mutex::scoped_lock lock(mtx);

if (IsReplaceDB())
{
// 需要替换数据库
for (int i = 0; i <= 999; i++)
{
cout << "正在替换数据库" << i << endl;
}
}

cond.notify_all();
}
boost::this_thread::sleep(boost::posix_time::millisec(1000));
}
}

// 当后台有数据要更新,并且目前没有用户进行search的时候replace
// 返回true表明正在进行,false表示还需要等待
bool IsReplaceDB()
{
return (isDataUpdate_ && isUserSearch_);
}

// 这个会阻塞
void WaitReplaceDB()
{
boost::recursive_mutex::scoped_lock lock(mtx);
cout << "如果程序没有动静,那么表明此刻正在更新数据库...等待几秒即可!" << endl;
cond.wait(mtx);
boost::this_thread::sleep(boost::posix_time::millisec(100));
}

void SetIsDataUpdate(bool isDataUpdate = true)
{
isDataUpdate_ = isDataUpdate;
}

void SetIsUserSearch(bool isUserSearch = true)
{
isUserSearch_ = isUserSearch;
}

void SetIsSearchEnd(bool isSearchEnd = false)
{
isSearchEnd_ = isSearchEnd;
}

bool GetIsSearchEnd()
{
return isSearchEnd_;
}

private:
bool running_flag;		// 数据助手是否需要存活
bool isDataUpdate_;		// 数据是否有更新
bool isUserSearch_;		// 当前是否有用户正在查询
bool isSearchEnd_;		// 程序是否已经退出
boost::recursive_mutex mtx;
boost::condition_variable_any cond;
};


3、困扰的原因

我一直在想,以前的生产者和消费者是怎么做的?有没有想过还带有条件变量的?

4、能不能更简单

比如我就想过,只在DataOperation类中检测这两个bool变量,如果满足则更新DB,不满足Sleep;最终发现也有漏洞,毕竟这么做了Search中也要对应检测DB当前是否在更新,更新就Sleep----举一个极端的例子,但是是很可能发生:当前DataOperation发现没有用户在执行查询,那么DataOperation去设置标记(表面将要去更新数据库),当在设置标记的时候,用户更好输入了查询,发现数据库没在更新,那么就会造成同时出现DataOperation正在更新数据库、用户正在查询的情况!那么此时系统就会崩溃了。

5、所以说

条件变量、互斥量是确实有存在的意义的!

6、再来看一下整体的设计

从作用来看WriteDB只是自己做自己的,并不和多线程的同步有关,他仅仅是每次都设置isDataUpdate_为true就行;剩下的同步仅在于Search和Dataoperation中,bool isUserSearch_;和bool isSearchEnd_;都没有实际的存在价值;只需要把isUserSearch_设置为条件变量就行。这样前面的两个bool变量的功能就被取代了(同时提一下,原来的条件变量,在这里才真正的用起来)

7、最终的测试代码

#include "iostream"
#include "boost/thread.hpp"
#include "boost/bind.hpp"
using namespace std;
#include "windows.h"
#include "fstream"

namespace MyNamespace
{
ofstream cout("log.txt");
}

//#define WRITEFILE_NAMESPACE

#ifdef WRITEFILE_NAMESPACE
ofstream out("log.txt");
#endif // 0

class DataOperation
{
public:

DataOperation()
{
running_flag_  = true;
isDataUpdate_  = false;
}

void Stop()
{
running_flag_ = false;
}

void EmitSignalThread()
{
while (running_flag_)
{
boost::recursive_mutex::scoped_lock lock(mtx);
bool flag = GetDataUpdate();

// 如果有更新 那些就替换数据库
if (GetDataUpdate())
{
// 需要替换数据库
for (int i = 0; i <= 999; i++)
{
if (i == 1)
{
#ifdef WRITEFILE_NAMESPACE
out << "正在替换数据库" << i << endl;
#else
cout << "正在替换数据库" << i << endl;
#endif // WRITEFILE_NAMESPACE

}
if (i == 999)
{
#ifdef WRITEFILE_NAMESPACE
out << "替换完毕" << i << endl;
#else
cout << "替换完毕" << i << endl;
#endif // WRITEFILE_NAMESPACE
}
}
isDataUpdate_ = false;
}
isUserSearch_.notify_all();
// 等待10秒钟,所以结束的时候可能会在这里阻塞
//		boost::this_thread::sleep(boost::posix_time::millisec(10000));
}
}

// 这个会阻塞
void WaitReplaceDB()
{
boost::recursive_mutex::scoped_lock lock(mtx);
#ifdef WRITEFILE_NAMESPACE
out << "如果程序没有动静,那么表明此刻正在更新数据库...等待几秒即可!" << endl;
#else
cout << "如果程序没有动静,那么表明此刻正在更新数据库...等待几秒即可!" << endl;
#endif // WRITEFILE_NAMESPACE
isUserSearch_.wait(mtx);
boost::this_thread::sleep(boost::posix_time::millisec(100));
}

void SetDataUpdate(bool isDataUpdate = true)
{
isDataUpdate_ = isDataUpdate;
}

bool GetDataUpdate()
{
return isDataUpdate_;
}

bool GetRunningFlag()
{
return running_flag_;
}

private:
bool running_flag_;							 // 数据助手是否需要存活
bool isDataUpdate_;
boost::recursive_mutex mtx;
boost::condition_variable_any isUserSearch_; // 当前是否有用户正在查询
};

class WriteDB
{
public:
void Start(DataOperation& dataoperation)
{
while (true)
{
// 在这里 这个线程进行数据库的更新可能需要大概好几分钟
for (int i = 0; i <= 9999; i++)
{
if (i == 1)
{
#ifdef WRITEFILE_NAMESPACE
out << "WriteDB wait..." << i << endl;
#else
cout << "WriteDB wait..." << i << endl;
#endif // WRITEFILE_NAMESPACE
}
if (i == 9999)
{
#ifdef WRITEFILE_NAMESPACE
out << "WriteDB wait..." << i << endl;
#else
cout << "WriteDB wait..." << i << endl;
#endif // WRITEFILE_NAMESPACE
}
}

dataoperation.SetDataUpdate();

// 查询一下,如果程序退出了,那么线程也退出
if (!dataoperation.GetRunningFlag())
{
#ifdef WRITEFILE_NAMESPACE
out << "bye-bye" << endl;
#else
cout << "bye-bye" << endl;
#endif // WRITEFILE_NAMESPACE
break;
}
Sleep(10000);
}
}
};

class Search
{
public:
void Start(DataOperation& dataoperation)
{
string user_enter;
while (true)
{
getline(cin, user_enter);
if (user_enter == "q")
{
dataoperation.Stop();
break;
}

#ifdef WRITEFILE_NAMESPACE
out << "当前用户输入的是:" << user_enter << endl;
#else
cout << "当前用户输入的是:" << user_enter << endl;
#endif // WRITEFILE_NAMESPACE

// 如果正在更新数据库,那么wait
dataoperation.WaitReplaceDB();
//	out << "如果程序没有动静,那么表明此刻正在更新数据库...等待几秒即可!" << endl;

// 如果没在更新数据库,直接进行查询
for (int i = 0; i <= 99999; i++)
{
//	cout << "正在为您查询数据..." << i << endl;
//////////////////////////////////////////////////////////////////////////
if (i == 1)
{
#ifdef WRITEFILE_NAMESPACE
out << "正在为您查询数据..." << i << endl;
#else
cout << "正在为您查询数据..." << i << endl;
#endif // WRITEFILE_NAMESPACE
}
if (i == 99999)
{
#ifdef WRITEFILE_NAMESPACE
out << "查询完毕..." << i << endl;
#else
cout << "查询完毕..." << i << endl;
#endif // WRITEFILE_NAMESPACE
}
//////////////////////////////////////////////////////////////////////////
}

// 向等待的线程发出信号,让他去更新数据库
Sleep(100);
}
}
};

int main()
{
boost::thread_group grp;
WriteDB write_db;
Search search;
DataOperation data_operation;

grp.create_thread(boost::bind(&WriteDB::Start, &write_db, boost::ref(data_operation)));
grp.create_thread(boost::bind(&Search::Start, &search, boost::ref(data_operation)));
grp.create_thread(boost::bind(&DataOperation::EmitSignalThread, boost::ref(data_operation)));

grp.join_all();

return 0;
}


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐