
Spark project exercise: counting user visits per region from a rule base and writing the results to a database

2018-02-01 09:06
Project description: the attachment is a demo of the data to be processed.

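The goal is to analyze user access data: convert each visitor's IP address to a number, look it up in the rule base, and count the visits from each province.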

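The exercise touches on three points: 1. a common algorithm, binary search; 2. a simple algorithm for converting an IP address to a decimal number; 3. writing to a MySQL database from Spark.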
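
To make the second point concrete, here is a minimal sketch of the conversion. It treats the four octets as the digits of a base-256 number, and it can be checked against the sample rule line quoted in the program, where 1.0.1.0 corresponds to 16777472 and 1.0.3.255 to 16778239. The object name `IpToLongDemo` is made up for this illustration.

```scala
object IpToLongDemo {
  // a.b.c.d -> ((a * 256 + b) * 256 + c) * 256 + d
  // Shifting left by 8 bits is the same as multiplying by 256.
  def ip2Long(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, octet) => (acc << 8) | octet.toLong)

  def main(args: Array[String]): Unit = {
    println(ip2Long("1.0.1.0"))   // 16777472
    println(ip2Long("1.0.3.255")) // 16778239
  }
}
```

A `Long` is used because the largest address, 255.255.255.255, is 4294967295 and does not fit in a signed 32-bit `Int`.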

The full program is as follows:

package cn.allengao.Location

import java.sql.{Connection, Date, DriverManager, PreparedStatement}
import org.apache.spark.{SparkConf, SparkContext}

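/**
 * class_name: IPLocation
 * package: cn.allengao.Location
 * describe: Analyze user access data: convert each visitor's IP address, look it up
 *           in the rule base, and count the visits from each province.
 * create_user: Allen Gao
 * create_date: 2018/2/1
 * create_time: 9:06
 **/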
object IPLocation {

  // Writes one partition of (province, count) pairs to MySQL
  val data2MySQL = (iterator: Iterator[(String, Int)]) => {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "INSERT INTO location_info (location, counts, access_date) VALUES (?, ?, ?)"
    try {
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "toor")
      // Prepare the statement once and reuse it for every row; preparing it
      // inside the loop would leak all but the last statement
      ps = conn.prepareStatement(sql)
      iterator.foreach { case (location, count) =>
        ps.setString(1, location)
        ps.setInt(2, count)
        ps.setDate(3, new Date(System.currentTimeMillis()))
        ps.executeUpdate()
      }
    } catch {
      // Report the cause instead of hiding it behind a fixed message
      case e: Exception => println(s"MySQL exception: ${e.getMessage}")
    } finally {
      if (ps != null)
        ps.close()
      if (conn != null)
        conn.close()
    }
  }

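  // Convert an IP address to a Long so that addresses can be compared numerically
  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length) {
      // Shift the accumulated value left by one octet, then add the next octet
      ipNum = fragments(i).toLong | (ipNum << 8L)
    }
    ipNum
  }
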
  // Binary search over the rules, which must be sorted by start IP.
  // Returns the index of the range that contains ip, or -1 if no range matches.
  def binarySearch(lines: Array[(String, String, String)], ip: Long): Int = {
    var low = 0
    var high = lines.length - 1
    while (low <= high) {
      val middle = low + (high - low) / 2
      val start = lines(middle)._1.toLong
      val end = lines(middle)._2.toLong
      if (ip >= start && ip <= end)
        return middle
      if (ip < start)
        high = middle - 1
      else
        low = middle + 1
    }
    -1
  }

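  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("IpLocation").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Sample rule line:
    // 1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
    val ipRulesRdd = sc.textFile("j://information//ip.txt").map(line => {
      val fields = line.split("\\|")
      val startIP = fields(2)  // start of the range, already in decimal form
      val endIP = fields(3)    // end of the range, already in decimal form
      val province = fields(6) // province
      (startIP, endIP, province)
    })
    // Collect all IP mapping rules on the driver
    val ipRulesArray = ipRulesRdd.collect()
    // Broadcast the rules so that every executor receives a read-only copy
    val ipRulesBroadcast = sc.broadcast(ipRulesArray)
    // Load the data to process: the user clickstream log
    val ipsRDD = sc.textFile("j://information//access_log").map(line => {
      val fields = line.split("\\|")
      val ip = fields(1)
      val ipToLong = ip2Long(ip)              // convert the IP to a Long
      val arr = ipRulesBroadcast.value        // rules from the broadcast variable
      val index = binarySearch(arr, ipToLong) // index of the matching rule, or -1
      // arr(-1) would throw, so IPs that match no rule are grouped under a
      // placeholder label ("unknown" is a choice made here, not part of the rule base)
      val province = if (index >= 0) arr(index)._3 else "unknown"
      (province, 1)
    })
    // Sum the visits per province
    val res = ipsRDD.reduceByKey(_ + _)
    // Write the results to MySQL, one connection per partition
    res.foreachPartition(data2MySQL(_))

    //println(res.collect().toBuffer)
    sc.stop()
  }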
}
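
The lookup-and-count logic can also be tried without Spark or MySQL. The following is a minimal sketch using plain Scala collections, not the program above: the object name `LocalLookupDemo`, the helper names, and the second rule range with its `DemoProvince` label are invented for the illustration, and only the first range comes from the sample rule line quoted in the program. Here the rules are parsed to `Long` once up front, and a failed lookup is returned as `None` instead of -1.

```scala
object LocalLookupDemo {
  // (start, end, province), sorted by start and non-overlapping.
  // First range: from the sample rule line. Second range: invented for this demo.
  val rules: Array[(Long, Long, String)] = Array(
    (16777472L, 16778239L, "福建"),
    (16778240L, 16779263L, "DemoProvince")
  )

  def ip2Long(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, octet) => (acc << 8) | octet.toLong)

  // Binary search for the range containing ip; None when no range matches.
  def findProvince(ip: Long): Option[String] = {
    var low = 0
    var high = rules.length - 1
    while (low <= high) {
      val mid = low + (high - low) / 2
      val (start, end, province) = rules(mid)
      if (ip < start) high = mid - 1
      else if (ip > end) low = mid + 1
      else return Some(province)
    }
    None
  }

  // Local stand-in for the map + reduceByKey step; unmatched IPs are dropped.
  def countByProvince(ips: Seq[String]): Map[String, Int] =
    ips.flatMap(ip => findProvince(ip2Long(ip)))
      .groupBy(identity)
      .map { case (province, hits) => province -> hits.size }

  def main(args: Array[String]): Unit =
    println(countByProvince(Seq("1.0.1.5", "1.0.2.9", "1.0.4.1", "8.8.8.8")))
}
```

Whether to drop unmatched addresses, as this sketch does, or to count them under a placeholder label is a design choice that depends on how complete the rule base is.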

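Open MySQL, create a database named bigdata, and in it create a table named location_info with four columns: id (NOT NULL, auto-increment), location (varchar), counts (int), and access_date (datetime). Run the program, then refresh the database to see the results.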
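
The table described above could be created with a statement along these lines, shown here as a hedged sketch rather than the author's exact schema. The post names only the columns and their types, so the `VARCHAR(255)` length and the primary key on `id` are assumptions (MySQL requires an auto-increment column to be a key). The object name `CreateLocationTable` is hypothetical; the connection URL and credentials are the ones used in the program.

```scala
import java.sql.{Connection, DriverManager}

object CreateLocationTable {
  // Assumed details: VARCHAR length and PRIMARY KEY are not given in the post.
  val createTableSql: String =
    """CREATE TABLE IF NOT EXISTS location_info (
      |  id INT NOT NULL AUTO_INCREMENT,
      |  location VARCHAR(255),
      |  counts INT,
      |  access_date DATETIME,
      |  PRIMARY KEY (id)
      |)""".stripMargin

  // Runs the DDL on an open connection and always closes the statement.
  def createTable(conn: Connection): Unit = {
    val stmt = conn.createStatement()
    try stmt.executeUpdate(createTableSql)
    finally stmt.close()
  }

  def main(args: Array[String]): Unit = {
    // Requires a running MySQL server with a database named bigdata.
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "toor")
    try createTable(conn)
    finally conn.close()
  }
}
```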
