烟草零售聚类1、KM_classic
2016-04-19 15:43
363 查看
package tobacco

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}
import data.copy.{getspkCtxAndsqlCtx, isColumnNameLine}

/**
 * Created by xcheng on 4/6/16.
 *
 * Clusters stores by sales volume and average price.
 */
object KM_classic {

  /** Sales-data glob used when no input path is passed on the command line. */
  private val DefaultSalePath = "/Users/xcheng/Desktop/实验室/销售库存/saledata/*"

  /**
   * Entry point.
   *
   * Loads the raw sales CSV files, prints a few sanity-check counts, builds a per-store
   * (total quantity, average price) feature table and standardises it. Model selection
   * and the final k-means run are kept below as commented-out experiments.
   *
   * @param args optional; `args(0)` overrides the sales-data input path
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    // setIfMissing keeps local[4] as the default but lets spark-submit's --master win.
    conf.setAppName("店铺聚类").setIfMissing("spark.master", "local[4]")
    val sparkContext: SparkContext = new SparkContext(conf)
    // Always release the Spark context, even if a query or the scaler fit fails.
    try {
      sparkContext.setLogLevel("OFF")
      val sqlContext: SQLContext = new SQLContext(sparkContext)
      // val (sparkContext: SparkContext, sqlContext: SQLContext) = getspkCtxAndsqlCtx("店铺聚类")

      val salePath = args.headOption.getOrElse(DefaultSalePath)
      val rawSale: RDD[String] = sparkContext.textFile(salePath)
      // Drop header lines, then parse each CSV row into a tuple.
      val parsedSale: RDD[(Long, String, String, String, String, String)] =
        rawSale.filter(!isColumnNameLine(_, "SALE_ID")).map(line => saleparser(line))
      sqlContext.createDataFrame(parsedSale)
        .toDF("PUH_TIME", "ITEM_CODE", "QTY", "COM_NAME", "PRICE", "AMT")
        .registerTempTable("saletable")

      /*
       * Per-store, per-item total quantity, plus distinct-count sanity checks
       * (numbers in the trailing comments are the values observed by the author).
       */
      sqlContext.sql("select COM_NAME,ITEM_CODE,sum(QTY) ALLQTY from saletable group by ITEM_CODE,COM_NAME")
        .registerTempTable("comitemqtytable")
      sqlContext.sql("select count(distinct ITEM_CODE) from saletable").show() // 386
      sqlContext.sql("select count(distinct COM_NAME) FROM saletable").show() // 76 (author's note: 80 with the three supermarkets)
      sqlContext.sql("select count(*) from comitemqtytable").show() // 5371
      sqlContext.sql("select * from comitemqtytable order by COM_NAME,ITEM_CODE").show()

      /* Total sales quantity and amount per store, counting only rows with QTY > 0. */
      sqlContext.sql("select COM_NAME, sum(QTY)sumQTY ,sum(AMT)sumAMT FROM saletable where QTY>0 group by COM_NAME ")
        .registerTempTable("totalsale")

      /* Average price per store = total amount / total quantity. */
      sqlContext.sql("select COM_NAME,sumAMT/sumQTY avgprice from totalsale ")
        .registerTempTable("comavgprice") //.show()

      // TODO: number of brands per store (an empty placeholder section in the original).

      /* Per-store sales volume and average price: the clustering features. */
      val qtyprice =
        sqlContext.sql("select a.COM_NAME,sumQTY,avgprice from totalsale a join comavgprice b on a.COM_NAME=b.COM_NAME")
      qtyprice.registerTempTable("qtyprice")

      // One dense vector (total quantity, average price) per store. An aggregate may be
      // null (e.g. when every AMT value of a store is non-numeric); such stores are skipped
      // rather than failing the whole job with a NullPointerException in toString.
      val trainingData = qtyprice.rdd.collect {
        case Row(_, sumQTY, avgprice) if sumQTY != null && avgprice != null =>
          Vectors.dense(Array[Double](sumQTY.toString.toDouble, avgprice.toString.toDouble))
      }.cache()

      /* Feature standardisation: centre each feature and scale it to unit standard deviation. */
      val scaler = new StandardScaler(withMean = true, withStd = true).fit(trainingData)
      val scaledVectors = trainingData.map(v => scaler.transform(v))
      scaledVectors.cache()

      // println("data after scale:")
      //
      // scaledVectors.take(10).foreach(println)
      // print(scaledVectors.count())
      //
      // val numClusters = 3

      /* Experiment: cost as a function of maxIterations and of runs (default vs random init).
      val numIterations = Array(1,2,3,4,5,6,7,8,9,10)
      numIterations.foreach{it=>
        val kmeansplusmodel = KMeans.train(scaledVectors,numClusters,it)
        // compute the within-cluster sum of squared errors
        val cost=kmeansplusmodel.computeCost(scaledVectors)
        println("kmeans++ "+"iter="+it+" Within Set Sum of Squared Errors="+cost)
      }
      val runs = Array(1,2,3,4,5,6,7,8,9,10,15)
      runs.foreach{r=>
        val kmeansplusmodel = KMeans.train(scaledVectors,numClusters,maxIterations = 5,runs=r)
        val cost=kmeansplusmodel.computeCost(scaledVectors)
        println("kmeans++ "+"runs="+r+"Within Set Sum of Squared Errors="+cost)
        val kmeanmodel = KMeans.train(scaledVectors,numClusters,maxIterations = 5,runs=r,initializationMode = "random")
        val kmeanscost=kmeanmodel.computeCost(scaledVectors)
        println("kmeans "+"runs="+r+"Within Set Sum of Squared Errors="+kmeanscost)
      }
      */

      /* Experiment: cost as a function of the number of clusters k.
      val K =Array(2,3,4,5,6,7,8,9,10)
      K.foreach{k=>
        val model = KMeans.train(scaledVectors,k,maxIterations = 100,runs=6)
        // compute the within-cluster sum of squared errors
        val cost=model.computeCost(scaledVectors)
        println("kmeans++ "+"k="+k+"Within Set Sum of Squared Errors="+cost)
      }
      */

      /* Selected model: one output line per store with raw features, scaled features and cluster id.
      val model = KMeans.train(scaledVectors,numClusters,maxIterations = 100,runs=5)
      qtyprice.map{ case Row(com_name,sumQTY,avgprice)=>
        val features =Array[Double](sumQTY.toString.toDouble,avgprice.toString.toDouble)
        val line= Vectors.dense(features)
        val scaledline=scaler.transform(line)
        val prediction = model.predict(scaledline)
        com_name+","+sumQTY+","+avgprice+","+scaledline(0)+","+scaledline(1)+","+prediction
      }.saveAsTextFile("/Users/xcheng/360云盘/销售库存/销量均价KM/聚类结果")
      */
    } finally {
      sparkContext.stop()
    }
  }

  /**
   * Parses one line of the sales CSV.
   *
   * Column 3 holds a numeric timestamp that is divided by 1,000,000 before being returned.
   * The other fields stay raw strings; QTY and AMT are aggregated later in Spark SQL.
   * Splitting with limit -1 keeps trailing empty fields, so a row ending in empty columns
   * yields "" instead of throwing ArrayIndexOutOfBoundsException.
   *
   * @return (PUH_TIME, ITEM_CODE, QTY, COM_NAME, PRICE, AMT)
   */
  def saleparser(line: String): (Long, String, String, String, String, String) = {
    val splited = line.split(",", -1)
    val PUH_TIME = splited(3).trim.toLong / 1000000
    val ITEM_CODE = splited(4)
    val QTY = splited(5)
    val COM_NAME = splited(8)
    val PRICE = splited(6)
    val AMT = splited(7)
    (PUH_TIME, ITEM_CODE, QTY, COM_NAME, PRICE, AMT)
  }

  /**
   * Parses one line of the store-inventory CSV
   * (columns: LICENSE_CODE, ITEM_CODE, QTY, DATE1, TIME1, COM_NAME).
   *
   * @return (ITEM_CODE, QTY, DATE, COM_NAME)
   */
  def storeparser(line: String): (String, Double, Int, String) = {
    // limit -1 keeps trailing empty fields (e.g. a missing COM_NAME) instead of dropping them
    val splited = line.split(",", -1)
    val ITEM_CODE = splited(1)
    val QTY = splited(2).toDouble
    val DATE = splited(3).toInt
    val COM_NAME = splited(5)
    (ITEM_CODE, QTY, DATE, COM_NAME)
  }

  /**
   * Parses one line of the retailer CSV into a 21-field tuple of raw strings,
   * in column order (columns 0 to 20).
   */
  def Retailerparser(line: String) = {
    // limit -1 keeps trailing empty fields (e.g. an empty MINUTE column) instead of dropping them
    val splited = line.split(",", -1)
    val DAY = splited(0)
    val ORG_CODE = splited(1)
    val ORG_NAME = splited(2)
    val TOBACCO_CODE = splited(3)
    val TOBACCO_NAME = splited(4)
    val wholesalePrice = splited(5)
    val CUS_CODE = splited(6)
    val CUS_NAME = splited(7)
    val ADDRESS = splited(8)
    val STATUS = splited(9)
    val RetailStatu = splited(10)
    val PAYKIND = splited(11)
    val CUSFLOW = splited(12)
    val MARKETTYPE = splited(13)
    val NEED = splited(14)
    val SHENHE = splited(15)
    val DINGGOU = splited(16)
    val PRICETYPE = splited(17)
    val TAR_CONT = splited(18)
    val AMOUNT = splited(19)
    val MINUTE = splited(20)
    (DAY, ORG_CODE, ORG_NAME, TOBACCO_CODE, TOBACCO_NAME, wholesalePrice, CUS_CODE, CUS_NAME,
      ADDRESS, STATUS, RetailStatu, PAYKIND, CUSFLOW, MARKETTYPE, NEED, SHENHE, DINGGOU,
      PRICETYPE, TAR_CONT, AMOUNT, MINUTE)
  }
}

// Earlier prediction/output attempts, kept for reference (not compiled):
//    scaledVectors.map{
//      case Row(sumQTY,avgprice)=>
//        // val prediction = model.predict(Vectors.dense(Array[Double](sumQTY.toString.toDouble,avgprice.toString.toDouble)))
//        val features = Array(sumQTY.toString.toDouble,avgprice.toString.toDouble)
//        features.foreach(println)
//        val line= Vectors.dense(features)
//        val prediction = model.predict(line)
//        sumQTY+" "+avgprice+" "+prediction
//    }.saveAsTextFile("/Users/xcheng/360云盘/销售库存/超市KMscaledpre")
//
//    model.predict(scaledVectors).saveAsTextFile("/Users/xcheng/360云盘/销售库存/超市KMscaledpre")//foreach(println)
//    scaledVectors.zip(model.predict(scaledVectors)).saveAsTextFile("/Users/xcheng/360云盘/销售库存/超市销量均价KM/4类")
相关文章推荐
- android 菜单
- 栈的一些习题
- HDU 1548 A strange lift-bfs
- JS组件系列——BootstrapTable+KnockoutJS实现增删改查解决方案(二)
- [转载]深入理解Java 8 Lambda
- C#获取本地打印机列表,并将指定打印机设置为默认打印机
- 解读node.js的cluster模块
- SourceTree入门
- 如何让自建组件可以在导航栏使用
- protocol buffer 安装与使用
- 详解js闭包
- 菱形继承
- Jenkins进阶系列之——02email-ext邮件通知模板
- 初始安卓基本控件_TextView
- TreeSet()原理及使用
- python学习
- PHP函数补完:call_user_func()
- wildfly jboss 同时连接多个数据源 datasource xa-datasource
- 浅谈UDP
- 利用ELKStack日志平台分析短信攻击