环境和agent的基本框架

import randomclass Environment:# 环境初始化内部状态def __init__(self):self.steps_left = 10# 给agent返回对当前环境的观察def get_observation(self):return [0.0, 0.0, 0.0]# agent查询自己能执行的动作集def get_actions(self):return [0, 1]# agent片段结束的信号def is_done(self):return self.steps_left == 0# 动作 处理agent的动作和返回该动作的奖励def action(self, action):if self.is_done():raise Exception("Game is over")self.steps_left -= 1return random.random()class Agent:# 初始化计数器 保存片段中agent累计的总奖励def __init__(self):self.total_reward = 0.0# 观察环境# 基于观察决定动作# 向环境提交动作# 获取当前步骤的奖励def step(self, env):current_obs = env.get_observation()actions = env.get_actions()reward = env.action(random.choice(actions))self.total_reward += rewardif __name__ == "__main__":env = Environment()agent = Agent()while not env.is_done():agent.step(env)print("Total reward got: %.4f" % agent.total_reward)

OpenAI Gym API

动作空间

动作可以是离散的、连续的,或两者的组合
Gym定义了一个特殊的容器类,允许用户嵌入好几个动作组成一个动作

观察空间


Gym中Space类的层级

  • sample() 从该空间中返回随机样本
  • contains(x) 检验参数x是否属于空间
  • Discrete类表示一个互斥的元素集,例如Discrete(n=4)表示动作空间有4个方向
  • Box类表示有理数的n维张量,在[low,high]之间,比如油门的0.0~1.0之间的值
  • Tuple类将不同的Space实例组合起来,例如Tuple(spaces=(Box(low=-1.0, high=1.0, dtype=np.float32), Discrete(n=3), Discrete(n=2)))

环境

环境包括:

  • action_space:Space类的一个字段,限定了环境中允许执行的动作
  • observation_space:Space类的一个字段,限定了环境中允许出现的观察
  • reset():重置环境到初始状态,返回一个初始观察的向量
  • step():agent执行动作,并返回动作结果的信息–下一个观察、奖励以及片段是否结束的标记

示例

import gymif __name__ == "__main__":env = gym.make("CartPole-v0")total_reward = 0.0total_steps = 0obs = env.reset()while True:action = env.action_space.sample() # 随机采样一个动作obs, reward, done, _ = env.step(action) # 返回值total_reward += reward # 累计奖励total_steps += 1 # 执行步数if done: # 片段结束标记breakprint("Episode done in %d steps, total reward %.2f" % (total_steps, total_reward))

包装器Wrapper

Wrapper类将现有的环境包装起来并附加一些额外的逻辑

Wrapper继承自Env类,参数为被包装的Env类的实例

  • ObservationWrapper:重新定义父类的observation(obs)方法
  • RewardWrapper:修改给予智能体的奖励值
  • ActionWrapper:修改agent传给被包装环境的动作

下面以10%的概率干扰agent的动作流

import gymimport randomclass RandomActionWrapper(gym.ActionWrapper):# 以ActionWrapper为父类def __init__(self, env, epsilon=0.1):super(RandomActionWrapper, self).__init__(env)self.epsilon = epsilondef action(self, action):if random.random() < self.epsilon:print("Random!")return self.env.action_space.sample()return actionif __name__ == "__main__":env = RandomActionWrapper(gym.make("CartPole-v0"))obs = env.reset()total_reward = 0.0while True:obs, reward, done, _ = env.step(0)total_reward += rewardif done:breakprint("Reward got: %.2f" % total_reward)

监控器Monitor

Monitor将agent的性能信息写入文件

import gymif __name__ == "__main__":env = gym.make("CartPole-v0")env = gym.wrappers.Monitor(env, "存放的目录")total_reward = 0.0total_steps = 0obs = env.reset()while True:action = env.action_space.sample()obs, reward, done, _ = env.step(action)total_reward += rewardtotal_steps += 1if done:breakprint("Episode done in %d steps, total reward %.2f" % (total_steps, total_reward))env.close()env.env.close()

pytorch

在远程服务器运行TensorBoard时,添加--bind_all可从外部访问
使用python Ignite库使代码更加简洁、更具可拓展性

交叉熵方法

交叉熵比较容易收敛,如果环境很简单,没有复杂多样的策略需要探索及学习,也不是片段很短又有很多奖励的话,那么该方法通常表现很好。

在RL中,无模型表示该方法不构建环境或奖励的模型,直接将观察和动作(或者和动作相关的价值)连接起来,也就是说agent对当前的观察结果进行计算,计算结果就是它应该采取的动作。
而基于模型的RL,试图预测下一个观察或奖励是什么,根据预测,agent试图选择最好的动作来执行。

基于策略的方法直接计算agent的策略,策略通常被表示成可用动作的概率分布,即agent在每一步应该执行什么动作。
基于价值的方法agent计算每个可能动作的价值,然后选择最大价值的动作,而不是计算动作的概率。

交叉熵方法是无模型的、基于策略的在线策略的方法。

在实践中,策略通常表示为动作的概率分布,这和分类问题有点像,类型的数量和可能要执行的动作数量相同。
将agent的经历表示成片段,并且已经经过若干轮片段了,并计算每一片段的总奖励。由于环境的随机性以及agent选择动作的不同,某些片段表现会较好。交叉熵的核心就是将差的片段丢掉,并用好的片段来训练。步骤如下:

  1. 使用当前的模型和环境产生N次片段
  2. 计算每个片段的总奖励,并确定奖励边界,通常使用总奖励的百分位,例如第50,75百分位数。
  3. 将奖励在边界以下的片段丢掉
  4. 用观察值为输入,agent产生的动作作为目标输出,训练剩余的片段。
  5. 从第1步开始重复,一直到出现满意的结果。

CartPole交叉熵实现

#!/usr/bin/env python3
import gym
from collections import namedtuple
import numpy as np
from tensorboardX import SummaryWriterimport torch
import torch.nn as nn
import torch.optim as optimHIDDEN_SIZE = 128  # 隐藏层神经元数量
BATCH_SIZE = 16  # 每次迭代训练的片段数
PERCENTILE = 70  # 奖励边界class Net(nn.Module):def __init__(self, obs_size, hidden_size, n_actions):super(Net, self).__init__()self.net = nn.Sequential(nn.Linear(obs_size, hidden_size),nn.ReLU(),nn.Linear(hidden_size, n_actions))def forward(self, x):return self.net(x)# 定义了两个命名元组类型的帮助类
# episode:agent在环境里面执行某个策略从开始到结束这一过程 片段
# Episode:单个片段 保存了总的无衰减奖励以及EpisodeStep集合
# EpisodeStep:表示智能体在片段中执行的一步,同时保存来自环境的观察以及agent执行的动作 后续的训练会用到
Episode = namedtuple('Episode', field_names=['reward', 'steps'])
EpisodeStep = namedtuple('EpisodeStep', field_names=['observation', 'action'])# 用片段生成批
def iterate_batches(env, net, batch_size):batch = []  # 累计批 episode实例的列表episode_reward = 0.0episode_steps = []obs = env.reset()sm = nn.Softmax(dim=1)  # 将NN的输出转换成动作的概率分布while True:obs_v = torch.FloatTensor([obs])act_probs_v = sm(net(obs_v))  # NN输出一个动作分数 然后使用Softmax函数处理# 返回包含梯度的张量 访问tensor.data提取数据 然后转换成numpy数组# 数组是二维结构,0轴是批的维度,所以提取第一个元素 得到动作概率的一维向量act_probs = act_probs_v.data.numpy()[0]# 对分布进行采样 得到动作action = np.random.choice(len(act_probs), p=act_probs)next_obs, reward, is_done, _ = env.step(action)episode_reward += reward# 保存用来选择动作的观察episode_steps.append(EpisodeStep(observation=obs, action=action))if is_done:batch.append(Episode(reward=episode_reward, steps=episode_steps))episode_reward = 0.0episode_steps = []next_obs = env.reset()if len(batch) == batch_size:yield batch  # yield把函数变成一个生成器batch = []# 更新观察 然后obs传给NN 继续重复obs = next_obs
# NN的训练和片段的生成是同时进行的 累计16个片段之后 调用方会使用梯度下降来训练NN
# 所以每当yield返回时,NN理论上都会有点进步 # 交叉熵核心函数
# 根据给定的一批片段和百分位值计算奖励奖励边界 淘汰一批片段
def filter_batch(batch, percentile):rewards = list(map(lambda s: s.reward, batch))# percentile根据给定的值列表和百分位计算百分位的值reward_bound = np.percentile(rewards, percentile)reward_mean = float(np.mean(rewards))  # 求平均奖励用于监控train_obs = []train_act = []for example in batch:if example.reward < reward_bound:continue# 如果奖励值高于边界 将观察和动作添加到要训练的列表中# map()分别以序列元素作为函数输入# lamba()函数速写 类似lamba x,y:x+ytrain_obs.extend(map(lambda step: step.observation, example.steps))train_act.extend(map(lambda step: step.action, example.steps))train_obs_v = torch.FloatTensor(train_obs)train_act_v = torch.LongTensor(train_act)return train_obs_v, train_act_v, reward_bound, reward_mean# 训练循环
if __name__ == "__main__":env = gym.make("CartPole-v0")# env = gym.wrappers.Monitor(env, directory="mon", force=True)obs_size = env.observation_space.shape[0]n_actions = env.action_space.nnet = Net(obs_size, HIDDEN_SIZE, n_actions)objective = nn.CrossEntropyLoss()  # 交叉熵损失函数optimizer = optim.Adam(params=net.parameters(), lr=0.01)writer = SummaryWriter(comment="-cartpole")# 迭代片段批次for iter_no, batch in enumerate(iterate_batches(env, net, BATCH_SIZE)):# 用奖励边界过滤obs_v, acts_v, reward_b, reward_m = filter_batch(batch, PERCENTILE)optimizer.zero_grad()action_scores_v = net(obs_v)# 将动作的分数集传给objective函数 计算NN的输出和agent执行的动作之间的交叉熵# objective = nn.CrossEntropyLoss()loss_v = objective(action_scores_v, acts_v)loss_v.backward()optimizer.step()print("%d: loss=%.3f, reward_mean=%.1f, reward_bound=%.1f" % (iter_no, loss_v.item(), reward_m, reward_b))writer.add_scalar("loss", loss_v.item(), iter_no)writer.add_scalar("reward_bound", reward_b, iter_no)writer.add_scalar("reward_mean", reward_m, iter_no)# 检查每一批片段的平均奖励>199 表示问题解决if reward_m > 199:print("Solved!")breakwriter.close()

FrozenLake交叉熵实现

FrozenLake就是网格世界类型的环境。

在FrozenLake环境中,只有到达终点才能获得1.0奖励,所以只有两种可能的片段,奖励为0的失败片段和奖励为1的成功片段,而且失败的片段在训练的开始会占主导地位,会导致训练失败。

这也是交叉熵方法的限制:

  • 对于训练来说,片段必须是有限的、优秀的、简短的
  • 片段的总奖励应该有足够的差异来区分好的片段和差的片段
  • 没有中间值来表明agent成功了还是失败了

所以,实现FrozenLake的代码与CartPole有所不同:

  • 每批次包含更多的片段:相对于CartPole的每批次16片段,FrozenLake起码需要每批次100片段
  • 对奖励使用衰减系数:这让总奖励考虑了片段的长度,较短的片段将比较长的片段得到更高的奖励,这增加了奖励分布的多样性
  • 让优秀的片段保存更长的时间:在CartPole训练中,每次只取最优秀的片段训练,然后丢弃。在FrozenLake中,将成功的片段保存并训练几次迭代
  • 降低学习率:让NN有机会来平均更多的训练样本
  • 更长的训练时间:由于成功片段的稀有性以及动作结果的随机性,要将NN训练片段的成功率达到50%,起码需要5000次迭代

修改的代码如下

#!/usr/bin/env python3
import random
import gym
import gym.spaces
from collections import namedtuple
import numpy as np
from tensorboardX import SummaryWriterimport torch
import torch.nn as nn
import torch.optim as optimHIDDEN_SIZE = 128
BATCH_SIZE = 100
PERCENTILE = 30
GAMMA = 0.9# 神经网络需要一个数字向量输入 所以对离散的输入进行one-hot编码
# 所以网络的输入是16个浮点数 除了要编码的位置是1 其他位置都是0
class DiscreteOneHotWrapper(gym.ObservationWrapper):def __init__(self, env):super(DiscreteOneHotWrapper, self).__init__(env)assert isinstance(env.observation_space, gym.spaces.Discrete)self.observation_space = gym.spaces.Box(0.0, 1.0, (env.observation_space.n, ), dtype=np.float32)def observation(self, observation):res = np.copy(self.observation_space.low)res[observation] = 1.0return resclass Net(nn.Module):def __init__(self, obs_size, hidden_size, n_actions):super(Net, self).__init__()self.net = nn.Sequential(nn.Linear(obs_size, hidden_size),nn.ReLU(),nn.Linear(hidden_size, n_actions))def forward(self, x):return self.net(x)Episode = namedtuple('Episode', field_names=['reward', 'steps'])
EpisodeStep = namedtuple('EpisodeStep', field_names=['observation', 'action'])def iterate_batches(env, net, batch_size):batch = []episode_reward = 0.0episode_steps = []obs = env.reset()sm = nn.Softmax(dim=1)while True:obs_v = torch.FloatTensor([obs])act_probs_v = sm(net(obs_v))act_probs = act_probs_v.data.numpy()[0]action = np.random.choice(len(act_probs), p=act_probs)next_obs, reward, is_done, _ = env.step(action)episode_reward += rewardepisode_steps.append(EpisodeStep(observation=obs, action=action))if is_done:batch.append(Episode(reward=episode_reward, steps=episode_steps))episode_reward = 0.0episode_steps = []next_obs = env.reset()if len(batch) == batch_size:yield batchbatch = []obs = next_obs# 计算衰减的奖励并返回需要保存下来的优秀片段
def filter_batch(batch, percentile):disc_rewards = list(map(lambda s: s.reward * (GAMMA ** len(s.steps)), batch))reward_bound = np.percentile(disc_rewards, percentile)train_obs = []train_act = []elite_batch = []for example, discounted_reward in zip(batch, disc_rewards):if discounted_reward > reward_bound:train_obs.extend(map(lambda step: step.observation, example.steps))train_act.extend(map(lambda step: step.action, example.steps))elite_batch.append(example)return elite_batch, train_obs, train_act, reward_boundif __name__ == "__main__":random.seed(12345)env = DiscreteOneHotWrapper(gym.make("FrozenLake-v0"))# env = gym.wrappers.Monitor(env, directory="mon", force=True)obs_size = env.observation_space.shape[0]n_actions = env.action_space.nnet = Net(obs_size, HIDDEN_SIZE, n_actions)objective = nn.CrossEntropyLoss()optimizer = optim.Adam(params=net.parameters(), lr=0.001)writer = SummaryWriter(comment="-frozenlake-tweaked")# 存下之前的优秀片段 并传递给下一次训练迭代的处理函数full_batch = []for iter_no, batch in enumerate(iterate_batches(env, net, BATCH_SIZE)):reward_mean = float(np.mean(list(map(lambda s: s.reward, batch))))full_batch, obs, acts, reward_bound = filter_batch(full_batch + batch, PERCENTILE)if not full_batch:continueobs_v = torch.FloatTensor(obs)acts_v = torch.LongTensor(acts)full_batch = full_batch[-500:]optimizer.zero_grad()action_scores_v = net(obs_v)loss_v = objective(action_scores_v, acts_v)loss_v.backward()optimizer.step()print("%d: loss=%.3f, reward_mean=%.3f, reward_bound=%.3f, batch=%d" % (iter_no, loss_v.item(), reward_mean, reward_bound, len(full_batch)))writer.add_scalar("loss", loss_v.item(), iter_no)writer.add_scalar("reward_mean", reward_mean, iter_no)writer.add_scalar("reward_bound", reward_bound, iter_no)if reward_mean > 0.8:print("Solved!")breakwriter.close()

表格学习和Bellman方程

价值,定义为从状态获得的预期的总奖励(可选衰减)。价值始终根据agent遵循的某些策略来计算。也就是说,不仅会考虑agent采取某一动作的立即奖励,还会考虑加上状态的长期价值。

假设状态S0,执行一个动作后,会以不同的概率导致若干中不同的结果状态。
V0(a=1)=p1(r1+γV1)+p2(r2+γV2)+p3(r3+γV3)V_{0}(a=1)=p_{1}\left(r_{1}+\gamma V_{1}\right)+p_{2}\left(r_{2}+\gamma V_{2}\right)+p_{3}\left(r_{3}+\gamma V_{3}\right)V0​(a=1)=p1​(r1​+γV1​)+p2​(r2​+γV2​)+p3​(r3​+γV3​)
状态的最优价值等于动作所获得的最大预期的立即奖励,再加上下一状态的长期衰减奖励。

动作的价值Q(s, a)等于在状态s执行动作a可获得的总奖励
假设向前的动作,33%会按动作执行,33%向左滑行,33%向右滑行,则有
Q(s0,前)=0.33V1+0.33V2+0.33V3

Q等于在状态s时采取动作a所预期获得的立即奖励和目标状态衰减长期奖励之和,而且
Vs=max⁡a∈AQ(s,a)V_{s}=\max _{a \in A} Q(s, a)Vs​=maxa∈A​Q(s,a)
这意味着,某些状态的价值等于从该状态执行某动作能获取的最大价值。

在实践中,Q值要更加方便,对于agent而言,基于Q制定决策要比基于V简单。
对于Q而言,要基于状态选择动作,agent只要计算当前状态所有动作的Q值,然后选择Q值最大的动作即可。
对于状态价值(V)而言,agent不仅要知道价值,还需要知道转移概率。但在实践中这并不能实现知道。所以agent需要估计每个动作状态对的转移概率。

Gym提供给agent的接口为:观察状态、决定动作,然后获得下一个观察结果以及转移奖励。我们不知道从状态s0采取动作a0进入状态s1的概率。
在Bellman更新中,既需要转移的奖励,也需要转移概率。因此,利用agent的经验来估算这两个未知值。

状态价值迭代实现

#!/usr/bin/env python3
import gym
import collections
from tensorboardX import SummaryWriterENV_NAME = "FrozenLake-v0"
GAMMA = 0.9
TEST_EPISODES = 20class Agent:def __init__(self):self.env = gym.make(ENV_NAME)self.state = self.env.reset()# 奖励表 带有复合键"源状态"+"动作"+"目标状态"的字典 该值是从立即奖励中获得的self.rewards = collections.defaultdict(float)# 转移表 记录了各转移次数的键是"状态"+"动作"的字典 例如键(0,1)是状态0执行动作1# 而值是另一个字典 是所观察到的目标状态和次数的映射# 例如带有键(0,1)的字典内容为{4:3,5:7}表示3次进入状态4,7次进入状态5self.transits = collections.defaultdict(collections.Counter)# 价值表 将状态映射到计算出该状态的价值的字典self.values = collections.defaultdict(float)# 该函数从环境中收集随机经验 并更新奖励表和转移表def play_n_random_steps(self, count):for _ in range(count):action = self.env.action_space.sample()new_state, reward, is_done, _ = self.env.step(action)self.rewards[(self.state, action, new_state)] = rewardself.transits[(self.state, action)][new_state] += 1self.state = self.env.reset() if is_done else new_state# 该函数根据转移表、奖励表和价值表计算从该状态采取某动作的价值# 可用来针对某状态选择最佳动作 并在价值迭代时计算状态的新价值def calc_action_value(self, state, action):# 从转移表中获取给定状态和动作的转移计数器 键为给定状态 值为历史转移次数target_counts = self.transits[(state, action)]# 求和 获得在某状态执行某动作的总次数total = sum(target_counts.values())# 对动作到达的每个目标状态进行迭代# 使用Bellman方程计算其对总动作价值的贡献(=立即奖励+目标状态的衰减价值)# 将和乘以转移概率 汇总到最终动作价值action_value = 0.0for tgt_state, count in target_counts.items():reward = self.rewards[(state, action, tgt_state)]action_value += (count / total) * (reward + GAMMA * self.values[tgt_state])return action_value# 使用calc_action_value()函数来决定某状态可采取的最佳动作# 迭代有可能的动作 计算其价值 返回价值最大的动作def select_action(self, state):best_action, best_value = None, Nonefor action in range(self.env.action_space.n):action_value = self.calc_action_value(state, action)if best_value is None or best_value < action_value:best_value = action_valuebest_action = actionreturn best_action# 利用select_action()函数返回的动作 在环境中运行一整个片段 累计奖励def play_episode(self, env):total_reward = 0.0state = env.reset()while True:action = self.select_action(state)new_state, reward, is_done, _ = env.step(action)self.rewards[(state, action, new_state)] = rewardself.transits[(state, action)][new_state] += 1total_reward += rewardif is_done:breakstate = new_statereturn total_reward# 价值迭代函数 遍历环境中所有的状态 计算该状态可到达的每一状态的价值# 从而获得状态价值的候选项 然后用可执行动作的最大价值来更新当前状态的价值def value_iteration(self):for state in range(self.env.observation_space.n):state_values = [self.calc_action_value(state, action)for action in range(self.env.action_space.n)]self.values[state] = max(state_values)if __name__ == "__main__":test_env = gym.make(ENV_NAME)agent = Agent()writer = SummaryWriter(comment="-v-iteration")iter_no = 0best_reward = 0.0while True:iter_no += 1# 执行100个随机步骤 获得新数据填充奖励表和转移表agent.play_n_random_steps(100)agent.value_iteration()reward = 0.0for _ in range(TEST_EPISODES):reward += agent.play_episode(test_env)reward /= TEST_EPISODESwriter.add_scalar("reward", reward, iter_no)if reward > best_reward:print("Best reward updated %.3f -> %.3f" % (best_reward, reward))best_reward = rewardif reward > 0.80:print("Solved in %d iterations!" % iter_no)breakwriter.close()

动作价值迭代实现

状态价值迭代实现中,价值表保留了状态价值。现在要加上Q函数的值,该函数有两个参数:状态和动作。
然后现在不需要calc_action_value()函数了,动作价值存储在价值表中了。

#!/usr/bin/env python3
import gym
import collections
from tensorboardX import SummaryWriterENV_NAME = "FrozenLake-v0"
GAMMA = 0.9
TEST_EPISODES = 20class Agent:def __init__(self):self.env = gym.make(ENV_NAME)self.state = self.env.reset()self.rewards = collections.defaultdict(float)self.transits = collections.defaultdict(collections.Counter)self.values = collections.defaultdict(float)def play_n_random_steps(self, count):for _ in range(count):action = self.env.action_space.sample()new_state, reward, is_done, _ = self.env.step(action)self.rewards[(self.state, action, new_state)] = rewardself.transits[(self.state, action)][new_state] += 1self.state = self.env.reset() if is_done else new_state# 因为不再有calc_action_value()方法 所以选择动作只需要遍历动作并在价值中查找对应的价值def select_action(self, state):best_action, best_value = None, Nonefor action in range(self.env.action_space.n):action_value = self.values[(state, action)]if best_value is None or best_value < action_value:best_value = action_valuebest_action = actionreturn best_actiondef play_episode(self, env):total_reward = 0.0state = env.reset()while True:action = self.select_action(state)new_state, reward, is_done, _ = env.step(action)self.rewards[(state, action, new_state)] = rewardself.transits[(state, action)][new_state] += 1total_reward += rewardif is_done:breakstate = new_statereturn total_reward# 对于给定的状态和动作 通过动作达到目标状态的统计信息来计算动作的价值# 使用计数器估计目标状态的概率 根据Bellman方程计算状态的价值# 前面的实现从价值表中读取状态的价值 现在通过select_action()方法选择最大Q值的动作# 并将该Q值作为目标状态的价值def value_iteration(self):for state in range(self.env.observation_space.n):for action in range(self.env.action_space.n):action_value = 0.0target_counts = self.transits[(state, action)]total = sum(target_counts.values())for tgt_state, count in target_counts.items():reward = self.rewards[(state, action, tgt_state)]best_action = self.select_action(tgt_state)action_value += (count / total) * (reward + GAMMA * self.values[(tgt_state, best_action)])self.values[(state, action)] = action_valueif __name__ == "__main__":test_env = gym.make(ENV_NAME)agent = Agent()writer = SummaryWriter(comment="-q-iteration")iter_no = 0best_reward = 0.0while True:iter_no += 1agent.play_n_random_steps(100)agent.value_iteration()reward = 0.0for _ in range(TEST_EPISODES):reward += agent.play_episode(test_env)reward /= TEST_EPISODESwriter.add_scalar("reward", reward, iter_no)if reward > best_reward:print("Best reward updated %.3f -> %.3f" % (best_reward, reward))best_reward = rewardif reward > 0.80:print("Solved in %d iterations!" % iter_no)breakwriter.close()

深度Q-network

随着问题的深入,遍历状态空间的每个状态是不现实的,我们可以用从环境中获得的状态来更新状态价值。

Tabular Q-learning

将学习率考虑进来,即使用0~1的学习率α将新旧Q值平均。算法如下所示:

  1. 从一个Q(s,a)的空表开始
  2. 从环境中获取(s,a,r,s’),s旧状态 a动作 r奖励 s’新状态
  3. 进行Bellman更新:Qs,a←(1−α)Qs,a+α(r+γmax⁡a′∈AQs′,a′)Q_{s, a} \leftarrow(1-\alpha) Q_{s, a}+\alpha\left(r+\gamma \max _{a^{\prime} \in A} Q_{s^{\prime}, a^{\prime}}\right)Qs,a​←(1−α)Qs,a​+α(r+γmaxa′∈A​Qs′,a′​)
  4. 检查收敛条件 如果不符合 则从步骤2开始重复

代码实现实例如下

#!/usr/bin/env python3
import gym
import collections
from tensorboardX import SummaryWriterENV_NAME = "FrozenLake-v0"
GAMMA = 0.9
ALPHA = 0.2
TEST_EPISODES = 20class Agent:def __init__(self):self.env = gym.make(ENV_NAME)self.state = self.env.reset()# 只需要跟踪一个价值表即可self.values = collections.defaultdict(float)# 从环境中获取下一个状态转移 返回(s,a,r,s')def sample_env(self):action = self.env.action_space.sample()old_state = self.statenew_state, reward, is_done, _ = self.env.step(action)self.state = self.env.reset() if is_done else new_statereturn (old_state, action, reward, new_state)# 输入环境中的状态 查找当前状态可获得最大价值的动作# 在测试方法中 使用当前价值表运行一个片段(评估策略的质量)# 在执行价值更新时 用于获取下一个状态的价值def best_value_and_action(self, state):best_value, best_action = None, Nonefor action in range(self.env.action_space.n):action_value = self.values[(state, action)]if best_value is None or best_value < action_value:best_value = action_valuebest_action = actionreturn best_value, best_action# 使用Bellman更新价值表def value_update(self, s, a, r, next_s):best_v, _ = self.best_value_and_action(next_s)new_val = r + GAMMA * best_vold_val = self.values[(s, a)]self.values[(s, a)] = old_val * (1-ALPHA) + new_val * ALPHA# 使用测试环境运行一整个片段 用于评估当前的策略def play_episode(self, env):total_reward = 0.0state = env.reset()while True:_, action = self.best_value_and_action(state)new_state, reward, is_done, _ = env.step(action)total_reward += rewardif is_done:breakstate = new_statereturn total_rewardif __name__ == "__main__":test_env = gym.make(ENV_NAME)agent = Agent()writer = SummaryWriter(comment="-q-learning")iter_no = 0best_reward = 0.0while True:iter_no += 1s, a, r, next_s = agent.sample_env()agent.value_update(s, a, r, next_s)reward = 0.0for _ in range(TEST_EPISODES):reward += agent.play_episode(test_env)reward /= TEST_EPISODESwriter.add_scalar("reward", reward, iter_no)if reward > best_reward:print("Best reward updated %.3f -> %.3f" % (best_reward, reward))best_reward = rewardif reward > 0.80:print("Solved in %d iterations!" % iter_no)breakwriter.close()

Deep Q-learning

我们尝试用非线性表示将状态和动作都映射到一个值上,实际上就是尝试用NN来近似一个复杂的非线性函数Q(s,a),所以采用深度NN对Q-learning算法进行修改:

  1. 用一些初始近似值初始化Q(s,a)
  2. 与环境交互获得元组(s,a,r,s’)
  3. 计算损失L:如果片段结束,L=(Qs,a−r)2\mathcal{L}=\left(Q_{s, a}-r\right)^{2}L=(Qs,a​−r)2,否则L=(Qs,a−(r+γmax⁡a′∈AQs′,a′))2\mathcal{L}=\left(Q_{s, a}-\left(r+\gamma \max _{a^{\prime} \in A} Q_{s^{\prime}, a^{\prime}}\right)\right)^{2}L=(Qs,a​−(r+γmaxa′∈A​Qs′,a′​))2
  4. 通过最小化模型参数的损失,使用随机梯度下降SGD算法更新Q(s,a)
  5. 从步骤2开始重复,直到收敛为止

环境交互问题
并且使用ϵ\epsilonϵ-greedy方法来混合随机探索与较好的测试经验Q值,通常的做法是从ϵ\epsilonϵ=1.0开始,然后慢慢减小到某个较小的值。

SGD优化问题
为满足SGD优化的条件,我们要使用回访缓冲区技术,就是大量使用过去的经验,并设置一个固定大小的缓冲区,将新数据添加到缓冲区的末尾,同时将最旧的的经验数据移除。

步骤之间的相关性
Bellman方程通过Q(s’,a’)提供Q(s,a)的值。但是两个状态太相似了,NN无法区分。所以当NN的参数更新以使Q(s,a)更接近所需的结果是,可以间接更改Q(s’,a’)和附近其他状态产生的值,但这会使训练很不稳定。所以引入目标网络技术,来保留一个网络的副本并将其用于Bellman方程中的Q(s’,a’)值。该网络周期性地与主网络同步,例如每N(1000或10000次训练迭代)步进行一次同步。

DQN训练的最终形式:

  1. 使用随机权重(ϵ\epsilonϵ<—1.0)初始化Q(s,a)Q(s, a)Q(s,a)和Q^(s,a)\hat{Q}(s, a)Q^​(s,a)的参数,清空回放缓冲区
  2. 以概率ϵ\epsilonϵ选择一个随机动作a,否则a=arg⁡max⁡aQs,aa=\arg \max _{a} Q_{s, a}a=argmaxa​Qs,a​
  3. 在模拟器中执行动作a,观察奖励r和下一个状态s’
  4. 将转移过程(s,a,r,s’)存储在回放缓冲区中
  5. 从回放缓冲区中采样一个随机的小批量转移过程
  6. 对于回放缓冲区的每个转移过程,如果片段在这一步结束,则计算目标y=ry=ry=r,
    否则计算y=r+γmax⁡a′∈AQ^s′,a′y=r+\gamma \max _{a^{\prime} \in A} \hat{Q}_{s^{\prime}, a^{\prime}}y=r+γmaxa′∈A​Q^​s′,a′​
  7. 计算损失:L=(Qs,a−y)2\mathcal{L}=\left(Q_{s, a}-y\right)^{2}L=(Qs,a​−y)2
  8. 通过最小化模型参数的损失,使用SGD算法更新Q(s,a)Q(s, a)Q(s,a)
  9. 每N步,将权重从QQQ复制到Q^\hat{Q}Q^​
    10.从步骤2开始重复,知道收敛为止

示例代码分为三个模块:

  • wrappers.py:Atari环境包装程序
  • dqn_model.py:DQN NN层
  • 02_dqn_pong.py:主模块,包括训练循环、损失函数计算和经验回放缓冲区
wrappers.py
import cv2
import gym
import gym.spaces
import numpy as np
import collectionsclass FireResetEnv(gym.Wrapper):def __init__(self, env=None):"""For environments where the user need to press FIRE for the game to start."""# 按下FIRE按钮 并检查游戏中存在的几种极端情况super(FireResetEnv, self).__init__(env)assert env.unwrapped.get_action_meanings()[1] == 'FIRE'assert len(env.unwrapped.get_action_meanings()) >= 3def step(self, action):return self.env.step(action)def reset(self):self.env.reset()obs, _, done, _ = self.env.step(1)if done:self.env.reset()obs, _, done, _ = self.env.step(2)if done:self.env.reset()return obs# 组合了K帧中的重复动作和连续帧中的像素
class MaxAndSkipEnv(gym.Wrapper):def __init__(self, env=None, skip=4):"""Return only every `skip`-th frame"""super(MaxAndSkipEnv, self).__init__(env)# most recent raw observations (for max pooling across time steps)self._obs_buffer = collections.deque(maxlen=2)self._skip = skipdef step(self, action):total_reward = 0.0done = Nonefor _ in range(self._skip):obs, reward, done, info = self.env.step(action)self._obs_buffer.append(obs)total_reward += rewardif done:breakmax_frame = np.max(np.stack(self._obs_buffer), axis=0)return max_frame, total_reward, done, infodef reset(self):"""Clear past frame buffer and init. to first obs. from inner env."""self._obs_buffer.clear()obs = self.env.reset()self._obs_buffer.append(obs)return obs# 将模拟器的输入观察结果RGB 210x160 转换为84x84灰度图像
# 使用了比色灰度转换 调整大小和裁剪顶部和底部
class ProcessFrame84(gym.ObservationWrapper):def __init__(self, env=None):super(ProcessFrame84, self).__init__(env)self.observation_space = gym.spaces.Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)def observation(self, obs):return ProcessFrame84.process(obs)@staticmethoddef process(frame):if frame.size == 210 * 160 * 3:img = np.reshape(frame, [210, 160, 3]).astype(np.float32)elif frame.size == 250 * 160 * 3:img = np.reshape(frame, [250, 160, 3]).astype(np.float32)else:assert False, "Unknown resolution."img = img[:, :, 0] * 0.299 + img[:, :, 1] * 0.587 + img[:, :, 2] * 0.114resized_screen = cv2.resize(img, (84, 110), interpolation=cv2.INTER_AREA)x_t = resized_screen[18:102, :]x_t = np.reshape(x_t, [84, 84, 1])return x_t.astype(np.uint8)# 将HWC(高度,宽度,通道)转换为pytorch需要的CHW(通道,高度,宽度)
class ImageToPyTorch(gym.ObservationWrapper):def __init__(self, env):super(ImageToPyTorch, self).__init__(env)old_shape = self.observation_space.shapeself.observation_space = gym.spaces.Box(low=0.0, high=1.0, shape=(old_shape[-1], old_shape[0], old_shape[1]),dtype=np.float32)def observation(self, observation):return np.moveaxis(observation, 2, 0)# 将观察数据从字节转换为浮点数 并将每个像素的值缩小到[0.0 ... 1.0]范围
class ScaledFloatFrame(gym.ObservationWrapper):def observation(self, obs):return np.array(obs).astype(np.float32) / 255.0# 沿着第一个维度将随后的几帧叠加在一起 作为观察结果返回 使得网络感知到对象的动态信息
class BufferWrapper(gym.ObservationWrapper):def __init__(self, env, n_steps, dtype=np.float32):super(BufferWrapper, self).__init__(env)self.dtype = dtypeold_space = env.observation_spaceself.observation_space = gym.spaces.Box(old_space.low.repeat(n_steps, axis=0),old_space.high.repeat(n_steps, axis=0), dtype=dtype)def reset(self):self.buffer = np.zeros_like(self.observation_space.low, dtype=self.dtype)return self.observation(self.env.reset())def observation(self, observation):self.buffer[:-1] = self.buffer[1:]self.buffer[-1] = observationreturn self.buffer# 创建环境 并应用包装器
def make_env(env_name):env = gym.make(env_name)env = MaxAndSkipEnv(env)env = FireResetEnv(env)env = ProcessFrame84(env)env = ImageToPyTorch(env)env = BufferWrapper(env, 4)return ScaledFloatFrame(env)
dqn_model.py
import torch
import torch.nn as nn
import numpy as npclass DQN(nn.Module):def __init__(self, input_shape, n_actions):super(DQN, self).__init__()self.conv = nn.Sequential(nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),nn.ReLU(),nn.Conv2d(32, 64, kernel_size=4, stride=2),nn.ReLU(),nn.Conv2d(64, 64, kernel_size=3, stride=1),nn.ReLU())conv_out_size = self._get_conv_out(input_shape)self.fc = nn.Sequential(nn.Linear(conv_out_size, 512),nn.ReLU(),nn.Linear(512, n_actions))# 接受卷积层的输入形状 返回参数数量 传递给第一个全连接层构造函数def _get_conv_out(self, shape):o = self.conv(torch.zeros(1, *shape))return int(np.prod(o.size()))"""输入4D张量(第一个维度是批量大小,第二个维度是颜色通道,也就是我们后续帧的堆栈第三、四个维度是图像维度)""" def forward(self, x):# 输入经过卷积层获得4D张量 然后view()将其展平为两个维度:# 批大小 该批次卷积返回的所有参数conv_out = self.conv(x).view(x.size()[0], -1)# 将展平的2D张量传递到全连接层 获取每批输入的Q值return self.fc(conv_out)
02_dqn_pong.py
#!/usr/bin/env python3
from lib import wrappers
from lib import dqn_model
import argparse
import time
import numpy as np
import collections
import torch
import torch.nn as nn
import torch.optim as optim
from tensorboardX import SummaryWriter# 训练的默认环境 最后100个片段的奖励边界
DEFAULT_ENV_NAME = "PongNoFrameskip-v4"
MEAN_REWARD_BOUND = 19.5GAMMA = 0.99    # γ用于Bellman近似
BATCH_SIZE = 32    # 回放缓冲区的采样批大小
REPLAY_SIZE = 10000    # 回放缓冲区的最大容量
LEARNING_RATE = 1e-4    # 优化器学习率
SYNC_TARGET_FRAMES = 1000    # 模型权重从训练模型同步到目标模型的频率
REPLAY_START_SIZE = 10000# epsilon在前10**5帧从1.0衰减到0.02
EPSILON_DECAY_LAST_FRAME = 10**5
EPSILON_START = 1.0
EPSILON_FINAL = 0.02# 定义经验回放缓冲区 存储从环境中获得的(状态,动作,奖励,完成标志,下一状态)元组
Experience = collections.namedtuple('Experience', field_names=['state', 'action', 'reward', 'done', 'new_state'])class ExperienceBuffer:def __init__(self, capacity):self.buffer = collections.deque(maxlen=capacity)def __len__(self):return len(self.buffer)# 在环境中每执行一步 都将状态转移情况推送到缓冲区def append(self, experience):self.buffer.append(experience)# 随机抽取一批状态转移样本def sample(self, batch_size):indices = np.random.choice(len(self.buffer), batch_size, replace=False)states, actions, rewards, dones, next_states = zip(*[self.buffer[idx] for idx in indices])return np.array(states), np.array(actions), np.array(rewards, dtype=np.float32), \np.array(dones, dtype=np.uint8), np.array(next_states)class Agent:def __init__(self, env, exp_buffer):self.env = envself.exp_buffer = exp_bufferself._reset()# 存储对环境的引用和经验回放缓冲区 追踪观察和累计的总奖励def _reset(self):self.state = env.reset()self.total_reward = 0.0# 利用概率epsilon采取随机动作 否则将遍历旧模型所有动作的Q值 选择最佳值对应的动作def play_step(self, net, epsilon=0.0, device="cpu"):done_reward = Noneif np.random.random() < epsilon:action = env.action_space.sample()else:state_a = np.array([self.state], copy=False)state_v = torch.tensor(state_a).to(device)q_vals_v = net(state_v)_, act_v = torch.max(q_vals_v, dim=1)action = int(act_v.item())# do step in the environmentnew_state, reward, is_done, _ = self.env.step(action)self.total_reward += rewardexp = Experience(self.state, action, reward, is_done, new_state)self.exp_buffer.append(exp)self.state = new_state# 如果到达片段末尾 则返回总累计奖励 否则为Noneif is_done:done_reward = self.total_rewardself._reset()return done_reward# 输入批作为数组元组 训练网络 定期与训练网络同步的目标网络
def calc_loss(batch, net, tgt_net, device="cpu"):states, actions, rewards, dones, next_states = batch# 计算梯度 tgt_net用于计算下一个状态的价值states_v = torch.tensor(states).to(device)next_states_v = torch.tensor(next_states).to(device)actions_v = torch.tensor(actions).to(device)rewards_v = torch.tensor(rewards).to(device)done_mask = torch.ByteTensor(dones).to(device)# 将观察结果传递给第一个模型 使用gather()张量操作提取所采取动作的Q值state_action_values = net(states_v).gather(1, actions_v.unsqueeze(-1)).squeeze(-1)# 将目标网络应用于下一个状态观察值 max()返回最大值和索引 只取最大值next_state_values = tgt_net(next_states_v).max(1)[0]# 如果状态转移发生在片段的最后一步 那动作价值不会获得下一个状态的衰减奖励 收敛的关键next_state_values[done_mask] = 0.0# 将值与计算图分开 防止梯度流入用于计算下一状态Q近似值的NNnext_state_values = next_state_values.detach()# 计算Bellman近似值expected_state_action_values = next_state_values * GAMMA + rewards_v# 计算均方误差损失return nn.MSELoss()(state_action_values, expected_state_action_values)if __name__ == "__main__":parser = argparse.ArgumentParser()parser.add_argument("--cuda", default=False, action="store_true", help="Enable cuda")parser.add_argument("--env", default=DEFAULT_ENV_NAME,help="Name of the environment, default=" + DEFAULT_ENV_NAME)parser.add_argument("--reward", type=float, default=MEAN_REWARD_BOUND,help="Mean reward boundary for stop of training, default=%.2f" % MEAN_REWARD_BOUND)args = parser.parse_args()device = torch.device("cuda" if args.cuda else "cpu")env = wrappers.make_env(args.env)net = dqn_model.DQN(env.observation_space.shape, env.action_space.n).to(device)tgt_net = dqn_model.DQN(env.observation_space.shape, env.action_space.n).to(device)writer = SummaryWriter(comment="-" + args.env)print(net)# 创建经验回放缓冲区buffer = ExperienceBuffer(REPLAY_SIZE)agent = Agent(env, buffer)epsilon = EPSILON_STARToptimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)total_rewards = []frame_idx = 0ts_frame = 0ts = time.time()best_mean_reward = None# epsilon在给定帧数内线性下降 最后保持在EPSILON_FINAL=0.01while True:frame_idx += 1epsilon = max(EPSILON_FINAL, EPSILON_START - frame_idx / EPSILON_DECAY_LAST_FRAME)reward = agent.play_step(net, epsilon, device=device)if reward is not None:total_rewards.append(reward)speed = (frame_idx - ts_frame) / (time.time() - ts)ts_frame = frame_idxts = time.time()mean_reward = np.mean(total_rewards[-100:])print("%d: done %d games, mean reward %.3f, eps %.2f, speed %.2f f/s" % (frame_idx, len(total_rewards), mean_reward, epsilon,speed))writer.add_scalar("epsilon", epsilon, frame_idx)writer.add_scalar("speed", speed, frame_idx)writer.add_scalar("reward_100", mean_reward, frame_idx)writer.add_scalar("reward", reward, frame_idx)# 如果最近100个片段的平均奖励达到最高时 报告结果并保存模型参数if best_mean_reward is None or best_mean_reward < mean_reward:torch.save(net.state_dict(), args.env + "-best.dat")if best_mean_reward is not None:print("Best mean reward updated %.3f -> %.3f, model saved" % (best_mean_reward, mean_reward))best_mean_reward = mean_reward# 如果平均奖励超过奖励边界 就停止训练if mean_reward > args.reward:print("Solved in %d frames!" % frame_idx)break# 检查缓冲区是否达到可以训练if len(buffer) < REPLAY_START_SIZE:continue# 每隔SYNC_TARGET_FRAMES帧数将参数从主网络同步到目标网络if frame_idx % SYNC_TARGET_FRAMES == 0:tgt_net.load_state_dict(net.state_dict())# 梯度归零 采样数据 计算loss 优化optimizer.zero_grad()batch = buffer.sample(BATCH_SIZE)loss_t = calc_loss(batch, net, tgt_net, device=device)loss_t.backward()optimizer.step()writer.close()

《深度强化学习实践》学习内容整理相关推荐

  1. 苦七学习群提问内容整理210729

    前言: 01.整理群里的提问,相当于把最近群里的提问和解答都看一遍,这件事很有意义,因为苦七自己几乎一半以上的知识,都是在群里给大家解答的过程中学习到的.(很少看教程) 02.群里的每条提问基本都会看 ...

  2. PCB学习第一节内容整理

    日后做板子:嘉立创 在线下单 双层 10*10 厚度1.6 阻焊 绿 使用软件:Altium Designer 10 自建目录 (AD 10)E:/projects/AD 10/ 新建工程: File ...

  3. 【赠书】掌握人工智能重要主题,深度强化学习实践书籍推荐

    ‍‍ 今天要给大家介绍的书是深度强化学习实践的第二版,本书的主题是强化学习(Reinforcement Learning,RL),它是机器学习(Machine Learning,ML)的一个分支,强调 ...

  4. 深度强化学习落地指南:弥合DRL算法原理和落地实践之间的断层 | 文末送书

    魏宁 著 电子工业出版社-博文视点 2021-08-01 ISBN: 9787121416446 定价: 109.00 元 新书推荐 ????今日福利 |关于本书| 本书从工业界一线算法工作者的视角, ...

  5. 深度强化学习:如何在AI工程实践中选择合适的算法?

    关注公众号,发现CV技术之美 在使用深度强化学习(Deep Reinforcement Learning,DRL)算法解决实际问题的过程中,明确任务需求并初步完成问题定义后,就可以为相关任务选择合适的 ...

  6. 『干货』深度强化学习与自适应在线学习的阿里实践

    https://www.toutiao.com/a6683425529313362444/ 2019-04-24 19:44:27 1搜索算法研究与实践 1.1背景 淘宝的搜索引擎涉及对上亿商品的毫秒 ...

  7. 深度学习强化学习进化计算 入门资源整理

    深度学习&强化学习&进化计算 入门资源整理 深度学习 在线课程 在线书籍 学习Python 强化学习 在线课程 在线书籍 更多资源 进化计算 后记 深度学习 在线课程 深度学习是机器学 ...

  8. 倒立摆_DQN算法_边做边学深度强化学习:PyTorch程序设计实践(5)

    倒立摆_DQN算法_边做边学深度强化学习:PyTorch程序设计实践(5) 0.相关系列文章 1.Agent.py 2.Brain.py 3.Environment.py 4.Val.py 5.Rep ...

  9. 深度强化学习应用实践技巧

    文章目录 参考资料 1. 如何应用深度强化学习 1.1 简单测试阶段 1.2 快速配置阶段 1.3 部署训练阶段 2. 实现阶段 3. 训练和调试阶段 参考资料 <深度强化学习>书籍第18 ...

  10. 30+博士、100+硕士整理的超全深度强化学习资源清单

    作者 | Deep-RL 来源 | 深度强化学习实验室(ID:Deep-RL) 今天为大家推荐一个开源.开发的 Github 好项目<A Guide for Deep Reinforcement ...

最新文章

  1. 如何获取NumPy数组中N个最大值的索引?
  2. JS event使用方法详解
  3. 如何判断添加的一个面要素是否与某一个面图层相交(AO)
  4. 31.水平居中总结-不定宽块状元素方法(三)
  5. Linux实时查看进程命令top笔记
  6. java中什么时候应用异常_生产Java应用程序中的十大异常类型-基于1B事件
  7. 多表查询,自连接,子查询
  8. [Tailwind] Control What Variations are Generated for Each Utility Class Module in Tailwind
  9. Equal-size partition problem
  10. vue-router小案例-后台管理路由
  11. 【游戏开发】游戏开发书籍汇总
  12. 转 波束成形 Beamforming 简述
  13. Java: null是什么??
  14. 什么是长元音和短元音
  15. 2022年电子造粒计数器市场前景分析及研究报告
  16. 医疗器械行业数据分析必备软件--全球可查
  17. 腾讯产品18讲10:把握产品需求的管理节奏
  18. Android平板电脑上的APP应用程序设计须知
  19. 思科路由器IOS系统和配置文件的备份、删除及还原
  20. 三星android 8升级包,三星G930K官方安卓8.0固件rom升级更新包:KTC-G930KKKU2ESI3

热门文章

  1. 学习什么技术 4年后最赚钱
  2. 测量地球半径的古希腊方法
  3. 8421BCD码加法的修正证明
  4. 百万年薪python之路 -- MySQL数据库之 完整性约束
  5. foxmail for linux 64,foxmail server 1.2 for linux数据备份手册_邮件服务器
  6. 关于公司离职后住房公积金问题
  7. QQ在线状态-可以与我直接QQ交流
  8. 光E电怎样让理财收益最大化
  9. CUDA out of memory(CUDA显存不足)
  10. 【重识云原生】第六章容器6.1.8节——Docker核心技术UnionFS