您的位置:首页 > 其它

scala 并行集合在spark中的应用

2015-11-12 00:40 369 查看

一.scala并行集合

现在有一个集合,对它的每个元素进行处理,比如:

val arr = List[String]("a","b","c")
arr.foreach(println(_))
//输出结果:
a
b
c

//并行集合
arr.par.foreach(println(_))
//输出结果:
a
c
b


观察线程

println((0 to 1000).map{r => Thread.currentThread.getName}.distinct)

Vector(main)

println((0 to 1000).par.map{r => Thread.currentThread.getName}.distinct)

ParVector(ForkJoinPool-1-worker-5, ForkJoinPool-1-worker-7, ForkJoinPool-1-worker-3, ForkJoinPool-1-worker-1)


二.在spark中的使用

现在你有n个相互之间没有关联的任务需要执行,比如说从100张表中取出数据并进行数据清洗。按照spark的机制,job初使的并发度为数据的partition数(在hdfs中为数据的分块数,在es和cass中读取的数据为数据的副本数)。

所有即使你给任务分配了100个core,在数据读取阶段也只会使用到其中几个。

val table_list = List[String]("table1","table2","table3","...")

val iter = arr.iterator

while(iter.hasNext){
val tableName = iter.next
//根据table从数据源读取数据
val data = ...
//数据清洗操作
data.map(...)
}


改成并行集合的模式,程序会以最大的并发度(你设置的core数量)从数据源拉取数据,大大提高了程序运行效率。

val table_list = List[String]("table1","table2","table3","...")

table_list.par.foreach(r=>{
val tableName = r
//根据table从数据源读取数据
val data = ...
//数据清洗操作
data.map(...)
})


当然也可以使用
scala.concurrent
包中的Future等可以达到相同的作用

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

val arr = List[String]("a","b","c")

val futures = arr.map(r=>Future{
val tableName = r
//根据table从数据源读取数据
val data = ...
//数据清洗操作
data.map(...)
})

val future = Future.sequence(futures)

//任务最长执行时间
Await.result(future, 1 hour)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: