您的位置:首页 > 理论基础 > 数据结构算法

读书笔记:《C#并行编程高级教程》-第五章:协调数据结构

2014-06-10 23:55 239 查看
《C#并行编程高级教程》-第五章:协调数据结构

本章主要讲如何同步任务已经多线程中共享变量,应用到的是一些经典的同步原语,.NET 4.0新引进的轻量级协调数据结构。

轻量级同步原语

.NET 4.0 System.Threading中提供了6个轻量级的同步原语:

Barrier:容许多个任务同步他们不同阶段上的并发工作;

CountdownEvent:简化了fork和join情形,表示非常轻量级的原语,每一个任务完成后都会减少计数,直到减少到0再开始制定的任务。

ManualResetEventSlim:容许很多任务等待直到另一个任务手工发出事件句柄。如果预计等待时间很短应该使用此类,较其对应重量级类:ManualResetEvent的性能要高。

SemaphoreSlim:限制能够并发访问某个资源或资源池的任务个数。对应重量级:Semaphore

SpinLock:容许一个任务自旋直到获得一个互斥锁,以保证一次只有一个任务能够访问所定的变量、对象或者区域。与此类似的有System.Threading.Monitor。SpinLock是一个struct,要是使用大量的锁,而且希望能够最小化对象的分配,那么使用SpinLock可以减少内存的需求以及垃圾回收的压力。

SpinWait:容许一个任务执行基于自旋的等待,直到指定的条件得到满足。可以使用SpinWait指定一个等待时间,在等待时间之前是自旋,若超过了指定时间还没有满足条件,那么进入基于内核的等待,也就是“基于自旋的等待->(超过指定时间)->基于内核的等待”。

Barrier

需要运行一组任务,每个任务都有一连串的阶段,而且每个阶段的运行都要等到其他所有任务的前一阶段运行完毕才能开始,这样就可以考虑Barrier来同步这类工作。可以在多个阶段中使用同一个Barrier实例。

int participantes=4; // the member of taking part in
Task[] tasks=new Task[participantes];
int timeout=2000;  //if wait 2s,quit wait and go on
var cts=new System.Threading.CancellationTokenSource();
vat ct=cts.Token;
//构造函数
Barrier barrier=new Barrier(participantes,(_barrier)=>
{  //dosomething.....//阶段后操作代码  }    );

for(int i=0;i<participantes;i++)
{
tasks[i]=Task.Factory.StartNew(   (num)=>{
//dosomething:Phase1
try    //扑捉阶段后操作代码中的异常,可以对每个SignalAndWait的都使用
{
barrier.SignalAndWait(timeout);
}
catch(BarrierPostPhaseException bppex)
{ //do somenthing.........}

//dosomething:Phase2
barrier.SignalAndWait(timeout);
//dosomething:Phase3
barrier.SignalAndWait(timeout);
//more phase....
},i,ct);
}

var finalTask=Task.Factory.ContinueWhenAll(tasks, (alltasks)=>{
//do something..
barrier.Dispose();
} );
关键:参与者使用barrier.SignalAndWait(timeout);像屏障发送自己已经完成某个阶段的信号,并且等待其他任务都执行完该阶段(其中timeout表示若是等待的时间超过timeout则不会继续等待,而是执行下一个阶段)。当屏障接收到所有的参与者,participantes个,完成某个阶段的信号后,就会执行“阶段后代码”,然后barrier.CurrentPhaseNumber会递增其值,表示要运行新的阶段,然后再解除阻塞每一个参与者的Task实例,然后所有参与者开始下一个阶段。(自己推断:取消阻塞后,所有参与者可能同时执行,见P175)

如果屏障在“阶段后代码”中抛出异常,那么可以try SignalAndWait捕获这个异常,并且在每个参与者中都可以捕捉到。也可以使用这个机制向参与者传递信息,但是抛出异常开销很大,并非良好的传递消息方式。

Task.Fcatory.ContineWhenAll也是一种等待所有的任务都完成后再继续某个任务的方法,当运行一组两三个阶段且只运行一次的的任务时比屏障更为方便。

使用屏障时,其他同步机制也一样,包含超时和取消是一件非常重要的事。因为代码中的错误或者不可预知的事情都可能导致线程永远等待。如本例中timeout和ct

屏障也可以动态添加参与者,可以并发的并行调用这些函数,但不可以在“阶段后操作代码”中调用。函数见MSDN

屏障在使用完后一定要注销。

互斥锁:lock,Monitor,SpinLock

lock和Monitor是提供互斥锁的首选,但是某些情况下,比如,锁的时间很短粒度很细时,其他互斥锁原语,比如SpinLock,可以提供更好的性能。

lock和Monitor只能锁定引用类型。Monitor对于非引用类型会采用装箱。

MSDN:使用
Monitor 锁定对象(即引用类型)而不是值类型。 将值类型变量传递给
Enter 时,它被装箱为对象。 如果再次将相同的变量传递给
Enter,则它被装箱为一个单独对象,而且线程不会阻止。 在这种情况下,Monitor 本应保护的代码未受保护。
此外,将变量传递给 Exit 时,也创建了另一个单独对象。
因为传递给 Exit 的对象和传递给
Enter 的对象不同,Monitor 将引发
SynchronizationLockException。

lock关键字也会调用Monitor的方法,二者等价

lock(myobject)
{
//do something with myobject
}
//下面代码等价
bool lockTaken=false;
try
{
Monitor.Enter(myobject,ref lockTaken); //一直等到获得锁
//do something with myobject
}
finally
{
if(lockTaken)
Monitor.Exit(myobject);
}


应该避免锁定外部对象,因为这样可能会导致难以解决的死锁问题。也应该避免跨成员或类的边界获得或者释放一个锁。基于同样的原因,在获得一个所时应该避免调用位置的代码。而且,临界区的代码应该尽可能的简单,必须是需要串行化的代码。

可以使用超时锁来避免死锁。如果在指定的时间无法获得一个锁,那可以可以通过CancellationToken实例来取消导致问题的任务。

使用超时锁:

try
{
Monitor.TryEnter(myobject,2000,ref lockTaken);  //设置超时时间2s,若等待获得锁超时则不会再等待获得锁,
if(!lockTaken)  //等待超时,没有获得锁
{
throw new TimeoutException(.....);
}
//do something with myobject
}
finally
{
if(lockTaken)
Monitor.Exit(myobject);
}


有时候,使用并发集合可以避免使用锁。

自旋锁(SpinLock)

应用于持有锁的时间总是非常短,而且锁的粒度很精细,那么使用自旋锁比其他锁的机制更好的性能。

var s1=new SpinLock(false);
//.....
bool lockTaken=false;
try
{
s1.Enter(ref lockTaken);
//spinlock获得锁,但并没有指定锁定对象。
//临界区
//do something 操作多线程共享对象
}
finally
{
if(lockTaken)
{
s1.Exit(false);
}
}
构造函数中的false:表示禁用作为调试目的而使用的跟踪线程ID的选项。这样可以获得最佳的性能。Exit中的false:表示不用产生内存屏障,这样可以通过牺牲公平性来获得性能提升。

不要将SpinLock声明为只读字段,如果这样:会导致每次调用这个字段都返回SpinLock的一个新副本,而不是原始的那个,所有对enter方法的调用都能成功获得锁。

使用超时:s1.TryEnter(2000,ref lockTaken);

基于自旋的等待(SpinWait)

如果等待某个条件满足需要的时间很短,而且您不希望发生昂贵的上下文切换,那么基于自旋的等待SpinWait是一种很好的替换方案,它的开销非常小。

长时间的自旋并不是很好的做法,因为自旋会阻塞更高优先级的线程及其相关的任务,还会阻塞垃圾回收器。相反,如果自旋的时间过长,SpinWait会让出底层线程的时间片,并触发上下文切换。SpinLock能够判断何时停止自旋并触发一次上下文切换。SpinLock只不过是对SpinWait的简单包装。

SpinWait:自旋直到满足条件

System.Threading.SpinWait.SpinUntil( Func(Boolean),int timeout ),在委托所定义的Func为true之前,SpinWait实例会一直自旋等待。或者到达定义的超时时间timeout

SpinWait并没有设计为让多个任务或者线程并发使用,因此,如果多个任务或线程要通过SpinWait的方法进行自旋,那么每一个任务或线程都应该使用自己的SpinWait实例。然而,因为SpinWait为后退式自旋逻辑,您不应该再多个线程中使用SpinWait。

SpinWait在单核计算机上运行时总要出让处理器,所以在单核计算机上没什么用。

当等待的时间非常短的时候,SpinWait.SpinUntil方法提供了智能的行为,能够获得更好的结果。

Volatile:避免编译器因为假定某个字段只会被一个线程访问而做的优化。所以可以保证:当共享变量被不同的线程访问和更新且没有锁和原子操作的时候,最新的值总能在共享变量中表示出来

private static volatile boo b;

轻量级的手动重置事件:ManualResetEventSlim

手动重置事件对象有两个状态:true(设置/发出信号)和false(取消信号)。通过这个对象可以从一个任务向另一个任务发送信号表示发生了一个事件。当一个任务发生信号,另一个任务可以通过wait函数等待一个事件对象的状态收到信号。wait函数会阻塞当前任务和底层的线程,直到收到信号或者超时。

ManualResetEventSlim通过封装了手动重置事件的事件等待句柄提供了自旋等待和基于内核等待的组合,可以使用这个类的实例在任务之间发送信号,并且等待事件的发生。

//false:初始状态;100:回退到基于内核等待前自旋的次数
private ManualResetEventSlim mres=new ManualResetEventSlim(false,100);

void Fun1()
{
//.....
try
{
//...
//发出信号,将事件设置为true,解除调用wait阻塞的线程
mres.Set();
//....
}
finally
{
//...
//取消信号,将事件设置为false
mres.Reset();
//...

}
}

void Fun2()
{
//.....
//等待信号,并且设置超时时间为1s
bool b=mres.Wait(1000);
//...
}

Main()
{
//...
Parallel.Invoke(
()=>Fun1(),
()=>Fun2()
);
//...
mres.Dispose();  //使用完后一定要释放
}


如果需要跨进程或者跨AppDomain的同步,那么就必须使用ManualResetEvent,而不能使用ManualResetEventSlim。

限制资源的并发访问:SemaphoreSlim

当需要对访问一个资源或资源池的并发任务或线程数量作出限制的时候,表示技术信号量SemaphoreSlim就非常有用。

每当一个任务进入信号量的时候,计数器减一,当减到0的时候,新到达的请求会被阻塞,直到有其他任务离开信号量。每当一个任务离开信号量的时候,计数器加1,等待的任务可以进入信号量。信号量会降低可扩展性,而且信号量的目的就是如此,所以必须确定需要如此时才使用信号量。通过SemaphoreSlim实例控制共享资源,每个任务都必须使用SempjoremapSlim实例的方法进入(wait)或者释放(release)信号量。

int MAX_MACHINES=5; //最大访问数为5
SemaphoreSlim ss=new SemaphoreSlim(MAX_MACHINES);
//......
// in a Task
{
//........
ss.Wait(2000); //超时时间:2s;等待计数量不为0
//......
// finally
ss.Release();   //释放信号量:一定要确保释放了信号量,即使在发生异常的情况
}
//..........
ss.Dispose();   //使用完后抛弃
如果需要跨进程或者跨AppDomain的同步,那么就必须使用Semaphore,而不能使用SemaphoreSlim。

通过CountdownEvent简化动态fork和join场景

CountdownEvent实例带有一个初始的信号计数。每当一个任务完成工作的时候,这个任务都会发出一个CountdownEvent实例信号(调用实例方法Singnal),并将其信号计数递减1.调用这个CountdownEvent实例的Wait方法的任务将会被阻塞,直到信号计数达到0。

int nums=5; //要五个任务完成
int newNums=6;
CountdownEvent ce=new CountdownEvent(nums);

//把需要等待的任务数重置为newNums
//ce.Reset(newNums);

//in tasks
{
//...
ce.Signal();
//...
}

//in the wait task
{
//...
ce.Wait(); //阻塞直到计数到达0
//...
}

//finally
ce.Dispose();


AddCount:将当前计数加1或者增加由参数所指定的值。

Reset:将剩下的信号数(还需要这么多次信号才将调用wait方法的任务或线程接触阻塞)重置为初试计数或者重置为传给它的int参数所指定的值。Reset 不是线程安全的,不能与此实例的其他成员同时使用。只有在没有其他任务或者线程访问CountdownEvent的时候才能调用这个方法。

原子操作System.Threading.Interlocked
对于整数类型,不能使用lock和Monitor。可以使用SpinLock,是一个比较好的方案。

System.Threading.Interlocked类为多个任务或线程共享的变量提供了原子操作。使用Interlocked类提供的原子操作的最大好处在于他们的开销非常低,不需要获得锁,非常高效,而且是线程安全的。

Interlocked.Increment方法能够在一个原子的线程安全的操作中增加一个整数类型变量的值并存储新的值。

int total=0;
//...
//in task
{
//...
Interlocked.Increment(ref total); //total+1
//...
}


在32位系统上,只有通过Interlocked.Read读取值的时候,对64位值的读操作才是原子操作。

还有一些其他函数,见MSDN
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: