
Spark MLlib Word2Vec Source Notes

2017-04-28 18:25
private case class VocabWord(
  var word: String,       // the word itself
  var cn: Int,            // frequency of the word in the corpus
  var point: Array[Int],  // indices of the inner Huffman-tree nodes on the root-to-leaf path
  var code: Array[Int],   // the Huffman code: the 0/1 branch taken at each inner node
  var codeLen: Int        // length of the Huffman code
)
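For orientation, here is a hypothetical populated entry (values invented for illustration; MAX_CODE_LENGTH is 40 in the Spark source). Only the first codeLen slots of code and point carry meaning, and all three Huffman fields are filled in later by createBinaryTree:

// Hypothetical example, not taken from real training output:
val w = VocabWord(
  word = "spark",
  cn = 1024,                                // seen 1024 times in the corpus
  point = new Array[Int](MAX_CODE_LENGTH),  // filled by createBinaryTree
  code = new Array[Int](MAX_CODE_LENGTH),   // filled by createBinaryTree
  codeLen = 0                               // set by createBinaryTree
)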


private def learnVocab(words: RDD[String]): Unit = {
  // Count word frequencies and turn each (word, count) pair into a VocabWord;
  // filter out words rarer than minCount;
  // after collect, sort by count in descending order.
  vocab = words.map(w => (w, 1))
    .reduceByKey(_ + _)
    .map(x => VocabWord(
      x._1,
      x._2,
      new Array[Int](MAX_CODE_LENGTH),
      new Array[Int](MAX_CODE_LENGTH),
      0))
    .filter(_.cn >= minCount)
    .collect()
    .sortWith((a, b) => a.cn > b.cn)

  // Set the vocabulary size to vocab's length and require that it is non-empty.
  vocabSize = vocab.length
  require(vocabSize > 0, "The vocabulary size should be > 0. You may need to check " +
    "the setting of minCount, which could be large enough to remove all your words in sentences.")

  // Map each word in vocab to its index a in vocabHash;
  // trainWordsCount accumulates the total number of word occurrences to be trained on.
  var a = 0
  while (a < vocabSize) {
    vocabHash += vocab(a).word -> a
    trainWordsCount += vocab(a).cn
    a += 1
  }
  logInfo("trainWordsCount = " + trainWordsCount)
}
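To see concretely what learnVocab produces, here is a minimal local (non-Spark) sketch of the same count-filter-sort pipeline, assuming minCount = 2; the variable names are invented for illustration:

val words = Seq("a", "b", "a", "c", "a", "b")
val counted = words.groupBy(identity).mapValues(_.size)  // Map(a -> 3, b -> 2, c -> 1)
  .filter(_._2 >= 2)                                     // "c" falls below minCount
  .toSeq.sortBy(-_._2)                                   // Seq((a, 3), (b, 2)): descending by count
val vocabHash = counted.map(_._1).zipWithIndex.toMap     // Map(a -> 0, b -> 1)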


Create the exp table. It precomputes the sigmoid function σ(x) = eˣ / (eˣ + 1) at EXP_TABLE_SIZE evenly spaced points over [-MAX_EXP, MAX_EXP], so the hierarchical-softmax inner loop can replace calls to math.exp with a cheap array lookup.

private def createExpTable(): Array[Float] = {
  val expTable = new Array[Float](EXP_TABLE_SIZE)
  var i = 0
  while (i < EXP_TABLE_SIZE) {
    // expTable(i) = sigmoid((2i / EXP_TABLE_SIZE - 1) * MAX_EXP)
    val tmp = math.exp((2.0 * i / EXP_TABLE_SIZE - 1.0) * MAX_EXP)
    expTable(i) = (tmp / (tmp + 1.0)).toFloat
    i += 1
  }
  expTable
}
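A quick sanity check of the lookup (a sketch: it calls createExpTable directly, which is private in the real source, and hard-codes the constants EXP_TABLE_SIZE = 1000 and MAX_EXP = 6 from the Spark code; the index formula is the one used later inside fit):

val EXP_TABLE_SIZE = 1000
val MAX_EXP = 6
val expTable = createExpTable()
val f = 1.5
// Index formula from fit; note the integer division EXP_TABLE_SIZE / MAX_EXP.
val ind = ((f + MAX_EXP) * (EXP_TABLE_SIZE / MAX_EXP / 2.0)).toInt  // = 622
println(expTable(ind))               // ~0.81: the table's approximation of sigmoid(1.5)
println(1.0 / (1.0 + math.exp(-f)))  // 0.8175744..., the exact value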


private def createBinaryTree(): Unit = {
  val count = new Array[Long](vocabSize * 2 + 1)
  val binary = new Array[Int](vocabSize * 2 + 1)
  val parentNode = new Array[Int](vocabSize * 2 + 1)
  val code = new Array[Int](MAX_CODE_LENGTH)
  val point = new Array[Int](MAX_CODE_LENGTH)
  // Copy the word frequencies from vocab into the first half of count
  // (indices 0 to vocabSize - 1); afterwards a = vocabSize.
  var a = 0
  while (a < vocabSize) {
    count(a) = vocab(a).cn
    a += 1
  }
  // Fill the second half (vocabSize to 2 * vocabSize - 1) with 1e9.toInt, effectively
  // infinity; these slots will later hold the counts of the internal nodes.
  // Afterwards a = 2 * vocabSize.
  while (a < 2 * vocabSize) {
    count(a) = 1e9.toInt
    a += 1
  }
  // pos1 points at the last entry of the first half, pos2 at the first entry of the second half.
  var pos1 = vocabSize - 1
  var pos2 = vocabSize
  // Initialize min1i and min2i, the indices of the two smallest unmerged nodes.
  var min1i = 0
  var min2i = 0

  // Build the Huffman tree.
  // If pos1 >= 0, compare count(pos1) and count(pos2): if count(pos1) is smaller, assign
  // pos1 to min1i and decrement pos1 (vocab is sorted by descending count, so pos1 walks
  // from the smallest count toward larger ones); otherwise assign pos2 to min1i and
  // increment pos2. If pos1 < 0, the leaves are exhausted, so assign pos2 to min1i directly.
  // Then repeat the same procedure to find min2i.
  a = 0
  while (a < vocabSize - 1) {
    if (pos1 >= 0) {
      if (count(pos1) < count(pos2)) {
        min1i = pos1
        pos1 -= 1
      } else {
        min1i = pos2
        pos2 += 1
      }
    } else {
      min1i = pos2
      pos2 += 1
    }
    if (pos1 >= 0) {
      if (count(pos1) < count(pos2)) {
        min2i = pos1
        pos1 -= 1
      } else {
        min2i = pos2
        pos2 += 1
      }
    } else {
      min2i = pos2
      pos2 += 1
    }
    // Sum the counts of the two smallest nodes and store the result at index vocabSize + a,
    // creating a new internal node. Record the parent of both children in parentNode, and
    // mark the second (larger) child's binary bit as 1.
    count(vocabSize + a) = count(min1i) + count(min2i)
    parentNode(min1i) = vocabSize + a
    parentNode(min2i) = vocabSize + a
    binary(min2i) = 1
    a += 1
  }
  // Now assign binary code to each vocabulary word
  // For each of the first vocabSize entries (the leaves), follow parentNode up to the root
  // (index vocabSize * 2 - 2), collecting the branch bits into code and the node indices into point.
  var i = 0
  a = 0
  while (a < vocabSize) {
    var b = a
    i = 0
    while (b != vocabSize * 2 - 2) {
      code(i) = binary(b)
      point(i) = b
      i += 1
      b = parentNode(b)
    }
    vocab(a).codeLen = i
    vocab(a).point(0) = vocabSize - 2
    b = 0
    while (b < i) {
      // The path was collected leaf-to-root, so reverse it here to obtain the
      // root-to-leaf Huffman code; point is shifted by one slot and re-based by
      // vocabSize so it indexes internal nodes rather than slots of count.
      vocab(a).code(i - b - 1) = code(b)
      vocab(a).point(i - b) = point(b) - vocabSize
      b += 1
    }
    a += 1
  }
}
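To make the two-pointer construction concrete, here is a self-contained sketch of the same algorithm on a toy frequency array (names invented; counts must be sorted in descending order, exactly as vocab is after learnVocab):

// Standalone sketch of the same two-pointer Huffman construction.
def huffmanCodes(counts: Array[Long]): Array[String] = {
  val n = counts.length
  val count = new Array[Long](2 * n)
  val binary = new Array[Int](2 * n)
  val parent = new Array[Int](2 * n)
  Array.copy(counts, 0, count, 0, n)
  java.util.Arrays.fill(count, n, 2 * n, Long.MaxValue / 2) // "infinite" filler
  var pos1 = n - 1 // walks the leaves from smallest count upward
  var pos2 = n     // walks the internal nodes in creation order
  var a = 0
  while (a < n - 1) {
    // Pick the two smallest unmerged nodes, exactly as in createBinaryTree.
    val min1 = if (pos1 >= 0 && count(pos1) < count(pos2)) { pos1 -= 1; pos1 + 1 }
               else { pos2 += 1; pos2 - 1 }
    val min2 = if (pos1 >= 0 && count(pos1) < count(pos2)) { pos1 -= 1; pos1 + 1 }
               else { pos2 += 1; pos2 - 1 }
    count(n + a) = count(min1) + count(min2)
    parent(min1) = n + a
    parent(min2) = n + a
    binary(min2) = 1
    a += 1
  }
  // Walk each leaf up to the root (index 2n - 2), then reverse the collected bits.
  (0 until n).map { w =>
    Iterator.iterate(w)(b => parent(b)).takeWhile(_ != 2 * n - 2)
      .map(b => binary(b)).mkString.reverse
  }.toArray
}

huffmanCodes(Array(45L, 13L, 12L, 5L, 4L, 2L))
// -> Array(1, 00, 011, 0100, 01011, 01010): frequent words get short codes

The two pointers suffice because both halves stay sorted from the scan's point of view: the leaves grow in count as pos1 moves toward the front, and internal nodes are created in nondecreasing count order, so the two smallest unmerged nodes are always found at pos1 or pos2.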


The Word2Vec fit method

def fit[S <: Iterable[String]](dataset: RDD[S]): Word2VecModel = {
  // Flatten every sentence in the corpus into a single stream of words.
  val words = dataset.flatMap(x => x)
  // Build the vocabulary.
  learnVocab(words)
  // Build the Huffman tree (word vectors are initialized further below,
  // when syn0Global is filled with random values).
  createBinaryTree()

  val sc = dataset.context

  val expTable = sc.broadcast(createExpTable())
  // vocab: Array[VocabWord]
  val bcVocab = sc.broadcast(vocab)
  // vocabHash = mutable.HashMap.empty[String, Int]
  val bcVocabHash = sc.broadcast(vocabHash)

  val sentences: RDD[Array[Int]] = words.mapPartitions { iter =>
    new Iterator[Array[Int]] {
      def hasNext: Boolean = iter.hasNext

      // Re-wraps the partition's iterator: each next() returns an array of word indices
      // of length at most MAX_SENTENCE_LENGTH; words absent from the vocabulary are skipped.
      // private val MAX_SENTENCE_LENGTH = 1000
      def next(): Array[Int] = {
        val sentence = ArrayBuilder.make[Int]
        var sentenceLength = 0
        while (iter.hasNext && sentenceLength < MAX_SENTENCE_LENGTH) {
          val word = bcVocabHash.value.get(iter.next())
          word match {
            case Some(w) =>
              sentence += w
              sentenceLength += 1
            case None =>
          }
        }
        sentence.result()
      }
    }
  }

  // repartition and cache for efficiency
  val newSentences = sentences.repartition(numPartitions).cache()
  // Seed the random number generator.
  val initRandom = new XORShiftRandom(seed)
  // Guard against the word-vector matrices outgrowing memory and array-size limits.
  if (vocabSize.toLong * vectorSize * 8 >= Int.MaxValue) {
    throw new RuntimeException("Please increase minCount or decrease vectorSize in Word2Vec" +
      " to avoid an OOM. You are highly recommended to make your vocabSize*vectorSize, " +
      "which is " + vocabSize + "*" + vectorSize + " for now, less than `Int.MaxValue/8`.")
  }
  // Initialize syn0Global (input word vectors, small random values) and syn1Global
  // (inner-node vectors, zeros), both of length vocabSize * vectorSize.
  val syn0Global =
    Array.fill[Float](vocabSize * vectorSize)((initRandom.nextFloat() - 0.5f) / vectorSize)
  val syn1Global = new Array[Float](vocabSize * vectorSize)
  // Learning rate.
  var alpha = learningRate
  // Training iterations.
  for (k <- 1 to numIterations) {
    val partial = newSentences.mapPartitionsWithIndex { case (idx, iter) =>
      val random = new XORShiftRandom(seed ^ ((idx + 1) << 16) ^ ((-k - 1) << 8))
      val syn0Modify = new Array[Int](vocabSize)
      val syn1Modify = new Array[Int](vocabSize)
      val model = iter.foldLeft((syn0Global, syn1Global, 0, 0)) {
        case ((syn0, syn1, lastWordCount, wordCount), sentence) =>
          var lwc = lastWordCount
          var wc = wordCount
          if (wordCount - lastWordCount > 10000) {
            lwc = wordCount
            // TODO: discount by iteration?
            // Decay the learning rate with training progress, clamping it at a fixed floor.
            alpha =
              learningRate * (1 - numPartitions * wordCount.toDouble / (trainWordsCount + 1))
            if (alpha < learningRate * 0.0001) alpha = learningRate * 0.0001
            logInfo("wordCount = " + wordCount + ", alpha = " + alpha)
          }
          wc += sentence.size
          var pos = 0
          while (pos < sentence.size) {
            val word = sentence(pos)
            // Randomly shrink the effective window, as in the original C word2vec.
            val b = random.nextInt(window)
            // Train Skip-gram
            var a = b
            while (a < window * 2 + 1 - b) {
              if (a != window) {
                val c = pos - window + a
                if (c >= 0 && c < sentence.size) {
                  val lastWord = sentence(c)
                  val l1 = lastWord * vectorSize
                  val neu1e = new Array[Float](vectorSize)
                  // Hierarchical softmax
                  var d = 0
                  while (d < bcVocab.value(word).codeLen) {
                    val inner = bcVocab.value(word).point(d)
                    val l2 = inner * vectorSize
                    // Propagate hidden -> output
                    var f = blas.sdot(vectorSize, syn0, l1, 1, syn1, l2, 1)
                    if (f > -MAX_EXP && f < MAX_EXP) {
                      val ind = ((f + MAX_EXP) * (EXP_TABLE_SIZE / MAX_EXP / 2.0)).toInt
                      f = expTable.value(ind)
                      // code(d) is the 0/1 label at this inner node and f is the sigmoid of
                      // the dot product, so g is the scaled gradient of the log-likelihood.
                      val g = ((1 - bcVocab.value(word).code(d) - f) * alpha).toFloat
                      blas.saxpy(vectorSize, g, syn1, l2, 1, neu1e, 0, 1)
                      blas.saxpy(vectorSize, g, syn0, l1, 1, syn1, l2, 1)
                      syn1Modify(inner) += 1
                    }
                    d += 1
                  }
                  blas.saxpy(vectorSize, 1.0f, neu1e, 0, 1, syn0, l1, 1)
                  syn0Modify(lastWord) += 1
                }
              }
              a += 1
            }
            pos += 1
          }
          (syn0, syn1, lwc, wc)
      }
      val syn0Local = model._1
      val syn1Local = model._2
      // Only output modified vectors.
      Iterator.tabulate(vocabSize) { index =>
        if (syn0Modify(index) > 0) {
          Some((index, syn0Local.slice(index * vectorSize, (index + 1) * vectorSize)))
        } else {
          None
        }
      }.flatten ++ Iterator.tabulate(vocabSize) { index =>
        if (syn1Modify(index) > 0) {
          Some((index + vocabSize, syn1Local.slice(index * vectorSize, (index + 1) * vectorSize)))
        } else {
          None
        }
      }.flatten
    }
    // Sum the per-partition updates for each modified vector.
    val synAgg = partial.reduceByKey { case (v1, v2) =>
      blas.saxpy(vectorSize, 1.0f, v2, 1, v1, 1)
      v1
    }.collect()
    // Copy the aggregated vectors back into the global arrays for the next iteration.
    var i = 0
    while (i < synAgg.length) {
      val index = synAgg(i)._1
      if (index < vocabSize) {
        Array.copy(synAgg(i)._2, 0, syn0Global, index * vectorSize, vectorSize)
      } else {
        Array.copy(synAgg(i)._2, 0, syn1Global, (index - vocabSize) * vectorSize, vectorSize)
      }
      i += 1
    }
  }
  newSentences.unpersist()

  val wordArray = vocab.map(_.word)
  new Word2VecModel(wordArray.zipWithIndex.toMap, syn0Global)
}
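For completeness, here is a sketch of driving the public API end to end; the corpus path and the query word are placeholders:

import org.apache.spark.mllib.feature.Word2Vec
import org.apache.spark.rdd.RDD

// "corpus.txt" is a placeholder: one whitespace-tokenized sentence per line.
val input: RDD[Seq[String]] = sc.textFile("corpus.txt").map(_.split(" ").toSeq)

val model = new Word2Vec()
  .setVectorSize(100)  // dimensionality of the learned vectors
  .setMinCount(5)      // drop words rarer than this (see learnVocab above)
  .setNumIterations(1)
  .fit(input)

// Nearest neighbours by cosine similarity of the learned syn0 vectors.
model.findSynonyms("day", 5).foreach { case (w, sim) => println(s"$w\t$sim") }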

