
Scala(50) - scalaz-stream: Resource Safety

2016-07-26 13:47




val src = Process.emitAll(Seq("a","b","c")).toSource //> src  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Emit(List(a, b, c))
val p1 = src.onComplete{Process.suspend{println("---RUN CLEANUP---");Process.halt}}
                                                  //> p1  : scalaz.stream.Process[[x]scalaz.concurrent.Task[x],String] = Append(Emit(List(a, b, c)),Vector(<function1>))
// normal termination
p1.runLog.run                                     //> ---RUN CLEANUP---
                                                  //| res0: Vector[String] = Vector(a, b, c)
// early termination forced by the consumer
p1.take(2).runLog.run                             //> ---RUN CLEANUP---
                                                  //| res1: Vector[String] = Vector(a, b)
// abnormal termination (exception)
p1.map{_.toDouble}.runLog.run                     //> ---RUN CLEANUP---
                                                  //| java.lang.NumberFormatException: For input string: "a"


/**
 * Run `p2` after this `Process` completes normally, or in the event of an error.
 * This behaves almost identically to `append`, except that `p1 append p2` will
 * not run `p2` if `p1` halts with an `Error` or is killed. Any errors raised by
 * `this` are reraised after `p2` completes.
 * Note that `p2` is made into a finalizer using `asFinalizer`, so we
 * can be assured it is run even when this `Process` is being killed
 * by a downstream consumer.
 */
final def onComplete[F2[x] >: F[x], O2 >: O](p2: => Process[F2, O2]): Process[F2, O2] =
  this.onHalt { cause => p2.asFinalizer.causedBy(cause) }
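
The difference from `append` described in the comment above can be checked with a small experiment. The following is a minimal sketch, assuming a scalaz-stream version whose `Task` still offers `attemptRun` (the same era as the `runLog.run` calls in this post); `OnCompleteVsAppend`, `log`, `failing` and `cleanup` are hypothetical names introduced only for illustration.

```scala
import scalaz.stream._
import scalaz.concurrent.Task
import scala.collection.mutable.ListBuffer

object OnCompleteVsAppend {
  // Records which cleanup steps actually ran (hypothetical helper).
  val log = ListBuffer.empty[String]

  // A source that emits one value and then halts with an error.
  val failing: Process[Task, Int] =
    Process.emit(1).toSource ++ Process.fail(new RuntimeException("boom"))

  // Hypothetical cleanup step: it only records that it was executed.
  def cleanup(tag: String): Process[Task, Nothing] =
    Process.eval_(Task.delay { log += tag; () })

  // attemptRun returns the failure as a left value instead of throwing.
  val viaAppend     = (failing ++ cleanup("append")).run.attemptRun
  val viaOnComplete = failing.onComplete(cleanup("onComplete")).run.attemptRun
}
```

If the library behaves as its documentation says, only the `onComplete` variant should record its cleanup, and both runs should still end with the original exception, because `onComplete` reraises the error after the finalizer has run.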

/**
 * When this `Process` halts, call `f` to produce the next state.
 * Note that this function may be used to swallow or handle errors.
 */
final def onHalt[F2[x] >: F[x], O2 >: O](f: Cause => Process[F2, O2]): Process[F2, O2] = {
  val next = (t: Cause) => Trampoline.delay(Try(f(t)))
  this match {
    case (append: Append[F2, O2] @unchecked) => Append(append.head, append.stack :+ next)
    case emt@Emit(_)        => Append(emt, Vector(next))
    case awt@Await(_, _, _) => Append(awt, Vector(next))
    case hlt@Halt(rsn)      => Append(hlt, Vector(next))
  }
}

/**
 * Mostly internal use function. Ensures this `Process` is run even
 * when being `kill`-ed. Used to ensure resource safety in various
 * combinators.
 */
final def asFinalizer: Process[F, O] = {
  def mkAwait[F[_], A, O](req: F[A], cln: A => Trampoline[Process[F,Nothing]])(rcv: EarlyCause \/ A => Trampoline[Process[F, O]]) = Await(req, rcv,cln)
  step match {
    case Step(e@Emit(_), cont) => e onHalt {
      case Kill => (halt +: cont).asFinalizer.causedBy(Kill)
      case cause => (Halt(cause) +: cont).asFinalizer
    }
    case Step(Await(req, rcv, cln), cont) => mkAwait(req, cln) {
      case -\/(Kill) => Trampoline.delay(Await(req, rcv, cln).asFinalizer.causedBy(Kill))
      case x => rcv(x).map(p => (p +: cont).asFinalizer)
    }
    case hlt@Halt(_) => hlt
  }
}


(p1 |> process1.filter(_ == true) |> process1.take(10)).runLog.run
//> ---RUN CLEANUP---
//| res3: Vector[String] = Vector()
(p1 |> process1.take(2)).runLog.run               //> ---RUN CLEANUP---
//| res4: Vector[String] = Vector(a, b)
(p1 |> process1.id.map{_.toUpperCase} |> process1.take(2)).runLog.run
//> ---RUN CLEANUP---
//| res5: Vector[String] = Vector(A, B)
(p1 |> process1.id.map{_.toDouble}).runLog.run    //> ---RUN CLEANUP---
//| java.lang.NumberFormatException: For input string: "a"


/**
 * Feed the output of this `Process` as input of `p1`. The implementation
 * will fuse the two processes, so this process will only generate
 * values as they are demanded by `p1`. If `p1` signals termination, `this`
 * is killed with the same reason, giving it an opportunity to clean up.
 */
final def pipe[O2](p1: Process1[O, O2]): Process[F, O2] =
  p1.suspendStep.flatMap({ s1 =>
    s1 match {
      case s@Step(awt1@Await1(rcv1), cont1) =>
        val nextP1 = s.toProcess
        this.step match {
          case Step(awt@Await(_, _, _), cont) => awt.extend(p => (p +: cont) pipe nextP1)
          case Step(Emit(os), cont)           => cont.continue pipe process1.feed(os)(nextP1)
          case hlt@Halt(End)                  => hlt pipe nextP1.disconnect(Kill).swallowKill
          case hlt@Halt(rsn: EarlyCause)      => hlt pipe nextP1.disconnect(rsn)
        }

      case Step(emt@Emit(os), cont)      =>
        // When the pipe is killed from the outside it is killed at the beginning or after emit.
        // This ensures that Kill from the outside is not swallowed.
        emt onHalt {
          case End => this.pipe(cont.continue)
          case early => this.pipe(Halt(early) +: cont).causedBy(early)
        }

      case Halt(rsn)           => this.kill onHalt { _ => Halt(rsn) }
    }
  })

/** Operator alias for `pipe`. */
final def |>[O2](p2: Process1[O, O2]): Process[F, O2] = pipe(p2)


val fileLines = io.linesR(s"/Users/TraverseUsage.scala")
//> fileLines  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@6279cee3,<function1>,<function1>)
val lns = fileLines.onComplete(Process.eval[Task,String]{Task.delay{println("--FILE CLOSED--");""}})
//> lns  : scalaz.stream.Process[[x]scalaz.concurrent.Task[x],String] = Append(Await(scalaz.concurrent.Task@6279cee3,<function1>,<function1>),Vector(<function1>))

lns.take(3).runLog.run                            //> --FILE CLOSED--
//| res6: Vector[String] = Vector(package scalaz.example, "", object TraverseUsage extends App {)
lns.map {_.toDouble}.runLog.run                   //> --FILE CLOSED--
//| java.lang.NumberFormatException: empty String caused by: java.lang.NumberFormatException: For input string: "package scalaz.example"


/**
 * Creates a `Process[Task,String]` from the lines of a file, using
 * the `iteratorR` combinator to ensure the file is closed
 * when processing the stream of lines is finished.
 */
def linesR(filename: String)(implicit codec: Codec): Process[Task,String] =
  linesR(Source.fromFile(filename)(codec))

/**
 * Creates a `Process[Task,String]` from the lines of the `InputStream`,
 * using the `iteratorR` combinator to ensure the `InputStream` is closed
 * when processing the stream of lines is finished.
 */
def linesR(in: => InputStream)(implicit codec: Codec): Process[Task,String] =
  linesR(Source.fromInputStream(in)(codec))

/**
 * Creates a `Process[Task,String]` from the lines of the `Source`,
 * using the `iteratorR` combinator to ensure the `Source` is closed
 * when processing the stream of lines is finished.
 */
def linesR(src: => Source): Process[Task,String] = {
  iteratorR(Task.delay(src))(src => Task.delay(src.close()))(r => Task.delay(r.getLines()))
}


/**
 * Create a Process from an iterator that is tied to some resource,
 * `R` (like a file handle) that we want to ensure is released.
 * See `linesR` for an example use.
 * @param req acquires the resource
 * @param release releases the resource
 * @param mkIterator creates the iterator from the resource
 * @tparam R is the resource
 * @tparam O is a value in the iterator
 * @return
 */
def iteratorR[R, O](req: Task[R])(
                    release: R => Task[Unit])(
                    mkIterator: R => Task[Iterator[O]]): Process[Task, O] = {
  bracket[Task, R, O](req)(r => Process.eval_(release(r)))(r => iterator(mkIterator(r)) )
}


val iterLines =
  io.iteratorR(Task.delay{Source.fromFile(s"/Users/TraverseUsage.scala")})(
    src => Task.delay{src.close()})(
    r => Task.delay{r.getLines()})                //> iterLines  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@1a0dcaa,<function1>,<function1>)
iterLines.take(5).runLog.run                      //> res7: Vector[String] = Vector(package scalaz.example, "", object TraverseUsage extends App {, "  import scalaz._", "")



/**
 * Resource and preemption safe `await` constructor.
 * Use this combinator when acquiring resources. This builds a process that, when run,
 * evaluates `req` and then runs `rcv`. Once `rcv` is completed, fails, or is interrupted, it will run `release`.
 * When the acquisition (`req`) is interrupted, neither `release` nor `rcv` is run; however, when `req` was interrupted after
 * the resource in `req` was acquired, then `release` is run.
 * If the acquisition fails, use `bracket(req)(onPreempt)(rcv).onFailure(err => ???)` code to recover from the
 * failure eventually.
 */
def bracket[F[_], A, O](req: F[A])(release: A => Process[F, Nothing])(rcv: A => Process[F, O]): Process[F, O] = {
  Await(req,
  { (r: EarlyCause \/ A) => Trampoline.delay(Try(r.fold(Halt.apply, a => rcv(a) onComplete release(a) ))) },
  { a: A => Trampoline.delay(release(a)) })
}
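
To see how `bracket` can be used directly, without going through `iteratorR`, here is a minimal sketch. It assumes that `bracket` is reachable as `Process.bracket` with the signature quoted above; the `Conn` class, the `events` buffer and the object name `BracketSketch` are hypothetical and exist only for this illustration.

```scala
import scalaz.stream._
import scalaz.concurrent.Task
import scala.collection.mutable.ListBuffer

object BracketSketch {
  // Records the resource lifecycle (hypothetical helper).
  val events = ListBuffer.empty[String]

  // Hypothetical resource standing in for a file handle or connection.
  final class Conn { def rows: List[String] = List("r1", "r2", "r3") }

  // Acquisition: evaluated when the process is run, not when it is defined.
  val acquire: Task[Conn] = Task.delay { events += "open"; new Conn }

  // Release: a process that emits nothing and only performs the effect.
  def release(c: Conn): Process[Task, Nothing] =
    Process.eval_(Task.delay { events += "close"; () })

  // Stream the rows of the resource; release is tied to the stream's end.
  val rows: Process[Task, String] =
    Process.bracket[Task, Conn, String](acquire)(release)(c => Process.emitAll(c.rows))

  // The consumer stops early, which kills the upstream process.
  val firstTwo: Vector[String] = rows.take(2).runLog.run
}
```

Because the consumer takes only two rows, the upstream process is killed before it reaches its natural end; the point of the sketch is that `events` should still contain a "close" entry after "open".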

