Akka并发编程——2、Actor模型(二)
2017-05-05 11:17
549 查看
摘要: 本节主要内容:
Actor API解析 1. Actor API解析 Actor中的主要成员变量和方法定义如下: package akka.actor trait Actor extends scala.AnyRef { type Receive = akka.actor.Actor.Receive //context变量暴露当前Actor的上下文信息
本节主要内容:
Actor
API解析
Actor中的主要成员变量和方法定义如下:
上面代码可以看出:Hook方法中,有四个子方法。这里主要关注前两个。
/*
*Actor API: Hook方法
*/
object Example_05 extends App{
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
class FirstActor extends Actor with ActorLogging{
//通过context.actorOf方法创建Actor
var child:ActorRef = _
//Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作
override def preStart(): Unit ={
log.info("preStart() in FirstActor")
//通过context上下文创建Actor
child = context.actorOf(Props[MyActor], name = "myChild")
}
def receive = {
//向MyActor发送消息
case x => child ! x;log.info("received "+x)
}
//Hook方法,postStop(),Actor停止之后调用
override def postStop(): Unit = {
log.info("postStop() in FirstActor")
}
}
class MyActor extends Actor with ActorLogging{
//Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作
override def preStart(): Unit ={
log.info("preStart() in MyActor")
}
def receive = {
case "test" => log.info("received test")
case _ => log.info("received unknown message")
}
//Hook方法,postStop(),Actor停止之后调用
override def postStop(): Unit = {
log.info("postStop() in MyActor")
}
}
val system = ActorSystem("MyActorSystem")
val systemLog=system.log
//创建FirstActor对象
val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
systemLog.info("准备向myactor发送消息")
//向myactor发送消息
myactor!"test"
myactor! 123
Thread.sleep(5000)
//关闭ActorSystem,停止程序的运行
system.shutdown()
}
代码运行结果:
[INFO] [04/02/2016 17:04:49.607] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
[INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4]
其中 FirstActor为顶级Actor,MyActor为FirstActor的子进程:
class FirstActor extends Actor with ActorLogging{
//通过context.actorOf方法创建Actor
var child:ActorRef = _
//Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作
override def preStart(): Unit ={
log.info("preStart() in FirstActor")
//通过context上下文创建Actor
child = context.actorOf(Props[MyActor], name = "myChild")
}
def receive = {
//向MyActor发送消息
case x => child ! x;log.info("received "+x)
}
//Hook方法,postStop(),Actor停止之后调用,用于完成初始化工作
override def postStop(): Unit = {
log.info("postStop() in FirstActor")
}
}
对preStart()和postStop()方法进行了重写。在preStart方法中通过代码:
child = context.actorOf(Props[MyActor], name = "myChild")
对成员变量child进行初始化,然后在postStop方法中使用
//通过context上下文停止MyActor的运行
context.stop(child)
来停止MyActor的运行。
此外,对创建FirstActor的时候:
//创建FirstActor对象
val myactor = system.actorOf(Props[FirstActor], name = "firstActor")会首先调用其preStart方法,而其preStart方法除了打印日志: "preStart()
in FirstActor"外,还创建了其子Actor :MyActor。
在创建MyActor的过程中,也调用其preStart方法。
注意,最后关闭的时候:
//关闭ActorSystem,停止程序的运行
system.shutdown()
FirstActor作为MyActor的Supervisor,会先停止MyActor,再停止自身,因此先调用MyActor的postStop方法,再调用FirstActor的postStop方法。
运行结果:
[INFO] [04/02/2016 18:40:37.805] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
[INFO] [04/02/2016 18:40:37.805] [MyActorSystem-akka.actor.default-dispatcher-3]
代码:
中使用
self!"message from self reference"即向自己发送了一条消息,receive方法通过下面这条语句对这条消息进行处理。
case "message from self reference"=>log.info("message from self refrence")而在处理
case "test" => log.info("received test");sender()!"message from MyActor"的时候,sender()是FirstActor,即向FirstActor发送消息:“message
from MyActor”。
注意:有的朋友可能会问,为什么最后会有两个received unknown message?
其中一个是myActor!123,还有一个是由sender()传给FirstActor的,即把由myActor传过来的“message
from MyActor”经由FirstActor再次传递给MyActor。
unhandled方法用于处理没有被receive方法处理的消息,下面的代码给出的是当不重写unhandled方法时的代码
这里不再赘述,详情请见:Akka阿里云
Actor API解析 1. Actor API解析 Actor中的主要成员变量和方法定义如下: package akka.actor trait Actor extends scala.AnyRef { type Receive = akka.actor.Actor.Receive //context变量暴露当前Actor的上下文信息
本节主要内容:
Actor
API解析
Actor API解析
Actor中的主要成员变量和方法定义如下:package akka.actor trait Actor extends scala.AnyRef { type Receive = akka.actor.Actor.Receive //context变量暴露当前Actor的上下文信息及当前消息 implicit val context : akka.actor.ActorContext = { /* compiled code */ } //self作为当前ActorRef的引用 implicit final val self : akka.actor.ActorRef = { /* compiled code */ } //当前Actor接收到最后一条消息对应的消息发送者(Actor) final def sender() : akka.actor.ActorRef = { /* compiled code */ } //receive方法,抽象方法,定义Actor的行为逻辑 def receive : akka.actor.Actor.Receive //内部使用API protected[akka] def aroundReceive(receive : akka.actor.Actor.Receive, msg : scala.Any) : scala.Unit = { /* compiled code */ } protected[akka] def aroundPreStart() : scala.Unit = { /* compiled code */ } protected[akka] def aroundPostStop() : scala.Unit = { /* compiled code */ } protected[akka] def aroundPreRestart(reason : scala.Throwable, message : scala.Option[scala.Any]) : scala.Unit = { /* compiled code */ } protected[akka] def aroundPostRestart(reason : scala.Throwable) : scala.Unit = { /* compiled code */ } //监督策略,用于Actor容错处理 def supervisorStrategy : akka.actor.SupervisorStrategy = { /* compiled code */ } //Hook方法,用于Actor生命周期监控 @scala.throws[T](classOf[scala.Exception]) def preStart() : scala.Unit = { /* compiled code */ } @scala.throws[T](classOf[scala.Exception]) def postStop() : scala.Unit = { /* compiled code */ } @scala.throws[T](classOf[scala.Exception]) def preRestart(reason : scala.Throwable, message : scala.Option[scala.Any]) : scala.Unit = { /* compiled code */ } @scala.throws[T](classOf[scala.Exception]) def postRestart(reason : scala.Throwable) : scala.Unit = { /* compiled code */ } //发送给Actor的消息,Actor没有定义相应的处理逻辑时,会调用此方法 def unhandled(message : scala.Any) : scala.Unit = { /* compiled code */ } } object Actor extends scala.AnyRef { type Receive = scala.PartialFunction[scala.Any, scala.Unit] //空的行为逻辑 @scala.SerialVersionUID(1) object emptyBehavior extends scala.AnyRef with akka.actor.Actor.Receive { def isDefinedAt(x : scala.Any) : scala.Boolean = { /* compiled code */ } def apply(x : scala.Any) : scala.Nothing = { /* compiled code */ } } //Sender为null @scala.SerialVersionUID(1) final val noSender : akka.actor.ActorRef = { /* compiled code */ } }
(1) Hook方法中,preStart()、postStop()方法的使用
上面代码可以看出:Hook方法中,有四个子方法。这里主要关注前两个。/*
*Actor API: Hook方法
*/
object Example_05 extends App{
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
class FirstActor extends Actor with ActorLogging{
//通过context.actorOf方法创建Actor
var child:ActorRef = _
//Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作
override def preStart(): Unit ={
log.info("preStart() in FirstActor")
//通过context上下文创建Actor
child = context.actorOf(Props[MyActor], name = "myChild")
}
def receive = {
//向MyActor发送消息
case x => child ! x;log.info("received "+x)
}
//Hook方法,postStop(),Actor停止之后调用
override def postStop(): Unit = {
log.info("postStop() in FirstActor")
}
}
class MyActor extends Actor with ActorLogging{
//Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作
override def preStart(): Unit ={
log.info("preStart() in MyActor")
}
def receive = {
case "test" => log.info("received test")
case _ => log.info("received unknown message")
}
//Hook方法,postStop(),Actor停止之后调用
override def postStop(): Unit = {
log.info("postStop() in MyActor")
}
}
val system = ActorSystem("MyActorSystem")
val systemLog=system.log
//创建FirstActor对象
val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
systemLog.info("准备向myactor发送消息")
//向myactor发送消息
myactor!"test"
myactor! 123
Thread.sleep(5000)
//关闭ActorSystem,停止程序的运行
system.shutdown()
}
代码运行结果:
[INFO] [04/02/2016 17:04:49.607] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
[INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4]
[akka://MyActorSystem/user/firstActor] preStart() in FirstActor [INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4]
[akka://MyActorSystem/user/firstActor] received test [INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4]
[akka://MyActorSystem/user/firstActor] received 123 [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2]
[akka://MyActorSystem/user/firstActor/myChild] preStart() in MyActor [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2]
[akka://MyActorSystem/user/firstActor/myChild] received test [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2]
[akka://MyActorSystem/user/firstActor/myChild] received unknown message [INFO] [04/02/2016 17:04:54.616] [MyActorSystem-akka.actor.default-dispatcher-4]
[akka://MyActorSystem/user/firstActor/myChild] postStop() in MyActor [INFO] [04/02/2016 17:04:54.617] [MyActorSystem-akka.actor.default-dispatcher-3]
[akka://MyActorSystem/user/firstActor] postStop() in FirstActor
其中 FirstActor为顶级Actor,MyActor为FirstActor的子进程:
class FirstActor extends Actor with ActorLogging{
//通过context.actorOf方法创建Actor
var child:ActorRef = _
//Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作
override def preStart(): Unit ={
log.info("preStart() in FirstActor")
//通过context上下文创建Actor
child = context.actorOf(Props[MyActor], name = "myChild")
}
def receive = {
//向MyActor发送消息
case x => child ! x;log.info("received "+x)
}
//Hook方法,postStop(),Actor停止之后调用,用于完成初始化工作
override def postStop(): Unit = {
log.info("postStop() in FirstActor")
}
}
对preStart()和postStop()方法进行了重写。在preStart方法中通过代码:
child = context.actorOf(Props[MyActor], name = "myChild")
对成员变量child进行初始化,然后在postStop方法中使用
//通过context上下文停止MyActor的运行
context.stop(child)
来停止MyActor的运行。
此外,对创建FirstActor的时候:
//创建FirstActor对象
val myactor = system.actorOf(Props[FirstActor], name = "firstActor")会首先调用其preStart方法,而其preStart方法除了打印日志: "preStart()
in FirstActor"外,还创建了其子Actor :MyActor。
在创建MyActor的过程中,也调用其preStart方法。
注意,最后关闭的时候:
//关闭ActorSystem,停止程序的运行
system.shutdown()
FirstActor作为MyActor的Supervisor,会先停止MyActor,再停止自身,因此先调用MyActor的postStop方法,再调用FirstActor的postStop方法。
(2) 成员变量self及成员方法sender方法的使用
/* *Actor API:成员变量self及sender()方法的使用 */ object Example_05 extends App{ import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props class FirstActor extends Actor with ActorLogging{ //通过context.actorOf方法创建Actor var child:ActorRef = _ override def preStart(): Unit ={ log.info("preStart() in FirstActor") //通过context上下文创建Actor child = context.actorOf(Props[MyActor], name = "myActor") } def receive = { //向MyActor发送消息 case x => child ! x;log.info("received "+x) } } class MyActor extends Actor with ActorLogging{ self!"message from self reference" def receive = { case "test" => log.info("received test");sender()!"message from MyActor" case "message from self reference"=>log.info("message from self refrence") case _ => log.info("received unknown message"); } } val system = ActorSystem("MyActorSystem") val systemLog=system.log //创建FirstActor对象 val myactor = system.actorOf(Props[FirstActor], name = "firstActor") systemLog.info("准备向myactor发送消息") //向myactor发送消息 myactor!"test" myactor! 123 Thread.sleep(5000) //关闭ActorSystem,停止程序的运行 system.shutdown() }
运行结果:
[INFO] [04/02/2016 18:40:37.805] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
[INFO] [04/02/2016 18:40:37.805] [MyActorSystem-akka.actor.default-dispatcher-3]
[akka://MyActorSystem/user/firstActor] preStart() in FirstActor [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3]
[akka://MyActorSystem/user/firstActor] received test [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3]
[akka://MyActorSystem/user/firstActor] received 123 [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4]
[akka://MyActorSystem/user/firstActor/myActor] received test [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4]
[akka://MyActorSystem/user/firstActor/myActor] message from self refrence [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3]
[akka://MyActorSystem/user/firstActor] received message from MyActor [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4]
[akka://MyActorSystem/user/firstActor/myActor] received unknown message [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4]
[akka://MyActorSystem/user/firstActor/myActor] received unknown message
代码:
class MyActor extends Actor with ActorLogging{ self!"message from self reference" def receive = { case "test" => log.info("received test");sender()!"message from MyActor" case "message from self reference"=>log.info("message from self refrence") case _ => log.info("received unknown message"); } }
中使用
self!"message from self reference"即向自己发送了一条消息,receive方法通过下面这条语句对这条消息进行处理。
case "message from self reference"=>log.info("message from self refrence")而在处理
case "test" => log.info("received test");sender()!"message from MyActor"的时候,sender()是FirstActor,即向FirstActor发送消息:“message
from MyActor”。
注意:有的朋友可能会问,为什么最后会有两个received unknown message?
其中一个是myActor!123,还有一个是由sender()传给FirstActor的,即把由myActor传过来的“message
from MyActor”经由FirstActor再次传递给MyActor。
(3) unhandled方法的使用
unhandled方法用于处理没有被receive方法处理的消息,下面的代码给出的是当不重写unhandled方法时的代码这里不再赘述,详情请见:Akka阿里云
相关文章推荐
- Akka并发编程——第三节:Actor模型(二)
- Akka并发编程——4、Actor模型(四)
- Akka并发编程——5、Actor模型(五)
- 高并发分布式事务解决之道-Actor模型(附Akka与Reactor比较)
- Akka并发编程——第五节:Actor模型(四) 停止Actor
- Akka并发编程——第四节:Actor模型(三)
- Akka并发编程——3、Actor模型(三)
- Akka并发编程——第二节:Actor模型(一)
- 高并发分布式事务解决之道-Actor模型(附Akka与Reactor比较)
- Akka并发编程——第五节:Actor模型(四)
- Akka并发编程——第六节:Actor模型(五)
- 分享:Akka 2.1 发布,Actor 并发模型开发库
- Akka并发编程——第八节:Actor模型(七)
- Akka并发编程——第八节:Actor模型(七)
- Akka:Actor并发模型
- Akka并发编程——第二节:Actor模型(一)
- Akka并发编程——1、Actor模型(一)
- Actor并发模型入门
- 通过Actor模型解决C++ 并发编程的一种思维 — Theron 库简述
- Akka2使用探索7——“云计算”示例(Actor、Future、Remoting、Router、Deploy、异步、并发使用Demo)