The Deferred class in the suasync package: API and usage example
2013-09-30 13:55
public final class Deferred<T> extends Object
A thread-safe implementation of a deferred result for easy asynchronous processing.
This implementation is based on Twisted's Python Deferred API. See also the source code of defer.py.
This API is a simple and elegant way of managing asynchronous and dynamic "pipelines" (processing chains) without having to explicitly define a finite state machine.
The tl;dr version
We're all busy and don't always have time to RTFM in detail. Please pay special attention to the invariants you must respect. Other than that, here's an executive summary of what Deferred offers:
- A Deferred is like a Future with a dynamic Callback chain associated to it.
- When the deferred result becomes available, the callback chain gets triggered.
- The result of one callback in the chain is passed on to the next.
- When a callback returns another Deferred, the next callback in the chain doesn't get executed until that other Deferred result becomes available.
- There are actually two callback chains. One is for normal processing, the other is for error handling / recovery. A Callback that handles errors is called an "errback".
- Deferred is an important building block for writing easy-to-use asynchronous APIs in a thread-safe fashion.
Understanding the concept of Deferred
The idea is that a Deferred represents a result that's not yet available. An asynchronous operation (I/O, RPC, whatever) has been started and will hand its result (be it successful or not) to the Deferred in the future. The key difference between a Deferred and a Future is that a Deferred has a callback chain associated to it, whereas with just a Future you need to get the result manually at some point, which poses problems such as: How do you know when the result is available? What if the result itself depends on another future?
When you start an asynchronous operation, you typically want to be called back when the operation completes. If the operation was successful, you want your callback to use its result to carry on what you were doing at the time you started the asynchronous operation. If there was an error, you want to trigger some error handling code.
But there's more to a Deferred than a single callback. You can add an arbitrary number of callbacks, which effectively allows you to easily build complex processing pipelines in a really simple and elegant way.
Understanding the callback chain
Let's take a typical example. You're writing a client library for others to use your simple remote storage service. When your users call the get method in your library, you want to retrieve some piece of data from your remote service and hand it back to the user, but you want to do so in an asynchronous fashion.
When the user of your client library invokes get, you assemble a request and send it out to the remote server through a socket. Before sending it to the socket, you create a Deferred and you store it somewhere, for example in a map, to keep an association between the request and this Deferred. You then return this Deferred to the user; this is how they will access the deferred result as soon as the RPC completes.
Sooner or later, the RPC will complete (successfully or not), and your socket will become readable (or maybe closed, in the event of a failure). Let's assume for now that everything works as expected, and thus the socket is readable, so you read the response from the socket. At this point you extract the result of the remote get call, and you hand it out to the Deferred you created for this request (remember, you had to store it somewhere, so you could give it the deferred result once you have it). The Deferred then stores this result and triggers any callback that may have been added to it. The expectation is that the user of your client library, after calling your get method, will add a Callback to the Deferred you gave them. This way, when the deferred result becomes available, you'll call it with the result in argument.
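The bookkeeping described above (create a pending result, remember it in a map, complete it when the response is read) can be sketched roughly as follows. This is only an illustration: it uses the JDK's CompletableFuture as a stand-in for Deferred so that it runs without the suasync jar, and StorageClient, onResponse and lastRequestId are hypothetical names that do not come from the library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical client; CompletableFuture stands in for Deferred.
final class StorageClient {
    // Association between in-flight request ids and their pending results.
    private final Map<Long, CompletableFuture<byte[]>> pending = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();
    long lastRequestId; // exposed only so the demo can fake a response

    /** Registers a pending result, "sends" the request, and returns immediately. */
    CompletableFuture<byte[]> get(String key) {
        long id = nextId.incrementAndGet();
        CompletableFuture<byte[]> deferred = new CompletableFuture<>();
        pending.put(id, deferred); // remember which result belongs to which request
        lastRequestId = id;        // a real client would write (id, key) to the socket here
        return deferred;
    }

    /** Called by the (imaginary) socket reader once a response has been read. */
    void onResponse(long id, byte[] payload) {
        CompletableFuture<byte[]> deferred = pending.remove(id);
        if (deferred != null) {
            deferred.complete(payload); // triggers the callbacks the caller added
        }
    }

    static String demo() {
        StorageClient client = new StorageClient();
        StringBuilder seen = new StringBuilder();
        // The user adds a callback to the pending result they were given.
        client.get("greeting")
              .thenAccept(bytes -> seen.append(new String(bytes, StandardCharsets.UTF_8)));
        // Later, the response arrives and the callback is invoked with it.
        client.onResponse(client.lastRequestId, "hello".getBytes(StandardCharsets.UTF_8));
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With the real library, get would presumably return a Deferred and the user would call addCallback on it; the map lookup on the response path would stay the same.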
So far what we've explained is nothing more than a Future with a callback associated to it. But there's more to Deferred than just this. Let's assume now that someone else wants to build a caching layer on top of your client library, to avoid repeatedly getting the same value over and over again through the network. Users who want to use the cache will invoke get on the caching library instead of directly calling your client library.
Let's assume that the caching library already has a result cached for a get call. It will create a Deferred, and immediately hand it the cached result, and it will return this Deferred to the user. The user will add a Callback to it, which will be immediately invoked since the deferred result is already available. So the entire get call completed virtually instantaneously and entirely from the same thread. There was no context switch (no other thread involved, no I/O and whatnot), nothing ever blocked, everything just happened really quickly.
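As a rough illustration of the cache-hit case, the sketch below again uses the JDK's CompletableFuture as a stand-in (an assumption made so the example runs without the suasync jar). It checks that a callback added to an already-completed result runs right away on the calling thread. CacheHitDemo is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;

final class CacheHitDemo {
    /** Returns true if the callback ran synchronously on the caller's thread. */
    static boolean demo() {
        Thread caller = Thread.currentThread();
        Thread[] ranOn = new Thread[1];

        // Cache hit: the result is available before anyone asks for it.
        CompletableFuture<String> cached = CompletableFuture.completedFuture("cached value");

        // The user's callback is invoked during this call, not later.
        cached.thenAccept(value -> ranOn[0] = Thread.currentThread());

        return ranOn[0] == caller;
    }

    public static void main(String[] args) {
        System.out.println("callback ran on caller thread: " + demo());
    }
}
```

In suasync itself, an already-completed result is created with Deferred.fromResult, as the example at the end of this post does.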
Now let's assume that the caching library has a cache miss and needs to do a remote get call using the original client library described earlier. The RPC is sent out to the remote server and the client library returns a Deferred to the caching library. This is where things become exciting. The caching library can then add its own callback to the Deferred before returning it to the user. This callback will take the result that came back from the remote server, add it to the cache and return it. As usual, the user then adds their own callback to process the result. So now the Deferred has 2 callbacks associated to it:
              1st callback       2nd callback
   Deferred:  add to cache  -->  user callback
When the RPC completes, the original client library will de-serialize the result from the wire and hand it out to the Deferred. The first callback will be invoked, which will add the result to the cache of the caching library. Then whatever the first callback returns will be passed on to the second callback. It turns out that the caching callback returns the get response unchanged, so that will be passed on to the user callback.
Now it's very important to understand that the first callback could have returned another arbitrary value, and that's what would have been passed to the second callback. This may sound weird at first but it's actually the key behind Deferred.
To illustrate why, let's complicate things a bit more. Let's assume the remote service that serves those get requests is a fairly simple and low-level storage service (think memcached), so it only works with byte arrays; it doesn't care what the contents are. So the original client library is only de-serializing the byte array from the network and handing that byte array to the Deferred.
Now you're writing a higher-level library that uses this storage system to store some of your custom objects. So when you get the byte array from the server, you need to further de-serialize it into some kind of an object. Users of your higher-level library don't care about what kind of remote storage system you use, the only thing they care about is getting those objects asynchronously. Your higher-level library is built on top of the original low-level library that does the RPC communication.
When the users of the higher-level library call get, you call get on the lower-level library, which issues an RPC call and returns a Deferred to the higher-level library. The higher-level library then adds a first callback to further de-serialize the byte array into an object. Then the user of the higher-level library adds their own callback that does something with that object. So now we have something that looks like this:
              1st callback                    2nd callback
   Deferred:  de-serialize to an object  -->  user callback
When the result comes in from the network, the byte array is de-serialized from the socket. The first callback is invoked and its argument is the initial result, the byte array. So the first callback further de-serializes it into some object that it returns. The second callback is then invoked and its argument is the result of the previous callback, that is the de-serialized object.
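The two-stage chain just described might look like the sketch below. It is an analogy rather than the Deferred API: CompletableFuture.thenApply returns a new future for each stage, whereas Deferred.addCallback appends to the chain of the same Deferred, but the way each callback receives the previous callback's return value is the same. ChainDemo is a hypothetical name, and a plain String plays the role of the custom object.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

final class ChainDemo {
    static String demo() {
        StringBuilder seen = new StringBuilder();

        // Stands in for the Deferred returned by the low-level library.
        CompletableFuture<byte[]> fromNetwork = new CompletableFuture<>();

        fromNetwork
            // 1st callback (higher-level library): byte array -> object.
            .thenApply(bytes -> new String(bytes, StandardCharsets.UTF_8))
            // 2nd callback (user): receives whatever the 1st callback returned.
            .thenAccept(object -> seen.append("user callback got: ").append(object));

        // The RPC layer hands the initial result (the raw bytes) to the chain.
        fromNetwork.complete("alice".getBytes(StandardCharsets.UTF_8));
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```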
Now back to the caching library, which has nothing to do with the higher-level library. All it does is, given an object that implements some interface with a get method, it keeps a map of whatever arguments get receives to an Object that was cached for this particular get call. Thanks to the way the callback chain works, it's possible to use the caching library together with the higher-level library transparently. Users who want to use caching simply need to use the caching library together with the higher-level library. Now when they call get on the caching library, and there's a cache miss, here's what happens, step by step:
1. The caching library calls get on the higher-level library.
2. The higher-level library calls get on the lower-level library.
3. The lower-level library creates a Deferred, issues out the RPC call and returns its Deferred.
4. The higher-level library adds its own object de-serialization callback to the Deferred and returns it.
5. The caching library adds its own cache-updating callback to the Deferred and returns it.
6. The user gets the Deferred and adds their own callback to do something with the object retrieved from the data store.
              1st callback       2nd callback       3rd callback
   Deferred:  de-serialize  -->  add to cache  -->  user callback
   result: (none available)
Once the response comes back, the first callback is invoked, it de-serializes the object, returns it. The current result of the Deferred becomes the de-serialized object. The current state of the Deferred is as follows:
              2nd callback       3rd callback
   Deferred:  add to cache  -->  user callback
   result: de-serialized object
Because there are more callbacks in the chain, the Deferred invokes the next one and gives it the current result (the de-serialized object) in argument. The callback adds that object to its cache and returns it unchanged.
              3rd callback
   Deferred:  user callback
   result: de-serialized object
Finally, the user's callback is invoked with the object in argument.
   Deferred:  (no more callbacks)
   result: (whatever the user's callback returned)
If you think this is becoming interesting, read on, you haven't reached the most interesting thing about Deferred yet.
Building dynamic processing pipelines with Deferred
Let's complicate the previous example a little bit more. Let's assume that the remote storage service that serves those get calls is a distributed service that runs on many machines. The data is partitioned over many nodes and moves around as nodes come and go (due to machine failures and whatnot). In order to execute a get call, the low-level client library first needs to know which server is currently serving that piece of data. Let's assume that there's another server, which is part of that distributed service, that maintains an index and keeps track of where each piece of data is. The low-level client library first needs to look up the location of the data using that first server (that's a first RPC), then retrieves it from the storage node (that's another RPC). End users don't care that retrieving data involves a 2-step process, they just want to call get and be called back when the data (a byte array) is available.
This is where what's probably the most useful feature of Deferred comes in. When the user calls get, the low-level library will issue a first RPC to the index server to locate the piece of data requested by the user. When issuing this lookup RPC, a Deferred gets created. The low-level get code adds a first callback to process the lookup response and then returns it to the user.
              1st callback       2nd callback
   Deferred:  index lookup  -->  user callback
   result: (none available)
Eventually, the lookup RPC completes, and the Deferred is given the lookup response. So before triggering the first callback, the Deferred will be in this state:
              1st callback       2nd callback
   Deferred:  index lookup  -->  user callback
   result: lookup response
The first callback runs and now knows where to find the piece of data initially requested. It issues the get request to the right storage node. Doing so creates another Deferred, let's call it (B) (the original one being (A)), which is then returned by the index lookup callback. And this is where the magic happens. Now we're in this state:
   (A)        2nd callback          |   (B)
                                    |
   Deferred:  user callback         |   Deferred:  (no more callbacks)
   result: Deferred (B)             |   result: (none available)
Because a callback returned a Deferred, we can't invoke the user callback just yet, since the user doesn't want their callback to receive a Deferred, they want it to receive a byte array. The current callback gets paused and stops processing the callback chain. This callback chain needs to be resumed whenever the Deferred of the get call [(B)] completes. In order to achieve that, a callback is added to that other Deferred that will resume the execution of the callback chain.
   (A)        2nd callback          |   (B)        1st callback
                                    |
   Deferred:  user callback         |   Deferred:  resume (A)
   result: Deferred (B)             |   result: (none available)
Once (A) added the callback on (B), it can return immediately, there's no need to wait, block a thread or anything like that. So the whole process of receiving the lookup response and sending out the get RPC happened really quickly, without blocking anything.
Now when the get response comes back from the network, the RPC layer de-serializes the byte array, as usual, and hands it to (B):
   (A)        2nd callback          |   (B)        1st callback
                                    |
   Deferred:  user callback         |   Deferred:  resume (A)
   result: Deferred (B)             |   result: byte array
(B)'s first and only callback is going to set the result of (A) and resume (A)'s callback chain.
   (A)        2nd callback          |   (B)        1st callback
                                    |
   Deferred:  user callback         |   Deferred:  resume (A)
   result: byte array               |   result: byte array
So now (A) resumes its callback chain, and invokes the user's callback with the byte array in argument, which is what they wanted.
   (A)                              |   (B)        1st callback
                                    |
   Deferred:  (no more cb)          |   Deferred:  resume (A)
   result: (return value of         |   result: byte array
            the user's cb)
Then (B) moves on to its next callback in the chain, but there are none, so (B) is done too.
   (A)                              |   (B)
                                    |
   Deferred:  (no more cb)          |   Deferred:  (no more cb)
   result: (return value of         |   result: byte array
            the user's cb)
The whole process of reading the get response, resuming the initial Deferred and executing the second Deferred happened all in the same thread, sequentially, and without blocking anything (provided that the user's callback didn't block, as it must not).
What we've done is essentially equivalent to dynamically building an implicit finite state machine to handle the life cycle of the get request. This simple API allows you to build arbitrarily complex processing pipelines that make dynamic decisions at each stage of the pipeline as to what to do next.
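To make the pause-and-resume mechanism more concrete, here is a deliberately tiny, single-threaded toy written for this post. MiniDeferred is a hypothetical class, not the suasync implementation: it has no errbacks, no thread safety and no overflow protection. It only shows how a chain (A) can stop when a callback returns another deferred (B), and how a "resume (A)" callback added to (B) restarts it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;

/** Toy, single-threaded stand-in for Deferred (hypothetical; not the suasync class). */
final class MiniDeferred {
    private final Deque<Function<Object, Object>> chain = new ArrayDeque<>();
    private Object result;
    private boolean hasResult;
    private boolean paused;

    MiniDeferred addCallback(Function<Object, Object> cb) {
        chain.addLast(cb);
        if (hasResult && !paused) {
            run(); // result already available: run right away
        }
        return this;
    }

    /** Hands the initial result to this deferred and starts the chain. */
    void callback(Object initialResult) {
        result = initialResult;
        hasResult = true;
        run();
    }

    private void run() {
        while (!chain.isEmpty()) {
            result = chain.pollFirst().apply(result);
            if (result instanceof MiniDeferred) {
                // A callback returned another deferred (B): pause this chain (A)
                // and add a "resume (A)" callback to (B).
                paused = true;
                ((MiniDeferred) result).addCallback(value -> {
                    paused = false;
                    result = value; // (B)'s result becomes (A)'s current result
                    run();          // resume (A)'s callback chain
                    return value;
                });
                return; // no waiting, no blocked thread
            }
        }
    }
}

final class PipelineDemo {
    static String demo() {
        StringBuilder seen = new StringBuilder();
        MiniDeferred a = new MiniDeferred(); // (A): created for the lookup RPC
        MiniDeferred b = new MiniDeferred(); // (B): created for the get RPC

        a.addCallback(location -> b);        // index lookup callback returns (B)
        a.addCallback(bytes -> {             // user callback
            seen.append("user got ").append(bytes);
            return bytes;
        });

        a.callback("node-7");                // lookup response arrives; (A) pauses on (B)
        seen.append("[paused]");
        b.callback("payload");               // get response arrives; (B) resumes (A)
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The user callback only ever sees the final payload, never the intermediate deferred, which is the property the diagrams above walk through.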
Handling errors
A Deferred has in fact not one but two callback chains. The first chain is the "normal" processing chain, and the second is the error handling chain. Twisted calls an error handling callback an "errback", so we've kept that term here. When the asynchronous processing completes with an error, the Deferred must be given the Exception that was caught instead of giving it the result (or if no Exception was caught, one must be created and handed to the Deferred). When the current result of a Deferred is an instance of Exception, the next errback is invoked. As for normal callbacks, whatever the errback returns becomes the current result. If the current result is still an instance of Exception, the next errback is invoked. If the current result is no longer an Exception, the next callback is invoked.
When a callback or an errback itself throws an exception, it is caught by the Deferred and becomes the current result, which means that the next errback in the chain will be invoked with that exception in argument. Note that Deferred will only catch Exceptions, not any Throwable or Error.
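The switch between the two chains can be approximated with the JDK's CompletableFuture, used here purely as an analogy (its exception wrapping and API differ from Deferred's addErrback). In this sketch, ErrbackDemo is a hypothetical name: the operation fails, the normal callback is skipped, the error handler recovers with a fallback value, and normal processing continues from there.

```java
import java.util.concurrent.CompletableFuture;

final class ErrbackDemo {
    static String demo() {
        StringBuilder seen = new StringBuilder();
        CompletableFuture<String> rpc = new CompletableFuture<>();

        rpc
            // Normal callback: skipped, because the current result is an exception.
            .thenApply(value -> { seen.append("callback1;"); return value; })
            // "Errback": receives the exception and returns a normal value, which
            // moves processing back onto the normal chain.
            .exceptionally(error -> { seen.append("errback;"); return "fallback"; })
            // Normal callback: runs with whatever the error handler returned.
            .thenAccept(value -> seen.append("callback2:").append(value));

        // The asynchronous operation fails: hand the exception over instead of a result.
        rpc.completeExceptionally(new IllegalStateException("connection reset"));
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```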
Contract and Invariants
Read this carefully as this is your warranty.
- A Deferred can receive only one initial result.
- Only one thread at a time is going to execute the callback chain.
- Each action taken by a callback happens-before the next callback is invoked. In other words, if a callback chain manipulates a variable (and no one else manipulates it), no synchronization is required.
- The thread that executes the callback chain is the thread that hands the initial result to the Deferred. This class does not create or manage any thread or executor.
- As soon as a callback is executed, the Deferred will lose its reference to it.
- Every method that adds a callback to a Deferred does so in O(1).
- A Deferred cannot receive itself as an initial or intermediate result, as this would cause an infinite recursion.
- You must not build a cycle of mutually dependent Deferreds, as this would cause an infinite recursion (thankfully, it will quickly fail with a CallbackOverflowError).
- Callbacks and errbacks cannot receive a Deferred in argument. This is because they always receive the result of a previous callback, and when the result becomes a Deferred, we suspend the execution of the callback chain until the result of that other Deferred is available.
- Callbacks cannot receive an Exception in argument. This is because Exceptions are always given to the errbacks.
- Using the monitor of a Deferred can lead to a deadlock, so don't use it. In other words, writing synchronized (some_deferred) { ... } (or anything equivalent) voids your warranty.
A simple example:
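import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;

public class TestDeferred {

    public static void main(String[] args) throws Exception {
        System.out.println("Zhang San asks Li Si to lend him money");
        // Block the calling thread until the deferred result is available.
        long amount = lend().joinUninterruptibly();
        System.out.println("Zhang San borrowed from Li Si: " + amount);
        System.out.println("Zhang San starts playing cards");
    }

    public static Deferred<Long> lend() {
        System.out.println("Li Si says he will go home and see how much money he has");
        // Pass-through callback: returns the amount unchanged.
        class GetCB implements Callback<Long, Long> {
            @Override
            public Long call(Long arg) throws Exception {
                return arg;
            }
        }
        return check().addCallback(new GetCB());
    }

    public static Deferred<Long> check() {
        System.out.println("Li Si is at home checking how much money is left");
        try {
            // Simulates slow work. This sleep blocks the caller, so the Deferred
            // returned below is already complete by the time it is handed back.
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return Deferred.fromResult(1000L);
    }
}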