
Exercise: Deep Reinforcement Learning with Double Q-learning (H. van Hasselt et al., arXiv, 2015) (Double DQN)

2017-05-23 17:01

The code uses Python 2.x and TensorFlow 1.1 (CPU).

Paper: https://arxiv.org/abs/1509.06461

=============== File 1: replay_memory.py ================================

import numpy as np

MEMORYSIZE = 600000

class Replay_memory:
    def __init__(self):
        # pre-allocated circular buffers, one per field of a transition
        self.previous_state = np.empty((MEMORYSIZE, 4), dtype=np.float32)
        self.action = np.empty((MEMORYSIZE, 1), dtype=np.uint8)  # 0 is the 1st action, 1 is the 2nd action
        self.reward = np.empty((MEMORYSIZE, 1), dtype=np.float32)
        self.next_state = np.empty((MEMORYSIZE, 4), dtype=np.float32)
        self.terminal = np.empty((MEMORYSIZE, 1), dtype=np.bool)

        self.index = 0
        self.full_memory = False

    def memory_in(self, previous_state, action, reward, next_state, terminal):
        self.previous_state[self.index] = previous_state
        self.action[self.index] = action
        self.reward[self.index] = reward
        self.next_state[self.index] = next_state
        self.terminal[self.index] = terminal

        # wrap around and overwrite the oldest experiences once the buffer is full
        self.index += 1
        if self.index == MEMORYSIZE:
            self.index = 0
            self.full_memory = True

    def memory_out(self, size_minibatch):
        minib_previous_state = []
        minib_action = []
        minib_reward = []
        minib_next_state = []
        minib_terminal = []

        # sample uniformly from the part of the buffer that has been filled so far
        if self.full_memory:
            index_sample = np.random.randint(0, MEMORYSIZE, size=size_minibatch).tolist()
        else:
            index_sample = np.random.randint(0, self.index, size=size_minibatch).tolist()

        for i in index_sample:
            minib_previous_state.append(self.previous_state[i])
            minib_action.append(self.action[i])
            minib_reward.append(self.reward[i])
            minib_next_state.append(self.next_state[i])
            minib_terminal.append(self.terminal[i])

        rs_minib_previous_state = np.asarray(minib_previous_state)
        rs_minib_action = np.asarray(minib_action)
        rs_minib_reward = np.asarray(minib_reward)
        rs_minib_next_state = np.asarray(minib_next_state)
        rs_minib_terminal = np.asarray(minib_terminal)
        # return 5 np arrays with shape (size_minibatch, num_fea)
        return rs_minib_previous_state, rs_minib_action, rs_minib_reward, rs_minib_next_state, rs_minib_terminal

    def test_memory_in(self):
        # load some dummy experiences, only used by the tests below
        for i in range(100):
            self.memory_in([1., 1., 1., 1.], [0], [0.1], [1., 1., 1., 1.], [False])
            #self.memory_in([1., 1., 1., 1.], [1], [0.1], [1., 1., 1., 1.], [False])
            #self.memory_in([1., 1, 1., 1.], [0], [-1], [1., 1., 1., 1.], [True])

#test#test#test#test#test#test#test#test#test#test#test#test
'''
if __name__ == "__main__":
    rm = Replay_memory()
    for i in range(10):
        rm.memory_in((1., 2., 3., 4.), [1], [0.1], [1., 2., 3., 4.], True)
        rm.memory_in((2, 2, 3, 4), [0], [0.1], [2, 2, 3, 4], False)
        rm.memory_in((3, 2, 3, 4), [1], [0.1], [3, 2, 3, 4], False)
    s, a, r, ss, t = rm.memory_out(32)
    print ss
'''

if __name__ == "__main__":
    rm = Replay_memory()
    rm.test_memory_in()
    s, a, r, ss, t = rm.memory_out(32)
    print ss


Replay_memory has two main methods, memory_in and memory_out, which put experiences into the replay memory and draw experiences out of it, respectively. For efficiency it is implemented with pre-allocated numpy arrays rather than a deque. The method test_memory_in is only used for testing; calling it loads some dummy experiences into the replay memory.
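For comparison, a deque-based buffer would look roughly like the sketch below (my own illustration, not part of this project; the name DequeReplayMemory is made up):

import random
from collections import deque

import numpy as np

class DequeReplayMemory(object):
    # hypothetical alternative, shown only to contrast with the numpy version above
    def __init__(self, max_size=600000):
        self.buffer = deque(maxlen=max_size)  # old experiences are dropped automatically

    def memory_in(self, previous_state, action, reward, next_state, terminal):
        self.buffer.append((previous_state, action, reward, next_state, terminal))

    def memory_out(self, size_minibatch):
        batch = random.sample(self.buffer, size_minibatch)
        # transpose the sampled transitions into 5 arrays, matching the numpy version
        return [np.asarray(field) for field in zip(*batch)]

The numpy implementation above trades this simplicity for speed: it allocates fixed arrays once and avoids creating a Python tuple for every stored transition.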

=========== File 2: nn.py ============================

import tensorflow as tf

class Fcnn:
    def __init__(self):
        self.batch_size = 32
        self.h1_size = 20

        self.input = tf.placeholder(tf.float32, [None, 4])
        self.action = tf.placeholder(tf.uint8, [None, 1])

        self.create_and_init_var(self.h1_size)
        self.Q_net_forward()

        # this network's variables, used later so each optimizer only updates its own net
        self.var_list = [self.Q_net_l1_w, self.Q_net_l1_b, self.Q_net_l2_w, self.Q_net_l2_b]

    def create_and_init_var(self, H1_SIZE):
        self.Q_net_l1_w = self.init_w([4, H1_SIZE], 0.01)
        self.Q_net_l1_b = self.init_b([H1_SIZE])
        self.Q_net_l2_w = self.init_w([H1_SIZE, 2], 0.01)
        self.Q_net_l2_b = self.init_b([2])

    def test_create_and_init_var(self, H1_SIZE):
        self.Q_net_l1_w = self.test_init_w_1([4, H1_SIZE], 0.001)
        self.Q_net_l1_b = self.test_init_b_1([H1_SIZE])
        self.Q_net_l2_w = self.test_init_w_1([H1_SIZE, 2], 0.01)
        self.Q_net_l2_b = self.test_init_b_1([2])

    def Q_net_forward(self):
        fc1 = tf.nn.relu(tf.matmul(self.input, self.Q_net_l1_w) + self.Q_net_l1_b)
        Q_value = tf.matmul(fc1, self.Q_net_l2_w) + self.Q_net_l2_b  # shape is [batch_size, 2]
        self.TEST_Q_value = Q_value
        # graph 1: state in, index of the max-Q action out
        self.Q_action = tf.expand_dims(tf.arg_max(Q_value, dimension=1), dim=1)  # shape is [batch_size, 1]
        # graph 2: state and action index in, Q value of that action out
        self.Q_value = tf.reduce_sum(tf.multiply(Q_value, tf.one_hot(tf.squeeze(self.action, squeeze_dims=[1]), 2)), reduction_indices=1)  # shape is [batch_size]

    def init_w(self, shape, stddev):
        return tf.Variable(tf.truncated_normal(shape, stddev=stddev))

    def init_b(self, shape):
        return tf.Variable(tf.ones(shape) * 0.01)

    def test_init_w_1(self, shape, stddev):
        return tf.Variable(tf.ones(shape))

    def test_init_b_1(self, shape):
        return tf.Variable(tf.ones(shape))

#test#test#test#test#test#test#test#test
if __name__ == "__main__":
    import numpy as np
    a = Fcnn()

    sess = tf.Session()
    init = tf.initialize_all_variables()
    sess.run(init)
    print sess.run([a.Q_action, a.TEST_Q_value],
                   feed_dict={a.input: np.array([[-1, -1, -1, -1], [1, 1, 1, 1]]).reshape([2, 4])})
    #print sess.run([a.Q_value, a.TEST_Q_value], feed_dict={a.input: np.array([1, 2, 3, 4]).reshape([1, 4]), a.action: np.array([0]).reshape([1, 1])})

    sess.close()


The Fcnn class builds a fully connected network and implements two computation graphs with a shared forward pass: one takes a state as input and outputs the index of the action with the largest Q value; the other takes a state and an action index and outputs the Q value of that action. Methods whose names start with 'test' are only used for testing and can be ignored for now.
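The second graph selects each sample's Q value by multiplying the [batch_size, 2] Q matrix with a one-hot encoding of the chosen action and summing over the action axis. A small numpy sketch of that gather (the numbers are made up for illustration):

import numpy as np

# hypothetical Q values for a batch of 3 states and 2 actions
q = np.array([[0.5, 1.2],
              [2.0, 0.3],
              [0.1, 0.4]], dtype=np.float32)
actions = np.array([1, 0, 1])                    # chosen action index per sample

one_hot = np.eye(2, dtype=np.float32)[actions]   # shape [3, 2]
q_selected = np.sum(q * one_hot, axis=1)         # shape [3]
print q_selected                                 # [ 1.2  2.   0.4]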

======================= File 3: double_dqn.py ===========================

from nn import Fcnn
import tensorflow as tf

class Double_dqn:
    def __init__(self):
        self.a_net = Fcnn()
        self.b_net = Fcnn()
        self.gamma = 0.90

        self.reward = tf.placeholder(tf.float32, [None, 1])
        self.terminal = tf.placeholder(tf.bool, [None, 1])

        self.update_a_net()
        self.update_b_net()

    def update_a_net(self):
        # A selects the greedy action for the next state; B evaluates it in the TD target
        self.a_action_next_state = self.a_net.Q_action  # shape is [batch_size, 1]
        a_q_value = self.a_net.Q_value  # shape is [batch_size]
        # squeeze reward/terminal from [batch_size, 1] to [batch_size] so they combine element-wise with Q_value
        a_td_value = tf.squeeze(self.reward, squeeze_dims=[1]) + tf.multiply(tf.to_float(tf.logical_not(tf.squeeze(self.terminal, squeeze_dims=[1]))), self.gamma * self.b_net.Q_value)  # shape is [batch_size]
        self.a_net_cost = tf.clip_by_value(tf.reduce_mean(tf.square(a_td_value - a_q_value)), -1., 1.)
        #self.a_net_cost = tf.reduce_mean(tf.square(a_td_value - a_q_value))

    def update_b_net(self):
        # B selects the greedy action for the next state; A evaluates it in the TD target
        self.b_action_next_state = self.b_net.Q_action  # shape is [batch_size, 1]
        b_q_value = self.b_net.Q_value  # shape is [batch_size]
        b_td_value = tf.squeeze(self.reward, squeeze_dims=[1]) + tf.multiply(tf.to_float(tf.logical_not(tf.squeeze(self.terminal, squeeze_dims=[1]))), self.gamma * self.a_net.Q_value)  # shape is [batch_size]
        self.b_net_cost = tf.clip_by_value(tf.reduce_mean(tf.square(b_td_value - b_q_value)), -1., 1.)
        #self.b_net_cost = tf.reduce_mean(tf.square(b_td_value - b_q_value))

#test#test#test#test#test#test#test#test
if __name__ == "__main__":
    from replay_memory import Replay_memory
    rm = Replay_memory()
    rm.test_memory_in()
    DD = Double_dqn()

    s, a, r, ss, t = rm.memory_out(32)
    sess = tf.Session()
    init = tf.initialize_all_variables()
    sess.run(init)

    # A picks the TD action for the next states
    td_action = sess.run(DD.a_action_next_state, feed_dict={DD.a_net.input: ss})

    # this feed matches a_train_model in train.py, so print the A network's cost
    print sess.run(DD.a_net_cost, feed_dict={
        DD.a_net.input:  s,
        DD.a_net.action: a,
        DD.b_net.input:  ss,
        DD.b_net.action: td_action,
        DD.reward:       r,
        DD.terminal:     t})
    sess.close()


For each network, the Double_dqn class builds two computation graphs:

1. Given s' from the replay memory, compute the action index chosen by that network's Q values, which is used as the action in the TD target; this corresponds to self.a_action_next_state and self.b_action_next_state.

2. Given s, a, r, s', t from the replay memory and the TD action index a' computed above, update the Q network; this corresponds to self.a_net_cost and self.b_net_cost.
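Written out, the target that update_a_net builds for network A (and symmetrically for B, with the two networks swapped) is the Double Q-learning target from the paper, with one network selecting the action and the other evaluating it:

$Y^A = r + \gamma\,(1 - t)\, Q_B\big(s',\ \arg\max_{a'} Q_A(s', a')\big)$

where $t$ is the terminal flag, and the cost minimized for A is $\mathrm{clip}\big(\mathrm{mean}\,(Y^A - Q_A(s, a))^2,\ -1,\ 1\big)$.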

====================================== File 4: train.py =================

import tensorflow as tf
from double_dqn import Double_dqn
from replay_memory import Replay_memory
import gym
import random
import numpy as np

class Train:
    def __init__(self):
        self.START_E_GREEDY = 0.6
        self.END_E_GREEDY = 0.98
        self.LEARN_RATE = 0.0001

        self.POSITIVE_REWARD = 0.01
        self.NEGATIVE_REWARD = -1.
        self.rm = Replay_memory()
        self.DD = Double_dqn()

        self.e = self.START_E_GREEDY
        self.sess = tf.Session()
        # each optimizer only updates the variables of its own network
        self.a_adam_train_step = tf.train.AdamOptimizer(self.LEARN_RATE).minimize(self.DD.a_net_cost,
                                                                                  var_list=self.DD.a_net.var_list)
        self.b_adam_train_step = tf.train.AdamOptimizer(self.LEARN_RATE).minimize(self.DD.b_net_cost,
                                                                                  var_list=self.DD.b_net.var_list)
        self.sess.run(tf.global_variables_initializer())
        self.add_summary()
        self.merged = tf.summary.merge_all()
        self.writer = tf.summary.FileWriter('/home/wd/tf/summary')
        self.env = gym.make('CartPole-v0')
        self.done = False

    def a_generate_memory(self, observation):
        # let network A act in the environment and store the transition
        action = self.sess.run(self.DD.a_net.Q_action,
                               feed_dict={self.DD.a_net.input: np.asarray(observation).reshape((1, 4))})
        greedy_action = self.egreedy_action(action)  # output is int
        observation_next, _, self.done, __ = self.env.step(greedy_action)
        self.rm.memory_in(observation,
                          greedy_action,
                          self.NEGATIVE_REWARD if self.done else self.POSITIVE_REWARD,
                          observation_next,
                          self.done)
        return observation_next

    def b_generate_memory(self, observation):
        # let network B act in the environment and store the transition
        action = self.sess.run(self.DD.b_net.Q_action,
                               feed_dict={self.DD.b_net.input: np.asarray(observation).reshape((1, 4))})
        greedy_action = self.egreedy_action(action)  # output is int
        observation_next, _, self.done, __ = self.env.step(greedy_action)
        self.rm.memory_in(observation,
                          greedy_action,
                          self.NEGATIVE_REWARD if self.done else self.POSITIVE_REWARD,
                          observation_next,
                          self.done)
        return observation_next

    def a_train_model(self):
        s, a, r, ss, t = self.rm.memory_out(32)
        # A selects the action for s'; B evaluates it in the TD target
        a_td_action = self.sess.run(self.DD.a_action_next_state, feed_dict={self.DD.a_net.input: ss})
        summary, _ = self.sess.run([self.merged, self.a_adam_train_step],
                                   feed_dict={self.DD.a_net.input:  s,
                                              self.DD.a_net.action: a,
                                              self.DD.b_net.input:  ss,
                                              self.DD.b_net.action: a_td_action,
                                              self.DD.reward:       r,
                                              self.DD.terminal:     t})
        return summary

    def b_train_model(self):
        s, a, r, ss, t = self.rm.memory_out(32)
        # B selects the action for s'; A evaluates it in the TD target
        b_td_action = self.sess.run(self.DD.b_action_next_state, feed_dict={self.DD.b_net.input: ss})
        summary, _ = self.sess.run([self.merged, self.b_adam_train_step],
                                   feed_dict={self.DD.b_net.input:  s,
                                              self.DD.b_net.action: a,
                                              self.DD.a_net.input:  ss,
                                              self.DD.a_net.action: b_td_action,
                                              self.DD.reward:       r,
                                              self.DD.terminal:     t})
        return summary

    def egreedy_action(self, action):  # output is int, input shape is [1, 1]
        ee = random.random()
        if ee < self.e:
            return action[0, 0]
        else:
            return random.randint(0, 1)

    def e_decay(self):
        # anneal e from START_E_GREEDY towards END_E_GREEDY over about 2000 episodes
        if self.e < self.END_E_GREEDY:
            self.e += (self.END_E_GREEDY - self.START_E_GREEDY) / 2000

    def session_close(self):
        self.sess.close()

    def variable_summary(self, var):
        mean = tf.reduce_mean(var)
        tf.summary.scalar('mean', mean)

    def add_summary(self):
        with tf.name_scope('a_net'):
            #with tf.name_scope('w1'):
            #    self.variable_summary(self.DD.a_net.Q_net_l1_w)
            tf.summary.scalar('q_value', tf.reduce_mean(self.DD.a_net.Q_value))
            tf.summary.scalar('diff_div_q_value', (tf.reduce_mean(self.DD.a_net.Q_value) - tf.reduce_mean(self.DD.a_net.TEST_Q_value)) / tf.reduce_mean(self.DD.a_net.Q_value))
            tf.summary.scalar('diff_q_value', tf.reduce_mean(self.DD.a_net.Q_value) - tf.reduce_mean(self.DD.a_net.TEST_Q_value))

        with tf.name_scope('dd_net'):
            tf.summary.scalar('a_cost', self.DD.a_net_cost)

tt = Train()

count = 0
iters = 0
for i in range(10000):
    tt.e_decay()
    observation = tt.env.reset()
    if i % 100 == 0:
        print i, 'iterations'
        print 'average live time is :', count / 100
        count = 0
    for j in range(10000):
        count += 2  # roughly two environment steps per inner-loop iteration, one for each network
        if i > 8000:
            tt.env.render()
        observation = tt.a_generate_memory(observation)
        summary = tt.a_train_model()
        if j % 10 == 0:
            tt.writer.add_summary(summary, iters)
            iters += 1
        if tt.done:
            break

        if i > 8000:
            tt.env.render()
        observation = tt.b_generate_memory(observation)
        summary = tt.b_train_model()
        if j % 10 == 0:
            tt.writer.add_summary(summary, iters)
            iters += 1
        if tt.done:
            break

print iters

tt.session_close()


The Train class implements four main methods:

1. a_generate_memory: let network A act in the environment and store the resulting experience.

2. b_generate_memory: let network B act in the environment and store the resulting experience.

3. a_train_model: sample experience from the replay memory and train network A.

4. b_train_model: sample experience from the replay memory and train network B.

In addition, an e-greedy policy is used together with e_decay (e actually increases, so exploration shrinks over time), as sketched below.
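The schedule produced by e_decay can be written in closed form; this tiny helper (e_schedule is my own name, not part of the project) just restates what the loop above does:

def e_schedule(episode, start=0.6, end=0.98, steps=2000):
    # probability of taking the greedy action after `episode` calls to e_decay
    return min(end, start + episode * (end - start) / steps)

print e_schedule(0)     # 0.6  -> 40% of actions are random at the start
print e_schedule(2000)  # 0.98 -> only 2% random actions from then on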

After roughly 1500 episodes of training, the agent usually stays at 200 steps (the maximum episode length for this game) consistently.