您的位置:首页 > 其它

线程池相关

2015-06-13 16:24 375 查看
己实现一个“线程池” pp616 [原作]

关键字 线程池

出处

今天见论坛里有问怎么样实现线程池。碰巧原来写过一个类似的。现在来说说。(下面的全是个人理解,不见得是正确的。)

1。先来说说线程池。为什么要使用线程池?

因为创建线程和释放线程是要消耗系统资源的,如果要完成一个工作要不停的创建和释放线程必然会造成很大的系统资源的浪费,所以用线程池。在线程本次工作完成后,不释放线程,让线程等待。再有需要让线程去完成的工作时就把原来创建的线程取过来继续使用。这样节省了重复的创建释放线程的过程。

2。如何实现功能。

根据上面的理解我们来实现这些工作。

A.我们先要创建一个容器来装这些已经创建的线程。

B.然后我们需要用一套机制来让我们知道容器中的线程哪个是空闲的。哪个正在工作。

开始动手写吧.

//.h文件

#ifndef MyThreadPoolH

#define MyThreadPoolH

#include <Classes.hpp>

//定义通讯消息

#define TP_AddThread WM_USER + 1001 //向容器添加一个线程

#define TP_DeleteThread WM_USER + 1002 //删除容器中的一个线程

#define TP_GetFreeThread WM_USER + 1003 //获取一个空闲线程

#define TP_GetThreadCount WM_USER + 1004 //得到容器中的线程总数

class MyThreadPool : public TObject

{

private:

HANDLE FHandle;//线程池的句柄 用来接收通讯消。

TList *FThreadList; //用一个TList来做容器

bool FError; //是否出现错误

void __fastcall (TMessage &Message);//消息处理函数

long __fastcall FGetThreadCount(void);//得到容器中线程的总数

public:

__fastcall MyThreadPool();

__fastcall ~MyThreadPool();

__published:

//发布属性

__property HNDLE Handle={read=FHandle};

__property bool Error={read=FError};

//__property TList *ThreadList={read=FThreadList};//如果有必要把容器发布出来 !但会降低安全性!

__property long ThreadCount={read=GetFreeThread};

};

#endif

//.cpp

#include <vcl.h>

#pragma hdrstop

#include "MyThreadPool.h"

#pragma package(smart_init)

__fastcall MyThreadPool::MyThreadPool()

{

FError=false;

FHandle=AllocateHWnd(MyProc);//创建一个窗口句柄来处理消息

if(FHandle==NULL)

{

FError=true;

return;

}

FThreadList=new TList;//创建容器

if(FThreadList==NULL)

{

FError=true;

return;

}

}

__fastcall MyThreadPool::~MyThreadPool()

{

if(FHandle!=NULL)//释放句柄

{

DeallocateHWnd(FHandle);

}

if(FThreadList!=NULL)//释放容器

{//这里只把容器中的线程指针给删除了。

//中间的线程并没有释放。要释放需要自己添加代码

FThreadList->Clear();

delete FThreadList;

}

}

//处理消息

void __fastcall MyThreadPool::MyProc(TMessage &Message)

{

void *pThread;

int ret;

switch(Message.Msg)

{

case TP_AddThread://添加线程的时候消息的WParam参数为线程指针

pThread=(void *)Message.WParam;

ret=FThreadList->Add(pThread);

Message.Result=ret;//返回线程指针在容器中的index

return;

case TP_DeleteThread://删除线程时消息的WParam参数为线程指针

pThread=(void *)Message.WParam;

ret=FThreadList->IndexOf(pThread);

//如果线程指针不在容器中返回-1,成功删除返回1

if(ret==-1)

{

Message.Result=-1;

}

else

{

FThreadList->Delete(ret);

Message.Result=1;

}

return;

case TP_GetFreeThread://得到一个空闲的线程,如果有空闲消息返回值为线程指针。

//一但线程给取出线程的Working属性就倍设置成true;

for(int i=0;i<FThreadList->Count;i++)

{

pThreadFThreadList->Items[i];

if(((TMyThread *)pThread)->Working==false)

{

((TMyThread *)pThread)->Working=true;

Message.Result=(long)pThread;

return;

}

}

Message.Result=0;

return;

case TP_GetThreadCount://返回容器中的总数

Message.Result=FThreadList->Count;

return;

}

try

{

Dispatch(&Message);

if(Message.Msg==WM_QUERYENDSESSION)

{

Message.Result=1;

}

}

catch(...){};

}

3。我们还需要定制一个自己的线程类来配合上面的ThreadPool来使用

class TMyThread : public TThread

{

private:

bool FWorking;

HANDLE PoolHandle

protected:

void __fastcall Execute();

public:

__fastcall TMyThread(bool CreateSuspended,HANDLE hHandle/*线程池的句柄*/);

__published:

//发布属性

__property bool Working={read=FWorking,write=FWorking}; //线程是否空闲

};

__fastcall TMyThread::TMyThread(bool CreateSuspended,HANDLE hHandle)

: TThread(CreateSuspended)

{

PoolHandle=hHandle;

FWorking=false;

}

void __fastcall TMyThread::Execute()

{

while(!Terminated)

{

FWorking=true;

//工作代码

//。。。。

FWorking=false;

this->Suspend();

}

::SendMessage(PoolHandle,TP_DeleteThread,(long)this,0);//在线程池的容器中删除本线程

return;//线程结束

}

4。下面来演示一下如何使用

//1.创建线程池对象

MyThreadPool *pMyTP=new MyThreadPool;

if(!pMyTP || pMyTP->Error)

{

//创建出错。

//。。。处理代码

}

//2.创建N个TMyThread线程对象,并加入线程池

TMyThread *pThread;

for(int i=0;i<N;i++)

{

pThread=new TMyThread(true,pMyTP->Handle);

::SendMessage(pMyTP->Handle,TP_AddThread,(long)pThread,0);

}

//3.从线程池中去空闲线程

TMyThread *pThread;

pThread=(TMyThread *)::SendMessage(pMyTP->Handle,TP_GetFreeThread,0,0);

if(pThread)//如果有空闲线程

{

pThread->Resume();

}

这样就大致实现了线程池的功能。这种东西我已经用在了程序里了。效果还不错节省了不少资源。但是可能和书上说的线程池还是有很大差别的。

我们暂且叫它线程池A。反正已经达到我开始的时候设计它的目的,节省了资源。

就当是抛砖引玉吧。希望这些代码能给兄弟们一点启发,对兄弟门有用。

对该文的评论

losthold ( 2004-07-05)

楼主的这种线程池处理模式还存在问题,在数据量较大的时候线程会出现死锁,主要体现在Working和Suspended都是true,以上结论是在5线程,每秒处理1500个数据的环境下测试得出的,大概5分钟-半小时左右就会有线程出现死锁。

enoloo ( 2004-03-13)

同意 Darkay_Lee。我还想知道的是如果让多个不同的任务执行,线程池还有没有优势?

在哪里能找到比较完整的线程池的实现方法?

谢谢。

kataboy ( 2004-03-04)

不错,一个比较好的方法,支持支持!

帮贴主更正一下:

void __fastcall (TMessage &Message);//消息处理函数

应该改为:

void __fastcall MyProc(TMessage &Message);//消息处理函数

应该是这样子吧!^_^

hotcat ( 2004-03-03)

to:xjcpower

你没有理解作者的意思,只有调用terminate的时候才将线程删除,在一个线程没有收到terminate时,且自己的本职工作已经完成了,则进入休眠状态,将来可以被唤醒!

LeslieY ( 2004-02-25)

我有一点疑惑,为什么用消息机制,而不是直接调用线程池的函数?

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

//-----------------------------------------------------------------------------------------------------------

主  题: cb中怎么创建线程池??

作  者: CoolSoftBird (Bird)

等  级:

信 誉 值: 91

所属社区: C++ Builder 基础类

问题点数: 60

回复次数: 7

发表时间: 2004-7-15 18:11:58

1:要实现的功能:手机上网看电影,我要写个服务端。

2:我的方案:客户发个请求,我就创建一个线程。

开始用createthread来做,势必造成资源的浪费以及内存碎片

所以想用线程池来做?怎么用线程池?????

回复人: CoolSoftBird(Bird) ( ) 信誉:91 2004-7-15 18:12:20 得分: 0

顶啊

Top

回复人: robbyzi(红客robby)(目标★★★) ( ) 信誉:100 2004-7-15 18:32:49 得分: 40

http://zydlm.wxhc.com.cn/down_view.asp?id=3

实现线程池与在线程中实现定时器的功能源码[C++Builder6]

这个例子也许对你有帮助。

Top

回复人: constantine(飘遥的安吉儿) ( ) 信誉:101 2004-7-16 9:38:17 得分: 20

http://zydlm.wxhc.com.cn/forum_view.asp?forum_id=7&view_id=378

看看,不错哦

Top

回复人: pp616(傻小子) ( ) 信誉:97 2004-7-16 10:55:42 得分: 0

:)

Top

回复人: wdh924(秦歌) ( ) 信誉:100 2004-7-16 14:07:53 得分: 0

:)

Top

回复人: CoolSoftBird(Bird) ( ) 信誉:91 2004-7-17 8:31:32 得分: 0

大家,不要保守,,说说,,,

Top

回复人: CoolSoftBird(Bird) ( ) 信誉:91 2004-7-19 8:40:16 得分: 0

constantine(飘遥的安吉儿) ( )

请问开多少线程比较合适???

Top

该问题已经结贴 ,得分记录: robbyzi (40)、 constantine (20)、

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

主  题: 没办法,不得不又讨论一下线程池的基本技术了!高手请进!

作  者: wenminghu (看到人生的希望)

等  级:

信 誉 值: 100

所属社区: VC/MFC 进程/线程/DLL

问题点数: 100

回复次数: 20

发表时间: 2004-7-2 11:41:41

我有个服务线程,里面是客户端来个连接请求,就开一个线程跟客户端通信,现在的问题是,我要在服务器端限制客户端的连接数目。

那么我怎么知道当前有多少个客户端跟服务器相连接呢?

保存线程句柄?

但是怎么知道和客户端通信的线程结束了呢?

谢谢!

最好有个代码示例!

回复人: flyelf(空谷清音) ( ) 信誉:121 2004-7-2 12:17:45 得分: 0

我也遇到类似的问题,目前采用的是保存句柄的方法

期待更好的方法

Top

回复人: lianglp() ( ) 信誉:105 2004-7-2 12:26:50 得分: 20

我想设置一个结构保存句柄和线程ID相关联等信息比较好。

与楼上差不多!

Top

回复人: elssann() ( ) 信誉:97 2004-7-2 12:39:51 得分: 20

???

设置一个计数,连接成功后就+1,断开后就-1

有什么问题吗?

我在完成端口网络服务程序里就这么做的

Top

回复人: wenminghu(看到人生的希望) ( ) 信誉:100 2004-7-2 12:46:40 得分: 0

问题,怎么知道客户端断开连接了呢?

Top

回复人: wenminghu(看到人生的希望) ( ) 信誉:100 2004-7-2 12:50:12 得分: 0

定时检测句柄是否是0?

Top

回复人: runall(龙行天下) ( ) 信誉:100 2004-7-2 12:53:22 得分: 0

gz

Top

回复人: dongfa(阿东) ( ) 信誉:99 2004-7-2 12:54:52 得分: 20

应该增加一个函数来检测客户是否断开,然后定时刷新。就像QQ等软件一样,否则没有太好的办法。

Top

回复人: wenminghu(看到人生的希望) ( ) 信誉:100 2004-7-2 13:03:45 得分: 0

大家兴趣来了,我就贴一段别人的代码吧,大家帮我分析一下:谢谢!

#ifndef OTA_THREADPOOL_H_CAESAR__DEF

#define OTA_THREADPOOL_H_CAESAR__DEF

#include <windows.h>

