
Why a TensorFlow program hangs: the whole process neither reports an error nor makes progress

2016-11-28 09:42
I. Overview: While working with TensorFlow I ran into a failure mode where the program neither reports an error nor continues executing. The cause turned out to be that TF's data-loading threads had never been started, so the data-flow graph could not be evaluated and the whole program simply hung.

The deeper reason is that in TensorFlow, computation and data loading are asynchronous: the intended design is for the main thread to train the model while a separate reader thread loads data in the background. TensorFlow maintains an in-memory queue, and the data thread asynchronously pushes samples from disk into that queue. Because training and reading are decoupled, TensorFlow cannot raise an error when no data has arrived yet; more data might be enqueued at any moment, so the session just waits indefinitely.
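To make the pattern concrete, here is a minimal sketch of that producer/consumer setup using the classic queue-runner API; the FIFOQueue, the constant standing in for a disk read, and the thread count are all illustrative, not taken from the PTB code:

import tensorflow as tf

# An in-memory queue; the constant enqueue stands in for "read one sample
# from disk" in a real input pipeline.
queue = tf.FIFOQueue(capacity=100, dtypes=[tf.int32])
enqueue_op = queue.enqueue([tf.constant(1)])
dequeue_op = queue.dequeue()  # the training graph consumes from here

# A QueueRunner owns the background threads that repeatedly run enqueue_op.
qr = tf.train.QueueRunner(queue, [enqueue_op] * 2)
tf.train.add_queue_runner(qr)

with tf.Session() as sess:
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(sess=sess, coord=coord)
  print(sess.run(dequeue_op))  # without start_queue_runners this blocks forever
  coord.request_stop()
  coord.join(threads)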

Note: I ran into this problem while modifying ptb_word_lm.py from the TensorFlow source tree. The accompanying reader.py is used below to explain the issue:

TensorFlow's reader.py file:

"""Utilities for parsing PTB text files."""
#-*- coding:utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import os

import tensorflow as tf

#将文件中所有的word收集起来
def _read_words(filename):
with tf.gfile.GFile(filename, "r") as f:
return f.read().decode("utf-8").replace("\n", "<eos>").split()

#将收集到的word映射到id
def _build_vocab(filename):
data = _read_words(filename)

counter = collections.Counter(data)
count_pairs = sorted(counter.items(), key=lambda x: (-x[1], x[0]))

words, _ = list(zip(*count_pairs))
word_to_id = dict(zip(words, range(len(words))))

return word_to_id

#使用训练集的word建立word的映射表
#
def _file_to_word_ids(filename, word_to_id):
data = _read_words(filename)
return [word_to_id[word] for word in data if word in word_to_id]

def ptb_raw_data(data_path=None):
"""Load PTB raw data from data directory "data_path".

Reads PTB text files, converts strings to integer ids,
and performs mini-batching of the inputs.

The PTB dataset comes from Tomas Mikolov's webpage:
 http://www.fit.vutbr.cz/~imikolov/rnnlm/simple-examples.tgz 
Args:
data_path: string path to the directory where simple-examples.tgz has
been extracted.

Returns:
tuple (train_data, valid_data, test_data, vocabulary)
where each of the data objects can be passed to PTBIterator.
"""

train_path = os.path.join(data_path, "ptb.train.txt")
valid_path = os.path.join(data_path, "ptb.valid.txt")
test_path = os.path.join(data_path, "ptb.test.txt")

word_to_id = _build_vocab(train_path)
train_data = _file_to_word_ids(train_path, word_to_id)
valid_data = _file_to_word_ids(valid_path, word_to_id)
test_data = _file_to_word_ids(test_path, word_to_id)
vocabulary = len(word_to_id)
return train_data, valid_data, test_data, vocabulary

def ptb_producer(raw_data, batch_size, num_steps, name=None):
"""Iterate on the raw PTB data.

This chunks up raw_data into batches of examples and returns Tensors that
are drawn from these batches.

Args:
raw_data: one of the raw data outputs from ptb_raw_data.
batch_size: int, the batch size.
num_steps: int, the number of unrolls.
name: the name of this operation (optional).

Returns:
A pair of Tensors, each shaped [batch_size, num_steps]. The second element
of the tuple is the same data time-shifted to the right by one.

Raises:
tf.errors.InvalidArgumentError: if batch_size or num_steps are too high.
"""
with tf.name_scope(name, "PTBProducer", [raw_data, batch_size, num_steps]):
raw_data = tf.convert_to_tensor(raw_data, name="raw_data", dtype=tf.int32)

data_len = tf.size(raw_data)
batch_len = data_len // batch_size
data = tf.reshape(raw_data[0 : batch_size * batch_len],
[batch_size, batch_len])

epoch_size = (batch_len - 1) // num_steps
assertion = tf.assert_positive(
epoch_size,
message="epoch_size == 0, decrease batch_size or num_steps")
with tf.control_dependencies([assertion]):
epoch_size = tf.identity(epoch_size, name="epoch_size")

i = tf.train.range_input_producer(epoch_size, shuffle=False).dequeue()
x = tf.slice(data, [0, i * num_steps], [batch_size, num_steps])
y = tf.slice(data, [0, i * num_steps + 1], [batch_size, num_steps])
return x, y


Walking through reader.py:

1. range_input_producer creates a queue holding the integers 0 through epoch_size - 1, and .dequeue() defines a dequeue op, which shows that the queue is itself a node in the data-flow graph. Calling range_input_producer also registers a queue runner automatically; as the documentation puts it, "A QueueRunner for the Queue is added to the current Graph's QUEUE_RUNNER collection."

i = tf.train.range_input_producer(epoch_size, shuffle=False).dequeue()
2. Two slice ops produce the training inputs x and the targets y:

x = tf.slice(data, [0, i * num_steps], [batch_size, num_steps])
y = tf.slice(data, [0, i * num_steps + 1], [batch_size, num_steps])
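
To see what these slices produce, here is the same arithmetic worked through in plain Python on a toy sequence (the numbers are illustrative, not PTB data):

# 10 word ids, batch_size=2, num_steps=2.
raw_data = list(range(10))
batch_size, num_steps = 2, 2

batch_len = len(raw_data) // batch_size            # 5
data = [raw_data[b * batch_len:(b + 1) * batch_len]
        for b in range(batch_size)]                # [[0,1,2,3,4], [5,6,7,8,9]]
epoch_size = (batch_len - 1) // num_steps          # 2; the -1 leaves room for y

i = 0  # the first index dequeued from the range queue
x = [row[i * num_steps:(i + 1) * num_steps] for row in data]          # [[0,1], [5,6]]
y = [row[i * num_steps + 1:(i + 1) * num_steps + 1] for row in data]  # [[1,2], [6,7]]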
3. How it is used in practice:

During each training iteration, simply fetching x and y triggers the ops they depend on, namely the dequeue and the slice ops, which generate our data. But reading input through a queue is inherently a multi-threaded mechanism: the reader thread must be started inside the session, otherwise the program hangs.

II. Analyzing the failure and how to fix it

1. The failing case

# -*- coding: utf-8 -*-
import numpy as np
import tensorflow as tf

from tensorflow.models.rnn.ptb import reader


class PTBInput(object):
  """The input data."""

  def __init__(self, config, data, name=None):
    self.batch_size = batch_size = config.batch_size
    self.num_steps = num_steps = config.num_steps
    # -1 because the targets are the inputs shifted one step to the right.
    self.epoch_size = ((len(data) // batch_size) - 1) // num_steps
    self.input_data, self.targets = reader.ptb_producer(
        data, batch_size, num_steps, name=name)


class SmallConfig(object):
  """Small config."""
  init_scale = 0.1
  learning_rate = 1.0
  max_grad_norm = 5
  num_layers = 2
  num_steps = 20
  hidden_size = 200
  max_epoch = 4
  max_max_epoch = 13
  keep_prob = 1.0
  lr_decay = 0.5
  batch_size = 20
  vocab_size = 10000


if __name__ == '__main__':
  config = SmallConfig()
  data_path = '/home/jdlu/jdluTensor/data/simple-examples/data'
  raw_data = reader.ptb_raw_data(data_path)
  train_data, valid_data, test_data, _ = raw_data
  train_input = PTBInput(config=config, data=train_data, name="TrainInput")
  print("end--------------------------------")

  # WRONG: a bare Session never starts the data-loading threads, so nothing
  # can be dequeued, the data-flow graph cannot be evaluated, and the
  # program hangs on the first sess.run.
  with tf.Session() as sess:
    for step in range(1):
      print(sess.run(train_input.input_data))
Explanation: the data-loading threads were never started inside the Session, so sess.run(train_input.input_data) has nothing to dequeue and the program hangs.
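
While debugging, one way to turn this silent hang into an explicit error is a run timeout; this is a sketch assuming RunOptions' timeout support, with an arbitrary 10-second limit:

# If the queue stays empty, sess.run raises tf.errors.DeadlineExceededError
# after 10 seconds instead of blocking forever.
run_options = tf.RunOptions(timeout_in_ms=10000)
with tf.Session() as sess:
  print(sess.run(train_input.input_data, options=run_options))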

2. Fix: start the queue-runner threads with a Coordinator

# -*- coding: utf-8 -*-
import numpy as np
import tensorflow as tf

from tensorflow.models.rnn.ptb import reader


class PTBInput(object):
  """The input data."""

  def __init__(self, config, data, name=None):
    self.batch_size = batch_size = config.batch_size
    self.num_steps = num_steps = config.num_steps
    # -1 because the targets are the inputs shifted one step to the right.
    self.epoch_size = ((len(data) // batch_size) - 1) // num_steps
    self.input_data, self.targets = reader.ptb_producer(
        data, batch_size, num_steps, name=name)


class SmallConfig(object):
  """Small config."""
  init_scale = 0.1
  learning_rate = 1.0
  max_grad_norm = 5
  num_layers = 2
  num_steps = 20
  hidden_size = 200
  max_epoch = 4
  max_max_epoch = 13
  keep_prob = 1.0
  lr_decay = 0.5
  batch_size = 20
  vocab_size = 10000


if __name__ == '__main__':
  config = SmallConfig()
  data_path = '/home/jdlu/jdluTensor/data/simple-examples/data'
  raw_data = reader.ptb_raw_data(data_path)
  train_data, valid_data, test_data, _ = raw_data
  train_input = PTBInput(config=config, data=train_data, name="TrainInput")
  print("end--------------------------------")

  # RIGHT (alternative): use a Supervisor(), see section 3 below.
  # sv = tf.train.Supervisor()
  # with sv.managed_session() as sess:
  #   for step in range(1):
  #     print(sess.run(train_input.input_data))

  # RIGHT: create a session for running operations in the Graph.
  sess = tf.Session()
  # Start the input enqueue threads.
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(sess=sess, coord=coord)
  # Run training steps or whatever.
  try:
    for step in range(2):
      print(sess.run(train_input.input_data))
  except Exception as e:
    # Report exceptions to the coordinator.
    coord.request_stop(e)
  coord.request_stop()
  # Terminate as usual. It is innocuous to request stop twice.
  coord.join(threads)
  sess.close()


Explanation: tf.train.range_input_producer(epoch_size, shuffle=False) registers its QueueRunner in the default graph, but registering is not starting: we must call tf.train.start_queue_runners(sess=sess) to actually launch the threads, and we use coord = tf.train.Coordinator() to handle thread synchronization and shutdown.
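
For longer runs, the usual pattern is to loop until the Coordinator asks for a stop; this is a sketch building on the sess/coord/threads variables above (the OutOfRangeError branch only fires if the producer is bounded, e.g. by a num_epochs argument):

try:
  while not coord.should_stop():
    x_val, y_val = sess.run([train_input.input_data, train_input.targets])
except tf.errors.OutOfRangeError:
  # Raised when a bounded producer runs out of data.
  print("Done -- the producer reached its epoch limit")
finally:
  # Ask the reader threads to stop, then wait for them to finish.
  coord.request_stop()
  coord.join(threads)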

3. Alternative fix: use tf.train.Supervisor

# -*- coding: utf-8 -*-
import numpy as np
import tensorflow as tf

from tensorflow.models.rnn.ptb import reader


class PTBInput(object):
  """The input data."""

  def __init__(self, config, data, name=None):
    self.batch_size = batch_size = config.batch_size
    self.num_steps = num_steps = config.num_steps
    # -1 because the targets are the inputs shifted one step to the right.
    self.epoch_size = ((len(data) // batch_size) - 1) // num_steps
    self.input_data, self.targets = reader.ptb_producer(
        data, batch_size, num_steps, name=name)


class SmallConfig(object):
  """Small config."""
  init_scale = 0.1
  learning_rate = 1.0
  max_grad_norm = 5
  num_layers = 2
  num_steps = 20
  hidden_size = 200
  max_epoch = 4
  max_max_epoch = 13
  keep_prob = 1.0
  lr_decay = 0.5
  batch_size = 20
  vocab_size = 10000


if __name__ == '__main__':
  config = SmallConfig()
  data_path = '/home/jdlu/jdluTensor/data/simple-examples/data'
  raw_data = reader.ptb_raw_data(data_path)
  train_data, valid_data, test_data, _ = raw_data
  train_input = PTBInput(config=config, data=train_data, name="TrainInput")
  print("end--------------------------------")

  # RIGHT: use a Supervisor(); managed_session starts the queue runners for us.
  sv = tf.train.Supervisor()
  with sv.managed_session() as sess:
    for step in range(1):
      print(sess.run(train_input.input_data))


Explanation: using sv = tf.train.Supervisor() is the most convenient route. As the documentation says, "The Supervisor is a small wrapper around a Coordinator, a Saver, and a SessionManager."

In other words, with a Supervisor we no longer have to manage model saving or thread synchronization ourselves.
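
As a sketch of what that buys us, passing a logdir (the path below is a made-up example) makes the Supervisor checkpoint the model and restore from it automatically, while should_stop() gives a clean exit condition:

sv = tf.train.Supervisor(logdir="/tmp/ptb_logs")  # hypothetical checkpoint dir
with sv.managed_session() as sess:
  for step in range(100):
    if sv.should_stop():  # set when a service thread stops or reports an error
      break
    sess.run(train_input.input_data)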