您的位置:首页 > 其它

SP短信平台-线程池实现

2008-01-15 14:21 513 查看
这里不支持Delphi源代码高亮,将就着看看了:)

线程池的实现设计的时候考虑得过于复杂了,有些功能实际没有实现或者是实现了,也没有用到或去测试,只是使用了很少一部分的功能.

预分配线程->请求线程->释放线程->销毁线程,这是实际使用的过程,一些细节处理并没有用上,或者说我没有测试过,比如线程守护,双线程池等,还有什么防止线程互锁(这是一个复杂的问题,实际使用中出现的比率很低,所以我也没有太过于考虑和去实现,而且这里面有些实现是有问题的,很可能就引起程序崩溃,当然前提是出现线程互锁).

unit ajSvcThread;

interface

uses
Classes, Windows, SysUtils, SyncObjs, Contnrs, ActiveX;

const
MSG_E = '[Exception]%s[%s]';

type
TRThreadBase = class;
TOnMsg = procedure (Sender: TObject; const Msg: string) of object;
TRThreadExcute = procedure (AThread: TRThreadBase; var Release: Boolean; var
SleepTime: Integer) of object;
TThreadManageMsg = procedure (const Msg: string) of object;
pTaskItem = ^TTaskItem;
TTaskItem = record
Task: TRThreadExcute;
end;
TExcuteEvArr = array of TRThreadExcute;

TajCriticalSection = class(TSynchroObject)
protected
FSection: TRTLCriticalSection;
public
constructor Create;
destructor Destroy; override;
procedure Acquire; override;
procedure AcquireThread(AThread: TRThreadBase);
function AcquireTry(TryCount, WaitTime: Cardinal): Boolean;
procedure Enter;
procedure Leave;
procedure Release; override;
procedure ReleaseThread(AThread: TRThreadBase);
end;

TRThreadManage = class;
TRThreadBase = class(TThread)
private
FCriticalSection: TajCriticalSection;
FDestroyCount: Integer;
FGroupIndex: Integer;
FMaxSpark: Cardinal;
FMgr: TRThreadManage;
FOnExecute: TRThreadExcute;
FPerDecSparkTime: TDateTime;
FRelease: Boolean;
FSleepTime: Integer;
FSpark: Integer;
FSparkCycle: Integer;
protected
procedure DecSpark;
procedure doExecute;
procedure InitSpark;
procedure SetGroupIndex(const Value: Integer);
property OnExecute: TRThreadExcute read FOnExecute write FOnExecute;
public
constructor Create(AMgr: TRThreadManage);
destructor Destroy; override;
property CriticalSection: TajCriticalSection read FCriticalSection write
FCriticalSection;
property GroupIndex: Integer read FGroupIndex write SetGroupIndex;
property MaxSpark: Cardinal read FMaxSpark write FMaxSpark;
property Mgr: TRThreadManage read FMgr;
property Spark: Integer read FSpark;
property SparkCycle: Integer read FSparkCycle write FSparkCycle;
end;

TRPoolThread = class(TRThreadBase)
protected
procedure Execute; override;
end;

TRThreadManage = class(TObject)
private
FActive: Boolean;
FAutoExtend: Boolean;
FBusyBackPool: TStack;
FBusyCurPool: TStack;
FDestroyCountOnce: Integer;
FDestroyTimeOut: Integer;
FDestroyTryCount: Integer;
FExtended: Boolean;
FFreePool: TQueue;
FFurnacePool: TStack;
FMaxThread: Integer;
FMinFreeThread: Integer;
FOnManageMsg: TThreadManageMsg;
FPoolLock: TajCriticalSection;
FScanInterval: Integer;
FScanThread: TRThreadBase;
FShrinked: Boolean;
FTaskList: TStack;
FWaitFree: Boolean;
procedure AddBusy(AThread: TRThreadBase);
procedure AddFree; overload;
procedure AddFree(AThread: TRThreadBase); overload;
procedure AddFurnace(AThread: TRThreadBase);
procedure AddTask(pTask: pTaskItem);
procedure CheckPool;
procedure CheckSycnObj;
function CheckThreadSpark(AThread: TRThreadBase): TRThreadBase;
procedure DestroyAllThread;
procedure DestroyThread;
procedure DispTask;
procedure DoErr(Method: string; E: Exception);
procedure DoMsg(Msg: string);
function GetBusyCount: Integer;
function GetFreeCount: Integer;
function GetFurnaceCount: Integer;
function GetWaitTaskCount: Integer;
function PopBusy: TRThreadBase; overload;
function PopFree: TRThreadBase;
function PopFurnace: TRThreadBase;
function PopTask: pTaskItem;
procedure SetActive(const Value: Boolean);
procedure SetExtended(const Value: Boolean);
procedure SetMaxThread(const Value: Integer);
procedure SetMinFreeThread(Value: Integer);
procedure SetShrinked(const Value: Boolean);
procedure SwapBusyPool;
protected
procedure ExtendPool;
function PopItem(AList: TOrderedList): Pointer;
procedure PushItem(AList: TOrderedList; AItem: Pointer);
procedure ShrinkPool;
public
constructor Create;
destructor Destory;
procedure AcquireThread(ExecutePorc: TRThreadExcute); overload;
function AcquireThread(ExecutePorc: TRThreadExcute; Wait: Boolean;
TryCount: Integer = -1): TRThreadBase; overload;
procedure Scan(AThread: TRThreadBase; var Release: Boolean; var
SleepTime: Integer);
property Active: Boolean read FActive write SetActive;
property AutoExtend: Boolean read FAutoExtend write FAutoExtend;
property BusyCount: Integer read GetBusyCount;
property DestroyTimeOut: Integer read FDestroyTimeOut write
FDestroyTimeOut;
property DestroyTryCount: Integer read FDestroyTryCount write
FDestroyTryCount;
property Extended: Boolean read FExtended write SetExtended;
property FreeCount: Integer read GetFreeCount;
property FurnaceCount: Integer read GetFurnaceCount;
property MaxThread: Integer read FMaxThread write SetMaxThread;
property MinFreeThread: Integer read FMinFreeThread write
SetMinFreeThread;
property OnManageMsg: TThreadManageMsg read FOnManageMsg write
FOnManageMsg;
property ScanInterval: Integer read FScanInterval write FScanInterval;
property Shrinked: Boolean read FShrinked write SetShrinked;
property WaitFree: Boolean read FWaitFree write FWaitFree;
property WaitTaskCount: Integer read GetWaitTaskCount;
end;

TajThreadTList = class(TObject)
private
FList: TList;
FLock: TajCriticalSection;
function GetCount: Integer;
function GetItems(Index: Integer): Pointer;
procedure SetItems(Index: Integer; Value: Pointer);
public
constructor Create;
destructor Destroy; override;
function Acquire: TList;
function Add(AItem: Pointer): Integer;
procedure Clear;
procedure Delete(Index: Integer);
function IndexOf(Item: Pointer): Integer;
function Pop: Pointer;
function PopFirst: Pointer;
procedure Push(AItem: Pointer);
procedure PushFirst(AItem: Pointer);
procedure Release;
function Remove(Item: Pointer): Integer;
property Count: Integer read GetCount;
property Items[Index: Integer]: Pointer read GetItems write SetItems;
end;

implementation

uses DateUtils, ComObj;

{ Important: Methods and properties of objects in visual components can only be
used in a method called using Synchronize, for example,

Synchronize(UpdateCaption);

and UpdateCaption could look like,

procedure TSvcThread.UpdateCaption;
begin
Form1.Caption := 'Updated in a thread';
end; }

{ TSvcThread }

{
****************************** TajCriticalSection ******************************
}
constructor TajCriticalSection.Create;
begin
inherited Create;
InitializeCriticalSection(FSection);
end;

destructor TajCriticalSection.Destroy;
begin
DeleteCriticalSection(FSection);
inherited Destroy;
end;

procedure TajCriticalSection.Acquire;
begin
EnterCriticalSection(FSection);
end;

procedure TajCriticalSection.AcquireThread(AThread: TRThreadBase);
begin
Acquire;
AThread.FCriticalSection := Self;
end;

function TajCriticalSection.AcquireTry(TryCount, WaitTime: Cardinal): Boolean;
begin
while(TryCount > 0) do begin
Result := TryEnterCriticalSection(FSection);
if Result then Exit;
Dec(TryCount);
Sleep(WaitTime);
end;
Result := false;
end;

procedure TajCriticalSection.Enter;
begin
Acquire;
end;

procedure TajCriticalSection.Leave;
begin
Release;
end;

procedure TajCriticalSection.Release;
begin
LeaveCriticalSection(FSection);
end;

procedure TajCriticalSection.ReleaseThread(AThread: TRThreadBase);
begin
Release;
AThread.FCriticalSection := nil;
end;

{
********************************* TRThreadBase *********************************
}
constructor TRThreadBase.Create(AMgr: TRThreadManage);
begin
inherited Create(true);
FDestroyCount := 0;
FGroupIndex := -1;
FSleepTime := 10;
FMaxSpark := 10;
FSpark := FMaxSpark;
FSparkCycle := 1000;
FOnExecute := nil;
FCriticalSection := nil;
FPerDecSparkTime := Now;
FMgr := AMgr;
//CoInitialize(nil);
end;

destructor TRThreadBase.Destroy;
begin
//CoUninitialize();
end;

procedure TRThreadBase.DecSpark;
begin
if MilliSecondOf(Now - FPerDecSparkTime) >= FSparkCycle then begin
Dec(FSpark);
FPerDecSparkTime := Now;
end;
end;

procedure TRThreadBase.doExecute;
begin
try
InitSpark;
if Assigned(FOnExecute) then begin
FOnExecute(self, FRelease, FSleepTime);
if (FRelease or Mgr.WaitFree) then begin
FOnExecute := nil;
Suspend;
end else
Sleep(FSleepTime);
end else
Sleep(100);
except
On E: Exception do
Mgr.DoMsg('ThreadErr:' + E.Message);
end;
end;

procedure TRThreadBase.InitSpark;
begin
FSpark := FMaxSpark;
end;

procedure TRThreadBase.SetGroupIndex(const Value: Integer);
begin
if FGroupIndex <> Value then
begin
FGroupIndex := Value;
end;
end;

{
********************************* TRPoolThread *********************************
}
procedure TRPoolThread.Execute;
begin
//OleInitialize(nil);
while not Terminated do
doExecute;
//OleUninitialize;
end;

{
******************************** TRThreadManage ********************************
}
constructor TRThreadManage.Create;
begin
inherited Create;
FActive := False;
FWaitFree := False;
FMaxThread := 10;
FMinFreeThread := 2;
FAutoExtend := true;
FExtended := true;
FShrinked := true;
FDestroyTimeOut := 5;
FDestroyTryCount := 100;
FDestroyCountOnce := 20;
FScanInterval := 100;
FScanThread := nil;

FPoolLock := TajCriticalSection.Create;
FBusyBackPool := TStack.Create;
FBusyCurPool := TStack.Create;
FFreePool := TQueue.Create;
FFurnacePool := TStack.Create;
FTaskList := TStack.Create;

ExtendPool;
end;

destructor TRThreadManage.Destory;
begin
DestroyAllThread;

FTaskList.Free;
FBusyBackPool.Free;
FBusyCurPool.Free;
FFreePool.Free;
FFurnacePool.Free;
FPoolLock.Free;
end;

procedure TRThreadManage.AcquireThread(ExecutePorc: TRThreadExcute);
var
p: pTaskItem;
begin
New(p);
p.Task := ExecutePorc;
AddTask(p);
end;

function TRThreadManage.AcquireThread(ExecutePorc: TRThreadExcute; Wait:
Boolean; TryCount: Integer = -1): TRThreadBase;
var
s: Integer;
begin
s := 0;
Result := PopFree;
while ((Result = nil) and (Wait)) do begin
Sleep(1);
if FAutoExtend then AddFree;
ExtendPool;
Inc(s);
if ((TryCount > 0) and (s >= TryCount)) then
Break;
Result := PopFree;
end;
if Result <> nil then begin
AddBusy(Result);
Result.OnExecute := ExecutePorc;
Result.Resume;
end;
end;

procedure TRThreadManage.AddBusy(AThread: TRThreadBase);
begin
if AThread = nil then Exit;
try
PushItem(FBusyCurPool, AThread);
except
On E: Exception do
DoErr('ThreadManage.AddBusyThread', E);
end;
end;

procedure TRThreadManage.AddFree;
var
FreeThread: TRPoolThread;
begin
try
FreeThread := TRPoolThread.Create(Self);
AddFree(FreeThread);
except
On E: Exception do
DoErr('ThreadManage.AddFreeCreate', E);
end;
end;

procedure TRThreadManage.AddFree(AThread: TRThreadBase);
begin
if AThread = nil then Exit;
try
PushItem(FFreePool, AThread);
except
On E: Exception do
DoErr('ThreadManage.AddFreeThread', E);
end;
end;

procedure TRThreadManage.AddFurnace(AThread: TRThreadBase);
begin
if AThread = nil then Exit;
try
PushItem(FFurnacePool, AThread);
except
On E: Exception do
DoErr('ThreadManage.AddFurnace', E);
end;
end;

procedure TRThreadManage.AddTask(pTask: pTaskItem);
begin
if (pTask = nil) then Exit;
try
PushItem(FTaskList, pTask);
except
On E: Exception do
DoErr('ThreadManage.AddTask', E);
end;
end;

procedure TRThreadManage.CheckPool;
var
obj: TRThreadBase;
begin
obj := PopBusy;
while (obj <> nil) do begin
if (not Assigned(obj.OnExecute)) then
AddFree(obj)
else
FBusyBackPool.Push(CheckThreadSpark(obj));
obj := PopBusy;
end;
SwapBusyPool;
end;

procedure TRThreadManage.CheckSycnObj;
begin
if FPoolLock.AcquireTry(3, 5) then begin
FPoolLock.Release;
Exit;
end;
DoMsg('[MSG]同步对象发生死锁');
FPoolLock.Release;
end;

function TRThreadManage.CheckThreadSpark(AThread: TRThreadBase): TRThreadBase;
var
Spark: Integer;
begin
Result := nil;
try
Spark := AThread.Spark;
if Spark <= 0 then begin
AddFurnace(AThread);
if Assigned(AThread.OnExecute) then begin
AcquireThread(AThread.OnExecute);
AThread.OnExecute := nil;
end;
Exit;
end;
AThread.DecSpark;
Result := AThread;
except
On E: Exception do
DoErr('ThreadManage.CheckThreadSpark', E);
end;
end;

procedure TRThreadManage.DestroyAllThread;
var
obj: TRThreadBase;
begin
FActive := false;
obj := PopBusy;
while (obj <> nil) do begin
obj.Terminate;
if ōbj = FScanThread then
obj.WaitFor;
AddFurnace(obj);
obj := PopBusy;
end;

CheckSycnObj;
FDestroyTimeOut := 3;
FDestroyTryCount := 0;
obj := PopFree;
while (obj <> nil) do begin
AddFurnace(obj);
obj := PopFree;
end;
FDestroyCountOnce := FurnaceCount;
DestroyThread;
end;

procedure TRThreadManage.DestroyThread;

procedure CheckThreadError(ErrCode: Integer);
begin
if ErrCode <> 0 then
DoMsg(Format('线程错误%s[%d]', [SysErrorMessage(ErrCode), ErrCode]));
end;

function CheckThreadEnd(AThread: TRThreadBase): boolean;
var
tr, exCode: Cardinal;
begin
Result := false;
tr := WaitForSingleObject(AThread.Handle, FDestroyTimeOut);
if tr = WAIT_TIMEOUT then begin
Inc(AThread.FDestroyCount);
if AThread.FDestroyCount > FDestroyTryCount then begin
DoMsg(Format('线程%d结束重试超过%d,系统将强制结束线程',
[AThread.ThreadID, FDestroyTryCount]));
if GetExitCodeThread(AThread.Handle, exCode) then
if TerminateThread(AThread.Handle, exCode) then
DoMsg('TerminateThread成功');
CheckThreadError(GetLastError);
Result := true;
end else
DoMsg(Format('线程结束超时[%d]', [AThread.FDestroyCount]));
end else begin
Result := true;
end;
end;

procedure DeleteFailThread(AThread: TRThreadBase);
begin
try
DoMsg(Format('线程执行发生异常:[%s][%s]',
[AThread.FatalException.ClassName,
Exception(AThread.FatalException).Message]));
except
On E: Exception do
DoErr('DeleteFailThread', E);
end;
end;

var
I: Integer;
Et: TRThreadBase;

begin
Et := PopFurnace;
I := 0;
while ((Et <> nil) and (Et.Handle <> 0)) do begin
if Et.FatalException = nil then begin
if Et.Suspended then
Et.Resume;
Et.Terminate;
try
if CheckThreadEnd(Et) then
Et.Free
else
AddFurnace(Et);
except
On E: Exception do
DoMsg('销毁线程异常:' + E.Message);
end;
end else
DeleteFailThread(Et);

Inc(I);
if (I >= FDestroyCountOnce) then Exit;
Et := PopFurnace;
end;
end;

procedure TRThreadManage.DispTask;
var
p: pTaskItem;
begin
p := PopTask;
while (p <> nil) do begin
if AcquireThread(p.Task, true, 5) = nil then begin
AddTask(p);
Break;
end;
p := PopTask;
end;
end;

procedure TRThreadManage.DoErr(Method: string; E: Exception);
begin
DoMsg(Format(MSG_E, [Method, E.Message]));
end;

procedure TRThreadManage.DoMsg(Msg: string);
begin
if Assigned(FOnManageMsg) then
FOnManageMsg(Msg)
else
OutputDebugString(PChar(Msg));
end;

procedure TRThreadManage.ExtendPool;
var
I, C, FC: Integer;
begin
if ((not FExtended) or (FAutoExtend)) then Exit;

FC := FreeCount;
C := FMaxThread - FC - BusyCount;
FC := FMinFreeThread - FC;
if FC > C then
FC := C;

for I := 1 to FC do
AddFree;
end;

function TRThreadManage.GetBusyCount: Integer;
begin
FPoolLock.Acquire;
try
Result := FBusyCurPool.Count + FBusyBackPool.Count;
finally
FPoolLock.Release;
end;
end;

function TRThreadManage.GetFreeCount: Integer;
begin
FPoolLock.Acquire;
try
Result := FFreePool.Count;
finally
FPoolLock.Release;
end;
end;

function TRThreadManage.GetFurnaceCount: Integer;
begin
FPoolLock.Acquire;
try
Result := FFurnacePool.Count;
finally
FPoolLock.Release;
end;
end;

function TRThreadManage.GetWaitTaskCount: Integer;
begin
FPoolLock.Acquire;
try
Result := FTaskList.Count;
finally
FPoolLock.Release;
end;
end;

function TRThreadManage.PopBusy: TRThreadBase;
begin
Result := nil;
try
Result := TRThreadBase(PopItem(FBusyCurPool));
except
On E: Exception do
DoErr('ThreadManage.PopBusyThread', E);
end;
end;

function TRThreadManage.PopFree: TRThreadBase;
begin
Result := nil;
try
Result := TRThreadBase(PopItem(FFreePool));
except
On E: Exception do
DoErr('ThreadManage.PopFreeThread', E);
end;
end;

function TRThreadManage.PopFurnace: TRThreadBase;
begin
Result := nil;
try
Result := TRThreadBase(PopItem(FFurnacePool));
except
On E: Exception do
DoErr('ThreadManage.PopFurnaceThread', E);
end;
end;

function TRThreadManage.PopItem(AList: TOrderedList): Pointer;
begin
Result := nil;
FPoolLock.Acquire;
try
if AList.Count > 0 then
Result := AList.Pop;
finally
FPoolLock.Release;
end;
end;

function TRThreadManage.PopTask: pTaskItem;
begin
try
Result := pTaskItem(PopItem(FTaskList));
except
On E: Exception do
DoErr('ThreadManage.PopFurnaceThread', E);
end;
end;

procedure TRThreadManage.PushItem(AList: TOrderedList; AItem: Pointer);
begin
if AItem = nil then Exit;
FPoolLock.Acquire;
try
AList.Push(AItem);
finally
FPoolLock.Release;
end;
end;

procedure TRThreadManage.Scan(AThread: TRThreadBase; var Release: Boolean; var
SleepTime: Integer);
begin
try
try
if ((AThread <> FScanThread) and (FScanThread <> nil)) then
CheckSycnObj;
ExtendPool;
DispTask;
CheckPool;
DestroyThread;
ShrinkPool;
FScanThread := AThread;
Release := False;
SleepTime := FScanInterval;
finally
if not FActive then Release := true;
end;
except
On E: Exception do
DoErr('ThreadManage.Scan', E);
end;
end;

procedure TRThreadManage.SetActive(const Value: Boolean);
begin
if FActive <> Value then
begin
if (Value) then begin
if (FScanThread = nil) then
AcquireThread(Scan, true)
end else
FScanThread := nil;
FActive := Value;
end;
end;

procedure TRThreadManage.SetExtended(const Value: Boolean);
begin
if FExtended <> Value then
begin
FExtended := Value;
end;
end;

procedure TRThreadManage.SetMaxThread(const Value: Integer);
begin
if FMaxThread <> Value then
begin
FMaxThread := Value;
end;
end;

procedure TRThreadManage.SetMinFreeThread(Value: Integer);
begin
if FMinFreeThread <> Value then
begin
FMinFreeThread := Value;
end;
end;

procedure TRThreadManage.SetShrinked(const Value: Boolean);
begin
if FShrinked <> Value then
begin
FShrinked := Value;
end;
end;

procedure TRThreadManage.ShrinkPool;
var
I: Integer;
begin
if (not FShrinked) then Exit;
if FreeCount >= FMaxThread - 1 then
for I := 1 to FreeCount - FMinFreeThread do begin
DoMsg('收缩线程池一个线程被丢弃到回收池');
AddFurnace(PopFree);
end;
end;

procedure TRThreadManage.SwapBusyPool;
var
T: TStack;
begin
T := FBusyBackPool;
FBusyBackPool := FBusyCurPool;
FBusyCurPool := T;
end;

{
******************************** TajThreadTList ********************************
}
constructor TajThreadTList.Create;
begin
inherited Create;
FList := TList.Create;
FLock := TajCriticalSection.Create;
end;

destructor TajThreadTList.Destroy;
begin
FLock.Free;
FList.Free;
inherited Destroy;
end;

function TajThreadTList.Acquire: TList;
begin
FLock.Acquire;
Result := FList;
end;

function TajThreadTList.Add(AItem: Pointer): Integer;
begin
FLock.Acquire;
try
Result := FList.Add(AItem);
finally
FLock.Release;
end;
end;

procedure TajThreadTList.Clear;
begin
FLock.Acquire;
try
FList.Clear;
finally
FLock.Release;
end;
end;

procedure TajThreadTList.Delete(Index: Integer);
begin
FLock.Acquire;
try
FList.Delete(Index);
finally
FLock.Release;
end;
end;

function TajThreadTList.GetCount: Integer;
begin
FLock.Acquire;
try
Result := FList.Count;
finally
FLock.Release;
end;
end;

function TajThreadTList.GetItems(Index: Integer): Pointer;
begin
FLock.Acquire;
try
Result := FList[Index];
finally
FLock.Release;
end;
end;

function TajThreadTList.IndexOf(Item: Pointer): Integer;
begin
FLock.Acquire;
try
Result := FList.IndexOf(Item);
finally
FLock.Release;
end;
end;

function TajThreadTList.Pop: Pointer;
begin
Result := nil;
FLock.Acquire;
try
if FList.Count > 0 then begin
Result := FList.Items[FList.Count - 1];
FList.Delete(FList.Count - 1);
end;
finally
FLock.Release;
end;
end;

function TajThreadTList.PopFirst: Pointer;
begin
Result := nil;
FLock.Acquire;
try
if FList.Count > 0 then begin
Result := FList.First;
FList.Delete(0);
end;
finally
FLock.Release;
end;
end;

procedure TajThreadTList.Push(AItem: Pointer);
begin
Add(AItem);
end;

procedure TajThreadTList.PushFirst(AItem: Pointer);
begin
FLock.Acquire;
try
FList.Insert(0, AItem);
finally
FLock.Release;
end;
end;

procedure TajThreadTList.Release;
begin
FLock.Release;
end;

function TajThreadTList.Remove(Item: Pointer): Integer;
begin
FLock.Acquire;
try
Result := FList.Remove(Item);
finally
FLock.Release;
end;
end;

procedure TajThreadTList.SetItems(Index: Integer; Value: Pointer);
begin
FLock.Acquire;
try
FList[Index] := Value;
finally
FLock.Release;
end;
end;

end.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: