您的位置:首页 > 其它

Microsoft Orleans 之 开发人员指南

2016-07-02 01:42 513 查看

开发一个Grain

在开发Grain之前请先阅读Grains这篇文章

Grain接口

Grains通过调用各自定义在接口中的方法相互作用,一个grain实现了事先定义好的一个或多个接口,grain接口中的方法必须返回Task(如果没有返回值)或者Task<T>(如果有类型返回值)如下事例样本:
//anexampleofaGrainInterface
publicinterfaceIPlayerGrain:IGrainWithGuidKey
{
Task<IGameGrain>GetCurrentGame();
TaskJoinGame(IGameGraingame);
TaskLeaveGame(IGameGraingame);
}

//anexampleofaGrainclassimplementingaGrainInterface
publicclassPlayerGrain:Grain,IPlayerGrain
{
privateIGameGraincurrentGame;

//Gametheplayeriscurrentlyin.Maybenull.
publicTask<IGameGrain>GetCurrentGame()
{
returnTask.FromResult(currentGame);
}

//Gamegraincallsthismethodtonotifythattheplayerhasjoinedthegame.
publicTaskJoinGame(IGameGraingame)
{
currentGame=game;
Console.WriteLine("Player{0}joinedgame{1}",this.GetPrimaryKey(),game.GetPrimaryKey());
returnTaskDone.Done;
}

//Gamegraincallsthismethodtonotifythattheplayerhasleftthegame.
publicTaskLeaveGame(IGameGraingame)
{
currentGame=null;
Console.WriteLine("Player{0}leftgame{1}",this.GetPrimaryKey(),game.GetPrimaryKey());
returnTaskDone.Done;
}
}
Grain引用
一个grain引用是一个代理对象,实现了相同grain接口由相应的grain类实现。使用异步消息,grains之间实现全双工通讯,以及和客户端,grain引用是通过grain的identity并通过调用方法GrainFactory.GetGrain<T>()
来创建,开发人员使用它就像使用普通.net对象一样,它可以作为一个方法的返回值传递一个方法,如下:
在orleans客户端代码:[/code]
//constructthegrainreferenceofaspecificplayer
IPlayerGrainplayer=GrainClient.GrainFactory.GetGrain<IPlayerGrain>(playerId);

在另外一个grain中使用:[/code]
//constructthegrainreferenceofaspecificplayer
IPlayerGrainplayer=GrainFactory.GetGrain<IPlayerGrain>(playerId);
Grain方法调用Orleans编程模型是一组异步编程模型,可以使用asyncandawait,更详细的文章请看这里
使用前面的例子来使用这个对象,代码如下:[/code]
//Invokingagrainmethodasynchronously
TaskjoinGameTask=player.JoinGame(this);
//The`await`keywordeffectivelyturnstheremainderofthemethodintoaclosurethatwillasynchronouslyexecuteuponcompletionoftheTaskbeingawaitedwithoutblockingtheexecutingthread.
awaitjoinGameTask;
//ThenextlineswillbeturnedintoaclosurebytheC#compiler.
players.Add(playerId);
也可以连接两个或者两个以上的Task然后组合一个Task来完成任务,这是一种比较有用的方式,等待所有的任务完成然后在统计,如下实例:[/code]
List<Task>tasks=newList<Task>();
ChirperMessagechirp=CreateNewChirpMessage(text);
foreach(IChirperSubscribersubscriberinFollowers.Values)
{
tasks.Add(subscriber.NewChirpAsync(chirp));
}
TaskjoinedTask=Task.WhenAll(tasks);
awaitjoinedTask;
[/code]

TaskDone.DoneUtilityProperty

有没有一个标准的方式返回一个void的返回值呢,答案是肯定的,Orleans为我们定义一个助手类:TaskDone.Done

客户端开发

一旦我们定义好一个grain的接口并且有相应的实现类,我们就可以开始编写我们的客户端代码,引入相应的dllOrleans.dllOrleansRuntimeInterfaces.dll几乎任何一个客户端都会涉及到grain工厂方法的使用,使用这个方法通过一个特殊的Id来引用,正如已经提到的grain不能被显式创建或者销毁。
GrainClient.Initialize();

//HardcodedplayerID
GuidplayerId=newGuid("{2349992C-860A-4EDA-9590-000000000006}");
IPlayerGrainplayer=GrainClient.GrainFactory.GetGrain<IPlayerGrain>(playerId);

