您的位置:首页 > 理论基础 > 计算机网络

spark mllib源码分析之逻辑回归弹性网络ElasticNet(二)

2017-08-14 11:32 561 查看
相关文章

spark mllib源码分析之逻辑回归弹性网络ElasticNet(一)

spark源码分析之L-BFGS

spark mllib源码分析之OWLQN

spark中的online均值/方差统计

spark源码分析之二分类逻辑回归evaluation

spark正则化

2. 训练

2.1. 训练参数设置

设置用于控制训练的参数

setMaxIter,最大迭代次数,训练的截止条件,默认100次

setFamily,binomial(二分类)/multinomial(多分类)/auto,默认为auto。设为auto时,会根据schema或者样本中实际的class情况设置是二分类还是多分类,最好明确设置

setElasticNetParam,弹性参数,用于调节L1和L2之间的比例,两种正则化比例加起来是1,详见后面正则化的设置,默认为0,只使用L2正则化,设置为1就是只用L1正则化

setRegParam,正则化系数,默认为0,不使用正则化

setTol,训练的截止条件,两次迭代之间的改善小于tol训练将截止

setFitIntercept,是否拟合截距,默认为true

setStandardizatio
4000
n,是否使用归一化,这里归一化只针对各维特征的方差进行

setThresholds/setThreshold,设置多分类/二分类的判决阈值,多分类是一个数组,二分类是double值

setAggregationDepth,设置分布式统计时的层数,主要用在treeAggregate中,数据量越大,可适当加大这个值,默认为2

set*Col,因为训练时使用的样本先被搞成了DataFrame结构,这些是设置列名,方便训练时选取label,weight,feature等列,spark实现了默认libsvm格式的读取,如果需要,可以自己读取样本文件,转成DataFrame格式,并设置这些列名

2.2.数据准备

2.2.1. 样本处理

将封装成DataFrame的输入数据再转成简单结构的instance,包括label,weight,特征,默认每个样本的weight为1

val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
val instances: RDD[Instance] =
dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map {
case Row(label: Double, weight: Double, features: Vector) =>
Instance(label, weight, features)
}


2.2.2. 统计

统计样本每个特征的方差,均值,label的分布情况,用到了MultivariateOnlineSummarizer和MultiClassSummarizer,前面有介绍

val (summarizer, labelSummarizer) = {
val seqOp = (c: (MultivariateOnlineSummarizer, MultiClassSummarizer),
instance: Instance) =>
(c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight))

val combOp = (c1: (MultivariateOnlineSummarizer, MultiClassSummarizer),
c2: (MultivariateOnlineSummarizer, MultiClassSummarizer)) =>
(c1._1.merge(c2._1), c1._2.merge(c2._2))

instances.treeAggregate(
new MultivariateOnlineSummarizer, new MultiClassSummarizer
)(seqOp, combOp, $(aggregationDepth))
}
//各维特征的weightSum
val histogram = labelSummarizer.histogram
//label非法,主要是label非整数和小于0的情况
val numInvalid = labelSummarizer.countInvalid
val numFeatures = summarizer.mean.size
//如果有截距,相当于增加一维值全为1的特征
val numFeaturesPlusIntercept = if (getFitIntercept) numFeatures + 1 else numFeatures


2.2.3. 其他参数推断

class数,如果在schema中指定,就要求其大于等于统计得到的class数,否则取统计得到的class数

val numClasses = MetadataUtils.getNumClasses(dataset.schema($(labelCol))) match {
case Some(n: Int) =>
require(n >= histogram.length, s"Specified number of classes $n was " +
s"less than the number of unique labels ${histogram.length}.")
n
//最好是labelSummarizer.numClasses
case None => histogram.length
}


是否多分类,根据family参数确定。如果是二分类,要求numClasses为1或2,返回false;如果是多分类,则返回true;如果是默认的auto,则根据numClasses是否大于2得到

val isMultinomial = $(family) match {
case "binomial" =>
require(numClasses == 1 || numClasses == 2, s"Binomial family only supports 1 or 2 " +
s"outcome classes but found $numClasses.")
false
case "multinomial" => true
case "auto" => numClasses > 2
case other => throw new IllegalArgumentException(s"Unsupported family: $other")
}


系数矩阵包含的系数集个数,我们之前的文章说过,多分类的系数矩阵在实际模型训练时(breeze库)是将多个weight向量是拼在一起的,相当于[[w11,w12,...],[w21,w22,...],...],因此个数与class数相同,这里的系数矩阵是使用矩阵存储的,这个值作为矩阵的行;而二分类因为只有一个weight向量,因此为1

val numCoefficientSets = if (isMultinomial) numClasses else 1


阈值,因为这里兼容了二分类与多分类,thresholds实际是个Array,length应该等于class的个数;如果是二分类,阈值就只有一个,设置在threshold,Double类型

if (isDefined(thresholds)) {
require($(thresholds).length == numClasses, this.getClass.getSimpleName +
".train() called with non-matching numClasses and thresholds.length." +
s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}")
}


2.3. 确定待训练模型

根据二/多分类,是否拟合截距,L1/L2等确定训练使用的优化方法,损失函数

2.3.1. 拟合截距 && label唯一

判断条件为

$(fitIntercept) && isConstantLabel


label是否唯一的判断

val isConstantLabel = histogram.count(_ != 0.0) == 1


histogram是Array,里面放着样本中各label的数量,也就是说样本里只有一种label。

这种情况返回的系数矩阵为全0的SparseMatrix,对于截距,如果是多分类,返回稀疏向量,向量长度为numClasses,只有index为label的位置有值Double.PositiveInfinity;如果是二分类,返回dense vector,值为Double.PositiveInfinity

2.3.2. 不拟合截距 && label唯一

判断条件为

!$(fitIntercept) && isConstantLabel


此种情况下,算法可能不会收敛,给出了警告信息,但是会继续尝试优化。

2.3.3. 不拟合截距 && 各维特征的特征值相同却非0

判断条件

!$(fitIntercept) && (0 until numFeatures).exists { i =>
featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }


各维特征的方差为0,但是均值不为0,说明各维特征值各自相同且非0,这种情况从警告信息来看,spark最终会给出全为0的系数矩阵,但是会尝试优化。

2.3.4. 只用L2或者不使用正则化

判断条件

19e60
[code]$(elasticNetParam) == 0.0 || $(regParam) == 0.0


对应损失函数

//实际的L1正则化系数
val regParamL1 = $(elasticNetParam) * $(regParam)
//实际的L2正则化系数
val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)

val bcFeaturesStd = instances.context.broadcast(featuresStd)
//损失函数,参见前文
val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept),
$(standardization), bcFeaturesStd, regParamL2, multinomial = isMultinomial,
$(aggregationDepth))


使用LBFGS模型优化

val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
}


2.3.5. 只使用L1 或者 L1和L2同时用

L2与损失函数类似2.3.4节,L1正则化

def regParamL1Fun = (index: Int) => {
// 正则化与截距无关,系数矩阵是按列展开的,每一列是一维特征,因此所有的截距在展开数组的最后
val isIntercept = $(fitIntercept) && index >= numFeatures * numCoefficientSets
if (isIntercept) {
0.0
} else {
if (standardizationParam) {
regParamL1
} else {
//因为优化时统一使用标准化,这里如果设置不使用,需要先进行反过程已使得目标函数一致
val featureIndex = index / numCoefficientSets
if (featuresStd(featureIndex) != 0.0) {
regParamL1 / featuresStd(featureIndex)
} else {
0.0
}
}
}
}


使用OWLQN模型优化

new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))


2.4. 初始化weight矩阵

先将稀疏矩阵初始化成全0的矩阵,行是numCoefficientSets(numClasses),列为numFeaturesPlusIntercept(包括截距),然后根据是否有初始模型,二/多分类对weight矩阵进行初始化。

2.4.1. 有初始模型

这种情况是存在着一个初始的模型,里面存放着已经训练的系数矩阵,先验证模型的有效性

val initialModelIsValid = optInitialModel match {
case Some(_initialModel) =>
val providedCoefs = _initialModel.coefficientMatrix
//系数矩阵的行(numClasses)是否相同
val modelIsValid = (providedCoefs.numRows == numCoefficientSets) &&
//稀疏矩阵的列(特征数)是否相同
(providedCoefs.numCols == numFeatures) &&
//截距数是否正确(等于numClasses)
(_initialModel.interceptVector.size == numCoefficientSets) &&
//是否拟合截距
(_initialModel.getFitIntercept == $(fitIntercept))
if (!modelIsValid) {
logWarning(s"Initial coefficients will be ignored! Its dimensions " +
s"(${providedCoefs.numRows}, ${providedCoefs.numCols}) did not match the " +
s"expected size ($numCoefficientSets, $numFeatures)")
}
modelIsValid
case None => false
}


如果初始模型有效,则用初始化模型中的系数初始化

val providedCoef = optInitialModel.get.coefficientMatrix
providedCoef.foreachActive { (classIndex, featureIndex, value) =>
// We need to scale the coefficients since they will be trained in the scaled space
//更新weight矩阵,行号classIndex,列号featureIndex,第三个参数是值
initialCoefWithInterceptMatrix.update(classIndex, featureIndex,
value * featuresStd(featureIndex))
}
//更新截距
if ($(fitIntercept)) {
optInitialModel.get.interceptVector.foreachActive { (classIndex, value) =>
initialCoefWithInterceptMatrix.update(classIndex, numFeatures, value)
}
}


2.4.2. 没有初始模型 && 拟合截距 && 多分类

对于多分类逻辑回归,如果系数矩阵初始化为0,则把截距初始化与label同分布,能够更快的收敛

P(1)=eb1/Z...P(K)=ebK/ZwhereZ=∑k=1Kebk

但是label的真正分布很难知道,一般用label在样本中的分布近似,即

ebk=countk∗eλbk=log(countk)+λ

我们让λ=0,并使其均值为0(均值中心化)

bk=log(countk)b′k=bk−E(bk)

对应到spark的实现

//histogram里面放的是weightSum
val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing
val rawMean = rawIntercepts.sum / rawIntercepts.length
rawIntercepts.indices.foreach { i =>
initialCoefWithInterceptMatrix.update(i, numFeatures, rawIntercepts(i) - rawMean)
}


2.4.3. 没有初始模型 && 拟合截距&&二分类

类似于多分类,截距与label同分布可加快收敛

P(0)=1(1+eb)P(1)=eb1+eb

同理

b=logP(1)P(0)=logcount1count0

源码实现

initialCoefWithInterceptMatrix.update(0, numFeatures,
math.log(histogram(1) / histogram(0)))


2.5. 训练

优化求解是通过上面确定的optimizer,主要的训练过程都隐藏在iterations背后,无论是L-BFGS还是OWLQN都需要实现这个方法,返回所有轮的状态放在stats里面

val states = optimizer.iterations(new CachedDiffFunction(costFun),
//矩阵按列展开为数组,对照1.4.2.3节的第一种存储格式
new BDV[Double](initialCoefWithInterceptMatrix.toArray))


2.6. 获取训练结果

从训练结果里取回系数矩阵和截距

2.6.1. 初始化系数和截距

//训练结果以数组的形式返回
val allCoefficients = state.x.toArray.clone()
//数组转成矩阵,numClasses*numberFeature
val allCoefMatrix = new DenseMatrix(numCoefficientSets,
numFeaturesPlusIntercept, allCoefficients)
val denseCoefficientMatrix = new DenseMatrix(numCoefficientSets, numFeatures,
new Array[Double](numCoefficientSets * numFeatures), isTransposed = true)
//需要拟合截距或者二分类的情况,每类的截距都是实际有值的,用dense vector
val interceptVec = if ($(fitIntercept) || !isMultinomial) {
Vectors.zeros(numCoefficientSets)
} else {
//sparse vector
Vectors.sparse(numCoefficientSets, Seq())
}


2.6.2. 用训练结果赋值

从原始结果中分别提取系数与截距

allCoefMatrix.foreachActive { (classIndex, featureIndex, value) =>
//allCoefMatrix的列numFeaturesPlusIntercept,如果拟合了截距,每行的最后一列就是截距
val isIntercept = $(fitIntercept) && (featureIndex == numFeatures)
if (!isIntercept && featuresStd(featureIndex) != 0.0) {
//我们在分析L-BFGS源码的文章中提过,因为训练时特征是对标准差归一化的,为了在使用时直接使用特征值,这里相当于对返回的系数标准化
denseCoefficientMatrix.update(classIndex, featureIndex,
value / featuresStd(featureIndex))
}
if (isIntercept) interceptVec.toArray(classIndex) = value
}


2.6.3. 对待返回的系数进行调整

我们前面提到过,如果无pivot class多分类不使用正则化其系数矩阵不是唯一的,因此这里对系数矩阵进行0均值中心化(mean centered coefficients),使其可复现(reproducibility,我的理解是重复训练时结果一样)

//按行向量化
val denseValues = denseCoefficientMatrix.values
//均值
val coefficientMean = denseValues.sum / denseValues.length
denseCoefficientMatrix.update(_ - coefficientMean)


如果系数矩阵不是多分类,则根据系数非0的情况决定是否使用稀疏矩阵SparseMatrix

val compressedCoefficientMatrix = if (isMultinomial) {
denseCoefficientMatrix
} else {
//矩阵向量化后进行压缩
val compressedVector = Vectors.dense(denseCoefficientMatrix.values).compressed
compressedVector match {
case dv: DenseVector =>
//系数稠密,不需要压缩
denseCoefficientMatrix
case sv: SparseVector =>
//0系数较多,返回稀疏矩阵
new SparseMatrix(1, numFeatures, Array(0, sv.indices.length), sv.indices, sv.values,
isTransposed = true)
}
}


多分类时,对截距进行中心化

val compressedCoefficientMatrix = if (isMultinomial) {
denseCoefficientMatrix
} else {
if ($(fitIntercept) && isMultinomial) {
val interceptArray = interceptVec.toArray
val interceptMean = interceptArray.sum / interceptArray.length
(0 until interceptVec.size).foreach { i => interceptArray(i) -= interceptMean }
}


2.7. 构造逻辑回归模型

用训练得到的稀疏矩阵和截距向量,class数等构造逻辑回归模型

val model = copyValues(new LogisticRegressionModel(uid,
coefficientMatrix, interceptVector,numClasses, isMultinomial))
//如果是二分类,加入模型的评估指标,见BinaryLogisticRegressionSummary
val m = if (!isMultinomial) {
val (summaryModel, probabilityColName) = model.findSummaryModelAndProbabilityCol()
val logRegSummary = new BinaryLogisticRegressionTrainingSummary(
//加计算每个样本的margin(rawPredictionCol),预测值(probabilityCol,margin带入sigmoid函数),预测label(predictionCol,与threshold比较)
summaryModel.transform(dataset),
probabilityColName,
$(labelCol),
$(featuresCol),
objectiveHistory)
model.setSummary(Some(logRegSummary))
} else {
model
}


2.8. 归一化

从上面的介绍中我们可以看到,在计算loss和梯度时,spark没有判断是否使用归一化(standardization),直接进行了归一化,也就是说如果我们设置了归一化,其损失函数

lossstd+L1+L2

并且在2.6.2节我们看到,返回的系数是统一进行了归一化,注释中也有解释,系数进行归一化,在预测的时候就不需要对特征进行归一化了,整个训练过程可以看做是对系数进行归一化,因此如果我们设置不进行归一化,loss和梯度计算部分仍旧归一化,则可以

lossstd+L1std+L2std2

所有项都除以std,对整体的目标函数是没有影响的,因此可以看到源码中L1正则化和L2正则化在设置不归一化的时候对方差进行了归一化,因为L2是平方项,因此分母也是std的平方。

在实际的业务场景中,其实很多都是非数值型特征,而且数值型的往往也会离散化,全部变成0/1的特征,这种情况使用归一化其实是没什么意义的,spark这里强制使用归一化我感觉欠妥了。

3. 结语

本文分析了spark ElasticNet的实现,其中用到的一些技巧等。其计算过程中使用归一化及要求label必须连续且为整数等其实限制了其使用场景,希望后续能够优化这块,多考虑一些实际的业务场景。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: