Scala Spark: getting each user's data from their most recent day (key point: join)
2016-01-22 13:35
0. Data
val data = """
user date item1 item2
1 2015-12-01 14 5.6
1 2015-12-01 10 0.6
1 2015-12-02 8 9.4
1 2015-12-02 90 1.3
2 2015-12-01 30 0.3
2 2015-12-01 89 1.2
2 2015-12-30 70 1.9
2 2015-12-31 20 2.5
3 2015-12-01 19 9.3
3 2015-12-01 40 2.3
3 2015-12-02 13 1.4
3 2015-12-02 50 1.0
3 2015-12-02 19 7.8
"""
1. Approach 1
import scala.collection.mutable.ListBuffer

val data2 = data.trim.split("\\n")
  .filter(!_.startsWith("user"))   // drop the header line, otherwise f(2).toInt fails
  .map(_.trim.split("\\s+"))
  .map(f => (f(0), ListBuffer(f(1), f(2).toInt, f(3).toDouble)))
val data3 = sc.parallelize(data2)
// For each user keep a single record from the latest date
// (yyyy-MM-dd strings compare correctly as text)
val dataReduce = data3.reduceByKey((x, y) => if (x(0).toString >= y(0).toString) x else y)
val dataUserAndDateKey = data3.map(rec => ((rec._1, rec._2(0)), rec))
// dataUserAndDateKey: ((3,2015-12-02),(...))
val dataReduceUserAndDateKey = dataReduce.map(rec => ((rec._1, rec._2(0)), rec))
// dataReduceUserAndDateKey: ((3,2015-12-02),(...)),
// but it holds only ONE record per user from the latest day;
// what we want is ALL of each user's records from that day
val joinData = dataUserAndDateKey.join(dataReduceUserAndDateKey)
joinData.foreach(println)
// Output (order may vary):
// ((3,2015-12-02),((3,ListBuffer(2015-12-02, 13, 1.4)),(3,ListBuffer(2015-12-02, 13, 1.4))))
// ((2,2015-12-31),((2,ListBuffer(2015-12-31, 20, 2.5)),(2,ListBuffer(2015-12-31, 20, 2.5))))
// ((3,2015-12-02),((3,ListBuffer(2015-12-02, 50, 1.0)),(3,ListBuffer(2015-12-02, 13, 1.4))))
// ((3,2015-12-02),((3,ListBuffer(2015-12-02, 19, 7.8)),(3,ListBuffer(2015-12-02, 13, 1.4))))
// ((1,2015-12-02),((1,ListBuffer(2015-12-02, 8, 9.4)),(1,ListBuffer(2015-12-02, 8, 9.4))))
// ((1,2015-12-02),((1,ListBuffer(2015-12-02, 90, 1.3)),(1,ListBuffer(2015-12-02, 8, 9.4))))
// What we want is the original record, i.e. the left side of each joined value:
joinData.map(rec => rec._2._1).foreach(println)
// (3,ListBuffer(2015-12-02, 13, 1.4))
// (2,ListBuffer(2015-12-31, 20, 2.5))
// (3,ListBuffer(2015-12-02, 50, 1.0))
// (3,ListBuffer(2015-12-02, 19, 7.8))
// (1,ListBuffer(2015-12-02, 8, 9.4))
// (1,ListBuffer(2015-12-02, 90, 1.3))
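To see the reduce-then-join logic of Approach 1 without a cluster, here is a minimal sketch that mirrors it on plain Scala collections. It assumes local collections are a fair stand-in for RDDs:

- `groupBy` + `reduce` plays the role of `reduceByKey`.
- Pairing every element with an equal key plays the role of `join`.

The object name `Approach1Local` is hypothetical. A typed tuple `(date, item1, item2)` replaces `ListBuffer[Any]`.

```scala
object Approach1Local {
  val data: String =
    """user date item1 item2
      |1 2015-12-01 14 5.6
      |1 2015-12-01 10 0.6
      |1 2015-12-02 8 9.4
      |1 2015-12-02 90 1.3
      |2 2015-12-01 30 0.3
      |2 2015-12-01 89 1.2
      |2 2015-12-30 70 1.9
      |2 2015-12-31 20 2.5
      |3 2015-12-01 19 9.3
      |3 2015-12-01 40 2.3
      |3 2015-12-02 13 1.4
      |3 2015-12-02 50 1.0
      |3 2015-12-02 19 7.8""".stripMargin

  // (user, (date, item1, item2)), parsed with the header line dropped
  val records: Seq[(String, (String, Int, Double))] =
    data.split("\\n").toSeq
      .filterNot(_.startsWith("user"))
      .map(_.trim.split("\\s+"))
      .map(f => (f(0), (f(1), f(2).toInt, f(3).toDouble)))

  // reduceByKey stand-in: one record per user, taken from the latest date
  val latestPerUser: Map[String, (String, Int, Double)] =
    records.groupBy(_._1).map { case (user, recs) =>
      user -> recs.map(_._2).reduce((x, y) => if (x._1 >= y._1) x else y)
    }

  // Key both sides by (user, date), like dataUserAndDateKey / dataReduceUserAndDateKey
  val keyedAll = records.map(rec => ((rec._1, rec._2._1), rec))
  val keyedLatest = latestPerUser.toSeq.map { case (u, v) => ((u, v._1), (u, v)) }

  // join stand-in: emit (key, (left, right)) for every pair with equal keys
  val joined = for {
    (k1, left) <- keyedAll
    (k2, right) <- keyedLatest
    if k1 == k2
  } yield (k1, (left, right))

  // Keep the original record (left side), like joinData.map(rec => rec._2._1)
  val result: Seq[(String, (String, Int, Double))] = joined.map(_._2._1)
}
```

The reduce step keeps only one of the records from a user's latest day. The join is what brings back the other records from that same day, which is why `result` holds six records rather than three.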
2. Approach 2
1. Prepare the data
val inputLines = sc.parallelize(data.split("\\r?\\n"))
// Trim each line, then drop blank lines and the header
val data2 = inputLines.map(_.trim).filter(l => l.nonEmpty && !l.startsWith("user"))
data2.foreach(println)
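The trim and the blank-line filter matter because the string in section 0 begins with a newline. Splitting it therefore yields an empty first line, and `"".split("\\s+")(1)` would throw in the later steps.

Below is a minimal local sketch of the same cleaning step, assuming a plain `Seq` in place of the RDD. `PrepareLocal` and `cleanLines` are hypothetical names, and the sample string is a two-row subset of the data with the same leading and trailing newlines.

```scala
object PrepareLocal {
  // Same layout as the data string in section 0: leading and trailing newline
  val data: String = "\nuser date item1 item2\n1 2015-12-01 14 5.6\n3 2015-12-02 19 7.8\n"

  // Split on \r?\n, trim each line, and drop blank lines plus the header row
  def cleanLines(raw: String): Seq[String] =
    raw.split("\\r?\\n").toSeq
      .map(_.trim)
      .filter(l => l.nonEmpty && !l.startsWith("user"))

  val data2: Seq[String] = cleanLines(data)
}
```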
2. Find each user's most recent day
val keyByUser = data2.map(line => {
  val a = line.split("\\s+")
  (a(0), line)
})
// For each user, find the record with the latest day
// (yyyy-MM-dd dates compare correctly as strings)
val latestByUser = keyByUser.reduceByKey((x, y) =>
  if (x.split("\\s+")(1) > y.split("\\s+")(1)) x else y)
latestByUser.foreach(println)
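Comparing dates with `>` on strings only works because the dates are zero-padded `yyyy-MM-dd`. In that form, lexicographic order equals chronological order; it would not hold for something like `2015-1-5`.

The sketch below mirrors this step locally, assuming a `Seq` stands in for the RDD and `groupBy` + `reduce` for `reduceByKey`. `LatestPerUserLocal` and `dateOf` are hypothetical names.

```scala
object LatestPerUserLocal {
  val lines: Seq[String] = Seq(
    "1 2015-12-01 14 5.6", "1 2015-12-01 10 0.6", "1 2015-12-02 8 9.4", "1 2015-12-02 90 1.3",
    "2 2015-12-01 30 0.3", "2 2015-12-01 89 1.2", "2 2015-12-30 70 1.9", "2 2015-12-31 20 2.5",
    "3 2015-12-01 19 9.3", "3 2015-12-01 40 2.3", "3 2015-12-02 13 1.4", "3 2015-12-02 50 1.0",
    "3 2015-12-02 19 7.8")

  def dateOf(line: String): String = line.split("\\s+")(1)

  // keyBy(user) + reduceByKey stand-in: keep whichever line has the larger date.
  // On equal dates either line may survive, just as with reduceByKey.
  val latestByUser: Map[String, String] =
    lines.map(l => (l.split("\\s+")(0), l))
      .groupBy(_._1)
      .map { case (user, pairs) =>
        user -> pairs.map(_._2).reduce((x, y) => if (dateOf(x) > dateOf(y)) x else y)
      }
}
```

Only the date of the surviving line is reliable here, not which of the tied lines it is.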
3. Join the original data with each user's latest day to get the result
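// Key each user's latest record by "user:date"
val latestKeyedByUserAndDate = latestByUser.map(x => (x._1 + ":" + x._2.split("\\s+")(1), x._2))
// Key every original record the same way
val originalKeyedByUserAndDate = data2.map(line => {
  val a = line.split("\\s+")
  (a(0) + ":" + a(1), line)
})
// The inner join keeps only original records whose user and date match a latest day
val result = latestKeyedByUserAndDate.join(originalKeyedByUserAndDate)
result.foreach(println)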
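The `join` here is an inner join. Each latest line is paired with every original line sharing its "user:date" key, and lines from other days drop out because they have no partner.

The local sketch below assumes `Seq`s stand in for RDDs. `JoinLocal`, `innerJoin` and `userDateKey` are hypothetical names; `innerJoin` only imitates the `(key, (left, right))` output shape of Spark's `join`. The three latest lines are hard-coded as one possible output of the previous step.

```scala
object JoinLocal {
  // Hypothetical local inner join with the same output shape as Spark's join
  def innerJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    val rightByKey: Map[K, Seq[W]] =
      right.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    for {
      (k, v) <- left
      w <- rightByKey.getOrElse(k, Seq.empty)
    } yield (k, (v, w))
  }

  def userDateKey(line: String): String = {
    val a = line.split("\\s+")
    a(0) + ":" + a(1)
  }

  val lines: Seq[String] = Seq(
    "1 2015-12-01 14 5.6", "1 2015-12-01 10 0.6", "1 2015-12-02 8 9.4", "1 2015-12-02 90 1.3",
    "2 2015-12-01 30 0.3", "2 2015-12-01 89 1.2", "2 2015-12-30 70 1.9", "2 2015-12-31 20 2.5",
    "3 2015-12-01 19 9.3", "3 2015-12-01 40 2.3", "3 2015-12-02 13 1.4", "3 2015-12-02 50 1.0",
    "3 2015-12-02 19 7.8")

  // One possible result of step 2 (which tied line survives may differ)
  val latestByUser: Seq[String] =
    Seq("1 2015-12-02 8 9.4", "2 2015-12-31 20 2.5", "3 2015-12-02 13 1.4")

  // (user:date, (latest line, original line)), like result in step 3
  val result: Seq[(String, (String, String))] =
    innerJoin(latestByUser.map(l => (userDateKey(l), l)), lines.map(l => (userDateKey(l), l)))
}
```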
4. Transform the result into the shape you want
def createCombiner(v: (String, String)): List[(String, String)] = List[(String, String)](v)
def mergeValue(acc: List[(String, String)], value: (String, String)): List[(String, String)] = value :: acc
def mergeCombiners(acc1: List[(String, String)], acc2: List[(String, String)]): List[(String, String)] = acc2 ::: acc1
// Use combineByKey to collect the (item1, item2) pairs under each "user:date" key
val transformedResult = result.mapValues(l => {
  val a = l._2.split("\\s+")   // l._2 is the original line
  (a(2), a(3))
}).combineByKey(createCombiner _, mergeValue _, mergeCombiners _)
transformedResult.foreach(println)
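`combineByKey` works in two phases. Within each partition it calls `createCombiner` the first time it sees a key and `mergeValue` for later values of that key. It then merges the per-partition results for the same key with `mergeCombiners`.

The sketch below models those phases on local collections using the three functions from step 4. `CombineByKeyLocal` and `combineByKeyLocal` are hypothetical. This is not Spark's implementation, which also shuffles data between machines. The two-partition split of the joined pairs is an arbitrary assumption.

```scala
object CombineByKeyLocal {
  type Items = (String, String) // (item1, item2)

  // The three functions from step 4, unchanged
  def createCombiner(v: Items): List[Items] = List(v)
  def mergeValue(acc: List[Items], value: Items): List[Items] = value :: acc
  def mergeCombiners(acc1: List[Items], acc2: List[Items]): List[Items] = acc2 ::: acc1

  // Hypothetical local model of combineByKey; each inner Seq is one "partition"
  def combineByKeyLocal[K, V, C](partitions: Seq[Seq[(K, V)]])(
      create: V => C, mergeV: (C, V) => C, mergeC: (C, C) => C): Map[K, C] = {
    // Phase 1: per partition, createCombiner on first sight of a key, mergeValue after
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case Some(c) => acc.updated(k, mergeV(c, v))
          case None    => acc.updated(k, create(v))
        }
      }
    }
    // Phase 2: merge the per-partition combiners that share a key
    perPartition.foldLeft(Map.empty[K, C]) { (acc, m) =>
      m.foldLeft(acc) { case (a, (k, c)) =>
        a.updated(k, a.get(k).map(mergeC(_, c)).getOrElse(c))
      }
    }
  }

  // Joined (item1, item2) pairs split across two partitions (arbitrary split)
  val partitions: Seq[Seq[(String, Items)]] = Seq(
    Seq(("3:2015-12-02", ("13", "1.4")), ("1:2015-12-02", ("8", "9.4"))),
    Seq(("3:2015-12-02", ("50", "1.0")), ("3:2015-12-02", ("19", "7.8")),
        ("2:2015-12-31", ("20", "2.5")), ("1:2015-12-02", ("90", "1.3"))))

  val combined: Map[String, List[Items]] =
    combineByKeyLocal(partitions)(createCombiner, mergeValue, mergeCombiners)
}
```

The order inside each list depends on how records land in partitions. Compare the lists as sets if you check them.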