IGameGraingame=player.CurrentGame.Result;
varwatcher=newGameObserver();
varobserver=GrainClient.GrainFactory.CreateObjectReference<IGameObserver>(watcher);
awaitgame.SubscribeForGameUpdates();
如果这个应用程序代码放在控制台应用程序的主线程中的话,你就需要调用wait()方法。
查看更多细节的关键概念部分关于使用任务执行调度和异常流的详细信息。查找或者创建一个Grains
通过调用GrainClient.Initialize()建立连接之后,在泛型工厂类中的静态方法可以用于获取一个grain的引用,例如:GrainClient.GrainFactory.GetGrain<IPlayerGrain>()
Grain接口作为GrainFactory.GetGrain<T>()的一个参数来传递。
给Grains发送消息
在orleans的编程模型中客户端与服务端的通讯类似于grain之间的通讯,区别是client的调用是可能是多线程调用,而grain之间被约束为单线程,客户端库使用TPL的线程池管理的延续和回调,
所以它是由客户端来管理自己的并发,使用任何同步构建适合其环境–锁、事件、TPL任务,等等。
获取通知
有一种情况,其中一个简单的消息/响应模式是不够的,客户端需要接收异步通知。例如,一个用户可能希望被通知当一个新的消息已发表的人。
观察者是一个单向异步接口继承自IGrainObserver,它的方法必须返回void,观察者调用一个grain的接口方法就是发出一个消息通知,除了没有返回值外,grain不依赖于任何结果,
Orleans将确保消息单向传递的,grain提供了相应订阅/发布通知的API
订阅通知必须首先创建一个实现了观察者接口的对象,然后在grainfactory里调用CreateObjectReference方法,转向grain对象的引用,然后可以将其传递给通知grain的订阅方法。
该模型也可以被其他grain用于接收异步通知。不像在客户端订阅的情况,订阅grain只是实现Observer接口作为一个面,并通过对自身的引用(e.g.
this.AsReference<IChirperViewer>
).
Example
这里是一个完整代码的例子
[/code]
namespacePlayerWatcher
{
classProgram
{
///<summary>
///Simulatesacompanionapplicationthatconnectstothegame
///thataparticularplayeriscurrentlypartof,andsubscribes
///toreceivelivenotificationsaboutitsprogress.
///</summary>
staticvoidMain(string[]args)
{
try
{
GrainClient.Initialize();

//HardcodedplayerID
GuidplayerId=newGuid("{2349992C-860A-4EDA-9590-000000000006}");
IPlayerGrainplayer=GrainClient.GrainFactory.GetGrain<IPlayerGrain>(playerId);
IGameGraingame=null;

while(game==null)
{
Console.WriteLine("Gettingcurrentgameforplayer{0}...",playerId);

try
{
game=player.CurrentGame.Result;
if(game==null)//Waituntiltheplayerjoinsagame
Thread.Sleep(5000);
}
catch(Exceptionexc)
{
Console.WriteLine("Exception:",exc.GetBaseException());
}
}

Console.WriteLine("Subscribingtoupdatesforgame{0}...",game.GetPrimaryKey());

//Subscribeforupdates
varwatcher=newGameObserver();
game.SubscribeForGameUpdates(GrainClient.GrainFactory.CreateObjectReference<IGameObserver>(watcher)).Wait();

//.Waitwillblockmainthreadsothattheprocessdoesn'texit.
//Updatesarriveonthreadpoolthreads.
Console.WriteLine("Subscribedsuccessfully.Press<Enter>tostop.");
Console.ReadLine();
}
catch(Exceptionexc)
{
Console.WriteLine("UnexpectedError:{0}",exc.GetBaseException());
}
}

///<summary>
///Observerclassthatimplementstheobserverinterface.
///Needtopassagrainreferencetoaninstanceofthisclasstosubscribeforupdates.
///</summary>
privateclassGameObserver:IGameObserver
{
//Receiveupdates
publicvoidUpdateGameScore(stringscore)
{
Console.WriteLine("Newgamescore:{0}",score);
}
}
}
}


运行应用程序
配置连接到Orleans
允许应用程序与外界交互的orleansgrains,框架包括客户端库。此客户端程序库可能由桌面或移动应用程序使用,或由呈现交互式网页或公开Web服务接口的前端服务器使用。
客户端库提供了一个API编写异步客户端于orleans交互。一旦客户端库连接到orleans的网关,客户端可以发送邮件到grains,接收响应,通过观察者得到grain异步通知。连接网关
建立一个连接,客户端通过调用GrainClient.Initialize(),他将连接到silo在配置文件(ClientConfiguration.xml)中指定的ip和端口,这个文件必须和
Orleans.dll放在同一个目录下,作为一种替代,你也可以通过编程的方式加载一个配置文件然后通过[code]GrainClient.Initialize()
初始化来使用。
配置客户端
在客户端配置文件中(ClientConfiguration.xml),指定网关端点的ip和端口,它需要去匹配silo中配置文件(OrleansConfiguration.xml)中指定的网关
[/code]
<ClientConfigurationxmlns="urn:orleans">
<GatewayAddress="<IPaddressorhostnameofsilo>"Port="30000"/>
</ClientConfiguration>
如果Orleans运行在WindowsAzure,客户端就会自动发现网关信息,而就不需要静态配置了,可以参考这个实例Azureapplicationsample配置silos
在OrleansConfiguration.xml配置文件中,ProxyingGateway代理网关元素指定了silo的endpoint网关(gateway),inter-silo的地址定义是通过Networking元素来定义的,必须区分开
且要跟它指定不同的端口。
[/code]
<?xmlversion="1.0"encoding="utf-8"?>
<OrleansConfigurationxmlns="urn:orleans">
<Defaults>
<NetworkingAddress=""Port="11111"/>
<ProxyingGatewayAddress=""Port="30000"/>
</Defaults>
</OrleansConfiguration>

