Implementing the CLR Asynchronous Programming Model (repost)

2011-07-26 14:11
http://msdn.microsoft.com/en-us/magazine/cc163467.aspx

MSDN Magazine, March 2007

Concurrent Affairs

Jeffrey Richter

Code download available at: ConcurrentAffairs2007_03.exe (157 KB)

Contents
The Heart of the APM: IAsyncResult
Implementing the APM
Testing and Performance
Conclusion

Slow and unpredictable are words that typically characterize I/O operations.
When an application performs a synchronous I/O operation, the
application is basically giving up control to the device that is doing
the actual work. For example, if an application calls the Stream's Read
method to read some bytes from a FileStream or NetworkStream, there is
no telling how much time will pass before that method returns. If the
file being read is on a local hard drive, then Read may return almost
immediately. If the file is housed on a remote server that is offline, then the
Read method may wait several minutes before timing out and throwing an
exception. During this time, the thread making the synchronous request
is tied up. If that thread is the UI thread, the application is frozen
and stops responding to user input.

A thread waiting for synchronous I/O to complete is blocked, which means
that thread is idle but is not allowed to perform useful work. To
improve scalability, many application developers create more threads.
Unfortunately, each thread introduces significant overhead such as its
kernel object, user-mode and kernel-mode stacks, increased context
switching, the calling of DllMain methods with thread attach/detach
notifications, and so on. The result is actually reduced scalability.

An application that wishes to remain responsive to the user, improve
scalability and throughput, and increase reliability should not perform
I/O operations synchronously. Instead, the application should use the
common language runtime (CLR) Asynchronous Programming Model (APM) to
perform asynchronous I/O operations. Much has been written about how to
use the CLR APM, including Chapter 23 of my book CLR via C#, 2nd Edition (Microsoft Press®,
2006). However, I'm not aware of anything that has been written
explaining how to define a class with methods that implement the APM. So
I decided to focus on this topic for this column.

There are basically four reasons why a developer would want to implement the
APM itself. First, you might be building a class that communicates
directly with hardware (such as a file system on a hard disk, a network,
a serial port, or a parallel port). As already mentioned, device I/O is
unpredictable and therefore an application should perform asynchronous
I/O operations when communicating with hardware in order for the
application to remain responsive, scalable, and reliable.

Fortunately, the Microsoft®
.NET Framework already includes classes that communicate with many
hardware devices. Thus, you would not need to implement the APM
yourself unless you were defining a class that communicated with a
hardware device not already supported by the Framework Class Library
(FCL), such as a parallel port. However, there are devices that are
supported by the FCL, yet certain sub-features are not supported. In
this situation, you might need to implement the APM if you wanted to
perform I/O operations. For example, the FCL does offer a FileStream
class that lets your application communicate with a disk, but the
FileStream doesn't let you access opportunistic locks
(microsoft.com/msj/0100/win32/win320100.aspx), sparse file streams
(microsoft.com/msj/1198/ntfs/ntfs.aspx), or other cool features offered
by the NTFS file system. If you wanted to write P/Invoke wrappers to call
the Win32® APIs that expose these features, you would want the wrappers to
support the APM, thus allowing the operations to be performed
asynchronously.

Second, you might be building an abstraction layer over an already defined
class that communicates directly with hardware. Several examples of this
already exist in the .NET Framework. For instance, let's say you're
sending data to a Web service method. On the Web service client proxy
class, there is a method that accepts your arguments, which could be a
set of complex data structures. Internally, the method serializes these
complex data structures into an array of bytes. The byte array is then
sent over the wire by using a NetworkStream class (which already has the
ability to communicate with the hardware using asynchronous I/O).
Another example occurs when accessing a database. The ADO.NET SqlCommand
type offers BeginExecuteNonQuery, BeginExecuteReader, and other
BeginXxx methods that parse arguments to send data over a network to a
database. When the I/O completes, the corresponding EndExecuteNonQuery,
EndExecuteReader, and other EndXxx methods are called. Internally, these
EndXxx methods parse the resulting data and return rich data objects
back to the caller.
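
The layering described above can be sketched in a few lines. This is only an illustration under stated assumptions: MessageSender, BeginSend, and EndSend are hypothetical names that do not appear in the FCL, and a MemoryStream stands in for the NetworkStream so the sketch runs without a network. The wrapper converts a rich argument (a string) into bytes and then delegates to the stream's own BeginWrite and EndWrite.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical abstraction layer over a Stream that already supports the APM
public static class MessageSender
{
    // Begin part: serialize the argument, then hand off to the stream's BeginWrite
    public static IAsyncResult BeginSend(Stream stream, string message,
        AsyncCallback callback, object state)
    {
        byte[] payload = Encoding.UTF8.GetBytes(message);
        return stream.BeginWrite(payload, 0, payload.Length, callback, state);
    }

    // End part: let the stream report success or throw the saved exception
    public static void EndSend(Stream stream, IAsyncResult asyncResult)
    {
        stream.EndWrite(asyncResult);
    }

    public static void Main()
    {
        // MemoryStream is an assumption made so the sketch is self-contained
        using (MemoryStream ms = new MemoryStream())
        {
            IAsyncResult ar = BeginSend(ms, "ping", null, null);
            EndSend(ms, ar);
            Console.WriteLine("Bytes written: {0}", ms.Length);
        }
    }
}
```

Because the wrapper returns the stream's own IAsyncResult, it does not need an IAsyncResult implementation of its own; a wrapper whose EndXxx method returned a richer object would.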

Third, your class might offer a method that could potentially take a long time
to execute. In this case, you might want to offer BeginXxx and EndXxx
methods, allowing the callers the convenience of the APM. Unlike the
previous examples, which ended up being I/O-bound operations, this time
your method is performing a compute-bound operation. Because the
operation is compute-bound, a thread must be used to perform the work;
you are just defining BeginXxx and EndXxx methods as a convenience to
users of your class.

Finally, your class might wrap a Win32 method that performs synchronous I/O.
Unfortunately, there are a number of Win32 methods that perform I/O but
for which Win32 offers no way to perform these I/O operations
asynchronously. For example, the Win32 registry and event logging
functions could communicate with a local or remote registry/event log.
Microsoft could create asynchronous versions of these Win32 functions
allowing a thread not to block on them. However, as of today,
asynchronous versions of these Win32 functions do not exist. When I wrap
these kinds of methods with managed code, I always offer BeginXxx and
EndXxx methods so that I can do the right thing in managed code even
though my application is not as efficient as it could be because my
methods must have a thread block while Windows®
performs the synchronous I/O operation. However, if Microsoft ever adds
asynchronous versions of these methods to Win32, I could change my
wrappers to take advantage of the new methods, thereby gaining
efficiency, without having to change my client code at all.

The Heart of the APM: IAsyncResult

At the heart of the CLR APM is the IAsyncResult interface, which is defined as shown in Figure 1.



Figure 1 IAsyncResult

public interface IAsyncResult {
    WaitHandle AsyncWaitHandle { get; } // For Wait-Until-Done technique
    Boolean    IsCompleted     { get; } // For Polling technique
    Object     AsyncState      { get; } // For Callback technique
    Boolean    CompletedSynchronously { get; } // Almost never used
}


When any BeginXxx method is called, this method must internally construct an
object whose type implements IAsyncResult and its four read-only
properties. This object identifies the status of the asynchronous
operation that has just been started. After the BeginXxx method returns
to application code, the application can query these properties to
determine whether the operation has completed. This object also contains
the state of the completed operation: a result value if the operation
completed successfully, or an exception if the operation failed to
complete. An application passes the IAsyncResult object to an EndXxx
method, which waits for the operation to complete (assuming it's not
finished yet). The EndXxx method either returns the result or throws the
exception letting the caller know the operation's result or error.
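
From the caller's side, the contract described above looks roughly like the following sketch. It uses the Stream class's existing BeginRead and EndRead methods; ApmConsumerSketch and ReadWithApm are hypothetical names, and a MemoryStream is assumed in place of a real device so the example is self-contained.

```csharp
using System;
using System.IO;
using System.Text;

public static class ApmConsumerSketch
{
    // Starts a read with BeginRead, inspects the IAsyncResult, and
    // returns the number of bytes that EndRead reports
    public static int ReadWithApm(Stream stream, byte[] buffer)
    {
        IAsyncResult ar = stream.BeginRead(buffer, 0, buffer.Length,
            null, "demo state");

        // AsyncState hands back the last argument passed to BeginRead
        Console.WriteLine("AsyncState: {0}", ar.AsyncState);

        // EndRead waits if the operation is still pending, then returns
        // the result or throws the exception recorded for the operation
        return stream.EndRead(ar);
    }

    public static void Main()
    {
        byte[] data = Encoding.ASCII.GetBytes("hello");
        using (MemoryStream ms = new MemoryStream(data))
        {
            int n = ReadWithApm(ms, new byte[16]);
            Console.WriteLine("Read {0} bytes", n);
        }
    }
}
```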

Figure 2 defines an AsyncResultNoResult class that implements the IAsyncResult
interface. This simple class can be used for asynchronous operations
that have no return value; specifically, the operation either succeeds or
fails. The Stream's BeginWrite and EndWrite methods are an example of
this. When you start an asynchronous write operation to a stream, the
result is either success or failure; the EndWrite method is prototyped as
returning void.



Figure 2 AsyncResultNoResult Class

using System;
using System.Threading;

internal class AsyncResultNoResult : IAsyncResult
{
    // Fields set at construction which never change while
    // operation is pending
    private readonly AsyncCallback m_AsyncCallback;
    private readonly Object m_AsyncState;

    // Fields set at construction which do change after
    // operation completes
    private const Int32 c_StatePending = 0;
    private const Int32 c_StateCompletedSynchronously = 1;
    private const Int32 c_StateCompletedAsynchronously = 2;
    private Int32 m_CompletedState = c_StatePending;

    // Field that may or may not get set depending on usage
    private ManualResetEvent m_AsyncWaitHandle;

    // Fields set when operation completes
    private Exception m_exception;

    public AsyncResultNoResult(AsyncCallback asyncCallback, Object state)
    {
        m_AsyncCallback = asyncCallback;
        m_AsyncState = state;
    }

    public void SetAsCompleted(
        Exception exception, Boolean completedSynchronously)
    {
        // Passing null for exception means no error occurred.
        // This is the common case
        m_exception = exception;

        // The m_CompletedState field MUST be set prior to calling the callback
        Int32 prevState = Interlocked.Exchange(ref m_CompletedState,
            completedSynchronously ? c_StateCompletedSynchronously :
            c_StateCompletedAsynchronously);
        if (prevState != c_StatePending)
            throw new InvalidOperationException(
                "You can set a result only once");

        // If the event exists, set it
        if (m_AsyncWaitHandle != null) m_AsyncWaitHandle.Set();

        // If a callback method was set, call it
        if (m_AsyncCallback != null) m_AsyncCallback(this);
    }

    public void EndInvoke()
    {
        // This method assumes that only 1 thread calls EndInvoke
        // for this object
        if (!IsCompleted)
        {
            // If the operation isn't done, wait for it
            AsyncWaitHandle.WaitOne();
            AsyncWaitHandle.Close();
            m_AsyncWaitHandle = null;  // Allow early GC
        }

        // Operation is done: if an exception occurred, throw it
        if (m_exception != null) throw m_exception;
    }

    #region Implementation of IAsyncResult
    public Object AsyncState { get { return m_AsyncState; } }

    public Boolean CompletedSynchronously {
        get { return Thread.VolatileRead(ref m_CompletedState) ==
            c_StateCompletedSynchronously; }
    }

    public WaitHandle AsyncWaitHandle
    {
        get
        {
            if (m_AsyncWaitHandle == null)
            {
                Boolean done = IsCompleted;
                ManualResetEvent mre = new ManualResetEvent(done);
                if (Interlocked.CompareExchange(ref m_AsyncWaitHandle,
                    mre, null) != null)
                {
                    // Another thread created this object's event; dispose
                    // the event we just created
                    mre.Close();
                }
                else
                {
                    if (!done && IsCompleted)
                    {
                        // If the operation wasn't done when we created
                        // the event but now it is done, set the event
                        m_AsyncWaitHandle.Set();
                    }
                }
            }
            return m_AsyncWaitHandle;
        }
    }

    public Boolean IsCompleted {
        get { return Thread.VolatileRead(ref m_CompletedState) !=
            c_StatePending; }
    }
    #endregion
}


As you can see, the AsyncResultNoResult class has a constructor that
accepts AsyncCallback and Object arguments that are used to start all
asynchronous operations. The constructor just saves these arguments in
private fields. IAsyncResult's AsyncState property returns the Object
field to the caller. The class defines an m_CompletedState field used to
implement IAsyncResult's IsCompleted and CompletedSynchronously
properties. It also defines an m_AsyncWaitHandle field to implement
IAsyncResult's AsyncWaitHandle property. Finally, the class defines an
m_exception field. This field is set when the operation completes. If
the operation completes successfully, then the field is set to null
(same as its initialized value); if the operation fails, then the field
is set to the Exception-derived object indicating the reason for
failure.

If you analyze the AsyncResultNoResult class, you'll notice that the code
as a whole is very straightforward, except for the section that deals
with the m_AsyncWaitHandle field. This field, a reference to a
ManualResetEvent object, is only needed if the code starting the
asynchronous operation queries the AsyncWaitHandle property or if the
code calls the EndInvoke method before the operation has actually
completed executing. The most common and recommended way to use the APM
is to specify an AsyncCallback method that should automatically be
invoked when the operation completes. For this common usage, the
ManualResetEvent object is not necessary at all. As a result, I've gone
to great lengths to avoid creating and using this object unless the
code using an AsyncResultNoResult object absolutely needs it.

The reason I go to such great lengths is that creating and using a
kernel object (such as a ManualResetEvent) is relatively expensive. For
more information about the performance hit of using kernel objects,
please see my October 2005 Concurrent Affairs column (msdn.microsoft.com/msdnmag/issues/05/10/ConcurrentAffairs).

When the asynchronous operation completes, some piece of code must call
AsyncResultNoResult's SetAsCompleted method, passing in null if the
operation completed successfully or a reference to an Exception-derived
object if the operation failed. The code also indicates whether the
operation completed synchronously (almost never) or asynchronously
(almost always). This information is returned from IAsyncResult's
CompletedSynchronously property, but very few applications actually care
about it.

Internally, SetAsCompleted saves the exception in the m_exception field and changes
the state of the m_CompletedState field. Then, if the manual
reset event object was created, it is set. Finally, if an AsyncCallback
method was specified when the AsyncResultNoResult object was
constructed, this method is called back, letting the application code
know that the asynchronous operation completed so that it can process
the result (or failure).
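
The once-only state transition at the center of SetAsCompleted can be isolated in a small sketch. CompletionState, TryComplete, and WasSynchronous are hypothetical names, not part of the article's classes. Note one deliberate difference, labeled here as a variation: this sketch uses Interlocked.CompareExchange and reports failure with a Boolean, whereas Figure 2 uses Interlocked.Exchange and throws InvalidOperationException, so a second caller in this sketch does not overwrite the recorded state.

```csharp
using System;
using System.Threading;

// Hypothetical helper that models only the pending/completed transition
public sealed class CompletionState
{
    private const int Pending = 0;
    private const int CompletedSynchronously = 1;
    private const int CompletedAsynchronously = 2;
    private int m_state = Pending;

    // Returns true for the first caller only; later callers get false
    // and the state recorded by the first caller is left untouched
    public bool TryComplete(bool completedSynchronously)
    {
        int newState = completedSynchronously
            ? CompletedSynchronously : CompletedAsynchronously;
        return Interlocked.CompareExchange(
            ref m_state, newState, Pending) == Pending;
    }

    public bool IsCompleted
    {
        get { return Volatile.Read(ref m_state) != Pending; }
    }

    public bool WasSynchronous
    {
        get { return Volatile.Read(ref m_state) == CompletedSynchronously; }
    }
}

public static class CompletionStateDemo
{
    public static void Main()
    {
        CompletionState s = new CompletionState();
        Console.WriteLine(s.IsCompleted);        // False
        Console.WriteLine(s.TryComplete(false)); // True
        Console.WriteLine(s.TryComplete(true));  // False
        Console.WriteLine(s.WasSynchronous);     // False
    }
}
```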

To get the results of the operation, the application code will call some
EndXxx method that will, in turn, call AsyncResultNoResult's EndInvoke
method to determine whether the operation succeeded. If EndInvoke is
called prior to the operation completing, then EndInvoke uses the manual
reset event to suspend the calling thread until the operation has
completed. Once the operation completes, EndInvoke either returns or
throws the exception that was saved off earlier when SetAsCompleted was
called.

Since many asynchronous operations do have a return value, I also defined a
class to support this: AsyncResult<TResult> (see Figure 3).
This generic class is derived from AsyncResultNoResult and really just
adds support for a return value of type TResult. This support comes in
the form of a private field to hold the result (m_result), an overload
of the SetAsCompleted method that accepts a TResult value, and a new
EndInvoke method that waits for the operation to complete and then
returns the result if the operation succeeded or throws an exception if
the operation failed.



Figure 3 AsyncResult with a Return Value

internal class AsyncResult<TResult> : AsyncResultNoResult
{
    // Field set when operation completes
    private TResult m_result = default(TResult);

    public AsyncResult(AsyncCallback asyncCallback, Object state) :
        base(asyncCallback, state) { }

    public void SetAsCompleted(TResult result,
        Boolean completedSynchronously)
    {
        // Save the asynchronous operation's result
        m_result = result;

        // Tell the base class that the operation completed
        // successfully (no exception)
        base.SetAsCompleted(null, completedSynchronously);
    }

    new public TResult EndInvoke()
    {
        base.EndInvoke(); // Wait until operation has completed
        return m_result;  // Return the result (if above didn't throw)
    }
}


Also, many BeginXxx methods accept arguments in addition to the AsyncCallback
and the Object. For example, the Socket class has a BeginConnect method
that takes IPAddress (address) and Int32 (port) arguments. If you want
to use the AsyncResultNoResult or AsyncResult<TResult> class with a
BeginXxx method that takes additional arguments, you'll want to define
your own type derived from either of these two base classes (depending
on whether your EndXxx method returns void). In your class, define
additional fields (one for each argument) and set them in your class's
constructor. Then the method that does the actual work can extract these
argument values from your class's fields when the time is right.
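
A minimal sketch of this derivation pattern follows. Everything in it is an assumption made for illustration: SketchAsyncResult<TResult> is a trimmed stand-in for the article's AsyncResult<TResult> (it always creates its event, which the real class avoids) so that the example compiles on its own, and AddAsyncResult, Adder, BeginAdd, and EndAdd are hypothetical names.

```csharp
using System;
using System.Threading;

// Trimmed stand-in for AsyncResult<TResult>; not the article's class
internal class SketchAsyncResult<TResult> : IAsyncResult
{
    private readonly AsyncCallback m_callback;
    private readonly Object m_state;
    private readonly ManualResetEvent m_done = new ManualResetEvent(false);
    private TResult m_result;
    private Exception m_exception;

    public SketchAsyncResult(AsyncCallback callback, Object state)
    {
        m_callback = callback;
        m_state = state;
    }

    public void SetAsCompleted(TResult result, Exception exception)
    {
        m_result = result;
        m_exception = exception;
        m_done.Set();                              // Mark completion
        if (m_callback != null) m_callback(this);  // Then notify the caller
    }

    public TResult EndInvoke()
    {
        m_done.WaitOne();                          // Wait if still pending
        if (m_exception != null) throw m_exception;
        return m_result;
    }

    public Object AsyncState { get { return m_state; } }
    public WaitHandle AsyncWaitHandle { get { return m_done; } }
    public Boolean IsCompleted { get { return m_done.WaitOne(0); } }
    public Boolean CompletedSynchronously { get { return false; } }
}

// Derived type: one field per additional BeginXxx argument
internal sealed class AddAsyncResult : SketchAsyncResult<Int32>
{
    public readonly Int32 Left;
    public readonly Int32 Right;

    public AddAsyncResult(Int32 left, Int32 right,
        AsyncCallback callback, Object state) : base(callback, state)
    {
        Left = left;
        Right = right;
    }
}

internal static class Adder
{
    public static IAsyncResult BeginAdd(Int32 left, Int32 right,
        AsyncCallback callback, Object state)
    {
        AddAsyncResult ar = new AddAsyncResult(left, right, callback, state);
        ThreadPool.QueueUserWorkItem(AddHelper, ar);
        return ar;
    }

    public static Int32 EndAdd(IAsyncResult asyncResult)
    {
        return ((AddAsyncResult)asyncResult).EndInvoke();
    }

    // The worker pulls the argument values back out of the fields
    private static void AddHelper(Object o)
    {
        AddAsyncResult ar = (AddAsyncResult)o;
        Int32 sum = 0;
        Exception error = null;
        try { sum = checked(ar.Left + ar.Right); }
        catch (Exception e) { error = e; }
        ar.SetAsCompleted(sum, error);
    }

    public static void Main()
    {
        Console.WriteLine(EndAdd(BeginAdd(2, 3, null, null)));
    }
}
```

The helper computes the result inside the try block but calls SetAsCompleted outside it, so an exception thrown by the caller's callback is not mistaken for a failure of the operation itself.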

Implementing the APM

Now that you see how to define a type that implements the IAsyncResult
interface, I'll show how to use my AsyncResult<TResult> and
AsyncResultNoResult classes. I defined a LongTask class (see Figure 4)
that offers a synchronous DoTask method that takes a long time to
execute and returns a DateTime instance indicating when the operation
completed.



Figure 4 LongTask Simulates Asynchronous I/O

internal sealed class LongTask
{
    private Int32 m_ms;  // Milliseconds

    public LongTask(Int32 seconds)
    {
        m_ms = seconds * 1000;
    }

    // Synchronous version of time-consuming method
    public DateTime DoTask()
    {
        Thread.Sleep(m_ms);  // Simulate time-consuming task
        return DateTime.Now; // Indicate when task completed
    }

    // Asynchronous version of time-consuming method (Begin part)
    public IAsyncResult BeginDoTask(AsyncCallback callback, Object state)
    {
        // Create IAsyncResult object identifying the
        // asynchronous operation
        AsyncResult<DateTime> ar = new AsyncResult<DateTime>(
            callback, state);

        // Use a thread pool thread to perform the operation
        ThreadPool.QueueUserWorkItem(DoTaskHelper, ar);

        return ar;  // Return the IAsyncResult to the caller
    }

    // Asynchronous version of time-consuming method (End part)
    public DateTime EndDoTask(IAsyncResult asyncResult)
    {
        // We know that the IAsyncResult is really an
        // AsyncResult<DateTime> object
        AsyncResult<DateTime> ar = (AsyncResult<DateTime>)asyncResult;

        // Wait for operation to complete, then return result or
        // throw exception
        return ar.EndInvoke();
    }

    // Asynchronous version of time-consuming method (private part
    // to set completion result/exception)
    private void DoTaskHelper(Object asyncResult)
    {
        // We know that it's really an AsyncResult<DateTime> object
        AsyncResult<DateTime> ar = (AsyncResult<DateTime>)asyncResult;
        try
        {
            // Perform the operation; if successful set the result
            DateTime dt = DoTask();
            ar.SetAsCompleted(dt, false);
        }
        catch (Exception e)
        {
            // If operation fails, set the exception
            ar.SetAsCompleted(e, false);
        }
    }
}


As a convenience, I also offer BeginDoTask and EndDoTask methods that
follow the CLR APM, allowing users to execute the DoTask method
asynchronously. When a user calls my BeginDoTask method, I construct an
AsyncResult<DateTime> object. Then I have a thread pool thread
call a small helper method, DoTaskHelper, which wraps a call to the
synchronous DoTask method.

The DoTaskHelper method simply calls the synchronous version of the method
within a try block. If the DoTask method runs to completion without
failing (throwing an exception), then I call SetAsCompleted to set the
operation's return value. If the DoTask method throws an exception, then
DoTaskHelper's catch block will catch the exception and indicate that
the operation has completed by calling SetAsCompleted, passing in the
reference to the Exception-derived object.

The application code calls LongTask's EndDoTask method to get the results
of the operation. All EndXxx methods are passed an IAsyncResult.
Internally, the EndDoTask method knows that the IAsyncResult object
passed to it is really an AsyncResult<DateTime> object, casts it,
and calls EndInvoke on it. As discussed earlier,
AsyncResult<TResult>'s EndInvoke method waits for the operation to
complete (if necessary) and then returns the result or throws an
exception indicating back to the caller the outcome of the asynchronous
operation.

Testing and Performance

The FunctionalTest method (see Figure 5)
shows some code that uses my implementation of the APM. It tests the
three rendezvous techniques offered by the APM: wait until done,
polling, and callback method. If you examine the code, you'll see that
it looks identical to any other usage of the APM that you've ever seen.
Of course, this is the point of the whole exercise.



Figure 5 Using LongTask

private static void FunctionalTest()
{
    IAsyncResult ar;
    LongTask lt = new LongTask(5);

    // Prove that the Wait-until-done technique works
    ar = lt.BeginDoTask(null, null);
    Console.WriteLine("Task completed at: {0}", lt.EndDoTask(ar));

    // Prove that the Polling technique works
    ar = lt.BeginDoTask(null, null);
    while (!ar.IsCompleted)
    {
        Console.WriteLine("Not completed yet.");
        Thread.Sleep(1000);
    }
    Console.WriteLine("Task completed at: {0}", lt.EndDoTask(ar));

    // Prove that the Callback technique works
    lt.BeginDoTask(TaskCompleted, lt);
    Console.ReadLine();
}

private static void TaskCompleted(IAsyncResult ar)
{
    LongTask lt = (LongTask)ar.AsyncState;
    Console.WriteLine("Task completed at: {0}", lt.EndDoTask(ar));
    Console.WriteLine("All done, hit Enter to exit app.");
}


The PerformanceTest method (see Figure 6)
compares the performance of my IAsyncResult implementation to the
implementation that the CLR provides when using a delegate's BeginInvoke
and EndInvoke methods. My implementation seems to perform better than
the FCL's current implementation, apparently because the FCL's
implementation always constructs a ManualResetEvent whenever it creates
its IAsyncResult object, regardless of whether this event is needed by
the application.



Figure 6 Testing IAsyncResult Performance

private const Int32 c_iterations = 100 * 1000; // 100 thousand
private static Int32 s_numDone;
private delegate DateTime DoTaskDelegate();

private static void PerformanceTest()
{
    AutoResetEvent are = new AutoResetEvent(false);
    LongTask lt = new LongTask(0);

    Stopwatch sw;

    s_numDone = 0;
    sw = Stopwatch.StartNew();
    for (Int32 n = 0; n < c_iterations; n++)
    {
        lt.BeginDoTask(delegate(IAsyncResult ar)
        {
            if (Interlocked.Increment(ref s_numDone) == c_iterations)
                are.Set();
        }, null);
    }
    are.WaitOne();
    Console.WriteLine("AsyncResult Time: {0}", sw.Elapsed);
    s_numDone = 0;
    DoTaskDelegate doTaskDelegate = lt.DoTask;

    sw = Stopwatch.StartNew();
    for (Int32 n = 0; n < c_iterations; n++)
    {
        doTaskDelegate.BeginInvoke(delegate(IAsyncResult ar)
        {
            if (Interlocked.Increment(ref s_numDone) == c_iterations)
                are.Set();
        }, null);
    }
    are.WaitOne();
    Console.WriteLine("Delegate    Time: {0}", sw.Elapsed);
}


Conclusion

I think it is interesting to understand what is going on inside the CLR
when we use mechanisms such as the APM. After examining my
implementation here, you can get a sense of the size of IAsyncResult
objects, what their state is and how they manage their state. This
understanding can lead to improved ways of architecting your own
applications and to better performance.

In this column, I used my IAsyncResult implementation to perform
compute-bound tasks using thread pool threads. In a future column, I'll
show how to use my IAsyncResult implementation with I/O-bound
operations.

Send your questions and comments for Jeffrey to
mmsync@microsoft.com.

Jeffrey Richter is a cofounder of Wintellect (www.Wintellect.com),
an architecture review, consulting and training firm. He is the author
of several books, including CLR via C# (Microsoft Press, 2006). Jeffrey
is also a contributing editor to MSDN Magazine and has been consulting
with Microsoft since 1990.