您的位置:首页 > 编程语言 > C#

一个进程间同步和通讯的 C# 框架

2013-12-31 14:05 621 查看

0.背景简介

微软在.NET框架中提供了多种实用的线程同步手段,其中包括monitor
类及reader-writer锁。但跨进程的同步方法还是非常欠缺。另外,目前也没有方便的线程间及进程间传递消息的方法。例如C/S和SOA,又或者生产者/消费者模式中就常常需要传递消息。为此我编写了一个独立完整的框架,实现了跨线程和跨进程的同步和通讯。这框架内包含了信号量,信箱,内存映射文件,阻塞通道,及简单消息流控制器等组件。这篇文章里提到的类同属于一个开源的库项目(BSD许可),你可以从这里下载到www.cdrnet.net/projects/threadmsg/.

这个框架的目的是:

封装性:通过MSMQ消息队列发送消息的线程无需关心消息是发送到另一个线程还是另一台机器。
简单性:向其他进程发送消息只需调用一个方法。

注意:我删除了本文中全部代码的XML注释以节省空间。如果你想知道这些方法和参数的详细信息,请参考附件中的代码。

1.先看一个简单例子

使用了这个库后,跨进程的消息传递将变得非常简单。我将用一个小例子来作示范:一个控制台程序,根据参数可以作为发送方也可以作为接收方运行。在发送程序里,你可以输入一定的文本并发送到信箱内(返回key),接收程序将显示所有从信箱内收到的消息。你可以运行无数个发送程序和接收程序,但是每个消息只会被具体的某一个接收程序所收到。

viewsource

print?

01
[Serializable]
02
struct
Message
03
{
04
public

string
Text;
05
}
06
07
class
Test
08
{
09
IMailBoxmail;
10
11
public

Test()
12
{
13
mail=
new
ProcessMailBox(
"TMProcessTest"
,1024);
14
}
15
16
public

void
RunWriter()
17
{
18
Console.WriteLine(
"Writerstarted"
);
19
Messagemsg;
20
while
(
true
)
21
{
22
msg.Text=Console.ReadLine();
23
if
(msg.Text.Equals(
"exit"
))
24
break
;
25
mail.Content=msg;
26
}
27
}
28
29
public

void
RunReader()
30
{
31
Console.WriteLine(
"Readerstarted"
);
32
while
(
true
)
33
{
34
Messagemsg=(Message)mail.Content;
35
Console.WriteLine(msg.Text);
36
}
37
}
38
39
[STAThread]
40
static

void
Main(
string
[]args)
41
{
42
Testtest=
new
Test();
43
if
(args.Length>0)
44
test.RunWriter();
45
else
46
test.RunReader();
47
}
48
}
信箱一旦创建之后(这上面代码里是ProcessMailBox
),接收消息只需要读取Content属性,发送消息只需要给这个属性赋值。当没有数据时,获取消息将会阻塞当前线程;发送消息时如果信箱里已经有数据,则会阻塞当前线程。正是有了这个阻塞,整个程序是完全基于中断的,并且不会过度占用CPU(不需要进行轮询)。发送和接收的消息可以是任意支持序列化(Serializable)的类型。

然而,实际上暗地里发生的事情有点复杂:消息通过内存映射文件来传递,这是目前唯一的跨进程共享内存的方法,这个例子里我们只会在pagefile
里面产生虚拟文件。对这个虚拟文件的访问是通过win32信号量来确保同步的。消息首先序列化成二进制,然后再写进该文件,这就是为什么需要声明Serializable属性。内存映射文件和
win32信号量都需要调用NT内核的方法。多得了
.NET框架中的Marshal
类,我们可以避免编写不安全的代码。我们将在下面讨论更多的细节。

2..NET里面的跨线程/进程同步

线程/进程间的通讯需要共享内存或者其他内建机制来发送/接收数据。即使是采用共享内存的方式,也还需要一组同步方法来允许并发访问。

同一个进程内的所有线程都共享公共的逻辑地址空间(堆)。对于不同进程,从win2000开始就已经无法共享内存。然而,不同的进程可以读写同一个文件。WinAPI提供了多种系统调用方法来映射文件到进程的逻辑空间,及访问系统内核对象(会话)指向的
pagefile里面的虚拟文件。无论是共享堆,还是共享文件,并发访问都有可能导致数据不一致。我们就这个问题简单讨论一下,该怎样确保线程/进程调用的有序性及数据的一致性。

2.1线程同步

.NET框架和C#提供了方便直观的线程同步方法,即
monitor类和lock语句(本文将不会讨论
.NET框架的互斥量)。对于线程同步,虽然本文提供了其他方法,我们还是推荐使用
lock语句。

viewsource

print?

01
void
Work1()
02
{
03
NonCriticalSection1();
04
Monitor.Enter(
this
);
05
try
06
{
07
CriticalSection();
08
}
09
finally
10
{
11
Monitor.Exit(
this
);
12
}
13
NonCriticalSection2();
14
}
viewsource

print?

1
void
Work2()
2
{
3
NonCriticalSection1();
4
lock
(
this
)
5
{
6
CriticalSection();
7
}
8
NonCriticalSection2();
9
}
Work1和Work2是等价的。在C#里面,很多人喜欢第二个方法,因为它更短,且不容易出错。

2.2跨线程信号量

信号量是经典的同步基本概念之一(由EdsgerDijkstra
引入)。信号量是指一个有计数器及两个操作的对象。它的两个操作是:获取(也叫P或者等待),释放(也叫V或者收到信号)。信号量在获取操作时如果计数器为0则阻塞,否则将计数器减一;在释放时将计数器加一,且不会阻塞。虽然信号量的原理很简单,但是实现起来有点麻烦。好在,内建的
monitor类有阻塞特性,可以用来实现信号量。

viewsource

print?

01
public
sealed
class
ThreadSemaphore:ISemaphore
02
{
03
private

int
counter;
04
private

readonly
int

max;
05
06
public

ThreadSemaphore():
this
(0,
int
.Max){}
07
public

ThreadSemaphore(
int

initial):
this
(initial,
int
.Max){}
08
public

ThreadSemaphore(
int

initial,
int

max)
09
{
10
this
.counter=Math.Min(initial,max);
11
this
.max=max;
12
}
13
14
public

void
Acquire()
15
{
16
lock
(
this
)
17
{
18
counter--;
19
if
(counter<0&&!Monitor.Wait(
this
))
20
throw

new
SemaphoreFailedException();
21
}
22
}
23
24
public

void
Acquire(TimeSpantimeout)
25
{
26
lock
(
this
)
27
{
28
counter--;
29
if
(counter<0&&!Monitor.Wait(
this
,timeout))
30
throw

new
SemaphoreFailedException();
31
}
32
}
33
34
public

void
Release()
35
{
36
lock
(
this
)
37
{
38
if
(counter>=max)
39
throw

new
SemaphoreFailedException();
40
if
(counter<0)
41
Monitor.Pulse(
this
);
42
counter++;
43
}
44
}
45
}
信号量在复杂的阻塞情景下更加有用,例如我们后面将要讨论的通道(channel)。你也可以使用信号量来实现临界区的排他性(如下面的
Work3),但是我还是推荐使用内建的lock
语句,像上面的Work2那样。

请注意:如果使用不当,信号量也是有潜在危险的。正确的做法是:当获取信号量失败时,千万不要再调用释放操作;当获取成功时,无论发生了什么错误,都要记得释放信号量。遵循这样的原则,你的同步才是正确的。Work3
中的finally语句就是为了保证正确释放信号量。注意:获取信号量(s.Acquire())的操作必须放到try语句的外面,只有这样,当获取失败时才不会调用释放操作。

viewsource

print?

01
ThreadSemaphores=
new

ThreadSemaphore(1);
02
void
Work3()
03
{
04
NonCriticalSection1();
05
s.Acquire();
06
try
07
{
08
CriticalSection();
09
}
10
finally
11
{
12
s.Release();
13
}
14
NonCriticalSection2();
15
}

2.3跨进程信号量

为了协调不同进程访问同一资源,我们需要用到上面讨论过的概念。很不幸,.NET中的
monitor类不可以跨进程使用。但是,win32API提供的内核信号量对象可以用来实现跨进程同步。
RobinGalloway-Lunn介绍了怎样将
win32的信号量映射到
.NET中(见
UsingWin32SemaphoresinC#
)。我们的实现也类似:

viewsource

print?

01
[DllImport(
"kernel32"
,EntryPoint=
"CreateSemaphore"
,
02
SetLastError=
true
,CharSet=CharSet.Unicode)]
03
internal
static
extern
uint
CreateSemaphore(
04
SecurityAttributesauth,
int
initialCount,
05
int

maximumCount,
string

name);
06
07
[DllImport(
"kernel32"
,EntryPoint=
"WaitForSingleObject"
,
08
SetLastError=
true
,CharSet=CharSet.Unicode)]
09
internal
static
extern
uint
WaitForSingleObject(
10
uint

hHandle,
uint

dwMilliseconds);
11
12
[DllImport(
"kernel32"
,EntryPoint=
"ReleaseSemaphore"
,
13
SetLastError=
true
,CharSet=CharSet.Unicode)]
14
[
return

:MarshalAs(UnmanagedType.VariantBool)]
15
internal
static
extern
bool
ReleaseSemaphore(
16
uint

hHandle,
int

lReleaseCount,
out

int
lpPreviousCount);
17
18
[DllImport(
"kernel32"
,EntryPoint=
"CloseHandle"
,SetLastError=
true
,
19
CharSet=CharSet.Unicode)]
20
[
return

:MarshalAs(UnmanagedType.VariantBool)]
21
internal
static
extern
bool
CloseHandle(
uint

hHandle);
viewsource

print?

01
public
class
ProcessSemaphore:ISemaphore,IDisposable
02
{
03
private

uint
handle;
04
private

readonly
uint

interruptReactionTime;
05
06
public

ProcessSemaphore(
string

name):
this
(
07
name,0,
int
.MaxValue,500){}
08
public

ProcessSemaphore(
string

name,
int

initial):
this
(
09
name,initial,
int
.MaxValue,500){}
10
public

ProcessSemaphore(
string

name,
int
initial,
11
int

max,
int
interruptReactionTime)
12
{
13
this
.interruptReactionTime=(
uint
)interruptReactionTime;
14
this
.handle=NTKernel.CreateSemaphore(
null
,initial,max,name);
15
if
(handle==0)
16
throw

new
SemaphoreFailedException();
17
}
18
19
public

void
Acquire()
20
{
21
while
(
true
)
22
{

//looped0.5stimeouttomakeNT-blockedthreadsinterruptable.
23
uint

res=NTKernel.WaitForSingleObject(handle,
24
interruptReactionTime);
25
try

{System.Threading.Thread.Sleep(0);}
26
catch
(System.Threading.ThreadInterruptedExceptione)
27
{
28
if
(res==0)
29
{
//Rollback
30
int

previousCount;
31
NTKernel.ReleaseSemaphore(handle,1,
out

previousCount);
32
}
33
throw

e;
34
}
35
if
(res==0)
36
return
;
37
if
(res!=258)
38
throw

new
SemaphoreFailedException();
39
}
40
}
41
42
public

void
Acquire(TimeSpantimeout)
43
{
44
uint

milliseconds=(
uint
)timeout.TotalMilliseconds;
45
if
(NTKernel.WaitForSingleObject(handle,milliseconds)!=0)
46
throw

new
SemaphoreFailedException();
47
}
48
49
public

void
Release()
50
{
51
int

previousCount;
52
if
(!NTKernel.ReleaseSemaphore(handle,1,
out
previousCount))
53
throw

new
SemaphoreFailedException();
54
}
55
56
#regionIDisposableMember
57
public

void
Dispose()
58
{
59
if
(handle!=0)
60
{
61
if
(NTKernel.CloseHandle(handle))
62
handle=0;
63
}
64
}
65
#endregion
66
}
有一点很重要:win32中的信号量是可以命名的。这允许其他进程通过名字来创建相应信号量的句柄。为了让阻塞线程可以中断,我们使用了一个(不好)的替代方法:使用超时和
Sleep(0)。我们需要中断来安全关闭线程。更好的做法是:确定没有线程阻塞之后才释放信号量,这样程序才可以完全释放资源并正确退出。

你可能也注意到了:跨线程和跨进程的信号量都使用了相同的接口。所有相关的类都使用了这种模式,以实现上面背景介绍中提到的封闭性。需要注意:出于性能考虑,你不应该将跨进程的信号量用到跨线程的场景,也不应该将跨线程的实现用到单线程的场景。

3.跨进程共享内存:内存映射文件

我们已经实现了跨线程和跨进程的共享资源访问同步。但是传递/接收消息还需要共享资源。对于线程来说,只需要声明一个类成员变量就可以了。但是对于跨进程来说,我们需要使用到
win32API提供的内存映射文件(MemoryMappedFiles,简称MMF)。使用
MMF和使用win32信号量差不多。我们需要先调用
CreateFileMapping方法来创建一个内存映射文件的句柄:

viewsource

print?

01
[DllImport(
"Kernel32.dll"
,EntryPoint=
"CreateFileMapping"
,
02
SetLastError=
true
,CharSet=CharSet.Unicode)]
03
internal
static
extern

IntPtrCreateFileMapping(
uint

hFile,
04
SecurityAttributeslpAttributes,
uint
flProtect,
05
uint

dwMaximumSizeHigh,
uint

dwMaximumSizeLow,
string

lpName);
06
07
[DllImport(
"Kernel32.dll"
,EntryPoint=
"MapViewOfFile"
,
08
SetLastError=
true
,CharSet=CharSet.Unicode)]
09
internal
static
extern

IntPtrMapViewOfFile(IntPtrhFileMappingObject,
10
uint

dwDesiredAccess,
uint

dwFileOffsetHigh,
11
uint

dwFileOffsetLow,
uint

dwNumberOfBytesToMap);
12
13
[DllImport(
"Kernel32.dll"
,EntryPoint=
"UnmapViewOfFile"
,
14
SetLastError=
true
,CharSet=CharSet.Unicode)]
15
[
return

:MarshalAs(UnmanagedType.VariantBool)]
16
internal
static
extern
bool
UnmapViewOfFile(IntPtrlpBaseAddress);
viewsource

