TAxThread - Inter-thread message-based communication - Delphi
2014-10-08 20:37
http://www.cybletter.com/index.php?id=3
http://www.cybletter.com/index.php?id=30
Alexandr Štefek
Military Academy in Brno
alexandr@stefek.cz
Source Code: http://www.cybletter.com/index.php?s=file_download&id=3
Full Paper: www.cybletter.com/index.php?s=file_download&id=4
Abstract
This paper presents an implementation of parallel execution. When a program uses parallel processes, it has to solve synchronization.
We show an effective implementation of parallel execution that does not use critical sections, semaphores, mutexes, or events.
Thread
A thread is a system object defined on the Win32 platform. The SDK defines several functions for creating and handling threads.
In a real implementation, a thread procedure has to be defined for every task that should run in parallel.
On Win32, if a thread creates a window handle, all messages for that window are handled by that thread.
This fact leads to the idea behind our solution.
We simply create a window handle on the executing thread.
While this thread runs a message-handling loop, a special message can be sent to the thread's window handle.
The parameters of the message can carry the method to execute and the parameter for that method.
However, a method pointer occupies 8 bytes, while a message parameter has only 4 (on 32-bit Windows).
So we have to allocate a memory block, copy the method pointer into it, and send the address of the allocated block.
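The size mismatch described above can be checked with a small console program. This is only an illustrative sketch: the program name is made up, and the printed values depend on the compiler target. A 32-bit target, which the paper assumes, reports 8 and 4 bytes; a 64-bit target would report 16 and 8.

```delphi
program MethodSize;
{$APPTYPE CONSOLE}

uses
  Classes; // declares TNotifyEvent

begin
  // A method pointer is stored as a TMethod record with two pointers:
  // Code (address of the routine) and Data (the Self instance).
  Writeln('SizeOf(TNotifyEvent) = ', SizeOf(TNotifyEvent));
  Writeln('SizeOf(TMethod)      = ', SizeOf(TMethod));
  // A message parameter is pointer-sized, so it can hold only half of it.
  Writeln('SizeOf(Pointer)      = ', SizeOf(Pointer));
end.
```

Because the method pointer is always twice the size of a message parameter, it cannot travel in WParam directly, which is why the address of a heap copy is sent instead.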
Coding the method
We want to create a class that has methods for synchronous and asynchronous parallel execution of methods. We now show how to encode the 8-byte method pointer as a 4-byte address.
```delphi
function TAxThread.NotifyEventToPointer( Proc : TNotifyEvent ) : Longint;
var
  Method : TMethod absolute Proc;
  PMethod : ^TMethod;
begin
  // copy the method pointer to the heap and return the block address
  New( PMethod );
  PMethod^ := Method;
  Result := Longint( PMethod );
end;

procedure TAxThread.AsyncExecuteParallel( Proc : TNotifyEvent; ParamSender : TObject );
begin
  InterlockedIncrement( FMethodsToExecute );
  PostMessageParallel( CM_EXECPROC, NotifyEventToPointer( Proc ), Longint( ParamSender ) );
end;

procedure TAxThread.ExecProcedure( var Message : TMessage );
var
  PMethod : ^TMethod;
  Method : TMethod;
  Event : TNotifyEvent absolute Method;
begin
  // WParam holds the block address, LParam the parameter object
  PMethod := Pointer( Message.WParam );
  Method := PMethod^;
  Event( TObject( Message.LParam ) );
  Dispose( PMethod );
  if FThreadID = GetCurrentThreadId then
    InterlockedDecrement( FMethodsToExecute );
end;
```
NotifyEventToPointer copies the method pointer to a memory block.
Its result is the address of that block.
ExecProcedure takes the WParam of the message, converts it back to a method,
and calls the decoded method with the parameter given by the LParam of the message.
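The encode and decode steps can also be tried in isolation, without any window or thread. The following is a minimal sketch under stated assumptions: PackEvent, UnpackAndCall, TCounter, and PMethodRec are hypothetical names introduced for illustration, and the block address is kept as a Pointer instead of being cast to Longint so that the sketch does not depend on a 32-bit target.

```delphi
program MethodRoundTrip;
{$APPTYPE CONSOLE}
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}

uses
  Classes; // declares TNotifyEvent

type
  PMethodRec = ^TMethod;

  // Hypothetical receiver that counts how often its method is called.
  TCounter = class
  public
    Calls: Integer;
    procedure Hit(Sender: TObject);
  end;

procedure TCounter.Hit(Sender: TObject);
begin
  Inc(Calls);
end;

// Copy the two-pointer method value to the heap and return the block address.
function PackEvent(Proc: TNotifyEvent): Pointer;
var
  P: PMethodRec;
begin
  New(P);
  P^ := TMethod(Proc);
  Result := P;
end;

// Rebuild the event from the block, release the block, then call the event.
procedure UnpackAndCall(Block: Pointer; Sender: TObject);
var
  P: PMethodRec;
  M: TMethod;
begin
  P := Block;
  M := P^;
  Dispose(P);
  TNotifyEvent(M)(Sender);
end;

var
  C: TCounter;
begin
  C := TCounter.Create;
  try
    UnpackAndCall(PackEvent(C.Hit), nil);
    Writeln('Calls = ', C.Calls); // prints "Calls = 1"
  finally
    C.Free;
  end;
end.
```

Whoever decodes the block also frees it, as ExecProcedure does, so each allocated block must be decoded exactly once.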
Thread loop
We have to define the thread's message loop. Delphi defines a basic thread class, TThread.
This class has a virtual method, Execute.
The class TAxThread is derived from TThread.
The method TAxThread.Execute is overridden:
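```delphi
procedure TAxThread.Execute;
var
  Msg : TMsg;
  Done : Boolean;
begin
  CreateHandleParallel;
  FThreadID := GetCurrentThreadId;
  Done := False; // initialize before the first test in the loop
  while not Terminated do
  begin
    if Done then
    begin
      // nothing left to do in idle processing: sleep until a message arrives
      FIdleData := nil;
      WaitMessage;
    end;
    while ProcessMessage( Msg ) do { loop };
    Idle( FIdleData, Done );
  end;
end;
```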
First, the class has to create the window handle (CreateHandleParallel).
The while loop waits for messages and handles incoming messages (ProcessMessage).
```delphi
function TAxThread.ProcessMessage( var Msg : TMsg ) : Boolean;
begin
  Result := False;
  if PeekMessage( Msg, 0, 0, 0, PM_REMOVE ) then
  begin
    Result := True;
    if Msg.Message <> WM_QUIT then
    begin
      TranslateMessage( Msg );
      DispatchMessage( Msg );
    end
    else
      Terminate;
  end;
end;
```
Calling DispatchMessage dispatches the current message to the target object for execution.
If the incoming message is CM_EXECPROC (declared in the unit as $8FFF), the method ExecProcedure is called.
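In the unit listing, the window procedure of the thread (MessageProcedure) forwards each message to TObject.Dispatch, and Dispatch selects the method declared with the matching `message` directive. That routing step can be sketched on its own, without a window. The names TDemoMessage, TReceiver, HandleExec, and MSG_EXEC below are hypothetical; only the value $8FFF is taken from the listing.

```delphi
program DispatchDemo;
{$APPTYPE CONSOLE}
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}

const
  MSG_EXEC = $8FFF; // same value as CM_EXECPROC in the unit listing

type
  // Dispatch only requires that the record starts with the message ID.
  TDemoMessage = record
    Msg: Cardinal;
    Value: Integer;
  end;

  TReceiver = class
  public
    LastValue: Integer;
    Unhandled: Integer;
    // Called by Dispatch when the message ID equals MSG_EXEC.
    procedure HandleExec(var Message: TDemoMessage); message MSG_EXEC;
    // Called by Dispatch when no message method matches.
    procedure DefaultHandler(var Message); override;
  end;

procedure TReceiver.HandleExec(var Message: TDemoMessage);
begin
  LastValue := Message.Value;
end;

procedure TReceiver.DefaultHandler(var Message);
begin
  Inc(Unhandled);
end;

var
  R: TReceiver;
  M: TDemoMessage;
begin
  R := TReceiver.Create;
  try
    M.Msg := MSG_EXEC;
    M.Value := 42;
    R.Dispatch(M);          // routed to HandleExec
    M.Msg := MSG_EXEC + 1;
    R.Dispatch(M);          // no handler, falls through to DefaultHandler
    Writeln(R.LastValue, ' ', R.Unhandled); // prints "42 1"
  finally
    R.Free;
  end;
end.
```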
Sync and async execution
All parallel work is submitted for execution by a message. If the API function SendMessage is used for sending, the execution is synchronous
(the calling thread is blocked, the context is switched, the message is handled immediately, and control returns to the sending thread).
The API function PostMessage places the message in the message queue and returns immediately, so the caller continues executing.
When the message is later retrieved from the queue, it is handled and the method is executed.
```delphi
// Asynchronous execution on the parallel thread
procedure TAxThread.AsyncExecuteParallel( Proc : TNotifyEvent; ParamSender : TObject );
begin
  InterlockedIncrement( FMethodsToExecute );
  PostMessageParallel( CM_EXECPROC, NotifyEventToPointer( Proc ), Longint( ParamSender ) );
end;

// Synchronous execution on the parallel thread
procedure TAxThread.SyncExecuteParallel( Proc : TNotifyEvent; ParamSender : TObject );
begin
  InterlockedIncrement( FMethodsToExecute );
  SendMessageParallel( CM_EXECPROC, NotifyEventToPointer( Proc ), Longint( ParamSender ) );
end;
```
The method AsyncExecuteParallel performs asynchronous parallel execution
(execution on the selected thread) of the method Proc with the parameter ParamSender.
The method SyncExecuteParallel runs Proc with the parameter ParamSender synchronously (it waits until execution finishes).
The thread also defines methods for execution on the main thread (AsyncExecuteMain, SyncExecuteMain).
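The difference between SendMessage and PostMessage described in this section can be observed even on a single thread. The sketch below is an illustration for Delphi on Windows, not part of the library: TReceiver and WM_DEMO are hypothetical names, and the window is created with AllocateHWnd in the same way as in the unit listing. Because sender and receiver share one thread here, SendMessage calls the window procedure directly, while the posted message stays in the queue until the loop pumps it.

```delphi
program SendVsPost;
{$APPTYPE CONSOLE}

uses
  Windows, Messages, Classes;

const
  WM_DEMO = WM_USER + 1; // hypothetical message ID for this sketch

type
  TReceiver = class
  public
    Handle: HWND;
    Count: Integer;
    constructor Create;
    destructor Destroy; override;
    procedure WndProc(var Message: TMessage);
  end;

constructor TReceiver.Create;
begin
  inherited Create;
  Handle := AllocateHWnd(WndProc); // hidden utility window
end;

destructor TReceiver.Destroy;
begin
  DeallocateHWnd(Handle);
  inherited;
end;

procedure TReceiver.WndProc(var Message: TMessage);
begin
  if Message.Msg = WM_DEMO then
    Inc(Count)
  else
    Message.Result := DefWindowProc(Handle, Message.Msg,
      Message.WParam, Message.LParam);
end;

var
  R: TReceiver;
  Msg: TMsg;
begin
  R := TReceiver.Create;
  try
    SendMessage(R.Handle, WM_DEMO, 0, 0);
    Writeln('after SendMessage: ', R.Count); // handler has already run
    PostMessage(R.Handle, WM_DEMO, 0, 0);
    Writeln('after PostMessage: ', R.Count); // message is still queued
    while PeekMessage(Msg, 0, 0, 0, PM_REMOVE) do
      DispatchMessage(Msg);
    Writeln('after pumping:     ', R.Count); // queued message was handled
  finally
    R.Free;
  end;
end.
```

With two threads, as in TAxThread, the same rule applies, except that SendMessage blocks the caller until the receiving thread reaches its message loop and handles the message.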
Using
As an example of use, define the class:
```delphi
type
  TMainForm = class( TForm )
    btnRandomize : TButton;
    imgResult : TImage;
    pbProgress : TProgressBar;
    btnMulti : TButton;
    procedure FormCreate( Sender : TObject );
    procedure FormDestroy( Sender : TObject );
    procedure btnRandomizeClick( Sender : TObject );
    procedure btnMultiClick( Sender : TObject );
  private
    { Private declarations }
    FExecutingThread : TAxThread;
  public
    { Public declarations }
    procedure RandomizeBMP( Data : TObject );
    procedure DoUpdate( Data : TObject );
    procedure Progress( Data : TObject );
  end;
```
The private field FExecutingThread is the thread on which the methods will be executed.
There are three public methods with the TNotifyEvent signature (so they can be executed on a selected thread).
```delphi
procedure TMainForm.btnRandomizeClick( Sender : TObject );
var
  PomBMP : TBitmap;
begin
  if FExecutingThread = nil then
    FExecutingThread := TAxThread.Create;
  PomBMP := TBitmap.Create;
  PomBMP.Width := 200;
  PomBMP.Height := 200;
  PomBMP.PixelFormat := pf24bit;
  // queue RandomizeBMP on the worker thread and return immediately
  FExecutingThread.AsyncExecuteParallel( RandomizeBMP, PomBMP );
end;
```
When the user clicks the button, the method btnRandomizeClick is called.
It calls AsyncExecuteParallel.
The main thread continues responding to user interaction.
When the thread context is switched, FExecutingThread begins executing the method RandomizeBMP.
```delphi
procedure TMainForm.RandomizeBMP( Data : TObject );
var
  CurrentThread : TAxThread;
  PomBMP : TBitmap;
  I : Longint;
  X : Longint;
  Y : Longint;
  Color : Longint;
begin
  if not( Data is TBitmap ) then
    Exit;
  PomBMP := Data as TBitmap;
  CurrentThread := TAxThread.GetCurrentThread;
  I := 0;
  // acquire the lock before entering try, so finally only unlocks a held lock
  PomBMP.Canvas.Lock;
  try
    while not CurrentThread.Terminated do
    begin
      Inc( I );
      if I > MaxPoints then
        Break;
      X := Random( 200 );
      Y := Random( 200 );
      Color := Random( 256 ) * 256 * 256 + Random( 256 ) * 256 + Random( 256 );
      // report progress to the main thread after each percent of the work
      if ( I mod ( MaxPoints div 100 ) ) = 0 then
        CurrentThread.SyncExecuteMain( Progress, TObject( I ) );
      // slow for demonstration
      PomBMP.Canvas.Pixels[ X, Y ] := Color;
    end;
  finally
    PomBMP.Canvas.Unlock;
    CurrentThread.AsyncExecuteMain( DoUpdate, PomBMP );
  end;
end;
```
The method RandomizeBMP decodes its parameter as a bitmap and fills it with random points.
When the randomization is finished, the thread notifies the main thread (CurrentThread.AsyncExecuteMain).
Conclusion
We presented an implementation of a thread class for asynchronous and synchronous execution. The designed library offers a way to run methods on another thread without explicit critical sections, semaphores, mutexes, or events.
```delphi
unit AxThreads2;

interface

uses
  Windows, Classes, SysUtils, Controls, Forms, Messages;

const
  CM_EXECPROC = $8FFF;

type
  TIdleEvent = procedure( var Data : TObject; var Done : Boolean ) of object;

  TAxThread = class( TThread )
  private
    FThreadID : Longint;
    FOnIdle : TIdleEvent;
    FIdleData : TObject;
    FWndHandleParallel : LongWord;
    FWndHandleMain : LongWord;
    FMethodsToExecute : Longint;
    procedure SetOnIdle( const Value : TIdleEvent );
  protected
    procedure CreateHandleParallel;
    procedure DestroyHandleParallel;
    procedure CreateHandleMain;
    procedure DestroyHandleMain;
    procedure MessageProcedure( var Message : TMessage ); virtual;
    procedure ExecProcedure( var Message : TMessage ); message CM_EXECPROC;
    procedure Execute; override;
    procedure Idle( var Data : TObject; var Done : Boolean ); virtual;
    function ProcessMessage( var Msg : TMsg ) : Boolean;
    function NotifyEventToPointer( Proc : TNotifyEvent ) : Longint;
  public
    constructor Create;
    destructor Destroy; override;
    // Posts a null message to parallel thread
    procedure WakeUp;
    class function GetCurrentThread : TAxThread;
    // sends a message to main thread
    function SendMessageMain( Msg, WParam, LParam : LongWord ) : Longint;
    // posts a message to main thread
    function PostMessageMain( Msg, WParam, LParam : LongWord ) : LongBool;
    // sends a message to parallel thread
    function SendMessageParallel( Msg, WParam, LParam : LongWord ) : Longint;
    // posts a message to parallel thread
    function PostMessageParallel( Msg, WParam, LParam : LongWord ) : LongBool;
    // for compatibility
    function SendMessage( Msg, WParam, LParam : LongWord ) : Longint;
    function PostMessage( Msg, WParam, LParam : LongWord ) : LongBool;
    // Synchronous execution on parallel thread
    procedure SyncExecuteParallel( Proc : TNotifyEvent; ParamSender : TObject );
    // Asynchronous execution on parallel thread
    procedure AsyncExecuteParallel( Proc : TNotifyEvent; ParamSender : TObject );
    // Synchronous execution on main thread
    procedure SyncExecuteMain( Proc : TNotifyEvent; ParamSender : TObject );
    // Asynchronous execution on main thread
    procedure AsyncExecuteMain( Proc : TNotifyEvent; ParamSender : TObject );
    // for compatibility
    procedure SyncExecute( Proc : TNotifyEvent; ParamSender : TObject );
    procedure AsyncExecute( Proc : TNotifyEvent; ParamSender : TObject );
    // handle for main thread
    property WndHandleMain : LongWord read FWndHandleMain;
    // handle for parallel thread
    property WndHandleParallel : LongWord read FWndHandleParallel;
    // idle event for parallel thread
    property OnIdle : TIdleEvent read FOnIdle write SetOnIdle;
    property MethodsToExecute : Longint read FMethodsToExecute;
    property Terminated;
  end;

implementation

{ TAxThread }

var
  AxThreads : TThreadList;

procedure TAxThread.CreateHandleParallel;
begin
  FWndHandleParallel := AllocateHWnd( MessageProcedure );
end;

procedure TAxThread.MessageProcedure( var Message : TMessage );
begin
  Dispatch( Message );
end;

procedure TAxThread.DestroyHandleParallel;
begin
  DeallocateHWnd( FWndHandleParallel );
  FWndHandleParallel := 0;
end;

constructor TAxThread.Create;
begin
  // this must be synchronized because of the main thread context
  FMethodsToExecute := 0;
  inherited Create( False );
  Synchronize( CreateHandleMain );
  if AxThreads = nil then
    AxThreads := TThreadList.Create;
  AxThreads.Add( Self );
end;

destructor TAxThread.Destroy;
begin
  Terminate;
  WakeUp;
  inherited;
  DestroyHandleParallel;
  DestroyHandleMain;
  AxThreads.Remove( Self );
end;

procedure TAxThread.AsyncExecuteParallel( Proc : TNotifyEvent; ParamSender : TObject );
begin
  InterlockedIncrement( FMethodsToExecute );
  PostMessageParallel( CM_EXECPROC, NotifyEventToPointer( Proc ), Longint( ParamSender ) );
end;

procedure TAxThread.SyncExecuteParallel( Proc : TNotifyEvent; ParamSender : TObject );
begin
  InterlockedIncrement( FMethodsToExecute );
  SendMessageParallel( CM_EXECPROC, NotifyEventToPointer( Proc ), Longint( ParamSender ) );
end;

procedure TAxThread.ExecProcedure( var Message : TMessage );
var
  PMethod : ^TMethod;
  Method : TMethod;
  Event : TNotifyEvent absolute Method;
begin
  PMethod := Pointer( Message.WParam );
  Method := PMethod^;
  Event( TObject( Message.LParam ) );
  Dispose( PMethod );
  if FThreadID = GetCurrentThreadId then
    InterlockedDecrement( FMethodsToExecute );
end;

procedure TAxThread.Execute;
var
  Msg : TMsg;
  Done : Boolean;
begin
  CreateHandleParallel;
  FThreadID := GetCurrentThreadId;
  Done := False; // initialize before the first test in the loop
  while not Terminated do
  begin
    if Done then
    begin
      FIdleData := nil;
      WaitMessage;
    end;
    while ProcessMessage( Msg ) do { loop };
    Idle( FIdleData, Done );
  end;
end;

procedure TAxThread.CreateHandleMain;
begin
  FWndHandleMain := AllocateHWnd( MessageProcedure );
end;

procedure TAxThread.DestroyHandleMain;
begin
  DeallocateHWnd( FWndHandleMain );
  FWndHandleMain := 0;
end;

procedure TAxThread.AsyncExecuteMain( Proc : TNotifyEvent; ParamSender : TObject );
begin
  PostMessageMain( CM_EXECPROC, NotifyEventToPointer( Proc ), Longint( ParamSender ) );
end;

procedure TAxThread.SyncExecuteMain( Proc : TNotifyEvent; ParamSender : TObject );
begin
  SendMessageMain( CM_EXECPROC, NotifyEventToPointer( Proc ), Longint( ParamSender ) );
end;

function TAxThread.PostMessageMain( Msg, WParam, LParam : LongWord ) : LongBool;
begin
  Result := Windows.PostMessage( FWndHandleMain, Msg, WParam, LParam );
end;

function TAxThread.PostMessageParallel( Msg, WParam, LParam : LongWord ) : LongBool;
begin
  // wait until the parallel thread has created its window handle
  while FWndHandleParallel = 0 do
    SwitchToThread;
  Result := Windows.PostMessage( FWndHandleParallel, Msg, WParam, LParam );
end;

function TAxThread.SendMessageMain( Msg, WParam, LParam : LongWord ) : Longint;
begin
  Result := Windows.SendMessage( FWndHandleMain, Msg, WParam, LParam );
end;

function TAxThread.SendMessageParallel( Msg, WParam, LParam : LongWord ) : Longint;
begin
  // wait until the parallel thread has created its window handle
  while FWndHandleParallel = 0 do
    SwitchToThread;
  Result := Windows.SendMessage( FWndHandleParallel, Msg, WParam, LParam );
end;

procedure TAxThread.Idle( var Data : TObject; var Done : Boolean );
begin
  if Assigned( FOnIdle ) then
    FOnIdle( Data, Done )
  else
    Done := True;
end;

function TAxThread.ProcessMessage( var Msg : TMsg ) : Boolean;
begin
  Result := False;
  if PeekMessage( Msg, 0, 0, 0, PM_REMOVE ) then
  begin
    Result := True;
    if Msg.Message <> WM_QUIT then
    begin
      TranslateMessage( Msg );
      DispatchMessage( Msg );
    end
    else
      Terminate;
  end;
end;

procedure TAxThread.SetOnIdle( const Value : TIdleEvent );
begin
  FOnIdle := Value;
  SendMessageParallel( 0, 0, 0 );
end;

procedure TAxThread.AsyncExecute( Proc : TNotifyEvent; ParamSender : TObject );
begin
  AsyncExecuteParallel( Proc, ParamSender );
end;

procedure TAxThread.SyncExecute( Proc : TNotifyEvent; ParamSender : TObject );
begin
  SyncExecuteParallel( Proc, ParamSender );
end;

function TAxThread.PostMessage( Msg, WParam, LParam : LongWord ) : LongBool;
begin
  Result := PostMessageParallel( Msg, WParam, LParam );
end;

function TAxThread.SendMessage( Msg, WParam, LParam : LongWord ) : Longint;
begin
  Result := SendMessageParallel( Msg, WParam, LParam );
end;

function TAxThread.NotifyEventToPointer( Proc : TNotifyEvent ) : Longint;
var
  Method : TMethod absolute Proc;
  PMethod : ^TMethod;
begin
  New( PMethod );
  PMethod^ := Method;
  Result := Longint( PMethod );
end;

procedure TAxThread.WakeUp;
begin
  PostMessageParallel( 0, 0, 0 );
end;

class function TAxThread.GetCurrentThread : TAxThread;
var
  PomList : TList;
  PomIndex : Longint;
  ThreadID : Longint;
begin
  Result := nil;
  if AxThreads = nil then
    Exit;
  ThreadID := GetCurrentThreadId;
  PomList := AxThreads.LockList;
  try
    // find the TAxThread whose worker thread is the calling thread
    for PomIndex := 0 to PomList.Count - 1 do
    begin
      Result := TAxThread( PomList[ PomIndex ] );
      if Result.FThreadID = ThreadID then
        Break;
      Result := nil;
    end;
  finally
    AxThreads.UnlockList;
  end;
end;

end.
```