文章标题
2017-04-28 18:25
267 查看
/**
 * One entry of the Word2Vec vocabulary.
 *
 * @param word    the token itself
 * @param cn      occurrence count of the token in the training corpus
 * @param point   indices of the Huffman-tree inner nodes on the path from the
 *                root to this word's leaf (filled in by createBinaryTree)
 * @param code    Huffman binary code of this word, one bit per tree level
 *                (0 = first-merged child, 1 = second-merged child)
 * @param codeLen number of valid entries in `code`/`point`
 */
private case class VocabWord(
    var word: String,
    var cn: Int,
    var point: Array[Int],
    var code: Array[Int],
    var codeLen: Int
)
/**
 * Builds the vocabulary from the word stream.
 *
 * Counts word frequencies, wraps each (word, count) pair into a VocabWord,
 * drops words with fewer than `minCount` occurrences, then collects to the
 * driver and sorts descending by count. Side effects: sets `vocab`,
 * `vocabSize`, fills `vocabHash` (word -> index) and `trainWordsCount`.
 */
private def learnVocab(words: RDD[String]): Unit = {
  // Count frequencies, build VocabWord records with empty code/point arrays,
  // filter by minCount, and sort by descending count on the driver.
  vocab = words.map(w => (w, 1))
    .reduceByKey(_ + _)
    .map(x => VocabWord(
      x._1,
      x._2,
      new Array[Int](MAX_CODE_LENGTH),
      new Array[Int](MAX_CODE_LENGTH),
      0))
    .filter(_.cn >= minCount)
    .collect()
    .sortWith((a, b) => a.cn > b.cn)

  // Vocabulary size is the number of surviving words; it must be non-empty.
  vocabSize = vocab.length
  require(vocabSize > 0, "The vocabulary size should be > 0. You may need to check " +
    "the setting of minCount, which could be large enough to remove all your words in sentences.")

  // Map each word to its rank index in `vocab`, and accumulate the total
  // number of training tokens into trainWordsCount.
  var a = 0
  while (a < vocabSize) {
    vocabHash += vocab(a).word -> a
    trainWordsCount += vocab(a).cn
    a += 1
  }
  logInfo("trainWordsCount = " + trainWordsCount)
}
Creating the exp table: it is a precomputed sigmoid lookup table (entry i holds sigmoid(x) for x evenly sampled over [-MAX_EXP, MAX_EXP)), so the inner training loop can replace expensive `math.exp` calls with a single array read.
/**
 * Precomputes a sigmoid lookup table.
 *
 * Entry i holds sigmoid(x) = e^x / (e^x + 1), where x sweeps the interval
 * [-MAX_EXP, MAX_EXP) in EXP_TABLE_SIZE equal steps. The hot training loop
 * can then replace each math.exp call with a single array read.
 */
private def createExpTable(): Array[Float] = {
  Array.tabulate(EXP_TABLE_SIZE) { idx =>
    val ex = math.exp((2.0 * idx / EXP_TABLE_SIZE - 1.0) * MAX_EXP)
    (ex / (ex + 1.0)).toFloat
  }
}
/**
 * Builds a Huffman tree over the vocabulary (which is sorted by descending
 * count) and assigns every word its binary Huffman code plus the list of
 * inner-node indices on the root-to-leaf path, as needed by hierarchical
 * softmax.
 *
 * Fix: removed a stray `4000` token that had corrupted the first
 * `min1i = pos1` assignment and made the method uncompilable.
 */
