[深度学习] (3)- Keras实现一个简单的翻译器( 从数字到对应的英文 )
2017-09-28 17:41
1231 查看
摘要
来搞NMT啊哈哈哈哈本文用Keras实现了一个简单的Seq2Seq翻译器( 从数字到对应的英文 ), 参考以下文章
http://blog.csdn.net/mmc2015/article/details/72773854
http://www.zmonster.me/2016/05/29/sequence_to_sequence_with_keras.html
注意,这里使用的翻译方法在句尾没有使用EOS标识符,所以对于复杂变长序列无法实现训练和预测,仅仅只是作为一个简单的演示。
源代码
# 使用Keras实现机器翻译-》完整框架 # coding: utf-8 import os import re import string from itertools import dropwhile from functools import reduce import click import numpy as np import time import pickle from keras.models import Sequential, model_from_json from keras.layers.embeddings import Embedding from keras.layers.recurrent import LSTM from keras.layers.recurrent import GRU from keras.layers.wrappers import TimeDistributed from keras.layers.core import Dense, RepeatVector, Activation import keras_model.Create_data_for_Keras_test5 as CK5 class Keras_NMT: < 4000 span class="hljs-keyword">def __init__(self): # 类管理的模型 self.MODEL = None # 包含了训练数据中,翻译源句和翻译目标句的所有单词的辞典的大小,也就是BOG的大小 self.VOCAB_SIZE = 0 # 训练数据中,翻译源句的最大长度 self.MAX_INPUT_LEN = 0 # 训练数据中,翻译目标句的最大长度 self.MAX_OUTPUT_LEN = 0 # 词语与ids的映射字典 self.word_to_idx = None # 编码时需要将字符映射成数字index self.idx_to_word = None # 解码时需要将数字index映射成字符 # LSTM的隐层单元个数 self.HIDDEN_DIM = 20 # 记录LOSS损失 self.loss_ = None self.MODEL_STRUCT_FILE = 'piglatin_struct.json' self.MODEL_WEIGHTS_FILE = 'piglatin_weights.h5' # 对句子进行分词 def tokenize(self, sent): return sent.split() # 将所有的句子进行长度对齐 def pad_sequences(self, sequences, maxlen=None, dtype='int32', padding='pre', truncating='pre', value=0.): '''''Pads each sequence to the same length: the length of the longest sequence. If maxlen is provided, any sequence longer than maxlen is truncated to maxlen. Truncation happens off either the beginning (default) or the end of the sequence. Supports post-padding and pre-padding (default). # Arguments sequences: list of lists where each element is a sequence maxlen: int, maximum length dtype: type to cast the resulting sequence. padding: 'pre' or 'post', pad either before or after each sequence. truncating: 'pre' or 'post', remove values from sequences larger than maxlen either in the beginning or in the end of the sequence value: float, value to pad the sequences to the desired value. # Returns x: numpy array with dimensions (number_of_sequences, maxlen) ''' lengths = [len(s) for s in sequences] nb_samples = len(sequences) if maxlen is None: maxlen = np.max(lengths) # take the sample shape from the first non empty sequence # checking for consistency in the main loop below. sample_shape = tuple() for s in sequences: if len(s) > 0: sample_shape = np.asarray(s).shape[1:] break x = (np.ones((nb_samples, maxlen) + sample_shape) * value).astype(dtype) for idx, s in enumerate(sequences): if len(s) == 0: continue # empty list was found if truncating == 'pre': trunc = s[-maxlen:] elif truncating == 'post': trunc = s[:maxlen] else: raise ValueError('Truncating type "%s" not understood' % truncating) # check `trunc` has expected shape trunc = np.asarray(trunc, dtype=dtype) if trunc.shape[1:] != sample_shape: raise ValueError( 'Shape of sample %s of sequence at position %s is different from expected shape %s' % (trunc.shape[1:], idx, sample_shape)) if padding == 'post': x[idx, :len(trunc)] = trunc elif padding == 'pre': x[idx, -len(trunc):] = trunc else: raise ValueError('Padding type "%s" not understood' % padding) return x # 将对齐后的句子进行id转化 def vectorize_stories(self, input_list, tar_list, word_idx, input_maxlen, tar_maxlen, vocab_size): x_set = [] Y = np.zeros((len(tar_list), tar_maxlen, vocab_size), dtype=np.int) for _sent in input_list: x = [word_idx[w] for w in _sent] x_set.append(x) for s_index, tar_tmp in enumerate(tar_list): for t_index, token in enumerate(tar_tmp): Y[s_index, t_index, word_idx[token]] = 1 return self.pad_sequences(x_set, maxlen=input_maxlen), Y def build_data(self, input_text, tar_text, tokenized=False): input_list = [] tar_list = [] # 判断当前数据是否已经tokenized化 if not tokenized: for tmp_input in input_text: input_list.append(self.tokenize(tmp_input)) for tmp_tar in tar_text: tar_list.append(self.tokenize(tmp_tar)) else: input_list = input_text tar_list = tar_text # 构建辞典 vocab = sorted(reduce(lambda x, y: x | y, (set(tmp_list) for tmp_list in input_list + tar_list))) self.VOCAB_SIZE = len(vocab) + 1 # keras进行embedding的时候必须进行len(vocab)+1 self.MAX_INPUT_LEN = max(map(len, (x for x in input_list))) self.MAX_OUTPUT_LEN = max(map(len, (x for x in tar_list))) print('-') print('Vocab size:', self.VOCAB_SIZE, 'unique words') print('Input max length:', self.MAX_INPUT_LEN, 'words') print('Target max length:', self.MAX_OUTPUT_LEN, 'words') print('Dimension of hidden vectors:', self.HIDDEN_DIM) print('Number of training stories:', len(input_list)) print('Number of test stories:', len(input_list)) print('-') print('Vectorizing the word sequences...') self.word_to_idx = dict((c, i + 1) for i, c in enumerate(vocab)) # 编码时需要将字符映射成数字index self.idx_to_word = dict((i + 1, c) for i, c in enumerate(vocab)) # 解码时需要将数字index映射成字符 inputs_train, tars_train = self.vectorize_stories( input_list=input_list, tar_list=tar_list, word_idx=self.word_to_idx, input_maxlen=self.MAX_INPUT_LEN, tar_maxlen=self.MAX_OUTPUT_LEN, vocab_size=self.VOCAB_SIZE ) return inputs_train, tars_train def build_model(self, rnn_model=LSTM): output_dim = self.VOCAB_SIZE hidden_dim = self.HIDDEN_DIM encoder_top_layer = LSTM(hidden_dim) decoder_top_layer = LSTM(hidden_dim, return_sequences=True) decoder_top_layer.get_weights() en_de_model = Sequential() en_de_model.add(Embedding(input_dim=output_dim, output_dim=hidden_dim, input_length=self.MAX_INPUT_LEN)) en_de_model.add(encoder_top_layer) en_de_model.add(RepeatVector(self.MAX_OUTPUT_LEN)) en_de_model.add(decoder_top_layer) en_de_model.add(TimeDistributed(Dense(output_dim=output_dim))) en_de_model.add(Activation('softmax')) print('Compiling...') time_start = time.time() en_de_model.compile(loss='categorical_crossentropy', optimizer='rmsprop') time_end = time.time() print('Compiled, cost time:%fsecond!' % (time_end - time_start)) return en_de_model def save_model_to_file(self, model, struct_file, weights_file): # save model structure model_struct = model.to_json() open(struct_file, 'w').write(model_struct) # save model weights model.save_weights(weights_file, overwrite=True) def build_model_from_file(self, struct_file, weights_file): model = model_from_json(open(struct_file, 'r').read()) # model.compile(loss="mse", optimizer='adam') model.compile(loss='categorical_crossentropy', optimizer='rmsprop') model.load_weights(weights_file) self.MODEL = model # X, y必须是以下格式的字符串数组 # X = ['1 2 3 4 5' # , '2 3 4 5 6' # , '6 7 8 9 10' # , '11 12 13 14 15' # , '16 17 18 19 20' # , '21 22 23 24 25'] # y = ['one two three four five' # , 'two three four five six' # , 'six seven eight nine ten' # , 'eleven twelve thirteen fourteen fifteen' # , 'sixteen seventeen eighteen nineteen twenty' # , 'twenty_one twenty_two twenty_three twenty_four twenty_five'] def train(self, X, y, epoch=1000, batch_size=1): # 得到经过序列化以及句子对齐之后的数据 inputs_train, tars_train = self.build_data(input_text=X, tar_text=y, tokenized=False) # 构建模型 self.MODEL = self.build_model() history_callback = self.MODEL.fit(inputs_train, tars_train, batch_size=batch_size, nb_epoch=epoch) # 记录每一步的loss代价 self.loss_ = (history_callback.history["loss"]) def predict_result(self, X): word = [self.tokenize(t) for t in X] x = [] for j in word: x.append([self.word_to_idx[i] for i in j]) # 对新输入的数据进行对齐,如1 2 3 4 => 1 2 3 4 0 (当maxlen为5时) x_align = self.pad_sequences(x, maxlen=self.MAX_INPUT_LEN, padding='post') # 这里要将x转化为np的数组,否则会报错 x = np.array(x_align) out_predicts = self.MODEL.predict(x) for i_idx, out_predict in enumerate(out_predicts): predict_sequence = [] for predict_vector in out_predict: next_index = np.argmax(predict_vector) # predict_vector is output_dim(here is 51) dimension vector next_token = self.idx_to_word[ next_ind d482 ex] # key of idx_to_word starts from 1, but next_index can be 0(the probability is very low!) predict_sequence.append(next_token) print('Target output:', X[i_idx]) print('Predict output:', predict_sequence) def main(): input_text = ['1 2 3 4 5 6' , '2 3 4 5 6' , '6 7 8 9 10' , '11 12 13 14 15' , '16 17 18 19 20' , '21 22 23 24 25'] tar_text = ['one two three four five six' , 'two three four five six' , 'six seven eight nine ten' , 'eleven twelve thirteen fourteen fifteen' , 'sixteen seventeen eighteen nineteen twenty' , 'twenty_one twenty_two twenty_three twenty_four twenty_five'] k = Keras_NMT() # 如果没有可用的模型 if not os.path.exists('./' + k.MODEL_STRUCT_FILE): # 生成epoch大小的训练集 input_text, tar_text = CK5.create_data(epoch=1000) k.train(X=input_text, y=tar_text, epoch=1000, batch_size=20) # 保存模型 # 1. 保存Keras模型 struct_file = os.path.join('./', k.MODEL_STRUCT_FILE) weights_file = os.path.join('./', k.MODEL_WEIGHTS_FILE) k.save_model_to_file(k.MODEL, struct_file, weights_file) # 2. 保存其他信息 model = { 'VOCAB_SIZE': k.VOCAB_SIZE, 'MAX_INPUT_LEN': k.MAX_INPUT_LEN, 'MAX_OUTPUT_LEN': k.MAX_OUTPUT_LEN, 'word_to_idx': k.word_to_idx, 'idx_to_word': k.idx_to_word, 'HIDDEN_DIM': k.HIDDEN_DIM, 'loss_': k.loss_ } pickle.dump(model, open('./model.pkl', 'wb'), protocol=-1) else: # 从文件中分别读取Keras模型 和 其他参数 k.build_model_from_file('./'+k.MODEL_STRUCT_FILE, './'+k.MODEL_WEIGHTS_FILE) model = pickle.load(open("./model.pkl", "rb+")) # 将模型参数导入到对象中 k.VOCAB_SIZE = model['VOCAB_SIZE'] k.MAX_INPUT_LEN = model['MAX_INPUT_LEN'] k.MAX_OUTPUT_LEN = model['MAX_OUTPUT_LEN'] k.word_to_idx = model['word_to_idx'] k.idx_to_word = model['idx_to_word'] k.HIDDEN_DIM = model['HIDDEN_DIM'] k.loss_ = model['loss_'] k.predict_result(X=['1 2 3 4 5']) if __name__ == '__main__': main()
Create_Data()
# 使用随机算法生成大量测试用例 import random def create_data(epoch=10): BASIC_DICT = { '1':'one', '2':'two', '3':'three', '4':'four', '5':'five', '6':'six', '7':'seven', '8':'eight', '9':'nine' } X = [] y = [] epoch = epoch # 生成数据的个数 data_range = (1, len(BASIC_DICT)) # 数字的范围 data_length_range = (5, 5) # 每一条数据的长度 for _ in range(epoch): length = random.randint(data_length_range[0], data_length_range[1]) x_t = '' y_t = '' for _ in range(length): num = random.randint(data_range[0], data_range[1]) x_t += (str(num)+' ') y_t += BASIC_DICT[str(num)]+' ' # 防止重复 if(x_t[0:-1] in X): continue X.append(x_t[0:-1]) y.append(y_t[0:-1]) return X, y
常见问题
如何保存Keras每一步的LOSShttps://stackoverflow.com/questions/38445982/how-to-log-keras-loss-output-to-a-file
Seq2Seq
TensorFlow 的Seq2Seq(还在学习中)
在r1.3版本对于Seq2Seq的封装已经改地方了,参考如下知乎文章https://www.zhihu.com/question/57469076
教程在此
https://www.tensorflow.org/api_docs/python/tf/contrib/seq2seq
PTB
TensorFlow中关于RNN进行PTB训练的教程https://www.tensorflow.org/versions/r0.12/tutorials/recurrent/
-
示例代码的Git地址
https://github.com/tensorflow/models/tree/master/tutorials/rnn/ptb
-
预处理过的数据
http://www.fit.vutbr.cz/~imikolov/rnnlm/simple-examples.tgz
该数据进行过预处理:
The dataset is already preprocessed and contains overall 10000 different words, including the end-of-sentence marker and a special symbol
(\<unk>)for rare words. We convert all of them in the reader.py to unique integer identifiers to make it easy for the neural network to process.
相关文章推荐
- [深度学习] (1): 实现一个简单的XOR
- sql server 关于表中只增标识问题 C# 实现自动化打开和关闭可执行文件(或 关闭停止与系统交互的可执行文件) ajaxfileupload插件上传图片功能,用MVC和aspx做后台各写了一个案例 将小写阿拉伯数字转换成大写的汉字, C# WinForm 中英文实现, 国际化实现的简单方法 ASP.NET Core 2 学习笔记(六)ASP.NET Core 2 学习笔记(三)
- 深度学习(二):用CNTK在Python下实现一个简单的FeedForward网络
- [5]深度学习和Keras----一个图像识别的简单Demo
- 深度学习 python 脚本实现 keras mninst 数字识别 预测端 code
- 神经网络与深度学习 1.3 神经网络的架构 1.4 一个简单的分类手写数字的网络架构
- 一个简单猜数字游戏的实现(二)
- win32sdk学习 richedit实现的一个简单记事本
- 金山笔试题-字符串排序 :" 写一个函数,实现对给定的字符串(字符串里面包括:英文字母,数字,符号)的处理"
- [翻译]案例学习:仅使用Redis+PHP设计实现一个简单的Twitter
- JBPM学习(一):实现一个简单的工作流例子全过程
- 一个简单的算法---实现找出数组中一个数字出现次数最多的数字
- core java 8 学习笔记(一) 一个简单的图片查看器的实现
- 一个简单实现的字符串数字乘法。
- (Map实现)有一个字符串,其中包含中文字符、英文字符和数字字符,请统计和打印出各个字符的个数
- 一个简单猜数字游戏的实现(一)
- IOS学习笔记之实现一个简单的表
- 学习javascript的闭包特性用C#来实现一个简单的例子
- 转[翻译]案例学习:仅使用Redis+PHP设计实现一个简单的Twitter
- 数据结构学习三(一个简单的队列实现)