spark: Spark SQL applications -- 46
2015-06-03 11:32
Since Spark 1.3.x no longer supports Hive, I rewrote two examples from a very popular Spark book to use Spark SQL instead:
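Both rewrites hinge on the same pattern: instead of querying Hive tables, each input file is mapped onto a case class and registered as a temp table that SQLContext can query. A minimal, self-contained sketch of just that pattern (the file path, schema, and table name are made up for illustration; the API is the Spark 1.2.x SchemaRDD one used throughout this post):

package llf

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical schema: one case class per input file, fields in column order.
case class Sale(locationid: String, amount: Int)

object TempTableSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("TempTableSketch").setMaster("local[2]"))
    val sqlcontext = new SQLContext(sc)
    import sqlcontext.createSchemaRDD // implicit RDD[case class] -> SchemaRDD

    // Map the raw text onto the case class, then register it as a temp table.
    sc.textFile("hdfs://localhost:9000/datatnt/sale-demo.txt")
      .map(_.split(","))
      .map(p => Sale(p(0), p(1).trim.toInt))
      .registerTempTable("SALEDEMO")

    // Plain SQL against the temp table, no Hive involved.
    sqlcontext.sql("select locationid, sum(amount) from SALEDEMO group by locationid")
      .collect()
      .foreach(println)
    sc.stop()
  }
}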
package llf

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

/**
 * Created by sendoh on 2015/6/3.
 */
object NewSM {
  // Case classes are defined at the object level (not inside main) so that
  // Scala reflection can derive the SchemaRDD schema from them.
  // Date assigns each day its calendar attributes:
  // date ID, year-month, year, month, day, weekday, week of year, quarter, ten-day period, half-month
  case class Date(dateID: String, theyearmonth: String, theyear: String, themonth: String,
                  thedate: String, theweek: String, theweeks: String, thequot: String,
                  thetenday: String, thehalfmonth: String)
  // Stock is the order header: order number, location, date
  case class Stock(ordernumber: String, locationid: String, dateID: String)
  // StockDetail is the order line items: order number, item ID, row number, quantity, price, amount
  case class StockDetail(ordernumber: String, itemid: String, rownum: Int, qty: Int, price: Int, amount: Int)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    if (args.length != 3) {
      println("Usage: java -jar code.jar dependency_jars file_location save_location")
      System.exit(0)
    }
    val jars = ListBuffer[String]()
    args(0).split(',').foreach(jars += _)
    // Set up the runtime environment
    val conf = new SparkConf().setAppName("NewSM").setMaster("local[2]")
      .setSparkHome("/usr/local/spark-1.2.0-bin-hadoop2.4").setJars(jars)
    val sc = new SparkContext(conf)
    val sqlcontext = new SQLContext(sc)
    import sqlcontext.createSchemaRDD
    // Register the three input files as temp tables
    sc.textFile("hdfs://localhost:9000/datatnt/sale.txt").map(_.split(","))
      .map(p => Date(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9).trim))
      .registerTempTable("SALEDATA")
    sc.textFile("hdfs://localhost:9000/datatnt/stock.txt").map(_.split(","))
      .map(t => Stock(t(0), t(1), t(2).trim))
      .registerTempTable("SALESTOCK")
    sc.textFile("hdfs://localhost:9000/datatnt/stockdetail.txt").map(_.split(","))
      .map(y => StockDetail(y(0), y(1), y(2).toInt, y(3).toInt, y(4).toInt, y(5).trim.toInt))
      .registerTempTable("SALESTOCKDETAIL")
    //sqlcontext.sql("SET spark.sql.shuffle.partitions = 20")
    // Use Spark SQL to get each store's total sales quantity and amount
    val sqldata = sqlcontext.sql(
      "select a.locationid, sum(b.qty) totalqty, sum(b.amount) totalamount " +
      "from SALESTOCK a join SALESTOCKDETAIL b on a.ordernumber = b.ordernumber " +
      "group by a.locationid")
    // Turn the query result into feature vectors
    val parsedData = sqldata.map { case Row(_, totalqty, totalamount) =>
      Vectors.dense(Array[Double](totalqty.toString.toDouble, totalamount.toString.toDouble))
    }
    // Cluster the data set: 3 clusters, 20 iterations
    val numClusters = 3
    val numIterations = 20
    val model = KMeans.train(parsedData, numClusters, numIterations)
    // Classify each row with the model and write the results out. Since the
    // partition count is left at the default, this produces 200 small files;
    // use bin/hdfs dfs -getmerge to merge them when downloading.
    sqldata.map { case Row(locationid, totalqty, totalamount) =>
      val features = Array[Double](totalqty.toString.toDouble, totalamount.toString.toDouble)
      val prediction = model.predict(Vectors.dense(features))
      locationid + " " + totalqty + " " + totalamount + " " + prediction
    }.saveAsTextFile("hdfs://localhost:9000/outdatatnt/sqlmllib")
    sc.stop()
  }
}
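Two follow-ups are worth a quick look. KMeansModel.computeCost gives the within-set sum of squared errors, the usual way to sanity-check the choice of numClusters, and coalescing before saveAsTextFile avoids the 200 small output files noted in the comment above. A minimal, self-contained sketch (the data points are made up; KMeans.train and computeCost are the same MLlib 1.x calls the example uses):

package llf

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkConf, SparkContext}

object KMeansCostSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("KMeansCostSketch").setMaster("local[2]"))
    // Hypothetical (totalqty, totalamount) pairs forming two obvious clusters.
    val data = sc.parallelize(Seq(
      Vectors.dense(1.0, 2.0), Vectors.dense(1.5, 1.8),
      Vectors.dense(80.0, 90.0), Vectors.dense(85.0, 95.0)))
    val model = KMeans.train(data, 2, 20)
    // Within-set sum of squared errors: lower is tighter; sweep the cluster
    // count and look for the "elbow" when choosing numClusters.
    println("WSSSE = " + model.computeCost(data))
    // coalesce(1) before saving collapses the output into a single file
    // instead of one file per partition.
    data.map(v => v + " -> " + model.predict(v))
      .coalesce(1)
      .saveAsTextFile("hdfs://localhost:9000/outdatatnt/kmeans-sketch")
    sc.stop()
  }
}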
The second example feeds Spark SQL query results into GraphX and ranks the vertices with PageRank:

package llf

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

/**
 * Created by sendoh on 2015/6/3.
 */
object NewSGX {
  // Vertices: vertex ID and vertex name
  case class Vertices(id: String, title: String)
  // Edges: source vertex ID and destination vertex ID
  case class Edges(srcid: String, distid: String)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    if (args.length != 3) {
      println("Usage: java -jar code.jar dependency_jars file_location save_location")
      System.exit(0)
    }
    val jars = ListBuffer[String]()
    args(0).split(',').foreach(jars += _)
    // Set up the runtime environment
    val conf = new SparkConf().setAppName("NewSGX").setMaster("local[2]")
      .setSparkHome("/usr/local/spark-1.2.0-bin-hadoop2.4").setJars(jars)
    val sc = new SparkContext(conf)
    val sqlcontext = new SQLContext(sc)
    import sqlcontext.createSchemaRDD
    // Register the vertex and edge files as temp tables
    sc.textFile("hdfs://localhost:9000/datatnt/salevertices.txt").map(_.split(","))
      .map(p => Vertices(p(0), p(1).trim))
      .registerTempTable("SALEVERTICES")
    sc.textFile("hdfs://localhost:9000/datatnt/edges.txt").map(_.split(","))
      .map(t => Edges(t(0), t(1).trim))
      .registerTempTable("SALEEDGES")
    val verticesdata = sqlcontext.sql("select id, title from SALEVERTICES")
    val edgesdata = sqlcontext.sql("select srcid, distid from SALEEDGES")
    // Load the vertices and edges
    val vertices = verticesdata.map { case Row(id, title) => (id.toString.toLong, title.toString) }
    val edges = edgesdata.map { case Row(srcid, distid) => Edge(srcid.toString.toLong, distid.toString.toLong, 0) }
    // Build the graph. pageRank uses cache() internally, so the persist here
    // has to stay at the default MEMORY_ONLY level.
    val graph = Graph(vertices, edges, "").persist()
    println("################################################################")
    println("PageRank: find the most valuable vertices")
    println("################################################################")
    val prGraph = graph.pageRank(0.001).cache()
    // Join the ranks back onto the vertex titles and print the top 10
    val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) {
      (v, title, rank) => (rank.getOrElse(0.0), title)
    }
    titleAndPrGraph.vertices.top(10) {
      Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)
    }.foreach(t => println(t._2._2 + ": " + t._2._1))
    sc.stop()
  }
}
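The PageRank step is easiest to see on a tiny in-memory graph. A minimal, self-contained sketch (the vertex names and edges are made up; pageRank, outerJoinVertices, and the top-N ordering are the same GraphX calls used above):

package llf

import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}

object PageRankSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PageRankSketch").setMaster("local[2]"))
    // Three hypothetical stores linked in a cycle, plus an extra edge into 1,
    // so vertex 1 should come out with the highest rank.
    val vertices = sc.parallelize(Seq((1L, "storeA"), (2L, "storeB"), (3L, "storeC")))
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, 0), Edge(2L, 3L, 0), Edge(3L, 1L, 0), Edge(2L, 1L, 0)))
    val graph = Graph(vertices, edges, "")
    // Iterate until ranks move less than the tolerance, then join the ranks
    // back onto the vertex names and print them, highest rank first.
    val ranks = graph.pageRank(0.001).vertices
    val named = graph.outerJoinVertices(ranks) { (vid, name, rank) => (rank.getOrElse(0.0), name) }
    named.vertices.top(3) {
      Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)
    }.foreach(t => println(t._2._2 + ": " + t._2._1))
    sc.stop()
  }
}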