Spark: Database Operations via the JDBC API
2017-08-25 17:10
The example below reads a pipe-delimited access log, maps each client IP to a province via a broadcast rule table and a binary search, counts hits per province, and writes the totals to MySQL over JDBC.
import java.sql.{Connection, Date, DriverManager, PreparedStatement}
import org.apache.spark.{SparkConf, SparkContext}

object IPLocation {

  // Writes one partition's (province, count) pairs to MySQL. The connection
  // and prepared statement are created once per partition rather than once
  // per record, which is why this is handed to foreachPartition below.
  val data2MySQL = (iterator: Iterator[(String, Int)]) => {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "INSERT INTO location_info (location, counts, accesse_date) VALUES (?, ?, ?)"
    try {
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "123456")
      ps = conn.prepareStatement(sql)
      iterator.foreach { case (location, count) =>
        ps.setString(1, location)
        ps.setInt(2, count)
        ps.setDate(3, new Date(System.currentTimeMillis()))
        ps.executeUpdate()
      }
    } catch {
      case e: Exception => println("MySQL exception: " + e.getMessage)
    } finally {
      if (ps != null) ps.close()
      if (conn != null) conn.close()
    }
  }
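
  // A possible refinement (not in the original post): queue the rows with
  // addBatch and send them in one round-trip per partition. Same assumed
  // connection settings and location_info table as data2MySQL above.
  val data2MySQLBatched = (iterator: Iterator[(String, Int)]) => {
    var conn: Connection = null
    var ps: PreparedStatement = null
    try {
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "123456")
      ps = conn.prepareStatement(
        "INSERT INTO location_info (location, counts, accesse_date) VALUES (?, ?, ?)")
      iterator.foreach { case (location, count) =>
        ps.setString(1, location)
        ps.setInt(2, count)
        ps.setDate(3, new Date(System.currentTimeMillis()))
        ps.addBatch()   // queue the row instead of executing immediately
      }
      ps.executeBatch() // one round-trip for the whole partition
    } finally {
      if (ps != null) ps.close()
      if (conn != null) conn.close()
    }
  }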

  // Converts a dotted-quad IP string to its 32-bit numeric value by shifting
  // in one octet at a time.
  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length) {
      ipNum = fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }
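  // Worked example (a sanity check, not from the original post):
  //   ip2Long("192.168.0.1")
  //     = 192*2^24 + 168*2^16 + 0*2^8 + 1
  //     = 3232235521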

  // Binary search over the (startNum, endNum, province) rules. The rules must
  // be sorted ascending by startNum; returns -1 when no range contains ip.
  def binarySearch(lines: Array[(String, String, String)], ip: Long): Int = {
    var low = 0
    var high = lines.length - 1
    while (low <= high) {
      val middle = (low + high) / 2
      if ((ip >= lines(middle)._1.toLong) && (ip <= lines(middle)._2.toLong))
        return middle
      if (ip < lines(middle)._1.toLong)
        high = middle - 1
      else
        low = middle + 1
    }
    -1
  }
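  // Hypothetical call (rule values are illustrative, not from a real ip.txt):
  //   binarySearch(Array(("16777472", "16778239", "Fujian")), ip2Long("1.0.1.5"))
  // returns 0, since ip2Long("1.0.1.5") = 16777477 falls inside that range.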

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("IpLocation")
    val sc = new SparkContext(conf)

    // Parse the pipe-delimited rule file: fields 2 and 3 are the numeric
    // start/end of an IP range, field 6 is the province name.
    val ipRulesRdd = sc.textFile("c://ip.txt").map(line => {
      val fields = line.split("\\|")
      val start_num = fields(2)
      val end_num = fields(3)
      val province = fields(6)
      (start_num, end_num, province)
    })

    // Collect the full set of IP mapping rules to the driver, then broadcast
    // them so each executor holds a single read-only copy.
    val ipRulesArray = ipRulesRdd.collect()
    val ipRulesBroadcast = sc.broadcast(ipRulesArray)

    // Load the access log to be processed; field 1 is the client IP.
    val ipsRDD = sc.textFile("c://access_log").map(line => {
      val fields = line.split("\\|")
      fields(1)
    })

    // Look up each IP's rule, drop IPs that match no rule (binarySearch
    // returns -1), and count hits per province.
    val result = ipsRDD.map(ip => binarySearch(ipRulesBroadcast.value, ip2Long(ip)))
      .filter(_ != -1)
      .map(index => ipRulesBroadcast.value(index)) // (start num, end num, province)
      .map(t => (t._3, 1))
      .reduceByKey(_ + _)

    // Write the aggregated counts to MySQL, one connection per partition.
    result.foreachPartition(data2MySQL)
    //println(result.collect().toBuffer)
    sc.stop()
  }
}
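
To compile and run this locally, the MySQL JDBC driver must be on the classpath alongside Spark. A minimal build.sbt sketch (the artifact versions are assumptions; match them to your environment):

// build.sbt -- versions below are illustrative assumptions
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.3" % "provided",
  "mysql" % "mysql-connector-java" % "5.1.38"
)

The job also assumes a location_info table already exists in the bigdata database. Below is a one-off helper that creates it; the schema is reconstructed from the INSERT statement above, so the column types and the id column are assumptions (the "accesse_date" spelling is kept as-is to match the code):

import java.sql.DriverManager

object CreateLocationInfoTable {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "123456")
    try {
      // Column names come from the INSERT in data2MySQL; types are guesses.
      conn.createStatement().execute(
        """CREATE TABLE IF NOT EXISTS location_info (
          |  id INT AUTO_INCREMENT PRIMARY KEY,
          |  location VARCHAR(128),
          |  counts INT,
          |  accesse_date DATE
          |)""".stripMargin)
    } finally {
      conn.close()
    }
  }
}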