您的位置:首页 > 其它

模板库的基本使用( 《二》 对象管理)

2011-07-18 01:04 507 查看
前面讲了使用模板库进行对象的管理这里我们讲下算法的 管理 代码都很简单就不详细讲解了不懂可以留言或者交流

//=======================================================================================

#ifndef __MPDK_DEF_H__
#define __MPDK_DEF_H__
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include <pthread.h>
#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/timeb.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/uio.h>
#include <sys/wait.h>
#include <sys/un.h>
#include <netinet/in.h>
//#include <netinet/sctp.h>
#include <netinet/if_ether.h>
#include <netinet/ip.h>
#include <netinet/udp.h>
#include <netinet/tcp.h>
#include <net/if.h>
#include <netdb.h>
#include <arpa/inet.h> //inet_addr()
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <iconv.h>
#include <ctype.h>
#include <typeinfo>
#include <dirent.h>
#include <stdarg.h>
#include <utility>
#include <assert.h>
#include <errno.h>
#include <stdint.h> //for uint32_t
#include <unistd.h>
/// Public header file reference
#include <string>
#include <vector>
#include <list>
#include <map>
#include <queue>
#include <deque>
#include <iostream>
#include <fstream>
#include <cctype>
#include <algorithm>
#include <ext/hash_map>
#include <ext/hash_set>
#ifdef _DEBUG
#include <set>
#endif
using namespace std;
using namespace __gnu_cxx;
#define TRUE 1
#define FALSE 0
#define closesocket close
#define SOCKET_ERROR -1
#define TRACE printf
#define MAX_PATH 260
#define INVALID_SOCKET -1
#define SOCKET_ERROR -1
#ifndef NTOHL
#define NTOHL(x) (x) = ntohl(x)
#define NTOHS(x) (x) = ntohs(x)
#define HTONL(x) (x) = htonl(x)
#define HTONS(x) (x) = htons(x)
#endif
#define MPDK_API
#endif // __MPDK_DEF_H__

//=======================================================================================


#ifndef __MPDK_THREAD_H__
#define __MPDK_THREAD_H__
#include "MPDKDef.h"
namespace MPDK
{
namespace MPThread
{
class CMutex
{
public:
CMutex(void);
virtual ~CMutex(void);
public:
void Lock(void);
void UnLock(void);
private:
pthread_mutex_t m_Mutex;
};
class CLock
{
public:
CLock(CMutex& mutex);
virtual ~CLock(void);
private:
CMutex& m_Mutex;
};
class CThread
{
public:
CThread(void);
virtual ~CThread(void);
public:
bool Start(void);
void Stop(void);
bool IsRunning(void) const;
static void MsSleep(int nMsec);
protected:
void Run(void);
void Destory(void);
virtual bool OnStart(void);
virtual void OnStop(void);
virtual void Execute(void) = 0;
private:
static void* ThreadFunc(void* pParam = NULL);
private:
bool m_bRunning;
pthread_t m_hThread;
};
template <typename T>
class CQueue
{
public:
CQueue(unsigned int uiSize = 30000000);
virtual ~CQueue(void);
public:
inline bool Push(T* e);
inline bool Pop(T** e);
private:
T** m_pQueue;
unsigned int m_uiSize;
unsigned int m_uiPop;
unsigned int m_uiPush;
unsigned int m_uiNum;
CMutex m_Mutex;
};
template <typename T>
bool CQueue<T>::Pop( T** e )
{
CLock x(m_Mutex);
if (!m_uiNum)
{
return false;
}
if (m_uiPop == m_uiSize)
{
m_uiPop = 0;
}
*e = *(m_pQueue + m_uiPop%m_uiSize);
m_uiPop++;
m_uiNum--;
return true;
}
template <typename T>
bool CQueue<T>::Push( T* e )
{
CLock x(m_Mutex);
if (m_uiNum >= m_uiSize)
{
#ifdef _DEBUG_
printf("Msg Push fail(e=%lu)...\n", (uint64_t)e);
#endif
return false;
}
if (m_uiPush == m_uiSize)
{
m_uiPush = 0;
}
//printf("Msg Push (e=%lu), pos=%lu\n", (uint64_t)e, m_uiPush%m_uiSize);
//m_pQueue[m_uiPush%m_uiSize] = e;
*(m_pQueue + m_uiPush%m_uiSize) = e;
//printf("Msg Push end(%lu)\n", *(m_pQueue + m_uiPush%m_uiSize));
m_uiPush++;
m_uiNum++;
return true;
}
template <typename T>
CQueue<T>::CQueue( unsigned int uiSize /*= 50000000*/ ) : m_uiSize(uiSize), m_uiPop(0), m_uiPush(0), m_uiNum(0)
{
// 略。。。 。。 。
uiSize = uiCount;
m_uiSize = uiCount;
m_pQueue = ( new T*[uiSize] );
assert(m_pQueue);
// pNode[0] = new CMsg;
// *(pNode + 3) = new CMsg;
}
template <typename T>
CQueue<T>::~CQueue(void)
{
if (m_pQueue)
{
delete[] m_pQueue;
m_pQueue = NULL;
}
}
}
}
#endif // __MPDK_THREAD_H__

//=======================================================================================

#include "thread.h"
namespace MPDK
{
namespace MPThread
{
CMutex::CMutex(void)
{
pthread_mutex_init(&m_Mutex,NULL);
}
CMutex::~CMutex(void)
{
pthread_mutex_destroy(&m_Mutex);
}
void CMutex::Lock(void)
{
pthread_mutex_lock(&m_Mutex);
}
void CMutex::UnLock(void)
{
pthread_mutex_unlock(&m_Mutex);
}
CLock::CLock(CMutex& mutex):m_Mutex(mutex)
{
m_Mutex.Lock();
}
CLock::~CLock(void)
{
m_Mutex.UnLock();
}
CThread::CThread(void) : m_bRunning(false)
{
}
CThread::~CThread(void)
{
if (m_bRunning)
{
Stop();
}
}
void CThread::Run(void)
{
Execute();
m_bRunning = false;
Destory();
}
void CThread::Destory(void)
{
pthread_exit(NULL);
}
bool CThread::Start(void)
{
if (m_bRunning)
{
return false;
}
if ( !OnStart() )
{
return m_bRunning;
}
m_bRunning = true;
if (pthread_create(&m_hThread, NULL, CThread::ThreadFunc, (void*)this) != 0)
{
m_bRunning = false;
return m_bRunning;
}
return m_bRunning;
}
void CThread::Stop(void)
{
m_bRunning = false;
OnStop();
pthread_join(m_hThread,0);
pthread_cancel(m_hThread);
}
bool CThread::IsRunning(void) const
{
return m_bRunning;
}
void CThread::MsSleep(int nMsec)
{
usleep(nMsec * 1000L);
}
void* CThread::ThreadFunc(void* pParam)
{
CThread* pThread = (CThread*)pParam;
pthread_detach(pthread_self());
if ( NULL != pThread )
{
pThread->Run();
}
return NULL;
}
bool CThread::OnStart(void)
{
return true;
}
void CThread::OnStop(void)
{
}
}
}


//=======================================================================================


#ifndef __DISPATCH_H__
#define __DISPATCH_H__

class CMain;
template <typename T> class CAynscWriter : public MPDK::MPThread::CThread
{
public:
CAynscWriter(void){};
virtual ~CAynscWriter(void){};
public:
/************************************************************************
* 名称: Write
* 功能: 归属事务处理者的任务入口
* 输入: e=>任务句柄
* 输出: 无
* 返回: 是否入列成功
************************************************************************/
bool Write(T* e)
{
return m_Task.Push(e);
}
private:
/************************************************************************
* 名称: Execute
* 功能: 事务处理者线程接口
* 输入: 无
* 输出: 无
* 返回: 无
************************************************************************/
void Execute(void)
{
T* e = NULL;
while (MPDK::MPThread::CThread::IsRunning())
{
if ( m_Task.Pop(&e) )
{
Dispatch(e);
continue;
}
MPDK::MPThread::CThread::MsSleep(1);
}
}

/************************************************************************
* 名称: Dispatch
* 功能: 事务处理者分派出口
* 输入: 无
* 输出: 无
* 返回: 无
************************************************************************/
void Dispatch(T* pTask)
{
CMain* pMain = (CMain*)(((CMsg*)pTask)->UserData);
// 调用对象内部的算法 。。。 。。 。
pMain ->Dispatch((CMsg*)pTask);
}
void OnStop(void){};
private:
// 当前事务处理者任务队列
MPDK::MPThread::CQueue<T> m_Task;
};
template <typename T> class CDispatchMgr : public MPDK::MPThread::CThread
{
public:
// 暂时固定,待续,后续会自动根据获取机器性能调整
CDispatchMgr(uint8_t ucNum=8) : m_ucWriterNum(ucNum)
{
// 略 。。。 。。 。
// 事务处理者创建
m_pWrites = new CAynscWriter<T>[m_ucWriterNum];
assert(m_pWrites);
}
virtual ~CDispatchMgr(void){};
public:
/************************************************************************
* 名称: Write
* 功能: 所有任务入口
* 输入: e=>任务句柄
* 输出: 无
* 返回: 是否入列成功
************************************************************************/
bool Write(T* e)
{
return m_AllTask.Push(e);
}
/************************************************************************
* 名称: WriterNum
* 功能: 取事务组处理者数量
* 输入: 无
* 输出: 无
* 返回: 处理者数量
************************************************************************/
uint8_t WriterNum(void) const
{
return m_ucWriterNum;
}
protected:
/************************************************************************
* 名称: OnStart
* 功能: 分派器启动接口,负责启动所有事务处理者
* 输入: 无
* 输出: 无
* 返回: 是否启动成功(未用,直接返回成功)
************************************************************************/
bool OnStart(void)
{
for (uint8_t n = 0 ; n < m_ucWriterNum ; n ++)
{
if (NULL != (m_pWrites + n))
{
((CAynscWriter<T>*)(m_pWrites + n))->Start();
}
}
return true;
}

/************************************************************************
* 名称: OnStop
* 功能: 分派器停止接口,负责停止所有事务处理者
* 输入: 无
* 输出: 无
* 返回: 无
************************************************************************/
void OnStop(void)
{
for (uint8_t n = 0 ; n < m_ucWriterNum ; n ++)
{
if (NULL != (m_pWrites + n))
{
((CAynscWriter<T>*)(m_pWrites + n))->Stop();
}
}
}

/************************************************************************
* 名称: Execute
* 功能: 分派器分派出口
* 输入: 无
* 输出: 无
* 返回: 无
************************************************************************/
void Execute(void)
{
T* e = NULL;
while (MPDK::MPThread::CThread::IsRunning())
{
if ( m_AllTask.Pop(&e) )
{
Router(e);
continue;
}
MPDK::MPThread::CThread::MsSleep(1);
}
}

/************************************************************************
* 名称: Router
* 功能: 分派器执行任务路由
* 输入: pTask=>任务
* 输出: 无
* 返回: 无
************************************************************************/
void Router(T* pTask)
{
assert(pTask);
CHECK_CONDITION3((!pTask));
uint32_t uiID = ((CMsg*)pTask)->Receiver;
if ((uiID >= m_ucWriterNum) || (!m_pWrites))
{
#ifdef __AIUCS_DEBUG__
printf("AynscWriter Handle invalid...\n");
#endif
return;
}
if (NULL != (m_pWrites + uiID))
{
((CAynscWriter<T>*)(m_pWrites + uiID))->Write(pTask);
}
}
private:
// 所有任务队列
MPDK::MPThread::CQueue<T> m_AllTask;
// 事务处理者组
CAynscWriter<T>* m_pWrites;
// 事务处理者数
uint8_t m_ucWriterNum;
};
#endif // __DISPATCH_H__

// =====================================================================
class CInto
{
void OnMsg(CMsg *pMsg);
// 对象管理
CMgr m_Mgr;
// 消息分派器 算法处理《多线程》
CDispatchMgr<CMsg> m_DispatchMgr;
}

//========================================================================

int main()
{
CInto m_CInto;
while(1)
{
... .. .
m_CInto.OnMsg(...);
... .. .
}
return 0;
}

// 中间省略了有些实现 相信懂点都知道


// 下面说说具体的怎么实现数据和算法的协调处理
1.程序入口处(全局类中)定义一个 《一》 中的管理对象
2.主线程(全局类)中的消息依据类型负责对象的分配回收
3.依据队列中对象绑定的键值和使用对象指针中得到的线程标识 分派进相应的线程队列中
4.每个线程负责取出消息然后调用对象的入口函数传入消息
5.执行对象中的算法对消息进行处理

存在问题
1.一个对象列表可能会存在于多个线程中取调用 这样会接触到锁 --- 如果每个线程只负责处理一类对象则问题解决
2.其他问题依据具体问题可能还需优化
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