class CWorkDesc

{

public:

CWorkDesc(){}

virtual ~CWorkDesc(){}

};

class CWork

{

public:

CWork(){}

virtual ~CWork(){}

virtual void ProcessJob(CWorkDesc* pJob)=0;

};

class CThreadPool

{

friend static unsigned __stdcall ManagerProc(void* pThread);

friend static unsigned __stdcall WorkerProc(void* pThread);

typedef struct THREADINFO // 线程信息

{

THREADINFO()

{

hThreadHandle = NULL;

dwThreadID = 0;

bIsBusy = false;

bIsQuit = false;

}

HANDLE hThreadHandle;

unsigned dwThreadID;

volatile bool bIsBusy;

volatile bool bIsQuit;

}*LPTHREADINFO;

public:

CThreadPool();

virtual ~CThreadPool();

enum EReturnValue

{

MANAGERPROC_RETURN_VALUE = 10001,

WORKERPROC_RETURN_VALUE = 10002

};

enum EThreadStatus

{

BUSY,

NORMAL,

IDLE

};

bool Start(DWORD dwStatic, DWORD dwMax);

void Stop(void);

void ProcessJob(CWorkDesc* pJob, CWork* pWorker) const;

protected:

static unsigned __stdcall ManagerProc(void* pThread);

static unsigned __stdcall WorkerProc(void* pThread);

LPTHREADINFO m_pThreadInfo;

volatile DWORD m_dwStaticThreadNum; // 预先分配线程数目

volatile DWORD m_dwMaxThreadNum; // 最大线程数目

volatile bool m_bQuitManager;

DWORD m_dwMSeconds;

HANDLE m_hManagerIO;

HANDLE m_hWorkerIO;

HANDLE m_hManagerThread;

CRITICAL_SECTION m_csLock;

private:

CThreadPool::EThreadStatus GetWorkThreadStatus();

void AddThread(void);

void DelThread(void);

int GetThreadbyID(DWORD dwID);

};

#endif

Top

回复人: wenminghu(看到人生的希望) ( ) 信誉:100 2004-7-2 13:04:19 得分: 0

#include <afxwin.h>

#include <process.h>

#include "../include/ThreadPool.h"

CThreadPool::CThreadPool()

{

m_hManagerIO = NULL;

m_hWorkerIO = NULL;

m_hManagerThread= NULL;

m_pThreadInfo = NULL;

m_dwMSeconds = 200;

m_bQuitManager = false;

//初使化互斥体

::InitializeCriticalSection(&m_csLock);

}

CThreadPool::~CThreadPool()

{

//关闭IO

if(m_hManagerIO)

{

::CloseHandle(m_hManagerIO);

}

if(m_hWorkerIO)

{

::CloseHandle(m_hWorkerIO);

}

if(m_pThreadInfo)

{

delete [] m_pThreadInfo;

}

//关闭互斥体

::DeleteCriticalSection(&m_csLock);

}

bool CThreadPool::Start(DWORD dwStatic, DWORD dwMax)

{

if(!(dwStatic && dwMax))

{

return false;

}

m_dwStaticThreadNum = dwStatic;

m_dwMaxThreadNum = dwMax;

//LOCK

::EnterCriticalSection(&m_csLock);

//创建工作线程数据

if(m_pThreadInfo)

{

delete [] m_pThreadInfo;

m_pThreadInfo = NULL;

}

m_pThreadInfo = new THREADINFO[dwMax]();

if(m_pThreadInfo == NULL)

{

return false;

}

//创建异步的不关联文件的完全IO端口

if(m_hManagerIO)

{

::CloseHandle(m_hManagerIO);

m_hManagerIO = NULL;

}

m_hManagerIO = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

if(m_hManagerIO == NULL)

{

return false;

}

if(m_hWorkerIO)

{

::CloseHandle(m_hWorkerIO);

m_hManagerIO = NULL;

}

m_hWorkerIO = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

if(m_hWorkerIO == NULL)

{

return false;

}

//创建管理线程

m_bQuitManager = false;

if(m_hManagerThread)

{

::TerminateThread(m_hManagerThread,0);

m_hManagerThread = NULL;

}

unsigned ManagerThreadID;

m_hManagerThread = (HANDLE)_beginthreadex( NULL,

0,

ManagerProc,

this,

0,

&ManagerThreadID);

if(m_hManagerThread == NULL)

{

return false;

}

//创建工作线程

for(WORD i=0;i < dwStatic; i++)

{

m_pThreadInfo[i].hThreadHandle = (HANDLE)_beginthreadex(NULL,

0,

WorkerProc,

this,

0,

&m_pThreadInfo[i].dwThreadID);

}

//UNLOCK

::LeaveCriticalSection(&m_csLock);

return true;

}

void CThreadPool::Stop(void)

{

//LOCK

::EnterCriticalSection(&m_csLock);

//向管理线程发送关闭线程消息

m_bQuitManager = true;

//判断并关闭管理线程

DWORD dwRes = 0;

for(int i=0; i<10; i++)

{

::GetExitCodeThread(m_hManagerThread, &dwRes);

if(dwRes == CThreadPool::MANAGERPROC_RETURN_VALUE)

{

break;

}

if(i == 9)

{

//关闭线程

::TerminateThread(m_hManagerThread,0);

}

else

{

::Sleep(1000);

}

}

//关闭IO

::CloseHandle(m_hManagerIO);

m_hManagerIO = NULL;

//关闭所有工作线程

for(DWORD dwi=0; dwi < m_dwMaxThreadNum; dwi++)

{

if(m_pThreadInfo[dwi].dwThreadID == 0)

{

continue;

}

m_pThreadInfo[dwi].bIsQuit = true;

for(int j=0; j<10; j++)

{

::GetExitCodeThread(m_pThreadInfo[dwi].hThreadHandle,&dwRes);

if(dwRes == CThreadPool::WORKERPROC_RETURN_VALUE)

{

break;

}

if(j == 9)

{

::TerminateThread(m_pThreadInfo[dwi].hThreadHandle,0);

}

else

{

::Sleep(500);

}

}

}

//关闭IO

::CloseHandle(m_hWorkerIO);

m_hWorkerIO = NULL;

//删除线程结构

if(m_pThreadInfo)

{

delete [] m_pThreadInfo;

m_pThreadInfo = NULL;

}

//删除所有代处理数据

unsigned long pN1 = 0;

unsigned long pN2 = 0;

OVERLAPPED* pOverLapped;

while(::GetQueuedCompletionStatus(m_hWorkerIO,&pN1,&pN2,&pOverLapped,0))

{

CWork* pWork = reinterpret_cast<CWork*>(pN1);

CWorkDesc* pWorkDesc = reinterpret_cast<CWorkDesc*>(pN2);

delete pWorkDesc;

}

//UNLOCK

::LeaveCriticalSection(&m_csLock);

}

