您的位置:首页 > 编程语言

《理财市场情绪监测系统》代码实现【2】之爬虫数据解析

2017-07-17 10:05 791 查看
数据源是从新浪、腾讯、搜狐三个财经网站爬取的数据,并已先用 C++ 程序完成分词;

下面对分词后的词语进行处理,代码如下:

/**
* Created by lkl on 2017/6/26.
*///spark-shell --driver-class-path /home/hadoop/test/mysqljdbc.jar
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
/**
 * Spark job that loads crawled news records from HDFS, stores the extracted
 * title fields in MySQL, then splits each pre-segmented title into tokens and
 * writes one row per token into the `titlesplit` table.
 *
 * Pipeline, as implemented in [[main]]:
 *  1. read the raw event file and pick five quoted fields out of every line;
 *  2. save them to the `newstitles` table through the DataFrame JDBC writer;
 *  3. read the `newstitle` table back, split `title` on `|`, split every
 *     segment on a single space and insert the segments that have exactly
 *     four parts.
 */
object titlesplit {

  import scala.util.control.NonFatal

  // NOTE(review): credentials are embedded in the URL; consider moving them to configuration.
  /** JDBC URL of the target MySQL database. */
  val rl = "jdbc:mysql://192.168.0.37:3306/emotional?user=root&password=123456&useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false"

  classOf[com.mysql.jdbc.Driver]

  /**
   * Object-wide connection, kept for callers that reference it directly.
   * It is lazy so that merely loading this object does not open a connection;
   * [[main]] no longer uses it and opens one connection per partition instead.
   */
  lazy val conn: java.sql.Connection = DriverManager.getConnection(rl)

  /** Statement on [[conn]], kept for backward compatibility; unused by [[main]]. */
  lazy val statement: java.sql.Statement =
    conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE)

  // Input read when no path is passed on the command line. The directory layout is
  // date-partitioned: .../Crawler_Common_WebPageNews/<yyyy>/<MM>/<dd>/<events file>.
  private val DefaultInputPath =
    "hdfs://192.168.0.211:9000/user/datacenter/home/datacenter/datacollect/logs/dataplatform/Crawler/Crawler_Common_WebPageNews/2017/07/14/events_192.168.0.217_datacenter4.1499994258650.gzip"

  // One row per token; the column names are reproduced exactly as the table defines them.
  private val InsertSql =
    "INSERT INTO titlesplit(innserSessionid,times,channelType,sourcetitle,title,words,characters,refer,role) VALUES (?,?,?,?,?,?,?,?,?) "

  /**
   * Runs the job.
   *
   * @param args optional; `args(0)` overrides the HDFS input path. With no
   *             arguments the original hard-coded file is read.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._

      val inputPath = args.headOption.getOrElse(DefaultInputPath)
      val log01 = sc.textFile(inputPath)

      // Every record is split on `","` once; the value of a field is whatever follows `":"`.
      val l = log01.map { line =>
        val fields = line.split("\",\"")
        def valueAt(index: Int): String = fields(index).split("\":\"")(1)
        (valueAt(1), valueAt(4), valueAt(12), valueAt(13), valueAt(23))
      }

      val df = l.toDF("channelType", "sourcetitle", "title", "time", "innerSessionId")
      df.printSchema()
      df.insertIntoJDBC(rl, "newstitles", true)

      // NOTE(review): the write above targets `newstitles` while this read uses `newstitle`;
      // confirm that two different tables are intended.
      val job = sqlContext.jdbc("jdbc:mysql://192.168.0.37:3306/emotional?user=root&password=123456", "newstitle")
      job.toDF().registerTempTable("job")
      val ed = sqlContext.sql("select `innerSessionId`,`time`,`channelType`,`sourcetitle`,`title` from job")

      val pp = ed.map { row =>
        val title = row.getString(4)
        // A NULL title yields no segments instead of a NullPointerException.
        val segments = Option(title).map(_.split("\\|")).getOrElse(Array.empty[String])
        (row.getString(0), row.getString(1), row.getString(2), row.getString(3), title, segments)
      }

      // One connection and one prepared statement per partition. Previously the shared
      // connection was closed after the first insert, so every later insert failed.
      pp.foreachPartition { rows =>
        val connection = DriverManager.getConnection(rl)
        try {
          val prep = connection.prepareStatement(InsertSql)
          try {
            rows.foreach { case (sessionId, time, channelType, sourceTitle, title, segments) =>
              segments.foreach { segment =>
                val parts = segment.split(" ")
                // Only segments with exactly four space-separated parts are stored.
                if (parts.length == 4) {
                  println((sessionId, time, channelType, sourceTitle, title, parts(0), parts(1), parts(2), parts(3)))
                  try {
                    prep.setString(1, sessionId)
                    prep.setString(2, time)
                    prep.setString(3, channelType)
                    prep.setString(4, sourceTitle)
                    prep.setString(5, title)
                    prep.setString(6, parts(0))
                    prep.setString(7, parts(1))
                    prep.setString(8, parts(2))
                    prep.setString(9, parts(3))
                    prep.executeUpdate()
                  } catch {
                    // Best effort, as before: report the failed row and carry on with the next one.
                    case NonFatal(e) => e.printStackTrace()
                  }
                }
              }
            }
          } finally {
            prep.close()
          }
        } finally {
          connection.close()
        }
      }
    } finally {
      sc.stop()
    }
  }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