您的位置:首页 > 其它

有关RDD的基础学习1

2017-05-18 21:50 281 查看
1.spark rdd为什么不能嵌套?
譬如 val rdd1=sc.parallel(range(1,100))
val rdd2=sc.parallel(range(1,100))
rdd1.map(x=>rdd.count())
因为rdd的构造器中rdd(@trancient sc:SparkContext),这个sc是不可序列化的,而rdd的map之类的操作,需要把参数序列化,
这样就会出问题,sc就成了null,会报空值异常.
为什么sc要设置为不要序列化? 因为sc本身就不能序列化,没有继承serializble接口.

scala中如何使用正则进行抽取字符串中想要的内容?使用模式匹配,例如:
val regex="localhost[(.*)]".r
val master="localhost[4]"
master match{
case regex(threads) => converttoInt(threads)
case _=> println("not found")
}
2.spark rdd 中的partitioner有什么用?
确定数据是如何被划分到partitions里面的,譬如说rdd.repartition就使用了这个类.
rdd内容是1,2,3,4,5,设置了2个分区,每一个分区的数据会是怎么样的?
查看代码你会发现是会分成1,2一组,3,4,5一组
那rdd的partitioner有什么作用呢?
你调用rdd.repartition的时候,数据就根据这个进行重新分组了.
还有如果是转换成新的rdd出现了shuffle的时候就使用这个partitioner.
如果你想把数据重新分组,分成大于3的一组,其他的为一组,如何做?

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.HashPartitioner

object Test extends App {
//在windows平台上调试运行,需要设置hadoop的home
System.setProperty("hadoop.home.dir", "E:\\app\\hadoop");

val sparkConf = new SparkConf()
sparkConf.setMaster("local[2]").setAppName(Test.getClass.toString().dropRight(1))
val sc = new SparkContext(sparkConf)

val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 2)
.map(i => if(i>3) (1,i) else (0,i))
//.repartition(2)
.partitionBy(new HashPartitioner(2))
.map( {case (a,b)=>b })

println(rdd.toDebugString)
rdd.foreachPartition { p =>
p.foreach { l => println(l) }
println(p.hashCode())
}

sc.stop()
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: