SparkKMeans

2014-07-16 13:51
1. Read each line, split it on spaces into an array of strings, convert each element to a Double, and wrap the result in a Vector[Double].

import breeze.linalg.{Vector, DenseVector, squaredDistance}

def parseVector(line: String): Vector[Double] = {
  DenseVector(line.split(' ').map(_.toDouble))
}
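A dependency-free sketch of the same parsing step follows. It assumes points are held in a plain `Array[Double]` instead of Breeze's `DenseVector` so that it runs without Spark or Breeze, and the object name `ParseSketch` is illustrative. It also trims the line and splits on runs of whitespace: with `split(' ')`, a doubled or leading space produces an empty token, and `"".toDouble` throws a `NumberFormatException`.

```scala
object ParseSketch {
  // Hypothetical stand-in for parseVector: a plain Array[Double] replaces
  // Breeze's DenseVector so the snippet has no external dependencies.
  def parseVector(line: String): Array[Double] =
    line.trim              // drop leading/trailing whitespace
      .split("\\s+")       // one token per run of whitespace, so no empty tokens
      .map(_.toDouble)     // parse each coordinate
}
```

For example, `ParseSketch.parseVector("1.0 2.5  -3")` yields the coordinates 1.0, 2.5 and -3.0 despite the double space.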
2. As in LocalKMeans, given the K centers, return the index of the center closest to the input point vector.

def closestPoint(p: Vector[Double], centers: Array[Vector[Double]]): Int = {
  var bestIndex = 0
  var closest = Double.PositiveInfinity

  // Linear scan: remember the index of the center with the smallest squared distance
  for (i <- 0 until centers.length) {
    val tempDist = squaredDistance(p, centers(i))
    if (tempDist < closest) {
      closest = tempDist
      bestIndex = i
    }
  }

  bestIndex
}
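In the original, `squaredDistance` comes from Breeze. The sketch below re-implements both helpers over plain `Array[Double]` (the object name `ClosestSketch` is hypothetical). Squared distance is sufficient because taking the square root does not change which center is nearest, and `minBy` keeps the first minimum, which matches the strict `<` comparison in the loop above.

```scala
object ClosestSketch {
  // Sum of squared coordinate differences (no square root needed for comparisons).
  def squaredDistance(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "dimension mismatch")
    a.indices.map(i => (a(i) - b(i)) * (a(i) - b(i))).sum
  }

  // Index of the nearest center; centers must be non-empty.
  // On ties, minBy returns the lowest index.
  def closestPoint(p: Array[Double], centers: Array[Array[Double]]): Int =
    centers.indices.minBy(i => squaredDistance(p, centers(i)))
}
```

With centers `(0, 0)` and `(10, 10)`, the point `(5, 5)` is equidistant from both and is assigned index 0.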


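3. Check the number of arguments and parse the inputs: `data` holds all points to be clustered, `K` is the number of centers, and `convergeDist` is the convergence threshold.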
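import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits such as reduceByKey (required before Spark 1.3)

// Inside main(args: Array[String]):
if (args.length < 3) {
  System.err.println("Usage: SparkKMeans <file> <k> <convergeDist>")
  System.exit(1)
}
val sparkConf = new SparkConf().setAppName("SparkKMeans")
val sc = new SparkContext(sparkConf)
val lines = sc.textFile(args(0))
val data = lines.map(parseVector _).cache() // cached because every iteration re-reads it
val K = args(1).toInt
val convergeDist = args(2).toDouble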
4. Sample K points from data to serve as the initial centers.

val kPoints = data.takeSample(withReplacement = false, K, 42).toArray
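`takeSample(withReplacement = false, K, 42)` draws K elements from distinct positions of the RDD, and the fixed seed 42 makes the choice repeatable for the same input. A local stand-in is a seeded shuffle of the indices. This is not Spark's sampling algorithm, so the specific picks will differ; `SampleSketch` and `initialCenters` are hypothetical names.

```scala
import scala.util.Random

object SampleSketch {
  // Hypothetical local analogue of takeSample(withReplacement = false, k, seed):
  // shuffle the positions with a seeded RNG and keep the first k, so no
  // position is chosen twice. Same seed and data => same picks.
  def initialCenters[T](data: IndexedSeq[T], k: Int, seed: Long): IndexedSeq[T] = {
    require(k <= data.length, s"cannot pick $k distinct positions from ${data.length} points")
    new Random(seed).shuffle(data.indices.toVector).take(k).map(i => data(i))
  }
}
```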
5. Map each point vector in data to the index of its nearest center, paired with the point itself and a count of 1.

val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
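Pairing every point with the count 1 lets a single reduction compute both the coordinate sum and the member count of each cluster. A local version of the same map over an ordinary `Seq`, assuming `Array[Double]` in place of Breeze vectors (the names are illustrative):

```scala
object AssignSketch {
  def squaredDistance(a: Array[Double], b: Array[Double]): Double =
    a.indices.map(i => (a(i) - b(i)) * (a(i) - b(i))).sum

  // Local counterpart of data.map(p => (closestPoint(p, kPoints), (p, 1))):
  // key = nearest center index, value = (point, count of 1).
  def assign(data: Seq[Array[Double]], centers: Array[Array[Double]]): Seq[(Int, (Array[Double], Int))] =
    data.map { p =>
      val idx = centers.indices.minBy(i => squaredDistance(p, centers(i)))
      (idx, (p, 1))
    }
}
```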
6. For each center, compute the vector sum of all points assigned to that cluster and the number of those points.

val pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}
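`reduceByKey` merges all values that share a key with an associative function, here adding the vector sums and the counts component-wise. The sketch below reproduces the result with `groupBy` on a local `Seq`. Spark executes the reduction differently (it combines values within each partition before shuffling), but the resulting pairs are the same. `ReduceSketch`, `add` and `pointStats` are illustrative names.

```scala
object ReduceSketch {
  // Component-wise vector addition (Breeze's `+` in the original).
  def add(a: Array[Double], b: Array[Double]): Array[Double] =
    a.zip(b).map { case (x, y) => x + y }

  // For each cluster index, sum the member vectors and their counts.
  def pointStats(tagged: Seq[(Int, (Array[Double], Int))]): Map[Int, (Array[Double], Int)] =
    tagged.groupBy(_._1).map { case (k, group) =>
      k -> group.map(_._2).reduce((l, r) => (add(l._1, r._1), l._2 + r._2))
    }
}
```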
7. Compute the K new centers: each is its cluster's vector sum divided by its point count.

val newPoints = pointStats.map { pair =>
  (pair._1, pair._2._1 * (1.0 / pair._2._2))
}.collectAsMap()
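Multiplying the sum by `1.0 / count` gives the mean of the cluster's members, i.e. its centroid. A local sketch over plain arrays (the object name `CentroidSketch` is hypothetical):

```scala
object CentroidSketch {
  // New center = member sum scaled by 1 / count, the mean of the cluster.
  def newCenters(stats: Map[Int, (Array[Double], Int)]): Map[Int, Array[Double]] =
    stats.map { case (k, (sum, count)) => k -> sum.map(_ * (1.0 / count)) }
}
```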
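8. Compute how far the new centers moved from the previous ones (the sum of squared distances); this total decides whether the algorithm has converged.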

tempDist = 0.0 // tempDist is a var declared before the iteration loop
for (i <- 0 until K) {
  tempDist += squaredDistance(kPoints(i), newPoints(i))
}
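Step 8 on its own does not complete an iteration. In the Spark example, steps 5 to 8 run inside a `while (tempDist > convergeDist)` loop, and each pass ends by copying the entries of `newPoints` into `kPoints`. The local sketch below assembles the whole loop over plain arrays. Three choices are assumptions of this sketch rather than the example's code: `tempDist` starts at infinity so at least one pass always runs, a hypothetical `maxIter` cap guards against non-termination, and a cluster that received no points keeps its previous center, since such a cluster has no entry in `newPoints` and `newPoints(i)` would throw a `NoSuchElementException`.

```scala
object LocalKMeansLoop {
  def squaredDistance(a: Array[Double], b: Array[Double]): Double =
    a.indices.map(i => (a(i) - b(i)) * (a(i) - b(i))).sum

  def closestPoint(p: Array[Double], centers: Array[Array[Double]]): Int =
    centers.indices.minBy(i => squaredDistance(p, centers(i)))

  // Repeats steps 5-8 until the total squared movement is <= convergeDist.
  // kPoints is updated in place; returns the number of passes run.
  def run(data: Seq[Array[Double]], kPoints: Array[Array[Double]],
          convergeDist: Double, maxIter: Int = 100): Int = {
    var tempDist = Double.PositiveInfinity // var read by the while condition
    var iter = 0
    while (tempDist > convergeDist && iter < maxIter) {
      // Steps 5-6: (vector sum, count) per cluster index
      val stats = data
        .map(p => (closestPoint(p, kPoints), (p, 1)))
        .groupBy(_._1)
        .map { case (k, group) =>
          k -> group.map(_._2).reduce((l, r) =>
            (l._1.zip(r._1).map { case (x, y) => x + y }, l._2 + r._2))
        }
      // Step 7: mean of every non-empty cluster
      val newPoints = stats.map { case (k, (sum, n)) => k -> sum.map(_ / n) }
      // Step 8 plus the center update; empty clusters keep their old center
      tempDist = 0.0
      for (i <- kPoints.indices) {
        val updated = newPoints.getOrElse(i, kPoints(i))
        tempDist += squaredDistance(kPoints(i), updated)
        kPoints(i) = updated
      }
      iter += 1
    }
    iter
  }
}
```

For the one-dimensional points 0, 1, 10 and 11 with initial centers 0 and 1, the loop settles on centers 0.5 and 10.5 after three passes.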