
Spark Cookbook: Getting Started with ML

2015-09-06 14:11

Creating vectors

$ spark-shell
import org.apache.spark.mllib.linalg.{Vectors,Vector}
val dvPerson = Vectors.dense(160.0,69.0,24.0)
val svPerson = Vectors.sparse(3,Array(0,1,2),Array(160.0,69.0,24.0))


Dense vector method definition: def dense(values: Array[Double]): Vector

Sparse vector method definition: def sparse(size: Int, indices: Array[Int], values: Array[Double]): Vector

size is the size of the vector, indices is a strictly increasing array of indices, and values must be the same length as indices.
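
As a quick sanity check (a sketch against the spark-shell session above), both representations expose the same underlying values:

svPerson.toArray    // Array(160.0, 69.0, 24.0)
dvPerson.toArray    // Array(160.0, 69.0, 24.0)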

Hands-on

1. Create labeled points

scala> import org.apache.spark.mllib.linalg.{Vectors,Vector}
import org.apache.spark.mllib.regression.LabeledPoint
val willBuySUV = LabeledPoint(1.0,Vectors.dense(300.0,80,40))
val willNotBuySUV = LabeledPoint(0.0,Vectors.dense(150.0,60,25))
val willBuySUV = LabeledPoint(1.0,Vectors.sparse(3,Array(0,1,2),Array(300.0,80,40)))
val willNotBuySUV = LabeledPoint(0.0,Vectors.sparse(3,Array(0,1,2),Array(150.0,60,25)))


2. Create a libsvm file containing the same data:

$vi person_libsvm.txt (libsvm indices start with 1)
0 1:150 2:60 3:25
1 1:300 2:80 3:40


3. Upload it to HDFS:

$ hdfs dfs -put person_libsvm.txt person_libsvm.txt


4. Load the file

import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
val persons = MLUtils.loadLibSVMFile(sc,"person_libsvm.txt")


Creating matrices

$spark-shell
import org.apache.spark.mllib.linalg.{Vectors,Matrix,Matrices}
val people = Matrices.dense(3,2,Array(150d,60d,25d,300d,80d,40d))
val personRDD = sc.parallelize(List(Vectors.dense(150,60,25), Vectors.dense(300,80,40)))
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix, CoordinateMatrix, MatrixEntry}
val personMat = new RowMatrix(personRDD)
print(personMat.numRows)
print(personMat.numCols)

val personRDD = sc.parallelize(List(IndexedRow(0L, Vectors.dense(150,60,25)), IndexedRow(1L, Vectors.dense(300,80,40))))
val pirmat = new IndexedRowMatrix(personRDD)
print(pirmat.numRows)
print(pirmat.numCols)
val personMat = pirmat.toRowMatrix

val meRDD = sc.parallelize(List(
MatrixEntry(0,0,150),   //(row, column, value)
MatrixEntry(1,0,60),
MatrixEntry(2,0,25),
MatrixEntry(0,1,300),
MatrixEntry(1,1,80),
MatrixEntry(2,1,40)
))

val pcmat = new CoordinateMatrix(meRDD)
print(pcmat.numRows)    //3
print(pcmat.numCols)    //2
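
When another layout is needed, a CoordinateMatrix can be converted to the other distributed representations (a sketch using MLlib's conversion methods):

val pirFromCoord = pcmat.toIndexedRowMatrix   // back to an IndexedRowMatrix
val prmFromCoord = pirFromCoord.toRowMatrix   // and on to a plain RowMatrix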


Computing summary statistics

$ spark-shell
import org.apache.spark.mllib.linalg.{Vectors,Vector}
import org.apache.spark.mllib.stat.Statistics
val personRDD = sc.parallelize(List(Vectors.dense(150,60,25), Vectors.dense(300,80,40)))
//compute column statistics
val summary = Statistics.colStats(personRDD)
print(summary.mean)
print(summary.variance)
print(summary.numNonzeros)//non-zero counts per column
print(summary.count)//sample size
print(summary.max)//column maximums
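
The summary also exposes column minimums and, in Spark 1.2+, L1/L2 norms (a sketch; same session as above):

print(summary.min)      // column minimums
print(summary.normL1)   // column L1 norms
print(summary.normL2)   // column L2 norms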


Computing correlation

import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.stat.Statistics
val sizes = sc.parallelize(List(2100, 2300, 2046, 4314, 1244, 4608, 2173, 2750, 4010, 1959.0))

val prices = sc.parallelize(List(1620000, 1690000, 1400000, 2000000, 1060000, 3830000, 1230000, 2400000, 3380000, 1480000.00))

//with the third parameter omitted, the Pearson correlation is used by default
val correlation = Statistics.corr(sizes,prices)
correlation: Double = 0.8577177736252577

val correlation = Statistics.corr(sizes,prices,"spearman")


Hypothesis testing

$ spark-shell

scala> import org.apache.spark.mllib.stat.Statistics
scala> import org.apache.spark.mllib.linalg.{Vector,Vectors}
scala> import org.apache.spark.mllib.linalg.{Matrix, Matrices}
val dems = Vectors.dense(32.0,41.0)
val reps= Vectors.dense(28.0,25.0)
val indies = Vectors.dense(34.0,26.0)

//Do the chi-square goodness-of-fit test of the observed data against a uniform distribution:
scala> val dfit = Statistics.chiSqTest(dems)
dfit: org.apache.spark.mllib.stat.test.ChiSqTestResult =
Chi squared test summary:
method: pearson
degrees of freedom = 1
statistic = 1.1095890410958904
pValue = 0.29217129931682495
No presumption against null hypothesis: observed follows the same distribution as expected..

scala> val rfit = Statistics.chiSqTest(reps)
rfit: org.apache.spark.mllib.stat.test.ChiSqTestResult =
Chi squared test summary:
method: pearson
degrees of freedom = 1
statistic = 0.16981132075471697
pValue = 0.6802795473344502
No presumption against null hypothesis: observed follows the same distribution as expected..

scala> val ifit = Statistics.chiSqTest(indies)
ifit: org.apache.spark.mllib.stat.test.ChiSqTestResult =
Chi squared test summary:
method: pearson
degrees of freedom = 1
statistic = 1.0666666666666667
pValue = 0.3016995824783478
No presumption against null hypothesis: observed follows the same distribution as expected..

scala> print(dfit)
scala> print(rfit)
scala> print(ifit)

//input matrix
val mat = Matrices.dense(2,3,Array(32.0,41.0, 28.0,25.0, 34.0,26.0))
//chi-squared test of independence
val in = Statistics.chiSqTest(mat)
print(in)

Chi squared test summary:
method: pearson
degrees of freedom = 2
statistic = 2.324830449774039
pValue = 0.31272995485237365
No presumption against null hypothesis: the occurrence of the outcomes is statistically independent..
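
chiSqTest can also be given an explicit expected-frequency vector; passing a uniform expectation (chosen here only for illustration) reproduces the goodness-of-fit results above:

//sketch: goodness of fit against an explicitly uniform expected distribution
val dfitUniform = Statistics.chiSqTest(dems, Vectors.dense(1.0, 1.0))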


Creating machine learning pipelines with Spark ML

Spark ML is a library for building machine learning pipelines. It chains multiple machine learning algorithms into a single pipeline and uses DataFrames as its datasets.

A few basic concepts first: a transformer converts one DataFrame into another, for example by adding a column, much like ALTER TABLE in a database.

An estimator represents a machine learning algorithm that learns from data: feeding a DataFrame into an estimator produces a transformer. Every estimator has a fit() method that runs the training algorithm.

A machine learning pipeline is defined as a sequence of stages, where each stage is either an estimator or a transformer.
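
The recipe below fits the estimator directly rather than wrapping it in a Pipeline object. As a minimal sketch of the Pipeline API itself (someTrainingDF is a placeholder, not part of the recipe):

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.LogisticRegression

val stages: Array[PipelineStage] = Array(new LogisticRegression())  // a single estimator stage
val pipeline = new Pipeline().setStages(stages)  // a pipeline is an ordered sequence of stages
// pipeline.fit(someTrainingDF) would run the stages in order and return a PipelineModel (a transformer)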

Below we predict whether a person is a basketball player, using a pipeline made of one estimator and one transformer.

The estimator is fit on the training data to train the algorithm, and the resulting transformer then makes predictions.

Here we use the LogisticRegression algorithm.

$ spark-shell
scala> import org.apache.spark.mllib.linalg.{Vector,Vectors}
scala> import org.apache.spark.mllib.regression.LabeledPoint
scala> import org.apache.spark.ml.classification.LogisticRegression

val lebron = LabeledPoint(1.0,Vectors.dense(80.0,250.0))
val tim = LabeledPoint(0.0,Vectors.dense(70.0,150.0))
val brittany = LabeledPoint(1.0,Vectors.dense(80.0,207.0))

val stacey = LabeledPoint(0.0,Vectors.dense(65.0,120.0))
//create the training RDD
val trainingRDD = sc.parallelize(List(lebron,tim,brittany,stacey))
//create the training DataFrame
val trainingDF = trainingRDD.toDF
//this step may fail with an error like the following:
INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/09/06 14:00:45 INFO metastore.ObjectStore: ObjectStore, initialize called
15/09/06 14:00:46 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored
15/09/06 14:00:46 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/09/06 14:00:46 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
//this indicates the classpath is not set up correctly

//in that case, restart the shell with: spark-shell --driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.18-bin.jar

//create a LogisticRegression estimator
val estimator = new LogisticRegression
//create a transformer by fitting the estimator to the training DataFrame
val transformer = estimator.fit(trainingDF)
//test data:
val john = Vectors.dense(90.0,270.0)//positive sample
val tom = Vectors.dense(62.0,120.0) //negative sample
//create a test RDD
val testRDD = sc.parallelize(List(john,tom))
//create a feature case class
case class Feature(v:Vector)
val featuresRDD = testRDD.map( v => Feature(v))
val featuresDF = featuresRDD.toDF("features")
//add the predictions column
val predictionsDF = transformer.transform(featuresDF)
predictionsDF.foreach(println)
[[90.0,270.0],[-61.88475862420546,61.88475862420546],[1.329813739095893E-27,1.0],1.0]
[[62.0,120.0],[31.460769105952664,-31.460769105952664],[0.9999999999999782,2.1715087358995634E-14],0.0]

val shorterPredictionsDF = predictionsDF.select("features","prediction")
//rename the prediction column
val playerDF = shorterPredictionsDF.toDF("features","isBasketBallPlayer")

playerDF.printSchema
//root
// |-- features: vector (nullable = true)
// |-- isBasketBallPlayer: double (nullable = true)


Supervised learning

Linear regression

$ spark-shell
scala> import org.apache.spark.mllib.linalg.Vectors
scala> import org.apache.spark.mllib.regression.LabeledPoint
scala> import org.apache.spark.mllib.regression.LinearRegressionWithSGD

scala> val points = Array(
LabeledPoint(1620000,Vectors.dense(2100)),
LabeledPoint(1690000,Vectors.dense(2300)),
LabeledPoint(1400000,Vectors.dense(2046)),
LabeledPoint(2000000,Vectors.dense(4314)),
LabeledPoint(1060000,Vectors.dense(1244)),
LabeledPoint(3830000,Vectors.dense(4608)),
LabeledPoint(1230000,Vectors.dense(2173)),
LabeledPoint(2400000,Vectors.dense(2750)),
LabeledPoint(3380000,Vectors.dense(4010)),
LabeledPoint(1480000,Vectors.dense(1959))
)
val pricesRDD = sc.parallelize(points)

val model = LinearRegressionWithSGD.train(pricesRDD,100,0.0000006,1.0,Vectors.zeros(1))
//100 iterations, step size 0.0000006, mini-batch fraction 1.0 (use the full dataset each iteration), initial weights all zero
//predict
val prediction = model.predict(Vectors.dense(2500))
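
The fitted model can be inspected (a sketch; the exact coefficient depends on the run):

model.weights    // learned coefficient for the size feature
model.intercept  // expected to be 0.0 here, since train does not fit an intercept by default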


Cost function (loss function)

The cost function is usually minimized with gradient descent, but Spark implements this with stochastic gradient descent (SGD).
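
To make the update rule concrete, here is a tiny sketch (plain Scala, not Spark) of one stochastic gradient step for the squared-error loss (1/2)(w*x - y)^2 on a single sample:

//sketch: the gradient of (1/2)(w*x - y)^2 with respect to w is (w*x - y)*x
def sgdStep(w: Double, x: Double, y: Double, stepSize: Double): Double =
  w - stepSize * (w * x - y) * x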

Lasso linear regression

Lasso minimizes the error function while constraining the sum of the absolute values of the coefficients (the penalized objective is sketched after the list below).

Ordinary least squares (OLS) has two challenges:

Prediction accuracy: OLS predictions usually have low bias but high variance. Prediction accuracy can be improved by shrinking some coefficients, which may add a little bias but improves overall prediction accuracy.

Interpretation: with a large number of predictors, we want to find the few that contribute most to the response.
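
The following sketch (plain Scala, not Spark; the names are illustrative) spells out the penalized objective that lasso minimizes, an L1 penalty added to the squared error:

//sketch: lasso objective = mean squared error + lambda * L1 norm of the weights
def lassoObjective(xs: Array[Array[Double]], ys: Array[Double],
                   w: Array[Double], lambda: Double): Double = {
  val mse = xs.zip(ys).map { case (x, y) =>
    val pred = x.zip(w).map { case (xi, wi) => xi * wi }.sum
    math.pow(pred - y, 2)
  }.sum / (2.0 * xs.length)
  mse + lambda * w.map(math.abs).sum   // squared-error term plus the L1 penalty
}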

$ spark-shell
scala> import org.apache.spark.mllib.linalg.Vectors
scala> import org.apache.spark.mllib.regression.LabeledPoint
scala> import org.apache.spark.mllib.regression.LassoWithSGD

scala> val points = Array(
LabeledPoint(1,Vectors.dense(5,3,1,2,1,3,2,2,1)),
LabeledPoint(2,Vectors.dense(9,8,8,9,7,9,8,7,9))
)
val rdd = sc.parallelize(points)
val model = LassoWithSGD.train(rdd,100,0.02,2.0)
//check how many predictor coefficients were set to 0
model.weights
org.apache.spark.mllib.linalg.Vector = [0.13455106581619633,0.02240732644670294,0.0,0.0,0.0,0.01360995990267153,0.0,0.0,0.0]


6 of the 9 predictors had their coefficients set to 0. This is the main feature of lasso: predictors it deems useless are effectively dropped from the model (by setting their coefficients to 0).

Ridge regression

$ spark-shell
scala> import org.apache.spark.mllib.linalg.Vectors
scala> import org.apache.spark.mllib.regression.LabeledPoint
scala> import org.apache.spark.mllib.regression.RidgeRegressionWithSGD

scala> val points = Array(
LabeledPoint(1,Vectors.dense(5,3,1,2,1,3,2,2,1)),
LabeledPoint(2,Vectors.dense(9,8,8,9,7,9,8,7,9))
)

val rdd = sc.parallelize(points)

val model = RidgeRegressionWithSGD.train(rdd,100,0.02,2.0)

model.weights
org.apache.spark.mllib.linalg.Vector = [0.049805969577244584,0.029883581746346748,0.009961193915448916,0.019922387830897833,0.009961193915448916,0.029883581746346748,0.019922387830897833,0.019922387830897833,0.009961193915448916]

Unlike lasso, ridge shrinks all the coefficients but does not set any of them to zero.


Chapter 8: Classification

Logistic regression

$ spark-shell
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

val points = Array(
LabeledPoint(0.0,Vectors.dense(0.245)),
LabeledPoint(0.0,Vectors.dense(0.247)),
LabeledPoint(1.0,Vectors.dense(0.285)),
LabeledPoint(1.0,Vectors.dense(0.299)),
LabeledPoint(1.0,Vectors.dense(0.327)),
LabeledPoint(1.0,Vectors.dense(0.347)),
LabeledPoint(0.0,Vectors.dense(0.356)),
LabeledPoint(1.0,Vectors.dense(0.36)),
LabeledPoint(0.0,Vectors.dense(0.363)),
LabeledPoint(1.0,Vectors.dense(0.364)),
LabeledPoint(0.0,Vectors.dense(0.398)),
LabeledPoint(1.0,Vectors.dense(0.4)),
LabeledPoint(0.0,Vectors.dense(0.409)),
LabeledPoint(1.0,Vectors.dense(0.421)),
LabeledPoint(0.0,Vectors.dense(0.432)),
LabeledPoint(1.0,Vectors.dense(0.473)),
LabeledPoint(1.0,Vectors.dense(0.509)),
LabeledPoint(1.0,Vectors.dense(0.529)),
LabeledPoint(0.0,Vectors.dense(0.561)),
LabeledPoint(0.0,Vectors.dense(0.569)),
LabeledPoint(1.0,Vectors.dense(0.594)),
LabeledPoint(1.0,Vectors.dense(0.638)),
LabeledPoint(1.0,Vectors.dense(0.656)),
LabeledPoint(1.0,Vectors.dense(0.816)),
LabeledPoint(1.0,Vectors.dense(0.853)),
LabeledPoint(1.0,Vectors.dense(0.938)),
LabeledPoint(1.0,Vectors.dense(1.036)),
LabeledPoint(1.0,Vectors.dense(1.045)))

val spiderRDD = sc.parallelize(points)
//train the model on the dataset
val lr = new LogisticRegressionWithLBFGS().setIntercept(true)
val model = lr.run(spiderRDD)

val predict = model.predict(Vectors.dense(0.938))


Support vector machines

$ hdfs dfs -put /opt/infoobjects/spark/data/mllib/sample_libsvm_data.txt /user/hduser/sample_libsvm_data.txt

$ spark-shell

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils

val svmData = MLUtils.loadLibSVMFile(sc,"sample_libsvm_data.txt")

//number of records
svmData.count
//split into training and test sets (50/50)
val trainingAndTest = svmData.randomSplit(Array(0.5,0.5))

val trainingData = trainingAndTest(0)
val testData = trainingAndTest(1)

val model = SVMWithSGD.train(trainingData,100)

//create tuples of (predicted value, actual label)
val predictionsAndLabels = testData.map( r => (model.predict(r.features),r.label))

predictionsAndLabels.filter(p => p._1 != p._2).count
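
The BinaryClassificationMetrics import above is not otherwise used in this recipe; as a sketch, it can score the same (prediction, label) pairs:

//sketch: area under the ROC curve for the test predictions
val metrics = new BinaryClassificationMetrics(predictionsAndLabels)
metrics.areaUnderROC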


Classification with decision trees

Decision trees are the most intuitive of machine learning algorithms and have many useful properties:

Easy to understand and interpret

Can handle both categorical and continuous variables

Can handle missing features

Do not require feature normalization

Training data
Columns: play / rain / wind / temperature (the first column, play, is the label)
$vi tennis.csv
0.0,1.0,1.0,2.0
0.0,1.0,1.0,1.0
0.0,1.0,1.0,0.0
0.0,0.0,1.0,2.0
0.0,0.0,1.0,0.0
1.0,0.0,0.0,2.0
1.0,0.0,0.0,1.0
0.0,0.0,0.0,0.0


$ spark-shell

import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.tree.configuration.Algo._
import org.apache.spark.mllib.tree.impurity.Entropy

val data = sc.textFile("tennis.csv")

val parsedData = data.map { line =>
  val parts = line.split(',').map(_.toDouble)
  LabeledPoint(parts(0), Vectors.dense(parts.tail))
}

val model = DecisionTree.train(parsedData, Classification, Entropy, 3)
//create a new feature vector to classify
val v=Vectors.dense(0.0,1.0,0.0)

model.predict(v)
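
The learned tree can also be printed for inspection (a sketch):

//sketch: the if/else structure of the fitted tree
print(model.toDebugString)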