
Spark: Spark SQL applications -- 46

2015-06-03 11:32
Since Spark 1.3.x no longer supports Hive in my setup, I rewrote two examples from a very popular Spark book to use Spark SQL instead:
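Both rewrites follow the same Spark SQL pattern: turn a text file into an RDD of case-class records, register it as a temporary table, and query it with SQL. The minimal sketch below illustrates just that pattern (the Sale class, SALES table, and TempTableSketch object are invented for illustration); on Spark 1.2.x the implicit createSchemaRDD performs the conversion, while on Spark 1.3.x the equivalent is import sqlcontext.implicits._ followed by .toDF(). The two programs below use the 1.2.x form.

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical record type, used only in this sketch
case class Sale(locationid: String, qty: Int)

object TempTableSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("TempTableSketch").setMaster("local[2]"))
    val sqlcontext = new SQLContext(sc)
    // Spark 1.2.x: implicit conversion from RDD[Sale] to SchemaRDD
    // Spark 1.3.x: use import sqlcontext.implicits._ and sales.toDF() instead
    import sqlcontext.createSchemaRDD
    val sales = sc.parallelize(Seq(Sale("sh", 3), Sale("bj", 5), Sale("sh", 7)))
    sales.registerTempTable("SALES")
    sqlcontext.sql("select locationid, sum(qty) totalqty from SALES group by locationid")
      .collect().foreach(println)
    sc.stop()
  }
}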

package llf

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.{SparkContext, SparkConf}

import scala.collection.mutable.ListBuffer

/**
* Created by sendoh on 2015/6/3.
*/
object NewSM {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    if (args.length != 3) {
      println("Usage: java -jar code.jar dependency_jars file_location save_location")
      System.exit(0)
    }
    val jars = ListBuffer[String]()
    args(0).split(',').foreach(jars += _)
    // Set up the runtime environment
    val conf = new SparkConf().setAppName("NewSM").setMaster("local[2]").setSparkHome("/usr/local/spark-1.2.0-bin-hadoop2.4").setJars(jars)
    val sc = new SparkContext(conf)
    val sqlcontext = new SQLContext(sc)
    // Use Spark SQL to compute each store's sales quantity and amount
    import sqlcontext.createSchemaRDD
    // Date classifies dates, attaching to each day its month, week, quarter and other attributes:
    // date ID, year-month, year, month, day, day of week, week of year, quarter, ten-day period, half month
    case class Date(dateID: String, theyearmonth: String, theyear: String, themonth: String, thedate: String,
                    theweek: String, theweeks: String, thequot: String, thetenday: String, thehalfmonth: String)
    // Stock defines the order header:
    // order number, transaction location, transaction date
    case class Stock(ordernumber: String, locationid: String, dateID: String)
    // StockDetail defines the order line items:
    // order number, item, row number, quantity, price, amount
    case class StockDetail(ordernumber: String, itemid: String, rownum: Int, qty: Int, price: Int, amount: Int)
    // Register the temporary tables
    sc.textFile("hdfs://localhost:9000/datatnt/sale.txt").map(_.split(",")).map(p =>
      Date(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9).trim)).registerTempTable("SALEDATA")
    sc.textFile("hdfs://localhost:9000/datatnt/stock.txt").map(_.split(",")).map(t =>
      Stock(t(0), t(1), t(2).trim)).registerTempTable("SALESTOCK")
    sc.textFile("hdfs://localhost:9000/datatnt/stockdetail.txt").map(_.split(",")).map(y =>
      StockDetail(y(0), y(1), y(2).toInt, y(3).toInt, y(4).toInt, y(5).trim.toInt)).registerTempTable("SALESTOCKDETAIL")
    // sqlcontext.sql("SET spark.sql.shuffle.partitions = 20")
    val sqldata = sqlcontext.sql("select a.locationid, sum(b.qty) totalqty, sum(b.amount) totalamount " +
      "from SALESTOCK a join SALESTOCKDETAIL b on a.ordernumber = b.ordernumber group by a.locationid")
    // Convert the query result into feature vectors
    val parsedData = sqldata.map {
      case Row(_, totalqty, totalamount) =>
        val features = Array[Double](totalqty.toString.toDouble, totalamount.toString.toDouble)
        Vectors.dense(features)
    }
    // Cluster the data set: 3 clusters, 20 iterations, producing the model
    val numClusters = 3
    val numIterations = 20
    val model = KMeans.train(parsedData, numClusters, numIterations)
    // Classify the query result with the model and write it out.
    // Because the number of shuffle partitions is left at the default, the output consists of
    // 200 small files; they can be merged and downloaded locally with bin/hdfs dfs -getmerge
    sqldata.map {
      case Row(locationid, totalqty, totalamount) =>
        val features = Array[Double](totalqty.toString.toDouble, totalamount.toString.toDouble)
        val linevectore = Vectors.dense(features)
        val prediction = model.predict(linevectore)
        locationid + " " + totalqty + " " + totalamount + " " + prediction
    }.saveAsTextFile("hdfs://localhost:9000/outdatatnt/sqlmllib")
    sc.stop()
  }

}
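As a follow-up to the clustering step above, the standalone sketch below (the KMeansCheck object and its in-memory sample points are invented; they stand in for the (totalqty, totalamount) vectors from the SQL query) shows how a trained KMeansModel can be sanity-checked by printing its cluster centers and the within-set sum of squared errors via computeCost, which helps when choosing numClusters:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkConf, SparkContext}

object KMeansCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("KMeansCheck").setMaster("local[2]"))
    // In-memory stand-ins for the (totalqty, totalamount) vectors produced by the SQL query above
    val points = sc.parallelize(Seq(
      Vectors.dense(10.0, 100.0), Vectors.dense(12.0, 110.0),
      Vectors.dense(55.0, 600.0), Vectors.dense(60.0, 650.0),
      Vectors.dense(200.0, 2500.0), Vectors.dense(210.0, 2600.0)))
    val model = KMeans.train(points, 3, 20)
    // The cluster centers show what each of the 3 clusters represents
    model.clusterCenters.zipWithIndex.foreach { case (c, i) => println("cluster " + i + " center: " + c) }
    // WSSSE: lower means tighter clusters; compare it across different cluster counts
    println("Within Set Sum of Squared Errors = " + model.computeCost(points))
    sc.stop()
  }
}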
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

package llf

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.{SparkContext, SparkConf}

import scala.collection.mutable.ListBuffer

/**
* Created by sendoh on 2015/6/3.
*/
object NewSGX {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    if (args.length != 3) {
      println("Usage: java -jar code.jar dependency_jars file_location save_location")
      System.exit(0)
    }
    val jars = ListBuffer[String]()
    args(0).split(',').foreach(jars += _)
    // Set up the runtime environment
    val conf = new SparkConf().setAppName("NewSGX").setMaster("local[2]").setSparkHome("/usr/local/spark-1.2.0-bin-hadoop2.4").setJars(jars)
    val sc = new SparkContext(conf)
    val sqlcontext = new SQLContext(sc)
    // Load the graph data through Spark SQL
    import sqlcontext.createSchemaRDD
    // Vertices defines the vertices:
    // ID, vertex name
    case class Vertices(id: String, title: String)
    // Edges defines the edges:
    // source vertex ID, destination vertex ID
    case class Edges(srcid: String, distid: String)
    // Register the temporary tables
    sc.textFile("hdfs://localhost:9000/datatnt/salevertices.txt").map(_.split(",")).map(p =>
      Vertices(p(0), p(1).trim)).registerTempTable("SALEVERTICES")
    sc.textFile("hdfs://localhost:9000/datatnt/edges.txt").map(_.split(",")).map(t =>
      Edges(t(0), t(1).trim)).registerTempTable("SALEEDGES")
    val verticesdata = sqlcontext.sql("select id, title from SALEVERTICES")
    val edgesdata = sqlcontext.sql("select srcid, distid from SALEEDGES")
    // Load the vertices and edges
    val vertices = verticesdata.map { case Row(id, title) => (id.toString.toLong, title.toString) }
    val edges = edgesdata.map { case Row(srcid, distid) => Edge(srcid.toString.toLong, distid.toString.toLong, 0) }
    // Build the graph
    val graph = Graph(vertices, edges, "").persist()
    // pageRank calls cache() internally, so the persist above can only use MEMORY_ONLY
    println("################################################################")
    println("PageRank: find the most valuable vertices")
    println("################################################################")
    val prGraph = graph.pageRank(0.001).cache()
    val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) {
      (v, title, rank) => (rank.getOrElse(0.0), title)
    }
    titleAndPrGraph.vertices.top(10) {
      Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)
    }.foreach(t => println(t._2._2 + ":" + t._2._1))
    sc.stop()
  }

}
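To verify the PageRank and outerJoinVertices logic without HDFS data, the self-contained sketch below runs the same pipeline as NewSGX on a tiny hand-built graph (the PageRankSketch object and its three vertices and edges are made up) and prints the vertices ordered by rank:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object PageRankSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PageRankSketch").setMaster("local[2]"))
    // Three vertices with titles and a few directed edges between them
    val vertices: RDD[(VertexId, String)] =
      sc.parallelize(Seq((1L, "storeA"), (2L, "storeB"), (3L, "storeC")))
    val edges: RDD[Edge[Int]] =
      sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(2L, 3L, 0), Edge(3L, 1L, 0)))
    val graph = Graph(vertices, edges, "").persist()
    val prGraph = graph.pageRank(0.001).cache()
    // Attach each rank to its title and print the vertices ordered by rank
    graph.outerJoinVertices(prGraph.vertices) {
      (vid, title, rank) => (rank.getOrElse(0.0), title)
    }.vertices
      .top(3)(Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1))
      .foreach(t => println(t._2._2 + ": " + t._2._1))
    sc.stop()
  }
}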