private def createBinaryTree(): Unit = {
  // Nodes 0 .. vocabSize-1 are leaves (one per word); nodes
  // vocabSize .. 2*vocabSize-2 are inner nodes created while merging.
  val count = new Array[Long](vocabSize * 2 + 1)
  val binary = new Array[Int](vocabSize * 2 + 1)
  val parentNode = new Array[Int](vocabSize * 2 + 1)
  val code = new Array[Int](MAX_CODE_LENGTH)
  val point = new Array[Int](MAX_CODE_LENGTH)

  // Leaf counts come from the word frequencies; only indices 0 .. vocabSize-1
  // are filled here, so the loop ends with a == vocabSize.
  var a = 0
  while (a < vocabSize) {
    count(a) = vocab(a).cn
    a += 1
  }
  // Inner-node slots (vocabSize .. 2*vocabSize-1) start at a sentinel
  // "effectively infinite" count of 1e9.
  while (a < 2 * vocabSize) {
    count(a) = 1e9.toInt
    a += 1
  }
  // pos1 walks the leaves backwards from the least frequent word (counts are
  // sorted descending, so the smallest leaf is last); pos2 walks forward over
  // the inner nodes created so far.
  var pos1 = vocabSize - 1
  var pos2 = vocabSize
  var min1i = 0
  var min2i = 0

  // Classic Huffman construction: repeatedly take the two lowest-count
  // available nodes and merge them under a new inner node.
  a = 0
  while (a < vocabSize - 1) {
    // Pick the smallest available node into min1i. While leaves remain
    // (pos1 >= 0) compare the current leaf against the current inner node;
    // otherwise only inner nodes are left.
    if (pos1 >= 0) {
      if (count(pos1) < count(pos2)) {
        min1i = pos1
        pos1 -= 1
      } else {
        min1i = pos2
        pos2 += 1
      }
    } else {
      min1i = pos2
      pos2 += 1
    }
    // Pick the second-smallest available node into min2i, same rule.
    if (pos1 >= 0) {
      if (count(pos1) < count(pos2)) {
        min2i = pos1
        pos1 -= 1
      } else {
        min2i = pos2
        pos2 += 1
      }
    } else {
      min2i = pos2
      pos2 += 1
    }
    // Merge: the new inner node at vocabSize + a gets the summed count;
    // record parent links for both children and mark the second child as
    // the "1" branch of the binary code.
    count(vocabSize + a) = count(min1i) + count(min2i)
    parentNode(min1i) = vocabSize + a
    parentNode(min2i) = vocabSize + a
    binary(min2i) = 1
    a += 1
  }

  // Now assign binary code to each vocabulary word: walk leaf -> root
  // collecting the branch bit and inner-node id at each level, then reverse
  // so code/point read in root -> leaf order.
  var i = 0
  a = 0
  while (a < vocabSize) {
    var b = a
    i = 0
    while (b != vocabSize * 2 - 2) { // vocabSize*2-2 is the root node
      code(i) = binary(b)
      point(i) = b
      i += 1
      b = parentNode(b)
    }
    vocab(a).codeLen = i
    // point(0) is the root, re-indexed into the inner-node range [0, vocabSize-1).
    vocab(a).point(0) = vocabSize - 2
    b = 0
    while (b < i) {
      // Reverse the collected path: it was gathered child -> parent, but
      // Huffman codes are read root -> leaf.
      vocab(a).code(i - b - 1) = code(b)
      vocab(a).point(i - b) = point(b) - vocabSize
      b += 1
    }
    a += 1
  }
}
The Word2Vec `fit` method — the main training entry point.
/**
 * Trains a skip-gram Word2Vec model with hierarchical softmax over the given
 * corpus and returns the resulting [[Word2VecModel]].
 *
 * @param dataset RDD of sentences, each sentence an iterable of words
 * @return model mapping each vocabulary word to its trained vector
 */
def fit[S <: Iterable[String]](dataset: RDD[S]): Word2VecModel = {
  // Flatten every document in the corpus into a single stream of words.
  val words = dataset.flatMap(x => x)
  // Build the vocabulary (frequency counts, minCount filter, word -> index hash).
  learnVocab(words)
  // Build the Huffman tree used by hierarchical softmax.
  createBinaryTree()

  val sc = dataset.context

  // Broadcast the read-only training state to the executors.
  val expTable = sc.broadcast(createExpTable())
  // vocab: Array[VocabWord]
  val bcVocab = sc.broadcast(vocab)
  // vocabHash = mutable.HashMap.empty[String, Int]
  val bcVocabHash = sc.broadcast(vocabHash)

  val sentences: RDD[Array[Int]] = words.mapPartitions { iter =>
    new Iterator[Array[Int]] {
      def hasNext: Boolean = iter.hasNext

      // Re-chunk the partition's word stream: each next() yields an array of
      // word indices of at most MAX_SENTENCE_LENGTH (= 1000) entries.
      // Words missing from the vocabulary hash are silently dropped.
      def next(): Array[Int] = {
        val sentence = ArrayBuilder.make[Int]
        var sentenceLength = 0
        while (iter.hasNext && sentenceLength < MAX_SENTENCE_LENGTH) {
          val word = bcVocabHash.value.get(iter.next())
          word match {
            case Some(w) =>
              sentence += w
              sentenceLength += 1
            case None =>
          }
        }
        sentence.result()
      }
    }
  }

  // Repartition and cache: this RDD is traversed once per training iteration.
  val newSentences = sentences.repartition(numPartitions).cache()
  val initRandom = new XORShiftRandom(seed)

  // Guard against the flattened weight matrices exceeding array size limits.
  if (vocabSize.toLong * vectorSize * 8 >= Int.MaxValue) {
    throw new RuntimeException("Please increase minCount or decrease vectorSize in Word2Vec" +
      " to avoid an OOM. You are highly recommended to make your vocabSize*vectorSize, " +
      "which is " + vocabSize + "*" + vectorSize + " for now, less than `Int.MaxValue/8`.")
  }

  // syn0: input word vectors, initialized uniformly in (-0.5/vectorSize, 0.5/vectorSize);
  // syn1: inner-node (output) vectors, zero-initialized. Both flattened,
  // vectorSize floats per row.
  val syn0Global =
    Array.fill[Float](vocabSize * vectorSize)((initRandom.nextFloat() - 0.5f) / vectorSize)
  val syn1Global = new Array[Float](vocabSize * vectorSize)
  // Learning rate, decayed as training progresses.
  var alpha = learningRate

  for (k <- 1 to numIterations) {
    val partial = newSentences.mapPartitionsWithIndex { case (idx, iter) =>
      // Per-partition, per-iteration RNG for window-size sampling.
      val random = new XORShiftRandom(seed ^ ((idx + 1) << 16) ^ ((-k - 1) << 8))
      // Track which rows of syn0/syn1 this partition touched, so only
      // modified vectors are shipped back to the driver.
      val syn0Modify = new Array[Int](vocabSize)
      val syn1Modify = new Array[Int](vocabSize)
      val model = iter.foldLeft((syn0Global, syn1Global, 0, 0)) {
        case ((syn0, syn1, lastWordCount, wordCount), sentence) =>
          var lwc = lastWordCount
          var wc = wordCount
          if (wordCount - lastWordCount > 10000) {
            lwc = wordCount
            // TODO: discount by iteration?
            // Decay the learning rate with training progress, flooring it at
            // 1e-4 of the base rate.
            alpha =
              learningRate * (1 - numPartitions * wordCount.toDouble / (trainWordsCount + 1))
            if (alpha < learningRate * 0.0001) alpha = learningRate * 0.0001
            logInfo("wordCount = " + wordCount + ", alpha = " + alpha)
          }
          wc += sentence.size
          var pos = 0
          while (pos < sentence.size) {
            val word = sentence(pos)
            // Random shrink of the context window, as in the original C word2vec.
            val b = random.nextInt(window)
            // Train Skip-gram
            var a = b
            while (a < window * 2 + 1 - b) {
              if (a != window) { // skip the center word itself
                val c = pos - window + a
                if (c >= 0 && c < sentence.size) {
                  val lastWord = sentence(c)
                  val l1 = lastWord * vectorSize
                  // Accumulated gradient for the context word's vector.
                  val neu1e = new Array[Float](vectorSize)
                  // Hierarchical softmax: walk the center word's Huffman path.
                  var d = 0
                  while (d < bcVocab.value(word).codeLen) {
                    val inner = bcVocab.value(word).point(d)
                    val l2 = inner * vectorSize
                    // Propagate hidden -> output
                    var f = blas.sdot(vectorSize, syn0, l1, 1, syn1, l2, 1)
                    if (f > -MAX_EXP && f < MAX_EXP) {
                      // Look up sigmoid(f) in the precomputed table.
                      val ind = ((f + MAX_EXP) * (EXP_TABLE_SIZE / MAX_EXP / 2.0)).toInt
                      f = expTable.value(ind)
                      // Gradient: (1 - code - sigmoid) * alpha.
                      val g = ((1 - bcVocab.value(word).code(d) - f) * alpha).toFloat
                      blas.saxpy(vectorSize, g, syn1, l2, 1, neu1e, 0, 1)
                      blas.saxpy(vectorSize, g, syn0, l1, 1, syn1, l2, 1)
                      syn1Modify(inner) += 1
                    }
                    d += 1
                  }
                  // Apply the accumulated gradient to the context word vector.
                  blas.saxpy(vectorSize, 1.0f, neu1e, 0, 1, syn0, l1, 1)
                  syn0Modify(lastWord) += 1
                }
              }
              a += 1
            }
            pos += 1
          }
          (syn0, syn1, lwc, wc)
      }
      val syn0Local = model._1
      val syn1Local = model._2
      // Only output modified vectors. Keys < vocabSize address syn0 rows;
      // keys offset by vocabSize address syn1 rows.
      Iterator.tabulate(vocabSize) { index =>
        if (syn0Modify(index) > 0) {
          Some((index, syn0Local.slice(index * vectorSize, (index + 1) * vectorSize)))
        } else {
          None
        }
      }.flatten ++ Iterator.tabulate(vocabSize) { index =>
        if (syn1Modify(index) > 0) {
          Some((index + vocabSize, syn1Local.slice(index * vectorSize, (index + 1) * vectorSize)))
        } else {
          None
        }
      }.flatten
    }
    // Sum the per-partition row updates, then overwrite the corresponding
    // rows of the global matrices with the aggregated results.
    val synAgg = partial.reduceByKey { case (v1, v2) =>
      blas.saxpy(vectorSize, 1.0f, v2, 1, v1, 1)
      v1
    }.collect()
    var i = 0
    while (i < synAgg.length) {
      val index = synAgg(i)._1
      if (index < vocabSize) {
        Array.copy(synAgg(i)._2, 0, syn0Global, index * vectorSize, vectorSize)
      } else {
        Array.copy(synAgg(i)._2, 0, syn1Global, (index - vocabSize) * vectorSize, vectorSize)
      }
      i += 1
    }
  }
  newSentences.unpersist()

  // The final model keeps only the input vectors (syn0), keyed by word index.
  val wordArray = vocab.map(_.word)
  new Word2VecModel(wordArray.zipWithIndex.toMap, syn0Global)
}
JY
router-hadoop-online001-jylt.qiyi.virtual
IP 10.153.107.165
Port 22