Akka (1) - Word Count with Akka
2016-06-22 11:16
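1. Entry point

import akka.actor.{ActorRef, ActorSystem, Props}

object Application extends App {
  val _system = ActorSystem("HelloAkka") // create the Akka actor system (the container)
  val master: ActorRef = _system.actorOf(Props[MasterActor], name = "master") // create a top-level actor
  println("master.path ==>\t" + master.path) // akka://HelloAkka/user/master

  master ! "hi my name is spark, so happy"
  master ! "hi my zsh"
  master ! "xixi"
  Thread.sleep(1000) // crude wait so the lines can pass through map and reduce
  master ! new Result // ask the aggregate actor to print the totals
  Thread.sleep(500)
  _system.terminate()
}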
2. MasterActor creates the actors for the map, reduce, and aggregate tasks

import akka.actor.{Actor, ActorRef, Props}

class MasterActor extends Actor {
  val aggregateActor: ActorRef = context.actorOf(Props[AggregateActor], name = "aggregate")
  val reduceActor: ActorRef = context.actorOf(Props(new ReduceActor(aggregateActor)), name = "reduce")
  val mapActor: ActorRef = context.actorOf(Props(new MapActor(reduceActor)), name = "map")

  println("aggregateActor ==>\t" + aggregateActor.path) // akka://HelloAkka/user/master/aggregate (a child actor of master)
  println("mapActor ==>\t" + mapActor.path)
  println("reduceActor ==>\t" + reduceActor.path)

  override def receive: Receive = { // Receive is a type alias for PartialFunction[Any, Unit]
    case msg: String => mapActor ! msg       // raw lines go to the map step
    case msg: Result => aggregateActor ! msg // a Result request goes straight to the aggregator
    case _ =>
  }
}
3. Map task

import akka.actor.{Actor, ActorRef}
import java.util.StringTokenizer
import scala.collection.mutable.ArrayBuffer

class MapActor(val reduceActor: ActorRef) extends Actor {
  val STOP_WORDS = List("is", "a")

  override def receive: Receive = {
    case msg: String => reduceActor ! evlExpression(msg)
    case _ =>
  }

  // Split a line on whitespace and emit a Word(token, 1) for every token that is not a stop word.
  def evlExpression(line: String): MapData = {
    val dataList = new ArrayBuffer[Word] // Scala mutable array
    val parser: StringTokenizer = new StringTokenizer(line)
    while (parser.hasMoreTokens) {
      val str: String = parser.nextToken()
      if (!STOP_WORDS.contains(str)) {
        dataList += new Word(str, 1)
      }
    }
    new MapData(dataList)
  }
}
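4. Reduce task

import akka.actor.{Actor, ActorRef}
import scala.collection.mutable.{ArrayBuffer, HashMap}

class ReduceActor(val aggregateActor: ActorRef) extends Actor {
  override def receive: Receive = {
    case msg: MapData => aggregateActor ! reduce(msg.dataList)
    case _ =>
  }

  // Sum the counts per word for one line.
  def reduce(dataList: ArrayBuffer[Word]): ReduceData = {
    val map = new HashMap[String, Int]
    for (w <- dataList) {
      // Start from 0 and add the word's count; getOrElse(str, 1) alone would leave every count at 1.
      map += (w.word -> (map.getOrElse(w.word, 0) + w.count))
    }
    new ReduceData(map)
  }
}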
5. Aggregate task

import akka.actor.Actor
import scala.collection.mutable.HashMap

class AggregateActor extends Actor {
  val finalMap = new HashMap[String, Int]

  override def receive: Receive = {
    case msg: ReduceData => sum(msg.raduceMap)
    case msg: Result => println(finalMap)
  }

  // Several reduce actors may send their reduced maps here; merge each one into the running total.
  def sum(map: HashMap[String, Int]): Unit = {
    for ((word, count) <- map) {
      finalMap += (word -> (finalMap.getOrElse(word, 0) + count))
    }
  }
}
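6. Message and entity classes used

import scala.collection.mutable.{ArrayBuffer, HashMap}

class Word(val word: String, val count: Int)       // one token with its count
case class Result()                                // request to print the final totals
class MapData(val dataList: ArrayBuffer[Word])     // output of the map step
class ReduceData(val raduceMap: HashMap[String, Int]) // output of the reduce step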
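
To check what the actor pipeline should print, the same map, reduce, and aggregate steps can be sketched with plain Scala collections and no Akka at all. This is only an illustrative sketch: the names `WordCountSketch`, `mapLine`, `aggregate`, and `wordCount` are made up for this example and are not part of the code above, and it uses immutable maps and `split` rather than the `StringTokenizer` and mutable `HashMap` used by the actors.

```scala
object WordCountSketch {
  // Same stop words as MapActor.
  val StopWords: Set[String] = Set("is", "a")

  // Map step: split a line on whitespace and emit (word, 1), skipping stop words.
  def mapLine(line: String): Seq[(String, Int)] =
    line.split("\\s+").filter(_.nonEmpty).filterNot(StopWords.contains).map(_ -> 1).toSeq

  // Reduce step: sum the counts per word within one line.
  def reduce(pairs: Seq[(String, Int)]): Map[String, Int] =
    pairs.groupBy(_._1).map { case (word, ps) => word -> ps.map(_._2).sum }

  // Aggregate step: merge one per-line map into the running total.
  def aggregate(total: Map[String, Int], part: Map[String, Int]): Map[String, Int] =
    part.foldLeft(total) { case (acc, (word, n)) =>
      acc.updated(word, acc.getOrElse(word, 0) + n)
    }

  // Whole pipeline over a batch of lines.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines.map(mapLine).map(reduce).foldLeft(Map.empty[String, Int])(aggregate)
}
```

Calling `WordCountSketch.wordCount(Seq("hi my name is spark, so happy", "hi my zsh", "xixi"))` gives `hi` and `my` a count of 2 and every other word a count of 1. Because the lines are split on whitespace only, `spark,` keeps its trailing comma, which is also how `StringTokenizer` behaves in MapActor.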