Spark test code
2017-08-17 11:59
Test code:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

/**
  * Created by Administrator on 2017/1/7.
  */
object TestMain {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Hangzhou_Test")
    //.setMaster("local[1]").setMaster("spark://172.21.7.10:7077").setJars(List("xxx.jar")).set("spark.executor.memory", "10g")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)

    // use rc_hive_db;
    hiveContext.sql("use rc_hive_db")

    import hiveContext.implicits._

    // Hive settings for combining small files and handling group-by skew
    hiveContext.setConf("mapred.max.split.size", "256000000")
    hiveContext.setConf("mapred.min.split.size.per.node", "100000000")
    hiveContext.setConf("mapred.min.split.size.per.rack", "100000000")
    hiveContext.setConf("hive.input.format", "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat")
    hiveContext.setConf("hive.merge.mapfiles", "true")
    hiveContext.setConf("hive.merge.mapredfiles", "true")
    hiveContext.setConf("hive.merge.size.per.task", "256000000")
    hiveContext.setConf("hive.merge.smallfiles.avgsize", "256000000")
    hiveContext.setConf("hive.groupby.skewindata", "true")

    hiveContext.sql("create table if not exists tb_id_vs_name(id int,name string)")
    hiveContext.sql("create table if not exists tb_id_vs_name2(id int,name string)")

    println("-------------------------word count:------------------------------------")
    // http://blog.csdn.net/t1dmzks/article/details/70189509
    val words = "When building the vocabulary ignore terms that have a document frequency strictly lower than the given threshold. This value is also called cut-off in the literature. If float, the parameter represents a proportion of documents, integer absolute counts. This parameter is ignored if vocabulary is not None."
    val textFile = sc.parallelize(words.split(" "), 2)
    textFile.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .foreach(println)

    println("-------------------------map(func):------------------------------------")
    // 1. map(func)
    val rdd = sc.parallelize(1 to 10) // create an RDD
    val map = rdd.map(_ * 2)          // multiply every element of the RDD by 2
    map.foreach(x => print(x + " "))

    println("-------------------------flatMap(func):------------------------------------")
    // 2. flatMap(func)
    val fm = rdd.flatMap(x => (1 to x)).collect()
    fm.foreach(x => print(x + " "))

    println("-------------------------mapPartitions(func) 1:------------------------------------")
    // 3. mapPartitions(func)
    val mp = sc.parallelize(List(("kpop", "female"), ("zorro", "male"), ("mobin", "male"), ("lucy", "female")), 2)
      .mapPartitions(x => {
        var woman = List[String]()
        while (x.hasNext) {
          val next = x.next()
          next match {
            case (_, "female") => woman = next._1 :: woman
            case _ =>
          }
        }
        woman.iterator
      })
    /*val mp = rdd.mapPartitionsWithIndex(partitionsFun)*/
    mp.collect.foreach(x => (print(x + " "))) // collect the partition elements into an Array before printing

    println("-------------------------mapPartitions(func) 2:------------------------------------")
    sc.parallelize(List(("kpop", "female"), ("zorro", "male"), ("mobin", "male"), ("lucy", "female")), 2)
      .mapPartitions(x => x.filter(_._2 == "female"))
      .map(x => x._1)
      .foreach(x => (print(x + " ")))

    println("-------------------------mapPartitionsWithIndex(func) :------------------------------------")
    // 4. mapPartitionsWithIndex(func)
    sc.parallelize(List(("kpop", "female"), ("zorro", "male"), ("mobin", "male"), ("lucy", "female")), 2)
      .mapPartitionsWithIndex((index: Int, iter: Iterator[(String, String)]) => {
        var woman = List[String]()
        while (iter.hasNext) {
          val next = iter.next()
          next match {
            case (_, "female") => woman = "[" + index + "]" + next._1 :: woman
            case _ =>
          }
        }
        woman.iterator
      })
      .collect.foreach(x => (print(x + " "))) // collect the partition elements into an Array before printing

    println("-------------------------sample(withReplacement,fraction,seed) :------------------------------------")
    // 5. sample(withReplacement, fraction, seed)
    val sample1 = rdd.sample(true, 0.5, 3)
    sample1.collect.foreach(x => print(x + " "))

    println("-------------------------union(otherDataset) : merge the two RDDs and return their union; duplicate elements are not removed------------------------------------")
    // 6. union(otherDataset)
    val rdd1 = sc.parallelize(1 to 3)
    val rdd2 = sc.parallelize(3 to 5)
    rdd1.union(rdd2).collect.foreach(x => print(x + " "))

    println("-------------------------intersection(otherDataset) : return the intersection of the two RDDs------------------------------------")
    // 7. intersection(otherDataset)
    rdd1.intersection(rdd2).collect.foreach(x => print(x + " "))

    println("-------------------------distinct([numTasks]) : remove duplicate elements from the RDD------------------------------------")
    // 8. distinct([numTasks])
    sc.parallelize(List(1, 1, 2, 5, 2, 9, 6, 1)).distinct().collect.foreach(x => print(x + " "))

    println("-------------------------cartesian(otherDataset): take the Cartesian product of all elements of the two RDDs------------------------------------")
    // 9. cartesian(otherDataset)
    sc.parallelize(1 to 3).cartesian(sc.parallelize(2 to 5)).foreach(x => println(x + " "))

    println("-------------------------coalesce(numPartitions,shuffle): repartition the RDD; shuffle defaults to false, and with shuffle=false the number of partitions cannot be increased------------------------------------")
    // 10. coalesce(numPartitions, shuffle)
    val coalesceRDD = sc.parallelize(1 to 16, 4).coalesce(3) // with shuffle=false the partition count cannot grow (e.g. from 4 to 7)
    println("number of partitions after repartitioning: " + coalesceRDD.partitions.size)
    val coalesceRDD2 = sc.parallelize(1 to 16, 4).coalesce(7, true)
    println("number of partitions after repartitioning: " + coalesceRDD2.partitions.size)
    println("RDD lineage: " + coalesceRDD2.toDebugString)

    println("-------------------------repartition(numPartition): implemented as coalesce(numPartition, true), so it behaves exactly like the shuffled coalesce above------------------------------------")
    // 11. repartition(numPartition)
    // 12. glom(): turn the elements of type T in each partition of the RDD into an Array[T]
    // 13. randomSplit(weight: Array[Double], seed): split one RDD into several RDDs by the given weights;
    //     the higher a weight, the more elements that split is likely to receive
    //     (glom() and randomSplit() are not exercised here; see the standalone sketch after this listing)

    println("-------------------------repartition(numPartition)-----------------------------")
    sc.parallelize(List(
      (1, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
      (21, "211"), (22, "222"), (23, "233"), (24, "244"), (25, "255"), (26, "266"), (27, "277"), (28, "288"), (29, "99"), (210, "21010"),
      (31, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
      (41, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
      (51, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
      (61, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
      (71, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
      (81, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010")
    )).map(s => (s._1, s._2)).toDF().registerTempTable("temp_tb_id_vs_name")
    hiveContext.sql("insert into tb_id_vs_name select * from temp_tb_id_vs_name")

    sc.parallelize(List((1, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010")))
      .map(s => (s._1, s._2)).toDF().registerTempTable("temp_tb_id_vs_name2")
    hiveContext.sql("insert into tb_id_vs_name2 select * from temp_tb_id_vs_name2")

    val result = hiveContext.sql("select t10.id as t10_id,t10.name as t10_name from tb_id_vs_name t10 inner join tb_id_vs_name2 t11 on t10.id=t11.id")
    result.map(s => (s.getAs[Int]("t10_id"), s.getAs[String]("t10_name"))).foreach(s => {
      println(s._1 + ":" + s._2)
    })

    sc.stop()
  }
}
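The numbered comments above mention glom() and randomSplit() without actually exercising them. The following is a minimal, self-contained sketch of both, for illustration only; it is not part of the original test, the object name GlomAndRandomSplitDemo is made up, and it assumes a local master just so it can run standalone.

import org.apache.spark.{SparkConf, SparkContext}

object GlomAndRandomSplitDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("GlomAndRandomSplitDemo").setMaster("local[2]"))

    // glom(): turn each partition into an Array[T], making the partition boundaries visible
    val arrays = sc.parallelize(1 to 8, 4).glom().collect()
    arrays.foreach(a => println(a.mkString("[", ",", "]"))) // e.g. [1,2] [3,4] [5,6] [7,8]

    // randomSplit(weights, seed): split one RDD into several by the given weights
    val Array(train, test) = sc.parallelize(1 to 100).randomSplit(Array(0.7, 0.3), seed = 11L)
    println(s"train count = ${train.count()}, test count = ${test.count()}")

    sc.stop()
  }
}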
Test results:
-------------------------word count:------------------------------------
-------------------------map(func):------------------------------------
-------------------------flatMap(func):------------------------------------
1 1 2 1 2 3 1 2 3 4 1 2 3 4 5 1 2 3 4 5 6 1 2 3 4 5 6 7 1 2 3 4 5 6 7 8 1 2 3 4 5 6 7 8 9 1 2 3 4 5 6 7 8 9 10
-------------------------mapPartitions(func) 1:------------------------------------
kpop lucy
-------------------------mapPartitions(func) 2:------------------------------------
-------------------------mapPartitionsWithIndex(func) :------------------------------------
[0]kpop [1]lucy
-------------------------sample(withReplacement,fraction,seed) :------------------------------------
4 4 6 8
-------------------------union(otherDataset) : merge the two RDDs and return their union; duplicate elements are not removed------------------------------------
1 2 3 3 4 5
-------------------------intersection(otherDataset) : return the intersection of the two RDDs------------------------------------
3
-------------------------distinct([numTasks]) : remove duplicate elements from the RDD------------------------------------
1 2 5 6 9
-------------------------cartesian(otherDataset): take the Cartesian product of all elements of the two RDDs------------------------------------
-------------------------coalesce(numPartitions,shuffle): repartition the RDD; shuffle defaults to false, and with shuffle=false the number of partitions cannot be increased------------------------------------
number of partitions after repartitioning: 3
number of partitions after repartitioning: 7
RDD lineage: (7) MapPartitionsRDD[40] at coalesce at TestMain.scala:117 []
 |  CoalescedRDD[39] at coalesce at TestMain.scala:117 []
 |  ShuffledRDD[38] at coalesce at TestMain.scala:117 []
 +-(4) MapPartitionsRDD[37] at coalesce at TestMain.scala:117 []
    |  ParallelCollectionRDD[36] at parallelize at TestMain.scala:117 []
-------------------------repartition(numPartition): implemented as coalesce(numPartition, true), so it behaves exactly like the shuffled coalesce above------------------------------------
-------------------------repartition(numPartition)-----------------------------

Note that some sections (word count, map(func), mapPartitions(func) 2, cartesian, and the final join) print nothing above, presumably because their foreach(println) calls run on the executors; when the job is submitted to the cluster that output goes to the executor logs rather than the driver console.
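The test program uses the Spark 1.x HiveContext and registerTempTable APIs, which are deprecated in later releases. As a rough sketch only, assuming a Spark 2.x or later target (not part of the original test), the same Hive setup, word count, and temp-table insert might look like this with the SparkSession API:

import org.apache.spark.sql.SparkSession

object TestMainSparkSession {
  def main(args: Array[String]): Unit = {
    // SparkSession with Hive support replaces new SparkContext(conf) + new HiveContext(sc)
    val spark = SparkSession.builder()
      .appName("Hangzhou_Test")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    spark.sql("use rc_hive_db")
    spark.sql("create table if not exists tb_id_vs_name(id int, name string)")

    // word count on the same kind of sample sentence; collect() brings the result to the driver
    val words = "When building the vocabulary ignore terms that have a document frequency strictly lower than the given threshold."
    spark.sparkContext.parallelize(words.split(" "), 2)
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)

    // createOrReplaceTempView replaces the deprecated registerTempTable
    Seq((1, "11"), (2, "22")).toDF("id", "name").createOrReplaceTempView("temp_tb_id_vs_name")
    spark.sql("insert into tb_id_vs_name select * from temp_tb_id_vs_name")

    spark.stop()
  }
}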