grain持久化
grain持久化目标
1.允许不同的grain类型使用不同的存储类型providers(一个用Azure表,一个用SQLAzure)相同类型的存储提供程序,但具有不同的配置(两个使用Azure表,但一个使用存储帐户)。
2.允许存储提供程序实例配置可交换(例如,开发-测试-产品)只是配置文件的变化。
3.允许额外的存储供应商被写入后提供了一个框架,由orleans团队或其他人。
4.提供一个最小的生产级存储提供程序。
5.存储提供程序完全控制了在持久化支持存储中的数据状态数据的存储方式。
推论:Orleans没有提供一个全面的ORM的存储解决方案,但允在必要的时候许自定义存储提供程序支持指定一个ORM的要求。
grain持久化API
Grain类型可以用两种方式中的一种声明:
1.拓展Grain如果他们没有任何持久的状态,或如果他们将处理所有自己的持久状态,或者
2.Extend[code]Grain<T>如果他们有一些持久态,他们希望Orleans良运行时处理。
另外,通过拓展Grain<T>grain类型自动选择Orleans框架的系统管理的持久化。对于本节的其他部分,我们只会考虑选择#2/
Grain<T>,因为条件限制。
Grain状态存储
从Grain<T>继承的grain类(其中T是从GrainState自动导出的存储数据类型)将从指定的存储中自动加载它们的状态。Grain将标有[StorageProvider]特性指定用于读/写Grain状态的数据存储提供程序实例的命名。
[/code]
[StorageProvider(ProviderName="store1")]
publicclassMyGrain<MyGrainState>...
{
...
}
OrleansProvider管理框架提供了一种机制,以指定和注册silo配置文件不同的存储提供和存储操作。[/code]
<OrleansConfigurationxmlns="urn:orleans">
<Globals>
<StorageProviders>
<ProviderType="Orleans.Storage.MemoryStorage"Name="DevStore"/>
<ProviderType="Orleans.Storage.AzureTableStorage"Name="store1"
DataConnectionString="DefaultEndpointsProtocol=https;AccountName=data1;AccountKey=SOMETHING1"/>
<ProviderType="Orleans.Storage.AzureBlobStorage"Name="store2"
DataConnectionString="DefaultEndpointsProtocol=https;AccountName=data2;AccountKey=SOMETHING2"/>
</StorageProviders>
[/code]
配置存储providers

AzureTableStorage

<ProviderType="Orleans.Storage.AzureTableStorage"Name="TableStore"
DataConnectionString="UseDevelopmentStorage=true"/>
以下属性可以添加到<Provider/>元素来配置提供者:[/code]
DataConnectionString="..."
(强制)-Azure存储连接字符串。
TableName="OrleansGrainState"
(可选)-表存储中使用的表名称,默认OrleansGrainState
DeleteStateOnClear="false"
(可选)-如果是true,当清除grain时记录被删除,否则将写入一个空记录,默认为false
UseJsonFormat="false"
(可选)-如果为true,使用json序列化,使用Orleans的二进制序列化,默认
false
UseFullAssemblyNames="false"
(可选)-(如果
UseJsonFormat="true"
)如果为true则使用程序集全名,否则使用简单名,默认为
false
IndentJSON="false"
(可选)-(如果
UseJsonFormat="true"
)如果为true压缩json序列化,默认
false
注:表存储限制,state不应该超过64kbAzureBlobStorage
<ProviderType="Orleans.Storage.AzureTableStorage"Name="BlobStore"
DataConnectionString="UseDevelopmentStorage=true"/>
以下属性可以添加到<
Provider
/>元素来配置提供者:[/code]
DataConnectionString="..."
(强制)-Azure存储连接字符串。
ContainerName="grainstate"
(可选)-使用blob存储容器,默认
grainstate
UseFullAssemblyNames="false"
(可选)-(如果
UseJsonFormat="true"
)如果为true则使用程序集全名,否则使用简单名,默认为
false
IndentJSON="false"
(可选)-如果为true压缩json序列化,默认
false
MemoryStorage
<ProviderType="Orleans.Storage.MemoryStorage"Name="MemoryStorage"/>
注:provider持久状态保存到内存中,如果silo关闭就不复存在,所以仅工测试开发时使用。[/code]
NumStorageGrains="10"
(可选)-用于存储”状态“的grain数,默认
10
ShardedStorageProvider
<ProviderType="Orleans.Storage.ShardedStorageProvider"Name="ShardedStorage">
<Provider/>
<Provider/>
<Provider/>
</Provider>
用于写入在多个其他存储提供程序中共享的Grain状态数据的简单存储提供程序。一致的散列函数(默认JenkinsHash)来决定哪些碎片(按它们在配置文件中定义的顺序)是负责为指定的grain存储状态数据,然后通过适当的底层provider执行读/写/清除请求
注:JenkinsHash算法是一种算法,如果想要了解更多去网上搜索。存储provider注解

如果继承了Grain<T>的grain的类没有指定
[StorageProvider]
特性,那么就搜索默认的provider,如果没有找到,那么这将被视为一个丢弃的存储provider。
如果在silo配置文件中只有一个provider,它将被视为这个silo的默认provider。
一个了使用不存在但是在silo配置中定义的存储provider的grain,当silo加载器加载失败时,silo的其他部分仍然会加载,那么以后当调用这个grain时,将会抛出Orleans.Storage.BadProviderConfigException异常,指定这个grain无法加载。
存储provider实例使用一个给定的grain类型是由在grain类型的[StorageProvider]特性定义的存储提供程序名称的组合决定的,再加上provider的类型和在silo配置中确定确定的provider配置选项。
不同的grain类型可以使用不同的配置存储provider,即使两个都是同一类型的:例如:两种不同的Azure表存储提provider的实例,连接到不同的Azure存储账号(请参见上面的配置文件示例)。存储provider的所有配置细节都是静态定义的,在silo启动时读取到silo结构中,在这个时候没有什么机制可以动态地更新或改变一个silo使用的存储provider的列表。
然而,这是一个优先次序/工作负载的约束,而不是一个基本的设计约束。State存储APISgrain有两个主要的部分state/persistenceAPIs:Grain-to-RuntimeandRuntime-to-Storage-Provider.GrainState存储API
在Orleans运行状态存储功能将为grain提供grain的读写操作和自动填充/保存grain的GrainState数据对象。
在后台,通过适当的持久化provider配置将为grain提供这些功能连接(由Orleans客户创工具在生成的代码)。Grain状态的读写功能
当Grain激活时,它的状态将自动读取,但是,grain是负责明确地触发写入任何改变的晶粒状态,在必要时。
参见下面的错误处理机制的详细信息的故障模式部分(FailureModes)。
在OnActivateAsync()(等同于base.ReadStateAsync()的调用)方法的调用时将激活
GrainState
的自动读取,在任何调用grain的方法之前
GrainState
的状态是不会刷新,除非是,grain激活后的这个调用。
任何grain的方法调用时,可以要求Orleans运行时写当前grain的状态数据,激活指定的存储provider通过调用writestateasync()。当他们对状态数据进行重大更新时,grain负责显式地执行写操作。
通常,grain的方法将返回base.writestateasync()Task从grain方法返回结果Task作为最终的结果,但它并不需要遵循这个模式。在任何grain方法后,运行时不会自动更新储存的grain的状态。
在grain的任何一种方法或计时器回调处理程序中,grain可以request奥尔良运行时重新读取当前grain状态数据,通过调用base.readstateasync()激活从指定的存储provider。
这将完全覆盖当前存储在当前状态对象中的任何当前状态数据,这些数据是由持久存储的最新值读取的。
(这句没法翻译了,直接上原文)
Anopaqueprovider-specific
Etag
value(
string
)maybesetbyastorageprovideraspartofthegrainstatemetadatapopulatedwhenstatewasread.Someprovidersmaychoosetoleavethisas
null
iftheydonotuse
Etag
s.从概念上讲,[/code]
任何写操作期间Orleans运行时将以grain状态数据对象的深层副本供自己使用。在重写的情况下,运行时可以使用优化规则和启发式,以避免在某些情况下,在某些情况下执行一些或所有的深拷贝,提供预期的逻辑隔离语义被保留。

Grain状态的读写操作的示例代码

为了加入Orleans的grain状态持久性机制,必须拓展Grain类。
上述定义中的T将被此grain的特定应用程序特定的grain状态类所取代;见下面的示例。
grain类也应该有一个[storageprovider]特性告诉运行时存储provider(实例)使用这种类型的grain。

publicinterfaceMyGrainState:GrainState
{
publicintField1{get;set;}
publicstringField2{get;set;}
}[StorageProvider(ProviderName="store1")]
publicclassMyPersistenceGrain:Grain<MyGrainState>,IMyPersistenceGrain
{
...
}
[/code]
GrainStateRead在grain的OnActivateAsync()方法调用之前,grain的读取状态将会自动发生,没有应用程序代码使使这种情况发生是必需的。要查看这个状态,可以通过grain类的Grain<T>.State属性。GrainStateWrite在对grain的内存状态作出适当的改变,grain应该调用base.writestateasync()方法将更改写入持久存储,通过定义在存储provider的grain。此方法是异步的,并返回一个Task。
publicTaskDoWrite(intval)
{
State.Field1=val;
returnbase.WriteStateAsync();
}
[/code]
GrainStateRefresh如果一个grain希望显式地重新从provider读取这个grain的最新状态,grain应该调用
base.ReadStateAsync()
方法,这将重新加载grain的状态从持久存储里,通过定义的存储provider的grain类型,在grain状态存储器复制以往任何将被覆盖和取代base.readstateasync()任务完成时。
publicasyncTask<int>DoRead()
{
awaitbase.ReadStateAsync();
returnState.Field1;
}
grain状态持久化操作的失效模式
grain状态读取操作的失效模式在初始读取该特定grain的状态数据时,存储提供程序返回的故障将导致该grain的激活操作失败;既然这样,没有必要再去调用grain的OnActivateAsync()在生命周期内,在grain激活过程中,由于任何其他故障,将原始请求原路返回。
常见的存储provider为一个特定的grain读取状态数据会导致
ReadStateAsync()
Task调用失败
。grain可以选择处理或忽略,失败的Task,在Orleans就像任何其他的Task。
任何试图在加载silo期间没有正确加载provider的框架内的grain发送消息,都将会收到一个异常Orleans.BadProviderConfigExceptiongrain状态写入操作的故障模式
Failuresencounteredbythestorageprovidertowritestatedataforaparticulargrainwillresultinthe
WriteStateAsync()
Task
tobefaulted.Usually,
thiswillmeanthegraincallwillbefaultedbacktotheclientcallerprovidedthe
WriteStateAsync()
Task
iscorrectlychainedintothefinalreturn
Task
forthisgrainmethod.However,
itwillbepossibleforcertainadvancedscenariostowritegraincodetospecificallyhandlesuchwriteerrors,
justliketheycanhandleanyotherfaulted
Task
.Grainsthatexecuteerror-handling/recoverycodemustcatchexceptions/faulted
WriteStateAsync()
Task
sandnotre-throwto
signifythattheyhavesuccessfullyhandledthewriteerror.注意:此段对于程序中的细节处理非常重要[/code]
StorageProviderFramework有编写额外的持久化provider的服务提供程序接口–
IStorageProvider。
持久性提供程序API包括读和写操作grainstate数据。
publicinterfaceIStorageProvider
{
LoggerLog{get;}
TaskInit();
TaskClose();TaskReadStateAsync(stringgrainType,GrainIdgrainId,GrainStategrainState);
TaskWriteStateAsync(stringgrainType,GrainIdgrainId,GrainStategrainState);
}
[/code]
StorageProviderSemantics任何试图执行写操作时,存储provider检测ETag违反约束导致写的Task被终端,就使用Orleans.InconsistentStateException包装底层存储异常。
publicclassInconsistentStateException:AggregateException
{
///<summary>TheEtagvaluecurrentlyheldinpersistentstorage.</summary>
publicstringStoredEtag{get;privateset;}
///<summary>TheEtagvaluecurrentlyheldinmemory,andattemptingtobeupdated.</summary>
publicstringCurrentEtag{get;privateset;}publicInconsistentStateException(
stringerrorMsg,
stringstoredEtag,
stringcurrentEtag,
ExceptionstorageException
):base(errorMsg,storageException)
{
this.StoredEtag=storedEtag;
this.CurrentEtag=currentEtag;
}publicInconsistentStateException(stringstoredEtag,stringcurrentEtag,ExceptionstorageException)
:this(storageException.Message,storedEtag,currentEtag,storageException)
{}
}

DataMapping个人存储provider必须决定如何最好地储存grain的状态–BLOB(各种格式/序列化格式)或列每场是显而易见的选择。对于table的基本存储provider编码状态数据字段到使用orlenas二进制序列化单表列。
[/code]
ApplicationBootstrappingwithinaSilo当silos上线在应用程序需要运行一些“自动执行”功能的几种情况。我们现在已经增加了支持这种自动运行功能通过配置“orleanssiloprovider”。例如:
<OrleansConfigurationxmlns="urn:orleans">
<Globals>
<BootstrapProviders>
<ProviderType="My.App.BootstrapClass1"Name="bootstrap1"/>
<ProviderType="My.App.BootstrapClass2"Name="bootstrap2"/>
</BootstrapProviders>
</Globals>
</OrleansConfiguration>
它也可以注册programaticaly引导供provider,通过调用:[/code]
publicvoidRegisterBootstrapProvider(stringproviderTypeFullName,stringproviderName,IDictionary<string,string>properties=null)publicvoidRegisterBootstrapProvider<T>(stringproviderName,IDictionary<string,string>properties=null)whereT:IBootstrapProvider
Orleans.Runtime.Configuration.GlobalConfiguration
class.这个启动项provider是实现了Orleans.Providers.IBootstrapProvider接口
当silo启动时,Orleans运行时就是实例化bootstrap类列表,在适当的运行上下文执行时调用他们的init方法,作为一个客户端发送消息到grains[/code]
TaskInit(
stringname,
IProviderRuntimeproviderRuntime,
IProviderConfigurationconfig)
例外情况是,当silo启动调用silo抛出异常时,这个silo就会停止。Thisfail-fastapproachisthestandardwaythatOrleanshandlessilostart-upissues,
andisintendedtoallowanyproblemswithsiloconfigurationand/orbootstraplogictobeeasily
detectedduringtestingphasesratherthanbeingsilentlyignoredandcausingunexpectedproblemslaterinthesilolifecycle.定时器和提醒(Timersandreminders)Orleans运行时提供了两个机制:定时器和提醒的功能,允许grain周期性的执行某一个行为。[/code]
Timers描述:定时器是用来创建周期性grain的行为,不需要跨多个激活的grain实例。它本质上是一组.NETSystem.Threading.Timer类,另外它是受单线程执行保护的grain激活。每个激活可能有与它相关联的零或更多的计时器,运行时在激活的运行时上下文中执行每个定时程序。用法:开始一个定时器,要使用Grain.RegisterTimer方法,返回一个实现IDisposable引用的实例
protectedIDisposableRegisterTimer(Func<object,Task>asyncCallback,objectstate,TimeSpandueTime,TimeSpanperiod)
asyncCallback当定时器到执行时间的时候调用的一个函数或者功能。[/code]
state当执行
asyncCallback时传入的一个对象
dueTime指定定时器开始执行时第一次执行的等待时间。
period指定定时器执行的时间间隔取消定时器的处理:当激活被deactivated或当一个故障发生时,一个定时器将停止触发,并且它的silo崩溃。重要的考虑因素:
[/code]Whenactivationcollectionisenabled,theexecutionofatimercallbackdoesnotchangetheactivation’sstatefromidletoinuse.Thismeansthatatimercannotbeusedtopostponedeactivationofotherwiseidleactivations.TheperiodpassedtoGrain.RegisterTimeristheamountoftimethatpassesfromthemomenttheTaskreturnedbyasyncCallbackisresolvedtothemomentthatthenextinvocationofasyncCallbackshouldoccur.ThisnotonlymakesitimpossibleforsuccessivecallstoasyncCallbacktooverlapbutalsomakesitsothatthelengthoftimeasyncCallbacktakestocompleteaffectsthefrequencyatwhichasyncCallbackisinvoked.ThisisanimportantdeviationfromthesemanticsofSystem.Threading.Timer.EachinvocationofasyncCallbackisdeliveredtoanactivationonaseparateturnandwillneverrunconcurrentlywithotherturnsonthesameactivation.Notehowever,asyncCallbackinvocationsarenotdeliveredasmessagesandarethusnotsubjecttomessageinterleavingsemantics.ThismeansthatinvocationsofasyncCallbackshouldbeconsideredtobehaveasifrunningonareentrantgrainwithrespecttoothermessagestothatgrain.

Reminders

提醒类似于定时器,但是有几个重要的区别描述:使用:提醒持续存在,并将继续触发在所有情况下(包括部分或全部集群重新启动)除非明确取消。提醒与一个grain,没有任何特定的激活.Ifagrainhasnoactivationassociatedwithitandareminderticks,onewillbecreated.e.g.:Ifanactivationbecomesidleandisdeactivated,areminderassociatedwiththesamegrainwillreactivatethegrainwhenitticksnext.Remindersaredeliveredbymessageandaresubjecttothesameinterleavingsemanticsasallothergrainmethods.提醒不应该被用于高频定时器,它们的周期应该在几分钟,几小时或几天内测量。

Configuration

提醒,是持久的,依赖于存储到功能。您必须指定在提醒子系统功能之前使用的存储支持。提醒功能是通过在服务器端配置的systemstore元控制。它与AzureTable或SQLServer存储一起协作。

<SystemStoreSystemStoreType="AzureTable"/>OR
<SystemStoreSystemStoreType="SqlServer"/>
如果你只是想提醒占个位置来运作,而不需要建立一个Azure帐户或SQL数据库,那么添加此元素的配置文件(“Globals”)会给你一个发展的唯一实现的提醒系统:[/code]
<ReminderServiceReminderServiceType="ReminderTableGrain"/>
使用:如果一个grain要使用提醒功能就必须实现接口IRemindable内的方法RecieveReminder
[/code]
TaskIRemindable.ReceiveReminder(stringreminderName,TickStatusstatus)
{
Console.WriteLine("Thanksforremindingme--Ialmostforgot!");
returnTaskDone.Done;
}
去启动一个提醒功能,使用Grain.RegisterOrUpdateReminder方法,然后他会返回IOrleansReminder的一个对象[/code]
protectedTask<IOrleansReminder>RegisterOrUpdateReminder(stringreminderName,TimeSpandueTime,TimeSpanperiod)
reminderName是一个唯一的标识,在grain上下文范围内。dueTime指定第一次提醒多久开始执行。period指定执行间隔。由于激活的生命周期就是grain唯一的生命周期,所以你必须明确的取消,使用Grain.UnregisterReminder:
protectedTaskUnregisterReminder(IOrleansReminderreminder)
Grain.RegisterOrUpdateReminder.方法的调用会返回提醒操作handler如果你想在一个持续的方式识别一个提醒,使用一个包含提醒的名称的字符串的IOrleansReminder实例不能保证有效期超出一个激活的寿命。
如果你只是想通过提醒的name来的到一个提醒实例的话,调用Grain.GetReminder方法。[/code]
protectedTask<IOrleansReminder>GetReminder(stringreminderName)
我们该用那个一个功能呢?
1.在下面几种情况使用定时器[/code]Itdoesn’tmatter(orisdesirable)thatthetimerceasestofunctioniftheactivationisdeactivatedorfailuresoccur.Iftheresolutionofthetimerissmall(e.g.reasonablyexpressibleinsecondsorminutes).Thetimercallbackcanbestartedfrom
Grain.OnActivateAsync
orwhenagrainmethodisinvoked.
2.这几种情况下使用提醒功能
Whentheperiodicbehaviorneedstosurvivetheactivationandanyfailures.Toperforminfrequenttasks(e.g.reasonablyexpressibleinminutes,hours,ordays).
组合提醒和定时器
Youmightconsiderusingacombinationofremindersandtimerstoaccomplishyourgoal.
Forexample,ifyouneedatimerwithasmallresolutionthatneedstosurviveacrossactivations,
youcanuseareminderthatrunseveryfiveminutes,whosepurposeistowakeup
agrainthatrestartsalocaltimerthatmayhavebeenlostduetoadeactivation.


OrleansStreams

Orleansv.1.0.0添加流扩展的编程模型支持,流扩展提供了一组抽象接口api,使它的流更简单和更强大的,流扩展允许开发人员以结构化的方式写操作在一系列事件上的应用程序。流provider的可扩展性模型使得在广泛的现有的队列技术的编程模型兼容和方便,例如:EventHubs,ServiceBus,AzureQueues,和ApacheKafka.而不必要写特殊代码或运行专用程序与这样的队列进行交互。我为什么要关心?如果你已经了解了一些关于流处理的技术如:EventHubs,Kafka,AzureStreamAnalytics,ApacheStorm,ApacheSparkStreaming,和ReactiveExtensions(Rx)你可能会问你为什么要关心,为什么我们需要另一个流处理系统和Actor是如何相关的流?“WhyOrleansStreams?”可以回答你的问题编程模型下面是一些Orleans流编程模型原则:在Orleans的体系里Orleansvirtualactors,Orleans流是虚拟的,也就是说,一个流总是存在的。它不是显式创建或销毁,它永远不会失败。流的身份ID识别,这只是逻辑名称由GUID字符串表示。Orleans流数据的生成将允许从时间和空间上减弱它们的依赖性。这意味着流的生产者和流的消费者可能会在不同的服务器上,在不同的时间,并将承受失败。Orleans的流是轻量级和动态的,Orleans流运行时的设计是处理高速大数据。Orleans流是动态绑定的,Orleans流运行时的设计处理的情况下,grain连接和断开流在一个较高的速度。Orleans流运行时透明地管理流消费的生命周期。当应用程序订阅一个流,然后它将接收流的事件,即使是在出现故障。Orleans流均匀分布在grain和工作的客户端。编程APIs
应用程序流访问APIs类似于众所周知的ReactiveExtensions(Rx)in.NET,通过使用[code]Orleans.Streams.IAsyncStream<T>
实现了
Orleans.Streams.IAsyncObserver<T>
Orleans.Streams.IAsyncObservable<T>
的接口。
在下面的一个典型例子产生的一些数据,这是一个HTTP请求的服务在云中运行的请求。Orleans客户端运行在前端服务器接收HTTP调用和发布数据流匹配装置:[/code]
publicasyncTaskOnHttpCall(DeviceEventdeviceEvent)
{
//Postdatadirectlyintodevice'sstream.
IStreamProviderstreamProvider=GrainClient.GetStreamProvider("myStreamProvider");
IAsyncStream<DeviceEventData>deviceStream=streamProvider.GetStream<DeviceEventData>(deviceEvent.DeviceId);
awaitchatStream.OnNextAsync(deviceEvent.Data);
}
另一个例子是在聊天的用户(如Orleans的grain)实施加入聊天室,得到一个处理流的聊天信息在这房间里的所有其他用户产生和订阅它。注意,聊天用户既不需要知道聊天室的grain本身(可能不会有这样的grain在我们的系统中),也不是在该组的其他用户产生消息。不用说,对产生的聊天流,用户不需要知道谁是目前订阅的流。这演示了如何聊天用户可以完全解耦的时间和空间。[/code]
publicclassChatUser:Grain
{
publicasyncTaskJoinChat(stringchatGroupName)
{
IStreamProviderstreamProvider=base.GetStreamProvider("myStreamProvider");
IAsyncStream<string>chatStream=streamProvider.GetStream<string>(chatGroupName);
awaitchatStream.SubscribeAsync((stringchatEvent)=>Console.Out.Write(chatEvent));
}
}
快速入门示例QuickStartSample(这里是一个演示的例子),StreamsProgrammingAPIs(流编程模型APIs)[/code]

StreamProviders

流可以通过各种形状和形式的物理信道,可以有不同的语义。Orleans流的设计是支持多种StreamProviders的概念,这是系统中的一个可拓展点,Orleans目前提供两种Streamproviders。基本的Tcp通信SimpleMessageStreamProvider和云队列AzureQueueStreamProvider,你可以在这里(StreamProviders)找到更消息的介绍

Stream意义

StreamSubsriptionSemantics:OrleansStreamsguaranteeSequentialConsistencyforStreamSubsriptionoperations.Specificaly,whenconsumersubscribestoastream,oncethe
Task
representingthesubsriptionoperationwassuccessfulyresolved,theconsumerwillseealleventsthatweregeneratedafterithassubscribed.Inaddition,Rewindablestreamsallowtosubscribefromanarbitrarypointintimeinthepastbyusing
StreamSequenceToken
(moredetailscanbefoundhere).IndividualStreamEventsDeliveryGuarantees:Individualeventdeliveryguaranteesdependonindividualstreamproviders.Someprovideonlybest-effortat-most-oncedelivery(suchasSimpleMessageStreams),whileothersprovideat-least-oncedelivery(suchasAzureQueueStreams).Itisevenpossibletobuildastreamproviderthatwillguaranteeexactly-oncedelivery(wedon’thavesuchaprovideryet,butitispossibletobuildonewiththeextensabilitymodel).EventsDeliveryOrder:Eventorderalsodependsonaparticularstreamprovider.InSMSstreams,theproducerexplicitellycontrolstheorderofeventsseenbytheconsumerbycontrollingthewayitpublishesthem.AzureQueuestreamsdonotguaranteeFIFOorder,sincetheunderlayingAzureQueuesdonotguaranteeorderinfailurecases.Applicationscanalsocontroltheirownstreamdeliveryordering,byusing
StreamSequenceToken
.流实施OrleansStreamsImplementation提供了一个高层次的内部实施概述。

Streams的可拓展性

OrleansStreamsExtensibility描述如何用新的功能扩展流。

CodeSamples

更多的例子:here.
更多的资料:
OrleansVirtualMeetupaboutStreamsOrleansStreamingPresentationfromVirtualMeetup

调试符号

在开发期间Orleans调试比较简单,直接附加进程,但是对于在生产环境来说,就无法断点调试了,采用跟踪是最好的办法。标记(Symbols):SymbolsforOrleansbinariesarepublishedtohttps://nuget.smbsrc.netsymbolsserver.AddittothelistofsymbolsserverintheVisualStudiooptionsunderDebugging/SymbolsfordebuggingOrleanscode.MakesurethereistralingslashintheURL.VisualStudio2015hasabugwithparsingit.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: