
TAxThread - Inter thread message based communication - Delphi

2014-10-08 20:37
http://www.cybletter.com/index.php?id=3

http://www.cybletter.com/index.php?id=30

Source Code

http://www.cybletter.com/index.php?s=file_download&id=3

Full Paper

www.cybletter.com/index.php?s=file_download&id=4

Alexandr Štefek
Military Academy in Brno
alexandr@stefek.cz

Abstract

This paper presents an implementation of parallel execution.

If a program uses parallel processes, it has to solve their synchronization.

We show an effective implementation of parallel execution that does not use critical sections, semaphores, mutexes or events.

Thread

A thread is a system object defined on the Win32 platform.

The SDK defines functions for creating and handling threads.

In a real implementation, a thread procedure has to be defined for every piece of code that should run in parallel.

On Win32, if a thread creates a window handle, all messages for that window are handled by the same thread.

The idea of the solution comes from this fact.

We can simply create a window handle on the executing thread.

While this thread runs a message loop, it is possible to send a special message to the thread's window handle.

The parameters of the message can carry the method to execute and the parameter for this method.

But a method pointer takes 8 bytes (code address and object address) and a message parameter only 4.

So we have to allocate a memory block, copy the method pointer into this block and send the address of the allocated block.

Coding the method

We want to create a class that offers methods for synchronous and asynchronous execution of methods on a parallel thread.

Now we show how to encode the 8-byte method pointer into the 4-byte address of a memory block.

function TAxThread.NotifyEventToPointer( Proc : TNotifyEvent ) : Longint;
var
Method : TMethod absolute Proc;
PMethod : ^TMethod;
begin
New( PMethod );

PMethod^ := Method;
Result := Longint( PMethod );
end;

procedure TAxThread.AsyncExecuteParallel( Proc : TNotifyEvent; ParamSender : TObject );
begin
InterlockedIncrement( FMethodsToExecute );
PostMessageParallel( CM_EXECPROC, NotifyEventToPointer( Proc ), Longint( ParamSender ) );
end;

procedure TAxThread.ExecProcedure( var Message : TMessage );
var
PMethod : ^TMethod;
Method : TMethod;
Event : TNotifyEvent absolute Method;
begin
PMethod := Pointer( message.WParam );
Method := PMethod^;

Event( TObject( message.LParam ) );

Dispose( PMethod );

if FThreadID = GetCurrentThreadId then
InterlockedDecrement( FMethodsToExecute );
end;


NotifyEventToPointer copies the method pointer into a newly allocated memory block.

The result of this method is the address of the memory block.

ExecProcedure takes the WParam of the message, converts it back to a method pointer, calls the decoded method with the parameter given by the LParam of the message, and then frees the memory block.
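The encode and decode steps can be tried in isolation, without any window or thread. The following console program is a minimal sketch, not the author's code: the names EncodeEvent, DecodeEvent and TDemo are hypothetical, and it assumes a Delphi version that provides NativeInt. It uses a pointer-sized integer instead of Longint because the Longint cast used in the paper is only safe where pointers are 4 bytes wide.

```pascal
program MethodRoundTrip;

{$APPTYPE CONSOLE}

uses
  Classes;

type
  PMethod = ^TMethod;

  // Hypothetical class that owns the method we want to transport.
  TDemo = class
  public
    procedure Hello( Sender : TObject );
  end;

procedure TDemo.Hello( Sender : TObject );
begin
  Writeln( 'Hello called, Sender assigned: ', Sender <> nil );
end;

// Copy the method pointer (code address + object address) into a heap
// block and return the block address as a pointer-sized integer.
function EncodeEvent( Proc : TNotifyEvent ) : NativeInt;
var
  P : PMethod;
begin
  New( P );
  P^ := TMethod( Proc );
  Result := NativeInt( P );
end;

// Rebuild the method pointer from the block address and free the block.
function DecodeEvent( Value : NativeInt ) : TNotifyEvent;
var
  P : PMethod;
begin
  P := PMethod( Value );
  Result := TNotifyEvent( P^ );
  Dispose( P );
end;

var
  Demo : TDemo;
  Encoded : NativeInt;
  Event : TNotifyEvent;
begin
  Demo := TDemo.Create;
  try
    Encoded := EncodeEvent( Demo.Hello ); // what the sender would put in WParam
    Event := DecodeEvent( Encoded );      // what the receiver does with WParam
    Event( Demo );                        // calls Demo.Hello( Demo )
  finally
    Demo.Free;
  end;
end.
```

Each encoded value must be decoded exactly once, because the decoder frees the block. In the message-based design this means a posted message that is never handled leaks its block.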

Thread loop

We have to define the thread message loop.

Delphi defines the basic thread class TThread.

This class has the virtual method Execute.

The class TAxThread is inherited from TThread.

The method TAxThread.Execute is overridden:

procedure TAxThread.Execute;
var
Msg : TMsg;
Done : Boolean;
begin
CreateHandleParallel;
FThreadID := GetCurrentThreadId;
Done := False; // local variable: initialize it so the first pass does not block in WaitMessage
while not Terminated do
begin
if Done then
begin
FIdleData := nil;
WaitMessage;
end;
while ProcessMessage( Msg ) do { loop };
Idle( FIdleData, Done );
end;
end;


First the thread has to create its window handle (CreateHandleParallel).

The while loop waits for messages and handles incoming messages (ProcessMessage).

function TAxThread.ProcessMessage( var Msg : TMsg ) : Boolean;
begin
Result := False;
if PeekMessage( Msg, 0, 0, 0, PM_REMOVE ) then
begin
Result := True;
if Msg.Message <> WM_QUIT then
begin
TranslateMessage( Msg );
DispatchMessage( Msg );
end
else
Terminate;
end;
end;


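The call to DispatchMessage passes the current message to the window procedure, which forwards it to the object for execution (MessageProcedure calls Dispatch).

If the incoming message is CM_EXECPROC (declared in the unit as $8FFF), the method ExecProcedure is called.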
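The routing from a message number to a handler method relies on Delphi's message directive and on TObject.Dispatch. The console program below is a small sketch of that mechanism alone, with no window involved. The class TReceiver is hypothetical; the constant value is the one declared in the unit listing.

```pascal
program DispatchDemo;

{$APPTYPE CONSOLE}

uses
  Messages;

const
  CM_EXECPROC = $8FFF; // same value as in the unit listing

type
  // Hypothetical receiver, standing in for TAxThread.
  TReceiver = class
  private
    // Bound to CM_EXECPROC by the message directive.
    procedure ExecProcedure( var Message : TMessage ); message CM_EXECPROC;
  public
    // Called by Dispatch when no message method matches.
    procedure DefaultHandler( var Message ); override;
  end;

procedure TReceiver.ExecProcedure( var Message : TMessage );
begin
  Writeln( 'ExecProcedure reached, WParam = ', Message.WParam );
end;

procedure TReceiver.DefaultHandler( var Message );
begin
  Writeln( 'No handler for message ', TMessage( Message ).Msg );
end;

var
  Receiver : TReceiver;
  Msg : TMessage;
begin
  Receiver := TReceiver.Create;
  try
    FillChar( Msg, SizeOf( Msg ), 0 );

    Msg.Msg := CM_EXECPROC;
    Msg.WParam := 42;
    Receiver.Dispatch( Msg ); // routed to ExecProcedure

    Msg.Msg := CM_EXECPROC + 1;
    Receiver.Dispatch( Msg ); // no matching method: DefaultHandler runs
  finally
    Receiver.Free;
  end;
end.
```

In TAxThread the same Dispatch call is made from MessageProcedure, the window procedure passed to AllocateHWnd, so every message delivered to the thread's window goes through this lookup.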

Sync and async execution

All parallel processing is submitted for execution by a message.

If we send the message with the API function SendMessage, the execution is synchronous

(the sending thread is blocked until the receiving thread has handled the message, and only then does control return to the sending thread).

The API function PostMessage puts the message into the message queue of the receiving thread and returns at once, so the posting thread continues its execution.

When the message is taken from the queue, it is handled and the method is executed.

// Asynchro execute on parallel thread
procedure TAxThread.AsyncExecuteParallel( Proc : TNotifyEvent;
ParamSender : TObject );
begin
InterlockedIncrement( FMethodsToExecute );
PostMessageParallel( CM_EXECPROC, NotifyEventToPointer( Proc ),
Longint( ParamSender ) );
end;

// Synchro execute on parallel thread
procedure TAxThread.SyncExecuteParallel( Proc : TNotifyEvent;
ParamSender : TObject );
begin
InterlockedIncrement( FMethodsToExecute );
SendMessageParallel( CM_EXECPROC, NotifyEventToPointer( Proc ),
Longint( ParamSender ) );
end;


The method AsyncExecuteParallel is used for asynchronous parallel execution

(execution on the selected thread) of the method Proc with the parameter ParamSender.

The method SyncExecuteParallel runs the method Proc with the parameter ParamSender synchronously (it waits until the execution has finished).

The thread also defines methods for execution on the main thread (AsyncExecuteMain, SyncExecuteMain).

Using

As a usage example we define the following class:

type
TMainForm = class( TForm )
btnRandomize : TButton;
imgResult : TImage;
pbProgress : TProgressBar;
btnMulti : TButton;
procedure FormCreate( Sender : TObject );
procedure FormDestroy( Sender : TObject );
procedure btnRandomizeClick( Sender : TObject );
procedure btnMultiClick( Sender : TObject );
private
{ Private declarations }
FExecutingThread : TAxThread;
public
{ Public declarations }
procedure RandomizeBMP( Data : TObject );
procedure DoUpdate( Data : TObject );
procedure Progress( Data : TObject );
end;


The private field FExecutingThread is the thread on which the methods will be executed.

There are three public methods with the signature of TNotifyEvent (so they can be executed on the selected thread).

procedure TMainForm.btnRandomizeClick( Sender : TObject );
var
PomBMP : TBitmap;
begin
if FExecutingThread = nil then
FExecutingThread := TAxThread.Create;
PomBMP := TBitmap.Create;
PomBMP.Width := 200;
PomBMP.Height := 200;
PomBMP.PixelFormat := pf24bit;
FExecutingThread.AsyncExecuteParallel( RandomizeBMP, PomBMP );
end;


When the user clicks the button, the method btnRandomizeClick is called.

The method calls AsyncExecuteParallel.

The main thread continues to respond to user interaction.

When the thread context is switched, FExecutingThread begins to execute the method RandomizeBMP.

procedure TMainForm.RandomizeBMP( Data : TObject );
var
CurrentThread : TAxThread;
PomBMP : TBitmap;
I : Longint;
X : Longint;
Y : Longint;
Color : Longint;
begin
if not( Data is TBitmap ) then
Exit;
PomBMP := Data as TBitmap;
CurrentThread := TAxThread.GetCurrentThread;
I := 0;
try
PomBMP.Canvas.Lock;
while not CurrentThread.Terminated do
begin
Inc( I );
if I > MaxPoints then
Break;
X := Random( 200 );
Y := Random( 200 );
Color := Random( 256 ) * 256 * 256 + Random( 256 ) * 256 + Random( 256 );
if ( I mod ( MaxPoints div 100 ) ) = 0 then
CurrentThread.SyncExecuteMain( Progress, TObject( I ) );
// slow for demonstration
PomBMP.Canvas.Pixels[ X, Y ] := Color;
end;
finally
PomBMP.Canvas.Unlock;
CurrentThread.AsyncExecuteMain( DoUpdate, PomBMP );
end;
end;


The method RandomizeBMP casts its parameter to a bitmap and fills it with random points.

When the randomizing is finished, the thread notifies the main thread (CurrentThread.AsyncExecuteMain).


Conclusion

We presented an implementation of a thread class for asynchronous and synchronous execution.

The designed library offers an effective way of developing parallel computations.

unit AxThreads2;

interface

uses
Windows, Classes, SysUtils, Controls, Forms, Messages;

const
CM_EXECPROC = $8FFF;

type
TIdleEvent = procedure( var Data : TObject; var Done : Boolean ) of object;

TAxThread = class( TThread )
private
FThreadID : Longint;
FOnIdle : TIdleEvent;
FIdleData : TObject;
FWndHandleParallel : LongWord;
FWndHandleMain : LongWord;

FMethodsToExecute : Longint;
procedure SetOnIdle( const Value : TIdleEvent );
protected
procedure CreateHandleParallel;
procedure DestroyHandleParallel;
procedure CreateHandleMain;
procedure DestroyHandleMain;

procedure MessageProcedure( var Message : TMessage ); virtual;

procedure ExecProcedure( var Message : TMessage ); message CM_EXECPROC;
procedure Execute; override;
procedure Idle( var Data : TObject; var Done : Boolean ); virtual;

function ProcessMessage( var Msg : TMsg ) : Boolean;

function NotifyEventToPointer( Proc : TNotifyEvent ) : Longint;
public
constructor Create;
destructor Destroy; override;

// Posts a null message to parallel thread
procedure WakeUp;

class function GetCurrentThread : TAxThread;
// sends a message to main thread
function SendMessageMain( Msg, WParam, LParam : LongWord ) : Longint;
// posts a message to main thread
function PostMessageMain( Msg, WParam, LParam : LongWord ) : LongBool;

// sends a message to parallel thread
function SendMessageParallel( Msg, WParam, LParam : LongWord ) : Longint;
// posts a message to parallel thread
function PostMessageParallel( Msg, WParam, LParam : LongWord ) : LongBool;

// for compatibility
function SendMessage( Msg, WParam, LParam : LongWord ) : Longint;
function PostMessage( Msg, WParam, LParam : LongWord ) : LongBool;

// Synchro execute on parallel thread
procedure SyncExecuteParallel( Proc : TNotifyEvent; ParamSender : TObject );
// Asynchro execute on parallel thread
procedure AsyncExecuteParallel( Proc : TNotifyEvent;
ParamSender : TObject );

// Synchro execute on main thread
procedure SyncExecuteMain( Proc : TNotifyEvent; ParamSender : TObject );
// Asynchro execute on main thread
procedure AsyncExecuteMain( Proc : TNotifyEvent; ParamSender : TObject );

// for compatibility
procedure SyncExecute( Proc : TNotifyEvent; ParamSender : TObject );
procedure AsyncExecute( Proc : TNotifyEvent; ParamSender : TObject );

// handle for main thread
property WndHandleMain : LongWord read FWndHandleMain;

// handle for parallel thread
property WndHandleParallel : LongWord read FWndHandleParallel;

// idle event for parallel thread
property OnIdle : TIdleEvent read FOnIdle write SetOnIdle;

property MethodsToExecute : Longint read FMethodsToExecute;

property Terminated;
end;

implementation

{ TAxThread }
var
AxThreads : TThreadList;

procedure TAxThread.CreateHandleParallel;
begin
FWndHandleParallel := AllocateHWnd( MessageProcedure );
end;

procedure TAxThread.MessageProcedure( var Message : TMessage );
begin
Dispatch( message );
end;

procedure TAxThread.DestroyHandleParallel;
begin
DeallocateHWnd( FWndHandleParallel );
FWndHandleParallel := 0;
end;

constructor TAxThread.Create;
begin
// this must be synchronized because of the main thread context
FMethodsToExecute := 0;

inherited Create( False );
Synchronize( CreateHandleMain );

if AxThreads = nil then
AxThreads := TThreadList.Create;

AxThreads.Add( Self );
end;

destructor TAxThread.Destroy;
begin
Terminate;
WakeUp;
inherited;
DestroyHandleParallel;
DestroyHandleMain;

AxThreads.Remove( Self );
end;

procedure TAxThread.AsyncExecuteParallel( Proc : TNotifyEvent;
ParamSender : TObject );
begin
InterlockedIncrement( FMethodsToExecute );
PostMessageParallel( CM_EXECPROC, NotifyEventToPointer( Proc ),
Longint( ParamSender ) );
end;

procedure TAxThread.SyncExecuteParallel( Proc : TNotifyEvent;
ParamSender : TObject );
begin
InterlockedIncrement( FMethodsToExecute );
SendMessageParallel( CM_EXECPROC, NotifyEventToPointer( Proc ),
Longint( ParamSender ) );
end;

procedure TAxThread.ExecProcedure( var Message : TMessage );
var
PMethod : ^TMethod;
Method : TMethod;
Event : TNotifyEvent absolute Method;
begin
PMethod := Pointer( message.WParam );

Method := PMethod^;
Event( TObject( message.LParam ) );

Dispose( PMethod );
if FThreadID = GetCurrentThreadId then
InterlockedDecrement( FMethodsToExecute );
end;

procedure TAxThread.Execute;
var
Msg : TMsg;
Done : Boolean;
begin
CreateHandleParallel;
FThreadID := GetCurrentThreadId;
Done := False; // local variable: initialize it so the first pass does not block in WaitMessage

while not Terminated do
begin
if Done then
begin
FIdleData := nil;
WaitMessage;
end;
while ProcessMessage( Msg ) do { loop };
Idle( FIdleData, Done );
end;
end;

procedure TAxThread.CreateHandleMain;
begin
FWndHandleMain := AllocateHWnd( MessageProcedure );
end;

procedure TAxThread.DestroyHandleMain;
begin
DeallocateHWnd( FWndHandleMain );
FWndHandleMain := 0;
end;

procedure TAxThread.AsyncExecuteMain( Proc : TNotifyEvent;
ParamSender : TObject );
begin
PostMessageMain( CM_EXECPROC, NotifyEventToPointer( Proc ),
Longint( ParamSender ) );
end;

procedure TAxThread.SyncExecuteMain( Proc : TNotifyEvent;
ParamSender : TObject );
begin
SendMessageMain( CM_EXECPROC, NotifyEventToPointer( Proc ),
Longint( ParamSender ) );
end;

function TAxThread.PostMessageMain( Msg, WParam, LParam : LongWord ) : LongBool;
begin
Result := Windows.PostMessage( FWndHandleMain, Msg, WParam, LParam );
end;

function TAxThread.PostMessageParallel( Msg, WParam, LParam : LongWord )
: LongBool;
begin
while FWndHandleParallel = 0 do
SwitchToThread;

Result := Windows.PostMessage( FWndHandleParallel, Msg, WParam, LParam );
end;

function TAxThread.SendMessageMain( Msg, WParam, LParam : LongWord ) : Longint;
begin
Result := Windows.SendMessage( FWndHandleMain, Msg, WParam, LParam );
end;

function TAxThread.SendMessageParallel( Msg, WParam, LParam : LongWord )
: Longint;
begin
while FWndHandleParallel = 0 do
SwitchToThread;

Result := Windows.SendMessage( FWndHandleParallel, Msg, WParam, LParam );
end;

procedure TAxThread.Idle( var Data : TObject; var Done : Boolean );
begin
if Assigned( FOnIdle ) then
FOnIdle( Data, Done )
else
Done := True;
end;

function TAxThread.ProcessMessage( var Msg : TMsg ) : Boolean;
begin
Result := False;
if PeekMessage( Msg, 0, 0, 0, PM_REMOVE ) then
begin
Result := True;
if Msg.Message <> WM_QUIT then
begin
TranslateMessage( Msg );
DispatchMessage( Msg );
end
else
Terminate;
end;
end;

procedure TAxThread.SetOnIdle( const Value : TIdleEvent );
begin
FOnIdle := Value;
SendMessageParallel( 0, 0, 0 );
end;

procedure TAxThread.AsyncExecute( Proc : TNotifyEvent; ParamSender : TObject );
begin
AsyncExecuteParallel( Proc, ParamSender );
end;

procedure TAxThread.SyncExecute( Proc : TNotifyEvent; ParamSender : TObject );
begin
SyncExecuteParallel( Proc, ParamSender );
end;

function TAxThread.PostMessage( Msg, WParam, LParam : LongWord ) : LongBool;
begin
Result := PostMessageParallel( Msg, WParam, LParam );
end;

function TAxThread.SendMessage( Msg, WParam, LParam : LongWord ) : Longint;
begin
Result := SendMessageParallel( Msg, WParam, LParam );
end;

function TAxThread.NotifyEventToPointer( Proc : TNotifyEvent ) : Longint;
var
Method : TMethod absolute Proc;
PMethod : ^TMethod;
begin
New( PMethod );
PMethod^ := Method;
Result := Longint( PMethod );
end;

procedure TAxThread.WakeUp;
begin
PostMessageParallel( 0, 0, 0 );
end;

class function TAxThread.GetCurrentThread : TAxThread;
var
PomList : TList;
PomIndex : Longint;
ThreadID : Longint;
begin
Result := nil;
if AxThreads = nil then
Exit;

ThreadID := GetCurrentThreadId;
PomList := AxThreads.LockList;
try
for PomIndex := 0 to PomList.Count - 1 do
begin
Result := TAxThread( PomList[ PomIndex ] );
if Result.FThreadID = ThreadID then
Break;
Result := nil;
end;
finally
AxThreads.UnlockList;
end;
end;

end.