您的位置:首页 > 编程语言

scala用actor并发编程写一个单机版的WorldCount(类似Hadoop的MapReduce思想)

2017-12-31 19:58 567 查看
1、准备数据,2个文件

words.txt

内容:

lilei hello

zhangsan hello

lisi hello

苏三 hello

words.log

内容:

lilei hello

zhangsan hello

lisi hello

2、环境Intellj IDEA   scala插件

3、代码

package p1
import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.io.Source

//模式匹配类,用于提交任务
case class SubmitTask(fileName:String)
//单例的模式匹配类,用于停止任务
case object StopTask
//用于收集分组后结果的
case class ResultTask(result:Map[String,Int])
/**
* scala Actor构建在java的线程基础之上的,
* 为了避免频繁的线程创建、销毁和切换等,scala中提供了react方法
* 方法执行完毕后,仍然被保留
*/
class Task extends Actor{

override def act(){
loop{//重复执行一个代码块
react{
case SubmitTask(fileName)=>{
val result=Source.fromFile(fileName,"gb2312").getLines()//获取文件,有中文-编码,每一行生成一个List集合
.flatMap(_.split(" ")).map((_,1)).toList//把上面所有集合压缩成一个集合,再切分,再生成map-(“单词”,1)
.groupBy(_._1).mapValues(_.size)//按照key分组,value就是分组后map的数量
sender ! ResultTask(result)//把单个文件的统计结果输出,!代表异步执行
}
case StopTask=>{
exit()
}
}
}
}
}
object WorkCount{
def main(args: Array[String]) {
//要读取的文件
val files=Array("E://words.txt","E://words.log")
val replaySet=new mutable.HashSet[Future[Any]]
val resultList=new mutable.ListBuffer[ResultTask]

//每个文件启动一个线程,异步提交,replaySet接收返回的值
for(f<-files){
val t=new Task
val replay=t.start() !! SubmitTask(f)
replaySet+=replay
}
while(replaySet.size>0){
//检查replaySet中是否有执行完Future,过滤出来
val toCompute=replaySet.filter(_.isSet)
for(r<-toCompute){
//r.apply()等价于r()取出r对象
val result=r.apply()
//取出的对象进行强转,放到resultList中
resultList+=result.asInstanceOf[ResultTask]
//操作完一个移除一个,避免重复
replaySet.remove(r)//replaySet -=r
}
Thread.sleep(100)//睡一会避免死循环,等待所有任务执行完
}
//最终resultList中的数据是每个文件处理好的esult
4000
Task(Map[String,Int])集合
//此步骤类似于hadoop里的reducer
val finalResult=resultList.map(_.result)//变成List里装的很多map格式
.flatten.groupBy(_._1)//压缩分组
.mapValues(x=>x.foldLeft(0)(_+_._2))//累加
//打印结果
println(finalResult)
}
}

5、结果
Map(lisi -> 2, 苏三 -> 1, lilei -> 2, hello -> 7, zhangsan -> 2)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: