
Scala Futures and Promises

2015-01-27 17:02
Source: http://docs.scala-lang.org/overviews/core/futures.html



By: Philipp Haller, Aleksandar Prokopec, Heather Miller, Viktor Klang, Roland Kuhn, and Vojin Jovanovic

Introduction

Futures provide a nice way to reason about performing many operations in parallel, in an efficient and non-blocking way. The idea is simple: a Future is a sort of placeholder object that you can create for a result that does not yet exist. Generally, the result of the Future is computed concurrently and can be collected later. Composing concurrent tasks in this way tends to result in faster, asynchronous, non-blocking parallel code.

By default, futures and promises are non-blocking, making use of callbacks instead of typical blocking operations. To simplify the use of callbacks both syntactically and conceptually, Scala provides combinators such as flatMap, foreach, and filter, used to compose futures in a non-blocking way. Blocking is still possible: for cases where it is absolutely necessary, futures can be blocked on (although this is discouraged).

Futures

A Future is an object holding a value which may become available at some point. This value is usually the result of some other computation:

If the computation has not yet completed, we say that the Future is not completed.
If the computation has completed with a value or with an exception, we say that the Future is completed.

Completion can take one of two forms:

When a Future is completed with a value, we say that the future was successfully completed with that value.
When a Future is completed with an exception thrown by the computation, we say that the Future was failed with that exception.

A Future has an important property that it may only be assigned once. Once a Future object is given a value or an exception, it becomes in effect immutable: it can never be overwritten.
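
The three states described above can be observed through the value method of a future. The following is a minimal sketch, not part of the original text; the object name CompletionStates and its helpers are hypothetical, and it uses Future.successful, Future.failed and Promise from scala.concurrent to build futures in each state.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

// Hypothetical helper object, for illustration only.
object CompletionStates {
  // A future that is already successfully completed with a value.
  def ok: Future[Int] = Future.successful(42)
  // A future that is already failed with an exception.
  def bad: Future[Int] = Future.failed(new IllegalStateException("boom"))
  // A future whose promise is never completed, so it stays not completed.
  def pending: Future[Int] = Promise[Int]().future

  // `value` is None while the future is not completed, Some(Try) afterwards.
  def describe[T](f: Future[T]): String = f.value match {
    case None             => "not completed"
    case Some(Success(v)) => "completed with value " + v
    case Some(Failure(e)) => "failed with " + e.getClass.getSimpleName
  }
}
```

Calling CompletionStates.describe on each of the three futures distinguishes the "not completed", "successfully completed" and "failed" cases.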

The simplest way to create a future object is to invoke the future method, which starts an asynchronous computation and returns a future holding the result of that computation. The result becomes available once the future completes.

Note that Future[T] is a type which denotes future objects, whereas future is a method which creates and schedules an asynchronous computation, and then returns a future object which will be completed with the result of that computation.

This is best shown through an example.

Let’s assume that we want to use a hypothetical API of some popular social network to obtain a list of friends for a given user. We will open a new session and then send a request to obtain a list of friends of a particular user:

[code]
import scala.concurrent._
import ExecutionContext.Implicits.global

val session = socialNetwork.createSessionFor("user", credentials)
val f: Future[List[Friend]] = future {
  session.getFriends()
}
[/code]

Above, we first import the contents of the scala.concurrent package to make the type Future and the construct future visible. We will explain the second import shortly.

We then initialize a session variable which we will use to send requests to the server, using a hypothetical createSessionFor method. To obtain the list of friends of a user, a request has to be sent over a network, which can take a long time. This is illustrated with the call to the method getFriends that returns List[Friend]. To better utilize the CPU until the response arrives, we should not block the rest of the program: this computation should be scheduled asynchronously. The future method does exactly that. It performs the specified computation block concurrently, in this case sending a request to the server and waiting for a response.

The list of friends becomes available in the future f once the server responds.

An unsuccessful attempt may result in an exception. In the following example, the session value is incorrectly initialized, so the computation in the future block will throw a NullPointerException. This future f is then failed with this exception instead of being completed successfully:

[code]
val session = null
val f: Future[List[Friend]] = future {
  session.getFriends
}
[/code]

The line import ExecutionContext.Implicits.global above imports the default global execution context. Execution contexts execute tasks submitted to them, and you can think of execution contexts as thread pools. They are essential for the future method because they handle how and when the asynchronous computation is executed. You can define your own execution contexts and use them with future, but for now it is sufficient to know that you can import the default execution context as shown above.
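
As a rough illustration of defining your own execution context, the sketch below wraps a fixed-size Java thread pool with ExecutionContext.fromExecutorService. The object name CustomContext, the pool size of two and the five-second timeout are assumptions made for this example only. It uses Future.apply (capital F) from the Future companion object, and it blocks with Await.result purely so the result can be returned from a small demo.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper object, for illustration only.
object CustomContext {
  // Runs `body` on a dedicated two-thread pool and waits for the result.
  def runOnOwnPool[T](body: => T): T = {
    val pool = Executors.newFixedThreadPool(2)
    // The implicit context is picked up by Future.apply below.
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try Await.result(Future(body), 5.seconds)
    finally pool.shutdown() // release the threads once we are done
  }
}
```

For example, CustomContext.runOnOwnPool(1 + 1) evaluates the expression on the dedicated pool rather than on the global execution context.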

Our example was based on a hypothetical social network API where the computation consists of sending a network request and waiting for a response. It is fair to offer an example involving an asynchronous computation which you can try out of the box. Assume you have a text file and you want to find the position of the first occurrence of a particular keyword. This computation may involve blocking while the file contents are being retrieved from the disk, so it makes sense to perform it concurrently with the rest of the computation.

[code]
val firstOccurrence: Future[Int] = future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}
[/code]

Callbacks

We now know how to start an asynchronous computation to create a new future value, but we have not shown how to use the result once it becomes available, so that we can do something useful with it. We are often interested in the result of the computation, not just its side-effects.

In many future implementations, once the client of the future becomes interested in its result, it has to block its own computation and wait until the future is completed. Only then can it use the value of the future to continue its own computation. Although this is allowed by the Scala Future API, as we will show later, from a performance point of view a better way to do it is in a completely non-blocking way, by registering a callback on the future. This callback is called asynchronously once the future is completed. If the future has already been completed when registering the callback, then the callback may either be executed asynchronously, or sequentially on the same thread.

The most general form of registering a callback is by using the onComplete method, which takes a callback function of type Try[T] => U. The callback is applied to the value of type Success[T] if the future completes successfully, or to a value of type Failure[T] otherwise.

Try[T] is similar to Option[T] or Either[T, S], in that it is a monad potentially holding a value of some type. However, it has been specifically designed to hold either a value or some throwable object. Where an Option[T] could either be a value (i.e. Some[T]) or no value at all (i.e. None), Try[T] is a Success[T] when it holds a value and otherwise a Failure[T], which holds an exception. Failure[T] holds more information than just a plain None by saying why the value is not there. At the same time, you can think of Try[T] as a special version of Either[Throwable, T], specialized for the case when the left value is a Throwable.
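
To make the comparison with Option concrete, here is a small sketch of Try used on its own, outside of any future. The object name TryDemo and its methods are hypothetical and exist only for this illustration.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper object, for illustration only.
object TryDemo {
  // Try(...) evaluates the expression and captures a non-fatal exception as a Failure.
  def parse(s: String): Try[Int] = Try(s.toInt)

  // Unlike None, a Failure carries the reason why there is no value.
  def describe(t: Try[Int]): String = t match {
    case Success(n) => "value " + n
    case Failure(e) => "no value: " + e.getClass.getSimpleName
  }
}
```

Here TryDemo.parse("12") yields a Success, while TryDemo.parse("x") yields a Failure holding the NumberFormatException thrown by toInt.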

Coming back to our social network example, let’s assume we want to fetch a list of our own recent posts and render them to the screen. We do so by calling a method getRecentPosts which returns a List[String], a list of recent textual posts:

[code]
import scala.util.{Success, Failure}

val f: Future[List[String]] = future {
  session.getRecentPosts
}

f onComplete {
  case Success(posts) => for (post <- posts) println(post)
  case Failure(t) => println("An error has occurred: " + t.getMessage)
}
[/code]

The onComplete method is general in the sense that it allows the client to handle the result of both failed and successful future computations. To handle only successful results, the onSuccess callback is used (which takes a partial function):

[code]
val f: Future[List[String]] = future {
  session.getRecentPosts
}

f onSuccess {
  case posts => for (post <- posts) println(post)
}
[/code]

To handle failed results, the onFailure callback is used:

[code]
val f: Future[List[String]] = future {
  session.getRecentPosts
}

f onFailure {
  case t => println("An error has occurred: " + t.getMessage)
}

f onSuccess {
  case posts => for (post <- posts) println(post)
}
[/code]

The onFailure callback is only executed if the future fails, that is, if it contains an exception.

Since partial functions have the isDefinedAt method, the onFailure method only triggers the callback if it is defined for a particular Throwable. In the following example the registered onFailure callback is never triggered:

[code]
val f = future {
  2 / 0
}

f onFailure {
  case npe: NullPointerException =>
    println("I'd be amazed if this printed out.")
}
[/code]

Coming back to the previous example with searching for the first occurrence of a keyword, you might want to print the position of the keyword to the screen:

[code]
val firstOccurrence: Future[Int] = future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}

firstOccurrence onSuccess {
  case idx => println("The keyword first appears at position: " + idx)
}

firstOccurrence onFailure {
  case t => println("Could not process file: " + t.getMessage)
}
[/code]

The onComplete, onSuccess, and onFailure methods have result type Unit, which means invocations of these methods cannot be chained. Note that this design is intentional, to avoid suggesting that chained invocations may imply an ordering on the execution of the registered callbacks (callbacks registered on the same future are unordered).

That said, we should now comment on when exactly the callback gets called. Since it requires the value in the future to be available, it can only be called after the future is completed. However, there is no guarantee it will be called by the thread that completed the future or the thread which created the callback. Instead, the callback is executed by some thread, at some time after the future object is completed. We say that the callback is executed eventually.

Furthermore, the order in which the callbacks are executed is not predefined, even between different runs of the same application. In fact, the callbacks may not be called sequentially one after the other, but may concurrently execute at the same time. This means that in the following example the variable totalA may not be set to the correct number of lowercase and uppercase a characters from the computed text.

[code]
@volatile var totalA = 0

val text = future {
  "na" * 16 + "BATMAN!!!"
}

text onSuccess {
  case txt => totalA += txt.count(_ == 'a')
}

text onSuccess {
  case txt => totalA += txt.count(_ == 'A')
}
[/code]

Above, the two callbacks may execute one after the other, in which case the variable totalA holds the expected value 18. However, they could also execute concurrently, so totalA could end up being either 16 or 2, since += is not an atomic operation (i.e. it consists of a read and a write step which may interleave arbitrarily with other reads and writes).
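
One possible way to avoid this lost-update problem is to replace the volatile variable with a java.util.concurrent.atomic.AtomicInteger, whose addAndGet performs the read and the write as a single atomic step. The following is only a sketch under that assumption; the object name CountA is hypothetical, and the callbacks are written with map instead of onSuccess so that the demo has futures it can wait on before reading the total.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper object, for illustration only.
object CountA {
  def countBoth(): Int = {
    val totalA = new AtomicInteger(0)
    val text = Future { "na" * 16 + "BATMAN!!!" }

    // Both updates may run concurrently; addAndGet keeps each one atomic.
    val lower = text.map(txt => totalA.addAndGet(txt.count(_ == 'a')))
    val upper = text.map(txt => totalA.addAndGet(txt.count(_ == 'A')))

    // Block only so the demo can read the final total.
    Await.ready(lower, 5.seconds)
    Await.ready(upper, 5.seconds)
    totalA.get
  }
}
```

With atomic updates the total is 18 regardless of how the two callbacks interleave.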

For the sake of completeness, the semantics of callbacks are listed here:

1. Registering an onComplete callback on the future ensures that the corresponding closure is invoked after the future is completed, eventually.

2. Registering an onSuccess or onFailure callback has the same semantics as onComplete, with the difference that the closure is only called if the future is completed successfully or fails, respectively.

3. Registering a callback on the future which is already completed will result in the callback being executed eventually (as implied by 1).

4. In the event that multiple callbacks are registered on the future, the order in which they are executed is not defined. In fact, the callbacks may be executed concurrently with one another. However, a particular ExecutionContext implementation may result in a well-defined order.

5. In the event that some of the callbacks throw an exception, the other callbacks are executed regardless.

6. In the event that some of the callbacks never complete (e.g. the callback contains an infinite loop), the other callbacks may not be executed at all. In these cases, a potentially blocking callback must use the blocking construct (see below).

7. Once executed, the callbacks are removed from the future object, thus being eligible for GC.

Functional Composition and For-Comprehensions

The callback mechanism we have shown is sufficient to chain future results with subsequent computations. However, it is sometimes inconvenient and results in bulky code. We show this with an example. Assume we have an API for interfacing with a currency trading service. Suppose we want to buy US dollars, but only when it’s profitable. We first show how this could be done using callbacks:

[code]
val rateQuote = future {
  connection.getCurrentValue(USD)
}

rateQuote onSuccess { case quote =>
  val purchase = future {
    if (isProfitable(quote)) connection.buy(amount, quote)
    else throw new Exception("not profitable")
  }

  purchase onSuccess {
    case _ => println("Purchased " + amount + " USD")
  }
}
[/code]

We start by creating a future rateQuote which gets the current exchange rate. After this value is obtained from the server and the future successfully completed, the computation proceeds in the onSuccess callback and we are ready to decide whether to buy or not. We therefore create another future purchase which makes a decision to buy only if it’s profitable to do so, and then sends a request. Finally, once the purchase is completed, we print a notification message to the standard output.

This works, but is inconvenient for two reasons. First, we have to use onSuccess, and we have to nest the second purchase future within it. Imagine that after the purchase is completed we want to sell some other currency. We would have to repeat this pattern within the onSuccess callback, making the code overly indented, bulky and hard to reason about.

Second, the purchase future is not in the scope with the rest of the code: it can only be acted upon from within the onSuccess callback. This means that other parts of the application do not see the purchase future and cannot register another onSuccess callback to it, for example, to sell some other currency.

For these two reasons, futures provide combinators which allow a more straightforward composition. One of the basic combinators is map, which, given a future and a mapping function for the value of the future, produces a new future that is completed with the mapped value once the original future is successfully completed. You can reason about mapping futures in the same way you reason about mapping collections.

Let’s rewrite the previous example using the map combinator:

[code]
val rateQuote = future {
  connection.getCurrentValue(USD)
}

val purchase = rateQuote map { quote =>
  if (isProfitable(quote)) connection.buy(amount, quote)
  else throw new Exception("not profitable")
}

purchase onSuccess {
  case _ => println("Purchased " + amount + " USD")
}
[/code]

By using map on rateQuote we have eliminated one onSuccess callback and, more importantly, the nesting. If we now decide to sell some other currency, it suffices to use map on purchase again.

But what happens if isProfitable returns false, hence causing an exception to be thrown? In that case purchase is failed with that exception. Furthermore, imagine that the connection was broken and that getCurrentValue threw an exception, failing rateQuote. In that case we’d have no value to map, so the purchase would automatically be failed with the same exception as rateQuote.

In conclusion, if the original future is completed successfully then the returned future is completed with a mapped value from the original future. If the mapping function throws an exception the future is completed with that exception. If the original future fails with an exception then the returned future also contains the same exception. This exception propagating semantics is present in the rest of the combinators, as well.
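
The three cases in the conclusion above can be checked with a small self-contained sketch. The object name MapPropagation and the numeric values are made up for illustration, and Await.result is used only so that each outcome can be inspected as a Try.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical helper object, for illustration only.
object MapPropagation {
  // Blocks only so that the outcome can be inspected in this small demo.
  def outcome[T](f: Future[T]): Try[T] = Try(Await.result(f, 5.seconds))

  // Original succeeds, mapping succeeds: the result holds the mapped value.
  def mapped: Future[Int] = Future(20).map(_ + 1)
  // Original succeeds, mapping function throws: the result fails with that exception.
  def thrown: Future[Int] = Future(20).map(_ / 0)
  // Original fails: the mapping function never runs and the same exception is kept.
  def upstream: Future[Int] = Future[Int](sys.error("no quote")).map(_ + 1)
}
```

Inspecting MapPropagation.outcome for each future shows a Success(21), a Failure with an ArithmeticException, and a Failure carrying the original "no quote" error.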

One of the design goals for futures was to enable their use in for-comprehensions. For this reason, futures also have the flatMap, filter and foreach combinators. The flatMap method takes a function that maps the value to a new future g, and then returns a future which is completed once g is completed.

Let’s assume that we want to exchange US dollars for Swiss francs (CHF). We have to fetch quotes for both currencies, and then decide on buying based on both quotes. Here is an example of flatMap and withFilter usage within for-comprehensions:

[code]
val usdQuote = future { connection.getCurrentValue(USD) }
val chfQuote = future { connection.getCurrentValue(CHF) }

val purchase = for {
  usd <- usdQuote
  chf <- chfQuote
  if isProfitable(usd, chf)
} yield connection.buy(amount, chf)

purchase onSuccess {
  case _ => println("Purchased " + amount + " CHF")
}
[/code]

The purchase future is completed only once both usdQuote and chfQuote are completed. It depends on the values of both these futures, so its own computation cannot begin earlier.

The for-comprehension above is translated into:

[code]
val purchase = usdQuote flatMap {
  usd =>
  chfQuote
    .withFilter(chf => isProfitable(usd, chf))
    .map(chf => connection.buy(amount, chf))
}
[/code]

which is a bit harder to grasp than the for-comprehension, but we analyze it to better understand the flatMap operation. The flatMap operation maps its own value into some other future. Once this different future is completed, the resulting future is completed with its value. In our example, flatMap uses the value of the usdQuote future to map the value of the chfQuote into a third future which sends a request to buy a certain amount of Swiss francs. The resulting future purchase is completed only once this third future returned from map completes.

This can be mind-boggling, but fortunately the flatMap operation is seldom used outside for-comprehensions, which are easier to use and understand.

The filter combinator creates a new future which contains the value of the original future only if it satisfies some predicate. Otherwise, the new future is failed with a NoSuchElementException. For futures, calling filter has exactly the same effect as calling withFilter.

The relationship between the collect and filter combinators is similar to the relationship of these methods in the collections API.
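
A brief sketch of both combinators follows. The object name FilterCollect, the predicates and the values are invented for illustration. As with filter, a collect whose partial function is not defined for the value results in a future failed with a NoSuchElementException.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical helper object, for illustration only.
object FilterCollect {
  // Blocks only so that the outcome can be inspected in this small demo.
  def outcome[T](f: Future[T]): Try[T] = Try(Await.result(f, 5.seconds))

  // filter keeps the value if the predicate holds, otherwise the future fails.
  def evenOnly(n: Int): Future[Int] = Future(n).filter(_ % 2 == 0)

  // collect filters and transforms in one step, using a partial function.
  def describePositive(n: Int): Future[String] =
    Future(n).collect { case x if x > 0 => "positive " + x }
}
```

So FilterCollect.evenOnly(4) succeeds with 4, while evenOnly(3) and describePositive(-1) both fail with a NoSuchElementException.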

It is important to note that calling the foreach combinator does not block to traverse the value once it becomes available. Instead, the function for the foreach gets asynchronously executed only if the future is completed successfully. This means that the foreach has exactly the same semantics as the onSuccess callback.

Since the Future trait can conceptually contain two types of values (computation results and exceptions), there exists a need for combinators which handle exceptions.

Let’s assume that based on the rateQuote we decide to buy a certain amount. The connection.buy method takes an amount to buy and the expected quote. It returns the amount bought. If the quote has changed in the meanwhile, it will throw a QuoteChangedException and it will not buy anything. If we want our future to contain 0 instead of the exception, we use the recover combinator:

[code]
val purchase: Future[Int] = rateQuote map {
  quote => connection.buy(amount, quote)
} recover {
  case QuoteChangedException() => 0
}
[/code]

The recover combinator creates a new future which holds the same result as the original future if it completed successfully. If it did not, then the partial function argument is applied to the Throwable which failed the original future. If it maps the Throwable to some value, then the new future is successfully completed with that value. If the partial function is not defined on that Throwable, then the resulting future is failed with the same Throwable.

The recoverWith combinator creates a new future which holds the same result as the original future if it completed successfully. Otherwise, the partial function is applied to the Throwable which failed the original future. If it maps the Throwable to some future, then this future is completed with the result of that future. Its relation to recover is similar to that of flatMap to map.
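
The difference between the two can be sketched as follows. The object name Recovery, the quote method and its values are stand-ins invented for this example (an IllegalStateException plays the role of the QuoteChangedException from the text).

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical helper object, for illustration only.
object Recovery {
  // Blocks only so that the outcome can be inspected in this small demo.
  def outcome[T](f: Future[T]): Try[T] = Try(Await.result(f, 5.seconds))

  // A made-up computation that either yields 7 or fails.
  def quote(fail: Boolean): Future[Int] =
    Future { if (fail) throw new IllegalStateException("quote changed") else 7 }

  // recover maps the exception to a plain value.
  def withDefault(fail: Boolean): Future[Int] =
    quote(fail).recover { case _: IllegalStateException => 0 }

  // recoverWith maps the exception to another future, here a second attempt.
  def withRetry(fail: Boolean): Future[Int] =
    quote(fail).recoverWith { case _: IllegalStateException => quote(fail = false) }

  // The partial function is not defined for this exception, so it is kept.
  def unmatched: Future[Int] =
    Future[Int](throw new ArithmeticException("x")).recover { case _: IllegalStateException => 0 }
}
```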

Combinator fallbackTo creates a new future which holds the result of this future if it was completed successfully, or otherwise the successful result of the argument future. In the event that both this future and the argument future fail, the new future is completed with the exception from this future, as in the following example which tries to print the US dollar value, but prints the Swiss franc value in the case it fails to obtain the dollar value:

[code]
val usdQuote = future {
  connection.getCurrentValue(USD)
} map {
  usd => "Value: " + usd + "$"
}
val chfQuote = future {
  connection.getCurrentValue(CHF)
} map {
  chf => "Value: " + chf + "CHF"
}

val anyQuote = usdQuote fallbackTo chfQuote

anyQuote onSuccess { println(_) }
[/code]

The andThen combinator is used purely for side-effecting purposes. It returns a new future with exactly the same result as the current future, regardless of whether the current future failed or not. Once the current future is completed with the result, the closure corresponding to the andThen is invoked and then the new future is completed with the same result as this future. This ensures that multiple andThen calls are ordered, as in the following example which stores the recent posts from a social network to a mutable set and then renders all the posts to the screen:

[code]
val allposts = mutable.Set[String]()

future {
  session.getRecentPosts
} andThen {
  // andThen takes a partial function over Try, so match on Success
  case Success(posts) => allposts ++= posts
} andThen {
  case _ =>
    clearAll()
    for (post <- allposts) render(post)
}
[/code]

In summary, the combinators on futures are purely functional. Every combinator returns a new future which is related to the future it was derived from.

Projections

To enable for-comprehensions on a result returned as an exception, futures also have projections. If the original future fails, the failed projection returns a future containing a value of type Throwable. If the original future succeeds, the failed projection fails with a NoSuchElementException. The following is an example which prints the exception to the screen:

[code]
val f = future {
  2 / 0
}
for (exc <- f.failed) println(exc)
[/code]

The following example does not print anything to the screen:

[code]
val f = future {
  4 / 2
}
for (exc <- f.failed) println(exc)
[/code]

Extending Futures

Support for extending the Futures API with additional utility methods is planned. This will allow external frameworks to provide more specialized utilities.

Blocking

As mentioned earlier, blocking on a future is strongly discouraged for the sake of performance and for the prevention of deadlocks. Callbacks and combinators on futures are a preferred way to use their results. However, blocking may be necessary in certain situations and is supported by the Futures and Promises API.

In the currency trading example above, one place to block is at the end of the application to make sure that all of the futures have been completed. Here is an example of how to block on the result of a future:

[code]
import scala.concurrent._
import scala.concurrent.duration._

def main(args: Array[String]): Unit = {
  val rateQuote = future {
    connection.getCurrentValue(USD)
  }

  val purchase = rateQuote map { quote =>
    if (isProfitable(quote)) connection.buy(amount, quote)
    else throw new Exception("not profitable")
  }

  Await.result(purchase, 0 nanos)
}
[/code]

In the case that the future fails, the caller is forwarded the exception that the future is failed with. This includes the failed projection: blocking on it results in a NoSuchElementException being thrown if the original future is completed successfully.

Alternatively, calling Await.ready waits until the future becomes completed, but does not retrieve its result. In the same way, calling that method will not throw an exception if the future is failed.
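
The contrast between Await.result and Await.ready on a failed future can be sketched like this. The object name AwaitDemo, the exception and the five-second timeout are assumptions chosen for the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical helper object, for illustration only.
object AwaitDemo {
  def failing: Future[Int] = Future[Int](throw new IllegalArgumentException("bad input"))

  // Await.ready returns the completed future itself and does not rethrow its exception;
  // the failure is only visible by inspecting the future's value.
  def readyValue: Option[Try[Int]] = Await.ready(failing, 5.seconds).value

  // Await.result rethrows the exception the future was failed with.
  def resultAttempt: Try[Int] = Try(Await.result(failing, 5.seconds))
}
```

Both calls can still throw a TimeoutException if the future does not complete within the given duration.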

The Future trait implements the Awaitable trait with the methods ready() and result(). These methods cannot be called directly by the clients: they can only be called by the execution context.

To allow clients to call 3rd party code which is potentially blocking and avoid implementing the Awaitable trait, the same blocking primitive can also be used in the following form:

[code]
blocking {
  potentiallyBlockingCall()
}
[/code]

The blocking code may also throw an exception. In this case, the exception is forwarded to the caller.

Exceptions

When asynchronous computations throw unhandled exceptions, futures associated with those computations fail. Failed futures store an instance of Throwable instead of the result value. Futures provide the onFailure callback method, which accepts a PartialFunction to be applied to a Throwable. The following special exceptions are treated differently:

scala.runtime.NonLocalReturnControl[_] – this exception holds a value associated with the return. Typically, return constructs in method bodies are translated to throws with this exception. Instead of keeping this exception, the associated value is stored into the future or a promise.

ExecutionException – stored when the computation fails due to an unhandled InterruptedException, Error or a scala.util.control.ControlThrowable. In this case the ExecutionException has the unhandled exception as its cause. These exceptions are rethrown in the thread executing the failed asynchronous computation. The rationale behind this is to prevent propagation of critical and control-flow related exceptions normally not handled by the client code and at the same time inform the client in which future the computation failed.

See NonFatal for a more precise semantics description.

Promises

So far we have only considered Future objects created by asynchronous computations started using the future method. However, futures can also be created using promises.

While futures are defined as a type of read-only placeholder object created for a result which doesn’t yet exist, a promise can be thought of as a writable, single-assignment container, which completes a future. That is, a promise can be used to successfully complete a future with a value (by “completing” the promise) using the success method. Conversely, a promise can also be used to complete a future with an exception, by failing the promise, using the failure method.

A promise p completes the future returned by p.future. This future is specific to the promise p. Depending on the implementation, it may be the case that p.future eq p.

Consider the following producer-consumer example, in which one computation produces a value and hands it off to another computation which consumes that value. This passing of the value is done using a promise.

[code]
import scala.concurrent.{ future, promise }
import scala.concurrent.ExecutionContext.Implicits.global

val p = promise[T]
val f = p.future

val producer = future {
  val r = produceSomething()
  p success r
  continueDoingSomethingUnrelated()
}

val consumer = future {
  startDoingSomething()
  f onSuccess {
    case r => doSomethingWithResult()
  }
}
[/code]

Here, we create a promise and use its future method to obtain the Future that it completes. Then, we begin two asynchronous computations. The first does some computation, resulting in a value r, which is then used to complete the future f, by fulfilling the promise p. The second does some computation, and then reads the result r of the completed future f. Note that the consumer can obtain the result before the producer task is finished executing the continueDoingSomethingUnrelated() method.

As mentioned before, promises have single-assignment semantics. As such, they can be completed only once. Calling success on a promise that has already been completed (or failed) will throw an IllegalStateException.

The following example shows how to fail a promise.

[code]
val p = promise[T]
val f = p.future

val producer = future {
  val r = someComputation
  if (isInvalid(r))
    p failure (new IllegalStateException)
  else {
    val q = doSomeMoreComputation(r)
    p success q
  }
}
[/code]

Here, the producer computes an intermediate result r, and checks whether it’s valid. In the case that it’s invalid, it fails the promise by completing the promise p with an exception. In this case, the associated future f is failed. Otherwise, the producer continues its computation, and finally completes the future f with a valid result, by completing promise p.

Promises can also be completed with a complete method which takes a potential value Try[T]: either a failed result of type Failure[Throwable] or a successful result of type Success[T].

Analogous to success, calling failure and complete on a promise that has already been completed will throw an IllegalStateException.
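
A small sketch of the single-assignment rule follows. The object name SingleAssignment and the values are hypothetical. It creates promises with Promise[Int]() from the Promise companion object and wraps the second completion attempt in a Try so that the thrown exception can be inspected instead of propagating.

```scala
import scala.concurrent.Promise
import scala.util.{Failure, Success, Try}

// Hypothetical helper object, for illustration only.
object SingleAssignment {
  // Completes a promise, then tries to complete it again.
  def completeTwice(): Try[Unit] = {
    val p = Promise[Int]()
    p.complete(Success(1)) // first completion is accepted
    Try { p.complete(Failure(new Exception("late"))); () } // second attempt throws
  }

  // The value observed through the future is the one from the first completion.
  def observed(): Option[Try[Int]] = {
    val p = Promise[Int]()
    p.success(1)
    Try(p.success(2)) // rejected; the exception is captured and ignored here
    p.future.value
  }
}
```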

One nice property of programs written using promises with operations described so far and futures which are composed through monadic operations without side-effects is that these programs are deterministic. Deterministic here means that, given that no exception is thrown in the program, the result of the program (values observed in the futures) will always be the same, regardless of the execution schedule of the parallel program.

In some cases the client may want to complete the promise only if it has not been completed yet (e.g., there are several HTTP requests being executed from several different futures and the client is interested only in the first HTTP response, corresponding to the first future to complete the promise). For these reasons the methods tryComplete, trySuccess and tryFailure exist on promise. The client should be aware that using these methods results in programs which are not deterministic, but depend on the execution schedule.
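
The try-variants report whether the attempt won instead of throwing. The sketch below calls them sequentially so that the outcome is predictable; the object name FirstWins and the response strings are made up. When the calls come from concurrently running futures, which one returns true depends on the execution schedule.

```scala
import scala.concurrent.Promise
import scala.util.Try

// Hypothetical helper object, for illustration only.
object FirstWins {
  // Returns (first attempt accepted?, second attempt accepted?, value seen by the future).
  def race(): (Boolean, Boolean, Option[Try[String]]) = {
    val p = Promise[String]()
    val first = p.trySuccess("response A")  // completes the promise
    val second = p.trySuccess("response B") // promise already completed: no effect, no exception
    (first, second, p.future.value)
  }
}
```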

The method completeWith completes the promise with another future. After the future is completed, the promise gets completed with the result of that future as well. The following program prints 1:

[code]
val f = future { 1 }
val p = promise[Int]

p completeWith f

p.future onSuccess {
  case x => println(x)
}
[/code]

When failing a promise with an exception, three subtypes of Throwable are handled specially. If the Throwable used to break the promise is a scala.runtime.NonLocalReturnControl, then the promise is completed with the corresponding value. If the Throwable used to break the promise is an instance of Error, InterruptedException, or scala.util.control.ControlThrowable, the Throwable is wrapped as the cause of a new ExecutionException which, in turn, fails the promise.

Using promises, the onComplete method of the futures and the future construct you can implement any of the functional composition combinators described earlier. Let’s assume you want to implement a new combinator first which takes two futures f and g and produces a third future which is completed by either f or g (whichever comes first), but only given that it is successful.

Here is an example of how to do it:

[code]
def first[T](f: Future[T], g: Future[T]): Future[T] = {
  val p = promise[T]

  f onSuccess {
    case x => p.trySuccess(x)
  }

  g onSuccess {
    case x => p.trySuccess(x)
  }

  p.future
}
[/code]

Note that in this implementation, if neither f nor g succeeds, then first(f, g) never completes (either with a value or with an exception).
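
If a result that never completes is undesirable, one possible variant lets the remaining future decide the outcome once the other has failed. This is only a sketch of that idea and not part of the original text; the names FirstOrFail and firstSuccess are hypothetical, and it is written with onComplete and tryComplete so that it does not rely on onSuccess.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

// Hypothetical helper object, for illustration only.
object FirstOrFail {
  // Like `first`, but if both inputs fail the result fails too instead of never completing.
  def firstSuccess[T](f: Future[T], g: Future[T])(implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    f.onComplete {
      case Success(x) => p.trySuccess(x)
      case Failure(_) => g.onComplete(p.tryComplete) // f failed: g's outcome decides
    }
    g.onComplete {
      case Success(x) => p.trySuccess(x)
      case Failure(_) => f.onComplete(p.tryComplete) // g failed: f's outcome decides
    }
    p.future
  }
}
```

When both futures fail, the result is failed with one of the two exceptions; which one is not specified in this sketch.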

Utilities

To simplify handling of time in concurrent applications, scala.concurrent introduces a Duration abstraction. Duration is not supposed to be yet another general time abstraction. It is meant to be used with concurrency libraries and resides in the scala.concurrent.duration package.

Duration is the base class representing a length of time. It can be either finite or infinite. A finite duration is represented with the FiniteDuration class, which is constructed from a Long length and a java.util.concurrent.TimeUnit. Infinite durations, also extended from Duration, exist in only two instances, Duration.Inf and Duration.MinusInf. The library also provides several Duration subclasses for implicit conversion purposes and those should not be used.

Abstract Duration contains methods that allow:

Conversion to different time units (toNanos, toMicros, toMillis, toSeconds, toMinutes, toHours, toDays and toUnit(unit: TimeUnit)).
Comparison of durations (<, <=, > and >=).
Arithmetic operations (+, -, *, / and unary_-).
Minimum and maximum between this duration and the one supplied in the argument (min, max).
Check if the duration is finite (isFinite).
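
A few of the operations listed above, as a minimal sketch. The object name DurationOps and the chosen values are arbitrary; the durations are written with the method-call form (1500.millis) rather than postfix notation.

```scala
import scala.concurrent.duration._

// Hypothetical helper object, for illustration only.
object DurationOps {
  val a: FiniteDuration = 1500.millis
  val b: FiniteDuration = 2.seconds

  val sum = a + b                     // arithmetic on durations
  val sumMillis: Long = sum.toMillis  // conversion to a time unit
  val aIsShorter: Boolean = a < b     // comparison
  val shorter = a min b               // minimum of the two
  val infIsFinite: Boolean = Duration.Inf.isFinite // infinite durations are not finite
}
```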

Duration can be instantiated in the following ways:

Implicitly from types Int and Long. For example val d = 100 millis.
By passing a Long length and a java.util.concurrent.TimeUnit. For example val d = Duration(100, MILLISECONDS).
By parsing a string that represents a time period. For example val d = Duration("1.2 µs").

Duration also provides unapply methods so it can be used in pattern matching constructs. Examples:

[code]
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit._

// instantiation
val d1 = Duration(100, MILLISECONDS) // from Long and TimeUnit
val d2 = Duration(100, "millis") // from Long and String
val d3 = 100 millis // implicitly from Long, Int or Double
val d4 = Duration("1.2 µs") // from String

// pattern matching
val Duration(length, unit) = 5 millis
[/code]
