您的位置:首页 > 其它

Spark MLlib 1.6 -- 聚类

2016-02-26 17:35 369 查看
聚类是根据某种相似度量,将‘相似’的样本划分到同一个子类中,所以聚类是一种无监督学习。聚类常用于探索分析,或(和)看作分层监督学习管道上一个环节(在这个管道上,对每个聚类结果再深入进行分类或回归)。
Spark.mllib包支持以下模型:
·
K-means
·
Gaussian mixture

·
Power iteration clustering (PIC)

·
Latent Dirichlet allocation (LDA)

·
Bisecting k-means

·
Streaming k-means

1.1 K-means (k均值聚类)
K均值聚类是常用的聚类算法,它可以将样本点聚合到已给定的几个聚类集中。Spark.mllb实现了并行的K均值聚类算法(k-means++)
,称为kmeans||
。spark.mllib中提供以下参数:
1) k: 期望的类别数
2)maxIterations: 每次计算的最大的迭代总次数
3)initializationMode:确定k-means||算法的随机初始化或普通初始化。
4) run: k-means 算法计算的次数(因为算法不能保证得到全局最优解,固对同一个聚类数据集进行多次计算,以得出最优结果)
5)initializationSteps: K-means||
算法计算的步数。
6) epsilon: 一个阈值,用于判断k-means算法何时收敛
7) initialModel :每个初始聚类中心点的集合,如果提供此集合(非空),则算做一次计算步数。

例子:

下面的代码可以在spark-shelll中运行。
下例代码在加载数据后,使用KMeans算法对数据集聚成两类。可以修改传入算法的K值修改最终聚类的类别数。算法紧接着计算类内方差和(WSSSE)。此参数的期望的类别数有关,可以容易想到,当聚类类别数越多时(或类别聚合更细时),这个值会变小。在实践中,做多次实验后将k和WSSSE绘制曲线,最优的k值是曲线上波谷拐点。
KMeans Scaladocs API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.clustering.KMeans
KMeansModelScala docs API :
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.clustering.KMeansModel
importorg.apache.spark.mllib.clustering.{KMeans,KMeansModel}
importorg.apache.spark.mllib.linalg.Vectors

// Loadand parse the data
val data
= sc.textFile("data/mllib/kmeans_data.txt")
valparsedData
= data.map(s
=>Vectors.dense(s.split('
').map(_.toDouble))).cache()

// Clusterthe data into two classes using KMeans
valnumClusters
=2
valnumIterations
=20
val clusters
=KMeans.train(parsedData,
numClusters,numIterations)

//Evaluate clustering by computing Within Set Sum of Squared Errors
valWSSSE= clusters.computeCost(parsedData)
println("WithinSet Sum of Squared Errors = "+WSSSE)

// Saveand load model
clusters.save(sc,"myModelPath")
val sameModel=KMeansModel.load(sc,"myModelPath")

5.2 高斯混合聚类
高斯混合模型是一种概率分布的组合模型,每个点按一定的先检概率从k个子类中抽取,每个子类又服从高斯分布。Spark.mllib使用期望最大化算法,对给定样本集中点推断最大似然函数值,最终决定次样本聚合到哪一个子类中。该算法有以下参数:
1) k 期望聚类数
2)convergenceTol : 算法收敛的判定条件,log似然最大??(the maximum change in log-likehood)
3) maxIterations:最大的迭代次数
4) initialModel:期望最大算法的可选起始点,如果此参数省略,算法会构造一个随机的起始点。
例子
下例加载数据后,使用高斯混合聚类算法将数据聚成两类,可以修改k来修改期望聚合的类数。然后输出混合模型的参数。
GuassianMixtureScala Docs API :
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.clustering.GaussianMixture
GaussianMixtureModelScala Docs API :

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.clustering.GaussianMixtureModel

importorg.apache.spark.mllib.clustering.GaussianMixture
importorg.apache.spark.mllib.clustering.GaussianMixtureModel
importorg.apache.spark.mllib.linalg.Vectors

// Loadand parse the data
val data
= sc.textFile("data/mllib/gmm_data.txt")
valparsedData
= data.map(s
=>Vectors.dense(s.trim.split('
').map(_.toDouble))).cache()

// Clusterthe data into two classes using GaussianMixture
val gmm
=newGaussianMixture().setK(2).run(parsedData)

// Saveand load model
gmm.save(sc,"myGMMModel")
val sameModel=GaussianMixtureModel.load(sc,"myGMMModel")

// outputparameters of max-likelihood model
for(i
<-0 until gmm.k){
println("weight=%f\nmu=%s\nsigma=\n%s\n" format

(gmm.weights(i),
gmm.gaussians(i).mu,
gmm.gaussians(i).sigma))
}

5.3 幂迭代聚类(power iteration clustering PIC)
在算法中,对聚类顶点使用两两相似度作为边界点划分属性,可以提高处理聚类问题的效率(详细见Lin and Cohen, Power Iteration Clusteringhttp://www.icml2010.org/papers/387.pdf)。使用幂迭代算法对正交吸收矩阵计算伪特征矢量量,将这组特征矢量作为聚类的边界点。Spark.mllib
中PIC
算法使用GraphX
作为计算的基础,它的输入是(srcId, dstId,similarity)
三元组的RDD,输出聚类模型。算法中选取的相似度函数是非负的,PIC
算法假设相似函数还满足对称性。输入数据中(srcId,dstId0
对不考虑顺序下只能出现至多一次,如果输入数据中不含这个数据对(不考虑顺序),则这两个数据对的相似度为0. Spark-mllib在PIC算法需要设置以下假设参数
1) k : 期望聚类数
2) maxIterations: 幂迭代最大次数
3) initializationMode: 模型初始化,默认使用”random”
,即使用随机向量作为初始聚类的边界点,可以设置”degree”
使用正交相似度和。(?)

例子:
我们给出使用spark.mllibPIC算法的例子
PowerIterationClustering
实现PIC算法,输入参数(srcId:
Long , dstId : Long , similarity : Double ) 三元组的RDD 来表示吸收矩阵. 调用PowerIterationClustering.run
返回PowerIterationClusteringModel模型实例。
PowerIterationClusteringScala Docs API :
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.clustering.PowerIterationClustering
PowerIterationClusteringModelScala Docs API :
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.clustering.PowerIterationClusteringModel

importorg.apache.spark.mllib.clustering.{PowerIterationClustering,PowerIterationClusteringModel}
importorg.apache.spark.mllib.linalg.Vectors

// Loadand parse the data
val data
= sc.textFile("data/mllib/pic_data.txt")
valsimilarities
= data.map
{ line
=>

val parts
= line.split(' ')

(parts(0).toLong,
parts(1).toLong, parts(2).toDouble)
}

// Clusterthe data into two classes using PowerIterationClustering
val pic
=newPowerIterationClustering()

.setK(2)

.setMaxIterations(10)
val model
= pic.run(similarities)

model.assignments.foreach
{ a
=>
println(s"${a.id}-> ${a.cluster}")
}

// Saveand load model
model.save(sc,"myModelPath")
val sameModel=PowerIterationClusteringModel.load(sc,"myModelPath")
完整例了见:examples下

5.4 隐性狄利克雷划分(LDA)
Latent Dirichletallocation (LDA)
是一种主题模型,即从文本文档集中推断文章的主题。同样,LDA也可以看作是一种聚类问题。
1) 主题相应于聚类的中心,每篇文章相应于样本数据集中一条记录
2) 主题和文档都存在于特征空间,特征向量的每个元素是在某篇文章中,每个词出现的次数(单词袋)
3) LDA使用基于统计模型的函数来计算什么样本点应该聚成一类,这个函数会根据每个文章选取的不同而使用不同的函数形式,它不使用传统的距离度量来计算是否归入同一类。

通过设置setOptimizer ,LDA
支持选取不同的推断算法。可选值为:
1 EMLDAOptimizer使用似然函数期望最大的原则来优化聚类模型,这样训练出来的结果易于解释.
2 OnlineLDAOptimizer对在线变分推断使用迭代最小批采样进行优化,这样训练的过程内存消耗稳定。
LDA 将每个文档作为一个向量,每个向量的元素是每个单词在文档中出现的次数,需要设置以下参数:
1) k: 主题数(或聚类中心数)
2) optimizer:LDA训练中使用的优化器,
可选EMLDAOptimizer / OnlineLDAOptimizer

3) docConcentration:狄利克雷参数,每篇文档的主题先验概率分布.
此值越大,则推断的分布越光滑。
4)topicConcentration: 狄利克雷参数,每个主题的词的先验概率分布,此值越大,则推断的公布越光滑
5) maxIterations: 迭代次数的上限值
6)checkpointInterval: 如果使用周期检查点(sparkconfiguration配置),这个参数确定创建检查点频次。如果
maxIterations越大,使用检查点可以降低在磁盘上对文件大小进行排序的次数,同时可以在任务失败时快速恢复。
Spark.mllib’sLDA 模型支持:
1) describeTopics:返回文档中最重要的关键词序列,以及每个关键词的权重
2) topicsMatrix:返回 vocabSize X k
的矩阵,矩阵每列是一个主题
注意: LDA
算法特征还处于实验开发阶段,这样,某种特征只能用优化器中一种处理,模型的训练完全依赖于优化器。现阶段,分布式的模型可以转化为本地运行模式,但是反之不成立。
下面分别讲解优化器和模型。

