您的位置:首页 > 其它

.NET4.0并行计算技术基础——来自bitfan(数字世界一凡人)

2011-02-10 16:04 711 查看
.NET 4.0 并行计算技术基础(1)
.NET 4.0 并行计算技术基础(2)
.NET 4.0并行计算技术基础(3)
.NET 4.0并行计算技术基础(4)
.NET4.0并行计算技术基础(5)
.NET4.0并行计算技术基础(6)
.NET4.0并行计算技术基础(7)
.NET4.0并行计算技术基础(8)
.NET4.0并行计算技术基础(9)
.NET4.0并行计算技术基础(10)
.NET4.0并行计算技术基础(11)
.NET4.0并行计算技术基础(12)
结束语:迎接新一轮的技术进步浪潮

摘记:
一、任务的取消
ParallelLoopState.Break()方法,:在完成当前的这轮工作之后,不再执行后继的工作,但在当前这轮工作开始之前“已经在执行”的工作,则必须完成。
ParallelLoopState.Stop方法时,不但不会再创建新的线程执行并行循环,而且当前“已经在执行”的工作也应该被中止
Stop 和 Break 的方法的区别非常微妙,需要仔细体会,可以简单地用两句话来表达:
n ParallelLoopState.Stop 方法中止“当前”及“以后”的工作任务,会导致 ParallelLoopState 对象的 IsStop 属性值等于 true 。
n ParallelLoopState.Break() 方法仅中止“以后”的工作任务,会导致 ParallelLoopState 对象的 LowestBreakIteration 属性值等于 true 。
细则:
1.
ParallelOptions options = new ParallelOptions { CancellationToken = cts.Token };
Parallel.For(0, 10, options, i =>
{
if (options.CancellationToken.IsCancellationRequested == true)
{ return; }
});

2.
ParallelOptions opt = new ParallelOptions();
Parallel.For(10, 40, (int i, ParallelLoopState pls) =>
{
//检测一下是否需要取消并行循环
if (opt.CancellationToken.IsCancellationRequested)
{
pls.Stop();  //中止并行循环。
return;
}
//检测其他线程是否已中止并行循环
if (pls.IsStopped)
{
//提前中止并行循环需要执行的代码
return;
}
});

3.定义一个CancellationTokenSource,调用cts.Cancel()请求终止并行计算任务。但是必须进行设置声明
ParallelOptions options = new ParallelOptions { CancellationToken = cts.Token };
CancellationTokenSource cts = new CancellationTokenSource();

并且,在执行方法中,被取消任务后,再执行任务时,必须
cts = new CancellationTokenSource();//对于“外界”调用cts.Cancel()很重要 ,必须的

二、摘录
1..依据“阿姆达尔定律”得出的这个结论令人沮丧!但由此也可以得到一个重要的结论:如果希望使用并行计算来提升程序的性能,那么应尽可能地减少程序中串行代码的比例

2.并行计算带来的复杂性
上面所介绍的例子非常清晰地展示出并行程序设计的特殊性,并不是“并行”总比“串行”快的,到底怎样才能获得最大的并行加速系数,需要仔细地设计并行算法,并且应该在多个典型的软硬件环境中进行对比测试,最终才能得到理想的并行设计方案。
开发并行程序的关键在于要找到一个合适的任务分解方案,并行总要付出一定的代价,比如线程同步、线程通讯、同步缓冲数据等都是开发并行程序必须认真考虑的问题。

3.从性能比较的结果来看,并行计算并非总具有性能优势,这也提醒我们要注意并行计算的应用场合:
(1)每个数据项要执行的处理工作量很大,需要耗费较多的时间
(2)要处理的数据集合很大。

4.Parallel.For的工作原理:
TPL在Parallel.For方法内部创建了一个任务对象rootTask,然后调用此对象的RunSynchronously()方法以“同步”方式执行并行循环,注意,别被这里的单词“Synchronously(中文译为“同步地”)给欺骗了,此方法绝不是串行执行的。因为此方法接收一个参数,此参数引用一个任务调度器对象,由此调度器对象将任务进行分解,交由线程池中的线程执行,这是实现并行循环的关键!
任务交给线程池中的线程执行之后,Parallel.For方法调用rootTask .Wait()方法等待所有线程完成工作。最后,销毁rootTask对象。
通过仔细分析源码,我们明白了为何在串行代码中使用Parallel.For会出现“串行à并行à串行”这种执行顺序。
另外,我们还可以得到另一个结论:
使用Parallel启动的并行计算,在底层使用Task来完成。

5.Parallel的三个静态方法:
///1.使用Parallel.Invoke并行执行任务
Parallel.Invoke(
() => StatementA(),
() => StatementB(),
() => StatementC());

///2.使用Parallel.For并行访问数据(partition))
///要等到Parallel.For执行完后才会执行"其他语句"
///for
for (int i = 0; i < 100; i++)
{
DoWork(i);  //完成某些工作
}
///Parallel.For
Parallel.For(0, 100, (i) => DoWork(i));
//……其他语句

///3.使用Parallel.ForEach
///foreach
//循环迭代对象集合中的每个对象
foreach (var item in new string[] {"2","1"})
{
//Process(item); //处理单个对象
}
///Parallel.For
Parallel.ForEach(new string[] { "1", "2" }, item =>
{
//Process(item)
});


6.任务并行库的工作原理
任务由线程负责执行,为了获取较高的性能,TPL使用线程池中的线程,并且使用了一个与线程池直接集成的“任务调度器(Task Scheduler)”来负责分派工作任务给线程,这个调度器使用的任务分派策略称为“Work-stealing”。



如图 19‑16所示,线程池中的每个线程都拥有一个专有的(本地的)任务队列,当线程创建任务(即Task类的实例)时,默认设置下,这些任务被放入了线程本地工作队列中。
如果任务本身是通过调用ThreadPool.QueueUserWorkItem()添加的,则此任务会被添加到一个全局队列(global queue)中,这一全局队列就是图 19‑16中所示的“线程池任务队列”。
以下是任务调度器实现任务调度的基本过程:
当任务调度器开始分派任务时,它先检查一下创建此任务的线程是不是线程池中的线程(这种线程拥有一个本地的任务队列),如果不是,此任务被加入到线程池全局任务队列中,如果是,任务调度器检查此任务是否设置了TaskCreationOptions.PreferFairness标记,如果设置了,则此任务被加入到线程池全局任务队列中,否则,还是被放入到线程的本地队列中。
当一个线程开始执行时,它优先搜索自己的专有任务队列,当此队列为空时,它才会去搜索全局任务队列。由此可见,这种调度策略实际上是其于优先级的,本地工作队列比全局队列拥有更高的优先级。
上述这种默认的调度策略适用于绝大多数情况,但不可能是所有的情况,如果需要对线程本地队列和线程池全局队列中的任务一视同仁,在不改变调度策略的情况下(这个策略是由.NET为线程池所提供的默认调度器实现的,不可改),可以通过将需要“一视同仁”的Task任务直接放到线程池全局队列而不是线程本地队列中实现,其具体的实现方法就是在创建任务时,设置它的 TaskCreationOptions.PreferFairness标记。

提示:
如果并行执行是通过Parallel类的Invoke、For和ForEach方法启动的,则不能为其指定TaskCreationOptions.PreferFairness标记,只有在显式创建Task类的代码中可以设置此标记。下一小节将介绍如何直接使用Task类进行基于“任务”的并行编程。

下面对任务并行库的工作原理作一个小结。
简单地说:线程就是“工人”,它负责执行“任务”,任务由任务调度器负责分配。
任务调度器具有很强的智能性,它能自动协调各个任务的分配,不让“忙”的线程“忙死”,“闲”的线程“闲死”。从线程的角度看,由于有任务调度器的公平管理,所有线程都是“团结互助”的“雷锋”。
将线程之间合作的工作从线程自身的职责中“剥离”出来,交由任务调度器来统一协调管理,这是.NET 4.0并行计算任务库设计的一个关键点。如果让线程自身来负责处理工作任务的合理分配,必然会在线程函数内增加同步的代码,这会让整个软件系统变得复杂和难于调试。
我们可以适当地将TPL的这种设计思想引申到社会生活领域:如果将线程比喻为“政府官员”,那么,任务调度器就可以看成是一种“制度”,正是在“制度”的制约之下,“官员”才可能廉洁公正。
在现实社会中,指望贪官他们“良心”发现而自己“金盆洗手”是不现实的,必须建立起一种有效的制度,让所有官员都置于强有力的监督之下,“贪污”的行为自然会受到极大的制约。这是题外话了。

7.了解任务Task的状态
“风萧萧兮易水寒,壮士一去兮不复还”,与线程对象一样,每一个Task对象都会经历一个生命周期,在这个生命周期的每个特定阶段,对象处于一个特定的状态,并且不可能由后一个状态“回转”到前一个状态。简单地说,Task对象的生命是一条单行线,一旦上路,就只能往前走,直到生命的终结,期间绝无走回头路的可能。



如图 19‑17所示,Task对象拥有8个状态,这些状态之间可以相互转换。
其中,Created是起始状态,而Canceled、Faulted和RanToCompletion是3个终止状态,其余状态都是中间状态。
通过对Task类特定的方法的调用,Task对象会自动进行状态的转换。通常情况下软件工程师无需考虑这一转换过程,因为它们是由TPL基础架构直接管理的。
Task类提供了一个Status属性来表明当前对象所处的状态,但出于使用方便考虑,Task类另外还提供了3个相关属性用于确定对象是否处理3个终止状态之一:IsCanceled、IsFaulted和IsCompleted。

8.Task类的一些使用
///1.使用 ContinueWith
///TaskContinuationOptions 是一个枚举,可以使用它来指定“在何种情况下”才执行后继的工作任务。
///有未捕获的异常时、无捕获的异常时等的TaskContinuationOptions
Task task1 = new Task(() =>MethodA());
Task task2 = task1.ContinueWith((PrevTask) => MethodB(), TaskContinuationOptions.OnlyOnFaulted);
task1.Start();

///2.使用 Wait 系列方法1
Task[] tasks = new Task[3]
{
Task.Factory.StartNew(() => MethodA()),
Task.Factory.StartNew(() => MethodB()),
Task.Factory.StartNew(() => MethodC())
};
Task.WaitAll(tasks);
///使用 Wait 系列方法2
Task t1 = Task.Factory.StartNew(() => MethodA());
Task t2 = Task.Factory.StartNew(() => MethodA());
Task t3 = Task.Factory.StartNew(() => MethodA());
Task.WaitAll(t1, t2, t3);

///3.创建父子类型的任务
Task tskParent = new Task(() =>
{
// 父任务完成的工作
// 创建后继子任务并自动启动
Task.Factory.StartNew(() => MethodA());
Task.Factory.StartNew(() => MethodB());
Task.Factory.StartNew(() => MethodC());
});
// 启动父任务
tskParent.Start();
// 等待整个任务的完成
tskParent.Wait();

///4.非阻塞方式等待
///不管是使用 ContinueWith 还是 Wait 系列方法,调用这些方法的线程都会阻塞等待。
///如果不希望阻塞当前线程,可以通过轮询 Task 对象的 IsCompleted 属性来了解其是否完成,以下是框架代码
while (!task1.IsCompleted)
{

System.Threading.Thread.SpinWait(10000000); // 让当前线程时刻盯着前一任务的完成状态
// 可以安排进行其它工作
}
//task1 已完成,进行后继工作……


9.Task<TResult> 类派生自 Task 类.实现得到任务处理结果
/// 完成数据处理工作,结果将作为函数返回值
Func<object, long> del = delegate(object end)
{
long result= 0;
//... 数据处理代码略
return result;
};
Task<long> tsk = new Task<long>(()=>del(9));
// 启动运行
tsk.Start();
// 取回结果.如果工作任务还未执行完毕,则尝试取回结果的线程会阻塞等待
Console.Write(" 程序运行结果为 {0}", tsk.Result );


10.处理并行任务中引发的异常:
当某个Task对象引发了一个未被捕获的异常时,TPL会将此异常包装到一个特殊的AggregateException异常对象中。
AggregateException类的InnerExceptions属性包容了此轮并行代码中引发的所有异常。
一个典型的并行程序异常处理代码框架如下:
try
{
//启动一个Task对象(取名taskObject)
taskObject.Start();
//等待其工作结束
taskObject.Wait();
}
catch (AggregateException ae)
{
//处理并行代码中的异常
foreach (Exception ex in ae.InnerExceptions)
{
Console.WriteLine("{0}:{1}",ex.GetType(),ex.Message);
}
}

情况比较复杂的是,任务是可以嵌套的。比如一个并行任务可能会创建多个子任务,而这些子任务又会创建更多的“孙子任务”,由此构成一个任务对象的树型结构。
当这棵“任务对象树”中的任何一个节点(即任务对象自身)引发了一个未捕获的异常时,TPL都会为此任务对象创建一个AggregateException对象,把前述那个未捕获的异常对象添加到创建好的AggregateException对象的InnerExceptions集合中,然后,再把这一个AggregateException对象添加到其父任务对象所关联的AggregateException对象的InnerExceptions集合中(这段话比较拗口,请读者仔细阅读)。
这样一来,任务对象的嵌套就导致了AggregateException对象的嵌套,而这种嵌套还是递归进行的,这就给编写异常处理代码带来了麻烦,你必须“下钻”到AggregateException对象树的最底层才能得到真正的异常对象,以下是访问两层“异常树”的示例代码:
catch (AggregateException ae)
{
//处理并行代码中的异常
foreach (Exception ex in ae.InnerExceptions)
{
//“下钻”一层,处理子任务引发的异常
if(ex is AggregateException)
{
foreach(Exception innerEx in ex. InnerExceptions)

{ //……(代码略) }
}
else //本任务引发的异常
Console.WriteLine("{0}:{1}",ex.GetType(),ex.Message);
}
}

为了解决需要“递归”编写异常处理的问题,AggregateException类提供了一个将多层的AggregateException对象树“展平”为一层的方法——Flatten()。使用此方法时,异常处理代码不需要遍历AggregateException对象树:
catch (AggregateException ae)
{
ae. Flatten();
//处理并行代码中的异常
foreach (Exception ex in ae.InnerExceptions)
{
//异常处理代码……
}
}

经过“展平”之后,AggregateException. InnerExceptions将只包容具体的异常对象,不再包容嵌套的AggregateException对象。

11.屏蔽掉特定的异常:
AggregateException类提供了一个Handle()方法来实现:直接处理或忽略掉某种特定种类的异常,这时,肯定不需要将这些异常对象加入到AggregateException对象中。
catch (AggregateException ae)
{
ae.Handle((ex) =>
{
if (ex is DivideByZeroException)
return true;
else
return false;
});
throw ae;
}


12.PLINQ
默认情况下,PLINQ查询要处理的数据被认为“顺序无关紧要”.
AsParallel()或AsParallel<T>():将LINQ查询转为并行执行
ParallelEnumerable类的扩展方法AsSequential():强制将其转为串行模式
AsOrdered()和AsSequential()是不一样的.
AsSequential()强制PLINQ查询以串行方式执行
AsOrdered()仍是并行执行的,只不过并行执行的结果先被缓存起来,然后再按原始数据顺序进行排序,才得到最后的结果。
很明显,给PLINQ查询加上AsOrdered()子句将会影响到程序的性能,因此,尽量避免使用它。
在一些情况下,可以通过修改PLINQ查询的顺序避免使用AsOrdered()子句
///假设整数集合中的原始是排好序的,则以下PLINQ查询按顺序取出所有的偶数
var evenNums = from num in source.AsParallel().AsOrdered()
where num % 2 == 0
select num;
///如果对查询操作的顺序进行一下修改,会得到更好的性能:
var evenNums = from num in source.AsParallel()
where num % 2 == 0
orderby num
select num;


正因为并行程序开发、测试和调试都比串行程序要困难,所以一般都是先编写程序的串行版本,等其工作正常之后再将其升级替换为并行版本。
三.批注:
“.NET4.0并行计算技术基础(12)”“迎接新一轮的技术进步浪潮”摘记,内容查看原帖更易理解。重要内容包括:ParallelEnumerable类、ParallelQuery类、中途取消PLINQ操作、设置工作线程数
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: