14.并发与异步 - 3.C#5.0的异步函数 -《果壳中的c#》
2017-08-03 11:36
274 查看
14.5.2 编写异步函数
private static readonly Stopwatch Watch = new Stopwatch(); static void Main(string[] args) { Go(); Console.Read(); } private static async Task Go() { await PrintAnswerToLife(); Console.WriteLine("Done"); } private static async Task PrintAnswerToLife() // We can return Task instead of void { await Task.Delay(5000); int answer = 21 * 2; Console.WriteLine(answer); }
编译器会扩展异步函数,它会将任务返回给使用
TaskCompletionSource的代码,用于创建任务,然后再发送信号或异常终止。
除了这些细微区别,可以将
PrintAnswerToLife扩展为下面的等价功能:
private static Task PrintAnswerToLife() { var tcs = new TaskCompletionSource<object>(); var awaiter = Task.Delay(5000).GetAwaiter(); awaiter.OnCompleted(() => { try { awaiter.GetResult(); int answer = 21 * 2; Console.WriteLine(answer + " 耗时:" + Watch.ElapsedMilliseconds + "ms"); tcs.SetResult(null); } catch (Exception ex) { tcs.SetException(ex); } }); return tcs.Task; }
因此,当一个返回任务的异步方法结束时,执行过程会返回等待它的程序(通过一个延续)。
1.返回 Task<TResult>
async Task<int> GetAnswerToLife() { await Task.Delay (5000); int answer = 21 * 2; return answer; //返回类型是Task<int> 所以返回int }
在内部,这段代码向TaskCompletionSource发送一个值,而非null。
void Main() { Go(); } async Task Go() { await PrintAnswerToLife(); Console.WriteLine ("Done"); } async Task PrintAnswerToLife() { int answer = await GetAnswerToLife(); Console.WriteLine (answer); } async Task<int> GetAnswerToLife() { await Task.Delay (5000); int answer = 21 * 2; return answer; }
编译能够为异步函数创建任务,意味我们只需在 I/O 绑定代码底层方法中显式创建一个'TaskCompletionSource'实例。(CPU 绑定代码可以使用
Task.Run创建任务)
2.异步调用图的执行
为了确切理解执行过程,最好将代码重新排列:static async Task Go() { var task = PrintAnswerToLife(); await task; Console.WriteLine("Done"); } static async Task PrintAnswerToLife() { var task = GetAnswerToLife(); int answer = await task; Console.WriteLine(answer); } static async Task<int> GetAnswerToLife() { var task = Task.Delay(5000); await task; int answer = 21 * 2; return answer; }
await 会使执行过程返回它所等待的PrintAnswerToLife,然后再返回Go,它同样会等待并返回调用者。所有这些方法调用都在调用Go的线程上以同步方式执行;这是执行过程的主要同步阶段。
整个执行流程在每一个异步调用后都会等待。这样就可以在调用图中形成一个无并发或重叠的串行流。每一个await表达式都会执行中创建一个“缺口”,之后程序都可以在原处恢复执行。
3.并行性
调用一个异步方法,但是等待它,就可以使代码并行执行。前面例子,有一个按钮添加一个像下面这样的事件处理器Go:_buttion.Click += (sender, args) => Go();
尽管Go是一个异步方法,但是我们并没有等待它,事实上它正是利用并发性来实现快速响应UI:
我们可以使用相同的法则以并行方式执行两个异步操作:
var task1 = PrintAnswerToLife(); var task2 = PrintAnswerToLife(); await task1; await task2;
以这种方式创建的并发性可以支持UI线程或非UI线程上执行的操作,但是它们实现方式有所区别。这两种情况都可以在底层操作上(如
Task.Delay或
Task.Run生成的代码)实现真正的并发性。
在调用堆中,只有操作不通过同步上下文创建,在这之上的方法才可以实现真正的并发性;否则,它们就是前面介绍的伪并发性和简化的线程安全性,其中我们唯一能够优先使用的是
await语句。
例如,它允许我们定义一个共享域_x,然后不需要使用锁就可以在增加它的值:
private static async Task PrintAnswerToLife() { _x++; await Task.Delay(5000); return 21 * 2; }
(但是,这里不能假定_x在await前后均保持相同的值。)
14.5.3 异步Lambda表达式
就像普通的命名(named)方法可以采用异步方式执行一样:async Task NamedMethod() { await Task.Delay (1000); Console.WriteLine ("Foo"); }
只要添加async关键字,未命名(unnamed)方法也可以采用异步:
async void Main() { Func<Task> unnamed = async () => { await Task.Delay (1000); Console.WriteLine ("Foo1"); }; // We can call the two in the same way: await NamedMethod(); await unnamed(); }
异步lambda表达式可用于附加事件处理器:
myButton.Click += async (sender, args) => { await Task.Delay (1000); myButton.Content = "Done"; };
下面代码更简洁:
myButton.Click +=ButtonHandler; async void ButtonHandler(object sender, EventArgs args) { await Task.Delay (1000); myButton.Content = "Done"; }
异步lambda表达式也可以返回
Task<Result>:
Func<Task<int>> unnamed = async () => { await Task.Delay (1000); return 123; }; int answer = await unnamed();
14.5.4 WinRT异步方法
WinRT中,Task等价
IAsyncAction,
Task<TResult>等价
IAsyncOperation<TResult>。
两个类都通过
System.Runtime.WindowsRuntime.dll程序集的
AsTask扩展方法转换为
Task或
Task<TResult>。这个程序集也定义了一个
GetAwaiter方法,它可以操作
IAsyncAction和
IAsyncOperation<TResult>,他们可以直接执行等待操作。
Task<StorageFile> file = KnowFolders.DocumentsLibrary.CreateFileAsync("test.txt").AsTask();
或者:
StorageFile file = await KnowFolders.DocumentsLibrary.CreateFileAsync("test.txt");
14.5.5 异步与同步上下文
1.异常提交
2.OpertionStarted 和 OperationCompleted
14.5.6 优化
1.同步完成
异步方法可能会在等待之前返回,假设有下面这样方法,它会缓存下载的网页:static Dictionary<string,string> _cache = new Dictionary<string,string>(); async Task<string> GetWebPageAsync (string uri) { string html; if (_cache.TryGetValue (uri, out html)) return html; return _cache [uri] = await new WebClient().DownloadStringTaskAsync (uri); }
假设某个URI已经存在于缓存之中,那么执行过程会在等待发生之前返回调用者,同时这个方法会返回一个已发送信号的任务,这称为同步完成。
如果等待一个同步完成任务,那么执行过程不会返回调用者并通过一个延续弹回——相反,它会马上进入下一条语句。编译器会通过检查等待着的
IsCompleted属性来实现这种优化;换言之,无论何时执行等待:
Console.WriteLine(await GetWebPageAsync ("http://oreilly.com"));
在同步完成时,编译器会生成中止延续的代码:
var awaiter = GetWebPageAsync().GetAwaiter(); if (awaiter.IsCompleted) Console.WriteLine(awaiter.GetResult()); else awaiter.OnCompleted(()=>Console.WriteLine(awaiter.GetResult()));
编写从不等待的异步方法是允许的,但是编译器会发出警告:
async Task<string> Foo() {return "abc";}
在重写虚方法/抽象方法时,如果不需要实现异步处理,那么很适合使用这种方法。
实现相同结果的另一种方法是使用
Task.FromResult,它会返回一个已发送信号的任务。
Task<string> Too() { return Task.FromResult("abc"); }
如果从UI线程调用,GetWebPageAsync方法本身就具有线程安全性,在成功执行后多次调用这个方法(初始化多个并发下载),而且不用锁来保证缓存。
但是,多次处理同个URI,会生成多个冗余下载,最终更新同一个缓存记录(最后个覆盖前面)。如果没有错,那更高效的方式是让同一个URI的后续调用(异步)等待正在处理的请求。
还有一个简单方法(不需要锁或信号结构):
创建一个“未来”缓存(
Task<string>),代替字符串缓存:
static Dictionary<string,Task<string>> _cache = new Dictionary<string,Task<string>>(); Task<string> GetWebPageAsync (string uri) { Task<string> downloadTask; if (_cache.TryGetValue (uri, out downloadTask)) return downloadTask; return _cache [uri] = new WebClient().DownloadStringTaskAsync (uri); }
这里没有使用await,直接返回获得的任务。
如果重复调用GetWebPageAsync处理同一个URI,可以保证能获得同一个
Task<string>对象。(这样做另一个好处,降低GC负载)
2.避免过度回弹
ConfigureAwait的作用:使当前
async方法的
await后续操作不需要恢复到主线程(不需要保存线程上下文)。
对于循环中多次调用的方法,通过调用ConfigureAwait,可以避免重复回弹UI消息循环带来的开销。
void Main() { A(); } async void A() { await B(); } async Task B() { for (int i = 0; i < 1000; i++) await C().ConfigureAwait (false); } async Task C() { /*...*/ }
B方法和C方法撤销UI使用的简单线程安全模式,代码运行在UI线程上,而只能在await语句中优先占用。然而,A方法不受影响,它在启动之后就一直停留在UI线程。
14.6 异步模式
14.6.1 取消
通常要能够在并发操作启动后,取消这个操作(用户请求)。实现这个操作的简单方式是使用取消令牌,编写一个封装类:class CancellationToken { public bool IsCancellationRequested { get; private set; } public void Cancel() { IsCancellationRequested = true; } public void ThrowIfCancellationRequested() { if (IsCancellationRequested) throw new OperationCanceledException(); } }
当调用者想取消操作时,它会调用传递给Foo的取消令牌上的
Cancel。因此出现
OperationCanceledException异常。
例:
async void Main()
{
var token = new CancellationToken();
Task.Delay (5000).ContinueWith (ant => token.Cancel()); // Tell it to cancel in two seconds.
await Foo (token);
}
// This is a simplified version of the CancellationToken type in System.Threading:
class CancellationToken { public bool IsCancellationRequested { get; private set; } public void Cancel() { IsCancellationRequested = true; } public void ThrowIfCancellationRequested() { if (IsCancellationRequested) throw new OperationCanceledException(); } }
async Task Foo (CancellationToken cancellationToken)
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine (i);
await Task.Delay (1000);
cancellationToken.ThrowIfCancellationRequested();
}
}
CLR提供一个
CancellationToken类型,然而它没有
Cancel()方法;
但是这个方法提供另一个类型
CancellationTokenSource。这种分离具有一定安全性:只能访问
CancellationToken对象的方法可以检查取消操作,但不能初始化取消操作。
CancellationTokenSource有一个
Token属性,可以返回一个
CancellationToken。
var cancelSource = new CancellationTokenSource(); Task.Delay(5000).ContinueWith(ant => cancelSource.Cancel()); await Foo (cancelSource.Token);
在CLR中,大多数异步方法提供了取消令牌,包括Delay。
public static Task Delay(int millisecondsDelay, CancellationToken cancellationToken);
我们不需要再调用
ThrowIfCancellationRequested,因为Task.Delay已经包含这个操作。
同步方法也支持取消操作(如
Task.Wait方法)。这种情况,取消指令必须以异步方式执行(例如,在另一个任务中执行)。
例如:
var cancelSource = new CancellationTokenSource(5000); Task.Delay(5000).ContinueWith(ant => cancelSource.Cancel()); ...
Framework 4.5开始,创建
CancellationTokenSource可以指定一个时间间隔,表示一定时间段后初始化取消操作。
无论同步或者异步,最好指定一个超时时间:
var cancelSource = new CancellationTokenSource (5000); try { await Foo (cancelSource.Token); } catch (OperationCanceledException ex) { Console.WriteLine ("Cancelled"); }
CancellationToken结构提供一个
Register方法,可以用于注册一个回调代理,然后在取消操作发生时触发,它会返回一个对象,用于撤销注册。
IsCanceled返回
true,
IsFaulted返回
false。出现
OperationCanceledException异常,任务进入“已取消”状态。
14.6.2 进度报告
有时,异步操作需要在运行时报告进度。有一种简单的解决方法是给异步传入一个Action代理,然后进度发生变化时就会触发这个方法:
async void Main() { Action<int> progress = i => Console.WriteLine (i + " %"); await Foo (progress); } Task Foo (Action<int> onProgressPercentChanged) { return Task.Run (() => { for (int i = 0; i < 1000; i++) { if (i % 10 == 0) onProgressPercentChanged (i / 10); // 执行CPU绑定代码. } }); }
这段代码运行在控制台应用程序上,但是它不适合运行在富客户端场景,因为它可以从工作者线程报告进度,这可能会给使用者线程带来线程安全问题。
IProgress<T>
和Progress<T>
它们的作用是“包装”一个代理,这样UI应用程序就可以通过同步上下文安全地报告进度。这个接口只定义一个方法:
public interface IProgress<in T> { // 参数: // value: // 进度更新之后的值。 void Report(T value); }
Iprogress<T>用法很简单:
Task Foo (IProgress<int> onProgressPercentChanged) { return Task.Run (() => { for (int i = 0; i < 1000; i++) { if (i % 10 == 0) onProgressPercentChanged.Report (i / 10); // 执行CPU绑定代码. } }); }
Progress<T>类有一个构造方法,它可以接受
Action<T>类型包装的代理,
var progress = new Progress<int>(i => Console.WriteLine (i + " %")); await Foo (progress);
(
Progress<T>还有一个ProgressChanged事件,我们可以订阅这个事件,同时不要给构造函数传入一个操作代理)
在实例化
Progress<int>时,这个类会波桌同步上下文(如果有)。然后
Foo调用
Report时,它会通过上下文调用代理对象。
将替换为包含一系列属性的自定义类型,就可以在异步方法中实现更复杂的进度报告。
由
IProgress<T>生成的值一般是“废弃值”(例如,完成比或已下载字节),而由
IObserver<T>的
MoveNext生成的值通常由结果组成,这个正式调用它的初衷。
14.6.3 基于任务的异步模式(TAP)
一个TAP方法必须:返回一个“热”(正在运行)
Task或
Task<TReuslt>
拥有“Async”后缀
如果支持取消或进度报告,重载可接收取消令牌或
IProgress<T>
快速返回调用者
在I/O 绑定代码中不占用线程。
14.6.4 任务组合器
CLR包含两个任务组合器:Task.WhenAny和
Task.WhenAll。
我们假定以下方法:
async Task<int> Delay1() { await Task.Delay (1000); return 1; } async Task<int> Delay2() { await Task.Delay (2000); return 2; } async Task<int> Delay3() { await Task.Delay (3000); return 3; }
1.WhenAny
当任务组中任意一个任务完成,它就完成。下面任务会1秒内完成:async void Main() { Task<int> winningTask = await Task.WhenAny (Delay3(), Delay1(), Delay2()); Console.WriteLine ("Done"); Console.WriteLine (winningTask.Result); // 1 }
因为
Task.WhenAny本身会返回一个任务,所以我们要等待它,然后它会返回先完成的任务。这个例子完全不会阻塞——包括访问
Result属性的最后一行语句(因为winningTask已经完成)。但是,最好还是要等待任务(winningTask):
Console.WriteLine (await winningTask); // 1
因为这时任何异常都会重新抛出,而不需要包装一个
AggregateException异常中。事实上,我们可以进一步操作中同时执行两个
await:
int answer = await await Task.WhenAny (Delay1(), Delay2(), Delay3());
如果后面没有一个未完成任务出现错误,那么除非后面等待了这个任务,否则该异常将不会被捕捉到。
WhenAny适合用于应用操作超时时间或取消操作:
async void Main() { Task<string> task = SomeAsyncFunc(); //返回task Task winner = await (Task.WhenAny (task, Task.Delay(5000))); //返回Task.Delay(5000) if (winner != task) throw new TimeoutException(); string result = await task; // 解开结果/重新抛出异常 } async Task<string> SomeAsyncFunc() { await Task.Delay (10000); return "foo"; }
注意这个例子不同类型的任务去调用
WhenAny,所以完成的任务报告为一个普通
Task(而非
Task<string>)
2.WhenAll
当传入的所有任务完成时,它才完成。下面的任务会在3秒之后完成(同时演示了分叉/联合模式)await Task.WhenAll(Delay1(), Delay2(), Delay3());
不使用
WnenAll,而依次等待task1,task2和task3,也可以得到相似的结果:
Task task1 = Delay1(),task2 = Delay2(),task3 = Delay3(); await task1;await task2;await task3;
这种方式,除了三次等待效率低于一次等待外,区别:如果
task1出错,不执行task2/task3。而且异常无法处理。
相反,
Task.WhenAll只有在所有任务完成后才会完成——即使中间出现错误。如果出现多个错误,它们的异常会组合到任务的
AggregateException之中。
然而,等待组合的任务只能捕捉到第一个异常,所以如果要查看所有异常,则必须这样做:
Task task1 = Task.Run (() => { throw null; } ); Task task2 = Task.Run (() => { throw null; } ); Task all = Task.WhenAll (task1, task2); try { await all; } catch { Console.WriteLine (all.Exception.InnerExceptions.Count); // 2 }
结果输出为:2
使用类型为
Task<TResult>的任务调用WhenAll,会返回一个
Task<TResult[]>,这是所有任务的结果组合。如果执行等待操作时,那么这个结果会变成
TResult[]:
Task<int> task1 = Task.Run (() => 1); Task<int> task2 = Task.Run (() => 2); int[] results = await Task.WhenAll (task1, task2); // { 1, 2 }
下面一个例子,并行下载多个URI,然后计算它们的总下载大小:
async void Main() { int totalSize = await GetTotalSize ("http://www.qq.com http://www.weibo.com http://www.163.com".Split()); totalSize.Dump(); } async Task<int> GetTotalSize (string[] uris) { IEnumerable<Task<byte[]>> downloadTasks = uris.Select (uri => new WebClient().DownloadDataTaskAsync (uri)); byte[][] contents = await Task.WhenAll (downloadTasks); return contents.Sum (c => c.Length); }
字段代码效率不行,我们只能在每一个任务都完成之后才能处理字节数组。如果在下载之后马上将字节数组压缩为实际长度,那么效率会提高。这正式异步lambda发挥作用地方,因为我们在LINQ的Select查询操作符插入一个await表达式:
async Task<int> GetTotalSize (string[] uris) { IEnumerable<Task<int>> downloadTasks = uris.Select (async uri => (await new WebClient().DownloadDataTaskAsync (uri)).Length); //await .... Length int[] contentLengths = await Task.WhenAll (downloadTasks); return contentLengths.Sum(); }
3.自定义组合器
编写自定义的任务组合很实用。最简单的组合器可以接受一个任务,下面例子允许在特定超时时间里等待任意任务:async void Main() { string result = await SomeAsyncFunc().WithTimeout (TimeSpan.FromSeconds (2)); result.Dump(); } async Task<string> SomeAsyncFunc() { await Task.Delay (10000); return "foo"; } //Task<TResult> 扩展方法 public static class Extensions { public async static Task<TResult> WithTimeout<TResult> (this Task<TResult> task, TimeSpan timeout) { Task winner = await (Task.WhenAny (task, Task.Delay (timeout))); if (winner != task) throw new TimeoutException(); return await task; // 解开结果/重新抛出异常 } }
下面代码通过一个
CancellationToken“抛弃”一个任务:
public static class Extensions { public static Task<TResult> WithCancellation<TResult> (this Task<TResult> task, CancellationToken cancelToken) { var tcs = new TaskCompletionSource<TResult>(); var reg = cancelToken.Register (() => tcs.TrySetCanceled ()); task.ContinueWith (ant => { reg.Dispose(); if (ant.IsCanceled) tcs.TrySetCanceled(); else if (ant.IsFaulted) tcs.TrySetException (ant.Exception.InnerException); else tcs.TrySetResult (ant.Result); }); return tcs.Task; } }
任务组合器有时候可能很复杂,需要22章介绍的各种信号结构。
下面的组合器作用与WhenAll类似,唯一不同的是如果任意任务出现错误,那么最终任务也会马上出错:
async void Main() { Task<int> task2 = Task.Delay (5000).ContinueWith (ant => {return 53;}); Task<int> task1 = Task.Run (() => {throw null; return 42; } ); //--->未将对象引用为实例 int[] results = await WhenAllOrError (task1, task2); } async Task<TResult[]> WhenAllOrError<TResult> (params Task<TResult>[] tasks) { var killJoy = new TaskCompletionSource<TResult[]>(); foreach (var task in tasks) task.ContinueWith (ant => { if (ant.IsCanceled) killJoy.TrySetCanceled(); //尝试将底层Task <TResult>转换为已取消状态。 else if (ant.IsFaulted) killJoy.TrySetException (ant.Exception.InnerException); }); return await await Task.WhenAny (killJoy.Task, Task.WhenAll (tasks)); }
这里先创建一个TaskCompletionSource,它的唯一作用的终止出错的任务(此例)。因此,这里不会调用它的
SetResult方法,只会调用它的
TrySetCanceled和
TrySetException方法。
这个例子更适合
ContinueWith,而不是GetAwaiter().OnCompleted,因为我们不需要访问任务的结果,也不需要在此弹回UI线程。
相关文章推荐
- 14.并发与异步 - 2.任务Task -《果壳中的c#》
- 14.并发与异步 - 1.线程处理Thread -《果壳中的c#》
- GCD之同步函数、异步函数和串行队列、并发队列
- Understanding Unix/Linux Programming 笔记:chapter 14:线程机制:并发函数的使用
- C#使用多线程并发之异步委托IAsyncResult
- C# 通过委托+异步的方式约束函数的执行超时时间
- 基于异步队列的生产者消费者C#并发设计
- 14、C#基础整理(函数)
- 深入理解 c# 第一章 使用异步函数在windows form 中显示产品 c#5
- C# 控制文本并发访问
- C#调用openCV函数及其移植
- LiteHttp 第十节:异步并发与调度策略
- C#常用函数表[转]
- 深入理解并发/并行,阻塞/非阻塞,同步/异步
- C#常用函数表及Asp.net(C#)常用函数表
- c#.net常用函数和方法集
- c#中 使用javascript的alert函数
- C#常用类:文件异步下载、IIS管理、IIS目录管理、文件上传等
- C#操作word的一些函数
- 模仿TMALL搜索,下拉提示 优化 用户keypress停顿200毫秒间隔时,在执行异步取数据操作 通过underscore的函数debounce来实现