Spark项目练习(计算用户停留时间最长的两个小区)
2018-01-29 17:14
381 查看
项目说明:附件为要计算数据的demo。点击打开链接
其中bs_log文件夹数据格式为(手机号,时间戳,基站ID,连接状态(“1”为连接,“0”为断开))
lac_info.txt 文件数据格式为(基站ID,经度,纬度,信号辐射类型)
程序思路:
1, 先根据"手机号,基站ID"构成一个元祖,做为唯一标识, 和时间戳构成新的数据结构->(手机号, 站点, 时间戳)
2、(手机号,基站ID)作为key,通过reduceByKey算子进行聚合,计算出在基站的停留时间,构成新的数据结构,以便和坐标数据进行join,->(基站ID,(手机号,停留时间))
3、将基站坐标数据信息通过map,构建成数据类型 ->(基站ID,(经度,纬度))
4、将2、3进行join操作,构成新的数据类型 ->(手机号,基站ID,停留时间,经度,纬度)
5、按手机号进行分组。->(手机号,(手机号,基站ID,停留时间,经度,纬度))
6、取出停留时间最长的两个基站ID。
具体程序如下:
其中bs_log文件夹数据格式为(手机号,时间戳,基站ID,连接状态(“1”为连接,“0”为断开))
lac_info.txt 文件数据格式为(基站ID,经度,纬度,信号辐射类型)
程序思路:
1, 先根据"手机号,基站ID"构成一个元祖,做为唯一标识, 和时间戳构成新的数据结构->(手机号, 站点, 时间戳)
2、(手机号,基站ID)作为key,通过reduceByKey算子进行聚合,计算出在基站的停留时间,构成新的数据结构,以便和坐标数据进行join,->(基站ID,(手机号,停留时间))
3、将基站坐标数据信息通过map,构建成数据类型 ->(基站ID,(经度,纬度))
4、将2、3进行join操作,构成新的数据类型 ->(手机号,基站ID,停留时间,经度,纬度)
5、按手机号进行分组。->(手机号,(手机号,基站ID,停留时间,经度,纬度))
6、取出停留时间最长的两个基站ID。
具体程序如下:
package cn.allengao.Location import org.apache.spark.{SparkConf, SparkContext} /** * class_name: * package: * describe: 基站信息查询 * creat_user: Allen Gao * creat_date: 2018/1/29 * creat_time: 10:03 **/ /* * 说明: * 1, 先根据"手机号,基站ID"构成一个元祖,做为唯一标识, 和时间戳构成新的数据结构->(手机号, 站点, 时间戳) * 2、(手机号,基站ID)作为key,通过reduceByKey算子进行聚合,计算出在基站的停留时间,构成新的数据结构, * 以便和坐标数据进行join,->(基站ID,(手机号,停留时间)) * 3、将基站坐标数据信息通过map,构建成数据类型 ->(基站ID,(经度,纬度)) * 4、将2、3进行join操作,构成新的数据类型 ->(手机号,基站ID,停留时间,经度,纬度) * 5、按手机号进行分组。->(手机号,(手机号,基站ID,停留时间,经度,纬度)) * 6、取出停留时间最长的两个基站ID。 * */ object UserLocation { def main(args: Array[String]): Unit = { //创建Spark配置信息 val conf = new SparkConf().setAppName("UserLocation").setMaster("local[*]") //建立Spark上下文,并将配置信息导入 val sc = new SparkContext(conf) /* 基站连接手机号,连接时间戳,基站站点ID信息,“1”表示连接,“0”表示断开连接。 18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1 */ //从log文件拿到数据,并按行采集。 //sc.textFile("c://information//bs_log").map(_.split(",")).map(x => (x(0), x(1), x(2), x(3))) val rdd_Info = sc.textFile("j://information//bs_log").map(line => { //通过“,”将数据进行切分field(0)手机号,field(1)时间戳,field(2)基站ID信息,field(3)事件类型 val fields = line.split(",") //事件类型,“1”表示连接,“0”表示断开。 val eventType = fields(3) val time = fields(1) //连接基站将时间戳至为“-”,断开基站将时间戳至为“+”,以便后面进行计算。 val timeLong = if(eventType == "1") -time.toLong else time.toLong //构成一个数据类型(手机号,基站ID信息,带符号的时间戳) ((fields(0),fields(2)),timeLong) }) val rdd_lacInfo = rdd_Info.reduceByKey(_+_).map(t=>{ val mobile = t._1._1 val lac = t._1._2 val time = t._2 (lac, (mobile, time)) }) val rdd_coordinate = sc.textFile("j://information//lac_info.txt").map(line =>{ val f = line.split(",") //(基站ID, (经度, 纬度)) (f(0),(f(1), f(2))) }) //rdd1.join(rdd2)-->(CC0710CC94ECC657A8561DE549D940E0,((18688888888,1300),(116.303955,40.041935))) val rdd_all = rdd_lacInfo.join(rdd_coordinate).map(t =>{ val lac = t._1 val mobile = t._2._1._1 val time = t._2._1._2 val x = t._2._2._1 val y = t._2._2._2 (mobile, lac, time, x, y) }) //按照手机号进行分组 val rdd_mobile = rdd_all.groupBy(_._1) //取出停留时间最长的前两个基站 val rdd_topTwo= rdd_mobile.mapValues(it =>{ it.toList.sortBy(_._3).reverse.take(2) }) // println(rdd_Info.collect().toBuffer) // println(rdd_lacInfo.collect().toBuffer) // println(rdd_coordinate.collect().toBuffer) // println(rdd_all.collect().toBuffer) // println(rdd_mobile.collect().toBuffer) // println(rdd_topTwo.collect().toBuffer) rdd_topTwo.saveAsTextFile("j://information//out") sc.stop() } }
相关文章推荐
- Spark项目练习(计算用户访问学科子网页的top3)
- SQL 数据小练习,呼叫中心 ----查询通话时间最长的5条记录 ----查询以0开头的通话总时,以秒为计算单位 ----查询2010年7月通话总时长最多的前两个呼叫员的编号 ----查询201
- 练习--JSP定时刷新与统计用户在某页停留时间
- Spark项目练习(根据规则库统计地区用户访问量,写入数据库)
- 用户行为统计分析页面停留时间计算
- javascript计算用户打开网页的停留时间
- javascript计算用户打开网页的停留时间
- 第97课: 使用Spark Streaming+Spark SQL实现在线动态计算出特定时间窗口下的不同种类商品中的热门商品排名
- Java计算两个程序运行时间的实例
- go语言计算两个时间的时间差
- java计算两个时间相差天数的方法汇总
- Java计算时间差(两个时间相减)
- PHP 计算两个时间相差的天
- 在windows下计算两个时间的时间差(精确到毫秒)
- Spark项目之电商用户行为分析大数据平台之(十一)JSON及FASTJSON
- 计算两个日期的时间间隔
- Java计算两个程序运行时间
- 常用PHP函数系列八:计算两个时间戳之间的日期(常用于倒计时计算时间)
- 记录用户在页面停留时间并写入数据库
- 迅雷创始人程浩:创业公司5招做好内部创新(组建小型敢死队:一共3个人,一个产品经理,两个研发;腾讯做不做这个项目是一个伪命题;让用户来验证,而不是相反 good)