Exercise: Double DQN (Deep Reinforcement Learning with Double Q-learning, H. van Hasselt et al., arXiv, 2015)
2017-05-23 17:01
The code uses Python 2.x and TensorFlow 1.1 (CPU).
Paper: https://arxiv.org/abs/1509.06461

=============== File 1: replay_memory.py ================================
import numpy as np

MEMORYSIZE = 600000


class Replay_memory:
    def __init__(self):
        self.previous_state = np.empty((MEMORYSIZE, 4), dtype=np.float32)
        self.action = np.empty((MEMORYSIZE, 1), dtype=np.uint8)  # 0 is the 1st action, 1 is the 2nd action
        self.reward = np.empty((MEMORYSIZE, 1), dtype=np.float32)
        self.next_state = np.empty((MEMORYSIZE, 4), dtype=np.float32)
        self.terminal = np.empty((MEMORYSIZE, 1), dtype=np.bool)
        self.index = 0
        self.full_memory = False

    def memory_in(self, previous_state, action, reward, next_state, terminal):
        # ring buffer: once full, the oldest experiences are overwritten
        self.previous_state[self.index] = previous_state
        self.action[self.index] = action
        self.reward[self.index] = reward
        self.next_state[self.index] = next_state
        self.terminal[self.index] = terminal
        self.index += 1
        if self.index == MEMORYSIZE:
            self.index = 0
            self.full_memory = True

    def memory_out(self, size_minibatch):
        # sample size_minibatch experiences uniformly at random
        minib_previous_state = []
        minib_action = []
        minib_reward = []
        minib_next_state = []
        minib_terminal = []
        if self.full_memory:
            index_sample = np.random.randint(0, MEMORYSIZE, size=size_minibatch).tolist()
        else:
            index_sample = np.random.randint(0, self.index, size=size_minibatch).tolist()
        for i in index_sample:
            minib_previous_state.append(self.previous_state[i])
            minib_action.append(self.action[i])
            minib_reward.append(self.reward[i])
            minib_next_state.append(self.next_state[i])
            minib_terminal.append(self.terminal[i])
        rs_minib_previous_state = np.asarray(minib_previous_state)
        rs_minib_action = np.asarray(minib_action)
        rs_minib_reward = np.asarray(minib_reward)
        rs_minib_next_state = np.asarray(minib_next_state)
        rs_minib_terminal = np.asarray(minib_terminal)
        # return 5 numpy arrays with shape (size_minibatch, num_fea)
        return rs_minib_previous_state, rs_minib_action, rs_minib_reward, rs_minib_next_state, rs_minib_terminal

    def test_memory_in(self):
        # load some dummy experiences for testing
        for i in range(100):
            self.memory_in([1., 1., 1., 1.], [0], [0.1], [1., 1., 1., 1.], [False])
            # self.memory_in([1., 1., 1., 1.], [1], [0.1], [1., 1., 1., 1.], [False])
            # self.memory_in([1., 1, 1., 1.], [0], [-1], [1., 1., 1., 1.], [True])


#test#test#test#test#test#test#test#test#test#test#test#test
'''
if __name__ == "__main__":
    rm = Replay_memory()
    for i in range(10):
        rm.memory_in((1., 2., 3., 4.), [1], [0.1], [1., 2., 3., 4.], True)
        rm.memory_in((2, 2, 3, 4), [0], [0.1], [2, 2, 3, 4], False)
        rm.memory_in((3, 2, 3, 4), [1], [0.1], [3, 2, 3, 4], False)
    s, a, r, ss, t = rm.memory_out(32)
    print ss
'''
if __name__ == "__main__":
    rm = Replay_memory()
    rm.test_memory_in()
    s, a, r, ss, t = rm.memory_out(32)
    print ss
Replay_memory has two main methods, memory_in and memory_out, which respectively store experiences into and sample experiences out of the replay memory. For efficiency it is implemented with preallocated numpy arrays rather than a deque. The method test_memory_in is only used for testing; calling it loads some dummy experiences into the replay memory.
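As a quick usage sketch (the transition values below are made up, and it assumes replay_memory.py above is importable), storing and sampling look like this:

import numpy as np
from replay_memory import Replay_memory

rm = Replay_memory()
# store a few transitions (s, a, r, s', terminal); values here are illustrative only
for _ in range(100):
    s = np.random.randn(4).astype(np.float32)
    rm.memory_in(s, [0], [0.01], s + 0.1, [False])
# sample a minibatch of 32 transitions, each returned as an array of shape (32, num_fea)
s, a, r, ss, t = rm.memory_out(32)
print s.shape, a.shape, r.shape, ss.shape, t.shape  # (32, 4) (32, 1) (32, 1) (32, 4) (32, 1)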
=========== File 2: nn.py ============================
import tensorflow as tf
import math


class Fcnn:
    def __init__(self):
        self.batch_size = 32
        self.h1_size = 20
        self.input = tf.placeholder(tf.float32, [None, 4])
        self.action = tf.placeholder(tf.uint8, [None, 1])
        self.create_and_init_var(self.h1_size)
        self.Q_net_forward()
        self.var_list = [self.Q_net_l1_w, self.Q_net_l1_b, self.Q_net_l2_w, self.Q_net_l2_b]

    def create_and_init_var(self, H1_SIZE):
        self.Q_net_l1_w = self.init_w([4, H1_SIZE], 0.01)
        self.Q_net_l1_b = self.init_b([H1_SIZE])
        self.Q_net_l2_w = self.init_w([H1_SIZE, 2], 0.01)
        self.Q_net_l2_b = self.init_b([2])

    def test_create_and_init_var(self, H1_SIZE):
        self.Q_net_l1_w = self.test_init_w_1([4, H1_SIZE], 0.001)
        self.Q_net_l1_b = self.test_init_b_1([H1_SIZE])
        self.Q_net_l2_w = self.test_init_w_1([H1_SIZE, 2], 0.01)
        self.Q_net_l2_b = self.test_init_b_1([2])

    def Q_net_forward(self):
        fc1 = tf.nn.relu(tf.matmul(self.input, self.Q_net_l1_w) + self.Q_net_l1_b)
        Q_value = tf.matmul(fc1, self.Q_net_l2_w) + self.Q_net_l2_b  # shape is [batch_size, 2]
        self.TEST_Q_value = Q_value
        # graph 1: index of the action with the largest Q-value, shape is [batch_size, 1]
        self.Q_action = tf.expand_dims(tf.arg_max(Q_value, dimension=1), dim=1)
        # graph 2: Q-value of the given action index, shape is [batch_size]
        self.Q_value = tf.reduce_sum(tf.multiply(Q_value, tf.one_hot(tf.squeeze(self.action, squeeze_dims=[1]), 2)),
                                     reduction_indices=1)

    def init_w(self, shape, stddev):
        return tf.Variable(tf.truncated_normal(shape, stddev=stddev))

    def init_b(self, shape):
        return tf.Variable(tf.ones(shape) * 0.01)

    def test_init_w_1(self, shape, stddev):
        return tf.Variable(tf.ones(shape))

    def test_init_b_1(self, shape):
        return tf.Variable(tf.ones(shape))


#test#test#test#test#test#test#test#test
if __name__ == "__main__":
    import numpy as np
    a = Fcnn()
    sess = tf.Session()
    init = tf.initialize_all_variables()
    sess.run(init)
    print sess.run([a.Q_action, a.TEST_Q_value],
                   feed_dict={a.input: np.array([[-1, -1, -1, -1], [1, 1, 1, 1]]).reshape([2, 4])})
    # print sess.run([a.Q_value, a.TEST_Q_value],
    #                feed_dict={a.input: np.array([1, 2, 3, 4]).reshape([1, 4]),
    #                           a.action: np.array([0]).reshape([1, 1])})
    sess.close()
The Fcnn class builds a fully connected network and implements two computation graphs that share the same weights: one takes a state and outputs the index of the action with the largest Q-value; the other takes a state together with an action index and outputs the Q-value of that action. Methods whose names start with 'test' are only used for testing and can be ignored for now.
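To make the two graphs concrete, here is a small hedged numpy sketch (the values and variable names below are illustrative only) of the argmax head and the one-hot Q-value selection that Fcnn builds with tf.arg_max and tf.one_hot:

import numpy as np

# pretend the network produced Q-values for a batch of 3 states and 2 actions
Q_value = np.array([[0.2, 0.5],
                    [0.9, 0.1],
                    [0.3, 0.4]], dtype=np.float32)

# graph 1: state -> index of the action with the largest Q-value, shape (batch, 1)
Q_action = np.argmax(Q_value, axis=1).reshape(-1, 1)   # [[1], [0], [1]]

# graph 2: state + action index -> Q-value of that chosen action, shape (batch,)
action = np.array([[1], [0], [0]])
one_hot = np.eye(2)[action.squeeze(1)]                 # plays the role of tf.one_hot
Q_selected = np.sum(Q_value * one_hot, axis=1)         # [0.5, 0.9, 0.3]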
======================= File 3: double_dqn.py ===========================
from nn import Fcnn
import tensorflow as tf


class Double_dqn:
    def __init__(self):
        self.a_net = Fcnn()
        self.b_net = Fcnn()
        self.gamma = 0.90
        self.reward = tf.placeholder(tf.float32, [None, 1])
        self.terminal = tf.placeholder(tf.bool, [None, 1])
        self.update_a_net()
        self.update_b_net()

    def update_a_net(self):
        self.a_action_next_state = self.a_net.Q_action  # shape is [batch_size, 1]
        a_q_value = self.a_net.Q_value  # shape is [batch_size]
        # squeeze reward/terminal to [batch_size] so the TD target is element-wise rather than broadcast
        reward = tf.squeeze(self.reward, squeeze_dims=[1])
        not_terminal = tf.to_float(tf.logical_not(tf.squeeze(self.terminal, squeeze_dims=[1])))
        a_td_value = reward + not_terminal * self.gamma * self.b_net.Q_value  # shape is [batch_size]
        self.a_net_cost = tf.clip_by_value(tf.reduce_mean(tf.square(a_td_value - a_q_value)), -1., 1.)
        # self.a_net_cost = tf.reduce_mean(tf.square(a_td_value - a_q_value))

    def update_b_net(self):
        self.b_action_next_state = self.b_net.Q_action  # shape is [batch_size, 1]
        b_q_value = self.b_net.Q_value  # shape is [batch_size]
        reward = tf.squeeze(self.reward, squeeze_dims=[1])
        not_terminal = tf.to_float(tf.logical_not(tf.squeeze(self.terminal, squeeze_dims=[1])))
        b_td_value = reward + not_terminal * self.gamma * self.a_net.Q_value  # shape is [batch_size]
        self.b_net_cost = tf.clip_by_value(tf.reduce_mean(tf.square(b_td_value - b_q_value)), -1., 1.)
        # self.b_net_cost = tf.reduce_mean(tf.square(b_td_value - b_q_value))


#test#test#test#test#test#test#test#test
if __name__ == "__main__":
    from replay_memory import Replay_memory
    rm = Replay_memory()
    rm.test_memory_in()
    DD = Double_dqn()
    s, a, r, ss, t = rm.memory_out(32)
    sess = tf.Session()
    init = tf.initialize_all_variables()
    sess.run(init)
    td_action = sess.run(DD.a_action_next_state, feed_dict={DD.a_net.input: ss})
    print sess.run(DD.b_net_cost, feed_dict={DD.a_net.input: s,
                                             DD.a_net.action: a,
                                             DD.b_net.input: ss,
                                             DD.b_net.action: td_action,
                                             DD.reward: r,
                                             DD.terminal: t})
    sess.close()
For each network, the Double_dqn class implements two computation graphs:
1. Given s' from the replay memory, compute the action index chosen by that Q-network; this action is used by the TD (target) network. This corresponds to self.a_action_next_state and self.b_action_next_state.
2. Given s, a, r, s', t from the replay memory and the TD action index a' computed above, update the Q-network. This corresponds to self.a_net_cost and self.b_net_cost (a numerical sketch of this target follows below).
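The following is a small numerical sketch (made-up minibatch values, plain numpy) of the Double DQN target used in point 2: y = r + gamma * Q_other(s', argmax_a Q_this(s', a)), with the bootstrap term zeroed on terminal transitions:

import numpy as np

gamma = 0.90
# hypothetical minibatch of 3 transitions
r = np.array([[0.01], [0.01], [-1.0]], dtype=np.float32)  # rewards
t = np.array([[False], [False], [True]])                   # terminal flags

# Q-values of the *other* network at s', evaluated at the action chosen by this network
q_other = np.array([1.2, 0.8, 0.5], dtype=np.float32)

# TD target, shape (3,): bootstrap only on non-terminal transitions
td_target = r.squeeze(1) + np.logical_not(t.squeeze(1)) * gamma * q_other
print td_target  # approximately [1.09, 0.73, -1.0]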
====================================== File 4: train.py =================
import tensorflow as tf
from double_dqn import Double_dqn
from replay_memory import Replay_memory
import gym
import random
import numpy as np


class Train:
    def __init__(self):
        self.START_E_GREEDY = 0.6
        self.END_E_GREEDY = 0.98
        self.LEARN_RATE = 0.0001
        self.POSITIVE_REWARD = 0.01
        self.NEGATIVE_REWARD = -1.
        self.rm = Replay_memory()
        self.DD = Double_dqn()
        self.e = self.START_E_GREEDY
        self.sess = tf.Session()
        self.a_adam_train_step = tf.train.AdamOptimizer(self.LEARN_RATE).minimize(self.DD.a_net_cost,
                                                                                  var_list=self.DD.a_net.var_list)
        self.b_adam_train_step = tf.train.AdamOptimizer(self.LEARN_RATE).minimize(self.DD.b_net_cost,
                                                                                  var_list=self.DD.b_net.var_list)
        self.sess.run(tf.global_variables_initializer())
        self.add_summary()
        self.merged = tf.summary.merge_all()
        self.writer = tf.summary.FileWriter('/home/wd/tf/summary')
        self.env = gym.make('CartPole-v0')
        self.done = False

    def a_generate_memory(self, observation):
        action = self.sess.run(self.DD.a_net.Q_action,
                               feed_dict={self.DD.a_net.input: np.asarray(observation).reshape((1, 4))})
        greedy_action = self.egreedy_action(action)  # output is int
        observation_next, _, self.done, __ = self.env.step(greedy_action)
        self.rm.memory_in(observation, greedy_action,
                          self.NEGATIVE_REWARD if self.done else self.POSITIVE_REWARD,
                          observation_next, self.done)
        return observation_next

    def b_generate_memory(self, observation):
        action = self.sess.run(self.DD.b_net.Q_action,
                               feed_dict={self.DD.b_net.input: np.asarray(observation).reshape((1, 4))})
        greedy_action = self.egreedy_action(action)  # output is int
        observation_next, _, self.done, __ = self.env.step(greedy_action)
        self.rm.memory_in(observation, greedy_action,
                          self.NEGATIVE_REWARD if self.done else self.POSITIVE_REWARD,
                          observation_next, self.done)
        return observation_next

    def a_train_model(self):
        s, a, r, ss, t = self.rm.memory_out(32)
        a_td_action = self.sess.run(self.DD.a_action_next_state, feed_dict={self.DD.a_net.input: ss})
        summary, _ = self.sess.run([self.merged, self.a_adam_train_step],
                                   feed_dict={self.DD.a_net.input: s,
                                              self.DD.a_net.action: a,
                                              self.DD.b_net.input: ss,
                                              self.DD.b_net.action: a_td_action,
                                              self.DD.reward: r,
                                              self.DD.terminal: t})
        return summary

    def b_train_model(self):
        s, a, r, ss, t = self.rm.memory_out(32)
        b_td_action = self.sess.run(self.DD.b_action_next_state, feed_dict={self.DD.b_net.input: ss})
        summary, _ = self.sess.run([self.merged, self.b_adam_train_step],
                                   feed_dict={self.DD.b_net.input: s,
                                              self.DD.b_net.action: a,
                                              self.DD.a_net.input: ss,
                                              self.DD.a_net.action: b_td_action,
                                              self.DD.reward: r,
                                              self.DD.terminal: t})
        return summary

    def egreedy_action(self, action):  # output is int, input shape is [1, 1]
        ee = random.random()
        if ee < self.e:
            return action[0, 0]
        else:
            return random.randint(0, 1)

    def e_decay(self):
        if self.e < self.END_E_GREEDY:
            # linearly raise e toward END_E_GREEDY over about 2000 episodes
            self.e += (self.END_E_GREEDY - self.START_E_GREEDY) / 2000

    def session_close(self):
        self.sess.close()

    def variable_summary(self, var):
        mean = tf.reduce_mean(var)
        tf.summary.scalar('mean', mean)

    def add_summary(self):
        with tf.name_scope('a_net'):
            # with tf.name_scope('w1'):
            #     self.variable_summary(self.DD.a_net.Q_net_l1_w)
            tf.summary.scalar('q_value', tf.reduce_mean(self.DD.a_net.Q_value))
            tf.summary.scalar('diff_div_q_value',
                              (tf.reduce_mean(self.DD.a_net.Q_value) - tf.reduce_mean(self.DD.a_net.TEST_Q_value))
                              / tf.reduce_mean(self.DD.a_net.Q_value))
            tf.summary.scalar('diff_q_value',
                              tf.reduce_mean(self.DD.a_net.Q_value) - tf.reduce_mean(self.DD.a_net.TEST_Q_value))
        with tf.name_scope('dd_net'):
            tf.summary.scalar('a_cost', self.DD.a_net_cost)


tt = Train()
count = 0
iters = 0
for i in range(10000):
    tt.e_decay()
    observation = tt.env.reset()
    if i % 100 == 0:
        print i, 'iterations'
        print 'average live time is :', count / 100
        count = 0
    for j in range(10000):
        count += 2  # up to two environment steps per inner-loop iteration (one for each network)
        if i > 8000:
            tt.env.render()
        observation = tt.a_generate_memory(observation)
        summary = tt.a_train_model()
        if j % 10 == 0:
            tt.writer.add_summary(summary, iters)
            iters += 1
        if tt.done:
            break
        if i > 8000:
            tt.env.render()
        observation = tt.b_generate_memory(observation)
        summary = tt.b_train_model()
        if j % 10 == 0:
            tt.writer.add_summary(summary, iters)
            iters += 1
        if tt.done:
            break
print iters
tt.session_close()
The Train class implements four main methods:
1: a_generate_memory uses network A to interact with the environment and stores the resulting experience
2: b_generate_memory uses network B to interact with the environment and stores the resulting experience
3: a_train_model samples experiences from the replay memory and trains network A
4: b_train_model samples experiences from the replay memory and trains network B
In addition, an e-greedy policy with e_decay is used (note that e, the probability of taking the greedy action, actually increases over training); a short sketch of this schedule follows below.
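As a minimal sketch of that schedule (using the constants from train.py), e rises linearly from 0.6 to 0.98 over about 2000 episodes, so exploration shrinks rather than grows:

START_E_GREEDY = 0.6
END_E_GREEDY = 0.98

e = START_E_GREEDY
for episode in range(2500):
    # e is the probability of taking the greedy action; 1 - e is random exploration
    if e < END_E_GREEDY:
        e += (END_E_GREEDY - START_E_GREEDY) / 2000
print e  # roughly 0.98 after about 2000 episodes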
After about 1500 training episodes, the agent consistently stays at 200 steps (the maximum episode length of this game).