5.4.1 期望最大化
EMLDAOptimizer (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.clustering.EMLDAOptimizer)和 DistributedLDAModel
(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.clustering.DistributedLDAModel)

LDA 中参数。
1) docConcentration:只支持对称先验(?),所以k-维向量所有值是一致的。并且所有值要大于1.0(
> 1.0)。 vector(-1)
会使用默认值(如k
维向量所有元素值都为 (50 / k) + 1

2) topicConcentration:只支持对称先验(?) ,
所以值要大于1.0
,如果是-1
,则使用默认值,如0.1 + 1

3)maxIterations: 最大EM迭代次数.

注意:迭代足够次数是重要的,在最初的迭代中,EM会产生很多无用的主题,但随机迭代的继续,这些无用的主题会被替换。请至少使用20次迭代,根据数据集建议使用50
~ 100 次迭代。
EMLDAOptimizer 生成 DistributedLDAModel
模型,此模型不仅保存了推断的主题集,同时保存完整训练语料,和每个文档的主题分布。 DistributedLDAModel支持:
1) topTopicsPerDocument:每个训练语料最匹配的主题及相应的权重。
2)topDocumentsPerTopic: 每个主题最匹配的文档,以及在主题内的相应权重
3) logPrior : 估计主题的log概率,以及给定假设参数docConcentration和
topicConcentration 下文档-主题二维分布。
4) logLikelihood:训练语料的log
似然,给定推断主题和文档-主题分布

5.4.2 在线变分贝叶斯(online variational bayes)
OnlineLDAOptimizer和 LocalLDAModel
LDA 提供以下参数配置:
1) docConcentration:非对称先验(如狄利克雷参数值)组成K-维向量,且每个值大于0。Vector(-1)
会使用默认值(如k-维向量每个元素是(1.0 / k )

2)topicConcentration: 只支持对称先验,且值要不小于0 (>=0)
。如果给出 -1会使用默认值,如 1.0 / k

3) maxIterations: 最大可提交的最小批数。
除此以外,OnlineLDAOptimizer
接受以下参数
1)miniBatchFraction: 每次迭代使用采样语料的比例
2)optimizeDocConcentration: 如果设置为true ,
在每次最小批迭代后对假设参数docConcentration(aka alpha)
使用最大似然估计,并在返回的LocalLDAModel
中配置此最优的docConcentration值
3) tau0 and kappa: 在学习速率衰减中需要这两个参数,即计算 ( \Tau_0 + iter )^ -K
此处iter
是当前迭代序号

OnlineLDAOptimizer生成 LocalLDAModdel ,
此模型存储推断主题, LocalLDAModel支持:
1) logLikelihood(documents): 给定推断主题,计算每个文档的log-似然函数下界
2)logPerplexity(documents): 给定推断主题,计算每个文档的log-复杂度函数上界
例子
接下来的例子,我们加载文档的语料向量,即每个向量值是文档中词出来的频次。然后使用LDA算法预测文档的三个主题,通过修改k可以配置算法期望的聚类数,然后输出每个单词的概率分布。
LDA Scala DocsAPI : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.clustering.LDA DistributedLDAModelScala Docs API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.clustering.DistributedLDAModel importorg.apache.spark.mllib.clustering.{LDA,DistributedLDAModel}
importorg.apache.spark.mllib.linalg.Vectors

// Loadand parse the data
val data
= sc.textFile("data/mllib/sample_lda_data.txt")
valparsedData
= data.map(s
=>Vectors.dense(s.trim.split('
').map(_.toDouble)))
// Indexdocuments with unique IDs
val corpus
=parsedData.zipWithIndex.map(_.swap).cache()

// Clusterthe documents into three topics using LDA
val ldaModel
=newLDA().setK(3).run(corpus)

// Outputtopics. Each is a distribution over words (matching word count vectors)
println("Learnedtopics (as distributions over vocab of "+
ldaModel.vocabSize
+"words):")
val topics
= ldaModel.topicsMatrix
for(topic
<-Range(0,3)){
print("Topic"+ topic
+":")

for(word
<-Range(0, ldaModel.vocabSize)){
print(""+ topics(word,
topic));}
println()
}

// Saveand load model.
ldaModel.save(sc,"myLDAModel")
val sameModel=DistributedLDAModel.load(sc,"myLDAModel")

5.5 二分 K-means
二分k-means
算法通常会比普通的K-means算法要快,但两种算法生成的聚类结果却不同。
二分k-means
是一种分层聚类,分层聚类在聚类分析中经常用于,当需要构建一个多层聚类时。分层聚类的策略一般有两种:
1) Agglomerative
自下向上聚合:算法最初从每个点独立为一个聚类开始,每次递归地将多个聚类中点合并到一个聚类。
2) Divisive: 自上而下细分:算法最初从一个聚类开始,每次递归地将一个聚类中点细分成多个聚类。

二分k-means
算法是一种分治算法,MLLib中需要配置以下参数
1) k : 叶子聚类(?)的期望类别数(默认4)。如果最终叶子聚类没有细分,实际的聚类数是小于此值的。
2) maxIterations: k-means 每次迭代细分的最大聚类类别(默认20)
3)minDivisibleClusterSize : 如果此值不小于1.0,则是每个聚类最小点数,如果此值小于1.0
,则是每个聚类最小点的占比。
4) seed: 随机种子(默认类别名字的hash值)

例子
BisectingKMeansScala Docs API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.clustering.BisectingKMeans BisectingKMeansModelScala Docs API
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.clustering.BisectingKMeansModel
importorg.apache.spark.mllib.clustering.BisectingKMeans
importorg.apache.spark.mllib.linalg.{Vector,Vectors}

// Loads andparses data
def parse(line:String):Vector=Vectors.dense(line.split("
").map(_.toDouble))
val data
= sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache()

// Clusteringthe data into 6 clusters by BisectingKMeans.
val bkm
=newBisectingKMeans().setK(6)
val model
= bkm.run(data)

// Show thecompute cost and the cluster centers
println(s"ComputeCost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach
{case(center, idx)=>
println(s"Cluster Center ${idx}: ${center}")
}
详细的例子代码见:examples/src/main/scala/org/apache/spark/examples/mllib/BisectingKMeansExample.scala

5.6 流式k-means

当数据以流式产生,我们可能需要动态估计聚类,每当有数据到达就更新模型。Spark.mllib
支持流式k-means聚类,配置控制估计的衰减值。算法使用通用最小批k-means更新聚类规则。对每批次数据,算法把数据聚类到距离最近的聚类上。重新计算新聚类的中心,再更新其它所有聚类的参数
c_{t+1} = frac { c_t n_t \Alpha + x_t m_t } { n_t \Alpha+ m_t} (1)
n_{t+1} = n_t + m_t (2)
此处 c_t
是先前的聚类中心, n_t
是此聚类的点数, x_t
是当前批次产生新的聚类中心, m_t
是当前批次中新添加到此聚类的点数。衰减因子 \Alpha
可用来降低更早批次数据的权重,当 \Alpha = 1
时,所有点会从头到尾使用;
当 \Alpha = 0
里,最新一次迭代的数据不参考。这个类似于指数权重的移动平均。
可以设置halfLife
参数控制衰减,通过此参数可以配置适当的衰减因子\Alpha ,
对时间t
时的数据,经过halfLife
时间后原始数据会清除掉一半。这里的时间单位可以是批次,或点数,相应的更新规则也需要调整。
下例给出流式数据的聚类估计
StreamingKMeansScala Docs API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.clustering.StreamingKMeans 首先需要导入类
importorg.apache.spark.mllib.linalg.Vectors
importorg.apache.spark.mllib.regression.LabeledPoint
importorg.apache.spark.mllib.clustering.StreamingKMeans
然后指定训练的输入数据流向量,同时也要指定用于测试数据集。假设程序已创建StreamingContext ssc ,
见 spark steaming programming guide
http://spark.apache.org/docs/latest/streaming-programming-guide.html#initializing val trainingData= ssc.textFileStream("/training/data/dir").map(Vectors.parse)
val testData
= ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)
先创建一个随机聚类的模型,然后确定聚类的类别数
val numDimensions=3
val numClusters=2
val model
=newStreamingKMeans()

.setK(numClusters)

.setDecayFactor(1.0)

.setRandomCenters(numDimensions,0.0)
现在可以注册训练数据流和测试数据流,并启动job ,
当新数据点到达时,打印出预测新数据的聚类标签
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp
=>(lp.label,
lp.features))).print()

ssc.start()
ssc.awaitTermination()
随着添加新的数据文件,全部聚类中心点会更新。每个训练点的格式是[x_1, x_2, x_3]
,同样每个测试数据的格式 ( y , [x_1,x_2, x_3])
,此处y
是类别标签或类别标识.
只要有新文件到达/training/data/dir
目录下,新文件中点实时预测结果打印出来,并且模型同时会更新聚类中心。持续的新数据使用聚类中心持续变化!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: