TensorFlow 基础知识笔记之队列和线程
2017-09-18 12:46
330 查看
TensorFlow 基础知识笔记之队列和线程
和 TensorFlow 中的其他组件一样, 队列(queue)本身也是图中的一个节点,是一种有状态的节点,其他节点,如入队节点(enqueue)和出队节点(dequeue),可以修改它的内容。例如,入队节点可以把新元素插到队列末尾,出队节点可以把队列前面的元素删除。
环境:win7 64位 tensorflow1.3-gpu python3.5
队列
TensorFlow 中主要有两种队列,即 FIFOQueue 和 RandomShuffleQueueFIFOQueue
FIFOQueue 创建一个先入先出队列。例如,我们在训练一些语音、文字样本时,使用循环神经网络的网络结构,希望读入的训练样本是有序的,就要用 FIFOQueue。import tensorflow as tf # 创建一个先入先出队列,初始化队列插入 0.1、 0.2、 0.3 三个数字 q = tf.FIFOQueue(3,"float") init = q.enqueue_many(([0.1,0.2,0.3],)) # 定义出队、+1,入队操作 x = q.dequeue() # 出队是从队首出,返回值为队首的元素 y = x+1 q_inc = q.enqueue([y]) # 入队是队尾 # 开启一个会话 with tf.Session() as sess: sess.run(init) for i in range(2): sess.run(q_inc) # 执行 2 次操作,队列中的值变为 0.3,1.1,1.2 quelen = sess.run(q.size()) for i in range(quelen): print (sess.run(q.dequeue())) # 输出队列的值
0.3 1.1 1.2
RandomShuffleQueue
RandomShuffleQueue 创建一个随机队列,在出队列时,是以随机的顺序产生元素的。例如,我们在训练一些图像样本时,使用 CNN 的网络结构,希望可以无序地读入训练样本,就要用RandomShuffleQueue,每次随机产生一个训练样本。RandomShuffleQueue 在 TensorFlow 使用异步计算时非常重要。因为 TensorFlow 的会话是支持多线程的,我们可以在主线程里执行训练操作,使用 RandomShuffleQueue 作为训练输入,开多个线程来准备训练样本,将样本压入队列后,主线程会从队列中每次取出 mini-batch 的样本进行训练。
import tensorflow as tf # 创建一个随机队列,队列最大长度为 10,出队后最小长度为 2 q = tf.RandomShuffleQueue(capacity=10,min_after_dequeue=2,dtypes="float") # 开启一个会话 sess = tf.Session() for i in range(0,10): sess.run(q.enqueue(i)) for i in range(0,8): print(sess.run(q.dequeue()))
5.0 9.0 2.0 0.0 1.0 7.0 6.0 4.0
发现每一次执行的结果都不一样,说明确实是随机乱序的。
我们尝试修改入队次数为 12 次,再运行,发现程序阻断不动,或者我们尝试修改出队此时为 10 次,即不保留队列最小长度,发现队列输出 8 次结果后,在终端仍然阻断了。
阻断一般发生在:
● 队列长度等于最小值,执行出队操作;
● 队列长度等于最大值,执行入队操作。
可以通过设置绘画在运行时的等待时间来解除阻断:
run_options = tf.RunOptions(timeout_in_ms = 10000) # 等待 10 秒 try: sess.run(q.dequeue(), options=run_options) except tf.errors.DeadlineExceededError: print('out of range')
out of range
队列管理器
上面的例子都是在会话的主线程中进行入队操作。当数据量很大时,入队操作从硬盘中读取数据,放入内存中,主线程需要等待入队操作完成,才能进行训练操作。会话中可以运行多个线程,我们
使用线程管理器 QueueRunner 创建一系列的新线程进行入队操作,让主线程继续使用数据,即训
练网络和读取数据是异步的,主线程在训练网络,另一个线程在将数据从硬盘读入内存。
import tensorflow as tf # 创建一个含有队列的图 q = tf.FIFOQueue(1000,"float") # 创建一个长度为1000的队列 counter = tf.Variable(0.0) # 计数器 increment_op = tf.assign_add(counter,tf.constant(1.0)) # 操作:给计数器加1 enqueque_op = q.enqueue(counter) # 操作:计数器值加入队列 # 创建一个队列管理器 QueueRunner,用这两个操作向队列 q 中添加元素,启动一个线程。 qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueque_op]*1) # 启动一个会话,从队列管理器qr中创建线程 #主线程 with tf.Session() as sess: sess.run(tf.global_variables_initializer()) enqueue_threads = qr.create_threads(sess,start=True) # 启动入队线程 #主线程 for i in range(10): print(sess.run(q.dequeue()))
1.0 1.0 2.0 3.0 5.0 6.0 7.0 8.0 9.0 10.0 ERROR:tensorflow:Exception in QueueRunner: Run call was cancelled ERROR:tensorflow:Exception in QueueRunner: Session has been closed. Exception in thread Thread-22: Traceback (most recent call last): File "C:\Anaconda3\envs\tensorflow-gpu\lib\threading.py", line 914, in _bootstrap_inner self.run() File "C:\Anaconda3\envs\tensorflow-gpu\lib\threading.py", line 862, in run self._target(*self._args, **self._kwargs) File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\training\queue_runner_impl.py", line 238, in _run enqueue_callable() File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\client\session.py", line 1245, in _single_tensor_run fetch_list_as_strings, [], status, None) File "C:\Anaconda3\envs\tensorflow-gpu\lib\contextlib.py", line 66, in __exit__ next(self.gen) File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\framework\errors_impl.py", line 466, in raise_exception_on_not_ok_status pywrap_tensorflow.TF_GetCode(status)) tensorflow.python.framework.errors_impl.CancelledError: Run call was cancelled Exception in thread Thread-23: Traceback (most recent call last): File "C:\Anaconda3\envs\tensorflow-gpu\lib\threading.py", line 914, in _bootstrap_inner self.run() File "C:\Anaconda3\envs\tensorflow-gpu\lib\threading.py", line 862, in run self._target(*self._args, **self._kwargs) File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\training\queue_runner_impl.py", line 238, in _run enqueue_callable() File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\client\session.py", line 1235, in _single_operation_run target_list_as_strings, status, None) File "C:\Anaconda3\envs\tensorflow-gpu\lib\contextlib.py", line 66, in __exit__ next(self.gen) File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\framework\errors_impl.py", line 466, in raise_exception_on_not_ok_status pywrap_tensorflow.TF_GetCode(status)) tensorflow.python.framework.errors_impl.CancelledError: Session has been closed.
能输出结果,但最后会异常:
ERROR:tensorflow:Exception in QueueRunner: Run call was cancelled
ERROR:tensorflow:Exception in QueueRunner: Session has been closed.
我们知道,使用with tf.Session的话,会话执行结束会自动关闭,相当于main函数已经结束,
固也就有 Session has been closed.的错误。
import tensorflow as tf # 创建一个含有队列的图 q = tf.FIFOQueue(1000,"float") # 创建一个长度为1000的队列 counter = tf.Variable(0.0) # 计数器 increment_op = tf.assign_add(counter,tf.constant(1.0)) # 操作:给计数器加1 enqueque_op = q.enqueue(counter) # 操作:计数器值加入队列 # 创建一个队列管理器 QueueRunner,用这两个操作向队列 q 中添加元素,启动一个线程。 qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueque_op]*1) # 启动一个会话,从队列管理器qr中创建线程 # 主线程 sess = tf.Session() sess.run(tf.global_variables_initializer()) enqueue_threads = qr.create_threads(sess,start=True) # 主线程 for i in range(10): print(sess.run(q.dequeue()))
1.0 3.0 4.0 5.0 6.0 7.0 7.0 8.0 9.0 10.0
使用Session就不会自动关闭,也就没有了上面例子中的异常了,虽然没有了异常,但也和我们
设想的会打印顺序的1,2,3,4,5…不一样,而且像第一个例子中还会重复打印1.0,这是为什么呢?
这个本质是+1操作和入队操作是异步的,也就是说如果加1操作执行了很多次之后,才执行一次入队的话,就会出现入队不是按我们预想的顺序那样;反过来,当我执行几次入队之后,才执行一次加1操作就会出现一个数重复入队的情况。
那该怎么解决这个问题呢!下面为几种解决的方法
# 方法1 import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) enqueque_op = q.enqueue(counter) # 把两个操作变成列表中的一个元素 # 原 :qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueque_op]*1) qr = tf.train.QueueRunner(q,enqueue_ops=[[increment_op,enqueque_op]]*1) sess = tf.Session() sess.run(tf.global_variables_initializer()) enqueue_threads = qr.create_threads(sess,start=True) for i in range(10): print(sess.run(q.dequeue()))
1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0
# 方法2 import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) # 原 enqueque_op = q.enqueue(counter) # 把加一操作变成入队操作的依赖 with tf.control_dependencies([increment_op]): enqueque_op = q.enqueue(counter) # 由于将加1变成了入队的依赖,所以入队操作只需要传入enqueque_op就行了 qr = tf.train.QueueRunner(q,enqueue_ops=[enqueque_op]*1) sess = tf.Session() sess.run(tf.global_variables_initializer()) enqueue_threads = qr.create_threads(sess,start=True) for i in range(10): print(sess.run(q.dequeue()))
1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0
# 方法3 import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) enqueque_op = q.enqueue(counter) # 把两个操作变成空操作的依赖 with tf.control_dependencies([increment_op,enqueque_op]): void_op = tf.no_op() # 由于将两个操作变成了空操作的依赖,所以入队操作只需要传入void_op就行了 qr = tf.train.QueueRunner(q,enqueue_ops=[void_op]*1) sess = tf.Session() sess.run(tf.global_variables_initializer()) enqueue_threads = qr.create_threads(sess,start=True) for i in range(10): print(sess.run(q.dequeue()))
1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0
# 方法4 import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) enqueque_op = q.enqueue(counter) # 原 :qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueque_op]*1) # 用tf.group()把两个操作组合起来 qr = tf.train.QueueRunner(q,enqueue_ops=[tf.group(increment_op,enqueque_op)]*1) sess = tf.Session() sess.run(tf.global_variables_initializer()) enqueue_threads = qr.create_threads(sess,start=True) for i in range(10): print(sess.run(q.dequeue()))
1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0
线程和协调器
QueueRunner 有一个问题就是:入队线程自顾自地执行,在需要的出队操作完成之后,程序没法结束。这样就要使用 tf.train.Coordinator 来实现线程间的同步,终止其他线程。
import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) enqueque_op = q.enqueue(counter) qr = tf.train.QueueRunner(q,enqueue_ops=[[increment_op,enqueque_op]]*1) # 主线程 sess = tf.Session() sess.run(tf.global_variables_initializer()) #coordinator:协调器,协调线程间的关系可以被当做一种信号量,起同步作用 coord = tf.train.Coordinator() # 启动入队线程,协调器是线程的参数 enqueue_threads = qr.create_threads(sess,coord=coord,start=True) # 主线程 for i in range(0,10): print(sess.run(q.dequeue())) coord.request_stop() # 通知其他线程关闭 # join操作等待其他线程结束,其他所有的线程关闭后,这个函数才能返回 coord.join(enqueue_threads)
1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0
异常
for i in range(10): print(sess.run(q.dequeue()))
21.0 22.0 23.0 24.0 --------------------------------------------------------------------------- OutOfRangeError Traceback (most recent call last) C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\client\session.py in _do_call(self, fn, *args) 1326 try: -> 1327 return fn(*args) 1328 except errors.OpError as e: C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\client\session.py in _run_fn(session, feed_dict, fetch_list, target_list, options, run_metadata) 1305 feed_dict, fetch_list, target_list, -> 1306 status, run_metadata) 1307
在关闭队列线程后,再执行出队操作,就会抛出 tf.errors.OutOfRange 错误。这种情况就需要
使用 tf.errors.OutOfRangeError 来捕捉错误,终止循环:
import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) enqueque_op = q.enqueue(counter) qr = tf.train.QueueRunner(q,enqueue_ops=[[increment_op,enqueque_op]]*1) # 主线程 sess = tf.Session() sess.run(tf.global_variables_initializer()) #coordinator:协调器,协调线程间的关系可以被当做一种信号量,起同步作用 coord = tf.train.Coordinator() # 启动入队线程,协调器是线程的参数 enqueue_threads = qr.create_threads(sess,coord=coord,start=True) coord.request_stop() # 通知其他线程关闭 # 主线程 for i in range(0,10): try: print("i : ",i) print(sess.run(q.dequeue())) except tf.errors.OutOfRangeError: print('finish') break # join操作等待其他线程结束,其他所有的线程关闭后,这个函数才能返回 coord.join(enqueue_threads)
i : 0 1.0 i : 1 finish
说明:从打印出来的信息我们可以看出,将请求线程关闭放置在出队的前面,也就是说我还没有出
队之前就请求将线程关闭了,但关闭线程需要一定的时间,所以后来在遍历出队是还是可以执行的
线程关闭后,如果不抛异常的话就像上个例子那样会报错,所以这里执行了异常,并打印出了
“finish”。
参考文献:
http://blog.csdn.net/shenxiaolu1984/article/details/53024513
相关文章推荐
- Java基础知识强化之集合框架笔记20:数据结构之 栈 和 队列
- TensorFlow 基础知识笔记之一些概念
- Java基础知识强化之集合框架笔记80:HashMap的线程不安全性的体现
- 进程和线程的基础知识——Python学习笔记11
- 6.1 Tensorflow笔记(基础篇):队列与线程
- Java基础知识强化之多线程笔记03:进程与线程 和 多线程的意义
- tensorflow笔记(一)之基础知识
- Java多线程编程总结笔记——一多线程基础知识
- 线程基础知识笔记
- 学习笔记TF049:TensorFlow 模型存储加载、队列线程、加载数据、自定义操作
- Tensorflow学习笔记之一 —— 基础知识篇
- 学习笔记TF049:TensorFlow 模型存储加载、队列线程、加载数据、自定义操作
- Java多线程编程总结笔记——一多线程基础知识
- Java笔记 - 线程基础知识
- Java基础知识强化之网络编程笔记05:UDP之多线程实现聊天室案例
- 马哥2013年运维视频笔记 day01 系统基础知识
- 0038 Java学习笔记-多线程-传统线程间通信、Condition、阻塞队列、《疯狂Java讲义 第三版》进程间通信示例代码存在的一个问题
- 深度学习(DL)与卷积神经网络(CNN)学习笔记随笔-01-CNN基础知识点
- Java并发(基础知识)—— 阻塞队列和生产者消费者模式
- Java基础知识强化之IO流笔记29:BufferedOutputStream / BufferedInputStream(字节缓冲区流) 之BufferedInputStream读取数据