您的位置:首页 > 其它

Akka学习笔记03--计算多个句子中每个单词出现的次数

2015-03-27 17:08 337 查看
***********

(暂时粗略写下,有空再扩充。)

用到的主要方法(可参考:Akka学习笔记06--Actor的消息):

1.tell,!。

2.ask,?。

3.sender。

4.RoundRobinRouter,如代码段:

val reduceActor=context.actorOf(Props[ReduceActor].withRouter(
RoundRobinRouter(nrOfInstances = 5)), name = "reduce")



****

代码:

********************************************************************************************************************

package com.akka.mapreduce

import akka.actor.{ActorSystem, Actor, Props}

import akka.routing.RoundRobinRouter

import scala.collection.mutable.HashMap

import scala.concurrent.Await

import scala.collection.immutable.Map

import scala.collection.mutable.ArrayBuffer

import akka.actor.actorRef2Scala

import akka.pattern.ask

import akka.util.Timeout

/**

* Created by laiwenqiang on 15/2/2.

*/

/** Marker trait for every message exchanged between the word-count actors. */
sealed trait MapReduceMessage

/**
 * One occurrence record for a word.
 *
 * @param word  the word
 * @param index the count attached to this record (the field is named `index`,
 *              but the mapper in this file always fills it with 1)
 */
final case class WordCount(word: String, index: Int) extends MapReduceMessage

/**
 * Output of the map step.
 *
 * @param dataList one [[WordCount]] per retained word of a sentence
 */
final case class MapData(dataList: ArrayBuffer[WordCount]) extends MapReduceMessage

/**
 * Output of the reduce step.
 *
 * @param reduceDataList word -> number of occurrences within one [[MapData]]
 */
final case class ReduceData(reduceDataList: Map[String, Int]) extends MapReduceMessage

/**
 * Request for the aggregated result.
 *
 * NOTE(review): the application in this file sends the companion object
 * `Result` (written without parentheses) rather than an instance `Result()`;
 * the two are different values, so senders and receivers must agree on which
 * one is used.
 */
final case class Result() extends MapReduceMessage

/**
 * Entry point: starts the actor system, sends three sentences to the master
 * actor, asks it for the aggregated word counts and prints the reply.
 */
object MapReduceApplication extends App {

  val _system = ActorSystem("MapReduceApp")

  val master = _system.actorOf(Props[MasterActor], name = "master")

  // Used implicitly by `?` below and explicitly as the Await limit.
  // NOTE(review): a bare Long is presumably interpreted as milliseconds by
  // this Akka version -- confirm against the Timeout API in use.
  implicit val timeout = Timeout(5000)

  master ! "The quick brown fox tried to jump over the lazy dog and fell on the dog"

  master ! "Dog is man's best friend"

  master ! "Dog and Fox belong to the same family"

  // NOTE(review): this sleep is the only thing ordering the query after the
  // map/reduce/aggregate messages; nothing here waits for them to complete.
  Thread.sleep(500)

  val future = (master ? Result).mapTo[String]

  // Always shut the actor system down, even when the ask times out or the
  // future fails; previously an exception here skipped the shutdown call.
  val result =
    try Await.result(future, timeout.duration)
    finally _system.shutdown()

  println(result)

}

/**
 * Coordinator of the word-count pipeline.
 *
 * Creates a round-robin router of five [[MapActor]]s, a round-robin router of
 * five [[ReduceActor]]s and a single [[AggregateActor]], and dispatches each
 * incoming message to the matching stage.
 */
class MasterActor extends Actor {

  val mapActor = context.actorOf(
    Props[MapActor].withRouter(RoundRobinRouter(nrOfInstances = 5)),
    name = "map")

  val reduceActor = context.actorOf(
    Props[ReduceActor].withRouter(RoundRobinRouter(nrOfInstances = 5)),
    name = "reduce")

  val aggregateActor = context.actorOf(Props[AggregateActor], name = "aggregate")

  def receive: Receive = {
    // A raw sentence goes to the map stage.
    case line: String =>
      mapActor ! line

    // Mapped words go to the reduce stage.
    case mapData: MapData =>
      reduceActor ! mapData

    // Per-sentence counts go to the aggregator.
    case reduceData: ReduceData =>
      aggregateActor ! reduceData

    // Accept both the companion object `Result` and an instance `Result()`,
    // so a caller using the declared case class is not silently unhandled.
    // The companion object is what gets forwarded, and `forward` keeps the
    // original asker as the sender so the aggregator replies to it directly.
    case Result | Result() =>
      aggregateActor forward Result
  }

}

/**
 * Map stage: turns one sentence into a [[MapData]] holding one [[WordCount]]
 * per retained word, and replies with it to the sender.
 */
class MapActor extends Actor {

  /** Lower-case words that are never counted. */
  val STOP_WORDS_LIST = List("a", "am", "an", "and", "are", "as",
    "at", "be", "do", "go", "if", "in", "is", "it", "of", "on", "the",
    "to")

  /** Count attached to every emitted word. */
  val defaultCount: Int = 1

  def receive: Receive = {
    case message: String =>
      sender ! evaluateExpression(message)
  }

  /**
   * Splits `line` on runs of whitespace, lower-cases each token, drops stop
   * words and wraps every remaining token in a [[WordCount]].
   *
   * Empty tokens are skipped: splitting a blank line or a line that starts
   * with whitespace produces a leading "" which must not be counted as a word.
   *
   * @param line the sentence to tokenize
   * @return the retained words, in their original order
   */
  def evaluateExpression(line: String): MapData = {
    val retained = line.split("""\s+""").iterator
      .filter(token => token.nonEmpty)
      .map(token => token.toLowerCase)
      .filter(word => !STOP_WORDS_LIST.contains(word))
      .map(word => WordCount(word, defaultCount))

    MapData(ArrayBuffer.empty[WordCount] ++= retained)
  }

}

/**
 * Reduce stage: collapses the words of one [[MapData]] into a word -> count
 * map and replies with it to the sender.
 */
class ReduceActor extends Actor {

  def receive: Receive = {
    case MapData(dataList) =>
      sender ! reduce(dataList)
  }

  /**
   * Counts how many times each word appears in `words`.
   *
   * Every entry contributes exactly one to its word's total; the count stored
   * inside the [[WordCount]] itself is not consulted.
   *
   * @param words the mapped words of one sentence
   * @return the per-word totals wrapped in a [[ReduceData]]
   */
  def reduce(words: IndexedSeq[WordCount]): ReduceData = {
    val tally = words.foldLeft(Map.empty[String, Int]) { (acc, entry) =>
      acc.updated(entry.word, acc.getOrElse(entry.word, 0) + 1)
    }
    ReduceData(tally)
  }

}

/**
 * Aggregation stage: accumulates every [[ReduceData]] it receives into one
 * running word -> count map and reports that map on request.
 */
class AggregateActor extends Actor {

  /** Running totals across all reduce results received so far. */
  val finalReducedMap = new HashMap[String, Int]

  def receive: Receive = {
    case ReduceData(reduceDataMap) =>
      aggregateInMemoryReduce(reduceDataMap)

    // Accept both the companion object `Result` and an instance `Result()`,
    // so a caller using the declared case class still gets an answer.
    case Result | Result() =>
      sender ! finalReducedMap.toString()
  }

  /**
   * Adds the counts in `reducedList` to the running totals.
   *
   * @param reducedList word -> count for one reduced sentence
   */
  def aggregateInMemoryReduce(reducedList: Map[String, Int]): Unit = {
    for ((word, count) <- reducedList) {
      // A word not seen before starts from zero.
      finalReducedMap(word) = finalReducedMap.getOrElse(word, 0) + count
    }
  }

}

***************************************************************************************
输出结果:Map(dog -> 4, lazy -> 1, jump -> 1, fell -> 1, man's -> 1, family -> 1, belong -> 1, over -> 1, friend -> 1, fox -> 2, tried -> 1, quick -> 1, same -> 1, best -> 1, brown -> 1)

tips:参考自《Akka Essentials》第二章。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: