论文: Learning Multi-Level Hierarchies with Hindsight





self.observation_space = spaces.Box(low=self.low_state, high=self.high_state, dtype=np.float32)
[position, velocity] between min value = [-1.2, -0.07] and max value = [0.6, 0.07],
self.action_space = spaces.Box(low=self.min_action, high=self.max_action,shape=(1,), dtype=np.float32)
the action space is between [-1, 1]

action = ( network output (Tanh) * bounds ) + offset
对于action space:
the action space is between (-1, 1), and as the mean value [ (1 + (-1)) / 2 ] is 0 we do not require an offset, and the value of bound = 1, since our network only outputs between (-1, 1), so,
action = ( network output (Tanh) * bounds ) + offset
i.e. action = (network output * 1) + 0
对于state space:
here the position variable (-1.2, 0.6) is NOT normalised to (-1, 1); its mean value
[ (0.6 + (-1.2)) / 2 ] is -0.3 and its half-range [ (0.6 - (-1.2)) / 2 ] is 0.9
action = ( network output (Tanh) * bounds ) + offset
for position variable:
action = (network output * 0.9) + (-0.3)
this bounds the value of the action (here: the position component of the subgoal) to (-1.2, 0.6)

similarly, the velocity variable (-0.07, 0.07) is NOT normalised to (-1, 1) and its mean value [ (0.07 + (-0.07)) / 2 ] is 0, so, for the velocity variable:
action = (network output * 0.07) + 0
this bounds the value of the action to (-0.07, 0.07)


接下来创建HAC agent并设置相关参数,前面定义了这是一个两层的网络,因此HAC类下的HAC属性为一个包含两个DDPG的列表。

class HAC:
    """Hierarchical Actor-Critic agent: a stack of ``k_level`` goal-conditioned policies.

    ``self.HAC[i]`` is the policy of level ``i`` and ``self.replay_buffer[i]`` is
    its replay buffer; index 0 is the lowest level. The lowest level is built
    with ``action_dim`` / ``action_bounds`` / ``action_offset``; every other
    level is built with ``state_dim`` as its output dimension and
    ``state_bounds`` / ``state_offset`` as its output scaling.
    """

    def __init__(self, k_level, H, state_dim, action_dim, render, threshold,
                 action_bounds, action_offset, state_bounds, state_offset, lr):
        """Create one policy and one replay buffer per level.

        Args:
            k_level: number of levels in the hierarchy.
            H: value forwarded to every ``DDPG`` and stored as ``self.H``.
            state_dim: dimension of the state vector.
            action_dim: dimension of the lowest level's output.
            render: stored as ``self.render``.
            threshold: stored as ``self.threshold``.
            action_bounds, action_offset: scaling passed to the lowest level.
            state_bounds, state_offset: scaling passed to all other levels.
            lr: learning rate forwarded to every ``DDPG``.
        """
        # adding lowest level
        self.HAC = [DDPG(state_dim, action_dim, action_bounds, action_offset, lr, H)]
        self.replay_buffer = [ReplayBuffer()]

        # adding remaining levels (their output has the dimension of a state)
        for _ in range(k_level - 1):
            self.HAC.append(DDPG(state_dim, state_dim, state_bounds, state_offset, lr, H))
            self.replay_buffer.append(ReplayBuffer())

        # set some parameters
        self.k_level = k_level
        self.H = H
        self.action_dim = action_dim
        self.state_dim = state_dim
        self.threshold = threshold
        self.render = render

        # logging parameters
        self.goals = [None] * self.k_level
        self.reward = 0
        self.timestep = 0

    def set_parameters(self, lamda, gamma, action_clip_low, action_clip_high,
                       state_clip_low, state_clip_high,
                       exploration_action_noise, exploration_state_noise):
        """Store the training hyper-parameters on the agent.

        Each argument is saved unchanged under an attribute of the same name.
        Nothing is validated or copied here.
        """
        self.lamda = lamda
        self.gamma = gamma
        self.action_clip_low = action_clip_low
        self.action_clip_high = action_clip_high
        self.state_clip_low = state_clip_low
        self.state_clip_high = state_clip_high
        self.exploration_action_noise = exploration_action_noise
        self.exploration_state_noise = exploration_state_noise


# training procedure
# Relies on names set up earlier in the script: env, agent, goal_state,
# threshold, k_level, max_episodes, n_iter, batch_size, save_episode,
# directory, filename and the open log file log_f.
for i_episode in range(1, max_episodes + 1):
    # reset the per-episode logging counters kept on the agent
    agent.reward = 0
    agent.timestep = 0

    state = env.reset()
    # collecting experience in environment, starting from the top level
    last_state, done = agent.run_HAC(env, k_level - 1, state, goal_state, False)

    # the episode counts as solved when the last state is within threshold of the goal
    if agent.check_goal(last_state, goal_state, threshold):
        print("################ Solved! ################ ")
        name = filename + '_solved'
        agent.save(directory, name)

    # update all levels
    agent.update(n_iter, batch_size)

    # logging updates:
    log_f.write('{},{}\n'.format(i_episode, agent.reward))
    log_f.flush()

    # periodic checkpoint
    if i_episode % save_episode == 0:
        agent.save(directory, filename)

    print("Episode: {}\t Reward: {}".format(i_episode, agent.reward))



last_state, done = agent.run_HAC(env, k_level-1, state, goal_state, False)
    def run_HAC(self, env, i_level, state, goal, is_subgoal_test):
        """Let level ``i_level`` make up to ``self.H`` attempts at reaching ``goal``.

        For ``i_level > 0`` the policy output is handed to level ``i_level - 1``
        as that level's goal (recursive call). For ``i_level == 0`` the policy
        output is passed to ``env.step``.

        Transitions are added to ``self.replay_buffer[i_level]`` as 7-tuples
        ``(state, action, reward, next_state, goal, gamma, done)``:

        * a subgoal-testing transition with reward ``-self.H`` when a tested
          subgoal was not reached (levels above 0 only);
        * a hindsight action transition for every attempt;
        * after the loop, one hindsight goal transition per attempt, with the
          final ``next_state`` substituted as the goal.

        Args:
            env: environment; ``env.step(action)`` must return a 4-tuple
                ``(next_state, reward, done, info)``.
            i_level: index of the level to run (0 is the lowest).
            state: state the attempts start from.
            goal: goal this level is asked to reach.
            is_subgoal_test: when true, no exploration noise is applied at this
                level and every lower level is also run as a subgoal test.

        Returns:
            ``(next_state, done)`` from the last attempt made.
        """
        next_state = None
        done = None
        goal_transitions = []

        # logging updates
        self.goals[i_level] = goal

        # H attempts
        for _ in range(self.H):
            # if this is a subgoal test, then next/lower level goal has to be a subgoal test
            is_next_subgoal_test = is_subgoal_test

            action = self.HAC[i_level].select_action(state, goal)

            #   <================ high level policy ================>
            if i_level > 0:
                # add noise or take random action if not subgoal testing
                if not is_subgoal_test:
                    if np.random.random_sample() > 0.2:
                        # p=0.8: explore around the policy output
                        action = action + np.random.normal(0, self.exploration_state_noise)
                        action = action.clip(self.state_clip_low, self.state_clip_high)
                    else:
                        # p=0.2: uniformly random subgoal
                        action = np.random.uniform(self.state_clip_low, self.state_clip_high)

                # Determine whether to test subgoal (action)
                if np.random.random_sample() < self.lamda:
                    is_next_subgoal_test = True

                # Pass subgoal to lower level
                next_state, done = self.run_HAC(env, i_level - 1, state, action,
                                                is_next_subgoal_test)

                # if subgoal was tested but not achieved, add subgoal testing transition
                if is_next_subgoal_test and not self.check_goal(action, next_state, self.threshold):
                    self.replay_buffer[i_level].add(
                        (state, action, -self.H, next_state, goal, 0.0, float(done)))

                # for hindsight action transition: pretend the state actually
                # reached was the subgoal that had been proposed
                action = next_state

            #   <================ low level policy ================>
            else:
                # add noise or take random action if not subgoal testing
                if not is_subgoal_test:
                    if np.random.random_sample() > 0.2:
                        action = action + np.random.normal(0, self.exploration_action_noise)
                        action = action.clip(self.action_clip_low, self.action_clip_high)
                    else:
                        action = np.random.uniform(self.action_clip_low, self.action_clip_high)

                # take primitive action
                next_state, rew, done, _ = env.step(action)

                if self.render:
                    # only 2- and 3-level hierarchies have a goal renderer
                    if self.k_level == 2:
                        env.unwrapped.render_goal(self.goals[0], self.goals[1])
                    elif self.k_level == 3:
                        env.unwrapped.render_goal_2(self.goals[0], self.goals[1], self.goals[2])

                # this is for logging
                self.reward += rew
                self.timestep += 1

            #   <================ finish one step/transition ================>

            # check if goal is achieved
            goal_achieved = self.check_goal(next_state, goal, self.threshold)

            # hindsight action transition
            if goal_achieved:
                self.replay_buffer[i_level].add(
                    (state, action, 0.0, next_state, goal, 0.0, float(done)))
            else:
                self.replay_buffer[i_level].add(
                    (state, action, -1.0, next_state, goal, self.gamma, float(done)))

            # copy for goal transition (goal slot is filled in after the loop)
            goal_transitions.append(
                [state, action, -1.0, next_state, None, self.gamma, float(done)])

            state = next_state

            if done or goal_achieved:
                break

        #   <================ finish H attempts ================>

        # hindsight goal transition
        # last transition reward and discount is 0
        goal_transitions[-1][2] = 0.0
        goal_transitions[-1][5] = 0.0
        for transition in goal_transitions:
            # last state is goal for all transitions
            transition[4] = next_state
            self.replay_buffer[i_level].add(tuple(transition))

        return next_state, done


agent.check_goal(last_state, goal_state, threshold)
   def check_goal(self, state, goal, threshold):
       """Return True when ``state`` is within ``threshold`` of ``goal`` in every dimension.

       The first ``self.state_dim`` entries are compared element-wise. A
       dimension fails only when ``abs(state[i] - goal[i])`` is strictly
       greater than ``threshold[i]``, so a difference equal to the threshold
       still counts as reached.
       """
       for i in range(self.state_dim):
           if abs(state[i] - goal[i]) > threshold[i]:
               return False
       return True


agent.update(n_iter, batch_size)


    def update(self, n_iter, batch_size):
        """Update every level's policy from that level's own replay buffer.

        For each level index ``i`` in ``range(self.k_level)`` this calls
        ``self.HAC[i].update(self.replay_buffer[i], n_iter, batch_size)``.
        """
        for i in range(self.k_level):
            self.HAC[i].update(self.replay_buffer[i], n_iter, batch_size)


    def update(self, buffer, n_iter, batch_size):
        """Run ``n_iter`` critic and actor gradient steps on batches drawn from ``buffer``.

        Each sampled batch is unpacked as
        ``(state, action, reward, next_state, goal, gamma, done)``. The
        discount ``gamma`` comes from the stored transitions rather than from
        a fixed hyper-parameter. The bootstrap value is computed with
        ``self.actor`` and ``self.critic`` themselves, detached from the graph.

        Args:
            buffer: replay buffer exposing ``sample(batch_size)``.
            n_iter: number of gradient steps to take.
            batch_size: number of transitions per step; also used to reshape
                ``reward``, ``gamma`` and ``done`` to ``(batch_size, 1)``.
        """
        for _ in range(n_iter):
            # Sample a batch of transitions from replay buffer:
            state, action, reward, next_state, goal, gamma, done = buffer.sample(batch_size)

            # convert np arrays into tensors
            state = torch.FloatTensor(state).to(device)
            action = torch.FloatTensor(action).to(device)
            reward = torch.FloatTensor(reward).reshape((batch_size, 1)).to(device)
            next_state = torch.FloatTensor(next_state).to(device)
            goal = torch.FloatTensor(goal).to(device)
            gamma = torch.FloatTensor(gamma).reshape((batch_size, 1)).to(device)
            done = torch.FloatTensor(done).reshape((batch_size, 1)).to(device)

            # select next action
            next_action = self.actor(next_state, goal).detach()

            # Compute target Q-value:
            # (1 - done) removes the bootstrap term for terminal transitions
            target_Q = self.critic(next_state, next_action, goal).detach()
            target_Q = reward + ((1 - done) * gamma * target_Q)

            # Optimize Critic:
            critic_loss = self.mseLoss(self.critic(state, action, goal), target_Q)
            self.critic_optimizer.zero_grad()
            critic_loss.backward()
            self.critic_optimizer.step()

            # Compute actor loss: maximise the critic's value of the actor's action
            actor_loss = -self.critic(state, self.actor(state, goal), goal).mean()

            # Optimize the actor
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()



