Scala Spark: Getting Each User's Most Recent Day of Data (Key Point: join)

2016-01-22 13:35

0. Data

val data=
"""
user date      item1 item2
1    2015-12-01 14  5.6
1    2015-12-01 10  0.6
1    2015-12-02 8   9.4
1    2015-12-02 90  1.3
2    2015-12-01 30  0.3
2    2015-12-01 89  1.2
2    2015-12-30 70  1.9
2    2015-12-31 20  2.5
3    2015-12-01 19  9.3
3    2015-12-01 40  2.3
3    2015-12-02 13  1.4
3    2015-12-02 50  1.0
3    2015-12-02 19  7.8
"""


1. Approach 1

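import scala.collection.mutable.ListBuffer

// Drop the header row, split each line on whitespace,
// and key each record by user: (user, ListBuffer(date, item1, item2))
val data2 = data.trim.split("\\r?\\n").drop(1).map(_.trim.split("\\s+")).map{
f=>{
(f(0),ListBuffer[Any](f(1),f(2).toInt,f(3).toDouble))
}
}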

val data3 = sc.parallelize(data2)

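// Keep one record per user that carries the latest date.
// Dates in yyyy-MM-dd form compare correctly as strings.
val dataReduce = data3.reduceByKey((x,y) =>
if(x(0).toString >= y(0).toString) x else y)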

val dataUserAndDateKey = data3.map{
rec=>((rec._1,rec._2(0)),rec)
}
//dataUserAndDateKey: ((3,2015-12-02),(...))

val dataReduceUserAndDateKey = dataReduce.map{
rec => ((rec._1,rec._2(0)),rec)
}
//dataReduceUserAndDateKey: ((3,2015-12-02),(...)),
//but each user has only a single record, from that user's latest day

//What we want is every record from each user's latest day
val joinData = dataUserAndDateKey.join(dataReduceUserAndDateKey)
joinData.foreach(println)

((3,2015-12-02),((3,ListBuffer(2015-12-02, 13, 1.4)),
(3,ListBuffer(2015-12-02, 13, 1.4))))
((2,2015-12-31),((2,ListBuffer(2015-12-31, 20, 2.5)),
(2,ListBuffer(2015-12-31, 20, 2.5))))
((3,2015-12-02),((3,ListBuffer(2015-12-02, 50, 1.0)),
(3,ListBuffer(2015-12-02, 13, 1.4))))
((3,2015-12-02),((3,ListBuffer(2015-12-02, 19, 7.8)),
(3,ListBuffer(2015-12-02, 13, 1.4))))
((1,2015-12-02),((1,ListBuffer(2015-12-02, 8, 9.4)),
(1,ListBuffer(2015-12-02, 8, 9.4))))
((1,2015-12-02),((1,ListBuffer(2015-12-02, 90, 1.3)),
(1,ListBuffer(2015-12-02, 8, 9.4))))

//What we want is the left side of each joined pair
joinData.map(rec=>rec._2._1).foreach(println)

(3,ListBuffer(2015-12-02, 13, 1.4))
(2,ListBuffer(2015-12-31, 20, 2.5))
(3,ListBuffer(2015-12-02, 50, 1.0))
(3,ListBuffer(2015-12-02, 19, 7.8))
(1,ListBuffer(2015-12-02, 8, 9.4))
(1,ListBuffer(2015-12-02, 90, 1.3))
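
As a sanity check on the result above, the same logic can be mirrored with plain Scala collections, with no SparkContext needed. This is only a local sketch under assumptions: the `Rec` case class and the `parse` and `latestDayRecords` helpers are hypothetical names that do not appear in the original, and `groupBy` plus `filter` stand in for `reduceByKey` plus `join`.

```scala
object LatestDayLocalSketch {
  // Hypothetical record type for illustration; the original code
  // uses (String, ListBuffer[Any]) instead.
  final case class Rec(user: String, date: String, item1: Int, item2: Double)

  // Parse the whitespace-separated table, skipping the header row.
  def parse(table: String): Seq[Rec] =
    table.trim.split("\\r?\\n").toSeq.drop(1).map(_.trim.split("\\s+")).map { f =>
      Rec(f(0), f(1), f(2).toInt, f(3).toDouble)
    }

  // Keep, for every user, all records that fall on that user's latest date.
  // Dates in yyyy-MM-dd form sort correctly as plain strings.
  def latestDayRecords(recs: Seq[Rec]): Seq[Rec] = {
    val latest: Map[String, String] =
      recs.groupBy(_.user).map { case (u, rs) => u -> rs.map(_.date).max }
    recs.filter(r => latest(r.user) == r.date)
  }

  def main(args: Array[String]): Unit = {
    // A subset of the sample data from section 0.
    val sample =
      """user date item1 item2
        |1 2015-12-01 14 5.6
        |1 2015-12-02 8 9.4
        |1 2015-12-02 90 1.3
        |2 2015-12-30 70 1.9
        |2 2015-12-31 20 2.5""".stripMargin
    latestDayRecords(parse(sample)).foreach(println)
  }
}
```

On real data the Spark version is still the one to use; the sketch only helps verify which rows should survive.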


2. Approach 2

1. Prepare the data

// trim removes the empty first and last lines left by the triple-quoted string
val inputLines = sc.parallelize(data.trim.split("\\r?\\n"))
// Drop the header row
val data2 = inputLines.filter(l => !l.startsWith("user"))
data2.foreach(println)


2. Find each user's most recent day

// Key each line by user (map over the RDD data2, not the raw string data)
val keyByUser = data2.map(
line => {
val a = line.split("\\s+")
(a(0),line)
})

//For each user, keep the line with the latest date
//(yyyy-MM-dd dates compare correctly as strings)
val latestByUser = keyByUser.reduceByKey(
(x,y)=>
if(x.split("\\s+")(1) > y.split("\\s+")(1)) x else y)
latestByUser.foreach(println)


3. Join the original data with the most recent day to get the result

val latestKeyedByUserAndDate = latestByUser.map(
x=>(x._1 +":"+x._2.split("\\s+")(1),x._2))

val originalKeyedByUserAndDate = data2.map(
line =>{
val a = line.split("\\s+")
(a(0) + ":"+a(1),line)
})
val result = latestKeyedByUserAndDate.join(originalKeyedByUserAndDate)
result.foreach(println)
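
To make the join step concrete, here is a minimal local model of what an inner join on the composite "user:date" key produces, written with plain Scala collections. The `innerJoin` helper is a hypothetical stand-in for illustration, not Spark's implementation: it pairs every left value with every right value that shares the same key, and keys present on only one side are dropped.

```scala
object JoinSketch {
  // Local stand-in for an inner join of two keyed collections:
  // every (k, v) on the left is paired with every (k, w) on the right.
  def innerJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    val rightByKey: Map[K, Seq[W]] =
      right.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    for {
      (k, v) <- left
      w <- rightByKey.getOrElse(k, Seq.empty)
    } yield (k, (v, w))
  }

  def main(args: Array[String]): Unit = {
    // One "latest" line per user, keyed by "user:date".
    val latest = Seq(("1:2015-12-02", "1 2015-12-02 8 9.4"))
    // All original lines for that user, keyed the same way.
    val original = Seq(
      ("1:2015-12-01", "1 2015-12-01 14 5.6"),
      ("1:2015-12-02", "1 2015-12-02 8 9.4"),
      ("1:2015-12-02", "1 2015-12-02 90 1.3")
    )
    // Only the two 2015-12-02 lines survive; the 2015-12-01 line has no match.
    innerJoin(latest, original).foreach(println)
  }
}
```

Because the latest-day side holds exactly one record per user, each original line appears at most once in the joined output, on the right side of the pair.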


4. Transform the result into the shape you want

def createCombiner(v:(String,String)):
List[(String,String)] = List[(String,String)](v)

def mergeValue(acc:List[(String,String)],
value:(String,String)):List[(String,String)]
= value::acc

def mergeCombiners(acc1:List[(String,String)],
acc2:List[(String,String)]):List[(String,String)]
= acc2 ::: acc1

//Use combineByKey to collect the (item1, item2) pairs under each "user:date" key
val transformedResult = result.mapValues(
l=>{
val a = l._2.split(" +")
(a(2),a(3))
}).combineByKey(createCombiner,mergeValue,mergeCombiners)
transformedResult.foreach(println)
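
The roles of the three functions passed to combineByKey can be illustrated with a small local model. This is a sketch under assumptions, not Spark's implementation: `combineLocally` imitates what happens inside a single partition, `mergePartitions` imitates merging the results of two partitions, and both helper names and the `Pair` alias are hypothetical.

```scala
object CombineByKeySketch {
  type Pair = (String, String)

  // Same three functions as in the text above.
  def createCombiner(v: Pair): List[Pair] = List(v)
  def mergeValue(acc: List[Pair], v: Pair): List[Pair] = v :: acc
  def mergeCombiners(acc1: List[Pair], acc2: List[Pair]): List[Pair] = acc2 ::: acc1

  // Within one partition: the first value seen for a key goes through
  // createCombiner, every later value for that key through mergeValue.
  def combineLocally(kvs: Seq[(String, Pair)]): Map[String, List[Pair]] =
    kvs.foldLeft(Map.empty[String, List[Pair]]) { case (acc, (k, v)) =>
      acc.get(k) match {
        case Some(list) => acc.updated(k, mergeValue(list, v))
        case None       => acc.updated(k, createCombiner(v))
      }
    }

  // Across partitions: lists built for the same key are joined with mergeCombiners.
  def mergePartitions(p1: Map[String, List[Pair]],
                      p2: Map[String, List[Pair]]): Map[String, List[Pair]] =
    p2.foldLeft(p1) { case (acc, (k, list)) =>
      acc.updated(k, acc.get(k).map(mergeCombiners(_, list)).getOrElse(list))
    }

  def main(args: Array[String]): Unit = {
    val part1 = combineLocally(Seq(
      ("3:2015-12-02", ("13", "1.4")),
      ("3:2015-12-02", ("50", "1.0"))))
    val part2 = combineLocally(Seq(("3:2015-12-02", ("19", "7.8"))))
    mergePartitions(part1, part2).foreach(println)
  }
}
```

Since `mergeValue` prepends, the order of pairs inside each list depends on the order in which records are processed, so it should not be relied on.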