print?

01
public
static
MemoryMappedFileCreateFile(
string

name,
02
FileAccessaccess,
int
size)
03
{
04
if
(size<0)
05
throw

new
ArgumentException(
"Sizemustnotbenegative"
,
"size"
);
06
07
IntPtrfileMapping=NTKernel.CreateFileMapping(0xFFFFFFFFu,
null
,
08
(
uint
)access,0,(
uint
)size,name);
09
if
(fileMapping==IntPtr.Zero)
10
throw

new
MemoryMappingFailedException();
11
12
return

new
MemoryMappedFile(fileMapping,size,access);
13
}
我们希望直接使用pagefile中的虚拟文件,所以我们用-1(0xFFFFFFFF)
来作为文件句柄来创建我们的内存映射文件句柄。我们也指定了必填的文件大小,以及相应的名称。这样其他进程就可以通过这个名称来同时访问该映射文件。创建了内存映射文件后,我们就可以映射这个文件不同的部分(通过偏移量和字节大小来指定)到我们的进程地址空间。我们通过
MapViewOfFile系统方法来指定:

viewsource

print?

01
public

MemoryMappedFileViewCreateView(
int

offset,
int
size,
02
MemoryMappedFileView.ViewAccessaccess)
03
{
04
if
(
this
.access==FileAccess.ReadOnly&&access==
05
MemoryMappedFileView.ViewAccess.ReadWrite)
06
throw

new
ArgumentException(
07
"Onlyreadaccesstoviewsallowedonfileswithoutwriteaccess"
,
08
"access"
);
09
if
(offset<0)
10
throw

new
ArgumentException(
"Offsetmustnotbenegative"
,
"size"
);
11
if
(size<0)
12
throw

new
ArgumentException(
"Sizemustnotbenegative"
,
"size"
);
13
IntPtrmappedView=NTKernel.MapViewOfFile(fileMapping,
14
(
uint
)access,0,(
uint
)offset,(
uint
)size);
15
return

new
MemoryMappedFileView(mappedView,size,access);
16
}
在不安全的代码中,我们可以将返回的指针强制转换成我们指定的类型。尽管如此,我们不希望有不安全的代码存在,所以我们使用Marshal
类来从中读写我们的数据。偏移量参数是用来从哪里开始读写数据,相对于指定的映射视图的地址。

viewsource

print?

01
public
byte
ReadByte(
int

offset)
02
{
03
return

Marshal.ReadByte(mappedView,offset);
04
}
05
public
void
WriteByte(
byte

data,
int
offset)
06
{
07
Marshal.WriteByte(mappedView,offset,data);
08
}
09
10
public
int
ReadInt32(
int

offset)
11
{
12
return

Marshal.ReadInt32(mappedView,offset);
13
}
14
public
void
WriteInt32(
int

data,
int
offset)
15
{
16
Marshal.WriteInt32(mappedView,offset,data);
17
}
18
19
public
void
ReadBytes(
byte
[]data,
int
offset)
20
{
21
for
(
int

i=0;i<data.Length;i++)
22
data[i]=Marshal.ReadByte(mappedView,offset+i);
23
}
24
public
void
WriteBytes(
byte
[]data,
int
offset)
25
{
26
for
(
int

i=0;i<data.Length;i++)
27
Marshal.WriteByte(mappedView,offset+i,data[i]);
28
}
但是,我们希望读写整个对象树到文件中,所以我们需要支持自动进行序列化和反序列化的方法。

viewsource

print?

01
public
object
ReadDeserialize(
int

offset,
int

length)
02
{
03
byte
[]binaryData=
new
byte
[length];
04
ReadBytes(binaryData,offset);
05
System.Runtime.Serialization.Formatters.Binary.BinaryFormatterformatter
06
=

new
System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
07
System.IO.MemoryStreamms=
new
System.IO.MemoryStream(
08
binaryData,0,length,
true
,
true
);
09
object

data=formatter.Deserialize(ms);
10
ms.Close();
11
return

data;
12
}
13
public
void
WriteSerialize(
object

data,
int

offset,
int
length)
14
{
15
System.Runtime.Serialization.Formatters.Binary.BinaryFormatterformatter
16
=

new
System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
17
byte
[]binaryData=
new
byte
[length];
18
System.IO.MemoryStreamms=
new
System.IO.MemoryStream(
19
binaryData,0,length,
true
,
true
);
20
formatter.Serialize(ms,data);
21
ms.Flush();
22
ms.Close();
23
WriteBytes(binaryData,offset);
24
}
请注意:对象序列化之后的大小不应该超过映射视图的大小。序列化之后的大小总是比对象本身占用的内存要大的。我没有试过直接将对象内存流绑定到映射视图,那样做应该也可以,甚至可能带来少量的性能提升。

4.信箱:在线程/进程间传递消息

这里的信箱与Email及NT
中的邮件槽(Mailslots)无关。它是一个只能保留一个对象的安全共享内存结构。信箱的内容通过一个属性来读写。如果信箱内容为空,试图读取该信箱的线程将会阻塞,直到另一个线程往其中写内容。如果信箱已经有了内容,当一个线程试图往其中写内容时将被阻塞,直到另一个线程将信箱内容读取出去。信箱的内容只能被读取一次,它的引用在读取后自动被删除。基于上面的代码,我们已经可以实现信箱了。

4.1跨线程的信箱

我们可以使用两个信号量来实现一个信箱:一个信号量在信箱内容为空时触发,另一个在信箱有内容时触发。在读取内容之前,线程先等待信箱已经填充了内容,读取之后触发空信号量。在写入内容之前,线程先等待信箱内容清空,写入之后触发满信号量。注意:空信号量在一开始时就被触发了。

viewsource

print?

01
public
sealed
class

ThreadMailBox:IMailBox
02
{
03
private

object
content;
04
private

ThreadSemaphoreempty,full;
05
06
public

ThreadMailBox()
07
{
08
empty=
new
ThreadSemaphore(1,1);
09
full=
new
ThreadSemaphore(0,1);
10
}
11
12
public

object
Content
13
{
14
get
15
{
16
full.Acquire();
17
object

item=content;
18
empty.Release();
19
return

item;
20
}
21
set
22
{
23
empty.Acquire();
24
content=value;
25
full.Release();
26
}
27
}
28
}

4.2跨进程信箱

跨进程信箱与跨线程信箱的实现基本上一样简单。不同的是我们使用两个跨进程的信号量,并且我们使用内存映射文件来代替类成员变量。由于序列化可能会失败,我们使用了一小段异常处理来回滚信箱的状态。失败的原因有很多(无效句柄,拒绝访问,文件大小问题,Serializable属性缺失等等)。

viewsource

print?

01
public
sealed
class

ProcessMailBox:IMailBox,IDisposable
02
{
03
private

MemoryMappedFilefile;
04
private

MemoryMappedFileViewview;
05
private

ProcessSemaphoreempty,full;
06
07
public

ProcessMailBox(
string

name,
int
size)
08
{
09
empty=
new
ProcessSemaphore(name+
".EmptySemaphore.MailBox"
,1,1);
10
full=
new
ProcessSemaphore(name+
".FullSemaphore.MailBox"
,0,1);
11
file=MemoryMappedFile.CreateFile(name+
".MemoryMappedFile.MailBox"
,
12
MemoryMappedFile.FileAccess.ReadWrite,size);
13
view=file.CreateView(0,size,
14
MemoryMappedFileView.ViewAccess.ReadWrite);
15
}
16
17
public

object
Content
18
{
19
get
20
{
21
full.Acquire();
22
object

item;
23
try

{item=view.ReadDeserialize();}
24
catch
(Exceptione)
25
{
//Rollback
26
full.Release();
27
throw

e;
28
}
29
empty.Release();
30
return

item;
31
}
32
33
set
34
{
35
empty.Acquire();
36
try

{view.WriteSerialize(value);}
37
catch
(Exceptione)
38
{
//Rollback
39
empty.Release();
40
throw

e;
41
}
42
full.Release();
43
}
44
}
45
46
#regionIDisposableMember
47
public

void
Dispose()
48
{
49
view.Dispose();
50
file.Dispose();
51
empty.Dispose();
52
full.Dispose();
53
}
54
#endregion
55
}
到这里我们已经实现了跨进程消息传递(IPC)所需要的组件。你可能需要再回头本文开头的那个例子,看看
ProcessMailBox应该如何使用。

5.通道:基于队列的消息传递

信箱最大的限制是它们每次只能保存一个对象。如果一系列线程(使用同一个信箱)中的一个线程需要比较长的时间来处理特定的命令,那么整个系列都会阻塞。通常我们会使用缓冲的消息通道来处理,这样你可以在方便的时候从中读取消息,而不会阻塞消息发送者。这种缓冲通过通道来实现,这里的通道比信箱要复杂一些。同样,我们将分别从线程和进程级别来讨论通道的实现。

5.1可靠性

信箱和通道的另一个重要的不同是:通道拥有可靠性。例如:自动将发送失败(可能由于线程等待锁的过程中被中断)的消息转存到一个内置的容器中。这意味着处理通道的线程可以安全地停止,同时不会丢失队列中的消息。这通过两个抽象类来实现,
ThreadReliability和ProcessReliability。每个通道的实现类都继承其中的一个类。

5.2跨线程的通道

跨线程的通道基于信箱来实现,但是使用一个同步的队列来作为消息缓冲而不是一个变量。得益于信号量,通道在空队列时阻塞接收线程,在队列满时阻塞发送线程。这样你就不会碰到由入队/出队引发的错误。为了实现这个效果,我们用队列大小来初始化空信号量,用0来初始化满信号量。如果某个发送线程在等待入队的时候被中断,我们将消息复制到内置容器中,并将异常往外面抛。在接收操作中,我们不需要做异常处理,因为即使线程被中断你也不会丢失任何消息。注意:线程只有在阻塞状态才能被中断,就像调用信号量的获取操作(Aquire)方法时。

viewsource

print?

01
public
sealed
class

ThreadChannel:ThreadReliability,IChannel
02
{
03
private

Queuequeue;
04
private

ThreadSemaphoreempty,full;
05
06
public

ThreadChannel(
int

size)
07
{
08
queue=Queue.Synchronized(
new

Queue(size));
09
empty=
new
ThreadSemaphore(size,size);
10
full=
new
ThreadSemaphore(0,size);
11
}
12
13
public

void
Send(
object

item)
14
{
15
try

{empty.Acquire();}
16
catch
(System.Threading.ThreadInterruptedExceptione)
17
{
18
DumpItem(item);
19
throw

e;
20
}
21
queue.Enqueue(item);
22
full.Release();
23
}
24
25
public

void
Send(
object

item,TimeSpantimeout)
26
{
27
try

{empty.Acquire(timeout);}
28
...
29
}
30
31
public

object
Receive()
32
{
33
full.Acquire();
34
object

item=queue.Dequeue();
35
empty.Release();
36
return

item;
37
}
38
39
public

object
Receive(TimeSpantimeout)
40
{
41
full.Acquire(timeout);
42
...
43
}
44
45
protected

override
void

DumpStructure()
46
{
47
lock
(queue.SyncRoot)
48
{
49
foreach
(
object

item
in
queue)
50
DumpItem(item);
51
queue.Clear();
52
}
53
}
54
}

5.3跨进程通道

实现跨进程通道有点麻烦,因为你需要首先提供一个跨进程的缓冲区。一个可能的解决方法是使用跨进程信箱并根据需要将接收/发送方法加入队列。为了避免这种方案的几个缺点,我们将直接使用内存映射文件来实现一个队列。MemoryMappedArray
类将内存映射文件分成几部分,可以直接使用数组索引来访问。MemoryMappedQueue
类,为这个数组提供了一个经典的环(更多细节请查看附件中的代码)。为了支持直接以byte/integer
类型访问数据并同时支持二进制序列化,调用方需要先调用入队(Enqueue)/出队(Dequeue)操作,然后根据需要使用读写方法(队列会自动将数据放到正确的位置)。这两个类都不是线程和进程安全的,所以我们需要使用跨进程的信号量来模拟互斥量(也可以使用
win32互斥量),以此实现相互间的互斥访问。除了这两个类,跨进程的通道基本上和跨线程信箱一样。同样,我们也需要在
Send()中处理线程中断及序列化可能失败的问题。

viewsource

print?

001
public
sealed
class

ProcessChannel:ProcessReliability,IChannel,IDisposable
002
{
003
private

MemoryMappedFilefile;
004
private

MemoryMappedFileViewview;
005
private

MemoryMappedQueuequeue;
006
private

ProcessSemaphoreempty,full,mutex;
007
008
public

ProcessChannel(
int

size,
string

name,
int
maxBytesPerEntry)
009
{
010
int

fileSize=64+size*maxBytesPerEntry;
011
012
empty=
new
ProcessSemaphore(name+
".EmptySemaphore.Channel"
,size,size);
013
full=
new
ProcessSemaphore(name+
".FullSemaphore.Channel"
,0,size);
014
mutex=
new
ProcessSemaphore(name+
".MutexSemaphore.Channel"
,1,1);
015
file=MemoryMappedFile.CreateFile(name+
".MemoryMappedFile.Channel"
,
016
MemoryMappedFile.FileAccess.ReadWrite,fileSize);
017
view=file.CreateView(0,fileSize,
018
MemoryMappedFileView.ViewAccess.ReadWrite);
019
queue=
new
MemoryMappedQueue(view,size,maxBytesPerEntry,
true
,0);
020
if
(queue.Length<size||queue.BytesPerEntry<maxBytesPerEntry)
021
throw

new
MemoryMappedArrayFailedException();
022
}
023
024
public

void
Send(
object

item)
025
{
026
try

{empty.Acquire();}
027
catch
(System.Threading.ThreadInterruptedExceptione)
028
{
029
DumpItemSynchronized(item);
030
throw

e;
031
}
032
try

{mutex.Acquire();}
033
catch
(System.Threading.ThreadInterruptedExceptione)
034
{
035
DumpItemSynchronized(item);
036
empty.Release();
037
throw

e;
038
}
039
queue.Enqueue();
040
try

{queue.WriteSerialize(item,0);}
041
catch
(Exceptione)
042
{
043
queue.RollbackEnqueue();
044
mutex.Release();
045
empty.Release();
046
throw

e;
047
}
048
mutex.Release();
049
full.Release();
050
}
051
052
public

void
Send(
object

item,TimeSpantimeout)
053
{
054
try

{empty.Acquire(timeout);}
055
...
056
}
057
058
public

object
Receive()
059
{
060
full.Acquire();
061
mutex.Acquire();
062
object

item;
063
queue.Dequeue();
064
try

{item=queue.ReadDeserialize(0);}
065
catch
(Exceptione)
066
{
067
queue.RollbackDequeue();
068
mutex.Release();
069
full.Release();
070
throw

e;
071
}
072
mutex.Release();
073
empty.Release();
074
return

item;
075
}
076
077
public

object
Receive(TimeSpantimeout)
078
{
079
full.Acquire(timeout);
080
...
081
}
082
083
protected

override
void

DumpStructure()
084
{
085
mutex.Acquire();
086
byte
[][]dmp=queue.DumpClearAll();
087
for
(
int

i=0;i<dmp.Length;i++)
088
DumpItemSynchronized(dmp[i]);
089
mutex.Release();
090
}
091
092
#regionIDisposableMember
093
public

void
Dispose()
094
{
095
view.Dispose();
096
file.Dispose();
097
empty.Dispose();
098
full.Dispose();
099
mutex.Dispose();
100
}
101
#endregion
102
}

6.消息路由

我们目前已经实现了线程和进程同步及消息传递机制(使用信箱和通道)。当你使用阻塞队列的时候,有可能会遇到这样的问题:你需要在一个线程中同时监听多个队列。为了解决这样的问题,我们提供了一些小型的类:通道转发器,多用复用器,多路复用解码器和通道事件网关。你也可以通过简单的
IRunnable模式来实现类似的通道处理器。IRunnable模式由两个抽象类SingleRunnable和
MultiRunnable来提供(具体细节请参考附件中的代码)。

6.1通道转发器

通道转发器仅仅监听一个通道,然后将收到的消息转发到另一个通道。如果有必要,转发器可以将每个收到的消息放到一个信封中,并加上一个数字标记,然后再转发出去(下面的多路利用器使用了这个特性)。

viewsource

print?

01
public
class
ChannelForwarder:SingleRunnable
02
{
03
private

IChannelsource,target;
04
private

readonly
int

envelope;
05
06
public

ChannelForwarder(IChannelsource,
07
IChanneltarget,
bool
autoStart,
bool
waitOnStop)
08
:

base
(
true
,autoStart,waitOnStop)
09
{
10
this
.source=source;
11
this
.target=target;
12
this
.envelope=-1;
13
}
14
public

ChannelForwarder(IChannelsource,IChanneltarget,

15
int

envelope,
bool

autoStart,
bool

waitOnStop)
16
:

base
(
true
,autoStart,waitOnStop)
17
{
18
this
.source=source;
19
this
.target=target;
20
this
.envelope=envelope;
21
}

22
23
protected

override
void

Run()
24
{

//NOTE:IChannel.Sendisinterruptsaveand

25
//automaticallydumpstheargument.
26
if
(envelope==-1)
27
while
(running)
28
target.Send(source.Receive());
29
else
30
{
31
MessageEnvelopeenv;
32
env.ID=envelope;
33
while
(running)
34
{
35
env.Message=source.Receive();
36
target.Send(env);
37
}
38
}
39
}
40
}

6.2通道多路复用器和通道复用解码器

通道多路复用器监听多个来源的通道并将接收到的消息(消息使用信封来标记来源消息)转发到一个公共的输出通道。这样就可以一次性地监听多个通道。复用解码器则是监听一个公共的输出通道,然后根据信封将消息转发到某个指定的输出通道。

viewsource

print?

01
public
class
ChannelMultiplexer:MultiRunnable
02
{
03
private

ChannelForwarder[]forwarders;
04
05
public

ChannelMultiplexer(IChannel[]channels,
int
[]ids,
06
IChanneloutput,
bool
autoStart,
bool
waitOnStop)
07
{
08
int

count=channels.Length;
09
if
(count!=ids.Length)
10
throw

new
ArgumentException(
"ChannelandIDcountmismatch."
,
"ids"
);
11
12
forwarders=
new
ChannelForwarder[count];
13
for
(
int

i=0;i<count;i++)
14
forwarders[i]=
new
ChannelForwarder(channels[i],
15
output,ids[i],autoStart,waitOnStop);
16
17
SetRunnables((SingleRunnable[])forwarders);
18
}
19
}
20
21
public
class
ChannelDemultiplexer:SingleRunnable
22
{
23
private

HybridDictionarydictionary;
24
private

IChannelinput;
25
26
public

ChannelDemultiplexer(IChannel[]channels,
int
[]ids,
27
IChannelinput,
bool
autoStart,
bool
waitOnStop)
28
:

base
(
true
,autoStart,waitOnStop)
29
{
30
this
.input=input;
31
32
int

count=channels.Length;
33
if
(count!=ids.Length)
34
throw

new
ArgumentException(
"ChannelandIDcountmismatch."
,
"ids"
);
35
36
dictionary=
new
HybridDictionary(count,
true
);
37
for
(
int

i=0;i<count;i++)
38
dictionary.add(ids[i],channels[i]);
39
}
40
41
protected

override
void

Run()
42
{

//NOTE:IChannel.Sendisinterruptsaveand

43
//automaticallydumpstheargument.
44
while
(running)
45
{
46
MessageEnvelopeenv=(MessageEnvelope)input.Receive();
47
IChannelchannel=(IChannel)dictionary[env.ID];
48
channel.send(env.Message);
49
}
50
}
51
}

6.3通道事件网关

通道事件网关监听指定的通道,在接收到消息时触发一个事件。这个类对于基于事件的程序(例如GUI程序)很有用,或者在使用系统线程池(ThreadPool)来初始化轻量的线程。需要注意的是:使用
WinForms的程序中你不能在事件处理方法中直接访问UI控件,只能调用Invoke
方法。因为事件处理方法是由事件网关线程调用的,而不是UI线程。

viewsource

print?

01
public
class
ChannelEventGateway:SingleRunnable
02
{
03
private

IChannelsource;
04
public

event
MessageReceivedEventHandlerMessageReceived;
05
06
public

ChannelEventGateway(IChannelsource,
bool

autoStart,
07
bool

waitOnStop):
base
(
true
,autoStart,waitOnStop)
08
{
09
this
.source=source;
10
}
11
12
protected

override
void

Run()
13
{
14
while
(running)
15
{
16
object

c=source.Receive();
17
MessageReceivedEventHandlerhandler=MessageReceived;
18
if
(handler!=
null
)
19
handler(
this
,
new

MessageReceivedEventArgs(c));
20
}
21
}
22
}

7.比萨外卖店的例子

万事俱备,只欠东风。我们已经讨论了这个同步及消息传递框架中的大部分重要的结构和技术(本文没有讨论框架中的其他类如Rendezvous及Barrier)。就像开头一样,我们用一个例子来结束这篇文章。这次我们用一个小型比萨外卖店来做演示。下图展示了这个例子:四个并行进程相互之间进行通讯。图中展示了消息(数据)是如何使用跨进程通道在四个进程中流动的,且在每个进程中使用了性能更佳的跨线程通道和信箱。



一开始,一个顾客点了一个比萨和一些饮料。他调用了顾客(customer)接口的方法,向顾客订单(CustomerOrders)通道发送了一个下单(Order)消息。接单员,在顾客下单后,发送了两条配餐指令(分别对应比萨和饮料)到厨师指令(CookInstruction)通道。同时他通过收银(CashierOrder)通道将订单转发给收银台。收银台从价格中心获取总价并将票据发给顾客,希望能提高收银的速度
。与此同时,厨师将根据配餐指令将餐配好之后交给打包员工。打包员工处理好之后,等待顾客付款,然后将外卖递给顾客。

为了运行这个例子,打开4个终端(cmd.exe),用
"PizzaDemo.execook"启动多个厨师进程(多少个都可以),用
"PizzaDemo.exebackend"启动后端进程,用"PizzaDemo.exefacade"
启动顾客接口门面(用你的程序名称来代替PizzaDemo)。注意:为了模拟真实情景,某些线程(例如厨师线程)会随机休眠几秒。按下回车键就会停止和退出进程。如果你在进程正在处理数据的时候退出,你将可以在内存转存报告的结尾看到几个未处理的消息。在真实世界的程序里面,消息一般都会被转存到磁盘中,以便下次可以使用。

这个例子使用了上文中讨论过的几个机制。比如说,收银台使用一个通道复用器(ChannelMultiplexer)来监听顾客的订单和支付通道,用了两个信箱来实现价格服务。分发时使用了一个通道事件网关(ChannelEventGateway),顾客在食物打包完成之后马上会收到通知。你也可以将这些程序注册成
WindowsNT服务运行,也可以远程登录后运行。

8.总结

本文已经讨论了C#中如何基于服务的架构及实现跨进程同步和通讯。然后,这个不是唯一的解决方案。例如:在大项目中使用那么多的线程会引来严重的问题。这个框架中缺失的是事务支持及其他的通道/信箱实现(例如命名管道和TCPsockets)。这个框架中可能也有许多不足之处,请告诉我。

9.参考资料

操作系统概念(OperatingSystemConcepts),6thEdition,Silberschatz,Galvin,Gagne,ISBN0-471-48905-0

CodeProject:
UsingWin32SemaphoresinC#
ThreadMessaging.NET(一个开源库,包含了本文讨论过的类)

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: