Scala Futures and Promises
2015-01-27 17:02
By: Philipp Haller, Aleksandar Prokopec, Heather Miller, Viktor Klang, Roland Kuhn, and Vojin Jovanovic
Introduction
Futures provide a nice way to reason about performing many operations in parallel, in an efficient and non-blocking way. The idea is simple: a Future is a sort of placeholder object that you can create for a result that does not yet exist. Generally, the result of the Future is computed concurrently and can be collected later. Composing concurrent tasks in this way tends to result in faster, asynchronous, non-blocking parallel code.
By default, futures and promises are non-blocking, making use of callbacks instead of typical blocking operations. To simplify the use of callbacks both syntactically and conceptually, Scala provides combinators such as flatMap, foreach, and filter, used to compose futures in a non-blocking way. Blocking is still possible: for cases where it is absolutely necessary, futures can be blocked on (although this is discouraged).
Futures
A Future is an object holding a value which may become available at some point. This value is usually the result of some other computation:
- If the computation has not yet completed, we say that the Future is not completed.
- If the computation has completed with a value or with an exception, we say that the Future is completed.
Completion can take one of two forms:
- When a Future is completed with a value, we say that the future was successfully completed with that value.
- When a Future is completed with an exception thrown by the computation, we say that the Future was failed with that exception.
A Future has an important property that it may only be assigned once. Once a Future object is given a value or an exception, it becomes in effect immutable: it can never be overwritten.
The simplest way to create a future object is to invoke the future method, which starts an asynchronous computation and returns a future holding the result of that computation. The result becomes available once the future completes.
Note that Future[T] is a type which denotes future objects, whereas future is a method which creates and schedules an asynchronous computation, and then returns a future object which will be completed with the result of that computation.
This is best shown through an example.
Let's assume that we want to use a hypothetical API of some popular social network to obtain a list of friends for a given user. We will open a new session and then send a request to obtain a list of friends of a particular user:
[code]
import scala.concurrent._
import ExecutionContext.Implicits.global
val session = socialNetwork.createSessionFor("user", credentials)
val f: Future[List[Friend]] = future {
  session.getFriends()
}
[/code]
Above, we first import the contents of the scala.concurrent package to make the type Future and the construct future visible. We will explain the second import shortly.
We then initialize a session variable which we will use to send requests to the server, using a hypothetical createSessionFor method. To obtain the list of friends of a user, a request has to be sent over a network, which can take a long time. This is illustrated with the call to the method getFriends that returns List[Friend]. To better utilize the CPU until the response arrives, we should not block the rest of the program: this computation should be scheduled asynchronously. The future method does exactly that: it performs the specified computation block concurrently, in this case sending a request to the server and waiting for a response.
The list of friends becomes available in the future f once the server responds.
An unsuccessful attempt may result in an exception. In the following example, the session value is incorrectly initialized, so the computation in the future block will throw a NullPointerException. This future f is then failed with this exception instead of being completed successfully:
[code]
val session = null
val f: Future[List[Friend]] = future {
  session.getFriends
}
[/code]
The line import ExecutionContext.Implicits.global above imports the default global execution context. Execution contexts execute tasks submitted to them, and you can think of execution contexts as thread pools. They are essential for the future method because they handle how and when the asynchronous computation is executed. You can define your own execution contexts and use them with future, but for now it is sufficient to know that you can import the default execution context as shown above.
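As a minimal sketch of defining your own execution context, the following builds one from a fixed-size java.util.concurrent thread pool and passes it explicitly. It is written with Future { ... } (that is, Future.apply), which is what the future method delegates to; the names pool and customContext and the pool size of 2 are arbitrary choices made for this illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// A dedicated pool of two threads (the size is an arbitrary choice).
val pool = Executors.newFixedThreadPool(2)
val customContext: ExecutionContext = ExecutionContext.fromExecutor(pool)

// Pass the execution context explicitly instead of importing the global one.
val sum: Future[Int] = Future { (1 to 100).sum }(customContext)

// Blocking here only so that the script can observe the result before exiting.
val result = Await.result(sum, 5.seconds)
println(result)

// The pool threads are not daemon threads, so shut the pool down when done.
pool.shutdown()
```

Passing the context explicitly makes it visible which pool runs the computation; declaring it as an implicit val in scope would work as well.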
Our example was based on a hypothetical social network API where the computation consists of sending a network request and waiting for a response. It is fair to offer an example involving an asynchronous computation which you can try out of the box. Assume you have a text file and you want to find the position of the first occurrence of a particular keyword. This computation may involve blocking while the file contents are being retrieved from the disk, so it makes sense to perform it concurrently with the rest of the computation.
[code]
val firstOccurrence: Future[Int] = future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}
[/code]
Callbacks
We now know how to start an asynchronous computation to create a new future value, but we have not shown how to use the result once it becomes available, so that we can do something useful with it. We are often interested in the result of the computation, not just its side-effects. In many future implementations, once the client of the future becomes interested in its result, it has to block its own computation and wait until the future is completed; only then can it use the value of the future to continue its own computation. Although this is allowed by the Scala Future API as we will show later, from a performance point of view a better way to do it is in a completely non-blocking way, by registering a callback on the future. This callback is called asynchronously once the future is completed. If the future has already been completed when registering the callback, then the callback may either be executed asynchronously, or sequentially on the same thread.
The most general form of registering a callback is by using the onComplete method, which takes a callback function of type Try[T] => U. The callback is applied to the value of type Success[T] if the future completes successfully, or to a value of type Failure[T] otherwise.
The Try[T] is similar to Option[T] or Either[T, S], in that it is a monad potentially holding a value of some type. However, it has been specifically designed to either hold a value or some throwable object. Where an Option[T] could either be a value (i.e. Some[T]) or no value at all (i.e. None), Try[T] is a Success[T] when it holds a value and otherwise Failure[T], which holds an exception. Failure[T] holds more information than just a plain None by saying why the value is not there. At the same time, you can think of Try[T] as a special version of Either[Throwable, T], specialized for the case when the left value is a Throwable.
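To make the two cases of Try[T] concrete, here is a small sketch that does not involve futures at all. The helper names parse and describe are made up for this illustration.

```scala
import scala.util.{Failure, Success, Try}

// Parses a string as an Int; any exception thrown is captured as a Failure.
def parse(s: String): Try[Int] = Try(s.toInt)

// Pattern matching distinguishes the two cases, just as in an onComplete callback.
def describe(t: Try[Int]): String = t match {
  case Success(n)  => "value: " + n
  case Failure(ex) => "failed with " + ex.getClass.getSimpleName
}

val ok = describe(parse("42"))
val bad = describe(parse("forty-two"))
println(ok)
println(bad)
```

Unlike None, the Failure case carries the exception itself, so the handler can report why no value is available.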
Coming back to our social network example, let's assume we want to fetch a list of our own recent posts and render them to the screen. We do so by calling a method getRecentPosts which returns a List[String], a list of recent textual posts:
[code]
import scala.util.{Success, Failure}
val f: Future[List[String]] = future {
  session.getRecentPosts
}
f onComplete {
  case Success(posts) => for (post <- posts) println(post)
  case Failure(t) => println("An error has occurred: " + t.getMessage)
}
[/code]
The onComplete method is general in the sense that it allows the client to handle the result of both failed and successful future computations. To handle only successful results, the onSuccess callback is used (which takes a partial function):
[code]
val f: Future[List[String]] = future {
  session.getRecentPosts
}
f onSuccess {
  case posts => for (post <- posts) println(post)
}
[/code]
To handle failed results, the onFailure callback is used:
[code]
val f: Future[List[String]] = future {
  session.getRecentPosts
}
f onFailure {
  case t => println("An error has occurred: " + t.getMessage)
}
f onSuccess {
  case posts => for (post <- posts) println(post)
}
[/code]
The onFailure callback is only executed if the future fails, that is, if it contains an exception.
Since partial functions have the isDefinedAt method, the onFailure method only triggers the callback if it is defined for a particular Throwable. In the following example the registered onFailure callback is never triggered:
[code]
val f = future {
  2 / 0
}
f onFailure {
  case npe: NullPointerException =>
    println("I'd be amazed if this printed out.")
}
[/code]
Coming back to the previous example with searching for the first occurrence of a keyword, you might want to print the position of the keyword to the screen:
[code]
val firstOccurrence: Future[Int] = future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}
firstOccurrence onSuccess {
  case idx => println("The keyword first appears at position: " + idx)
}
firstOccurrence onFailure {
  case t => println("Could not process file: " + t.getMessage)
}
[/code]
The onComplete, onSuccess, and onFailure methods have result type Unit, which means invocations of these methods cannot be chained. Note that this design is intentional, to avoid suggesting that chained invocations may imply an ordering on the execution of the registered callbacks (callbacks registered on the same future are unordered).
That said, we should now comment on when exactly the callback gets called. Since it requires the value in the future to be available, it can only be called after the future is completed. However, there is no guarantee it will be called by the thread that completed the future or the thread which created the callback. Instead, the callback is executed by some thread, at some time after the future object is completed. We say that the callback is executed eventually.
Furthermore, the order in which the callbacks are executed is not predefined, even between different runs of the same application. In fact, the callbacks may not be called sequentially one after the other, but may concurrently execute at the same time. This means that in the following example the variable totalA may not be set to the correct number of lowercase and uppercase a characters from the computed text.
[code]
@volatile var totalA = 0
val text = future {
  "na" * 16 + "BATMAN!!!"
}
text onSuccess {
  case txt => totalA += txt.count(_ == 'a')
}
text onSuccess {
  case txt => totalA += txt.count(_ == 'A')
}
[/code]
Above, the two callbacks may execute one after the other, in which case the variable totalA holds the expected value 18. However, they could also execute concurrently, so totalA could end up being either 16 or 2, since += is not an atomic operation (i.e. it consists of a read and a write step which may interleave arbitrarily with other reads and writes).
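One possible way to make the count reliable is to replace the @volatile var with an atomic counter. The sketch below assumes java.util.concurrent.atomic.AtomicInteger is acceptable for your use case. It is written with Future { ... } and foreach, which runs its function only on success; the CountDownLatch named done is there only so that the script can wait for both callbacks before reading the total.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val totalA = new AtomicInteger(0)
val done = new CountDownLatch(2) // one count per callback

val text = Future { "na" * 16 + "BATMAN!!!" }

// addAndGet is a single atomic read-modify-write, so concurrent callbacks cannot lose updates.
text.foreach { txt =>
  totalA.addAndGet(txt.count(_ == 'a'))
  done.countDown()
}
text.foreach { txt =>
  totalA.addAndGet(txt.count(_ == 'A'))
  done.countDown()
}

// Wait (for demonstration only) until both callbacks have run.
val finished = done.await(5, TimeUnit.SECONDS)
println(totalA.get)
```

The callbacks may still run in either order or at the same time; the atomic counter only guarantees that the final total is the same in every case.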
For the sake of completeness the semantics of callbacks are listed here:
1. Registering an onComplete callback on the future ensures that the corresponding closure is invoked after the future is completed, eventually.
2. Registering an onSuccess or onFailure callback has the same semantics as onComplete, with the difference that the closure is only called if the future is completed successfully or fails, respectively.
3. Registering a callback on the future which is already completed will result in the callback being executed eventually (as implied by 1).
4. In the event that multiple callbacks are registered on the future, the order in which they are executed is not defined. In fact, the callbacks may be executed concurrently with one another. However, a particular ExecutionContext implementation may result in a well-defined order.
5. In the event that some of the callbacks throw an exception, the other callbacks are executed regardless.
6. In the event that some of the callbacks never complete (e.g. the callback contains an infinite loop), the other callbacks may not be executed at all. In these cases, a potentially blocking callback must use the blocking construct (see below).
7. Once executed, the callbacks are removed from the future object, thus being eligible for GC.
Functional Composition and For-Comprehensions
The callback mechanism we have shown is sufficient to chain future results with subsequent computations. However, it is sometimes inconvenient and results in bulky code. We show this with an example. Assume we have an API for interfacing with a currency trading service. Suppose we want to buy US dollars, but only when it's profitable. We first show how this could be done using callbacks:
[code]
val rateQuote = future {
  connection.getCurrentValue(USD)
}
rateQuote onSuccess { case quote =>
  val purchase = future {
    if (isProfitable(quote)) connection.buy(amount, quote)
    else throw new Exception("not profitable")
  }
  purchase onSuccess {
    case _ => println("Purchased " + amount + " USD")
  }
}
[/code]
We start by creating a future rateQuote which gets the current exchange rate. After this value is obtained from the server and the future successfully completed, the computation proceeds in the onSuccess callback and we are ready to decide whether to buy or not. We therefore create another future purchase which makes a decision to buy only if it's profitable to do so, and then sends a request. Finally, once the purchase is completed, we print a notification message to the standard output.
This works, but is inconvenient for two reasons. First, we have to use onSuccess, and we have to nest the second purchase future within it. Imagine that after the purchase is completed we want to sell some other currency. We would have to repeat this pattern within the onSuccess callback, making the code overly indented, bulky and hard to reason about.
Second, the purchase future is not in the scope with the rest of the code: it can only be acted upon from within the onSuccess callback. This means that other parts of the application do not see the purchase future and cannot register another onSuccess callback to it, for example, to sell some other currency.
For these two reasons, futures provide combinators which allow a more straightforward composition. One of the basic combinators is map, which, given a future and a mapping function for the value of the future, produces a new future that is completed with the mapped value once the original future is successfully completed. You can reason about mapping futures in the same way you reason about mapping collections.
Let's rewrite the previous example using the map combinator:
[code]
val rateQuote = future {
  connection.getCurrentValue(USD)
}
val purchase = rateQuote map { quote =>
  if (isProfitable(quote)) connection.buy(amount, quote)
  else throw new Exception("not profitable")
}
purchase onSuccess {
  case _ => println("Purchased " + amount + " USD")
}
[/code]
By using map on rateQuote we have eliminated one onSuccess callback and, more importantly, the nesting. If we now decide to sell some other currency, it suffices to use map on purchase again.
But what happens if isProfitable returns false, hence causing an exception to be thrown? In that case purchase is failed with that exception. Furthermore, imagine that the connection was broken and that getCurrentValue threw an exception, failing rateQuote. In that case we'd have no value to map, so the purchase would automatically be failed with the same exception as rateQuote.
In conclusion, if the original future is completed successfully then the returned future is completed with a mapped value from the original future. If the mapping function throws an exception the future is completed with that exception. If the original future fails with an exception then the returned future also contains the same exception. This exception propagating semantics is present in the rest of the combinators, as well.
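The three outcomes of map described above can be observed with a small self-contained sketch. The helper outcome is made up for this illustration; it blocks until the future completes (acceptable in a demo, discouraged in real code) and returns the result as a Try.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Success, Try}

// Waits for completion (demo only) and returns the outcome as a Try.
def outcome[T](f: Future[T]): Try[T] = Await.ready(f, 5.seconds).value.get

val quote: Future[Int] = Future { 10 }
val broken: Future[Int] = Future { throw new IllegalStateException("connection lost") }

// 1. Original succeeds, mapping succeeds: the mapped value.
val mapped = outcome(quote.map(_ * 2))
// 2. Original succeeds, mapping throws: the exception of the mapping function.
val thrown = outcome(quote.map[Int](_ => throw new RuntimeException("not profitable")))
// 3. Original fails: the same exception is propagated, the function is never applied.
val propagated = outcome(broken.map(_ * 2))

println(mapped)
println(thrown)
println(propagated)
```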
One of the design goals for futures was to enable their use in for-comprehensions. For this reason, futures also have the flatMap, filter and foreach combinators. The flatMap method takes a function that maps the value to a new future g, and then returns a future which is completed once g is completed.
Let's assume that we want to exchange US dollars for Swiss francs (CHF). We have to fetch quotes for both currencies, and then decide on buying based on both quotes. Here is an example of flatMap and withFilter usage within for-comprehensions:
[code]
val usdQuote = future { connection.getCurrentValue(USD) }
val chfQuote = future { connection.getCurrentValue(CHF) }
val purchase = for {
  usd <- usdQuote
  chf <- chfQuote
  if isProfitable(usd, chf)
} yield connection.buy(amount, chf)
purchase onSuccess {
  case _ => println("Purchased " + amount + " CHF")
}
[/code]
The purchase future is completed only once both usdQuote and chfQuote are completed: it depends on the values of both these futures so its own computation cannot begin earlier.
The for-comprehension above is translated into:
[code]
val purchase = usdQuote flatMap {
  usd =>
    chfQuote
      .withFilter(chf => isProfitable(usd, chf))
      .map(chf => connection.buy(amount, chf))
}
[/code]
which is a bit harder to grasp than the for-comprehension, but we analyze it to better understand the flatMap operation. The flatMap operation maps its own value into some other future. Once this different future is completed, the resulting future is completed with its value. In our example, flatMap uses the value of the usdQuote future to map the value of the chfQuote into a third future which sends a request to buy a certain amount of Swiss francs. The resulting future purchase is completed only once this third future returned from map completes.
This can be mind-boggling, but fortunately the flatMap operation is seldom used outside for-comprehensions, which are easier to use and understand.
The filter combinator creates a new future which contains the value of the original future only if it satisfies some predicate. Otherwise, the new future is failed with a NoSuchElementException. For futures calling filter has exactly the same effect as does calling withFilter.
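Here is a runnable sketch of a for-comprehension with a guard, showing both the successful case and the NoSuchElementException produced when the predicate is not satisfied. The profitability rule and the string result are invented for this illustration and stand in for the hypothetical trading API.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

// Hypothetical rule: buy only when the CHF quote is lower than the USD quote.
def isProfitable(usd: Double, chf: Double): Boolean = chf < usd

def purchase(usdQuote: Future[Double], chfQuote: Future[Double]): Future[String] =
  for {
    usd <- usdQuote            // flatMap
    chf <- chfQuote            // withFilter + map
    if isProfitable(usd, chf)
  } yield "bought at " + chf

// Blocking only so that the script can inspect the outcomes.
val accepted = Await.result(purchase(Future(1.0), Future(0.9)), 5.seconds)
val rejected = Try(Await.result(purchase(Future(1.0), Future(1.2)), 5.seconds))

println(accepted)
println(rejected)
```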
The relationship between the collect and filter combinator is similar to the relationship of these methods in the collections API.
It is important to note that calling the foreach combinator does not block to traverse the value once it becomes available. Instead, the function for the foreach gets asynchronously executed only if the future is completed successfully. This means that the foreach has exactly the same semantics as the onSuccess callback.
Since the Future trait can conceptually contain two types of values (computation results and exceptions), there exists a need for combinators which handle exceptions.
Let's assume that based on the rateQuote we decide to buy a certain amount. The connection.buy method takes an amount to buy and the expected quote. It returns the amount bought. If the quote has changed in the meanwhile, it will throw a QuoteChangedException and it will not buy anything. If we want our future to contain 0 instead of the exception, we use the recover combinator:
[code]
val purchase: Future[Int] = rateQuote map {
  quote => connection.buy(amount, quote)
} recover {
  case QuoteChangedException() => 0
}
[/code]
The recover combinator creates a new future which holds the same result as the original future if it completed successfully. If it did not then the partial function argument is applied to the Throwable which failed the original future. If it maps the Throwable to some value, then the new future is successfully completed with that value. If the partial function is not defined on that Throwable, then the resulting future is failed with the same Throwable.
The recoverWith combinator creates a new future which holds the same result as the original future if it completed successfully. Otherwise, the partial function is applied to the Throwable which failed the original future. If it maps the Throwable to some future, then this future is completed with the result of that future. Its relation to recover is similar to that of flatMap to map.
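The difference between recover and recoverWith can be seen side by side in the following sketch. The exception QuoteChanged and the function buy are hypothetical stand-ins for the trading API of the text: buy succeeds for quotes below 100 and otherwise fails with a suggested new quote.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical exception standing in for the QuoteChangedException of the text.
case class QuoteChanged(newQuote: Int) extends Exception("quote changed")

// Hypothetical purchase: returns the amount bought, or fails with a new quote.
def buy(quote: Int): Future[Int] =
  Future { if (quote < 100) 5 else throw QuoteChanged(quote - 10) }

// recover maps the exception to a plain value.
val recovered: Future[Int] = buy(120).recover { case QuoteChanged(_) => 0 }
// recoverWith maps the exception to another future, here a retry with the new quote.
val retried: Future[Int] = buy(105).recoverWith { case QuoteChanged(q) => buy(q) }
// A partial function that is not defined for the exception leaves the failure in place.
val unhandled: Future[Int] = buy(120).recover { case _: ArithmeticException => 0 }

// Blocking only so that the script can inspect the outcomes.
val a = Await.result(recovered, 5.seconds)
val b = Await.result(retried, 5.seconds)
val c = Await.ready(unhandled, 5.seconds).value.get
println((a, b, c))
```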
Combinator fallbackTo creates a new future which holds the result of this future if it was completed successfully, or otherwise the successful result of the argument future. In the event that both this future and the argument future fail, the new future is completed with the exception from this future, as in the following example which tries to print US dollar value, but prints the Swiss franc value in the case it fails to obtain the dollar value:
[code]
val usdQuote = future {
  connection.getCurrentValue(USD)
} map {
  usd => "Value: " + usd + "$"
}
val chfQuote = future {
  connection.getCurrentValue(CHF)
} map {
  chf => "Value: " + chf + "CHF"
}
val anyQuote = usdQuote fallbackTo chfQuote
anyQuote onSuccess { println(_) }
[/code]
The andThen combinator is used purely for side-effecting purposes. It returns a new future with exactly the same result as the current future, regardless of whether the current future failed or not. Once the current future is completed with the result, the closure corresponding to the andThen is invoked and then the new future is completed with the same result as this future. This ensures that multiple andThen calls are ordered, as in the following example which stores the recent posts from a social network to a mutable set and then renders all the posts to the screen:
[code]
import scala.collection.mutable
import scala.util.Success
val allposts = mutable.Set[String]()
future {
  session.getRecentPosts
} andThen {
  // andThen receives a Try, so match on Success to reach the posts
  case Success(posts) => allposts ++= posts
} andThen {
  case _ =>
    clearAll()
    for (post <- allposts) render(post)
}
[/code]
In summary, the combinators on futures are purely functional. Every combinator returns a new future which is related to the future it was derived from.
Projections
To enable for-comprehensions on a result returned as an exception, futures also have projections. If the original future fails, the failed projection returns a future containing a value of type Throwable. If the original future succeeds, the failed projection fails with a NoSuchElementException. The following is an example which prints the exception to the screen:
[code]
val f = future {
  2 / 0
}
for (exc <- f.failed) println(exc)
[/code]
The following example does not print anything to the screen:
[code]
val f = future {
  4 / 2
}
for (exc <- f.failed) println(exc)
[/code]
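Both cases of the failed projection can also be checked in a script by blocking on it (for demonstration only). This sketch is written with Future { ... }; the divisor is kept in a val named zero, an arbitrary choice, so that the division happens at run time.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

val zero = 0 // kept in a val so the division is evaluated at run time
val failing: Future[Int] = Future { 2 / zero }
val succeeding: Future[Int] = Future { 4 / 2 }

// The projection of a failed future succeeds with the Throwable as its value.
val exc: Throwable = Await.result(failing.failed, 5.seconds)
// The projection of a successful future is itself failed.
val noExc: Try[Throwable] = Try(Await.result(succeeding.failed, 5.seconds))

println(exc)
println(noExc.isFailure)
```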
Extending Futures
Support for extending the Futures API with additional utility methods is planned. This will allow external frameworks to provide more specialized utilities.
Blocking
As mentioned earlier, blocking on a future is strongly discouraged for the sake of performance and for the prevention of deadlocks. Callbacks and combinators on futures are a preferred way to use their results. However, blocking may be necessary in certain situations and is supported by the Futures and Promises API. In the currency trading example above, one place to block is at the end of the application to make sure that all of the futures have been completed. Here is an example of how to block on the result of a future:
[code]
import scala.concurrent._
import scala.concurrent.duration._
def main(args: Array[String]) {
  val rateQuote = future {
    connection.getCurrentValue(USD)
  }
  val purchase = rateQuote map { quote =>
    if (isProfitable(quote)) connection.buy(amount, quote)
    else throw new Exception("not profitable")
  }
  Await.result(purchase, 0 nanos)
}
[/code]
In the case that the future fails, the caller is forwarded the exception that the future is failed with. This includes the failed projection: blocking on it results in a NoSuchElementException being thrown if the original future is completed successfully.
Alternatively, calling Await.ready waits until the future becomes completed, but does not retrieve its result. In the same way, calling that method will not throw an exception if the future is failed.
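The following sketch contrasts Await.result and Await.ready on a successful future, a failed future, and a future that does not complete within the timeout. The timeouts and the two-second sleep are arbitrary values chosen for this illustration.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

// Await.result returns the value of a successfully completed future.
val answer = Await.result(Future { 21 * 2 }, 1.second)

val failing = Future[Int] { throw new IllegalArgumentException("bad input") }
// Await.ready only waits for completion and does not throw for a failed future.
val ready = Await.ready(failing, 1.second)
// Await.result on the same future rethrows the exception in the calling thread.
val rethrown = Try(Await.result(failing, 1.second))

// A future that is not completed within the timeout yields a TimeoutException.
val timedOut = Try(Await.result(Future { Thread.sleep(2000); 1 }, 100.millis))

println(answer)
println(rethrown)
println(timedOut.isFailure)
```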
The Future trait implements the Awaitable trait with methods ready() and result(). These methods cannot be called directly by the clients: they can only be called by the execution context.
To allow clients to call 3rd party code which is potentially blocking and avoid implementing the Awaitable trait, the same blocking primitive can also be used in the following form:
[code]
blocking {
  potentiallyBlockingCall()
}
[/code]
The blocking code may also throw an exception. In this case, the exception is forwarded to the caller.
Exceptions
When asynchronous computations throw unhandled exceptions, futures associated with those computations fail. Failed futures store an instance of Throwable instead of the result value. Futures provide the onFailure callback method, which accepts a PartialFunction to be applied to a Throwable. The following special exceptions are treated differently:
- scala.runtime.NonLocalReturnControl[_]: this exception holds a value associated with the return. Typically, return constructs in method bodies are translated to throws with this exception. Instead of keeping this exception, the associated value is stored into the future or a promise.
- ExecutionException: stored when the computation fails due to an unhandled InterruptedException, Error or a scala.util.control.ControlThrowable. In this case the ExecutionException has the unhandled exception as its cause. These exceptions are rethrown in the thread executing the failed asynchronous computation. The rationale behind this is to prevent propagation of critical and control-flow related exceptions normally not handled by the client code and at the same time inform the client in which future the computation failed.
See NonFatal for a more precise semantics description.
Promises
So far we have only considered Future objects created by asynchronous computations started using the future method. However, futures can also be created using promises.
While futures are defined as a type of read-only placeholder object created for a result which doesn't yet exist, a promise can be thought of as a writable, single-assignment container, which completes a future. That is, a promise can be used to successfully complete a future with a value (by "completing" the promise) using the success method. Conversely, a promise can also be used to complete a future with an exception, by failing the promise, using the failure method.
A promise p completes the future returned by p.future. This future is specific to the promise p. Depending on the implementation, it may be the case that p.future eq p.
Consider the following producer-consumer example, in which one computation produces a value and hands it off to another computation which consumes that value. This passing of the value is done using a promise.
[code]
import scala.concurrent.{ future, promise }
import scala.concurrent.ExecutionContext.Implicits.global
val p = promise[T]
val f = p.future
val producer = future {
  val r = produceSomething()
  p success r
  continueDoingSomethingUnrelated()
}
val consumer = future {
  startDoingSomething()
  f onSuccess {
    case r => doSomethingWithResult()
  }
}
[/code]
Here, we create a promise and use its future method to obtain the Future that it completes. Then, we begin two asynchronous computations. The first does some computation, resulting in a value r, which is then used to complete the future f, by fulfilling the promise p. The second does some computation, and then reads the result r of the completed future f. Note that the consumer can obtain the result before the producer task is finished executing the continueDoingSomethingUnrelated() method.
As mentioned before, promises have single-assignment semantics. As such, they can be completed only once. Calling success on a promise that has already been completed (or failed) will throw an IllegalStateException.
The following example shows how to fail a promise.
[code]
val p = promise[T]
val f = p.future
val producer = future {
  val r = someComputation
  if (isInvalid(r))
    p failure (new IllegalStateException)
  else {
    val q = doSomeMoreComputation(r)
    p success q
  }
}
[/code]
Here, the producer computes an intermediate result r, and checks whether it's valid. In the case that it's invalid, it fails the promise by completing the promise p with an exception. In this case, the associated future f is failed. Otherwise, the producer continues its computation, and finally completes the future f with a valid result, by completing promise p.
Promises can also be completed with a complete method which takes a potential value Try[T]: either a failed result of type Failure[Throwable] or a successful result of type Success[T].
Analogous to success, calling failure and complete on a promise that has already been completed will throw an IllegalStateException.
One nice property of programs written using promises with operations described so far and futures which are composed through monadic operations without side-effects is that these programs are deterministic. Deterministic here means that, given that no exception is thrown in the program, the result of the program (values observed in the futures) will always be the same, regardless of the execution schedule of the parallel program.
In some cases the client may want to complete the promise only if it has not been completed yet (e.g., there are several HTTP requests being executed from several different futures and the client is interested only in the first HTTP response, corresponding to the first future to complete the promise). For these reasons methods tryComplete, trySuccess and tryFailure exist on promise. The client should be aware that using these methods results in programs which are not deterministic, but depend on the execution schedule.
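The single-assignment behaviour can be checked without starting any asynchronous computation. This sketch is written with Promise[Int](), the companion-object way of creating a promise; the variable names are arbitrary.

```scala
import scala.concurrent.Promise
import scala.util.{Success, Try}

val p = Promise[Int]()

// The first completion wins.
p.success(1)
// A second success throws an IllegalStateException, captured here in a Try.
val second = Try(p.success(2))
// trySuccess reports the outcome as a Boolean instead of throwing.
val accepted = p.trySuccess(3)
// The future of the promise still holds the first value.
val value = p.future.value

println(second.isFailure)
println(accepted)
println(value)
```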
The method completeWith completes the promise with another future. After the future is completed, the promise gets completed with the result of that future as well. The following program prints 1:
[code]
val f = future { 1 }
val p = promise[Int]
p completeWith f
p.future onSuccess {
  case x => println(x)
}
[/code]
When failing a promise with an exception, three subtypes of Throwables are handled specially. If the Throwable used to break the promise is a scala.runtime.NonLocalReturnControl, then the promise is completed with the corresponding value. If the Throwable used to break the promise is an instance of Error, InterruptedException, or scala.util.control.ControlThrowable, the Throwable is wrapped as the cause of a new ExecutionException which, in turn, is failing the promise.
Using promises, the onComplete method of the futures and the future construct you can implement any of the functional composition combinators described earlier. Let's assume you want to implement a new combinator first which takes two futures f and g and produces a third future which is completed by either f or g (whichever comes first), but only given that it is successful.
Here is an example of how to do it:
[code]
def first[T](f: Future[T], g: Future[T]): Future[T] = {
  val p = promise[T]
  f onSuccess {
    case x => p.trySuccess(x)
  }
  g onSuccess {
    case x => p.trySuccess(x)
  }
  p.future
}
[/code]
Note that in this implementation, if neither f nor g succeeds, then first(f, g) never completes (either with a value or with an exception).
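To try the combinator out, here is a self-contained variant of the same idea, written with Promise[T]() and foreach (which runs its function only on success). The two input futures, their names and the 100 ms delay are invented for this illustration: one fails and the other succeeds a little later.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

def first[T](f: Future[T], g: Future[T]): Future[T] = {
  val p = Promise[T]()
  // Whichever future succeeds first completes the promise; the later trySuccess returns false.
  f.foreach(x => p.trySuccess(x))
  g.foreach(x => p.trySuccess(x))
  p.future
}

val failed: Future[String] = Future { throw new RuntimeException("down") }
val slow: Future[String] = Future { Thread.sleep(100); "from g" }

// The failed future is ignored, so the result comes from the slower but successful one.
val winner = Await.result(first(failed, slow), 5.seconds)
println(winner)
```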
Utilities
To simplify handling of time in concurrent applications scala.concurrent introduces a Duration abstraction. Duration is not supposed to be yet another general time abstraction. It is meant to be used with concurrency libraries and resides in the scala.concurrent.duration package.
Duration is the base class representing a length of time. It can be either finite or infinite. A finite duration is represented with the FiniteDuration class, which is constructed from a Long length and a java.util.concurrent.TimeUnit. Infinite durations, also extended from Duration, exist in only two instances, Duration.Inf and Duration.MinusInf. The library also provides several Duration subclasses for implicit conversion purposes and those should not be used.
Abstract Duration contains methods that allow:
- Conversion to different time units (toNanos, toMicros, toMillis, toSeconds, toMinutes, toHours, toDays and toUnit(unit: TimeUnit)).
- Comparison of durations (<, <=, > and >=).
- Arithmetic operations (+, -, *, / and unary_-).
- Minimum and maximum between this duration and the one supplied in the argument (min, max).
- Check if the duration is finite (isFinite).
Duration can be instantiated in the following ways:
- Implicitly from types Int and Long. For example val d = 100 millis.
- By passing a Long length and a java.util.concurrent.TimeUnit. For example val d = Duration(100, MILLISECONDS).
- By parsing a string that represents a time period. For example val d = Duration("1.2 µs").
Duration also provides unapply methods so it can be used in pattern matching constructs. Examples:
[code]
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit._
// instantiation
val d1 = Duration(100, MILLISECONDS) // from Long and TimeUnit
val d2 = Duration(100, "millis") // from Long and String
val d3 = 100 millis // implicitly from Long, Int or Double
val d4 = Duration("1.2 µs") // from String
// pattern matching
val Duration(length, unit) = 5 millis
[/code]
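The conversion, comparison and arithmetic methods listed above can be exercised as follows. This is only a small sketch; the concrete durations are arbitrary, and the dotted form (100.millis) is used instead of the postfix form (100 millis) so that no extra language import is needed.

```scala
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit

// Arithmetic and conversion: 100 ms plus 1 s, expressed in milliseconds.
val total = 100.millis + 1.second
val asMillis = total.toMillis

// Comparison works across different units.
val longer = 2.seconds > 1500.millis

// Minimum of two durations.
val smallest = 5.seconds.min(3.seconds)

// Pattern matching extracts the length and the unit of a finite duration.
val Duration(length, unit) = 5.millis

// Infinite durations are not finite.
val infinite = Duration.Inf

println(asMillis)
println(smallest)
println(infinite.isFinite)
```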