scala用actor并发编程写一个单机版的WorldCount(类似Hadoop的MapReduce思想)
2017-12-31 19:58
567 查看
1、准备数据,2个文件
words.txt
内容:
lilei hello
zhangsan hello
lisi hello
苏三 hello
words.log
内容:
lilei hello
zhangsan hello
lisi hello
2、环境Intellj IDEA scala插件
3、代码
package p1
import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.io.Source
//模式匹配类,用于提交任务
case class SubmitTask(fileName:String)
//单例的模式匹配类,用于停止任务
case object StopTask
//用于收集分组后结果的
case class ResultTask(result:Map[String,Int])
/**
* scala Actor构建在java的线程基础之上的,
* 为了避免频繁的线程创建、销毁和切换等,scala中提供了react方法
* 方法执行完毕后,仍然被保留
*/
class Task extends Actor{
override def act(){
loop{//重复执行一个代码块
react{
case SubmitTask(fileName)=>{
val result=Source.fromFile(fileName,"gb2312").getLines()//获取文件,有中文-编码,每一行生成一个List集合
.flatMap(_.split(" ")).map((_,1)).toList//把上面所有集合压缩成一个集合,再切分,再生成map-(“单词”,1)
.groupBy(_._1).mapValues(_.size)//按照key分组,value就是分组后map的数量
sender ! ResultTask(result)//把单个文件的统计结果输出,!代表异步执行
}
case StopTask=>{
exit()
}
}
}
}
}
object WorkCount{
def main(args: Array[String]) {
//要读取的文件
val files=Array("E://words.txt","E://words.log")
val replaySet=new mutable.HashSet[Future[Any]]
val resultList=new mutable.ListBuffer[ResultTask]
//每个文件启动一个线程,异步提交,replaySet接收返回的值
for(f<-files){
val t=new Task
val replay=t.start() !! SubmitTask(f)
replaySet+=replay
}
while(replaySet.size>0){
//检查replaySet中是否有执行完Future,过滤出来
val toCompute=replaySet.filter(_.isSet)
for(r<-toCompute){
//r.apply()等价于r()取出r对象
val result=r.apply()
//取出的对象进行强转,放到resultList中
resultList+=result.asInstanceOf[ResultTask]
//操作完一个移除一个,避免重复
replaySet.remove(r)//replaySet -=r
}
Thread.sleep(100)//睡一会避免死循环,等待所有任务执行完
}
//最终resultList中的数据是每个文件处理好的esult
4000
Task(Map[String,Int])集合
//此步骤类似于hadoop里的reducer
val finalResult=resultList.map(_.result)//变成List里装的很多map格式
.flatten.groupBy(_._1)//压缩分组
.mapValues(x=>x.foldLeft(0)(_+_._2))//累加
//打印结果
println(finalResult)
}
}
5、结果
Map(lisi -> 2, 苏三 -> 1, lilei -> 2, hello -> 7, zhangsan -> 2)
words.txt
内容:
lilei hello
zhangsan hello
lisi hello
苏三 hello
words.log
内容:
lilei hello
zhangsan hello
lisi hello
2、环境Intellj IDEA scala插件
3、代码
package p1
import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.io.Source
//模式匹配类,用于提交任务
case class SubmitTask(fileName:String)
//单例的模式匹配类,用于停止任务
case object StopTask
//用于收集分组后结果的
case class ResultTask(result:Map[String,Int])
/**
* scala Actor构建在java的线程基础之上的,
* 为了避免频繁的线程创建、销毁和切换等,scala中提供了react方法
* 方法执行完毕后,仍然被保留
*/
class Task extends Actor{
override def act(){
loop{//重复执行一个代码块
react{
case SubmitTask(fileName)=>{
val result=Source.fromFile(fileName,"gb2312").getLines()//获取文件,有中文-编码,每一行生成一个List集合
.flatMap(_.split(" ")).map((_,1)).toList//把上面所有集合压缩成一个集合,再切分,再生成map-(“单词”,1)
.groupBy(_._1).mapValues(_.size)//按照key分组,value就是分组后map的数量
sender ! ResultTask(result)//把单个文件的统计结果输出,!代表异步执行
}
case StopTask=>{
exit()
}
}
}
}
}
object WorkCount{
def main(args: Array[String]) {
//要读取的文件
val files=Array("E://words.txt","E://words.log")
val replaySet=new mutable.HashSet[Future[Any]]
val resultList=new mutable.ListBuffer[ResultTask]
//每个文件启动一个线程,异步提交,replaySet接收返回的值
for(f<-files){
val t=new Task
val replay=t.start() !! SubmitTask(f)
replaySet+=replay
}
while(replaySet.size>0){
//检查replaySet中是否有执行完Future,过滤出来
val toCompute=replaySet.filter(_.isSet)
for(r<-toCompute){
//r.apply()等价于r()取出r对象
val result=r.apply()
//取出的对象进行强转,放到resultList中
resultList+=result.asInstanceOf[ResultTask]
//操作完一个移除一个,避免重复
replaySet.remove(r)//replaySet -=r
}
Thread.sleep(100)//睡一会避免死循环,等待所有任务执行完
}
//最终resultList中的数据是每个文件处理好的esult
4000
Task(Map[String,Int])集合
//此步骤类似于hadoop里的reducer
val finalResult=resultList.map(_.result)//变成List里装的很多map格式
.flatten.groupBy(_._1)//压缩分组
.mapValues(x=>x.foldLeft(0)(_+_._2))//累加
//打印结果
println(finalResult)
}
}
5、结果
Map(lisi -> 2, 苏三 -> 1, lilei -> 2, hello -> 7, zhangsan -> 2)
相关文章推荐
- 快学Scala-Actor并发编程实现WordCount
- Hadoop是Apache提出的一个软件框架(即:开放源码并行运算编程工具和分布式文件系统,与MapReduce和Google档案系统的概念类似)
- Hadoop MapReduce编程 API入门系列之wordcount版本5(九)
- Hadoop集群_WordCount运行详解--MapReduce编程模型
- Hadoop MapReduce编程 API入门系列之wordcount版本2(六)
- 快学Scala- Scala Actor 并发编程
- Scala Actor并发编程 实战(四)
- 初学HADOOP(MAPREDUCE-WORLD COUNT/HIVE/SQOOP)
- Scala Actor并发编程 实战(五)
- Hadoop MapReduce编程 API入门系列之wordcount版本3(七)
- scala多线程之actor并发编程模型
- hadoop学习4--MapReduce及官方WorldCount分析
- hadoop mapreduce hello world(wordcount)
- Hadoop-MapReduce编程思想浅析
- [Hadoop入门] - 1 Ubuntu系统 Hadoop介绍 MapReduce编程思想
- 一个wordcount程序轻松玩转MapReduce编程模型
- Hadoop MapReduce编程 API入门系列之wordcount版本4(八)
- Hadoop学习笔记 --- MapReduce实现WorldCount原理解析
- hadoop学习(7)—— 使用yarn运行mapreduce一个简单的wordcount示例
- scala akka 修炼之路1(使用actor实现一个job的并发计算和task失败重启)