scala 并行集合在spark中的应用
2015-11-12 00:40
369 查看
一.scala并行集合
现在有一个集合,对它的每个元素进行处理,比如:val arr = List[String]("a","b","c") arr.foreach(println(_)) //输出结果: a b c //并行集合 arr.par.foreach(println(_)) //输出结果: a c b
观察线程
println((0 to 1000).map{r => Thread.currentThread.getName}.distinct) Vector(main) println((0 to 1000).par.map{r => Thread.currentThread.getName}.distinct) ParVector(ForkJoinPool-1-worker-5, ForkJoinPool-1-worker-7, ForkJoinPool-1-worker-3, ForkJoinPool-1-worker-1)
二.在spark中的使用
现在你有n个相互之间没有关联的任务需要执行,比如说从100张表中取出数据并进行数据清洗。按照spark的机制,job初使的并发度为数据的partition数(在hdfs中为数据的分块数,在es和cass中读取的数据为数据的副本数)。所有即使你给任务分配了100个core,在数据读取阶段也只会使用到其中几个。
val table_list = List[String]("table1","table2","table3","...") val iter = arr.iterator while(iter.hasNext){ val tableName = iter.next //根据table从数据源读取数据 val data = ... //数据清洗操作 data.map(...) }
改成并行集合的模式,程序会以最大的并发度(你设置的core数量)从数据源拉取数据,大大提高了程序运行效率。
val table_list = List[String]("table1","table2","table3","...") table_list.par.foreach(r=>{ val tableName = r //根据table从数据源读取数据 val data = ... //数据清洗操作 data.map(...) })
当然也可以使用
scala.concurrent包中的Future等可以达到相同的作用
import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.{Await, Future} import scala.concurrent.duration._ val arr = List[String]("a","b","c") val futures = arr.map(r=>Future{ val tableName = r //根据table从数据源读取数据 val data = ... //数据清洗操作 data.map(...) }) val future = Future.sequence(futures) //任务最长执行时间 Await.result(future, 1 hour)
相关文章推荐
- 提升APNS消息推送质量的一些想法和验证程序
- 基础Shader
- 最小子串覆盖
- mysql 获取季度的第一天 本月的第一天,本周的第一天sql语句(转)
- Android 实战 - 个人App乐逗项目 之 查看图片,查看GIF封装
- MySQL 导入导出Execl
- 使用 Build.VERSION.SDK_INT兼容不同版本的API
- 微信分享c++接口&OC实现
- windows配置JDK环境变量
- [python网络编程]使用scapy修改源IP发送请求
- 回射客户端服务器UDP(echo client&&server based on UDP)
- js操作数组1
- MySQL查询本周、上周、本月、上个月份数据的sql代码(转)
- Axure RP 各个版本中文版 汉化包 破解版 下载地址及注册码
- hadoop完全分布式集群+Win Eclipse+Hbase+Hive+Zookeeper+Sqoop+SPARK试验机平台
- Ubuntu14安装Nvidia显卡驱动
- linux命令-rpm安装和卸载
- Quick-Cocos2d-x初学者游戏教程(三)
- Quick-Cocos2d-x初学者游戏教程(二)
- 我的前端故事----疯狂倒计时(requestAnimationFrame)