您的位置:首页 > 产品设计 > 产品经理

线程阶段性总结——APM,ThreadPool,Task,TaskScheduler ,CancellationTokenSource

2015-07-01 13:53 676 查看
不管我们使用thread,threadPool,task,还是APM异步,本质都是在使用多线程。对于新手来说,不太敢用多线程的原因,就我个人的体验来说,就是对多线程的异常捕获方式或时机缺乏了解,而一旦出现异常没有捕获,将会带来难以发现的bug,进而造成系统崩溃。而多线程本身也不是一朝一夕就能学好的,必须不断的去学习总结,所以我个人认为你要用一种线程模型,首先要对它有足够的了解,特别是对异常的捕获。如果你没有完全的把握,最好在实际开发中谨慎的用多线程。

1,APM异步编程模型。

采用BeginXXX和EndXXX方法。关于异常的捕捉,对于刚调用BeginXXX抛出的异常,异步操作可能还没有进入队列。这种异常一般可以忽略。对于进入异步操作时发生的异常,会将错误码放入IAsyncResult对象中,在我们调用EndXXX方法时,会将这个错误码转换成一个恰当的Exception再次抛出。所以对于APM编程模型来说,我们只用对EndXXX方法进行异常捕捉。伪代码:

Try
{
Result = someObj.EndXXX(IAsyncResult);
}
Catch(xxxException e)
{
//异常处理
}


注意事项:

1) 对于EndXXX方法的调用是必须的,否则可能会造成资源的泄漏,即使你可能不关心异步调用的返回结果,也要记住调用这个方法。

2) 只能调用一次EndXXX方法。

3) 调用EndXXX方法总是使用和BeginXXX时相同的对象。这里辨别的是引用,引用不同就被视为不同的对象。对于Delegate要补充一点,即使是相同签名的委托,它们被编译器编译成具体的类,这些类的类名是不一样的。

4) 不能取消异步I/O限制的异步操作。不要迷信这句话,他说的是I/O操作,是指的一个请求动作,如果我们的是多次请求,比如异步分块上传文件,是可以做取消功能的。

5) FCL中有许多的I/O操作类都实现了APM。如派生自System.IO.Stream的类,Socket,Dns,WebRequest,还有SqlCommand等等。它们都提供了BeginXXX和EndXXX方法。

6) 可以用APM来执行任何方法,我们只需要定义一个与方法签名一致的delegate,delegate编译后会生成一个BeginInvoke和EndInvoke方法来支持APM操作。

2,Thread & ThreadPool

Thread和ThreadPool发起异步的缺点:

1)没有内建的机制知道任务何时完成。

2)没法得到任务的返回值。

Thread的开销太大,尽量用ThreadPool,除非你要显示指定你的thread为前台线程或要对线程设置优先级,否则就不要用thread。

注意:线程池是由所有的AppDomain共享的。一个CLR维持一个线程池。

3,Task

Task的引入,解决了上面的两个问题。

1)Task可通过Wait()方法来等待任务的完成。这个方法是阻塞的。

2)通过Task.Result可以得到返回结果。在Result内部调用了Wait方法,所以查询这个属性是阻塞的。

3)对于任务函数的未处理异常,会被包装成AggregateException异常抛出。可捕捉Wait()方法和Result属性。通过AggregateException的InnerExceptions可以进一步查询具体的异常。

4)Task的静态方法WaitAny和WaitAll可以等待多个任务返回。同样可以捕捉这两个方法的异常。

5)对于没有调用Wait,Result,Exception来查询未处理异常的情况,例如:只调用了Task.Start方法。Task对象被回收时,Finalize方法会再次抛出这个异常终止进程。可以向TaskScheduler.UnobservedTaskException事件注册一个方法,来处理这类异常。通过UnobservedTaskExceptionEventArgs的SetObserved方法,可以忽略掉这个异常,使进程不会终止。

6)构造Task时,可以传递CancellationToken对象,以支持取消。如果是任务函数,通过调用CancellationToken.ThrowIfCancellationRequested 抛出的异常,类型是OperationCanceledException。如果任务函数没有传递CancellationToken对象,那么抛出的异常是TaskCanceledException,相当于任务级别的取消。

7) Task的ContinueWith方法可以在第一个任务完成时开启第二个任务。这个功能很强大,ContinueWith方法并不阻塞调用线程,它是异步的。我们可以在ContinueWith中写一个事件回调方法,它可以起到事件完成通知的作用。但它只能收到任务完成的通知,要实现任务进度的更新通知,到目前为止,task依然做不到。

Task<int> t = new Task<int>(() => Sum(100));

t.Start();
t.ContinueWith(task => Console.WriteLine("result:" + task.Result), TaskContinuationOptions.OnlyOnRanToCompletion);
t.ContinueWith(task => Console.WriteLine("canceled"), TaskContinuationOptions.OnlyOnCanceled);
t.ContinueWith(task => Console.WriteLine("failed"), TaskContinuationOptions.OnlyOnFaulted);


对于上面的这一窜代码,如果有未处理异常,同样会造成进程终止。你同样可以用TaskScheduler.UnobservedTaskException事件注册一个方法来处理。

8)Task可以指定子任务,子任务没有完成,父任务的ContinueTask也不会执行。关于异常和上面的处理方法一样。因为这个也是不阻塞的,未处理异常暂时也只能在TaskScheduler.UnobservedTaskException里处理。

Task<Int32[]> parent = new Task<int[]>(() =>
{
Int32[] result = new Int32[3];
new Task<Int32>(() => result[0] = Sum(100), TaskCreationOptions.AttachedToParent).Start();
new Task<Int32>(() => result[1] = Sum(200), TaskCreationOptions.AttachedToParent).Start();
new Task<Int32>(() => result[2] = Sum(300), TaskCreationOptions.AttachedToParent).Start();

return result;
});

parent.ContinueWith(parentTask => Array.ForEach(parentTask.Result, num => Console.WriteLine(num)));

parent.Start();


9)TaskFactroy可以简化一组相似Task的创建工作。

Task parent = new Task(() =>
{
CancellationTokenSource cts = new CancellationTokenSource();

TaskFactory<Int32> tf = new TaskFactory<Int32>(cts.Token, TaskCreationOptions.AttachedToParent,
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

//create three child task
var childTasks = new[]{
tf.StartNew(()=>Sum(cts.Token,100)),
tf.StartNew(()=>Sum(cts.Token,200)),
tf.StartNew(()=>Sum(cts.Token,Int32.MaxValue))
};

//when one failed,cancel the other
for (int i = 0; i < childTasks.Length; i++)
{
childTasks[i].ContinueWith(task => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted);
}

//display the maxvalue
tf.ContinueWhenAll(
childTasks, completeTasks => completeTasks.Where(
task => !task.IsCanceled && !task.IsFaulted).Max(t => t.Result),
CancellationToken.None).ContinueWith(task => Console.WriteLine("the max is:" + task.Result));

});

//show exception
parent.ContinueWith(p =>
{
StringBuilder sb = new StringBuilder();
sb.AppendLine("error occours:");
foreach (var e in p.Exception.Flatten().InnerExceptions)
{
sb.AppendLine(e.Message);
}
Console.WriteLine(sb.ToString());
}, TaskContinuationOptions.OnlyOnFaulted);

parent.Start();


4,对于协作取消要用CancellationTokenSource类。

1)CancellationTokenSource.Token方法返回CancellationToken,可以将CancellationToken传入我们的工作方法,并查询CancellationToken.IsCancellationRequested属性来获得操作是否已取消。取消的情况下,可以结束工作方法。

2)一般在主线程调用取消方法。CancellationToeknSource.Cancel。

3)取消时可以加入回调方法,通过CancellationToken.Register方法注册。对于回调方法抛出的异常,可以捕捉Cancel方法,异常会被包装到AggregateException异常中,查询InnerExceptions可的异常的详细信息。

4)CancellationTokenSource的静态方法CreateLinkedTokenSource可以创建一个关联的CreateLinkedTokenSource对象。任意其中的一个CreateLinkedTokenSource被取消,这个关联的CreateLinkedTokenSource就会被取消。

5,任务调度器。

分为线程池任务调度器(thread pool task scheduler)和同步上下文任务调度器(synchroliazation context task scheduler)。其中同步上下文任务调度器能将所有的任务调度给UI线程,这对于更新界面的异步操作相当有用!默认的调度器是线程池任务调度器。

非UI线程更新UI界面会报错,可以用下面的方法,指定同步上下文任务调度器:

TaskScheduler syncSch = TaskScheduler.FromCurrentSynchronizationContext();

Task<int> t = new Task<int>(() => Sum(100));

//update UI with Synchronizationcontext
t.ContinueWith(task => Text = task.Result.ToString(), syncSch);

t.Start();


6,非UI线程更新UI界面的方式总结

详见我的另一篇文章:

/article/5184274.html

7,Parallel

这个类提供了For,Foreach,Invoke静态方法。它内部封装了Task类。主要用于并行计算。

private void ParallelTest2()
{
for (int i = 1; i < 5; i++)
{
Console.WriteLine(DoWork(i));
}
//和上面的代码等价,但是是多线程并行执行的,注意这里的结束index不包含5
var plr = Parallel.For(1, 5, i => Console.WriteLine(DoWork(i)));
}

private int DoWork(int num)
{
int sum = 0;
for (int i = 0; i <= num; i++)
{
sum += i;
}
return sum;
}


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: