您的位置:首页 > 其它

Spark MLlib(一)K-Means

2016-08-27 14:01 162 查看
在此之前请先了解一下算法的原理:

http://www.jianshu.com/p/fc91fed8c77b

package com.qh

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

/**
* Created by hadoop on 8/25/16.
*
*
* 关闭spark app运行时在终端输出的大量日志信息,保留警告信息以及程序输出
* http://blog.chinaunix.net/uid-29454152-id-5645182.html */
object MLlib_K_Means {
private val path = "hdfs://master:9000/Spark/MLlib/K-Means"

def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("MLlib K-Means").setMaster("spark://master:7077")
val sc = new SparkContext(conf)
/*
数据格式:
0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2

导入之后
scala> data.collect()
res1: Array[String] = Array(0.0 0.0 0.0, 0.1 0.1 0.1, 0.2 0.2 0.2, 9.0 9.0 9.0,
9.1 9.1 9.1, 9.2 9.2 9.2)
*/
val data = sc.textFile(path + "/spark", 1)

/*
将每个元素x根据空格拆分为数组 x.split("\\s+")
将数组转换为对应的double类型数组 .map(_.toDouble)
生成本地向量 Vectors
稠密向量 Vectors.dense
稀疏向量 Vectors.sparse
scala> parsedData.collect()
res3: Array[org.apache.spark.mllib.linalg.Vector] = Array([0.0,0.0,0.0], [0.1,0.1,0.1],
[0.2,0.2,0.2], [9.0,9.0,9.0], [9.1,9.1,9.1], [9.2,9.2,9.2])
*/
val parsedData = data.map(x => Vectors.dense(x.split("\\s+").map(_.toDouble)))

/*
K的选择是K-means算法的关键
一种是我们知道拥有多小个类别,即多少个K.
一种是我们不知道或者不清楚K为多少,就需要通过其他方法来计算
MeansModel.computeCost通过计算所有数据点到其最近的中心点的平方和来评估聚类的效果
一般来说,同样的迭代次数和算法跑的次数,这个值越小代表聚类的效果越好
但是在实际情况下,我们还要考虑到聚类结果的可解释性,不能一味的选择使computeCost结果值最小的那个K
k=1 : cost = 364.62
k=2 : cost = 0.11999999999994547
k=3 : cost = 0.07500000000004324
k=4 : cost = 0.05999999999994543
k=5 : cost = 0.015000000000043201
k=6 : cost = 0.0
例如本例中很明显k=2,我们不能一味地选择cost小的那个K.
因为这显然这不是一个具有实际意义的聚类结果
*/
val k = Array(1, 2, 3, 4, 5, 6)
k.foreach(x => {
val model = KMeans.train(parsedData, x, 20)
val cost = model.computeCost(parsedData)
println("k=" + x + " : cost = " + cost)
})

/*
将数据集聚类,2个类,20次迭代,进行模型训练形成数据模型
首先KMeans.train 方法对数据集进行聚类训练,返回KMeansModel类实例
随后使用KMeansModel.predict方法对新的数据点进行所属聚类的预测

train方法有很多重构,在这选择最全的一种
data:一个Vector类型的RDD数据集
k:期望的聚类的个数
maxIterations:最大迭代次数
runs:表示算法被运行的次数.K-means算法不保证能返回全局最优的聚类结果,
所以在目标数据集上多次跑K-means算法,有助于返回最佳聚类结果.
initializationMode:表示初始聚类中心点的选择方式, 目前支持随机选择或者 K-means||方式。默认是 K-means||。
seed:集群初始化时的随机种子,默认是生成基于系统时间的种子
def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int, initializationMode: String,
seed: Long): KMeansModel

方式二:
val model = new KMeans()
.setK(2)
.setMaxIterations(20)
.run(parsedData)
*/
val model = KMeans.train(parsedData, 2, 20)
/*
model.clusterCenters  聚类的中心
scala> model.clusterCenters.foreach(x => println("  " + x.toString))
[9.1,9.1,9.1]
[0.1,0.1,0.1]
*/
println("Cluster centers:")
model.clusterCenters.foreach(x => println(x.toString))

/*
使用模型测试单点数据
model.predict(data)
KMeansModel.predict方法接受不同的参数,可以是向量,或者 RDD,返回是入参所属的聚类的索引号
根据模型model返回data的聚类结果标签
scala> model.predict(Vectors.dense("0.2 0.2 0.2".split(' ').map(_.toDouble)))
res1: Int = 0
scala> model.predict(Vectors.dense("0.25 0.25 0.25".split(' ').map(_.toDouble)))
res2: Int = 0
scala> model.predict(Vectors.dense("8 8 8".split(' ').map(_.toDouble)))
res3: Int = 1
*/
println("[0.2 0.2 0.2]:" +
model.predict(Vectors.dense("0.2 0.2 0.2".split(' ').map(_.toDouble))))
println("[0.25 0.25 0.25]:" +
model.predict(Vectors.dense("0.25 0.25 0.25".split(' ').map(_.toDouble))))
println("[8 8 8]:" +
model.predict(Vectors.dense("8 8 8".split(' ').map(_.toDouble))))

/*
返回聚类结果标签
scala> result1.collect()
res7: Array[Int] = Array(0, 0, 0, 1, 1, 1)
*/
val result1 = model.predict(parsedData)
result1.saveAsTextFile(path + "/result1")

/*
返回数据集和结果
scala> result2.collect()
res1: Array[String] = Array([0.0,0.0,0.0] 0, [0.1,0.1,0.1] 0, [0.2,0.2,0.2] 0, [9.0,9.0,9.0] 1,
[9.1,9.1,9.1] 1, [9.2,9.2,9.2] 1)
*/
val result2 = parsedData.map(x => x + " " + model.predict(x))
result2.saveAsTextFile(path + "/result2")
sc.stop()
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark MLlib K-Means