利用spark实现一些简单案例
2018-03-15 22:07
1161 查看
1、实现first_value
package com.ruozedata.core
import org.apache.spark.{SparkConf, SparkContext}
/**
* 使用Spark Core API来实现first_value函数
*/
object FirstValueApp {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("FirstValueApp")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val data = sc.parallelize(List(
("A", "A1"),
("A", "A2"),
("A", "A3"),
("B", "B1"),
("B", "B2"),
("B", "B3"),
("C", "C1")
))
//data.collect().foreach(println)
data.groupByKey().map(x => {
(x._1, firstValue(x._2))
}).collect().foreach(println)
// TODO... 进来一个迭代器,输出一个firstvalue
def firstValue(items: Iterable[String]) = {
for(item <- items)
yield (item, items.head)
}
sc.stop()
}
}
2、使用spark-core读取sequenceFile数据
package com.ruozedata.core
import org.apache.hadoop.io.BytesWritable
import org.apache.spark.{SparkConf, SparkContext}
/**
* 使用Spark Core来读取SequenceFile/ProtoBuf的数据
*有一些场景是要用到的
*/
object ReadSequenceFileApp {
def main(args: Array[String]) {
val conf = new SparkConf()
// .setAppName("ReadSequenceFileApp")
// .setMaster("local[2]")
val sc = new SparkContext(conf)
val file = sc.sequenceFile[BytesWritable, String]("hdfs://hadoop000:8020/user/hive/warehouse/states_seq")
// file.collect()
// 对于seq/pb格式的文件来说,key通常是没用
// file.map(x => (x._1.copyBytes(), x._2)).collect()
file.map(x => x._2.split("\t"))
.map(x => (x(0),x(1)))
.collect()
.foreach(println)
sc.stop()
}
}
3、二次排序
package com.ruozedata.core
import org.apache.spark.{SparkConf, SparkContext}
/**
* 使用Spark Core API实现二次排序
* 1) 自定义排序的key, 要实现Ordered和Serializable接口
* 2)将要排序的数据,映射成key为自定义排序的key,value就是原始的值
* 3)按照业务逻辑实现compare方法
* 4)使用sortByKey(false/true)算子按照自定义的key进行排序
* 5)丢弃key,取value
*/
object SecondSortApp {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("FirstValueApp")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val data = sc.textFile("D:/sort.txt")
//data.collect().foreach(println)
data.map(x => {
val splits = x.split(",")
(new SecondSortKey(splits(0).trim.toLong, splits(1).trim.toLong), x)
}).sortByKey().map(x => x._2)
.collect().foreach(println)
sc.stop()
}
// 将(c1,c2)作为一个整体,当做是SecondSortKey
// 在scala中,第一个trait我们可以使用extends,如果还有其他trait,就用with
class SecondSortKey(val first:Long, val second:Long)
extends Ordered[SecondSortKey]
with Serializable {
/**
* 排序的实现方式: this that
*/
override def compare(that: SecondSortKey): Int = {
// >0 =0 <0
if(this.first - that.first != 0) {
(this.first - that.first).toInt
} else { //==0
(this.second - that.second).toInt
}
}
}
}
package com.ruozedata.core
import org.apache.spark.{SparkConf, SparkContext}
/**
* 使用Spark Core API来实现first_value函数
*/
object FirstValueApp {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("FirstValueApp")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val data = sc.parallelize(List(
("A", "A1"),
("A", "A2"),
("A", "A3"),
("B", "B1"),
("B", "B2"),
("B", "B3"),
("C", "C1")
))
//data.collect().foreach(println)
data.groupByKey().map(x => {
(x._1, firstValue(x._2))
}).collect().foreach(println)
// TODO... 进来一个迭代器,输出一个firstvalue
def firstValue(items: Iterable[String]) = {
for(item <- items)
yield (item, items.head)
}
sc.stop()
}
}
2、使用spark-core读取sequenceFile数据
package com.ruozedata.core
import org.apache.hadoop.io.BytesWritable
import org.apache.spark.{SparkConf, SparkContext}
/**
* 使用Spark Core来读取SequenceFile/ProtoBuf的数据
*有一些场景是要用到的
*/
object ReadSequenceFileApp {
def main(args: Array[String]) {
val conf = new SparkConf()
// .setAppName("ReadSequenceFileApp")
// .setMaster("local[2]")
val sc = new SparkContext(conf)
val file = sc.sequenceFile[BytesWritable, String]("hdfs://hadoop000:8020/user/hive/warehouse/states_seq")
// file.collect()
// 对于seq/pb格式的文件来说,key通常是没用
// file.map(x => (x._1.copyBytes(), x._2)).collect()
file.map(x => x._2.split("\t"))
.map(x => (x(0),x(1)))
.collect()
.foreach(println)
sc.stop()
}
}
3、二次排序
package com.ruozedata.core
import org.apache.spark.{SparkConf, SparkContext}
/**
* 使用Spark Core API实现二次排序
* 1) 自定义排序的key, 要实现Ordered和Serializable接口
* 2)将要排序的数据,映射成key为自定义排序的key,value就是原始的值
* 3)按照业务逻辑实现compare方法
* 4)使用sortByKey(false/true)算子按照自定义的key进行排序
* 5)丢弃key,取value
*/
object SecondSortApp {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("FirstValueApp")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val data = sc.textFile("D:/sort.txt")
//data.collect().foreach(println)
data.map(x => {
val splits = x.split(",")
(new SecondSortKey(splits(0).trim.toLong, splits(1).trim.toLong), x)
}).sortByKey().map(x => x._2)
.collect().foreach(println)
sc.stop()
}
// 将(c1,c2)作为一个整体,当做是SecondSortKey
// 在scala中,第一个trait我们可以使用extends,如果还有其他trait,就用with
class SecondSortKey(val first:Long, val second:Long)
extends Ordered[SecondSortKey]
with Serializable {
/**
* 排序的实现方式: this that
*/
override def compare(that: SecondSortKey): Int = {
// >0 =0 <0
if(this.first - that.first != 0) {
(this.first - that.first).toInt
} else { //==0
(this.second - that.second).toInt
}
}
}
}
相关文章推荐
- 利用lvs-nat实现两台web服务器负载均衡的简单案例
- 利用C语言实现一些简单的栈操作
- Struts2+Spring 整合成功测试案例----利用Dojo实现简单的Ajax的效果(一) .
- 一些利用递归思想的简单编程题(JS实现)
- 利用ajax实现局部刷新(简单的注册验证案例)
- saltstack一些简单总结--利用saltstack的event实现自己的功能(2)
- 一些简单的利用循环实现问题的代码
- 【微信支付】分享一个失败的案例 跨域405(Method Not Allowed)问题 关于IM的一些思考与实践 基于WebSocketSharp 的IM 简单实现 【css3】旋转倒计时 【Html5】-- 塔台管制 H5情景意识 --飞机 谈谈转行
- 利用rsync+inotify实现主从服务器数据同步的简单案例
- Struts2+Spring 整合成功测试案例----利用Dojo实现简单的Ajax的效果
- 利用Java泛型实现简单的泛型方法
- 原生JS实现一些简单的操作
- 利用C语言实现POST数据包如此简单【模拟网页提交表单】
- 利用JAVA多线程采集数据库与Linux主机指标的简单实现
- 利用javaScript实现简单的计算器
- 利用VideoView简单实现视频播放 包括 横竖屏切换 声音 亮度 暂停
- python 线程简单使用----1利用threading 实现多线程
- JMS之——基于ActiveMQ实现简单的消息收发案例
- 利用简单的参数传递来实现单条查询的easyui-datagrid
- 通过QQ实现用户资源积累,通过一些验证的盈利方法 对用户资源合理利用