spark mllib源码分析之逻辑回归弹性网络ElasticNet(二)
2017-08-14 11:32
561 查看
相关文章
spark mllib源码分析之逻辑回归弹性网络ElasticNet(一)
spark源码分析之L-BFGS
spark mllib源码分析之OWLQN
spark中的online均值/方差统计
spark源码分析之二分类逻辑回归evaluation
spark正则化
setMaxIter,最大迭代次数,训练的截止条件,默认100次
setFamily,binomial(二分类)/multinomial(多分类)/auto,默认为auto。设为auto时,会根据schema或者样本中实际的class情况设置是二分类还是多分类,最好明确设置
setElasticNetParam,弹性参数,用于调节L1和L2之间的比例,两种正则化比例加起来是1,详见后面正则化的设置,默认为0,只使用L2正则化,设置为1就是只用L1正则化
setRegParam,正则化系数,默认为0,不使用正则化
setTol,训练的截止条件,两次迭代之间的改善小于tol训练将截止
setFitIntercept,是否拟合截距,默认为true
setStandardizatio
4000
n,是否使用归一化,这里归一化只针对各维特征的方差进行
setThresholds/setThreshold,设置多分类/二分类的判决阈值,多分类是一个数组,二分类是double值
setAggregationDepth,设置分布式统计时的层数,主要用在treeAggregate中,数据量越大,可适当加大这个值,默认为2
set*Col,因为训练时使用的样本先被搞成了DataFrame结构,这些是设置列名,方便训练时选取label,weight,feature等列,spark实现了默认libsvm格式的读取,如果需要,可以自己读取样本文件,转成DataFrame格式,并设置这些列名
是否多分类,根据family参数确定。如果是二分类,要求numClasses为1或2,返回false;如果是多分类,则返回true;如果是默认的auto,则根据numClasses是否大于2得到
系数矩阵包含的系数集个数,我们之前的文章说过,多分类的系数矩阵在实际模型训练时(breeze库)是将多个weight向量是拼在一起的,相当于[[w11,w12,...],[w21,w22,...],...],因此个数与class数相同,这里的系数矩阵是使用矩阵存储的,这个值作为矩阵的行;而二分类因为只有一个weight向量,因此为1
阈值,因为这里兼容了二分类与多分类,thresholds实际是个Array,length应该等于class的个数;如果是二分类,阈值就只有一个,设置在threshold,Double类型
label是否唯一的判断
histogram是Array,里面放着样本中各label的数量,也就是说样本里只有一种label。
这种情况返回的系数矩阵为全0的SparseMatrix,对于截距,如果是多分类,返回稀疏向量,向量长度为numClasses,只有index为label的位置有值Double.PositiveInfinity;如果是二分类,返回dense vector,值为Double.PositiveInfinity
此种情况下,算法可能不会收敛,给出了警告信息,但是会继续尝试优化。
各维特征的方差为0,但是均值不为0,说明各维特征值各自相同且非0,这种情况从警告信息来看,spark最终会给出全为0的系数矩阵,但是会尝试优化。
对应损失函数
使用LBFGS模型优化
使用OWLQN模型优化
如果初始模型有效,则用初始化模型中的系数初始化
P(1)=eb1/Z...P(K)=ebK/ZwhereZ=∑k=1Kebk
但是label的真正分布很难知道,一般用label在样本中的分布近似,即
ebk=countk∗eλbk=log(countk)+λ
我们让λ=0,并使其均值为0(均值中心化)
bk=log(countk)b′k=bk−E(bk)
对应到spark的实现
P(0)=1(1+eb)P(1)=eb1+eb
同理
b=logP(1)P(0)=logcount1count0
源码实现
如果系数矩阵不是多分类,则根据系数非0的情况决定是否使用稀疏矩阵SparseMatrix
多分类时,对截距进行中心化
lossstd+L1+L2
并且在2.6.2节我们看到,返回的系数是统一进行了归一化,注释中也有解释,系数进行归一化,在预测的时候就不需要对特征进行归一化了,整个训练过程可以看做是对系数进行归一化,因此如果我们设置不进行归一化,loss和梯度计算部分仍旧归一化,则可以
lossstd+L1std+L2std2
所有项都除以std,对整体的目标函数是没有影响的,因此可以看到源码中L1正则化和L2正则化在设置不归一化的时候对方差进行了归一化,因为L2是平方项,因此分母也是std的平方。
在实际的业务场景中,其实很多都是非数值型特征,而且数值型的往往也会离散化,全部变成0/1的特征,这种情况使用归一化其实是没什么意义的,spark这里强制使用归一化我感觉欠妥了。
spark mllib源码分析之逻辑回归弹性网络ElasticNet(一)
spark源码分析之L-BFGS
spark mllib源码分析之OWLQN
spark中的online均值/方差统计
spark源码分析之二分类逻辑回归evaluation
spark正则化
2. 训练
2.1. 训练参数设置
设置用于控制训练的参数setMaxIter,最大迭代次数,训练的截止条件,默认100次
setFamily,binomial(二分类)/multinomial(多分类)/auto,默认为auto。设为auto时,会根据schema或者样本中实际的class情况设置是二分类还是多分类,最好明确设置
setElasticNetParam,弹性参数,用于调节L1和L2之间的比例,两种正则化比例加起来是1,详见后面正则化的设置,默认为0,只使用L2正则化,设置为1就是只用L1正则化
setRegParam,正则化系数,默认为0,不使用正则化
setTol,训练的截止条件,两次迭代之间的改善小于tol训练将截止
setFitIntercept,是否拟合截距,默认为true
setStandardizatio
4000
n,是否使用归一化,这里归一化只针对各维特征的方差进行
setThresholds/setThreshold,设置多分类/二分类的判决阈值,多分类是一个数组,二分类是double值
setAggregationDepth,设置分布式统计时的层数,主要用在treeAggregate中,数据量越大,可适当加大这个值,默认为2
set*Col,因为训练时使用的样本先被搞成了DataFrame结构,这些是设置列名,方便训练时选取label,weight,feature等列,spark实现了默认libsvm格式的读取,如果需要,可以自己读取样本文件,转成DataFrame格式,并设置这些列名
2.2.数据准备
2.2.1. 样本处理
将封装成DataFrame的输入数据再转成简单结构的instance,包括label,weight,特征,默认每个样本的weight为1val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map { case Row(label: Double, weight: Double, features: Vector) => Instance(label, weight, features) }
2.2.2. 统计
统计样本每个特征的方差,均值,label的分布情况,用到了MultivariateOnlineSummarizer和MultiClassSummarizer,前面有介绍val (summarizer, labelSummarizer) = { val seqOp = (c: (MultivariateOnlineSummarizer, MultiClassSummarizer), instance: Instance) => (c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight)) val combOp = (c1: (MultivariateOnlineSummarizer, MultiClassSummarizer), c2: (MultivariateOnlineSummarizer, MultiClassSummarizer)) => (c1._1.merge(c2._1), c1._2.merge(c2._2)) instances.treeAggregate( new MultivariateOnlineSummarizer, new MultiClassSummarizer )(seqOp, combOp, $(aggregationDepth)) } //各维特征的weightSum val histogram = labelSummarizer.histogram //label非法,主要是label非整数和小于0的情况 val numInvalid = labelSummarizer.countInvalid val numFeatures = summarizer.mean.size //如果有截距,相当于增加一维值全为1的特征 val numFeaturesPlusIntercept = if (getFitIntercept) numFeatures + 1 else numFeatures
2.2.3. 其他参数推断
class数,如果在schema中指定,就要求其大于等于统计得到的class数,否则取统计得到的class数val numClasses = MetadataUtils.getNumClasses(dataset.schema($(labelCol))) match { case Some(n: Int) => require(n >= histogram.length, s"Specified number of classes $n was " + s"less than the number of unique labels ${histogram.length}.") n //最好是labelSummarizer.numClasses case None => histogram.length }
是否多分类,根据family参数确定。如果是二分类,要求numClasses为1或2,返回false;如果是多分类,则返回true;如果是默认的auto,则根据numClasses是否大于2得到
val isMultinomial = $(family) match { case "binomial" => require(numClasses == 1 || numClasses == 2, s"Binomial family only supports 1 or 2 " + s"outcome classes but found $numClasses.") false case "multinomial" => true case "auto" => numClasses > 2 case other => throw new IllegalArgumentException(s"Unsupported family: $other") }
系数矩阵包含的系数集个数,我们之前的文章说过,多分类的系数矩阵在实际模型训练时(breeze库)是将多个weight向量是拼在一起的,相当于[[w11,w12,...],[w21,w22,...],...],因此个数与class数相同,这里的系数矩阵是使用矩阵存储的,这个值作为矩阵的行;而二分类因为只有一个weight向量,因此为1
val numCoefficientSets = if (isMultinomial) numClasses else 1
阈值,因为这里兼容了二分类与多分类,thresholds实际是个Array,length应该等于class的个数;如果是二分类,阈值就只有一个,设置在threshold,Double类型
if (isDefined(thresholds)) { require($(thresholds).length == numClasses, this.getClass.getSimpleName + ".train() called with non-matching numClasses and thresholds.length." + s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}") }
2.3. 确定待训练模型
根据二/多分类,是否拟合截距,L1/L2等确定训练使用的优化方法,损失函数2.3.1. 拟合截距 && label唯一
判断条件为$(fitIntercept) && isConstantLabel
label是否唯一的判断
val isConstantLabel = histogram.count(_ != 0.0) == 1
histogram是Array,里面放着样本中各label的数量,也就是说样本里只有一种label。
这种情况返回的系数矩阵为全0的SparseMatrix,对于截距,如果是多分类,返回稀疏向量,向量长度为numClasses,只有index为label的位置有值Double.PositiveInfinity;如果是二分类,返回dense vector,值为Double.PositiveInfinity
2.3.2. 不拟合截距 && label唯一
判断条件为!$(fitIntercept) && isConstantLabel
此种情况下,算法可能不会收敛,给出了警告信息,但是会继续尝试优化。
2.3.3. 不拟合截距 && 各维特征的特征值相同却非0
判断条件!$(fitIntercept) && (0 until numFeatures).exists { i => featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }
各维特征的方差为0,但是均值不为0,说明各维特征值各自相同且非0,这种情况从警告信息来看,spark最终会给出全为0的系数矩阵,但是会尝试优化。
2.3.4. 只用L2或者不使用正则化
判断条件19e60 [code]$(elasticNetParam) == 0.0 || $(regParam) == 0.0
对应损失函数
//实际的L1正则化系数 val regParamL1 = $(elasticNetParam) * $(regParam) //实际的L2正则化系数 val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam) val bcFeaturesStd = instances.context.broadcast(featuresStd) //损失函数,参见前文 val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept), $(standardization), bcFeaturesStd, regParamL2, multinomial = isMultinomial, $(aggregationDepth))
使用LBFGS模型优化
val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) { new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) }
2.3.5. 只使用L1 或者 L1和L2同时用
L2与损失函数类似2.3.4节,L1正则化def regParamL1Fun = (index: Int) => { // 正则化与截距无关,系数矩阵是按列展开的,每一列是一维特征,因此所有的截距在展开数组的最后 val isIntercept = $(fitIntercept) && index >= numFeatures * numCoefficientSets if (isIntercept) { 0.0 } else { if (standardizationParam) { regParamL1 } else { //因为优化时统一使用标准化,这里如果设置不使用,需要先进行反过程已使得目标函数一致 val featureIndex = index / numCoefficientSets if (featuresStd(featureIndex) != 0.0) { regParamL1 / featuresStd(featureIndex) } else { 0.0 } } } }
使用OWLQN模型优化
new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
2.4. 初始化weight矩阵
先将稀疏矩阵初始化成全0的矩阵,行是numCoefficientSets(numClasses),列为numFeaturesPlusIntercept(包括截距),然后根据是否有初始模型,二/多分类对weight矩阵进行初始化。2.4.1. 有初始模型
这种情况是存在着一个初始的模型,里面存放着已经训练的系数矩阵,先验证模型的有效性val initialModelIsValid = optInitialModel match { case Some(_initialModel) => val providedCoefs = _initialModel.coefficientMatrix //系数矩阵的行(numClasses)是否相同 val modelIsValid = (providedCoefs.numRows == numCoefficientSets) && //稀疏矩阵的列(特征数)是否相同 (providedCoefs.numCols == numFeatures) && //截距数是否正确(等于numClasses) (_initialModel.interceptVector.size == numCoefficientSets) && //是否拟合截距 (_initialModel.getFitIntercept == $(fitIntercept)) if (!modelIsValid) { logWarning(s"Initial coefficients will be ignored! Its dimensions " + s"(${providedCoefs.numRows}, ${providedCoefs.numCols}) did not match the " + s"expected size ($numCoefficientSets, $numFeatures)") } modelIsValid case None => false }
如果初始模型有效,则用初始化模型中的系数初始化
val providedCoef = optInitialModel.get.coefficientMatrix providedCoef.foreachActive { (classIndex, featureIndex, value) => // We need to scale the coefficients since they will be trained in the scaled space //更新weight矩阵,行号classIndex,列号featureIndex,第三个参数是值 initialCoefWithInterceptMatrix.update(classIndex, featureIndex, value * featuresStd(featureIndex)) } //更新截距 if ($(fitIntercept)) { optInitialModel.get.interceptVector.foreachActive { (classIndex, value) => initialCoefWithInterceptMatrix.update(classIndex, numFeatures, value) } }
2.4.2. 没有初始模型 && 拟合截距 && 多分类
对于多分类逻辑回归,如果系数矩阵初始化为0,则把截距初始化与label同分布,能够更快的收敛P(1)=eb1/Z...P(K)=ebK/ZwhereZ=∑k=1Kebk
但是label的真正分布很难知道,一般用label在样本中的分布近似,即
ebk=countk∗eλbk=log(countk)+λ
我们让λ=0,并使其均值为0(均值中心化)
bk=log(countk)b′k=bk−E(bk)
对应到spark的实现
//histogram里面放的是weightSum val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing val rawMean = rawIntercepts.sum / rawIntercepts.length rawIntercepts.indices.foreach { i => initialCoefWithInterceptMatrix.update(i, numFeatures, rawIntercepts(i) - rawMean) }
2.4.3. 没有初始模型 && 拟合截距&&二分类
类似于多分类,截距与label同分布可加快收敛P(0)=1(1+eb)P(1)=eb1+eb
同理
b=logP(1)P(0)=logcount1count0
源码实现
initialCoefWithInterceptMatrix.update(0, numFeatures, math.log(histogram(1) / histogram(0)))
2.5. 训练
优化求解是通过上面确定的optimizer,主要的训练过程都隐藏在iterations背后,无论是L-BFGS还是OWLQN都需要实现这个方法,返回所有轮的状态放在stats里面val states = optimizer.iterations(new CachedDiffFunction(costFun), //矩阵按列展开为数组,对照1.4.2.3节的第一种存储格式 new BDV[Double](initialCoefWithInterceptMatrix.toArray))
2.6. 获取训练结果
从训练结果里取回系数矩阵和截距2.6.1. 初始化系数和截距
//训练结果以数组的形式返回 val allCoefficients = state.x.toArray.clone() //数组转成矩阵,numClasses*numberFeature val allCoefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, allCoefficients) val denseCoefficientMatrix = new DenseMatrix(numCoefficientSets, numFeatures, new Array[Double](numCoefficientSets * numFeatures), isTransposed = true) //需要拟合截距或者二分类的情况,每类的截距都是实际有值的,用dense vector val interceptVec = if ($(fitIntercept) || !isMultinomial) { Vectors.zeros(numCoefficientSets) } else { //sparse vector Vectors.sparse(numCoefficientSets, Seq()) }
2.6.2. 用训练结果赋值
从原始结果中分别提取系数与截距allCoefMatrix.foreachActive { (classIndex, featureIndex, value) => //allCoefMatrix的列numFeaturesPlusIntercept,如果拟合了截距,每行的最后一列就是截距 val isIntercept = $(fitIntercept) && (featureIndex == numFeatures) if (!isIntercept && featuresStd(featureIndex) != 0.0) { //我们在分析L-BFGS源码的文章中提过,因为训练时特征是对标准差归一化的,为了在使用时直接使用特征值,这里相当于对返回的系数标准化 denseCoefficientMatrix.update(classIndex, featureIndex, value / featuresStd(featureIndex)) } if (isIntercept) interceptVec.toArray(classIndex) = value }
2.6.3. 对待返回的系数进行调整
我们前面提到过,如果无pivot class多分类不使用正则化其系数矩阵不是唯一的,因此这里对系数矩阵进行0均值中心化(mean centered coefficients),使其可复现(reproducibility,我的理解是重复训练时结果一样)//按行向量化 val denseValues = denseCoefficientMatrix.values //均值 val coefficientMean = denseValues.sum / denseValues.length denseCoefficientMatrix.update(_ - coefficientMean)
如果系数矩阵不是多分类,则根据系数非0的情况决定是否使用稀疏矩阵SparseMatrix
val compressedCoefficientMatrix = if (isMultinomial) { denseCoefficientMatrix } else { //矩阵向量化后进行压缩 val compressedVector = Vectors.dense(denseCoefficientMatrix.values).compressed compressedVector match { case dv: DenseVector => //系数稠密,不需要压缩 denseCoefficientMatrix case sv: SparseVector => //0系数较多,返回稀疏矩阵 new SparseMatrix(1, numFeatures, Array(0, sv.indices.length), sv.indices, sv.values, isTransposed = true) } }
多分类时,对截距进行中心化
val compressedCoefficientMatrix = if (isMultinomial) { denseCoefficientMatrix } else { if ($(fitIntercept) && isMultinomial) { val interceptArray = interceptVec.toArray val interceptMean = interceptArray.sum / interceptArray.length (0 until interceptVec.size).foreach { i => interceptArray(i) -= interceptMean } }
2.7. 构造逻辑回归模型
用训练得到的稀疏矩阵和截距向量,class数等构造逻辑回归模型val model = copyValues(new LogisticRegressionModel(uid, coefficientMatrix, interceptVector,numClasses, isMultinomial)) //如果是二分类,加入模型的评估指标,见BinaryLogisticRegressionSummary val m = if (!isMultinomial) { val (summaryModel, probabilityColName) = model.findSummaryModelAndProbabilityCol() val logRegSummary = new BinaryLogisticRegressionTrainingSummary( //加计算每个样本的margin(rawPredictionCol),预测值(probabilityCol,margin带入sigmoid函数),预测label(predictionCol,与threshold比较) summaryModel.transform(dataset), probabilityColName, $(labelCol), $(featuresCol), objectiveHistory) model.setSummary(Some(logRegSummary)) } else { model }
2.8. 归一化
从上面的介绍中我们可以看到,在计算loss和梯度时,spark没有判断是否使用归一化(standardization),直接进行了归一化,也就是说如果我们设置了归一化,其损失函数lossstd+L1+L2
并且在2.6.2节我们看到,返回的系数是统一进行了归一化,注释中也有解释,系数进行归一化,在预测的时候就不需要对特征进行归一化了,整个训练过程可以看做是对系数进行归一化,因此如果我们设置不进行归一化,loss和梯度计算部分仍旧归一化,则可以
lossstd+L1std+L2std2
所有项都除以std,对整体的目标函数是没有影响的,因此可以看到源码中L1正则化和L2正则化在设置不归一化的时候对方差进行了归一化,因为L2是平方项,因此分母也是std的平方。
在实际的业务场景中,其实很多都是非数值型特征,而且数值型的往往也会离散化,全部变成0/1的特征,这种情况使用归一化其实是没什么意义的,spark这里强制使用归一化我感觉欠妥了。
3. 结语
本文分析了spark ElasticNet的实现,其中用到的一些技巧等。其计算过程中使用归一化及要求label必须连续且为整数等其实限制了其使用场景,希望后续能够优化这块,多考虑一些实际的业务场景。相关文章推荐
- spark mllib源码分析之逻辑回归弹性网络ElasticNet
- spark mllib源码分析之逻辑回归弹性网络ElasticNet(一)
- spark mllib源码分析之二分类逻辑回归evaluation
- 弹性网络( Elastic Net) 多任务 Lasso回归 MultiTaskLasso
- 中兴ElasticNet 弹性网络解决方案
- Redis源码分析(二十一)--- anet网络通信的封装
- 【机器学习经典算法源码分析系列】-- 逻辑回归
- 高性能网络I/O框架-netmap源码分析(1) http://blog.chinaunix.net/uid-23629988-id-3594118.html
- 4.弹性网络( Elastic Net)
- 高性能网络I/O框架-netmap源码分析(2) http://blog.chinaunix.net/uid-23629988-id-3608622.html
- <转>Spark Mllib逻辑回归算法分析
- 高性能网络I/O框架-netmap源码分析(3) http://blog.chinaunix.net/uid-23629988-id-3614187.html
- contiki 源码分析之网络层(三)(core / net)
- 高性能网络I/O框架-netmap源码分析(4) http://blog.chinaunix.net/uid-23629988-id-3642305.html
- 高性能网络I/O框架-netmap源码分析(5) http://blog.chinaunix.net/uid-23629988-id-3693204.html
- 机器学习与算法(11)--弹性网络(Elastic Net)
- 高性能网络I/O框架-netmap源码分析(6) http://blog.chinaunix.net/uid-23629988-id-3803045.html
- 机器学习算法与Python实践(9) - 弹性网络(Elastic Net)
- Spark MLlib之线性回归源码分析
- <转>Spark Mllib逻辑回归算法分析