您的位置:首页 > 其它

[深度学习] (3)- Keras实现一个简单的翻译器( 从数字到对应的英文 )

2017-09-28 17:41 1231 查看

摘要

来搞NMT啊哈哈哈哈

本文用Keras实现了一个简单的Seq2Seq翻译器( 从数字到对应的英文 ), 参考以下文章

http://blog.csdn.net/mmc2015/article/details/72773854

http://www.zmonster.me/2016/05/29/sequence_to_sequence_with_keras.html

注意,这里使用的翻译方法在句尾没有使用EOS标识符,所以对于复杂变长序列无法实现训练和预测,仅仅只是作为一个简单的演示。

源代码

# 使用Keras实现机器翻译-》完整框架

# coding: utf-8

import os
import re
import string
from itertools import dropwhile
from functools import reduce

import click
import numpy as np
import time
import pickle

from keras.models import Sequential, model_from_json
from keras.layers.embeddings import Embedding
from keras.layers.recurrent import LSTM
from keras.layers.recurrent import GRU
from keras.layers.wrappers import TimeDistributed
from keras.layers.core import Dense, RepeatVector, Activation

import keras_model.Create_data_for_Keras_test5 as CK5

class Keras_NMT:

<
4000
span class="hljs-keyword">def __init__(self):

# 类管理的模型
self.MODEL = None

# 包含了训练数据中,翻译源句和翻译目标句的所有单词的辞典的大小,也就是BOG的大小
self.VOCAB_SIZE = 0
# 训练数据中,翻译源句的最大长度
self.MAX_INPUT_LEN = 0
# 训练数据中,翻译目标句的最大长度
self.MAX_OUTPUT_LEN = 0

# 词语与ids的映射字典
self.word_to_idx = None  # 编码时需要将字符映射成数字index
self.idx_to_word = None  # 解码时需要将数字index映射成字符

# LSTM的隐层单元个数
self.HIDDEN_DIM = 20

# 记录LOSS损失
self.loss_ = None

self.MODEL_STRUCT_FILE = 'piglatin_struct.json'
self.MODEL_WEIGHTS_FILE = 'piglatin_weights.h5'

# 对句子进行分词
def tokenize(self, sent):
return sent.split()

# 将所有的句子进行长度对齐
def pad_sequences(self, sequences, maxlen=None, dtype='int32',
padding='pre', truncating='pre', value=0.):
'''''Pads each sequence to the same length:
the length of the longest sequence.
If maxlen is provided, any sequence longer
than maxlen is truncated to maxlen.
Truncation happens off either the beginning (default) or
the end of the sequence.
Supports post-padding and pre-padding (default).
# Arguments
sequences: list of lists where each element is a sequence
maxlen: int, maximum length
dtype: type to cast the resulting sequence.
padding: 'pre' or 'post', pad either before or after each sequence.
truncating: 'pre' or 'post', remove values from sequences larger than
maxlen either in the beginning or in the end of the sequence
value: float, value to pad the sequences to the desired value.
# Returns
x: numpy array with dimensions (number_of_sequences, maxlen)
'''
lengths = [len(s) for s in sequences]

nb_samples = len(sequences)
if maxlen is None:
maxlen = np.max(lengths)

# take the sample shape from the first non empty sequence
# checking for consistency in the main loop below.
sample_shape = tuple()
for s in sequences:
if len(s) > 0:
sample_shape = np.asarray(s).shape[1:]
break

x = (np.ones((nb_samples, maxlen) + sample_shape) * value).astype(dtype)
for idx, s in enumerate(sequences):
if len(s) == 0:
continue  # empty list was found
if truncating == 'pre':
trunc = s[-maxlen:]
elif truncating == 'post':
trunc = s[:maxlen]
else:
raise ValueError('Truncating type "%s" not understood' % truncating)

# check `trunc` has expected shape
trunc = np.asarray(trunc, dtype=dtype)
if trunc.shape[1:] != sample_shape:
raise ValueError(
'Shape of sample %s of sequence at position %s is different from expected shape %s' %
(trunc.shape[1:], idx, sample_shape))

if padding == 'post':
x[idx, :len(trunc)] = trunc
elif padding == 'pre':
x[idx, -len(trunc):] = trunc
else:
raise ValueError('Padding type "%s" not understood' % padding)
return x

# 将对齐后的句子进行id转化
def vectorize_stories(self, input_list, tar_list, word_idx, input_maxlen, tar_maxlen, vocab_size):
x_set = []
Y = np.zeros((len(tar_list), tar_maxlen, vocab_size), dtype=np.int)
for _sent in input_list:
x = [word_idx[w] for w in _sent]
x_set.append(x)
for s_index, tar_tmp in enumerate(tar_list):
for t_index, token in enumerate(tar_tmp):
Y[s_index, t_index, word_idx[token]] = 1

return self.pad_sequences(x_set, maxlen=input_maxlen), Y

def build_data(self, input_text, tar_text, tokenized=False):

input_list = []
tar_list = []

# 判断当前数据是否已经tokenized化
if not tokenized:
for tmp_input in input_text:
input_list.append(self.tokenize(tmp_input))
for tmp_tar in tar_text:
tar_list.append(self.tokenize(tmp_tar))
else:
input_list = input_text
tar_list = tar_text

# 构建辞典
vocab = sorted(reduce(lambda x, y: x | y, (set(tmp_list) for tmp_list in input_list + tar_list)))

self.VOCAB_SIZE = len(vocab) + 1  # keras进行embedding的时候必须进行len(vocab)+1
self.MAX_INPUT_LEN = max(map(len, (x for x in input_list)))
self.MAX_OUTPUT_LEN = max(map(len, (x for x in tar_list)))

print('-')
print('Vocab size:', self.VOCAB_SIZE, 'unique words')
print('Input max length:', self.MAX_INPUT_LEN, 'words')
print('Target max length:', self.MAX_OUTPUT_LEN, 'words')
print('Dimension of hidden vectors:', self.HIDDEN_DIM)
print('Number of training stories:', len(input_list))
print('Number of test stories:', len(input_list))
print('-')
print('Vectorizing the word sequences...')
self.word_to_idx = dict((c, i + 1) for i, c in enumerate(vocab))  # 编码时需要将字符映射成数字index
self.idx_to_word = dict((i + 1, c) for i, c in enumerate(vocab))  # 解码时需要将数字index映射成字符

inputs_train, tars_train = self.vectorize_stories(
input_list=input_list,
tar_list=tar_list,
word_idx=self.word_to_idx,
input_maxlen=self.MAX_INPUT_LEN,
tar_maxlen=self.MAX_OUTPUT_LEN,
vocab_size=self.VOCAB_SIZE
)

return inputs_train, tars_train

def build_model(self, rnn_model=LSTM):

output_dim = self.VOCAB_SIZE
hidden_dim = self.HIDDEN_DIM

encoder_top_layer = LSTM(hidden_dim)
decoder_top_layer = LSTM(hidden_dim, return_sequences=True)
decoder_top_layer.get_weights()

en_de_model = Sequential()
en_de_model.add(Embedding(input_dim=output_dim,
output_dim=hidden_dim,
input_length=self.MAX_INPUT_LEN))
en_de_model.add(encoder_top_layer)
en_de_model.add(RepeatVector(self.MAX_OUTPUT_LEN))
en_de_model.add(decoder_top_layer)
en_de_model.add(TimeDistributed(Dense(output_dim=output_dim)))
en_de_model.add(Activation('softmax'))

print('Compiling...')
time_start = time.time()
en_de_model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
time_end = time.time()
print('Compiled, cost time:%fsecond!' % (time_end - time_start))

return en_de_model

def save_model_to_file(self, model, struct_file, weights_file):
# save model structure
model_struct = model.to_json()
open(struct_file, 'w').write(model_struct)

# save model weights
model.save_weights(weights_file, overwrite=True)

def build_model_from_file(self, struct_file, weights_file):
model = model_from_json(open(struct_file, 'r').read())
# model.compile(loss="mse", optimizer='adam')
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
model.load_weights(weights_file)
self.MODEL = model

# X, y必须是以下格式的字符串数组
# X = ['1 2 3 4 5'
#     , '2 3 4 5 6'
#     , '6 7 8 9 10'
#     , '11 12 13 14 15'
#     , '16 17 18 19 20'
#     , '21 22 23 24 25']
# y = ['one two three four five'
#     , 'two three four five six'
#     , 'six seven eight nine ten'
#     , 'eleven twelve thirteen fourteen fifteen'
#     , 'sixteen seventeen eighteen nineteen twenty'
#     , 'twenty_one twenty_two twenty_three twenty_four twenty_five']
def train(self, X, y, epoch=1000, batch_size=1):

# 得到经过序列化以及句子对齐之后的数据
inputs_train, tars_train = self.build_data(input_text=X, tar_text=y, tokenized=False)

# 构建模型
self.MODEL = self.build_model()
history_callback = self.MODEL.fit(inputs_train, tars_train, batch_size=batch_size, nb_epoch=epoch)

# 记录每一步的loss代价
self.loss_ = (history_callback.history["loss"])

def predict_result(self, X):
word = [self.tokenize(t) for t in X]
x = []
for j in word:
x.append([self.word_to_idx[i] for i in j])

# 对新输入的数据进行对齐,如1 2 3 4 => 1 2 3 4 0 (当maxlen为5时)
x_align = self.pad_sequences(x, maxlen=self.MAX_INPUT_LEN, padding='post')

# 这里要将x转化为np的数组,否则会报错
x = np.array(x_align)
out_predicts = self.MODEL.predict(x)

for i_idx, out_predict in enumerate(out_predicts):
predict_sequence = []
for predict_vector in out_predict:
next_index = np.argmax(predict_vector)  # predict_vector is output_dim(here is 51) dimension vector
next_token = self.idx_to_word[
next_ind
d482
ex]  # key of idx_to_word starts from 1, but next_index can be 0(the probability is very low!)
predict_sequence.append(next_token)
print('Target output:', X[i_idx])
print('Predict output:', predict_sequence)

def main():
input_text = ['1 2 3 4 5 6'
, '2 3 4 5 6'
, '6 7 8 9 10'
, '11 12 13 14 15'
, '16 17 18 19 20'
, '21 22 23 24 25']
tar_text = ['one two three four five six'
, 'two three four five six'
, 'six seven eight nine ten'
, 'eleven twelve thirteen fourteen fifteen'
, 'sixteen seventeen eighteen nineteen twenty'
, 'twenty_one twenty_two twenty_three twenty_four twenty_five']

k = Keras_NMT()

# 如果没有可用的模型
if not os.path.exists('./' + k.MODEL_STRUCT_FILE):
# 生成epoch大小的训练集
input_text, tar_text = CK5.create_data(epoch=1000)
k.train(X=input_text, y=tar_text, epoch=1000, batch_size=20)

# 保存模型
# 1. 保存Keras模型
struct_file = os.path.join('./', k.MODEL_STRUCT_FILE)
weights_file = os.path.join('./', k.MODEL_WEIGHTS_FILE)
k.save_model_to_file(k.MODEL, struct_file, weights_file)
# 2. 保存其他信息
model = {
'VOCAB_SIZE': k.VOCAB_SIZE,
'MAX_INPUT_LEN': k.MAX_INPUT_LEN,
'MAX_OUTPUT_LEN': k.MAX_OUTPUT_LEN,
'word_to_idx': k.word_to_idx,
'idx_to_word': k.idx_to_word,
'HIDDEN_DIM': k.HIDDEN_DIM,
'loss_': k.loss_
}
pickle.dump(model, open('./model.pkl', 'wb'), protocol=-1)

else:
# 从文件中分别读取Keras模型 和 其他参数
k.build_model_from_file('./'+k.MODEL_STRUCT_FILE, './'+k.MODEL_WEIGHTS_FILE)
model = pickle.load(open("./model.pkl", "rb+"))

# 将模型参数导入到对象中
k.VOCAB_SIZE = model['VOCAB_SIZE']
k.MAX_INPUT_LEN = model['MAX_INPUT_LEN']
k.MAX_OUTPUT_LEN = model['MAX_OUTPUT_LEN']
k.word_to_idx = model['word_to_idx']
k.idx_to_word = model['idx_to_word']
k.HIDDEN_DIM = model['HIDDEN_DIM']
k.loss_ = model['loss_']

k.predict_result(X=['1 2 3 4 5'])

if __name__ == '__main__':
main()


Create_Data()

# 使用随机算法生成大量测试用例

import random

def create_data(epoch=10):
BASIC_DICT = {
'1':'one',
'2':'two',
'3':'three',
'4':'four',
'5':'five',
'6':'six',
'7':'seven',
'8':'eight',
'9':'nine'
}

X = []
y = []
epoch = epoch  # 生成数据的个数
data_range = (1, len(BASIC_DICT))  # 数字的范围
data_length_range = (5, 5)  # 每一条数据的长度

for _ in range(epoch):
length = random.randint(data_length_range[0], data_length_range[1])
x_t = ''
y_t = ''
for _ in range(length):
num = random.randint(data_range[0], data_range[1])
x_t += (str(num)+' ')
y_t += BASIC_DICT[str(num)]+' '

# 防止重复
if(x_t[0:-1] in X):
continue

X.append(x_t[0:-1])
y.append(y_t[0:-1])

return X, y


常见问题

如何保存Keras每一步的LOSS

https://stackoverflow.com/questions/38445982/how-to-log-keras-loss-output-to-a-file

Seq2Seq

TensorFlow 的Seq2Seq(还在学习中)

在r1.3版本对于Seq2Seq的封装已经改地方了,参考如下知乎文章

https://www.zhihu.com/question/57469076

教程在此

https://www.tensorflow.org/api_docs/python/tf/contrib/seq2seq

PTB

TensorFlow中关于RNN进行PTB训练的教程

https://www.tensorflow.org/versions/r0.12/tutorials/recurrent/

-

示例代码的Git地址

https://github.com/tensorflow/models/tree/master/tutorials/rnn/ptb

-

预处理过的数据

http://www.fit.vutbr.cz/~imikolov/rnnlm/simple-examples.tgz

该数据进行过预处理:

The dataset is already preprocessed and contains overall 10000 different words, including the end-of-sentence marker and a special symbol
(\<unk>)
for rare words. We convert all of them in the reader.py to unique integer identifiers to make it easy for the neural network to process.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