CLR Via笔记之ThreadPool/Task
2011-11-18 13:46
218 查看
ThreadPool 是MTA设计,不像STA线程可以支持Com Interop互操作。而Task是ThreadPool的再封装简化,两者都支持异步取消的操作。
ThreadPool 通过CancellationTokenSource 取消线程操作
如果在取消时还想通知或调用的其它的方法,可以针对CancellationTokenSource.Token.Register 的注册回调的方法
Task并行任务的处理(Task,Task<> 同Action,Func<>相似),Task<>的返回结果为委托函数的返回结果
PS: Task有个IsCompleted属性,也可通过循环判断这个属性来阻塞线程。 但是循环判断比Wait更浪费性能,唤醒线程毕况要比轮询调度CPU要有效率,即使你在轮询判断中增加主线程Sleep也是额外的一种消耗。
这点和异步编程模型APM很像,比如在异步执行BeginInvoke时,你可以调用EndInvoke阻塞主线程,也可以根据BeginInvoke的IAsyncReult.IsCompleted属性阻塞线程。
Task用一系的属性来定位异步任务的是以何种情况完成的 IsFaulted/IsCompleted/IsCanceled. 因此除了任务属性直接判断一个任务的完成情况,我们还可以通过后续任务观察前一个任务是何种方式完成的,是否有异常情况。
Task取消任务的异步操作,同样是利用CancellationTokenSource
Task任务结束/中断/异常时等,以这个Task任务做为另一个任务开始的参数,可以通过Task.ContinueWith启动另一个任务执行。
PS:Task.ContinueWith有多个重载,其中第6个重载枚举参数TaskContinuationOptions 可以控制在多种情况下延续任务执行 ,比如在异常/完成/取消/才执行另外的一个任务
Task并行多个子任务(比如类似多线程下载的情况:不停的阻塞主线程任务,并且跟踪所有子线程完任务成情况),当然我们用线程也可以解决这些问题,但是 Task简化了这些代码,优化了线程资源使用与回收。
枚举TaskCreationOptions,指定了子任务以何种方式创建与执行。当然还可以利用TaskFactory实现上述方式
当然你还可以变态点这么写
Task还支持异步编程模型(APM),参照这段代码
Task.Factory.FromAsync的其中一个重载首个参数为异步模型BeginInvoke方法,第二个参数为EndInvoke方法,第三个参数object为首个方法的参数,第四个参数为任务类型枚举,其返回类型为第二个方法的返回类型。
Task的延续任务及工厂方式支持异步UI操作 TaskContinuationOptions.ExecuteSynchronously/ TaskScheduler.FromCurrentSynchronizationContext()
上面仅是CLR的读书笔记,想了解很多理论知识,可以参考CLR Via这本MVP联袂推荐的书。
Task相比手动ThreadPool/Thread编码 简化了一系列任务的并行与串行执行问题,以及子任务的执行情况的控制回调变得相对容易。下面是很久以前写的一个多线程同步数据的例子,原理就是通过线程任务集合对象的所有属性的Finish情况来阻塞等待执行最后的任务。4.0编程下Task方便了这些操作,但是3.5以下我们仍然只能这样实现。
ThreadPool 通过CancellationTokenSource 取消线程操作
private static void CancellingAWorkItem() { CancellationTokenSource cts = new CancellationTokenSource(); ThreadPool.QueueUserWorkItem(o => Count(cts.Token, 1000)); //取消线程 cts.Cancel(); } private static void Count(CancellationToken token, Int32 countTo) { for (Int32 count = 0; count < countTo; count++) { //终止线程 if (token.IsCancellationRequested) { break; } Thread.Sleep(200); // For demo, waste some time } }
如果在取消时还想通知或调用的其它的方法,可以针对CancellationTokenSource.Token.Register 的注册回调的方法
CancellationTokenSource cts = new CancellationTokenSource(); cts.Token.Register(() => Console.WriteLine("已经取消方法执行1")); cts.Token.Register(() => Console.WriteLine("已经取消方法执行2")); //执行取消操作 cts.Cancel(); //上面注册的两个Console.WriteLine会被执行
Task并行任务的处理(Task,Task<> 同Action,Func<>相似),Task<>的返回结果为委托函数的返回结果
private static void DoTask() { //创建任务 Task<Int32> t = new Task<Int32>(n => Sum((Int32)n), 10000); t.Start(); //阻塞等待任务执行完成 t.Wait(); Console.WriteLine("The sum is: " + t.Result); // An Int32 value } private static Int32 Sum(Int32 n) { Int32 sum = 0; for (; n > 0; n--) checked { sum += n; } return sum; }
PS: Task有个IsCompleted属性,也可通过循环判断这个属性来阻塞线程。 但是循环判断比Wait更浪费性能,唤醒线程毕况要比轮询调度CPU要有效率,即使你在轮询判断中增加主线程Sleep也是额外的一种消耗。
这点和异步编程模型APM很像,比如在异步执行BeginInvoke时,你可以调用EndInvoke阻塞主线程,也可以根据BeginInvoke的IAsyncReult.IsCompleted属性阻塞线程。
Task用一系的属性来定位异步任务的是以何种情况完成的 IsFaulted/IsCompleted/IsCanceled. 因此除了任务属性直接判断一个任务的完成情况,我们还可以通过后续任务观察前一个任务是何种方式完成的,是否有异常情况。
Task.ContinueWith((task) => { var _error = task.Exception; }, TaskContinuationOptions.OnlyOnFaulted); Task.ContinueWith(task => { var _error = task.Exception; }, TaskContinuationOptions.OnlyOnCanceled);
Task取消任务的异步操作,同样是利用CancellationTokenSource
private static void CancelTask() { CancellationTokenSource cts = new CancellationTokenSource(); Task<Int32> t = new Task<Int32>(() => Sum(cts.Token, 10000), cts.Token); t.Start(); //取消异步任务 cts.Cancel(); try { //由于异步任务被取消,任务未正常完成,因此取任务结果会扔出AggregateException异常 Console.WriteLine("The sum is: " + t.Result); } catch (AggregateException ae) { ae.Handle(e => e is OperationCanceledException); Console.WriteLine("Sum was canceled"); } } private static Int32 Sum(CancellationToken ct, Int32 n) { Int32 sum = 0; for (; n > 0; n--) { ct.ThrowIfCancellationRequested(); checked { sum += n; } } return sum; }
Task任务结束/中断/异常时等,以这个Task任务做为另一个任务开始的参数,可以通过Task.ContinueWith启动另一个任务执行。
private static void ContinueWith() { Task<Int32> t = new Task<Int32>(n => Sum((Int32)n), 10000); t.Start(); Task cwt = t.ContinueWith(task => Console.WriteLine("The sum is: " + task.Result)); cwt.Wait(); }
PS:Task.ContinueWith有多个重载,其中第6个重载枚举参数TaskContinuationOptions 可以控制在多种情况下延续任务执行 ,比如在异常/完成/取消/才执行另外的一个任务
Task并行多个子任务(比如类似多线程下载的情况:不停的阻塞主线程任务,并且跟踪所有子线程完任务成情况),当然我们用线程也可以解决这些问题,但是 Task简化了这些代码,优化了线程资源使用与回收。
private static void ParentChildTaskDemo() { Task<Int32[]> parent = new Task<Int32[]>(() => { var results = new Int32[3]; new Task(() => results[0] = Sum(10000), TaskCreationOptions.AttachedToParent).Start(); new Task(() => results[1] = Sum(20000), TaskCreationOptions.AttachedToParent).Start(); new Task(() => results[2] = Sum(30000), TaskCreationOptions.AttachedToParent).Start(); return results; }); //ContinueWith的任务必须等Parent下的所有子任务全部执行完成才会触发 var cwt = parent.ContinueWith(parentTask => Array.ForEach(parentTask.Result, Console.WriteLine)); //开启并行多任务 parent.Start(); //等待所有任务执行完成 cwt.Wait(); }
枚举TaskCreationOptions,指定了子任务以何种方式创建与执行。当然还可以利用TaskFactory实现上述方式
Task<Int32[]> parent = new Task<Int32[]>(() => { var results = new Int32[3]; var cts = new CancellationTokenSource(); //var tf = new TaskFactory<Int32>(cts.Token, TaskCreationOptions.AttachedToParent, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); //var tf = new TaskFactory<Int32>(); //var tf = new TaskFactory<Int32>(cts.Token); var tf = new TaskFactory<Int32>(TaskCreationOptions.AttachedToParent, TaskContinuationOptions.ExecuteSynchronously); var childTasks = new[] { tf.StartNew(() => results[0] = Sum(10000)), tf.StartNew(() => results[1] = Sum(20000)), tf.StartNew(() => results[2] = Sum(30000)) }; return results; });
当然你还可以变态点这么写
Task parent = Task.Factory.StartNew( () => Task.Factory.StartNew( () => Task.Factory.StartNew( ()=>Task.Factory.StartNew( o=>Console.WriteLine(o.ToString()),"测试" ))));
Task还支持异步编程模型(APM),参照这段代码
Task.Factory.FromAsync<WebResponse>(webRequest.BeginGetResponse, webRequest.EndGetResponse, null, TaskCreationOptions.None);
Task.Factory.FromAsync的其中一个重载首个参数为异步模型BeginInvoke方法,第二个参数为EndInvoke方法,第三个参数object为首个方法的参数,第四个参数为任务类型枚举,其返回类型为第二个方法的返回类型。
Task的延续任务及工厂方式支持异步UI操作 TaskContinuationOptions.ExecuteSynchronously/ TaskScheduler.FromCurrentSynchronizationContext()
上面仅是CLR的读书笔记,想了解很多理论知识,可以参考CLR Via这本MVP联袂推荐的书。
Task相比手动ThreadPool/Thread编码 简化了一系列任务的并行与串行执行问题,以及子任务的执行情况的控制回调变得相对容易。下面是很久以前写的一个多线程同步数据的例子,原理就是通过线程任务集合对象的所有属性的Finish情况来阻塞等待执行最后的任务。4.0编程下Task方便了这些操作,但是3.5以下我们仍然只能这样实现。
public abstract class MultiThreadWork { public void StartThreadWork(DataTable dtQueue, int taskCount, string userId, string userName) { try { //初始线程范围条件 计算每个线程应该接收范围大小 int averageSize = dtQueue.Rows.Count / taskCount;//平均分配 int lastSize = averageSize + dtQueue.Rows.Count % taskCount;//剩余部分由最后一个线程完成 //生成参数线程对象 List<WorkItem> list = new List<WorkItem>(); for (int i = 0; i < taskCount; i++) { WorkItem item = new WorkItem(); item.DtQueue = dtQueue; item.IsFinish = false; item.UserId = userId; item.UserName = userName; item.StarIndex = averageSize * i;//每个线程接起始点 if (i < taskCount - 1) item.EndIndex = item.StarIndex + averageSize;//每个线程接收结束点 else item.EndIndex = item.StarIndex + lastSize; //最后一个结束点 list.Add(item); } //开启工作线程 foreach (WorkItem item in list) { ThreadPool.QueueUserWorkItem(new WaitCallback(DoWork), item); } //阻塞主线程 等待所有子线程结束 while (true) { //bool isAllFinish = true; if (!list.Exists(o => o.IsFinish == false)) break; else Thread.Sleep(500); } //通知订阅对象 AllWorkEventArgs args = new AllWorkEventArgs { DtQueue = dtQueue, IsError = false }; this.AllFinishNotify(args); } catch (Exception ex) { //通知订阅对象 AllWorkEventArgs args = new AllWorkEventArgs { DtQueue = dtQueue, IsError = true, ErrorInfo = ex.Message }; this.AllErrorNotify(args); } } /// <summary> /// 任务工作方式 /// </summary> /// <param name="state"></param> public virtual void DoWork(object state) { WorkItem item = state as WorkItem; try { //ClientFileService service = new ClientFileService(); //service.UploadFile(item.QueueList, item.StarIndex, item.EndIndex, item.UserId, item.UserName); //通知订阅对象 WorkEventArgs args = new WorkEventArgs(); args.IsError = false; args.WorkItem = item; this.SubFinishNotity(args); } catch (Exception ex) { //通知订阅对象 WorkEventArgs args = new WorkEventArgs(); args.IsError = true; args.ErrorInfo = ex.Message; args.WorkItem = item; this.SubErrorNotify(args); } finally { //子线程完成标志 item.IsFinish = true; } } #region 通知事件 /// <summary> /// 错误通知 /// </summary> public event EventHandler<WorkEventArgs> SubError; /// <summary> /// 全部下载完成通知 /// </summary> public event EventHandler<AllWorkEventArgs> AllFinished; /// <summary> /// 子线程完成通知 /// </summary> public event EventHandler<WorkEventArgs> SubFinished; public event EventHandler<AllWorkEventArgs> AllError; /// <summary> /// 全部任务完成触发事件通知方法 /// </summary> protected void AllFinishNotify(AllWorkEventArgs args) { if (AllFinished != null) { AllFinished(this, args); } } protected void AllErrorNotify(AllWorkEventArgs args) { if (AllError != null) { AllError(this, args); } } /// <summary> /// 子任务线程出错触发事件通知方法 /// <param name="args"></param> protected void SubErrorNotify(WorkEventArgs args) { if (SubError != null) { SubError(this, args); } } /// <summary> /// 子任务线程完成触发事件通知方法 /// </summary> /// <param name="args"></param> protected void SubFinishNotity(WorkEventArgs args) { if (SubFinished != null) { SubFinished(this, args); } } #endregion }
相关文章推荐
- 《CLR Via C# 第3版》笔记之(十九) - 任务(Task)
- 《CLR Via C# 第3版》笔记之(十八) - 线程池
- ThreadPoolTaskExecutor的配置使用
- c#的ThreadPool使用笔记(一)
- ThreadPoolTaskExecutor异常收集
- 《CLR via C#》笔记——AppDomain(1)
- ThreadPoolTaskExecutor的配置解释
- Spring线程池ThreadPoolTaskExecutor配置及图解
- c#的ThreadPool使用笔记(三)
- CLR via C# 学习笔记(2012/3/6)
- CLR Via C#笔记:自定义异常
- CLR via C# 学习笔记(2012/3/10)
- CLR Via C# 简单笔记之可空值类型
- CLR Via C# 学习笔记(6) 方法参数相关(out ref params)
- ThreadPoolTaskExecutor
- <NET CLR via c# 第4版>笔记 第19章 可空值类型
- 《CLR Via C# 第3版》笔记之(十四) - 泛型高级
- 【Darwin学习笔记】之TaskThread
- 读经典——《CLR via C#》(Jeffrey Richter著) 笔记_方法执行
- SPRING中的线程池ThreadPoolTaskExecutor