您的位置:首页 > 编程语言

多线程编程之线程的封装

2012-09-24 15:17 176 查看

一. 多线程要考虑的问题

前人总结出,一个线程安全的class应当满足的条件:

1. 从多个线程访问时,其表现出正确的行为,无论操作系统如何调度这些线程,无论这些线程的执行顺序如何交织。

2. 调用端代码无需额外的同步或其他协调动作

在写多线程程序时脑子里要有这样的意识,下面我总结了几条比较具体的注意事项。
使用多线程要考虑的问题:

1. 线程访问资源安全(线程同步问题),这个问题,可用Mutex封装临界区,我的另一篇blog里有介绍.

2. 线程退出安全(主线程结束前线程安全退出),可在主线程结束时,在对象析构里等待线程退出再析构.

3. 跨线程使用的对象的生命周期的控制,要保证对象(指针)不再被使用时才析构。为了安全,少费精力,还是用shared_ptr代替原始指针吧。shared_ptr是boost库的智能指针,现在已加入标准库中,用法不说了,说一下线程中使用注意事项吧。

shared_ptr的线程安全:

shared_ptr的线程安全级别与内建类型一样,多个线程同时对一个shared_ptr进行读操作时是安全的,多个线程同时对多个shared_ptr进行写操作时是安全的,其余都是未定义的。

shared_ptr作为函数参数:

因为拷贝shared_ptr是要修改引用计数的,所以拷贝shared_ptr比拷贝原始指针成本要高,多数情况下可以以reference to const方式传递,只需要在最外层的函数有个实体对象,以后都可以用reference to const来使用这个shared_ptr。

二. 一个封装好的线程类

下面我写了个线程封装类(windows平台),线程时使用它能少写一些代码~ 源码呈上:

#ifndef HBASE_HTHREAD_H__
#define HBASE_HTHREAD_H__

#include <Windows.h>

#ifdef Xxx_CALLBACK
#include <Core/Core.h>
#endif

/** 在类A中使用HThread类:
*  将HThread类的对象作为类A的成员.
*  Start函数,启动一个线程
*  WaitExit函数,在A的析构里调用,保证在主线程退出前线程退出.
*  注意:一个HThread类同时最多只能运行一个线程。
*  如果使用Callback封装unsigned (__stdcall * _start_address) (void *)函数,使用更灵活.
*/

class HThread
{
public:
HThread();
~HThread();
#ifdef Xxx_CALLBACK
bool        Start(Xxx::Callback _cb);
#endif
bool        Start(unsigned (__stdcall * _start_address) (void *), void * _arg_list);
int         WaitExit();

bool        IsRunning();
void        Detach();
bool        IsOpen() const     { return (NULL != handle_); }
HANDLE      GetHandle() const  { return handle_; }

protected:
#ifdef Xxx_CALLBACK
static unsigned __stdcall ThreadRuntime(void* p);
#endif

private:
HThread(const HThread&);
HThread& operator=(const HThread&);

private:
HANDLE      handle_;
};

#endif


#include <process.h>
#include "hthread.h"

HThread::HThread() : handle_(NULL)
{}

HThread::~HThread()
{
Detach();   // Wait() ?
}

#ifdef Xxx_CALLBACK
unsigned __stdcall HThread::ThreadRuntime(void* p)
{
Xxx::Callback *cb = (Xxx::Callback *)p;
(*cb)();
delete cb;
return 0;
}

bool HThread::Start(Xxx::Callback _cb)
{
Detach();
Xxx::Callback *cb = new Xxx::Callback(_cb);
handle_ = (HANDLE)_beginthreadex(0, 0, &HThread::ThreadRuntime, cb, 0, NULL);
return (0 != handle_);
}
#endif

bool HThread::Start(unsigned (__stdcall * _start_address) (void *), void * _arg_list)
{
Detach();
handle_ = (HANDLE)_beginthreadex(0, 0, _start_address, _arg_list, 0, NULL);
return (0 != handle_);
}

int HThread::WaitExit()
{
if(!IsOpen())
return -1;
int out;
DWORD exit;
if(!GetExitCodeThread(handle_, &exit))
return -1;
if(exit != STILL_ACTIVE)
out = (int)exit;
else
{
if(WaitForSingleObject(handle_, INFINITE) != WAIT_OBJECT_0)
return 0;
out = GetExitCodeThread(handle_, &exit) ? int(exit) : int(0);
}
Detach();
return out;
}

bool HThread::IsRunning()
{
if(!IsOpen())
return false;

DWORD exit;
if(!GetExitCodeThread(handle_, &exit))
return true;

return (exit == STILL_ACTIVE)? true : false;
}

void HThread::Detach()
{
if(handle_) {
CloseHandle(handle_);
handle_ = 0;
}
}


三. 根据任务类型封装的线程类

在用线程处理问题时常常分两种情况:

A. 这个线程只做一件事,或循环做一件事情,做完这件事情之后就不再用它了,此时就要结束这个线程,称这种线程为单任务型。

B. 启动一个线程后,有个事情A需要在线程里完成,此时把任务A交给线程,没过多久又有个任务B要做,再把B交给这个线程去完成,也就是说这个线程能完成各种类型的任务,这种类型的线程称为多任务型,如果管理多个线程,就成线程池了。

单任务型适合用于耗时的重复性操作,比如上传1000张图片,这种线程为该任务而生,任务结束线程也就结束了。

多任务型线程长期活着,接各种耗时短的杂活,有时候会下载个文件,有时候数据库插入个图片。

说了这么多,看代码吧~

#ifndef MATERIAL_THREAD_H__
#define MATERIAL_THREAD_H__

#include <Windows.h>
#include <Core/Core.h>
#include <deque>

class TaskThreadBase
{
public:
TaskThreadBase();
virtual ~TaskThreadBase();

virtual int WaitExit();

bool        IsRunning();
void        Detach();
bool        IsOpen() const     { return (NULL != handle_); }
HANDLE      GetHandle() const  { return handle_; }

private:
TaskThreadBase(const TaskThreadBase&);
TaskThreadBase& operator=(const TaskThreadBase&);

protected:
HANDLE      handle_;
bool        running_;
};

class TaskThread : public TaskThreadBase
{
public:
TaskThread();
~TaskThread();
bool        Start(Upp::Callback _cb);
bool        Start(unsigned (__stdcall * _start_address) (void *), void * _arg_list);

protected:
static unsigned __stdcall ThreadRuntime(void* p);
};

class MutiTaskThread : public TaskThreadBase
{
private:
struct Task
{
Task() {}
Task(Upp::Callback cb) : cb(cb) {}
Upp::Callback cb;
};

public:
MutiTaskThread();
~MutiTaskThread();

void SetMaxTaskCount(size_t task_count);
bool Start();
bool AddTask(Upp::Callback cb, bool priority = false);

virtual int WaitExit();

private:
static unsigned __stdcall ThreadRuntime(void* p);
void Run();
void DoTask(const Task& task);
void RequestStop();

private:
std::deque<Task>  tasks_;
size_t            max_task_count_;

Upp::Mutex        mutex_;
Upp::Semaphore    semaphore_;
bool              quit_;
};

#endif


#include <process.h>
#include "materialthread.h"

TaskThreadBase::TaskThreadBase() : handle_(NULL)
{}

TaskThreadBase::~TaskThreadBase()
{
Detach();   // Wait() ?
}

int TaskThreadBase::WaitExit()
{
if(!IsOpen())
return -1;
int out;
DWORD exit;
if(!GetExitCodeThread(handle_, &exit))
return -1;
if(exit != STILL_ACTIVE)
out = (int)exit;
else
{
if(WaitForSingleObject(handle_, INFINITE) != WAIT_OBJECT_0)
return 0;
out = GetExitCodeThread(handle_, &exit) ? int(exit) : int(0);
}
Detach();
return out;
}

bool TaskThreadBase::IsRunning()
{
if(!IsOpen())
return false;

DWORD exit;
if(!GetExitCodeThread(handle_, &exit))
return true;

return (exit == STILL_ACTIVE)? true : false;
}

void TaskThreadBase::Detach()
{
if(handle_) {
CloseHandle(handle_);
handle_ = 0;
}
}

TaskThread::TaskThread()
{
}

TaskThread::~TaskThread()
{
}

unsigned __stdcall TaskThread::ThreadRuntime(void* p)
{
Upp::Callback *cb = (Upp::Callback *)p;
(*cb)();
delete cb;
return 0;
}

bool TaskThread::Start(Upp::Callback _cb)
{
Detach();
Upp::Callback *cb = new Upp::Callback(_cb);
handle_ = (HANDLE)_beginthreadex(0, 0, &TaskThread::ThreadRuntime, cb, 0, NULL);
return (0 != handle_);
}

bool TaskThread::Start(unsigned (__stdcall * _start_address) (void *), void * _arg_list)
{
Detach();
handle_ = (HANDLE)_beginthreadex(0, 0, _start_address, _arg_list, 0, NULL);
return (0 != handle_);
}

MutiTaskThread::MutiTaskThread() : max_task_count_(1), quit_(false)
{
}

MutiTaskThread::~MutiTaskThread()
{
}

void MutiTaskThread::SetMaxTaskCount(size_t task_count)
{
max_task_count_ = task_count;
}

bool MutiTaskThread::Start()
{
if (running_) return false;
running_ = true;
handle_ = (HANDLE)_beginthreadex(0, 0, ThreadRuntime, this, 0, NULL);
return (0 != handle_);
}

unsigned __stdcall MutiTaskThread::ThreadRuntime(void* p)
{
MutiTaskThread* self = (MutiTaskThread*)p;
self ->Run();
return 0;
}

void MutiTaskThread::RequestStop()
{
quit_ = true;
semaphore_.Release();
}

void MutiTaskThread::Run()
{
while(true)
{
Task cur_task;
bool empty = false;
{
Upp::Mutex::Lock lock(mutex_);
if (!tasks_.empty())
cur_task = tasks_.front();
else
empty = true;
}

if (quit_) break;

if (!empty) {
DoTask(cur_task);
{
Upp::Mutex::Lock lock(mutex_);
if (!tasks_.empty())
tasks_.pop_front();
}
}
else
semaphore_.Wait();
}
running_ = false;
}

bool MutiTaskThread::AddTask(Upp::Callback cb, bool priority /*= false*/)
{
bool ret = false;
Upp::Mutex::Lock lock(mutex_);
Task task(cb);
size_t n = tasks_.size();
if (n < max_task_count_) {
if (priority)
tasks_.push_front(task);
else
tasks_.push_back(task);
ret = true;
if (0 == n)
semaphore_.Release();
}

return ret;
}

void MutiTaskThread::DoTask(const Task& task)
{
task.cb();
}

int MutiTaskThread::WaitExit()
{
RequestStop();
return TaskThreadBase::WaitExit();
}


(完)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: