Spark Cookbook - Getting Started with ML
2015-09-06 14:11
Creating vectors
$ spark-shell
import org.apache.spark.mllib.linalg.{Vectors, Vector}
val dvPerson = Vectors.dense(160.0, 69.0, 24.0)
val svPerson = Vectors.sparse(3, Array(0, 1, 2), Array(160.0, 69.0, 24.0))
Signature of the dense vector factory method: def dense(values: Array[Double]): Vector
Signature of the sparse vector factory method: def sparse(size: Int, indices: Array[Int], values: Array[Double]): Vector
Here size is the length of the vector, indices is a strictly increasing array of positions, and values must have the same length as indices.
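For instance, a vector that is mostly zeros only stores its non-zero entries (a quick sketch; the values are made up):

// A 10-element vector with non-zero values only at indices 2 and 5
val sv = Vectors.sparse(10, Array(2, 5), Array(1.0, 3.0))
sv.toArray // Array(0.0, 0.0, 1.0, 0.0, 0.0, 3.0, 0.0, 0.0, 0.0, 0.0)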
Hands-on
1. Create labeled points in several ways:

scala> import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
val willBuySUV = LabeledPoint(1.0, Vectors.dense(300.0, 80, 40))
val willNotBuySUV = LabeledPoint(0.0, Vectors.dense(150.0, 60, 25))
// The same points built from sparse vectors (redefining the vals is fine in the REPL)
val willBuySUV = LabeledPoint(1.0, Vectors.sparse(3, Array(0, 1, 2), Array(300.0, 80, 40)))
val willNotBuySUV = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 1, 2), Array(150.0, 60, 25)))
2. Create a libsvm file containing the same data:
$ vi person_libsvm.txt (libsvm indices start at 1)
0 1:150 2:60 3:25
1 1:300 2:80 3:40
3. Upload it to HDFS:
$ hdfs dfs -put person_libsvm.txt person_libsvm.txt
4. Load the file:
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
val persons = MLUtils.loadLibSVMFile(sc, "person_libsvm.txt")
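A quick sanity check that both labeled points loaded (a REPL sketch; loadLibSVMFile returns the features as sparse vectors, so the output looks roughly like the comment):

persons.collect.foreach(println)
// e.g. (0.0,(3,[0,1,2],[150.0,60.0,25.0]))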
Creating matrices
$ spark-shell
import org.apache.spark.mllib.linalg.{Vectors, Matrix, Matrices}

// A local 3x2 dense matrix (values in column-major order)
val people = Matrices.dense(3, 2, Array(150d, 60d, 25d, 300d, 80d, 40d))

// A distributed RowMatrix
val personRDD = sc.parallelize(List(Vectors.dense(150, 60, 25), Vectors.dense(300, 80, 40)))
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix, CoordinateMatrix, MatrixEntry}
val personMat = new RowMatrix(personRDD)
print(personMat.numRows)
print(personMat.numCols)

// An IndexedRowMatrix carries a row index with each row
val personRDD = sc.parallelize(List(IndexedRow(0L, Vectors.dense(150, 60, 25)), IndexedRow(1L, Vectors.dense(300, 80, 40))))
val pirmat = new IndexedRowMatrix(personRDD)
print(pirmat.numRows)
print(pirmat.numCols)
val personMat = pirmat.toRowMatrix

// A CoordinateMatrix is built from MatrixEntry(row, column, value) triples
val meRDD = sc.parallelize(List(
  MatrixEntry(0, 0, 150),
  MatrixEntry(1, 0, 60),
  MatrixEntry(2, 0, 25),
  MatrixEntry(0, 1, 300),
  MatrixEntry(1, 1, 80),
  MatrixEntry(2, 1, 40)
))
val pcmat = new CoordinateMatrix(meRDD)
print(pcmat.numRows) // 3
print(pcmat.numCols) // 2
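The distributed matrix types convert into one another; for example, going from the CoordinateMatrix above back to the row-oriented forms (standard MLlib conversion methods):

val indexedMat = pcmat.toIndexedRowMatrix // CoordinateMatrix -> IndexedRowMatrix
val rowMat = indexedMat.toRowMatrix       // drop the row indices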
Computing summary statistics
$ spark-shell
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.stat.Statistics
val personRDD = sc.parallelize(List(Vectors.dense(150, 60, 25), Vectors.dense(300, 80, 40)))
// Compute column-wise summary statistics
val summary = Statistics.colStats(personRDD)
print(summary.mean)
print(summary.variance)
print(summary.numNonzeros) // non-zero count per column
print(summary.count)       // sample size
print(summary.max)         // per-column maximum
Computing correlation
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.stat.Statistics
val sizes = sc.parallelize(List(2100, 2300, 2046, 4314, 1244, 4608, 2173, 2750, 4010, 1959.0))
val prices = sc.parallelize(List(1620000, 1690000, 1400000, 2000000, 1060000, 3830000, 1230000, 2400000, 3380000, 1480000.00))
// With the third parameter omitted, the Pearson correlation is used by default
val correlation = Statistics.corr(sizes, prices)
correlation: Double = 0.8577177736252577
// Spearman rank correlation
val correlation = Statistics.corr(sizes, prices, "spearman")
Hypothesis testing
$ spark-shell
scala> import org.apache.spark.mllib.stat.Statistics
scala> import org.apache.spark.mllib.linalg.{Vector, Vectors}
scala> import org.apache.spark.mllib.linalg.{Matrix, Matrices}
val dems = Vectors.dense(32.0, 41.0)
val reps = Vectors.dense(28.0, 25.0)
val indies = Vectors.dense(34.0, 26.0)

// Chi-squared goodness-of-fit test of each observed vector against the uniform distribution
scala> val dfit = Statistics.chiSqTest(dems)
dfit: org.apache.spark.mllib.stat.test.ChiSqTestResult =
Chi squared test summary:
method: pearson
degrees of freedom = 1
statistic = 1.1095890410958904
pValue = 0.29217129931682495
No presumption against null hypothesis: observed follows the same distribution as expected.

scala> val rfit = Statistics.chiSqTest(reps)
rfit: org.apache.spark.mllib.stat.test.ChiSqTestResult =
Chi squared test summary:
method: pearson
degrees of freedom = 1
statistic = 0.16981132075471697
pValue = 0.6802795473344502
No presumption against null hypothesis: observed follows the same distribution as expected.

scala> val ifit = Statistics.chiSqTest(indies)
ifit: org.apache.spark.mllib.stat.test.ChiSqTestResult =
Chi squared test summary:
method: pearson
degrees of freedom = 1
statistic = 1.0666666666666667
pValue = 0.3016995824783478
No presumption against null hypothesis: observed follows the same distribution as expected.

scala> print(dfit)
scala> print(rfit)
scala> print(ifit)

// The input matrix (2x3, column-major)
val mat = Matrices.dense(2, 3, Array(32.0, 41.0, 28.0, 25.0, 34.0, 26.0))
// Chi-squared test of independence
val in = Statistics.chiSqTest(mat)
print(in)
Chi squared test summary:
method: pearson
degrees of freedom = 2
statistic = 2.324830449774039
pValue = 0.31272995485237365
No presumption against null hypothesis: the occurrence of the outcomes is statistically independent.
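As a quick check on the output above: the independence test on a 2x3 matrix has (2 - 1) x (3 - 1) = 2 degrees of freedom, and each goodness-of-fit test on a 2-element vector has 2 - 1 = 1, matching the summaries.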
Creating machine learning pipelines with Spark ML
Spark ML is a library for building machine learning pipelines: it chains multiple algorithms into a single pipeline and works with DataFrames as its datasets. A few basic concepts: a Transformer turns one DataFrame into another, for example by adding a column, much like ALTER TABLE in a database.
An Estimator represents a machine learning algorithm that learns from data: feed a DataFrame into an Estimator and you get back a Transformer. Every Estimator has a fit() method that runs the training algorithm.
A machine learning pipeline is defined as a sequence of stages, where each stage is either an Estimator or a Transformer; a minimal sketch follows.
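Here is what a two-stage pipeline looks like in code (the scaler stage is illustrative and not part of the recipe below, which calls the estimator directly; assumes Spark 1.4+):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.StandardScaler

// Stage 1 scales the features, stage 2 fits a classifier on them
val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaled")
val lr = new LogisticRegression().setFeaturesCol("scaled")
val pipeline = new Pipeline().setStages(Array(scaler, lr))
// pipeline.fit(trainingDF) runs the stages in order and returns a
// PipelineModel, which is itself a Transformer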
Below we predict whether a person is a basketball player, using a pipeline made of one Estimator and one Transformer. The Estimator trains the algorithm on the training data, and the resulting Transformer makes the predictions. Suppose we use the LogisticRegression algorithm.
$ spark-shell
scala> import org.apache.spark.mllib.linalg.{Vector, Vectors}
scala> import org.apache.spark.mllib.regression.LabeledPoint
scala> import org.apache.spark.ml.classification.LogisticRegression

val lebron = LabeledPoint(1.0, Vectors.dense(80.0, 250.0))
val tim = LabeledPoint(0.0, Vectors.dense(70.0, 150.0))
val brittany = LabeledPoint(1.0, Vectors.dense(80.0, 207.0))
val stacey = LabeledPoint(0.0, Vectors.dense(65.0, 120.0))

// Build the training RDD
val trainingRDD = sc.parallelize(List(lebron, tim, brittany, stacey))
// Build the training DataFrame
val trainingDF = trainingRDD.toDF

// This step may fail with an error like:
// INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class: org.apache.hadoop.hive.metastore.ObjectStore
// 15/09/06 14:00:45 INFO metastore.ObjectStore: ObjectStore, initialize called
// 15/09/06 14:00:46 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored
// 15/09/06 14:00:46 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
// 15/09/06 14:00:46 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
// java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
//   at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
// The classpath is not set up correctly; restart the shell with:
// spark-shell --driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.18-bin.jar

// Create a LogisticRegression estimator
val estimator = new LogisticRegression
// Fit the training DataFrame to obtain a transformer
val transformer = estimator.fit(trainingDF)

// Test data
val john = Vectors.dense(90.0, 270.0) // positive example
val tom = Vectors.dense(62.0, 120.0)  // negative example
// Build the test RDD
val testRDD = sc.parallelize(List(john, tom))

// Wrap each vector in a Feature case class
case class Feature(v: Vector)
val featuresRDD = testRDD.map(v => Feature(v))
val featuresDF = featuresRDD.toDF("features")

// Add the predictions column
val predictionsDF = transformer.transform(featuresDF)
predictionsDF.foreach(println)
[[90.0,270.0],[-61.88475862420546,61.88475862420546],[1.329813739095893E-27,1.0],1.0]
[[62.0,120.0],[31.460769105952664,-31.460769105952664],[0.9999999999999782,2.1715087358995634E-14],0.0]

val shorterPredictionsDF = predictionsDF.select("features", "prediction")
// Rename the prediction column
val playerDF = shorterPredictionsDF.toDF("features", "isBasketBallPlayer")
playerDF.printSchema
// root
//  |-- features: vector (nullable = true)
//  |-- isBasketBallPlayer: double (nullable = true)
Supervised learning

Linear regression
$ spark-shell
scala> import org.apache.spark.mllib.linalg.Vectors
scala> import org.apache.spark.mllib.regression.LabeledPoint
scala> import org.apache.spark.mllib.regression.LinearRegressionWithSGD

scala> val points = Array(
  LabeledPoint(1620000, Vectors.dense(2100)),
  LabeledPoint(1690000, Vectors.dense(2300)),
  LabeledPoint(1400000, Vectors.dense(2046)),
  LabeledPoint(2000000, Vectors.dense(4314)),
  LabeledPoint(1060000, Vectors.dense(1244)),
  LabeledPoint(3830000, Vectors.dense(4608)),
  LabeledPoint(1230000, Vectors.dense(2173)),
  LabeledPoint(2400000, Vectors.dense(2750)),
  LabeledPoint(3380000, Vectors.dense(4010)),
  LabeledPoint(1480000, Vectors.dense(1959))
)
val pricesRDD = sc.parallelize(points)
// 100 iterations, step size 0.0000006, miniBatchFraction 1.0 (use the whole
// dataset each iteration), initial weights all zero
val model = LinearRegressionWithSGD.train(pricesRDD, 100, 0.0000006, 1.0, Vectors.zeros(1))
// Predict the price for a size of 2500
val prediction = model.predict(Vectors.dense(2500))
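To inspect the fitted line, the returned LinearRegressionModel exposes its coefficients directly (the exact numbers depend on the run):

print(model.weights)   // the fitted coefficient for the size feature
print(model.intercept) // the train() helpers fit no intercept by default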
Cost function (loss function)

The cost function is normally minimized with gradient descent, but Spark's implementation uses stochastic gradient descent (SGD).
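Roughly, gradient descent computes the gradient over the whole dataset at every step, while SGD estimates it from a sample of the data. A minimal sketch of a single SGD update for squared error (illustrative only, not Spark's actual implementation):

// One SGD step for least squares on a single sampled point (x, y):
// w := w - alpha * (w.x - y) * x, where alpha is the step size
def sgdStep(w: Array[Double], x: Array[Double], y: Double, alpha: Double): Array[Double] = {
  val err = w.zip(x).map { case (wi, xi) => wi * xi }.sum - y
  w.zip(x).map { case (wi, xi) => wi - alpha * err * xi }
}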
Lasso linear regression

Lasso minimizes the error function while constraining the sum of the absolute values of the coefficients (the objective is written out after the list below). Ordinary least squares (OLS) has two shortcomings:
Prediction accuracy: OLS estimates usually have low bias but high variance. Prediction accuracy can be improved by shrinking some coefficients; this may introduce some bias, but overall prediction accuracy improves.
Interpretation: with a large number of predictors, we want to identify the few that contribute most to the response.
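In symbols (the standard formulation, stated here for reference), lasso solves

    min_w ||y - Xw||^2 + lambda * sum_j |w_j|

and it is this absolute-value (L1) penalty that drives individual coefficients exactly to zero.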
$ spark-shell
scala> import org.apache.spark.mllib.linalg.Vectors
scala> import org.apache.spark.mllib.regression.LabeledPoint
scala> import org.apache.spark.mllib.regression.LassoWithSGD

scala> val points = Array(
  LabeledPoint(1, Vectors.dense(5, 3, 1, 2, 1, 3, 2, 2, 1)),
  LabeledPoint(2, Vectors.dense(9, 8, 8, 9, 7, 9, 8, 7, 9))
)
val rdd = sc.parallelize(points)
// 100 iterations, step size 0.02, regularization parameter 2.0
val model = LassoWithSGD.train(rdd, 100, 0.02, 2.0)
// Check how many predictor coefficients were set to 0
model.weights
org.apache.spark.mllib.linalg.Vector = [0.13455106581619633,0.02240732644670294,0.0,0.0,0.0,0.01360995990267153,0.0,0.0,0.0]
Six of the nine predictors have their coefficients set to 0. This is the main feature of lasso: predictors it judges useless are effectively removed, by driving their coefficients to zero.
Ridge regression
$ spark-shell
scala> import org.apache.spark.mllib.linalg.Vectors
scala> import org.apache.spark.mllib.regression.LabeledPoint
scala> import org.apache.spark.mllib.regression.RidgeRegressionWithSGD

scala> val points = Array(
  LabeledPoint(1, Vectors.dense(5, 3, 1, 2, 1, 3, 2, 2, 1)),
  LabeledPoint(2, Vectors.dense(9, 8, 8, 9, 7, 9, 8, 7, 9))
)
val rdd = sc.parallelize(points)
val model = RidgeRegressionWithSGD.train(rdd, 100, 0.02, 2.0)
model.weights
org.apache.spark.mllib.linalg.Vector = [0.049805969577244584,0.029883581746346748,0.009961193915448916,0.019922387830897833,0.009961193915448916,0.029883581746346748,0.019922387830897833,0.019922387830897833,0.009961193915448916]

Note that, unlike with lasso, none of the nine coefficients is zero: ridge shrinks coefficients toward zero but never eliminates them.
Chapter 8: Classification
Logistic regression
$ spark-shell
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

val points = Array(
  LabeledPoint(0.0, Vectors.dense(0.245)),
  LabeledPoint(0.0, Vectors.dense(0.247)),
  LabeledPoint(1.0, Vectors.dense(0.285)),
  LabeledPoint(1.0, Vectors.dense(0.299)),
  LabeledPoint(1.0, Vectors.dense(0.327)),
  LabeledPoint(1.0, Vectors.dense(0.347)),
  LabeledPoint(0.0, Vectors.dense(0.356)),
  LabeledPoint(1.0, Vectors.dense(0.36)),
  LabeledPoint(0.0, Vectors.dense(0.363)),
  LabeledPoint(1.0, Vectors.dense(0.364)),
  LabeledPoint(0.0, Vectors.dense(0.398)),
  LabeledPoint(1.0, Vectors.dense(0.4)),
  LabeledPoint(0.0, Vectors.dense(0.409)),
  LabeledPoint(1.0, Vectors.dense(0.421)),
  LabeledPoint(0.0, Vectors.dense(0.432)),
  LabeledPoint(1.0, Vectors.dense(0.473)),
  LabeledPoint(1.0, Vectors.dense(0.509)),
  LabeledPoint(1.0, Vectors.dense(0.529)),
  LabeledPoint(0.0, Vectors.dense(0.561)),
  LabeledPoint(0.0, Vectors.dense(0.569)),
  LabeledPoint(1.0, Vectors.dense(0.594)),
  LabeledPoint(1.0, Vectors.dense(0.638)),
  LabeledPoint(1.0, Vectors.dense(0.656)),
  LabeledPoint(1.0, Vectors.dense(0.816)),
  LabeledPoint(1.0, Vectors.dense(0.853)),
  LabeledPoint(1.0, Vectors.dense(0.938)),
  LabeledPoint(1.0, Vectors.dense(1.036)),
  LabeledPoint(1.0, Vectors.dense(1.045))
)
val spiderRDD = sc.parallelize(points)
// Train on the dataset, fitting an intercept
val lr = new LogisticRegressionWithLBFGS().setIntercept(true)
val model = lr.run(spiderRDD)
val predict = model.predict(Vectors.dense(0.938))
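By default predict() returns the 0/1 class label. To see the underlying probability instead, the returned LogisticRegressionModel can have its decision threshold cleared (a standard MLlib option):

model.clearThreshold() // predict() now returns the raw probability
val prob = model.predict(Vectors.dense(0.938))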
Support vector machines
$ hdfs dfs -put /opt/infoobjects/spark/data/mllib/sample_libsvm_data.txt /user/hduser/sample_libsvm_data.txt
$ spark-shell
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils

val svmData = MLUtils.loadLibSVMFile(sc, "sample_libsvm_data.txt")
// Number of records
svmData.count
// Split the data in half: a training set and a test set
val trainingAndTest = svmData.randomSplit(Array(0.5, 0.5))
val trainingData = trainingAndTest(0)
val testData = trainingAndTest(1)
val model = SVMWithSGD.train(trainingData, 100)
// Pair each prediction with the true label
val predictionsAndLabels = testData.map(r => (model.predict(r.features), r.label))
// Count the misclassified examples
predictionsAndLabels.filter(p => p._1 != p._2).count
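The BinaryClassificationMetrics import above goes unused in the recipe; a quick sketch of putting it to work on the same (prediction, label) pairs (with hard 0/1 predictions the ROC is coarse; clearing the model's threshold would give real scores):

val metrics = new BinaryClassificationMetrics(predictionsAndLabels)
metrics.areaUnderROC() // area under the ROC curve; 1.0 is a perfect classifier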
Classification with decision trees
Decision trees are the most intuitive machine learning algorithm, with several useful properties:
Easy to understand and interpret
Can handle both categorical and continuous variables
Can handle missing features
No feature normalization required
Training data (columns: play, rain, wind, temperature):
$ vi tennis.csv
0.0,1.0,1.0,2.0
0.0,1.0,1.0,1.0
0.0,1.0,1.0,0.0
0.0,0.0,1.0,2.0
0.0,0.0,1.0,0.0
1.0,0.0,0.0,2.0
1.0,0.0,0.0,1.0
0.0,0.0,0.0,0.0
$ spark-shell
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.tree.configuration.Algo._
import org.apache.spark.mllib.tree.impurity.Entropy

val data = sc.textFile("tennis.csv")
val parsedData = data.map { line =>
  val parts = line.split(',').map(_.toDouble)
  LabeledPoint(parts(0), Vectors.dense(parts.tail))
}
// Train with entropy impurity and a maximum depth of 3
val model = DecisionTree.train(parsedData, Classification, Entropy, 3)
// Build a new feature vector to classify
val v = Vectors.dense(0.0, 1.0, 0.0)
model.predict(v)
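To see the splits the tree actually learned (a standard method on the returned DecisionTreeModel):

println(model.toDebugString) // prints the tree's if/else split structure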