您的位置:首页 > 编程语言

Scala 并行和并发编程-Futures 和 Promises【翻译】

2016-02-11 20:52 405 查看
官网地址

本文内容

简介Futures阻塞异常Promises工具
最近看了《七周七语言:理解多种编程泛型》,介绍了七种语言(四种编程泛型)的主要特性:基本语法,集合,并行/并发,其中就有Scala。你不能指望这种书全面介绍,因为其中任何一门语言都够写一本书了~

我比较关注并行/并发,但是书中关于Scala的并发部分——Actor,代码编译不通过,“Deprecated”,哎,这书点不负责,代码也不写采用编译器的版本。于是就到Scala官网看了一下,即便是官网,也列出了对Actor的改进,有些已经不再使用了~

Java在它的版本8之前,函数式编程实在太弱了,不然也不会出现像Scala这样在JVM上运行,能够与Java完美融合的语言(估计,Java在函数式编程在这方面,太落后了,社区已经等不急了,而函数式编程最大的优点是——并行)。

本文来自Scala官网,完整示例代码几乎没有,大部分是理论,虽然讲解得很详细,但看起来实在有点费劲。因此,你最好找点这方面完整示例再看看。

官网其实也有中文翻译,但却是机器翻译的。

简介

Future提供了一套高效非阻塞(non-blocking)的方式完成并行操作。其基本思想很简单,所谓Future,指的是一类占位符对象(placeholderobject),用于指代某些尚未完成计算的结果。一般,由Future的计算结果都是并行执行的,计算完后再使用。以这种方式组织并行任务,便可以写出高效、异步、非阻塞的并行代码。

默认情况,future和promise利用回调(callback)的非阻塞方式,并不是采用典型的阻塞方式。为了在语法和概念层面简化回调的使用,Scala提供了flatMap、foreach和filter等算子(combinator),使得我们能够以非阻塞的方式对future进行组合。当然,future对于那些必须使用阻塞的情况仍然支持阻塞操作,可以阻塞等待future(不过不鼓励这样做)。

一个典型的future如下所示:

[code]valinverseFuture:Future[Matrix]=Future{
fatMatrix.inverse()//non-blockinglonglastingcomputation
}(executionContext)

或是更常用的:

[code]implicitvalec:ExecutionContext=...
valinverseFuture:Future[Matrix]=Future{
fatMatrix.inverse()
}//ecisimplicitlypassed

这两个代码片段把fatMatrix.inverse()的执行委托给ExecutionContext,在
inverseFuture
中体现计算结果。

Futures

所谓Future,是一种用于指代某个尚未就绪的值的对象。这个值通常是某个计算过程的结果:

若该计算过程尚未完成,我们就说该Future未完成;
若该计算过程正常结束,或中途抛出异常,我们就说该Future已完成。

Future完成分为两种情况:

当Future带着某个值而完成时,我们就说该Future带着计算结果成功完成。
当Future带着异常而完成时,计算过程中抛出的异常,我们就说Future因异常而失败。

Future具有一个重要的属性——只能被赋值一次。一旦给定了某个值或某个异常,future对象就变成了不可变对象——无法再被改写。
创建future对象最简单的方法是调用future方法,开始异步(asynchronous)计算,并返回保存有计算结果的futrue。一旦该future计算完成,其结果就变的可用。
注意,Future[T]是一个类型,表示future对象,而future是一个方法,创建和调度一个异步计算,并返回一个带有计算结果的future对象。
下面通过一个例子来展示。
假设,我们使用某个社交网络假想的API获取某个用户的朋友列表,我们将打开一个新对话(session),然后发送一个获取特定用户朋友列表的请求。


[code]importscala.concurrent._
importExecutionContext.Implicits.global
valsession=socialNetwork.createSessionFor("user",credentials)
valf:Future[List[Friend]]=Future{
session.getFriends()
}

上面,首先导入scala.concurrent包。然后,通过一个假想的createSessionFor方法初始化一个向服务器发送请求session变量。这个请求是通过网络发送的,所以可能耗时很长。调用getFriends方法返回List[Friend]。为了更好的利用CPU,知道响应到达,不应该阻塞(block)程序的其他部分,这个计算应该被异步调度。future方法就是这样做的,它并行地执行指定的计算块,在这个例子中,向服务器发送请求,等待响应。

一旦服务器响应,futuref中的好友列表将变得可用。

失败可能会导致一个exception。在下面的例子中,session的值未被正确的初始化,于是,future块中计算将抛出一个NullPointerException。这样,futuref失败了。


[code]valsession=null
valf:Future[List[Friend]]=Future{
session.getFriends
}

上面的importExecutionContext.Implicits.global
导入默认的全局执行上下文(globalexecutioncontext)。执行上下文执行提交给他们的任务,你也可把执行上下文看作线程池,这对future方法是必不可少的,因为,它们处理如何和何时执行异步计算。你可以定义自己的执行上下文,并用future使用,但现在,只需要知道你能够通过上面的语句导入默认执行上下文就足够了。

我们的例子是基于一个假想的社交网络API,计算包含了发送网络请求和等待响应。下面,假设你有一个文本文件,想找出一个特定词第一次出现的位置。当磁盘正在检索此文件时,这个计算过程可能会陷入阻塞,因此,并行执行程序的剩余部分将很有意义。


[code]valfirstOccurrence:Future[Int]=Future{
valsource=scala.io.Source.fromFile("e:\scala\myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}

回调函数

现在,我们知道如何开始一个异步计算来创建一个新的future值,但是我们没有演示一旦此结果变得可用后如何使用。我们经常对计算结果感兴趣而不仅仅是它的副作用(side-effects)。
在许多future的实现中,一旦future的客户端对结果感兴趣,它必须阻塞它自己的计算,并等待直到future完成——然后才能使用future的值继续它自己的计算。虽然这在ScalaFutureAPI(在后面会展示)中是允许的,但从性能角度来看更好的办法是完全非阻塞,即在future中注册一个回调。一旦future完成,就异步调用回调。如果当注册回调,future已经完成,那么,回调或是异步执行,或在相同的线程中循序执行。
注册回调最通常的形式,是使用OnComplete方法,即创建一个
Try[T]=>U
类型的回调函数。如果future成功完成,回调则会应用到Success[T]类型的值中,否则应用到
Failure[T]
类型的值中。
Try[T]
Option[T]
Either[T,S]
相似,因为它是一个可能持有某种类型值的单子(monda)。然而,它是为持有一个值或异常对象特殊设计的。
Option[T]
既可以是一个值(如:
Some[T]
)也可以完全不是值(如:
None
),如果
Try[T]
获得一个值是,那么它是
Success[T]
,否则为持有异常的
Failure[T]
Failure[T]
有很多信息,不仅仅是关于为什么没有值None。同时,也可以把
Try[T]
看作一种特殊版本的
Either[Throwable,T]
,特别是当左边值为一个Throwable的情形。
“一个单子(Monad)说白了不过就是自函子范畴上的一个幺半群而已。”这句话出自Haskell大神PhilipWadler,也是他提议把Monad引入Haskell。
回到我们社交网络的例子,假设,我们想获取最近的帖子并显示在屏幕上,可以通过调用getRecentPosts方法,它返回List[String]:

[code]importscala.util.{Success,Failure}
valf:Future[List[String]]=Future{
session.getRecentPosts
}
fonComplete{
caseSuccess(posts)=>for(post<-posts)println(post)
caseFailure(t)=>println("Anerrorhasoccured:"+t.getMessage)
}
onComplete方法允许客户处理失败或成功的future结果。对于成功,onSuccess回调使用如下:


[code]valf:Future[List[String]]=Future{
session.getRecentPosts
}
f
onSuccess{
caseposts=>for(post<-posts
)
println(post)
}
对于失败,onFailure回调使用如下:


[code]valf:Future[List[String]]=Future{
session.getRecentPosts
}
fonFailure{
caset=>println("Anerrorhasoccured:"+t.getMessage)
}
fonSuccess{
caseposts=>for(post<-posts)println(post)
}

onFailure回调只有在future失败,也就是包含一个异常时才会执行。

因为部分函数(partialfunctions)具有isDefinedAt方法,所以,
onFailure
方法只有为了特定Throwable而定义才会触发。下面的例子,已注册的
onFailure
回调永远不会被触发:


[code]valf=Future{
2/0
}
fonFailure{
casenpe:NullPointerException=>
println("I'dbeamazedifthisprintedout.")
}


部分函数(Partialfunctions),假设有一个数学函数f(a,b,c),partial(f,1)返回的是数学函数f(1,b,c),函数的参数a已经被代入。


回到前面例子,查找某个第一次出现的关键字,在屏幕上输出该关键字的位置:


[code]valfirstOccurrence:Future[Int]=Future{
valsource=scala.io.Source.fromFile("myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
firstOccurrenceonSuccess{
caseidx=>println("Thekeywordfirstappearsatposition:"+idx)
}
firstOccurrenceonFailure{
caset=>println("Couldnotprocessfile:"+t.getMessage)
}

onComplete,、onSuccess和onFailure方法都具有结果类型Unit,这意味着这些回调方法不能被链接。注意,这种设计是为了避免链式调用可能隐含在已注册回调上一个顺序的执行(同一个future中注册的回调是无序的)。

也就是说,我们现在应讨论论何时调用回调。因为回调需要future中的值是可用的,只有future完成后才能被调用。然而,不能保证被完成future的线程或创建回调的线程调用。反而,回调有时会在future对象完成后被某个线程调用。我们可以说,回调最终会被执行。
更进一步,回调被执行的顺序不是预先定义的,甚至在同一个应用程序。事实上,回调也许不是一个接一个连续调用的,但在同一时间并发调用。这意味着,下面例子中,变量totalA也许不能从计算的文本中得到大小写字母数量的正确值。

[code]@volatilevartotalA=0
valtext=Future{
"na"*16+"BATMAN!!!"
}
textonSuccess{
casetxt=>totalA+=txt.count(_=='a')
}
textonSuccess{
casetxt=>totalA+=txt.count(_=='A')
}

上面,两个回调可能一个接一个地执行,变量totalA得到的预期值为18。然而,它们也可能是并发执行,于是,totalA最终可能是16或2,因为+=不是一个原子操作(即它是由一个读和一个写的步骤组成,这样就可能使其与其他的读和写任意交错执行)。

考虑到完整性,回调的语义如下:

在future上注册onComplete回调,要确保future执行完成后,相应的闭包(closure)最终被调用。
注册onSuccess或onFailure回调,与onComplete语义一样,不同的是,只有在future成功地或失败地执行完后,才会调用。
在future上注册一个已经完成的回调,将导致回调最终被执行。将最终导致一直处于执行状态的回调(上面1所隐含的)。
在future上注册多个回调时,这些回调的执行顺序是不确定的。事实上,这些回调可能是并行执行的,然而,某个ExecutionContext执行可能导致明确的执行顺序。
在某些回调抛出异常时,其他回调的执行不受影响。
在某些回调无法永远无法结束时(例如,回调包含一个无限循环),其他回调可能完全不会执行。这种情况下,那些潜在的阻塞的回调需要使用阻塞结构。将在后面“阻塞”小节说明。
一旦执行完后,回调会从future对象中移除,这对垃圾回收机制(GC)很合适。

函数组合(FunctionalComposition)和For解构(For-Comprehensions)

尽管前文所展示的回调机制已经足够把future的结果和后继计算结合起来的,但是有时回调机制并不易于使用,且容易造成冗余的代码。可以通过一个例子来说明。假设,我们有一个用于进行货币交易服务的API,想要在有盈利的时候购进一些美元。让我们先来看看怎样用回调来解决这个问题:

[code]valrateQuote=Future{
connection.getCurrentValue(USD)
}
rateQuoteonSuccess{casequote=>
valpurchase=Future{
if(isProfitable(quote))connection.buy(amount,quote)
elsethrownewException("notprofitable")
}
purchaseonSuccess{
case_=>println("Purchased"+amount+"USD")
}
}

首先,我们创建一个名为rateQuote的future对象并获得当前的汇率。从服务器返回汇率且该future对象成功完成之后,在onSuccess回调判断买还是不买。所以,我们创建了另一个名为purchase的future对象,用来在可盈利的情况下做出购买决定,并在发送一个请求。最后,一旦purchase运行结束,会输出一条通知消息。

这确实是可行,但有两点原因使这种做法并不方便。其一,不得不使用onSuccess,且不得不在其中嵌套另一个purchasefuture对象。试想一下,如果在purchase执行完成之后我们可能想要卖掉一些其他的货币。这时将不得不在onSuccess的回调中重复这个模式,从而可能使代码过度嵌套,冗长且难以理解。

其二,purchase只是定义在局部范围,只能被来自onSuccess内部的回调响应。也就是说,这个应用的其他部分看不到purchase,而且不能为它注册其他的onSuccess回调,比如说卖掉些其他货币。

为解决上述的两个问题,futures提供了组合器(combinators)使之具有更多易用的组合。映射(map)是最基本的组合器之一。试想,给定一个future对象和一个通过映射来获得该future值的函数,映射方法将创建一个新Future对象,一旦原来的Future成功完成了计算,新Future会通过该返回值来完成自己的计算。你能够像理解容器(collections)的map一样来理解future的map。

让我们用map的方法来重构一下前面的例子:


[code]valrateQuote=Future{
connection.getCurrentValue(USD)
}
valpurchase=rateQuotemap{quote=>
if(isProfitable(quote))connection.buy(amount,quote)
elsethrownewException("notprofitable")
}
purchaseonSuccess{
case_=>println("Purchased"+amount+"USD")
}

通过对rateQuote的映射我们减少了一次onSuccess回调,更重要的是避免了嵌套。这时如果我们决定出售一些货币就可以再次使用purchase方法上的映射了。

可是,如果isProfitable方法返回了false将会发生些什么?会引发异常?这种情况下,purchase的确会因异常而失败。不仅仅如此,想象一下,链接的中断和getCurrentValue方法抛出异常会使rateQuote的操作失败。在这些情况下,映射将不会返回任何值,而purchase也会自动以和rateQuote相同的异常而执行失败。

总之,如果原Future的计算成功完成了,那么返回的Future将会使用原Future的映射值来完成计算。如果映射函数抛出了异常,则Future也会带着该异常完成计算。如果原Future由于异常而计算失败,那么返回的Future也会包含相同的异常。这种异常的传导方式也同样适用于其他的组合器(combinators)。

Future的这个设计目的是使之能够在For-comprehensions下使用。也正是因为这,Future还拥有flatMap,filter和foreach等组合器。其中,flatMap方法可以构造一个函数,可以把值映射到一个姑且称为g的新future,然后返回一个随g的完成而完成的Future对象。

让我们假设,想把一些美元兑换成法郎。必须为这两种货币报价,然后再在这两个报价的基础上确定交易。下面是一个在for-comprehensions中使用flatMap和withFilter的例子:


[code]valusdQuote=Future{connection.getCurrentValue(USD)}
valchfQuote=Future{connection.getCurrentValue(CHF)}
valpurchase=for{
usd<-usdQuote
chf<-chfQuote
ifisProfitable(usd,chf)
}yieldconnection.buy(amount,chf)
purchaseonSuccess{
case_=>println("Purchased"+amount+"CHF")
}

purchase只有当usdQuote和chfQuote都完成计算后才能完成,它以其他两个Future的计算值为前提,所以它自己的计算不能更早开始。

上面的for-comprhension将被转换为:


[code]valpurchase=usdQuoteflatMap{
usd=>
chfQuote
.withFilter(chf=>isProfitable(usd,chf))
.map(chf=>connection.buy(amount,chf))
}

这的确是比for-comprehension稍微难以把握一些,但是我们这样分析有助于更容易的理解flatMap操作。FlatMap操作会把自身的值映射到其他future对象上,并随着该对象计算完成的返回值一起完成计算。在例子中,flatMap用usdQuote的值把chfQuote的值映射到第三个futrue对象里,该对象用于发送一定量法郎的购入请求。只有当通过映射返回的第三个future对象完成了计算,purchase才能完成计算。

这可能有些难以置信,但幸运的是faltMap操作在for-comprhensions模式以外很少使用,因为for-comprehensions本身更容易理解和使用。

再说说filter,它可以用于创建一个新的future对象,该对象只有在满足某些特定条件的前提下才会得到原始future的计算值,否则就会抛出一个NoSuchElementException的异常而失败。调用了filter的future,其效果与直接调用withFilter完全一样。

collect和filter组合器之间的关系有些类似容器(collections)API里那些方法之间的关系。

值得注意的是,调用foreach组合器并不会在计算值可用的时候阻塞当前的进程去获取计算值。恰恰相反,只有当future对象成功计算完成后,foreach所迭代的函数才能够被异步执行。这意味着foreach与onSuccess回调意义完全相同。

由于Futuretrait(trait类似java中的接口interface)从概念上看包含两种类型的返回值(计算结果和异常),所以组合器会有一个处理异常的需求。

比如,我们准备在rateQuote的基础上决定购入一定量的货币,那么
connection.buy
方法需要知道购入的数量和期望的报价值,最终完成购买的数量将会被返回。假如报价值偏偏在这个时候改变了,那么,buy方法将会抛出一个
QuoteChangedExecption
,并且不会做任何交易。如果我们想让我们的Future对象返回0,而不是抛出那个该死的异常,那我们需要使用recover组合器:


[code]valpurchase:Future[Int]=rateQuotemap{
quote=>connection.buy(amount,quote)
}recover{
caseQuoteChangedException()=>0
}

recover能够创建一个新future对象,该对象当计算完成时持有和原future对象一样的值。如果执行不成功,则偏函数的参数会被传递给使原Future失败的那个Throwable异常。如果它把Throwable映射到了某个值,那么新的Future就会成功完成并返回该值。如果偏函数没有定义在Throwable中,那么最终产生结果的future也会失败并返回同样的Throwable。

组合器recoverWith能够创建一个新future对象,当原future对象成功完成计算时,新future对象包含有和原future对象相同的计算结果。若原future失败或异常,偏函数将会返回造成原future失败的相同的Throwable异常。如果此时Throwable又被映射给了别的future,那么新Future就会完成并返回这个future的结果。recoverWith和recover的关系与flatMap和map之间的关系很像。

fallbackTo组合器生成的future对象可以在该原future成功完成计算时返回结果,如果原future失败或异常返回future参数对象的成功值。在原future和参数future都失败的情况下,新future对象会完成并返回原future对象抛出的异常。正如下面例子,本想打印美元的汇率,但在获取美元汇率失败的情况下会打印法郎的汇率:


[code]valusdQuote=Future{
connection.getCurrentValue(USD)
}map{
usd=>"Value:"+usd+"$"
}
valchfQuote=Future{
connection.getCurrentValue(CHF)
}map{
chf=>"Value:"+chf+"CHF"
}
valanyQuote=usdQuotefallbackTochfQuote
anyQuoteonSuccess{println(_)}

组合器andThen的用法是出于纯粹的side-effecting目的。经andThen返回的新Future,无论原Future成功或失败都会返回与原Future一样的结果。一旦原Future完成并返回结果,andThen后跟的代码块就会被调用,且新Future将返回与原Future一样的结果,这确保了多个andThen调用的顺序执行。正如下例所示,这段代码可以从社交网站上把近期发出的帖子收集到一个可变集合里,然后把它们都打印在屏幕上:


[code]valallposts=mutable.Set[String]()
Future{
session.getRecentPosts
}andThen{
posts=>allposts++=posts
}andThen{
posts=>
clearAll()
for(post<-allposts)render(post)
}

综上所述,Future的组合器功能是纯函数式的,每种组合器都会返回一个与原Future相关的新Future对象。

投影(Projections)

为了确保for-comprehensions能够返回异常,futures也提供了投影(projections)。如果原future对象失败了,失败的投影(projection)会返回一个带有Throwable类型返回值的future对象。如果原Future成功了,失败的投影(projection)会抛出一个NoSuchElementException异常。下面是一个在屏幕上打印出异常的例子:


[code]valf=Future{
2/0
}
for(exc<-f.failed)println(exc)
下面例子不会在屏幕上打印出任何东西:


[code]valf=Future{
4/2
}
for(exc<-f.failed)println(exc)

Future扩展

更多的实用方法对FuturesAPI进行了扩展支持,这将为很多外部框架提供更多专业工具。

阻塞

future一般都是异步的,不会阻塞潜在的执行线程。然而,在某些情况下,阻塞是必要的。我们区分了两种阻塞执行线程的形式:future内的阻塞,以及future外的阻塞,等待直到future完成。

Future内的阻塞

Asseenwiththeglobal
ExecutionContext
,itispossibletonotifyan
ExecutionContext
ofablockingcallwiththe
blocking
construct.Theimplementationishoweveratthecompletediscretionofthe
ExecutionContext
.Whilesome
ExecutionContext
suchas
ExecutionContext.global
implement
blocking
bymeansofa
ManagedBlocker
,someexecutioncontextssuchasthefixedthreadpool:

正如全局ExecutionContext,它可以通知一个具有阻塞结构的阻塞调用的ExecutionContext。然而,实现是很慎重的。当某些ExecutionContext通过ManagedBlocker实现阻塞,一些执行上下文,如固定的线程池:

[code]ExecutionContext.fromExecutor(Executors.newFixedThreadPool(x))

下面代码将什么都不做:


[code]implicitvalec=ExecutionContext.fromExecutor(
Executors.newFixedThreadPool(4))
Future{
blocking{blockingStuff()}
}

下面效果一样:


[code]Future{blockingStuff()}

阻塞的代码也可能抛出异常。在这种情况下,这个异常会转发给调用者。

future外阻塞

正如前面所说,在future上阻塞是不鼓励的,因为会出现性能和死锁。回调(Callbacks)和组合器(combinators)才是首选方法,但在某些情况中阻塞也是需要的,并且Futures和PromisesAPI也支持。

在之前的并发交易例子中,在最后有一处用到阻塞来确定是否所有的futures都已完成。下面是如何使用block来处理一个future结果的例子:


[code]importscala.concurrent._
importscala.concurrent.duration._
defmain(args:Array[String]){
valrateQuote=Future{
connection.getCurrentValue(USD)
}
valpurchase=rateQuotemap{quote=>
if(isProfitable(quote))connection.buy(amount,quote)
elsethrownewException("notprofitable")
}
Await.result(purchase,0nanos)
}


在这种情况下,future失败了,调用者转发出了该future失败的异常。它包含了失败的投影(projection)——阻塞该结果,将会造成一个NoSuchElementException异常,若原future对象被成功完成。

相反的,调用
Await.ready
来等待,知道这个future完成,但获不到结果。同样,如果future失败,调用不会抛出异常的方法。

Futuretrait用
ready()
result()
方法实现了Awaitabletrait。这些方法不能被客户端直接调用——它们只能被执行上下文调用。

异常

当异步计算抛出未处理的异常时,与那些计算相关的futures就失败了。失败的futures存储了一个Throwable的实例,而不是返回值。Futures提供onFailure回调方法,它用一个PartialFunction去表示一个Throwable。下列特殊异常的处理方式不同:

scala.runtime.NonLocalReturnControl[_]
–此异常保存了一个与返回相关联的值。通常情况下,在方法体中的返回结构被调用去抛出这个异常。相关联的值将会存储到future或一个promise中,而不是一直保存在这个异常中。

ExecutionException-当因为一个未处理的中断异常、错误或者
scala.util.control.ControlThrowable
导致计算失败时会被存储起来。这种情况下,ExecutionException会为此具有未处理的异常。这些异常会在执行失败的异步计算线程中重新抛出。这样做的目的,是为了防止正常情况下没有被客户端代码处理过的那些关键的、与控制流相关的异常继续传播下去,同时告知客户端其中的future对象是计算失败的。

更精确的语义描述请参见[NonFatal]。

Promises

到目前为止,我们仅考虑了通过异步计算的方式创建future对象来使用future的方法。尽管如此,futures也可以使用promises来创建。
如果说futures是为了一个还没有存在的结果,而当成一种只读占位符的对象类型去创建,那么promise就被认为是一个可写的,可以实现一个future的单一赋值容器。这就是说,promise通过这种success方法可以成功去实现一个带有值的future。相反的,因为一个失败的promise通过failure方法就会实现一个带有异常的future。
一个promisep通过p.future方式返回future。这个futrue对象被指定到promisep。根据这种实现方式,可能就会出现p.future与p相同的情况。
考虑下面的生产者-消费者的例子,其中一个计算产生一个值,并把它转移到另一个使用该值的计算。这个传递中的值通过一个promise来完成。

[code]importscala.concurrent.{future,promise}
importscala.concurrent.ExecutionContext.Implicits.global
valp=promise[T]
valf=p.future
valproducer=future{
valr=produceSomething()
psuccessr
continueDoingSomethingUnrelated()
}
valconsumer=future{
startDoingSomething()
fonSuccess{
caser=>doSomethingWithResult()
}
}


这里,我们创建了一个promise,并利用它的future方法获得一个Future。然后,开始两个异步计算。第一种,做些计算,将结果放在r中,通过执行promisep,这个值被用来完成future对象f。第二个,也做某写计算,然后读取实现了futuref的计算结果值r。注意,在生产者完成执行
continueDoingSomethingUnrelated()
方法之前,消费者可以获得这个结果值。

正如前面提到的,promises具有单赋值语义。因此,它们仅能被实现一次。在一个已经计算完成的promise或者failed的promise上调用success方法将会抛出一个IllegalStateException异常。

下面例子显示了如何失败一个promise。


[code]valp=promise[T]
valf=p.future
valproducer=future{
valr=someComputation
if(isInvalid(r))
pfailure(newIllegalStateException)
else{
valq=doSomeMoreComputation(r)
psuccessq
}
}


如上,生产者计算出一个中间结果r,并判断它的有效性。如果无效,它会通过返回一个异常实现promisep的方式失败这个promise,关联的futuref是failed。否则,生产者会继续它的计算,最终使用一个有效的结果值实现futuref,同时实现promisep。

Promises也能通过一个complete方法来实现,这个方法采用了一个
potentialvalueTry[T]
,这个值要么是一个类型为
Failure[Throwable]
的失败结果,要么是一个类型为
Success[T]
的成功结果。

类似success方法,在一个已经完成的promise对象上调用failure和complete方法同样会抛出一个IllegalStateException异常。

前面所述的promises和futures方法的一个优点是,这些方法是单一操作的,并且是没有副作用(side-effects)的,因此程序是确定性的(deterministic)。确定性意味着,如果该程序没有抛出异常(future的计算值被获得),无论并行的程序如何调度,那么程序的结果将会永远是一样的。

在一些情况下,客户端也许希望能够只在promie没有完成的情况下完成该promise的计算(例如,如果有多个HTTP请求被多个不同的futures对象来执行,并且客户端只关心一个HTTP应答,该应答对应于第一个完成该promise的future)。因此,future提供了tryComplete,trySuccess和tryFailure方法。客户端需要意识到调用这些的结果是不确定的,调用的结果将依赖程序的调度。

completeWith方法将用另外一个future完成promise计算。当该future结束的时候,该promise对象得到那个future对象同样的值,如下的程序将打印1:


[code]valf=future{1}
valp=promise[Int]
pcompleteWithf
p.futureonSuccess{
casex=>println(x)
}


当让一个promise以异常失败的时候,三种子类型的Throwable异常被分别的处理。如果中断该promise的可抛出(Throwable)一场是
scala.runtime.NonLocalReturnControl
,那么该promise将以对应的值结束;如果是一个Error的实例,
InterruptedException
或者
scala.util.control.ControlThrowable
,那么该可抛出(Throwable)异常将会封装一个ExecutionException异常,该ExectionException将会让该promise以失败结束。

通过使用promises,futures的onComplete方法和future的构造方法,你能够实现前文描述的任何函数式竞争组合器(compitioncombinators)。假设,你想实现一个新的组合器,该组合器首先使用两个future对象f和g,并产生第三个future,该future是通过f或者g完成的,但只在成功完成的情况下。

下面是关于这个的例子:


[code]deffirst[T](f:Future[T],g:Future[T]):Future[T]={
valp=promise[T]
fonSuccess{
casex=>p.trySuccess(x)
}
gonSuccess{
casex=>p.trySuccess(x)
}
p.future

注意,在这种实现方式中,如果f与g都不成功,那么
first(f,g)
将永远不会完成(即返回一个值或者返回一个异常)。

工具(Utilities)

为了简化在并发应用中处理时序的问题,
scala.concurrent
引入了Duration抽象。Duration不是被作为另外一个通常的时间抽象存在的。他是为了用在并发库中,Duration位于
scala.concurrent
包中。

Duration是表示时间长短的基础类,其可以是有限的或者无限的。有限的duration用FiniteDuration类来表示,并通过时间长度
(length)
java.util.concurrent.TimeUnit
来构造。无限的durations,同样扩展了Duration,只在两种情况下存在,
Duration.Inf
Duration.MinusInf
。库中同样提供了一些Durations的子类用来做隐式的转换,这些子类不应被直接使用。

抽象的Duration类包含了如下方法:

转换到不同的四件单位(toNanos,toMicros,toMillis,toSeconds,toMinutes,toHours,toDaysandtoUnit(unit:TimeUnit));
durations比较(<,<=,>和>=);
算术运算符(+,-,*,/和unary_-);
Minimumandmaximumbetweenthisdurationandtheonesuppliedintheargument(min,max).
检查duration是否无限(isFinite)。

Duration能够用如下方法实例化(
instantiated):


隐式的通过Int和Long类型转换得来
vald=100millis

通过传递一个
Longlength
java.util.concurrent.TimeUnit
。例如
vald=Duration(100,MILLISECONDS)

通过传递一个字符串来表示时间区间,例如
vald=Duration("1.2µs")


Duration也提供了unapply方法,可以被用于模式匹配构造(patternmatchingconstructs)中,例如:


[code]importscala.concurrent.duration._
importjava.util.concurrent.TimeUnit._
//instantiation
vald1=Duration(100,MILLISECONDS)//fromLongandTimeUnit
vald2=Duration(100,"millis")//fromLongandString
vald3=100millis//implicitlyfromLong,IntorDouble
vald4=Duration("1.2µs")//fromString
//patternmatching
valDuration(length,unit)=5millis

参考资料

ScalaGuidesandOverviews
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐
章节导航