Spark项目练习(根据规则库统计地区用户访问量,写入数据库)
2018-02-01 09:06
441 查看
项目说明:附件为要计算数据的demo。点击打开链接
分析用户访问数据,将访问IP计算分析,根据规则库,统计出各省份的访问量。
其中,用到知识点:1、常用算法二分法。2、IP地址转换成10进制数的简单算法。3、spark对Mysql数据库的操作。
具体程序如下:
打开Mysql数据库,新建数据库bigdata,新建表location_info,新建四个字段,id(非空,自增),location(varchar),counts(int),access_date(datetime)。运行程序,刷新数据库,可见如下结果:
分析用户访问数据,将访问IP计算分析,根据规则库,统计出各省份的访问量。
其中,用到知识点:1、常用算法二分法。2、IP地址转换成10进制数的简单算法。3、spark对Mysql数据库的操作。
具体程序如下:
package cn.allengao.Location import java.sql.{Connection, Date, DriverManager, PreparedStatement} import org.apache.spark.{SparkConf, SparkContext} /** * class_name: * package: * describe: 分析用户访问数据,将访问IP计算分析,根据规则库,统计出各省份的访问量。 * creat_user: Allen Gao * creat_date: 2018/2/1 * creat_time: 9:06 **/ object IPLocation { val data2MySQL = (iterator: Iterator[(String, Int)]) => { var conn: Connection = null var ps : PreparedStatement = null val sql = "INSERT INTO location_info (location, counts, access_date) VALUES (?, ?, ?)" try { conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "toor") iterator.foreach(line => { ps = conn.prepareStatement(sql) ps.setString(1, line._1) ps.setInt(2, line._2) ps.setDate(3, new Date(System.currentTimeMillis())) ps.executeUpdate() }) } catch { case e: Exception => println("Mysql Exception") } finally { if (ps != null) ps.close() if (conn != null) conn.close() } } //将IP地址转换为Long类型,以方便比较 def ip2Long(ip: String): Long = { val fragments = ip.split("[.]") var ipNum = 0L for (i <- 0 until fragments.length){ ipNum = fragments(i).toLong | ipNum << 8L } ipNum } //二分法检索 def binarySearch(lines: Array[(String, String, String)], ip: Long) : Int = { var low = 0 var high = lines.length - 1 while (low <= high) { val middle = (low + high) / 2 if ((ip >= lines(middle)._1.toLong) && (ip <= lines(middle)._2.toLong)) return middle if (ip < lines(middle)._1.toLong) high = middle - 1 else { low = middle + 1 } } -1 } def main(args: Array[String]) { val conf = new SparkConf().setAppName("IpLocation").setMaster("local[2]") val sc = new SparkContext(conf) // 1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302 val ipRulesRdd = sc.textFile("j://information//ip.txt").map(line =>{ val fields = line.split("\\|") val startIP = fields(2)//起始IP val endIP = fields(3)//结束IP val province = fields(6)//省份 (startIP, endIP, province) }) //全部的ip映射规则 val ipRulesArrary = ipRulesRdd.collect() //广播规则,将需要广播的数据广播到集群中的相应的Executor val ipRulesBroadcast = sc.broadcast(ipRulesArrary) //加载要处理的数据,读取用户点击流日志 val ipsRDD = sc.textFile("j://information//access_log").map(line => { val fields = line.split("\\|") val ip = fields(1) val ipToLong = ip2Long(ip)//把IP转换成Long类型 val arr = ipRulesBroadcast.value//拿到广播变量中的数据 val index = binarySearch(arr, ipToLong)//得到IP索引 val province = arr(index)._3//根据索引找到对应的省份 (province,1) }) //计算结果 val res = ipsRDD.reduceByKey(_+_) //向MySQL写入数据 res.foreachPartition(data2MySQL(_)) //println(res.collect().toBuffer) sc.stop() } }
打开Mysql数据库,新建数据库bigdata,新建表location_info,新建四个字段,id(非空,自增),location(varchar),counts(int),access_date(datetime)。运行程序,刷新数据库,可见如下结果:
相关文章推荐
- BOS项目练习(权限/角色/用户管理(CRUD),基于数据库实现动态授权,ehcache缓存权限,shiro标签,菜单权限展示)
- Spark项目练习(计算用户停留时间最长的两个小区)
- Spark项目练习(计算用户访问学科子网页的top3)
- Spark日志分析项目Demo(4)--RDD使用,用户行为统计分析
- 冷门规则引擎drools:从数据库中,根据drt动态生成规则并附上具体项目逻辑
- 框架 day54 BOS项目练习(权限/角色/用户管理(CRUD),基于数据库实现动态授权,ehcache缓存权限,shiro标签,菜单权限展示)
- 统计各个数据库的各个数据表的总数,然后写入到excel中
- Spark 用户访问量
- shell练习(2)--awk统计ip访问量(一条命令)
- Spring aop实现用户操作日志写入数据库
- Spark实现用户量增长实时统计
- Spark实现用户量增长实时统计
- 通过beego快速创建一个Restful风格API项目及API文档自动化 本文演示如何快速(一分钟内,不写一行代码)的根据数据库及表创建一个Restful风格的API项目,及提供便于在线测试API的界
- linux下面根据不同的日期创建不同文件,一般用户数据库的备份的shell编程
- Shell练习-统计出每个IP的访问量有多少?
- 第96讲 通过Spark Streaming的foreachRDD把处理后的数据写入外部存储系统(数据库)中
- 用户统计 - mysql根据身份证号计算用户年龄
- Spark实现用户量增长实时统计