Akka学习笔记03--计算多个句子中每个单词出现的次数
2015-03-27 17:08
337 查看
***********
(暂时粗略写下,有空再扩充。)
用到的主要方法(可参考:Akka学习笔记06--Actor的消息):
1.tell,!。
2.ask,?。
3.sender。
4.RoundRobinRouter,如代码段:
val reduceActor=context.actorOf(Props[ReduceActor].withRouter(
RoundRobinRouter(nrOfInstances = 5)), name = "reduce")
****
代码:
********************************************************************************************************************
package com.akka.mapreduce
import akka.actor.{ActorSystem, Actor, Props}
import akka.routing.RoundRobinRouter
import scala.collection.mutable.HashMap
import scala.concurrent.Await
import scala.collection.immutable.Map
import scala.collection.mutable.ArrayBuffer
import akka.actor.actorRef2Scala
import akka.pattern.ask
import akka.util.Timeout
/**
* Created by laiwenqiang on 15/2/2.
*/
/** Closed set of messages exchanged between the map-reduce actors. */
sealed trait MapReduceMessage

/**
 * A single word paired with an integer.
 *
 * @param word  the (lower-cased) word
 * @param index integer carried with the word; MapActor always emits 1 here
 *              (presumably an occurrence count -- confirm before relying on it)
 */
final case class WordCount(word: String, index: Int) extends MapReduceMessage

/**
 * Output of the map stage: one WordCount per accepted word of a sentence.
 *
 * NOTE(review): the payload is a mutable ArrayBuffer; receivers should treat
 * it as read-only since it crosses actor boundaries.
 */
final case class MapData(dataList: ArrayBuffer[WordCount]) extends MapReduceMessage

/** Output of the reduce stage: per-word totals for one MapData batch. */
final case class ReduceData(reduceDataList: Map[String, Int]) extends MapReduceMessage

/**
 * Request for the aggregated result.
 *
 * Declared as a case object because every use site sends and matches the bare
 * identifier `Result`; with `case class Result()` that identifier referred to
 * the synthetic companion object, which is not a MapReduceMessage.
 */
case object Result extends MapReduceMessage
/**
 * Demo entry point: feeds three sentences to the master actor, asks for the
 * aggregated word counts and prints them.
 */
object MapReduceApplication extends App {
  val _system = ActorSystem("MapReduceApp")
  val master = _system.actorOf(Props[MasterActor], name = "master")
  // Used both as the implicit ask timeout and as the Await deadline below.
  implicit val timeout: Timeout = Timeout(5000)

  master ! "The quick brown fox tried to jump over the lazy dog and fell on the dog"
  master ! "Dog is man's best friend"
  master ! "Dog and Fox belong to the same family"

  // Best-effort pause so the pipeline can process the sentences before the
  // result is requested; nothing here acknowledges completion, so the printed
  // counts are not guaranteed to include every sentence.
  Thread.sleep(500)

  val future = (master ? Result).mapTo[String]
  // Always stop the actor system, even when the ask times out or fails;
  // otherwise its threads would keep the JVM running.
  val result =
    try {
      Await.result(future, timeout.duration)
    } finally {
      _system.shutdown()
    }
  println(result)
}
/**
 * Coordinator of the pipeline. Owns the map and reduce routers and the single
 * aggregate actor, and passes each message on to the next stage.
 */
class MasterActor extends Actor {

  // Wraps the given Props in a round-robin router with five routees.
  private def roundRobinOfFive(props: Props): Props =
    props.withRouter(RoundRobinRouter(nrOfInstances = 5))

  val mapActor = context.actorOf(roundRobinOfFive(Props[MapActor]), name = "map")
  val reduceActor = context.actorOf(roundRobinOfFive(Props[ReduceActor]), name = "reduce")
  val aggregateActor = context.actorOf(Props[AggregateActor], name = "aggregate")

  def receive: Receive = {
    // Raw sentence -> map stage.
    case sentence: String => mapActor ! sentence
    // Map output -> reduce stage.
    case mapped: MapData => reduceActor ! mapped
    // Reduce output -> aggregation.
    case reduced: ReduceData => aggregateActor ! reduced
    // Forwarded (not re-sent) so the aggregator replies to the original asker.
    case Result => aggregateActor.forward(Result)
  }
}
/**
 * Map stage: splits a sentence into words, drops stop words and replies to the
 * sender with one WordCount per remaining word.
 */
class MapActor extends Actor {
  val STOP_WORDS_LIST = List("a", "am", "an", "and", "are", "as",
    "at", "be", "do", "go", "if", "in", "is", "it", "of", "on", "the",
    "to")
  // Count attached to every emitted word.
  val defaultCount: Int = 1

  def receive: Receive = {
    case message: String =>
      sender ! evaluateExpression(message)
  }

  /**
   * Tokenises `line` on whitespace and lower-cases each token.
   *
   * Empty tokens are skipped: splitting an empty line, or one that starts
   * with whitespace, yields a leading "" that must not be counted as a word.
   *
   * @param line sentence to tokenise
   * @return MapData holding a WordCount(word, defaultCount) for every
   *         non-empty token that is not in STOP_WORDS_LIST, in input order
   */
  def evaluateExpression(line: String): MapData = {
    val acceptedWords = line.split("""\s+""").iterator
      .map(_.toLowerCase)
      .filter(word => word.nonEmpty && !STOP_WORDS_LIST.contains(word))
    MapData(ArrayBuffer.empty[WordCount] ++= acceptedWords.map(WordCount(_, defaultCount)))
  }
}
/**
 * Reduce stage: collapses one MapData batch into per-word totals and replies
 * to the sender with the resulting ReduceData.
 */
class ReduceActor extends Actor {
  def receive: Receive = {
    case batch: MapData =>
      sender ! reduce(batch.dataList)
  }

  /**
   * Counts how many entries in `words` share the same word.
   *
   * Each entry contributes exactly 1 to its word's total; the entry's own
   * `index` value is not consulted.
   *
   * @param words entries produced by the map stage
   * @return ReduceData mapping each distinct word to its number of entries
   */
  def reduce(words: IndexedSeq[WordCount]): ReduceData = {
    val tally = words.foldLeft(Map.empty[String, Int]) { (counts, entry) =>
      counts.updated(entry.word, counts.getOrElse(entry.word, 0) + 1)
    }
    ReduceData(tally)
  }
}
/**
 * Final stage: accumulates every ReduceData it receives into one running
 * total and, on Result, replies with that total rendered as a string.
 */
class AggregateActor extends Actor {
  // Running totals across all batches received so far.
  val finalReducedMap = new HashMap[String, Int]

  def receive: Receive = {
    case partial: ReduceData =>
      aggregateInMemoryReduce(partial.reduceDataList)
    case Result =>
      sender ! finalReducedMap.toString()
  }

  /**
   * Adds every count in `reducedList` to the running totals, creating the
   * entry when the word has not been seen before.
   *
   * @param reducedList per-word counts from one reduce batch
   */
  def aggregateInMemoryReduce(reducedList: Map[String, Int]): Unit =
    reducedList.foreach { case (word, count) =>
      finalReducedMap.put(word, finalReducedMap.getOrElse(word, 0) + count)
    }
}
***************************************************************************************
输出结果:Map(dog -> 4, lazy -> 1, jump -> 1, fell -> 1, man's -> 1, family -> 1, belong -> 1, over -> 1, friend -> 1, fox -> 2, tried -> 1, quick -> 1, same -> 1, best -> 1, brown -> 1)
tips:参考于《Akka.Essentials》第二章。
(暂时粗略写下,有空再扩充。)
用到的主要方法(可参考:Akka学习笔记06--Actor的消息):
1.tell,!。
2.ask,?。
3.sender。
4.RoundRobinRouter,如代码段:
val reduceActor=context.actorOf(Props[ReduceActor].withRouter(
RoundRobinRouter(nrOfInstances = 5)), name = "reduce")
****
代码:
********************************************************************************************************************
package com.akka.mapreduce
import akka.actor.{ActorSystem, Actor, Props}
import akka.routing.RoundRobinRouter
import scala.collection.mutable.HashMap
import scala.concurrent.Await
import scala.collection.immutable.Map
import scala.collection.mutable.ArrayBuffer
import akka.actor.actorRef2Scala
import akka.pattern.ask
import akka.util.Timeout
/**
* Created by laiwenqiang on 15/2/2.
*/
/** Closed set of messages exchanged between the map-reduce actors. */
sealed trait MapReduceMessage

/**
 * A single word paired with an integer.
 *
 * @param word  the (lower-cased) word
 * @param index integer carried with the word; MapActor always emits 1 here
 *              (presumably an occurrence count -- confirm before relying on it)
 */
final case class WordCount(word: String, index: Int) extends MapReduceMessage

/**
 * Output of the map stage: one WordCount per accepted word of a sentence.
 *
 * NOTE(review): the payload is a mutable ArrayBuffer; receivers should treat
 * it as read-only since it crosses actor boundaries.
 */
final case class MapData(dataList: ArrayBuffer[WordCount]) extends MapReduceMessage

/** Output of the reduce stage: per-word totals for one MapData batch. */
final case class ReduceData(reduceDataList: Map[String, Int]) extends MapReduceMessage

/**
 * Request for the aggregated result.
 *
 * Declared as a case object because every use site sends and matches the bare
 * identifier `Result`; with `case class Result()` that identifier referred to
 * the synthetic companion object, which is not a MapReduceMessage.
 */
case object Result extends MapReduceMessage
/**
 * Demo entry point: feeds three sentences to the master actor, asks for the
 * aggregated word counts and prints them.
 */
object MapReduceApplication extends App {
  val _system = ActorSystem("MapReduceApp")
  val master = _system.actorOf(Props[MasterActor], name = "master")
  // Used both as the implicit ask timeout and as the Await deadline below.
  implicit val timeout: Timeout = Timeout(5000)

  master ! "The quick brown fox tried to jump over the lazy dog and fell on the dog"
  master ! "Dog is man's best friend"
  master ! "Dog and Fox belong to the same family"

  // Best-effort pause so the pipeline can process the sentences before the
  // result is requested; nothing here acknowledges completion, so the printed
  // counts are not guaranteed to include every sentence.
  Thread.sleep(500)

  val future = (master ? Result).mapTo[String]
  // Always stop the actor system, even when the ask times out or fails;
  // otherwise its threads would keep the JVM running.
  val result =
    try {
      Await.result(future, timeout.duration)
    } finally {
      _system.shutdown()
    }
  println(result)
}
/**
 * Coordinator of the pipeline. Owns the map and reduce routers and the single
 * aggregate actor, and passes each message on to the next stage.
 */
class MasterActor extends Actor {

  // Wraps the given Props in a round-robin router with five routees.
  private def roundRobinOfFive(props: Props): Props =
    props.withRouter(RoundRobinRouter(nrOfInstances = 5))

  val mapActor = context.actorOf(roundRobinOfFive(Props[MapActor]), name = "map")
  val reduceActor = context.actorOf(roundRobinOfFive(Props[ReduceActor]), name = "reduce")
  val aggregateActor = context.actorOf(Props[AggregateActor], name = "aggregate")

  def receive: Receive = {
    // Raw sentence -> map stage.
    case sentence: String => mapActor ! sentence
    // Map output -> reduce stage.
    case mapped: MapData => reduceActor ! mapped
    // Reduce output -> aggregation.
    case reduced: ReduceData => aggregateActor ! reduced
    // Forwarded (not re-sent) so the aggregator replies to the original asker.
    case Result => aggregateActor.forward(Result)
  }
}
/**
 * Map stage: splits a sentence into words, drops stop words and replies to the
 * sender with one WordCount per remaining word.
 */
class MapActor extends Actor {
  val STOP_WORDS_LIST = List("a", "am", "an", "and", "are", "as",
    "at", "be", "do", "go", "if", "in", "is", "it", "of", "on", "the",
    "to")
  // Count attached to every emitted word.
  val defaultCount: Int = 1

  def receive: Receive = {
    case message: String =>
      sender ! evaluateExpression(message)
  }

  /**
   * Tokenises `line` on whitespace and lower-cases each token.
   *
   * Empty tokens are skipped: splitting an empty line, or one that starts
   * with whitespace, yields a leading "" that must not be counted as a word.
   *
   * @param line sentence to tokenise
   * @return MapData holding a WordCount(word, defaultCount) for every
   *         non-empty token that is not in STOP_WORDS_LIST, in input order
   */
  def evaluateExpression(line: String): MapData = {
    val acceptedWords = line.split("""\s+""").iterator
      .map(_.toLowerCase)
      .filter(word => word.nonEmpty && !STOP_WORDS_LIST.contains(word))
    MapData(ArrayBuffer.empty[WordCount] ++= acceptedWords.map(WordCount(_, defaultCount)))
  }
}
/**
 * Reduce stage: collapses one MapData batch into per-word totals and replies
 * to the sender with the resulting ReduceData.
 */
class ReduceActor extends Actor {
  def receive: Receive = {
    case batch: MapData =>
      sender ! reduce(batch.dataList)
  }

  /**
   * Counts how many entries in `words` share the same word.
   *
   * Each entry contributes exactly 1 to its word's total; the entry's own
   * `index` value is not consulted.
   *
   * @param words entries produced by the map stage
   * @return ReduceData mapping each distinct word to its number of entries
   */
  def reduce(words: IndexedSeq[WordCount]): ReduceData = {
    val tally = words.foldLeft(Map.empty[String, Int]) { (counts, entry) =>
      counts.updated(entry.word, counts.getOrElse(entry.word, 0) + 1)
    }
    ReduceData(tally)
  }
}
/**
 * Final stage: accumulates every ReduceData it receives into one running
 * total and, on Result, replies with that total rendered as a string.
 */
class AggregateActor extends Actor {
  // Running totals across all batches received so far.
  val finalReducedMap = new HashMap[String, Int]

  def receive: Receive = {
    case partial: ReduceData =>
      aggregateInMemoryReduce(partial.reduceDataList)
    case Result =>
      sender ! finalReducedMap.toString()
  }

  /**
   * Adds every count in `reducedList` to the running totals, creating the
   * entry when the word has not been seen before.
   *
   * @param reducedList per-word counts from one reduce batch
   */
  def aggregateInMemoryReduce(reducedList: Map[String, Int]): Unit =
    reducedList.foreach { case (word, count) =>
      finalReducedMap.put(word, finalReducedMap.getOrElse(word, 0) + count)
    }
}
***************************************************************************************
输出结果:Map(dog -> 4, lazy -> 1, jump -> 1, fell -> 1, man's -> 1, family -> 1, belong -> 1, over -> 1, friend -> 1, fox -> 2, tried -> 1, quick -> 1, same -> 1, best -> 1, brown -> 1)
tips:参考于《Akka.Essentials》第二章。
相关文章推荐
- Akka学习笔记03--计算多个句子中每个单词出现的次数
- Python实现计算一段文本中每个单词出现的次数
- java小算法—统计句子中每个单词出现的次数
- Python实现计算一段文本中每个单词出现的次数
- Count words and letters-计算用户输入一行文本中的单词数和每个字母出现次数
- 计算出一字符串(字符串中每个单词之间有一个或多个空格)中每个单词的 出现的次数
- 请实现一个函数能够计算一段文本中每个单词出现的次数。
- Java统计英文句子中出现次数最多的单词并计算出现次数的方法
- java 在一段英文文本中计算每个单词出现的次数
- 计算每个字符出现的次数(两种方法)
- 学习笔记二--如何:查询包含一组指定单词的句子 (LINQ)
- 读取txt文档中单词,并计算单词出现的次数(英文文档)
- 黑马程序员——统计文件中每个英文单词出现的次数
- 计算一串数字中每个数字出现的次数
- 计算文件中所有单词出现的次数
- 读取一个.txt文件并计数每个单词出现的次数
- 统计输出某个文件中每个单词出现的次数
- 统计文本中每个单词的序列 和 出现次数
- 计算数组中任意一个单词出现的次数(HashMap)
- 投掷100次掷子计算每个数出现的次数 C#源码案例 附第二种简单解法