void CThreadPool::ProcessJob(CWorkDesc* pJob, CWork* pWorker) const

{

::PostQueuedCompletionStatus(

m_hWorkerIO,

reinterpret_cast<DWORD>(pWorker),

reinterpret_cast<DWORD>(pJob),

NULL);

}

unsigned __stdcall CThreadPool::ManagerProc(void* pThread)

{

unsigned long pN1 = 0;

unsigned long pN2 = 0;

OVERLAPPED* pOverLapped;

CThreadPool* pServer = reinterpret_cast<CThreadPool*>(pThread);

while(!pServer->m_bQuitManager)

{

if(::GetQueuedCompletionStatus(pServer->m_hManagerIO,&pN1,&pN2,&pOverLapped,pServer->m_dwMSeconds) == TRUE)

{

if(pOverLapped == (OVERLAPPED*)0xFFFFFFFF)

{

//收到退出码

break;

}

}

else

{

//超时,判断工作线程状态

EThreadStatus stat = pServer->GetWorkThreadStatus();

if(stat == CThreadPool::BUSY)

{

puts("Add thread");

pServer->AddThread();

}

else

if(stat == CThreadPool::IDLE)

{

puts("Del thread");

pServer->DelThread();

}

}

};

_endthreadex(CThreadPool::MANAGERPROC_RETURN_VALUE);

return CThreadPool::MANAGERPROC_RETURN_VALUE;

}

unsigned __stdcall CThreadPool::WorkerProc(void* pThread)

{

unsigned long pN1 = 0;

unsigned long pN2 = 0;

OVERLAPPED* pOverLapped = 0;

CThreadPool* pServer = reinterpret_cast<CThreadPool*>(pThread);

DWORD dwThreadID = ::GetCurrentThreadId();

int nSeq = pServer->GetThreadbyID(dwThreadID);

if(nSeq < 0)

{

return 0;

}

while(!pServer->m_pThreadInfo[nSeq].bIsQuit)

{

if(::GetQueuedCompletionStatus( pServer->m_hWorkerIO,

&pN1,

&pN2,

&pOverLapped,

pServer->m_dwMSeconds))

{

CWork* pWork = reinterpret_cast<CWork*>(pN1);

CWorkDesc* pWorkDesc = reinterpret_cast<CWorkDesc*>(pN2);

printf("do work/n");

//在工作之前将状态设置为Busy

pServer->m_pThreadInfo[nSeq].bIsBusy = true;

//工作处理过程

pWork->ProcessJob(pWorkDesc);

delete pWorkDesc;

//在工作后将状态设置为非Busy

pServer->m_pThreadInfo[nSeq].bIsBusy = false;

printf("do work over/n");

}

}

printf("worker thread down/n");

//退出之前将线程ID设置为0

pServer->m_pThreadInfo[nSeq].dwThreadID = 0;

_endthreadex(CThreadPool::WORKERPROC_RETURN_VALUE);

return CThreadPool::WORKERPROC_RETURN_VALUE;

}

CThreadPool::EThreadStatus CThreadPool::GetWorkThreadStatus()

{

float fAll=0.0;

float fRun=0.0;

for(DWORD i=0; i < m_dwMaxThreadNum; i++)

{

if(m_pThreadInfo[i].dwThreadID)

{

fAll++;

if(m_pThreadInfo[i].bIsBusy)

{

fRun++;

}

}

}

if(fAll == 0)

return CThreadPool::IDLE;

if(fRun/(1.0*fAll)>0.8)

{

return CThreadPool::BUSY;

}

if(fRun/(1.0*fAll)<0.2)

{

return CThreadPool::IDLE;

}

return CThreadPool::NORMAL;

}

void CThreadPool::AddThread(void)

{

for(DWORD i = m_dwStaticThreadNum; i < m_dwMaxThreadNum; i++)

{

if(!m_pThreadInfo[i].dwThreadID)

{

m_pThreadInfo[i].bIsBusy = false;

m_pThreadInfo[i].bIsQuit = false;

m_pThreadInfo[i].hThreadHandle = (HANDLE)_beginthreadex( NULL,

0,

WorkerProc,

this,

0,

&m_pThreadInfo[i].dwThreadID);

break;

}

}

}

void CThreadPool::DelThread(void)

{

for(DWORD i = m_dwMaxThreadNum; i > m_dwStaticThreadNum; i--)

{

if(m_pThreadInfo[i-1].dwThreadID)

{

m_pThreadInfo[i-1].bIsQuit = true;

::Sleep(m_dwMSeconds);

break;

}

}

}

int CThreadPool::GetThreadbyID(DWORD dwID)

{

for(DWORD i=0; i < m_dwMaxThreadNum; i++)

{

if(m_pThreadInfo[i].dwThreadID == dwID)

{

return i;

}

}

return -1;

}

Top

回复人: lianglp() ( ) 信誉:105 2004-7-2 13:05:52 得分: 0

楼上方法可行!

Top

回复人: wenminghu(看到人生的希望) ( ) 信誉:100 2004-7-2 13:08:43 得分: 0

分析下先,我还没看全懂,主要是端口完成部分。

是不是动不动就要用端口完成啊?

Top

回复人: wenminghu(看到人生的希望) ( ) 信誉:100 2004-7-2 13:10:01 得分: 0

就搞到一份源代码,怎么使用还不得而知。郁闷。。。

Top

回复人: wenminghu(看到人生的希望) ( ) 信誉:100 2004-7-2 13:20:44 得分: 0

大家有没有发现,作者在同一份代码里面使用了两次下面两个函数,何故?

friend static unsigned __stdcall ManagerProc(void* pThread);

friend static unsigned __stdcall WorkerProc(void* pThread);

各位高手发发言啊。

Top

回复人: cident() ( ) 信誉:100 2004-7-2 16:10:52 得分: 0

看了看,ProcessJob(para1, para2)的功能不明白!

Top

回复人: elssann() ( ) 信誉:97 2004-7-2 17:02:28 得分: 20

复人: wenminghu(看到人生的希望) ( ) 信誉:100 2004-07-02 12:46:00 得分: 0

问题,怎么知道客户端断开连接了呢?

--------------------------------------------------------------------------

当在某个连接上正在有一个IO操作(比如send/recv or WSASend/WSARecv)的时候,客户端断开了IO会失败,这时候就知道了。

如果在某个连接上没有IO操作,客户端主动断开了连接,我们是无法知道,但是在我们系统中,我们是定时有KEEPLIVE的包发送,当客户主动断开连接后,这个KEEPLIVE包的发送就会失败,也就知道了客户端已经断开了

Top

回复人: cident() ( ) 信誉:100 2004-7-2 17:11:31 得分: 0

楼上的,分析一下,如果是socket程序用iocp来实现,那么服务器端预先分配的线程怎么和来的客户端socket联系起来进而进行通信呢?

Top

回复人: elssann() ( ) 信誉:97 2004-7-3 16:43:29 得分: 0

这个问题我们已经解决了,但是一下说不清楚,,

Top

回复人: wenminghu(看到人生的希望) ( ) 信誉:100 2004-7-4 13:08:10 得分: 0

楼上的有msn嘛?

Top

回复人: yuanbocsut(打盹的神仙) ( ) 信誉:92 2004-7-6 17:12:05 得分: 0

UP

Top

回复人: everandforever(Forever) ( ) 信誉:110 2004-7-6 18:03:10 得分: 20

iocp里面线程和客户端没有一一对应的关系. 碰到谁就为谁服务.

Top

该问题已经结贴 ,得分记录: lianglp (20)、 elssann (20)、 dongfa (20)、 elssann (20)、 everandforever (20)、

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

请问:我想作一个简单的线程池,但是我不能确定一个事情,请对线程有研究朋友看看

作  者: rexfa (牙医-Rex)

等  级:

信 誉 值: 89

所属社区: Java J2SE / 基础类

问题点数: 50

回复次数: 6

发表时间: 2004-4-9 10:42:38

我不能确定:是保持已经完成任务线程实例,以后重新对内部数据赋值再次使用,这样好呢? 还是,用过即丢,把用过的线程交给VM回收,这样好么?

我可以多占一些内存,但是希望有高一些的效率。

好一段没来 发现信誉下降,是自己疏忽了

回复人: rexfa(牙医-Rex) ( ) 信誉:89 2004-4-9 14:49:17 得分: 0

up

Top

回复人: Polarislee(北极星) ( ) 信誉:105 2004-4-9 15:36:15 得分: 15

"保持已经完成任务线程实例,以后重新对内部数据赋值再次使用"

这是不可能的,Java中,一个线程一旦执行完毕时不能重新开始的——就是说对于已经结束的线程对象再次调用start方法是不允许的。

Top

回复人: 19830711(为你守候) ( ) 信誉:100 2004-4-9 16:58:40 得分: 0

关注中

Top

回复人: bigcrazy(大疯狂) ( ) 信誉:100 2004-4-9 17:19:06 得分: 10

实现起来比较复杂,有个思路:

你的线程首先应该是个有条件的无限循环,每次循环是一次工作如:

while(!stop){

//do something again and again

}

其次,线程执行完工作后,把自己放到池里。

再次,线程要用到wait()方法,完成工作后等待。

然后是有个标志来判断是否赋新值,如果赋了新值,并被激活,则根据该标志,决定是往下循环执行,还是继续等待。

最后,调度线程在池中取线程,向其赋值后,使用notifyAll激活所有线程。

Top

回复人: jewelsh(▲▲▲▲▲) ( ) 信誉:100 2004-4-9 22:36:41 得分: 15

本人目前项目中也有类似情况,我把这称为“资本家和工人”问题。打个比方,工厂有一些活要做,将工人分成三班,第一班做完布置的任务后就下班休息了,接着就是第二班、第三班跟进,很典型的“三班倒”,这是代表“工人阶级”立场的。还有一种方式就是所有工人不分早中晚,一班到底,当一个时段的任务完成后,下一任务还没来之前的这段时间内作等待状态,以等待继续被剥削._>:),这是代表“资本家”利益的。在线程任务中,工人就代表着线程,在作业效率方面,到底是让一个线程完成任务后进入等待状态,以等待下一次任务的唤醒一继续任务高效呢?还是当次完成任务就释放线程,下次有任务再新开线程高效一些呢?本人在项目中没有能够得出结论,希望大家多多讨论。

"保持已经完成任务线程实例,以后重新对内部数据赋值再次使用" Polarislee(北极星) 朋友说得没错,一个Thread stop后是不能再次被start的,关键是让thread结束任务后不被stop而是进入wait状态,有新任务后再notify激活。利用bigcrazy(大疯狂) 朋友的思路完全可行。

Top

回复人: taolei(实在无聊) ( ) 信誉:100 2004-4-9 23:26:59 得分: 10

/**

* free software

* from apusic

* by www.cn-java.com 2001

*/

import java.util.LinkedList;

public class ThreadPool

{

static final long IDLE_TIMEOUT = 60000L;

private String name;

private int minsize;

private int maxsize;

private int nextWorkerId = 0;

private LinkedList pool = new LinkedList();

public ThreadPool()

{

this("PooledThread");

}

public ThreadPool(String name)

{

this(name, 0, 20);

}

public ThreadPool(String name, int minsize, int maxsize)

{

this.name = name;

this.minsize = minsize;

this.maxsize = maxsize;

}

public synchronized void setSize(int min,int max)

{

minsize = min;

maxsize = max;

}

public synchronized int getMinSize()

{

return minsize;

}

public synchronized int getMaxSize()

{

return maxsize;

}

public synchronized void run(Runnable runner)

{

Worker worker;

if(runner == null)

{

throw new NullPointerException();

}

// get a worker from free list...

if(!pool.isEmpty())

{

worker = (Worker)pool.removeFirst();

}

else

{

// ...no free worker available, create new one...

worker = new Worker(name + "-" + ++nextWorkerId);

worker.start();

}

// ...and wake up worker to service incoming runner

worker.wakeup(runner);

}

// Notified when a worker has idled timeout

// @return true if worker should die, false otherwise

synchronized boolean notifyTimeout(Worker worker)

{

if(worker.runner != null)

{

return false;

}

if(pool.size() > minsize)

{

// Remove from free list

pool.remove(worker);

return true; // die

}

return false; // continue

}

// Notified when a worker has finished his work and

// free to service next runner

// @return true if worker should die, false otherwise

synchronized boolean notifyFree(Worker worker)

{

if(pool.size() < maxsize)

{

// Add to free list

pool.addLast(worker);

return false; // continue

}

return true; // die

}

// The inner class that implement worker thread

class Worker extends Thread

{

Runnable runner = null;

public Worker(String name)

{

super(name);

this.setDaemon(true);

}

synchronized void wakeup(Runnable runner)

{

this.runner = runner;

notify();

}

public void run()

{

for(; ; )

{

synchronized(this)

{

if(runner == null)

{

try

{

wait(IDLE_TIMEOUT);

}

catch(InterruptedException e)

{}

}

}

// idle timed out, die or put into free list

if(runner == null)

{

if(notifyTimeout(this))

return;

else

continue;

}

try

{

runner.run();

}

finally

{

runner = null;

if(notifyFree(this))

return;

}

}

}

}

}

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

http://community.csdn.net/Expert/topic/2755/2755676.xml

njg_jh(糨糊) ( ) 信誉:99 2004-2-21 20:25:43 得分:15

方案1:异步调用函数

假设有一个服务器进程,该进程有一个主线程,正在等待客户机的请求。当主线程收到该

请求时,它就产生一个专门的线程,以便处理该请求。这使得应用程序的主线程循环运行,并

等待另一个客户机的请求。这个方案是客户机/服务器应用程序的典型实现方法。虽然它的实

现方法非常明确,但是也可以使用新线程池函数来实现它。

当服务器进程的主线程收到客户机的请求时,它可以调用下面这个函数:

BOOL QueueUserWorkItem(

LPTHREAD_START_ROUTINE Function, // starting address

PVOID Context, // function data

ULONG Flags // worker options

);

该函数将一个“工作项目”排队放入线程池中的一个线程中并且立即返回。所谓工作项

目是指一个(用p f n C a l l b a c k参数标识的)函数,它被调用并传递单个参数p v C o n t e x t。最后,线程池中的某个线程将处理该工作项目,导致函数被调用。所编的回调函数必须采用下面的

原型:DWORD WINAPI WorkItemFunc(PVOID pvContext);

注意,你自己从来不调用C r e a t e T h r e a d。系统会自动为你的进程创建一个线程池,线程池中的一个线程将调用你的函数。

方案2:按规定的时间间隔调用函数

若要调度在某个时间运行的工作项目,首先要调用下面的函数,创建一个定时器队列:

HANDLE CreateTimerQueue();

定时器队列对一组定时器进行组织安排。例如,有一个可执行文件控制着若干个服务程序。每个服务程序需要触发定时器,以帮助保持它的状态,比如客户机何时不再作出响应,何时收集和更新某些统计信息等。让每个服务程序占用一个等待定时器和专用线程,这是不经济的。相反,每个服务程序可以拥有它自己的定时器队列(这是个轻便的资源),并且共享定时器组件的线程和等待定时器对象。当一个服务程序终止运行时,它只需要删除它的定时器队列即可,因为这会删除该队列创建的所有定时器。

一旦拥有一个定时器队列,就可以在该队列中创建下面的定时器:

BOOL CreateTimerQueueTimer(

PHANDLE phNewTimer, // handle to timer

HANDLE TimerQueue, // handle to timer queue

WAITORTIMERCALLBACK Callback, // timer callback function

PVOID Parameter, // callback parameter

DWORD DueTime, // timer due time

DWORD Period, // timer period

ULONG Flags // options

);

对于第二个参数,可以传递想要在其中创建定时器的定时器队列的句柄。如果只是创建少

数几个定时器,只需要为h Ti m e r Q u e u e参数传递N U L L,并且完全避免调用C r e a t e Ti m e r Q u e u e函数。传递N U L L,会告诉该函数使用默认的定时器队列,并且简化了你的代码。p f n C a l l b a c k和p v C o n t e x t参数用于指明应该调用什么函数以及到了规定的时间应该将什么传递给该函数。d w D u e Ti m e参数用于指明应该经过多少毫秒才能第一次调用该函数(如果这个值是0,那么只要可能,就调用该函数,使得C
r e a t e Ti m e r Q u e u e Ti m e r函数类似Q u e u e U s e r Wo r k I t e m)。

d w P e r i o d参数用于指明应该经过多少毫秒才能在将来调用该函数。如果为d w P e r i o d传递0,那么就使它成为一个单步定时器,使工作项目只能进行一次排队。新定时器的句柄通过函数的p h N e w Ti m e r参数返回。

工作回调函数必须采用下面的原型:

VOID WINAPI WaitOrTimerCallback(

PVOID pvContext,

BOOL fTimerOrWaitFired);

当不再想要触发定时器时,必须通过调用下面的函数将它删除:

BOOL DeleteTimerQueueTimer(

HANDLE TimerQueue, // handle to timer queue

HANDLE Timer, // handle to timer

HANDLE CompletionEvent // handle to completion event

);

一旦创建了一个定时器,可以调用下面这个函数来改变它的到期时间和到期周期:

BOOL ChangeTimerQueueTimer(

HANDLE TimerQueue, // handle to timer queue

HANDLE Timer, // handle to timer

ULONG DueTime, // timer due time

ULONG Period // timer period

);

当不再需要一组定时器时,可以调用下面这个函数,删除定时器队列:

BOOL DeleteTimerQueueEx(

HANDLE TimerQueue, // handle to timer queue

HANDLE CompletionEvent // handle to completion event

);

方案3:当单个内核对象变为已通知状态时调用函数

M i c r o s o f t发现,许多应用程序产生的线程只是为了等待内核对象变为已通知状态。一旦对象得到通知,该线程就将某种通知移植到另一个线程,然后返回,等待该对象再次被通知。有些编程人员甚至编写了代码,在这种代码中,若干个线程各自等待一个对象。这对系统资源是个很大的浪费。当然,与创建进程相比,创建线程需要的的开销要小得多,但是线程是需要资源的。每个线程有一个堆栈,并且需要大量的C P U指令来创建和撤消线程。始终都应该尽量减少它使用的资源。

如果想在内核对象得到通知时注册一个要执行的工作项目,可以使用另一个新的线程池函

数:

BOOL RegisterWaitForSingleObject(

PHANDLE phNewWaitObject, // wait handle

HANDLE hObject, // handle to object

WAITORTIMERCALLBACK Callback, // timer callback function

PVOID Context // callback function parameter

ULONG dwMilliseconds, // time-out interval

ULONG dwFlags // options

);

工作回调函数必须采用下面的原型:

VOID WINAPI WaitOrTimerCallbackFunc(

PVOID pvContext,

BOOLEAN fTimerOrWaitFired);

调用下面这个函数,可以取消等待组件的注册状态:

BOOL UnregisterWaitEx(

HANDLE WaitHandle, // wait handle

HANDLE CompletionEvent // completion event

);

方案4:当异步I / O请求完成运行时调用函数

最后一个方案是个常用的方案,即服务器应用程序发出某些异步I / O请求,当这些请求完成时,需要让一个线程池准备好来处理已完成的I / O请求。这个结构是I / O完成端口原先设计时所针对的一种结构。如果要管理自己的线程池,就要创建一个I / O完成端口,并创建一个等待该端口的线程池。还需要打开多个I / O设备,将它们的句柄与完成端口关联起来。当异步I / O请求完成时,设备驱动程序就将“工作项目”排队列入该完成端口。

这是一种非常出色的结构,它使少数线程能够有效地处理若干个工作项目,同时它又是一

种很特殊的结构,因为线程池函数内置了这个结构,使你可以节省大量的设计和精力。若要利

用这个结构,只需要打开设备,将它与线程池的非I / O组件关联起来。记住, I / O组件的线程全部在一个I / O组件端口上等待。若要将一个设备与该组件关联起来,可以调用下面的函数:

BOOL BindIoCompletionCallback(

HANDLE FileHandle, // handle to file

LPOVERLAPPED_COMPLETION_ROUTINE Function, // callback

ULONG Flags // reserved

);

该函数在内部调用C r e a t e I o C o m p l e t i o n P o r t,传递h D e v i c e和内部完成端口的句柄。调用B i n d I o C o m p l e t i o n C a l l b a c k也可以保证至少有一个线程始终在非I / O组件中。与该设备相关联的完成关键字是重叠完成例程的地址。这样,当该设备的I / O运行完成时,非I / O组件就知道要调用哪个函数,以便它能够处理已完成的I / O请求。该完成例程必须采用下面的原型:

VOID WINAPI OverlappedCompletionRoutine(

DWORD dwErrorCode,

DWORD dwNumberOfBytesTransferred,

POVERLAPPED pOverlapped);

回复人: jiangsheng(蒋晟.MSMVP2004Jan) ( ) 信誉:253 2004-2-24 20:01:31 得分:70

大吞吐量的话推荐完成端口

在基于MFC的IIS扩展中创建线程池

http://support.microsoft.com/support/kb/articles/Q197/7/28.asp

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

http://community.csdn.net/Expert/topic/2975/2975870.xml?temp=.4083368

yujia120(于佳) ( ) 信誉:99 2004-04-17 09:24:30Z 得分: 50

http://www.codeproject.com/threads/work_queue.asp

enoloo(行者无疆) ( ) 信誉:102 2004-04-17 09:15:51Z 得分: 50

http://www.codeproject.com/threads/multi_threaded_job_queue.asp

这有一个~

Top

回复人: enoloo(行者无疆) ( ) 信誉:102 2004-04-17 09:17:23Z 得分: 0

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

http://www.codeproject.com/threads/diggerthreadpool.asp

如果连接客户过多,线程池如何处理呢?

作  者: acqy (Just Programmer)

等  级:

信 誉 值: 94

所属社区: Linux/Unix社区 程序开发区

问题点数: 50

回复次数: 20

发表时间: 2004-02-27 15:12:36Z

服务器程序使用线程池,比如开始的时候派生1000个线程来处理客户请求,当有1001个客户都请求连接那应该怎么办?

回复人: rexp(沧浪客) ( ) 信誉:99 2004-02-27 16:45:32Z 得分: 0

1. 选用一个现在空闲的线程

2. 继续创建新的线程。

Top

回复人: nsly(oRunner (Yama)) ( ) 信誉:100 2004-02-27 18:18:32Z 得分: 0

初始线程数目就是最大允许线程数目吗?我觉得不合理。

Top

回复人: acqy(Just Programmer) ( ) 信誉:94 2004-03-01 08:24:49Z 得分: 0

继续讨论:如果这个服务器需要连接的客户数量超过1千万,那么我首先派生出来1000个线程明显不满足要求,假设连接上的客户不会马上退出(也就是预先派生的线程不会马上结束),那么要服务剩下的1千万-1000个用户就需要临时创建线程。我想知道的是,如果连接客户数很大,趋于无穷,那么就会有更多的连接处理需要临时创建线程。那么预先派生线程有什么意义呢?

Top

回复人: hoyt(hoyt) ( ) 信誉:100 2004-03-01 08:35:02Z 得分: 0

你的一台机器能处理1千万个连接吗?好像大型机都不行吧

Top

回复人: mechgoukiteng(变态是一种生活态度) ( ) 信誉:100 2004-03-01 09:31:31Z 得分: 0

1千万的连接根本是pc和linux/uinx/win无法完成的

只有使用vxworks这些实时os和硬件配合 才可能达到

Top

回复人: neil78duan(失落得程序员) ( ) 信誉:100 2004-03-01 13:46:13Z 得分: 0

有必要一个连接用一个线程嘛?

Top

回复人: mechgoukiteng(变态是一种生活态度) ( ) 信誉:100 2004-03-01 14:00:18Z 得分: 0

如果基于消息的设计

可以使用leader/follower的模式设计

Top

回复人: acqy(Just Programmer) ( ) 信誉:94 2004-03-01 17:32:31Z 得分: 0

那如果是一个服务器集群呢?能否处理1000万个连接?或者说是否还有其他的办法来处理这些连接?

Top

回复人: beipiao(北漂) ( ) 信誉:100 2004-03-01 17:40:09Z 得分: 0

有趣的假设,顶

Top

回复人: acqy(Just Programmer) ( ) 信誉:94 2004-03-02 08:42:52Z 得分: 0

我觉得这样的问题我们应该讨论,而且应该讨论清楚。可能这样的问题对有些高手来说可以算是小儿科,但我也希望高手能“高抬贵手”,给我们一点建议和提示,让我们能对这些问题有更加清楚和深刻的了解。

做学问和数学一样,黑白逻辑,懂或者不懂,不应该有模棱两可的情况。

Top

回复人: acqy(Just Programmer) ( ) 信誉:94 2004-03-02 08:47:52Z 得分: 0

这里大家也可以把服务器设计中的一些问题拿出来讨论

Top

回复人: lovepeacer(netboy) ( ) 信誉:100 2004-03-12 11:17:24Z 得分: 0

dddd

Top

回复人: neil78duan(失落得程序员) ( ) 信誉:100 2004-03-24 09:49:16Z 得分: 0

1000万,别开玩笑了,一台服务器肯定是不可能完成的.有这么强的网卡吗?有这么强的操作系统吗?

应该使用集群,负载均衡的办法

Top

回复人: mechgoukiteng(变态是一种生活态度) ( ) 信誉:100 2004-03-24 10:01:36Z 得分: 0

可以参考LVS的实现

不过1000万实连接实在太吓人了

Top

回复人: littlegang(Gang) ( ) 信誉:100 2004-03-24 17:43:38Z 得分: 0

线程池的出发点不是说增加连接的吞吐量,而是减少系统建立线程、撤销线程的开销

池内的线程是一直在的,通常是完成一项特定的任务,而这个任务常常是重复性较高的,

当工作完成后,这个线程就马上又可以进入空闲状态,为下一次工作服务

一个用户的连接,通常不是一个任务就完成的,也不大可能由一个线程从头到尾来负责(当然一般的应用中也可以这样做)

如果池内的线程确实不够用,也可以动态地变化池的大小,以容纳更多的工作线程

Top

回复人: morebin(morebin) ( ) 信誉:100 2004-03-24 21:35:30Z 得分: 0

服务器程序使用线程池,比如开始的时候派生1000个线程来处理客户请求,当有1001个客户都请求连接,那么这个客户只能等到下一个线程空闲时,来处理它。apache就这样做的,在listen的第二个参数里指定等待的连接的最大个数,如果超过这个数,客户的connect会失败。

Top

回复人: yyy159(牧人) ( ) 信誉:100 2004-03-30 12:52:58Z 得分: 0

要用集群了,把连接改到那个特定的IP就该可以了

Top

回复人: hmaple(hmaple) ( ) 信誉:100 2004-03-30 22:44:25Z 得分: 0

线程池不是这样用的。

线程的创建和销毁是极耗系统资源的。上千个线程争夺CPU时间,结果是所有的线程都没有时间。

即使有1000个客户请求,绝大多数应用中每个客户每秒钟交换消息的信息量是很少的。所以线程池的一个线程足以及时的响应几十或上百个服务请求。

线程池中的线程数目是不变的。每收到一个请求,就将它分配给一个线程来处理。线程太多了,会使整个系统性能下降,太少了,又不能充分利用CPU。所以线程数目和CPU数目,系统性能有关。

在我们的系统中,每秒钟要处理100多个呼叫,每个呼叫持续几分钟,用SUN的NETRA220,只需要7,8个线程组成的线程池就搞定了。

Top

回复人: truebjhz(咖啡冷夜雨) ( ) 信誉:100 2004-04-13 21:33:03Z 得分: 0

1 一个线程不一定对应一个连接

2 一个机器不可能建立那么多连接,实际应用中,甚至连1000个都不能。

有些服务器的设计确实是先创建一定数量的线程,而且可能线程数就定下来了。

这样做的好处是不用再为线程的创建和撤消已经相关资源的分派开销时间。

另外,一个连接是一个任务,而线程则是用来执行任务的资源。就进程和CPU的关系差不多。

任务的执行可以是分时的。就是说一个连接可以不是时时刻刻都有一个线程为他做处理的。

这样,第1001个连接到来时,只不过会导致系统的整体响应速度有所下降而已。

当然,如果实际应用不允许这样的情况发生,就只好把第1001个挂起了。

如果是集群,那么肯定有一个负载平衡的部分,直接把这个连接送到还有空线程的服务器上去。

机群技术俺不怎么懂,不敢胡说了:)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: