
The Deferred class in the suasync package: API and a usage example

2013-09-30 13:55
public final class Deferred<T> extends Object

A thread-safe implementation of a deferred result for easy asynchronous processing.

This implementation is based on Twisted's Python Deferred API.

Deferred Reference
A tutorial (Deferred in depth)
Source code of defer.py


This API is a simple and elegant way of managing asynchronous and dynamic "pipelines" (processing chains) without having to explicitly define a finite state machine.

The tl;dr version

We're all busy and don't always have time to RTFM in details. Please pay special attention to the invariants you must respect. Other than that, here's an executive summary of what Deferred offers:

A Deferred is like a Future with a dynamic Callback chain associated to it.
When the deferred result becomes available, the callback chain gets triggered.
The result of one callback in the chain is passed on to the next.
When a callback returns another Deferred, the next callback in the chain doesn't get executed until that other Deferred result becomes available.
There are actually two callback chains. One is for normal processing, the other is for error handling / recovery. A Callback that handles errors is called an "errback".
Deferred is an important building block for writing easy-to-use asynchronous APIs in a thread-safe fashion.

Understanding the concept of Deferred


The idea is that a Deferred represents a result that's not yet available. An asynchronous operation (I/O, RPC, whatever) has been started and will hand its result (be it successful or not) to the Deferred in the future. The key difference between a Deferred and a Future is that a Deferred has a callback chain associated to it, whereas with just a Future you need to get the result manually at some point, which poses problems such as: How do you know when the result is available? What if the result itself depends on another future?

When you start an asynchronous operation, you typically want to be called back when the operation completes. If the operation was successful, you want your callback to use its result to carry on what you were doing at the time you started the asynchronous operation. If there was an error, you want to trigger some error handling code.

But there's more to a Deferred than a single callback. You can add an arbitrary number of callbacks, which effectively allows you to easily build complex processing pipelines in a really simple and elegant way.
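To make the idea concrete, here is a minimal sketch. It assumes the JDK's java.util.concurrent.CompletableFuture as a stand-in, not the Deferred class itself: thenApply plays roughly the role of adding a callback, and complete plays the role of handing over the initial result. The class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class CallbackBasics {
    /** Attaches two callbacks; each one receives what the previous one returned. */
    static CompletableFuture<String> describe(CompletableFuture<Integer> pending) {
        return pending
            .thenApply(n -> n * 2)             // 1st callback: Integer -> Integer
            .thenApply(n -> "doubled: " + n);  // 2nd callback: Integer -> String
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> pending = new CompletableFuture<>();
        CompletableFuture<String> described = describe(pending);
        System.out.println(described.isDone());  // false: no result yet, nothing has run
        pending.complete(21);                    // hand over the result: the chain runs now
        System.out.println(described.join());    // doubled: 42
    }
}
```

Nobody polls for the result here: the callbacks are registered up front and fire when the result is handed over.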

Understanding the callback chain

Let's take a typical example. You're writing a client library for others to use your simple remote storage service. When your users call the get method in your library, you want to retrieve some piece of data from your remote service and hand it back to the user, but you want to do so in an asynchronous fashion.

When the user of your client library invokes get, you assemble a request and send it out to the remote server through a socket. Before sending it to the socket, you create a Deferred and you store it somewhere, for example in a map, to keep an association between the request and this Deferred. You then return this Deferred to the user; this is how they will access the deferred result as soon as the RPC completes.

Sooner or later, the RPC will complete (successfully or not), and your socket will become readable (or maybe closed, in the event of a failure). Let's assume for now that everything works as expected, and thus the socket is readable, so you read the response from the socket. At this point you extract the result of the remote get call, and you hand it out to the Deferred you created for this request (remember, you had to store it somewhere, so you could give it the deferred result once you have it). The Deferred then stores this result and triggers any callback that may have been added to it. The expectation is that the user of your client library, after calling your get method, will add a Callback to the Deferred you gave them. This way, when the deferred result becomes available, you'll call it with the result in argument.

So far what we've explained is nothing more than a Future with a callback associated to it. But there's more to Deferred than just this. Let's assume now that someone else wants to build a caching layer on top of your client library, to avoid repeatedly getting the same value over and over again through the network. Users who want to use the cache will invoke get on the caching library instead of directly calling your client library.

Let's assume that the caching library already has a result cached for a get call. It will create a Deferred, and immediately hand it the cached result, and it will return this Deferred to the user. The user will add a Callback to it, which will be immediately invoked since the deferred result is already available. So the entire get call completed virtually instantaneously and entirely from the same thread. There was no context switch (no other thread involved, no I/O and whatnot), nothing ever blocked, everything just happened really quickly.
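The "same thread, no context switch" point can be observed with a small sketch. As an assumption, it again uses the JDK's CompletableFuture as an analogue rather than Deferred: completedFuture stands for a Deferred that was handed its cached result right away, and thenAccept for the user's callback. The class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class CachedHit {
    /** Returns the name of the thread that ran a callback added to an already-available result. */
    static String callbackThread(String cachedValue) {
        CompletableFuture<String> ready = CompletableFuture.completedFuture(cachedValue);
        String[] ranOn = new String[1];
        // The result is already there, so the callback runs during this call.
        ready.thenAccept(value -> ranOn[0] = Thread.currentThread().getName());
        return ranOn[0];
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        System.out.println(caller.equals(callbackThread("cached value")));
    }
}
```

By the time thenAccept returns, the callback has already run on the caller's own thread, which is the behavior the paragraph above describes for a cache hit.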

Now let's assume that the caching library has a cache miss and needs to do a remote get call using the original client library described earlier. The RPC is sent out to the remote server and the client library returns a Deferred to the caching library. This is where things become exciting. The caching library can then add its own callback to the Deferred before returning it to the user. This callback will take the result that came back from the remote server, add it to the cache and return it. As usual, the user then adds their own callback to process the result. So now the Deferred has 2 callbacks associated to it:

              1st callback       2nd callback

   Deferred:  add to cache  -->  user callback


When the RPC completes, the original client library will de-serialize the result from the wire and hand it out to the Deferred. The first callback will be invoked, which will add the result to the cache of the caching library. Then whatever the first callback returns will be passed on to the second callback. It turns out that the caching callback returns the get response unchanged, so that will be passed on to the user callback.
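The caching layer just described could look roughly like the sketch below. It is only an illustration under stated assumptions: CompletableFuture from the JDK stands in for Deferred, the remote client library is modelled as a hypothetical Function from key to future, and CachingClient, remoteGet and isCached are invented names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class CachingClient {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    // Hypothetical stand-in for the original client library's asynchronous get.
    private final Function<String, CompletableFuture<String>> remoteGet;

    CachingClient(Function<String, CompletableFuture<String>> remoteGet) {
        this.remoteGet = remoteGet;
    }

    CompletableFuture<String> get(String key) {
        String hit = cache.get(key);
        if (hit != null) {
            return CompletableFuture.completedFuture(hit);  // cache hit: no RPC at all
        }
        return remoteGet.apply(key).thenApply(value -> {
            cache.put(key, value);  // 1st callback: add to cache...
            return value;           // ...and pass the response on unchanged
        });
    }

    boolean isCached(String key) {
        return cache.containsKey(key);
    }

    public static void main(String[] args) {
        CompletableFuture<String> rpc = new CompletableFuture<>();  // completed by hand below
        CachingClient client = new CachingClient(key -> rpc);
        CompletableFuture<Integer> user = client.get("k").thenApply(String::length);  // 2nd callback
        System.out.println(client.isCached("k"));  // false: the RPC has not completed yet
        rpc.complete("value");                     // the "network" answers
        System.out.println(user.join());           // 5
        System.out.println(client.isCached("k"));  // true
    }
}
```

The user's callback cannot tell whether a caching layer sits in between, because the cache-updating callback returns its argument untouched.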

Now it's very important to understand that the first callback could have returned another arbitrary value, and that's what would have been passed to the second callback. This may sound weird at first but it's actually the key behind Deferred.

To illustrate why, let's complicate things a bit more. Let's assume the remote service that serves those get requests is a fairly simple and low-level storage service (think memcached), so it only works with byte arrays and doesn't care what the contents are. So the original client library is only de-serializing the byte array from the network and handing that byte array to the Deferred.

Now you're writing a higher-level library that uses this storage system to store some of your custom objects. So when you get the byte array from the server, you need to further de-serialize it into some kind of an object. Users of your higher-level library don't care about what kind of remote storage system you use; the only thing they care about is getting those objects asynchronously. Your higher-level library is built on top of the original low-level library that does the RPC communication.

When the users of the higher-level library call get, you call get on the lower-level library, which issues an RPC call and returns a Deferred to the higher-level library. The higher-level library then adds a first callback to further de-serialize the byte array into an object. Then the user of the higher-level library adds their own callback that does something with that object. So now we have something that looks like this:

              1st callback                    2nd callback

   Deferred:  de-serialize to an object  -->  user callback


When the result comes in from the network, the byte array is de-serialized from the socket. The first callback is invoked and its argument is the initial result, the byte array. So the first callback further de-serializes it into some object that it returns. The second callback is then invoked and its argument is the result of the previous callback, that is the de-serialized object.
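Here is a short sketch of a callback that changes the type of the result on its way down the chain. As before, it assumes CompletableFuture as an analogue of Deferred, and it assumes that "de-serializing" simply means decoding UTF-8 bytes into a String; DeserializeLayer and decode are illustrative names only.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

final class DeserializeLayer {
    /** Higher-level library: adds a first callback that turns raw bytes into an object. */
    static CompletableFuture<String> decode(CompletableFuture<byte[]> raw) {
        return raw.thenApply(bytes -> new String(bytes, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        CompletableFuture<byte[]> raw = new CompletableFuture<>();  // low-level pending result
        // The user's callback receives a String and never sees the byte array.
        CompletableFuture<Integer> user = decode(raw).thenApply(String::length);
        raw.complete("hello".getBytes(StandardCharsets.UTF_8));     // bytes arrive from the wire
        System.out.println(user.join());                            // 5
    }
}
```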

Now back to the caching library, which has nothing to do with the higher level library. All it does is, given an object that implements some interface with a get method, it keeps a map of whatever arguments get receives to an Object that was cached for this particular get call. Thanks to the way the callback chain works, it's possible to use the caching library together with the higher-level library transparently. Users who want to use caching simply need to use the caching library together with the higher level library. Now when they call get on the caching library, and there's a cache miss, here's what happens, step by step:

1. The caching library calls get on the higher-level library.
2. The higher-level library calls get on the lower-level library.
3. The lower-level library creates a Deferred, issues out the RPC call and returns its Deferred.
4. The higher-level library adds its own object de-serialization callback to the Deferred and returns it.
5. The caching library adds its own cache-updating callback to the Deferred and returns it.
6. The user gets the Deferred and adds their own callback to do something with the object retrieved from the data store.

              1st callback       2nd callback       3rd callback

   Deferred:  de-serialize  -->  add to cache  -->  user callback
   result: (none available)


Once the response comes back, the first callback is invoked: it de-serializes the object and returns it. The current result of the Deferred becomes the de-serialized object. The current state of the Deferred is as follows:

              2nd callback       3rd callback

   Deferred:  add to cache  -->  user callback
   result: de-serialized object


Because there are more callbacks in the chain, the Deferred invokes the next one and gives it the current result (the de-serialized object) in argument. The callback adds that object to its cache and returns it unchanged.

              3rd callback

   Deferred:  user callback
   result: de-serialized object


Finally, the user's callback is invoked with the object in argument.

   Deferred:  (no more callbacks)
   result: (whatever the user's callback returned)


If you think this is becoming interesting, read on: you haven't reached the most interesting thing about Deferred yet.

Building dynamic processing pipelines with Deferred

Let's complicate the previous example a little bit more. Let's assume that the remote storage service that serves those get calls is a distributed service that runs on many machines. The data is partitioned over many nodes and moves around as nodes come and go (due to machine failures and whatnot). In order to execute a get call, the low-level client library first needs to know which server is currently serving that piece of data. Let's assume that there's another server, which is part of that distributed service, that maintains an index and keeps track of where each piece of data is. The low-level client library first needs to look up the location of the data using that first server (that's a first RPC), then retrieves it from the storage node (that's another RPC). End users don't care that retrieving data involves a 2-step process, they just want to call get and be called back when the data (a byte array) is available.

This is where what's probably the most useful feature of Deferred comes in. When the user calls get, the low-level library will issue a first RPC to the index server to locate the piece of data requested by the user. When issuing this lookup RPC, a Deferred gets created. The low-level get code adds a first callback to process the lookup response and then returns it to the user.

              1st callback       2nd callback

   Deferred:  index lookup  -->  user callback
   result: (none available)


Eventually, the lookup RPC completes, and the Deferred is given the lookup response. So before triggering the first callback, the Deferred will be in this state:

              1st callback       2nd callback

   Deferred:  index lookup  -->  user callback
   result: lookup response


The first callback runs and now knows where to find the piece of data initially requested. It issues the get request to the right storage node. Doing so creates another Deferred, let's call it (B), which is then returned by the index lookup callback. And this is where the magic happens. Now we're in this state:

   (A)        2nd callback    |   (B)
                              |
   Deferred:  user callback   |   Deferred:  (no more callbacks)
   result: Deferred (B)       |   result: (none available)


Because a callback returned a Deferred, we can't invoke the user callback just yet, since the user doesn't want their callback to receive a Deferred, they want it to receive a byte array. The current callback gets paused and stops processing the callback chain. This callback chain needs to be resumed whenever the Deferred of the get call [(B)] completes. In order to achieve that, a callback is added to that other Deferred that will resume the execution of the callback chain.

   (A)        2nd callback    |   (B)        1st callback
                              |
   Deferred:  user callback   |   Deferred:  resume (A)
   result: Deferred (B)       |   result: (none available)


Once (A) has added the callback on (B), it can return immediately; there's no need to wait, block a thread or anything like that. So the whole process of receiving the lookup response and sending out the get RPC happened really quickly, without blocking anything.

Now when the get response comes back from the network, the RPC layer de-serializes the byte array, as usual, and hands it to (B):

   (A)        2nd callback    |   (B)        1st callback
                              |
   Deferred:  user callback   |   Deferred:  resume (A)
   result: Deferred (B)       |   result: byte array


(B)'s first and only callback is going to set the result of (A) and resume (A)'s callback chain.

   (A)        2nd callback    |   (B)        1st callback
                              |
   Deferred:  user callback   |   Deferred:  resume (A)
   result: byte array         |   result: byte array


So now (A) resumes its callback chain, and invokes the user's callback with the byte array in argument, which is what they wanted.

   (A)                        |   (B)        1st callback
                              |
   Deferred:  (no more cb)    |   Deferred:  resume (A)
   result: (return value of   |   result: byte array
            the user's cb)


Then (B) moves on to its next callback in the chain, but there are none, so (B) is done too.

   (A)                        |   (B)
                              |
   Deferred:  (no more cb)    |   Deferred:  (no more cb)
   result: (return value of   |   result: byte array
            the user's cb)


The whole process of reading the get response, resuming the initial Deferred and executing the second Deferred happened all in the same thread, sequentially, and without blocking anything (provided that the user's callback didn't block, as it must not).

What we've done is essentially equivalent to dynamically building an implicit finite state machine to handle the life cycle of the get request. This simple API allows you to build arbitrarily complex processing pipelines that make dynamic decisions at each stage of the pipeline as to what to do next.
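The two-step get walked through above can be sketched as follows. This is an analogy built on assumptions, not the Deferred API: with the JDK's CompletableFuture, a callback that returns another future is attached with thenCompose, which suspends the rest of the chain until that inner future completes. The two futures are completed by hand to stand in for network replies, and TwoStepGet, lookupRpc and getRpc are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

final class TwoStepGet {
    // Pending RPCs, completed by hand in main() instead of by a real network layer.
    final CompletableFuture<String> lookupRpc = new CompletableFuture<>();  // feeds (A)
    final CompletableFuture<byte[]> getRpc = new CompletableFuture<>();     // plays (B)

    /** The index-lookup callback returns another future, so the chain waits for it. */
    CompletableFuture<byte[]> get() {
        return lookupRpc.thenCompose(storageNode -> {
            // A real client would send the get RPC to storageNode here.
            return getRpc;
        });
    }

    public static void main(String[] args) {
        TwoStepGet client = new TwoStepGet();
        // The user's callback wants a byte array, never a future.
        CompletableFuture<Integer> user = client.get().thenApply(bytes -> bytes.length);

        client.lookupRpc.complete("storage-node-7");   // lookup response arrives
        System.out.println(user.isDone());             // false: still waiting for (B)

        client.getRpc.complete(new byte[] {1, 2, 3});  // get response arrives
        System.out.println(user.join());               // 3
    }
}
```

The user code is the same as in the single-RPC case: the extra hop is entirely hidden inside get.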

Handling errors

A Deferred has in fact not one but two callback chains. The first chain is the "normal" processing chain, and the second is the error handling chain. Twisted calls an error handling callback an "errback", so we've kept that term here. When the asynchronous processing completes with an error, the Deferred must be given the Exception that was caught instead of giving it the result (or if no Exception was caught, one must be created and handed to the Deferred). When the current result of a Deferred is an instance of Exception, the next errback is invoked. As for normal callbacks, whatever the errback returns becomes the current result. If the current result is still an instance of Exception, the next errback is invoked. If the current result is no longer an Exception, the next callback is invoked.

When a callback or an errback itself throws an exception, it is caught by the Deferred and becomes the current result, which means that the next errback in the chain will be invoked with that exception in argument. Note that Deferred will only catch Exceptions, not any Throwable or Error.
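The way control switches between the two chains can be illustrated with a loose analogy. The sketch assumes CompletableFuture instead of Deferred: exceptionally plays the part of an errback that recovers with a normal value, and thenApply the part of a regular callback. The analogy is not exact (for instance, CompletableFuture also captures Errors), and ErrbackDemo and withRecovery are made-up names.

```java
import java.util.concurrent.CompletableFuture;

final class ErrbackDemo {
    static CompletableFuture<String> withRecovery(CompletableFuture<String> rpc) {
        return rpc
            .thenApply(s -> {                       // callback: skipped if rpc failed
                if (s.isEmpty()) {
                    throw new IllegalArgumentException("empty response");
                }
                return s.toUpperCase();
            })
            .exceptionally(error -> "fallback")     // errback: turns the error into a value
            .thenApply(v -> v + "!");               // callback: normal processing resumes
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        CompletableFuture<String> out = withRecovery(failed);
        failed.completeExceptionally(new IllegalStateException("RPC failed"));
        System.out.println(out.join());                                                   // fallback!
        System.out.println(withRecovery(CompletableFuture.completedFuture("ok")).join()); // OK!
        // An exception thrown by a callback is also routed to the errback.
        System.out.println(withRecovery(CompletableFuture.completedFuture("")).join());   // fallback!
    }
}
```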

Contract and Invariants

Read this carefully as this is your warranty.

A Deferred can receive only one initial result.
Only one thread at a time is going to execute the callback chain.
Each action taken by a callback happens-before the next callback is invoked. In other words, if a callback chain manipulates a variable (and no one else manipulates it), no synchronization is required.
The thread that executes the callback chain is the thread that hands the initial result to the Deferred. This class does not create or manage any thread or executor.
As soon as a callback is executed, the Deferred will lose its reference to it.
Every method that adds a callback to a Deferred does so in O(1).
A Deferred cannot receive itself as an initial or intermediate result, as this would cause an infinite recursion.
You must not build a cycle of mutually dependent Deferreds, as this would cause an infinite recursion (thankfully, it will quickly fail with a CallbackOverflowError).
Callbacks and errbacks cannot receive a Deferred in argument. This is because they always receive the result of a previous callback, and when the result becomes a Deferred, we suspend the execution of the callback chain until the result of that other Deferred is available.
Callbacks cannot receive an Exception in argument. This is because exceptions are always given to the errbacks.
Using the monitor of a Deferred can lead to a deadlock, so don't use it. In other words, writing synchronized (some_deferred) { ... } (or anything equivalent) voids your warranty.

A simple example:

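package test;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;

public class TestDeferred {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {

        System.out.println("Zhang San asks Li Si to lend him money");
        // Block until the Deferred has a result; acceptable in a demo's main thread.
        long l = lend().joinUninterruptibly();

        System.out.println("Zhang San borrowed money from Li Si: " + l);
        System.out.println("Zhang San starts playing cards");
    }

    public static Deferred<Long> lend() {
        System.out.println("Li Si says he will go home and see how much money he has");

        // Pass-through callback: hands the amount on unchanged.
        class GetCB implements Callback<Long, Long> {

            @Override
            public Long call(Long arg) throws Exception {
                return arg;
            }
        }

        return check().addCallback(new GetCB());
    }

    public static Deferred<Long> check() {
        System.out.println("Li Si is at home checking how much money is left");
        try {
            // Simulates a slow lookup. Note that this blocks the calling thread.
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Long.valueOf replaces the deprecated new Long(...) constructor.
        return Deferred.fromResult(Long.valueOf(1000L));
    }

}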