您的位置:首页 > 产品设计 > UI/UE

利用spark实现一些简单案例

2018-03-15 22:07 1161 查看
1、实现first_value
package com.ruozedata.core

import org.apache.spark.{SparkConf, SparkContext}

/**
  * 使用Spark Core API来实现first_value函数
  */
object FirstValueApp {
  def main(args: Array[String]) {
    val conf = new SparkConf()
          .setAppName("FirstValueApp")
          .setMaster("local[2]")
    val sc = new SparkContext(conf)

    val data = sc.parallelize(List(
      ("A", "A1"),
        ("A", "A2"),
        ("A", "A3"),
        ("B", "B1"),
        ("B", "B2"),
        ("B", "B3"),
        ("C", "C1")
    ))
    //data.collect().foreach(println)

    data.groupByKey().map(x => {
      (x._1, firstValue(x._2))
    }).collect().foreach(println)

    // TODO... 进来一个迭代器,输出一个firstvalue
    def firstValue(items: Iterable[String]) = {
        for(item <- items)
          yield (item, items.head)
    }

    sc.stop()
  }

}

2、使用spark-core读取sequenceFile数据
package com.ruozedata.core

import org.apache.hadoop.io.BytesWritable
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 使用Spark Core来读取SequenceFile/ProtoBuf的数据
  *有一些场景是要用到的 
  */
object ReadSequenceFileApp {

  def main(args: Array[String]) {
    val conf = new SparkConf()
//      .setAppName("ReadSequenceFileApp")
//      .setMaster("local[2]")
    val sc = new SparkContext(conf)

    val file = sc.sequenceFile[BytesWritable, String]("hdfs://hadoop000:8020/user/hive/warehouse/states_seq")
//    file.collect()

    // 对于seq/pb格式的文件来说,key通常是没用
//    file.map(x => (x._1.copyBytes(), x._2)).collect()
    file.map(x => x._2.split("\t"))
      .map(x => (x(0),x(1)))
      .collect()
          .foreach(println)
    sc.stop()
  }

}

3、二次排序
package com.ruozedata.core

import org.apache.spark.{SparkConf, SparkContext}

/**
  * 使用Spark Core API实现二次排序
  *  1) 自定义排序的key, 要实现Ordered和Serializable接口
  *  2)将要排序的数据,映射成key为自定义排序的key,value就是原始的值
  *  3)按照业务逻辑实现compare方法
  *  4)使用sortByKey(false/true)算子按照自定义的key进行排序
  *  5)丢弃key,取value
  */
object SecondSortApp {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("FirstValueApp")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)

    val data = sc.textFile("D:/sort.txt")
    //data.collect().foreach(println)

    data.map(x => {
      val splits = x.split(",")
      (new SecondSortKey(splits(0).trim.toLong, splits(1).trim.toLong), x)
    }).sortByKey().map(x => x._2)
      .collect().foreach(println)

    sc.stop()
  }

  // 将(c1,c2)作为一个整体,当做是SecondSortKey
  // 在scala中,第一个trait我们可以使用extends,如果还有其他trait,就用with
  class SecondSortKey(val first:Long, val second:Long)
    extends Ordered[SecondSortKey]
    with Serializable {

    /**
      * 排序的实现方式: this  that
      */
    override def compare(that: SecondSortKey): Int = {
      // >0  =0  <0
        if(this.first - that.first != 0) {
          (this.first - that.first).toInt
        } else { //==0
          (this.second - that.second).toInt
        }
    }
  }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