1,多臂老虎机

1.1,形式化描述

在多臂老虎机(Multi-Armed Bandit,MAB)问题中,有一个拥有  根拉杆的老虎机,每一个拉杆都对应一个关于奖励的概率分布 。我们每次拉下其中一根拉杆,就可以获得一个从该拉杆对应的奖励概率分布中获得一个奖励 。我们的目标是: 在各个拉杆奖励的概率分布未知的情况下,从头开始尝试,并在操作  次拉杆后,获得尽可能多的累积奖励。由于奖励的分布是未知的,我们就需要在“探索拉杆的获奖概率”和“根据经验选择获奖最多的拉杆”中进行权衡。于是,要采用怎样的操作策略,才能使获得的累积奖励最多,便是多臂老虎机问题。

多臂老虎机问题可以表示为一个元组 ,其中:

  •  为动作集合,其中一个动作表示拉动一个拉杆。若多臂老虎机一共有  个拉杆,那动作空间就是集合 ,我们用  表示任意一个动作。
  •  为奖励函数分布,每一个拉杆都对应一个奖励分布 ,不同拉杆之间的奖励分布通常是不同的。

假设每个时间步只能拉动一个拉杆,多臂老虎机的目标为最大化一段时间步  内累积的奖励:。其中  表示在第时间步  拉动的拉杆, 表示拉动拉杆 后获得的奖励。

累计懊悔:对于每一个拉杆 ,我们定义其期望奖励为  。于是,至少存在一个拉杆,它的期望奖励不小于其他任意一个拉杆。我们将该最优期望奖励表示为 。为了更加直观、方便地观察一个拉杆离最优拉杆期望奖励的差距,我们引 入懊悔概念。懊悔定义为当前拉杆  与最优拉杆的期望奖励差,即 。累积懊悔即操作  次拉杆后累积的懊悔总量,对于一次完整的  步决策 ,累计懊悔为 。MAB 问题的目标为最大化累积奖励,等价于最小化累积懊悔。

估计期望奖励:为了知道哪一个拉杆能获得更高的奖励,我们需要估计这个拉杆的期望奖励。只拉动一次获得的奖励存在随机性,所以需要拉动一个拉杆多次,然后计算得到的多次奖励的平均值。

(1)对于 ,初始化期望奖励估值  和计数器  。

(2)主循环 

  • 选取某个拉杆 
  • 得到奖励  
  • 更新计数器:
  • 更新估值:

如果采用所有数求和再除以次数,存在一个缺点是每次更新的时间和空间复杂度均为 。而采用增量式更新,时间和空间复杂度为 

以上第 4 步我们如此更新估值,是因为这样我们可以进行增量式的平均数更新,公式如下。

1.2,代码实现

下面我们将用代码实现一个多臂老虎机,拉杆数为 10。其中每个拉杆的奖励服从伯努利分布,即每次拉下拉杆有  的概率获得奖励为 1,剩下  的概率获得奖励为 0。奖励为 1 代表获奖,奖励为 0 代表没有获奖。

# 导入需要使用的库,其中numpy是支持数组和矩阵运算的科学计算库,而matplotlib是绘图库
import numpy as np
import matplotlib.pyplot as plt
class BernoulliBandit:""" 伯努利多臂老虎机,输入K为拉杆个数 """def __init__(self, K):self.probs = np.random.uniform(size=K) # 随机生成K个0-1之间的数,作为每个拉杆的获奖概率self.best_idx = np.argmax(self.probs) # 获奖概率最大的拉杆self.best_prob = self.probs[self.best_idx] # 最大的获奖概率self.K = Kdef step(self, k):# 当玩家选择了k号拉杆后,根据该老虎机k号拉杆获得奖励的概率返回 1(获奖)或 0(未获奖)if np.random.rand() < self.probs[k]:return 1else:return 0np.random.seed(1) # 设定随机种子,使实验具有可重复性
K = 10
bandit_10_arm = BernoulliBandit(K)
print("随机生成了一个 %d臂伯努利老虎机" % K)
print("获得奖励概率最大的拉杆为%d号,其获奖概率为%.4f" % (bandit_10_arm.best_idx, bandit_10_arm.best_prob))

接下来我们以一个 Solver 基础类来实现上述的多臂老虎机的求解方案。根据前文的算法框架,我们需要实现下列函数:根据策略选择动作,根据动作获取奖励,更新奖励估值,更新累积懊悔和计数。在下面的 MAB 算法基本框架中,我们将根据策略选择动作根据动作获取奖励 和更新期望奖励估值放在 run_one_step 函数中,由每个继承 Solver 类的策略具体实现,而更新累积懊悔和计数则直接放在主循环 run 中。

class Solver:""" 多臂老虎机算法基本框架 """def __init__(self, bandit):self.bandit = banditself.counts = np.zeros(self.bandit.K) # 每个拉杆的尝试次数self.regret = 0. # 当前步的累积懊悔self.actions = [] # 维护一个列表,记录每一步的动作self.regrets = [] # 维护一个列表,记录每一步的累积懊悔def update_regret(self, k):# 计算累积懊悔并保存,k为本次行动选择的拉杆的编号self.regret += self.bandit.best_prob - self.bandit.probs[k]self.regrets.append(self.regret)def run_one_step(self):# 返回当前行动选择哪一个拉杆,由每个具体的策略实现raise NotImplementedErrordef run(self, num_steps):# 运行一定次数,num_steps为总运行次数for _ in range(num_steps):k = self.run_one_step()self.counts[k] += 1self.actions.append(k)self.update_regret(k)

1.3,探索与利用问题

在以上算法流程中,还没有一个策略告诉我们应该采取哪个动作,即拉动哪个拉杆,所以接下来我们将来学习如何设计一个策略。例如一个最简单的策略就是一直采取第一个动作,但这就非常取决于运气。如果运气绝佳,可能刚好是能获得最大期望奖励的拉杆,即最优拉杆;但如果运气很糟糕,就有可能是最小的期望奖励。于是在多臂老虎机任务中,一个经典的问题就是探索与利用的平衡问题。策略的设计就需要考虑这个问题。

  • 探索(Exploration) 是指尝试更多可能的拉杆,这个拉杆不一定会获取最优奖励,但能够清楚所有拉杆的获奖情况,也即对于一个 10 臂老虎机,我们要去把所有的拉杆都试一下才知道哪个拉杆可能获得最大的奖励。
  • 利用 (Exploitation) 是指拉动已知平均奖励中最优的那个拉杆。由于已知的信息仅仅来自于有限次的交互观测,所以当前最优的拉杆不一定是真正全局最优。例如,对于一个 10 臂老虎机,我们只试过其中 3 个拉杆,接下来就一直拉动这 3 个拉杆中平均奖励最大的那个拉杆,但很有可能期望奖励最大的拉杆在剩下 7 个当中;即使我们对 10 个拉杆各自都尝试过了 20 次,发现 5 号拉杆的经验平均奖励是最高的,但仍然存在着微小的概率使得另一个 6 号拉杆的真实期望奖励是比 5 号拉杆更高的。

于是在多臂老虎机问题中,我们需要平衡探索和利用的次数,使得能最大化累积奖励。一个比较大致的思路是在一开始的时候做比较多的探索,在对每个拉杆都有比较准确的估计后,再进行利用。目前存在一些比较经典的算法来解决这个问题,它们是ϵ-贪心算法、UCB 算法、汤普森采样算法。

1.4,ϵ-贪心算法

如果是完全的贪心算法,即在每一时刻采取期望奖励估值最大的动作(拉杆),这就是纯粹的利用,而没有探索。所以需要对完全贪心算法进行一些修改。其中比较经典的一种方法为ϵ-贪心算法 (ϵ-Greedy),在完全贪心算法的基础上添加了噪声,每次以概率 ϵ 选择以往经验中期望奖励估值最高的那个拉杆,以概率 1-ϵ 随机选择一个拉杆,用公式表示如下:

随着时间的不断推进,我们对各个动作的奖励估计得越来越准,此时我们就没太大必要继续花大力气进行探索。所以在ϵ-贪心算法的具体实现中,可以令 ϵ 随时间衰减,也即探索的概率将会不断降低。但是请注意,ϵ 不会在有限的步数内衰减至 0,因为基于有限步数观测的完全贪心算法仍然是一个局部信息的贪心算法,永远距离最优解有一个固定的差距。

我们接下来用代码实现一个ϵ-贪心算法,并用它去解决前文生成的 10 臂老虎机。我们设置, 以及  。

class EpsilonGreedy(Solver):""" Epsilon贪心算法,继承Solver类 """def __init__(self, bandit, epsilon=0.01, init_prob=1.0):super(EpsilonGreedy, self).__init__(bandit)self.epsilon = epsilonself.estimates = np.array([init_prob] * self.bandit.K) #初始化对所有拉杆的奖励估值def run_one_step(self):if np.random.random() < self.epsilon:k = np.random.randint(0, self.bandit.K)  # 随机选择一个拉杆else:k = np.argmax(self.estimates)  # 选择奖励估值最大的拉杆r = self.bandit.step(k)  # 得到本次动作的奖励self.estimates[k] += 1. / (self.counts[k] + 1) * (r - self.estimates[k])return k

为了更加直观地展示,我们把每一时间步的累积函数画出来。于是我们定义了以下画图函数,方便之后调用。

def plot_results(solvers, solver_names):"""生成累积懊悔随时间变化的图像。输入solvers是一个列表,列表中每个元素是一种特定的策略。而solver_names也是一个列表,包含每个策略的名称"""for idx, solver in enumerate(solvers):time_list = range(len(solver.regrets))plt.plot(time_list, solver.regrets, label=solver_names[idx])plt.xlabel('Time steps')plt.ylabel('Cumulative regrets')plt.title('%d-armed bandit' % solvers[0].bandit.K)plt.legend()plt.show()
np.random.seed(1)
epsilon_greedy_solver = EpsilonGreedy(bandit_10_arm, epsilon=0.01)
epsilon_greedy_solver.run(5000)
print('epsilon贪心算法的累积懊悔为:', epsilon_greedy_solver.regret)
plot_results([epsilon_greedy_solver], ["EpsilonGreedy"])

通过上面实验我们发现,在经历了开始的一小段时间后,ϵ-贪心算法的累积懊悔几乎是线性增长的。因为一旦做出了随机拉杆的探索,那么带来的懊悔值是固定的。

那其他不同的 ϵ 取值又会带来怎样的变化呢?继续使用该 10 臂老虎机,我们尝试不同的参数: ,查看相应实验结果。

np.random.seed(0)
epsilons = [1e-4, 0.01, 0.1, 0.25, 0.5]
epsilon_greedy_solver_list = [EpsilonGreedy(bandit_10_arm, epsilon=e) for e in epsilons]
epsilon_greedy_solver_names = ["epsilon={}".format(e) for e in epsilons]
for solver in epsilon_greedy_solver_list:solver.run(5000)
plot_results(epsilon_greedy_solver_list, epsilon_greedy_solver_names)

通过实验结果,我们发现基本上无论ϵ取值多少,累积懊悔都是线性增长的。在这个例子中,随着 ϵ 的增大,累积懊悔增长的速率也会增大。

接下来我们尝试值随时间衰减的ϵ-贪心策略,采取的具体衰减形式为线性衰减,具体公式为  。 

class DecayingEpsilonGreedy(Solver):""" epsilon取值随时间衰减的Epsilon贪心算法,继承Solver类 """def __init__(self, bandit, init_prob=1.0):super(DecayingEpsilonGreedy, self).__init__(bandit)self.estimates = np.array([init_prob] * self.bandit.K)self.total_count = 0def run_one_step(self):self.total_count += 1if np.random.random() < 1 / self.total_count: # epsilon随时间衰减k = np.random.randint(0, self.bandit.K)else:k = np.argmax(self.estimates)r = self.bandit.step(k)self.estimates[k] += 1. / (self.counts[k] + 1) * (r - self.estimates[k])return k
np.random.seed(1)
decaying_epsilon_greedy_solver = DecayingEpsilonGreedy(bandit_10_arm)
decaying_epsilon_greedy_solver.run(5000)
print('epsilon衰减的贪心算法的累积懊悔为:', decaying_epsilon_greedy_solver.regret)
plot_results([decaying_epsilon_greedy_solver], ["DecayingEpsilonGreedy"])

随着时间做指数衰减的ϵ-贪心策略能够使得累计懊悔与时间的关系变成次线性 (Sublinear) 的,这明显优于固定ϵ值的ϵ-贪心策略。

1.5,上置信界算法

设想这样一种情况:对于一台 2 臂老虎机,其中第一个拉杆只尝试过一次,得到奖励为 0;第二个拉杆尝试过很多次,对它的奖励分布有了大致的把握。这时你会怎么做?或许你会进一步尝试第一个拉杆,来更加确定其奖励分布。这种思路主要是基于不确定性,因为此时第一个拉杆只试了一次,它的不确定性很高。如果一个拉杆它的不确定性越大,那么其就越具有探索的价值,因为探索之后可能发现它的奖励很大。我们在此引入不确定性度量 ,其会随着一个动作尝试的次数增加而减小。因此可以使用一种基于不确定性的策略为,综合考虑现有的期望奖励估值和不确定性,其核心问题是如何估计不确定性。类似于资格迹。

上置信界算法 UCB (Upper Confidence Bound) 是一种经典的基于不确定性的策略算法。它的思想用到了一个非常著名的数学原理:霍夫丁不等式。在霍夫丁不等式中,令  为  个独立同分布的随机变量,取值范围为 ,其经验期望为 ,则有: 。

现在我们将霍夫丁不等式运用于多臂老虎机问题中。将  代入  ,并认为不等式中的 参数  代表不确定性度量。给定一个概率 ,根据不等式, 至少以概率  成立。当  很小时, 就以很大概率成立,此时  便是期望奖励的上界。于是,上置信界算法便选取期望奖励上界最大的动作,即 。其中  根据  求解可得 。所以设定一个概率  后,就可以计算相应的不确定性度量  了。更直观地说,UCB 方法在每次选择拉杆前,先估计每根拉杆的期望奖励的上界,使得每根拉杆的期望奖励只有一个较小的概率  超过这个上界。接着选出期望奖励上界最大的拉杆,从而选择最有可能是期望奖励最大的拉杆。

我们下面将用代码实现 UCB 算法,并且仍然使用上文定义的 10 臂老虎机来观察实验结果。在具体实现过程中,我们设置 ,并且在分母中每个拉杆的次数加上常数 1,以免出现除以 0 的情形,即此时 。并且我们设定一个系数  来控制不确定性的比重,此时:

 

class UCB(Solver):""" UCB算法,继承Solver类 """def __init__(self, bandit, coef, init_prob=1.0):super(UCB, self).__init__(bandit)self.total_count = 0self.estimates = np.array([init_prob] * self.bandit.K)self.coef = coefdef run_one_step(self):self.total_count += 1ucb = self.estimates + self.coef * np.sqrt(np.log(self.total_count) / (2 * (self.counts + 1))) # 计算上置信界k = np.argmax(ucb) # 选出上置信界最大的拉杆r = self.bandit.step(k)self.estimates[k] += 1. / (self.counts[k] + 1) * (r - self.estimates[k])return k
np.random.seed(1)
coef = 0.1 # 控制不确定性比重的系数
UCB_solver = UCB(bandit_10_arm, coef)
UCB_solver.run(5000)
print('上置信界算法的累积懊悔为:', UCB_solver.regret)
plot_results([UCB_solver], ["UCB"])

1.6,汤普森采样算法

MAB 中还有一种经典算法为汤普森采样。我们先假设每个拉杆的奖励服从一个特定的分布,然后根据每个拉杆的期望奖励来进行选择。但是由于计算每个拉杆的期望奖励计算代价比较高,汤普森采样算法使用采样的方式,即根据当前每个动作  的奖励分布进行一轮采样,得到一组各个拉杆的奖励样本,再选择样本中奖励最大的动作。我们可以看出,汤普森采样是一种计算每个拉杆产生最高奖励概率的蒙特卡罗采样方法。

了解了汤普森采样的基础思路后,需要解决另一个问题:当前每个动作  的奖励分布怎样得到并且在过程中进行更新?在实际情况中,我们通常对当前每个动作的奖励分布用 Beta 分布进行建模。具体来说,若某拉杆选择了  次,其中  次奖励为 1, 次奖励为 0,则该拉杆的奖励服从参数为  的 Beta 分布。下图是汤普森采样的一个示例。

class ThompsonSampling(Solver):""" 汤普森采样算法,继承Solver类 """def __init__(self, bandit):super(ThompsonSampling, self).__init__(bandit)self._a = np.ones(self.bandit.K)  # 列表,表示每个拉杆奖励为1的次数self._b = np.ones(self.bandit.K)  # 列表,表示每个拉杆奖励为0的次数def run_one_step(self):samples = np.random.beta(self._a, self._b) # 按照Beta分布采样一组奖励k = np.argmax(samples) # 选出采样数值最大的拉杆r = self.bandit.step(k)self._a[k] += r # 更新Beta分布的第一个参数self._b[k] += (1 - r) # 更新Beta分布的第二个参数return k
np.random.seed(1)
thompson_sampling_solver = ThompsonSampling(bandit_10_arm)
thompson_sampling_solver.run(5000)
print('汤普森采样算法的累积懊悔为:', thompson_sampling_solver.regret)
plot_results([thompson_sampling_solver], ["ThompsonSampling"])

通过实验我们得到以下结论:ϵ-贪心策略的累积懊悔是随时间线性增长的,而另外三种算法(ϵ衰减贪心算法、上置信界算法、汤普森采样算法)的累积懊悔都是次线性增长的(具体为对数形式增长)。

2,动态规划

2.1,理论基础

强化学习:基本概念,马尔可夫,贝尔曼方程,动态规划_燕双嘤的博客-CSDN博客1,强化学习1.1,基本概念强化学习起源于动物心理学的相关原理,模仿人类和动物学习的试错机制,是一种通过与环境交互,学习状态到行为的映射关系,以获得最大积累期望回报的方法。强化学习包含环境,动作和奖励三部分,其本质是 agent 通过与环境的交互,使得其作出的action所得到的决策得到的总的奖励达到最大,或者说是期望最大。DL/ML中的loss function目的是使预测值和真实值之间的差距最小,而RL中的loss function是是奖励和的期望最大。在机器学习范畴内,根据反馈的不同,学https://shao12138.blog.csdn.net/article/details/123408225#t14

动态规划(Dynamic Programming)是程序设计算法中非常重要的内容,能够高效解决一些经典问题,例如背包问题和最短路径规划。动态规划的基本思想是将待求解问题分解成若干个子问题,先求解子问题,然后从这些子问题的解得到原问题的解。在动态规划中,我们会保存已解决的子问题的答案,而在求解目标问题过程中,如果需要这些子问题答案时,就可以直接利用,避免重复计算。

基于动态规划的强化学习算法主要有两种:一是策略迭代(Policy Iteration),二是价值迭代(Value Iteration)。其中,策略迭代有两部分组成:策略评估(Policy Evaluation)和策略提升(Policy Improvement)。具体来说,策略迭代中的策略评估使用贝尔曼期望方程来得到一个策略的状态价值函数,这是一个动态规划的过程。而价值迭代直接使用贝尔曼最优方程来进行动态规划,得到最终的最优状态价值。

基于动态规划的两大强化学习算法要求我们能事先知道环境的状态转移函数和奖励函数。在这样一个白盒环境中,我们不需要通过智能体和环境的大量交互来学习,可以直接用动态规划求解状态价值。但是,现实中白盒环境很少,这也是动态规划算法的局限所在,我们无法将其运用到很多实际场景中。其次,我们介绍的策略迭代和价值迭代通常只适用于有限马尔可夫决策过程中,即状态空间和动作空间是有限的。

2.2,Cliff Walking环境

Cliff Walking 是一个非常经典的强化学习环境,它要求一个智能体从起点出发,避开悬崖行走,最终到达目标位置。如图所示,有一个  的网格世界,每一个网格是一个状态,起点是左下角的状态,目标是右下角的状态。智能体在每一个状态都可以采取 4 种动作:上,下,左,右,如果采取动作后触碰到边界墙壁则状态不发生改变,否则就会相应到达下一个状态。其中有一段悬崖,智能体到达目标状态或掉入悬崖都会结束并回到起点,也就是说它们是终止状态。每走一步的奖励是-1,掉入悬崖的奖励是-100。

import copyclass CliffWalkingEnv:""" Cliff Walking环境"""def __init__(self, ncol=12, nrow=4):self.ncol = ncol # 定义环境的宽self.nrow = nrow # 定义环境的高self.P = self.createP() # 转移矩阵P[state][action] = [(p, next_state, reward, done)],包含下一个状态和奖励def createP(self):P = [[[] for j in range(4)] for i in range(self.nrow * self.ncol)] # 初始化change = [[0, -1], [0, 1], [-1, 0], [1, 0]] # 4 种动作, 0:上, 1:下, 2:左, 3:右。原点(0,0)定义在左上角for i in range(self.nrow):for j in range(self.ncol):for a in range(4):if i == self.nrow - 1 and j > 0:  # 位置在悬崖或者终点,因为无法继续交互,任何动作奖励都为0P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0, True)]continue# 其他位置next_x = min(self.ncol - 1, max(0, j + change[a][0]))next_y = min(self.nrow - 1, max(0, i + change[a][1]))next_state = next_y * self.ncol + next_xreward = -1done = Falseif next_y == self.nrow - 1 and next_x > 0: # 下一个位置在悬崖或者终点done = Trueif next_x != self.ncol - 1: # 下一个位置在悬崖reward = -100P[i * self.ncol + j][a] = [(1, next_state, reward, done)]return P

策略迭代

class PolicyIteration:""" 策略迭代 """def __init__(self, env, theta, gamma):self.env = envself.v = [float(0.00000)] * self.env.ncol * self.env.nrow  # 初始化价值为0self.pi = [[0.25, 0.25, 0.25, 0.25] for i in range(self.env.ncol * self.env.nrow)]  # 初始化为均匀随机策略self.theta = theta  # 策略评估收敛阈值self.gamma = gamma  # 折扣因子def policy_evaluation(self):  # 策略评估cnt = 1  # 计数器while 1:max_diff = 0new_v = [0] * self.env.ncol * self.env.nrowfor s in range(self.env.ncol * self.env.nrow):qsa_list = []for a in range(4):qsa = 0for res in self.env.P[s][a]:p, next_state, r, done = resqsa += round(p * (r + self.gamma * self.v[next_state] * (1 - done)), 4)  # 比较特殊,奖励和下一个状态有关,所以需要和状态转移概率相乘qsa_list.append(self.pi[s][a]*qsa)new_v[s] = sum(qsa_list)  # 状态价值函数和动作价值函数之间的关系max_diff = max(max_diff, abs(new_v[s] - self.v[s]))self.v[s] = sum(qsa_list)if max_diff < self.theta: breakcnt += 1print("状态价值:")for i in range(self.env.nrow):for j in range(self.env.ncol):print('%6.6s' % ('%.3f' % self.v[i * self.env.ncol + j]), end=' ')  # 为了输出美观,保持输出6个字符print()print("策略评估进行%d轮后完成" % cnt)def policy_improvement(self):  # 策略提升for s in range(self.env.nrow * self.env.ncol):qsa_list = []for a in range(4):qsa = 0for res in self.env.P[s][a]:p, next_state, r, done = resqsa += p * (r + self.gamma * self.v[next_state] * (1 - done))qsa_list.append(qsa)maxq = max(qsa_list)cntq = qsa_list.count(maxq)self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list]  # 让相同的动作价值均分概率print("策略提升完成")return self.pidef policy_iteration(self):  # 策略迭代while 1:self.policy_evaluation()old_pi = copy.deepcopy(self.pi)  # 将列表进行深拷贝,方便接下来进行比较new_pi = self.policy_improvement()if old_pi == new_pi: break

现在我们已经写好了环境代码和策略迭代代码。为了更好的展现最终的策略,我们下面增加一个打印策略的函数,来打印当前策略每个状态下的价值以及会采取的动作。对于打印出来的动作,我们用"^o<o"表示等概率采取向左和向上两种动作,"ooo>"表示在当前状态只采取向右动作。

def print_agent(agent, action_meaning, disaster=[], end=[]):print("状态价值:")for i in range(agent.env.nrow):for j in range(agent.env.ncol):print('%6.6s' % ('%.3f' % agent.v[i * agent.env.ncol + j]), end=' ') # 为了输出美观,保持输出6个字符print()print("策略:")for i in range(agent.env.nrow):for j in range(agent.env.ncol):if (i * agent.env.ncol + j) in disaster: # 一些特殊的状态,例如Cliff Walking中的悬崖print('****', end=' ')elif (i * agent.env.ncol + j) in end: # 终点print('EEEE', end=' ')else:a = agent.pi[i * agent.env.ncol + j]pi_str = ''for k in range(len(action_meaning)):pi_str += action_meaning[k] if a[k] > 0 else 'o'print(pi_str, end=' ')print()
env = CliffWalkingEnv()
action_meaning = ['^', 'v', '<', '>']
theta = 0.001
gamma = 0.9
agent = PolicyIteration(env, theta, gamma)
agent.policy_iteration()
print_agent(agent, action_meaning, list(range(37, 47)), [47])
策略评估进行60轮后完成
策略提升完成
策略评估进行72轮后完成
策略提升完成
策略评估进行44轮后完成
策略提升完成
策略评估进行12轮后完成
策略提升完成
策略评估进行1轮后完成
策略提升完成
状态价值:
-7.712 -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710
-7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900
-7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 -1.000
-7.458  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000
策略:
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo
^ooo **** **** **** **** **** **** **** **** **** **** EEEE 

我们发现经过五次策略评估和策略提升的循环迭代,策略收敛了。此时获得的策略也已经打印出来了,我们可以用贝尔曼最优方程去检验每一个状态的价值,我们可以发现最终输出的策略的确是最优策略。

值迭代

class ValueIteration:""" 价值迭代 """def __init__(self, env, theta, gamma):self.env = envself.v = [0] * self.env.ncol * self.env.nrow  # 初始化价值为0self.theta = theta  # 价值收敛阈值self.gamma = gammaself.pi = [None for i in range(self.env.ncol * self.env.nrow)] # 价值迭代结束后得到的策略def value_iteration(self):cnt = 0while 1:max_diff = 0new_v = [0] * self.env.ncol * self.env.nrowfor s in range(self.env.ncol * self.env.nrow):qsa_list = []for a in range(4):qsa = 0for res in self.env.P[s][a]:p, next_state, r, done = resqsa += p * (r + self.gamma * self.v[next_state] * (1-done))qsa_list.append(qsa) # 这一行和下一行是和策略迭代的主要区别new_v[s] = max(qsa_list)max_diff = max(max_diff, abs(new_v[s] - self.v[s]))self.v = new_vif max_diff < self.theta: breakcnt += 1print("价值迭代一共进行%d轮" % cnt)self.get_policy()def get_policy(self): # 根据价值函数导出一个贪心策略for s in range(self.env.nrow * self.env.ncol):qsa_list = []for a in range(4):qsa = 0for res in self.env.P[s][a]:p, next_state, r, done = resqsa += r + p * self.gamma * self.v[next_state] * (1-done)qsa_list.append(qsa)maxq = max(qsa_list)cntq = qsa_list.count(maxq)self.pi[s] = [1/cntq if q == maxq else 0 for q in qsa_list]  # 让相同的动作价值均分概率
env = CliffWalkingEnv()
action_meaning = ['^', 'v', '<', '>']
theta = 0.001
gamma = 0.9
agent = ValueIteration(env, theta, gamma)
agent.value_iteration()
print_agent(agent, action_meaning, list(range(37, 47)), [47])
价值迭代一共进行14轮
状态价值:
-7.712 -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710
-7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900
-7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 -1.000
-7.458  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000
策略:
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo
^ooo **** **** **** **** **** **** **** **** **** **** EEEE 

价值迭代总共用了 14 轮而策略迭代中策略评估总共用了 263 轮,可以发现价值迭代中循环次数远少于策略迭代。

2.3,Frozen Lake环境

该环境的状态空间和动作空间是有限的,我们在 Frozen Lake 环境也尝试策略迭代和价值迭代算法。Frozen Lake 是 OpenAI Gym 库中的一个环境。OpenAl Gym 库中包含了很多有名环境,例如 Atari 和 MuJoCo,并且它支持我们定制自己的环境。具体来说,Frozen Lake 环境和 Cliff Walking 一样也是一个方格世界,大小为 。每一个方格是一个状态,起点状态(S)在左上角,终点状态(G)在右下角,中间还有若干冰洞(H)。在每一个状态都可以采取上下左右 4 个动作。由于是冰面,每次行走有一定的概率滑行到附近的其它状态,并且到达冰洞和终点会提前结束。每一步奖励是 0,到达终点奖励是 1。

import gym
env = gym.make("FrozenLake-v1")  # 创建环境
env = env.unwrapped  # 解封装才能访问状态转移矩阵Pholes = set()
ends = set()
for s in env.P:for a in env.P[s]:for s_ in env.P[s][a]:if s_[2] == 1.0: # 获得奖励为1,代表是终点ends.add(s_[1])if s_[3] == True:holes.add(s_[1])
holes = holes - ends
print("冰洞的索引:", holes)
print("终点的索引", ends)for a in env.P[14]:  # 查看终点左边一格的状态转移信息print(env.P[14][a])
冰洞的索引: {11, 12, 5, 7}
终点的索引 {15}
[(0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 13, 0.0, False), (0.3333333333333333, 14, 0.0, False)]
[(0.3333333333333333, 13, 0.0, False), (0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True)]
[(0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False)]
[(0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 13, 0.0, False)]

首先我们发现冰洞的索引是 ,这说明在 Frozen Lake 这个环境中,原点(第一个状态)的定义是在左上角,这和 Cliff Walking 环境一样。其次,根据第 15 个状态(也即终点左边一格,数组下标索引为 14)的信息,我们可以看到每个动作都会等概率“滑行”到 3 种可能的结果,这和 Cliff Walking 环境是不一样的。

action_meaning = ['<', 'v', '>', '^'] # 这个动作意义是gym库中对FrozenLake这个环境事先规定好的
theta = 1e-5
gamma = 0.9
agent = PolicyIteration(env, theta, gamma)
agent.policy_iteration()
print_agent(agent, action_meaning, [5, 7, 11, 12], [15])
冰洞的索引: {11, 12, 5, 7}
终点的索引 {15}
策略评估进行25轮后完成
策略提升完成
策略评估进行58轮后完成
策略提升完成
状态价值:0.069  0.061  0.074  0.056 0.092  0.000  0.112  0.000 0.145  0.247  0.300  0.000 0.000  0.380  0.639  0.000
策略:
<ooo ooo^ <ooo ooo^
<ooo **** <o>o ****
ooo^ ovoo <ooo ****
**** oo>o ovoo EEEE
[(0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 13, 0.0, False), (0.3333333333333333, 14, 0.0, False)]
[(0.3333333333333333, 13, 0.0, False), (0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True)]
[(0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False)]
[(0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 13, 0.0, False)]

这个最优策略很看上去比较反直觉,但其实原因是因为这是一个会随机滑向其他状态的冰冻湖面。例如在终点左侧的状态,如果智能体采取向右的动作,它有可能会滑到冰洞,所以此时采取向下的动作是更为保险的,并且有一定概率能够滑到终点。然后我们来试一下价值迭代:

action_meaning = ['<', 'v', '>', '^']
theta = 1e-5
gamma = 0.9
agent = ValueIteration(env, theta, gamma)
agent.value_iteration()
print_agent(agent, action_meaning, [5, 7, 11, 12], [15])
冰洞的索引: {11, 12, 5, 7}
终点的索引 {15}
[(0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 13, 0.0, False), (0.3333333333333333, 14, 0.0, False)]
[(0.3333333333333333, 13, 0.0, False), (0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True)]
[(0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False)]
[(0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 13, 0.0, False)]
价值迭代一共进行60轮
状态价值:0.069  0.061  0.074  0.056 0.092  0.000  0.112  0.000 0.145  0.247  0.300  0.000 0.000  0.380  0.639  0.000
策略:
<ooo ooo^ <ooo ooo^
<ooo **** <o>o ****
ooo^ ovoo <ooo ****
**** oo>o ovoo EEEE 

2.4,寻宝环境

策略迭代

class CliffWalkingEnv:def __init__(self, ncol, nrow, end=()):self.ncol = ncol  # 定义环境的宽self.nrow = nrow  # 定义环境的高self.end = end  # 终点self.P = self.createP()  # 转移矩阵P[state][action] = [(p, next_state, reward, done)],包含下一个状态和奖励def createP(self):P = [[[] for j in range(4)] for i in range(self.nrow * self.ncol)]  # 初始化#原点(0,0)定义在左上角for i in range(self.nrow):for j in range(self.ncol):for a in range(4):if (j, i) == self.end:  # 位置在或者终点,因为无法继续交互,任何动作奖励都为0P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0, True)]continue# 其他位置next_x=jnext_y=iif a == 0:next_x -= 1  # leftelif a == 1:next_x += 1  # rightelif a == 2:next_y += 1  # downelif a == 3:next_y -= 1  # upif next_x < 0:next_x = 0if next_x >= self.nrow:next_x = self.nrow - 1if next_y < 0:next_y = 0if next_y >= self.ncol:next_y = self.ncol - 1next_state = next_y * self.ncol + next_xreward = -1done = Falseif (j, i) == self.end:  # Endreward = 0done = TrueP[i * self.ncol + j][a] = [(1, next_state, reward, done)]return P
import copy
from environment import CliffWalkingEnvclass PolicyIteration:""" 策略迭代 """def __init__(self, env, theta, gamma):self.env = envself.v = [0] * self.env.ncol * self.env.nrow  # 初始化价值为0self.pi = [[0.25, 0.25, 0.25, 0.25] for i in range(self.env.ncol * self.env.nrow)]  # 初始化为均匀随机策略self.theta = theta  # 策略评估收敛阈值self.gamma = gamma  # 折扣因子def policy_evaluation(self):  # 策略评估cnt = 1  # 计数器while 1:max_diff = 0new_v = [0] * self.env.ncol * self.env.nrowfor s in range(self.env.ncol * self.env.nrow):qsa_list = []for a in range(4):qsa = 0for res in self.env.P[s][a]:p, next_state, r, done = resqsa += self.pi[s][a] * p * (r + self.gamma * self.v[next_state] * (1 - done)) # 本章节环境比较特殊,奖励和下一个状态有关,所以需要和状态转移概率相乘qsa_list.append(qsa)new_v[s] = sum(qsa_list)  # 状态价值函数和动作价值函数之间的关系max_diff = max(max_diff, abs(new_v[s] - self.v[s]))self.v[s] = sum(qsa_list)# print("状态价值:", cnt)# for i in range(self.env.nrow):#     for j in range(self.env.ncol):#         print('%6.6s' % ('%.3f' % self.v[i * self.env.ncol + j]), end=' ')  # 为了输出美观,保持输出6个字符#     print()if max_diff < self.theta: breakcnt += 1print("策略评估进行%d轮后完成" % cnt)def policy_improvement(self):  # 策略提升for s in range(self.env.nrow * self.env.ncol):qsa_list = []for a in range(4):qsa = 0for res in self.env.P[s][a]:p, next_state, r, done = resqsa += p * (r + self.gamma * self.v[next_state] * (1 - done))qsa_list.append(qsa)maxq = max(qsa_list)cntq = qsa_list.count(maxq)self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list]  # 让相同的动作价值均分概率print("策略提升完成")return self.pidef policy_iteration(self):  # 策略迭代while 1:self.policy_evaluation()old_pi = copy.deepcopy(self.pi)  # 将列表进行深拷贝,方便接下来进行比较new_pi = self.policy_improvement()if old_pi == new_pi: breakdef print_agent(agent, action_meaning, disaster=[], end=[]):print("状态价值:")for i in range(agent.env.nrow):for j in range(agent.env.ncol):print('%6.6s' % ('%.3f' % agent.v[i * agent.env.ncol + j]), end=' ')  # 为了输出美观,保持输出6个字符print()print("策略:")for i in range(agent.env.nrow):for j in range(agent.env.ncol):if (i * agent.env.ncol + j) in disaster:  # 一些特殊的状态,例如Cliff Walking中的悬崖print('****', end=' ')elif (i * agent.env.ncol + j) in end:  # 终点print('EEEE', end=' ')else:a = agent.pi[i * agent.env.ncol + j]pi_str = ''for k in range(len(action_meaning)):pi_str += action_meaning[k] if a[k] > 0 else 'o'print(pi_str, end=' ')print()env = CliffWalkingEnv(ncol=5, nrow=5,  end=(3, 1))  # 横坐标3,纵坐标1
action_meaning = ['<', '>', 'v','^']
theta = 0.00001
gamma = 1
agent = PolicyIteration(env, theta, gamma)
agent.policy_iteration()
print_agent(agent, action_meaning, [], [8])  # 5, 7, 11, 12
===========================================================
策略评估进行338轮后完成
策略提升完成
策略评估进行5轮后完成
策略提升完成
策略评估进行1轮后完成
策略提升完成
状态价值:
-4.000 -3.000 -2.000 -1.000 -2.000
-3.000 -2.000 -1.000  0.000 -1.000
-4.000 -3.000 -2.000 -1.000 -2.000
-5.000 -4.000 -3.000 -2.000 -3.000
-6.000 -5.000 -4.000 -3.000 -4.000
策略:
o>vo o>vo o>vo oovo <ovo
o>oo o>oo o>oo EEEE <ooo
o>o^ o>o^ o>o^ ooo^ <oo^
o>o^ o>o^ o>o^ ooo^ <oo^
o>o^ o>o^ o>o^ ooo^ <oo^ 

值迭代

import copy
from environment import CliffWalkingEnvclass ValueIteration:""" 价值迭代 """def __init__(self, env, theta, gamma):self.env = envself.v = [0] * self.env.ncol * self.env.nrow  # 初始化价值为0self.theta = theta  # 价值收敛阈值self.gamma = gammaself.pi = [None for i in range(self.env.ncol * self.env.nrow)]  # 价值迭代结束后得到的策略def value_iteration(self):cnt = 0while 1:max_diff = 0new_v = [0] * self.env.ncol * self.env.nrowfor s in range(self.env.ncol * self.env.nrow):qsa_list = []for a in range(4):qsa = 0for res in self.env.P[s][a]:p, next_state, r, done = resqsa += p * (r + self.gamma * self.v[next_state] * (1 - done))qsa_list.append(qsa)  # 这一行和下一行是和策略迭代的主要区别new_v[s] = max(qsa_list)max_diff = max(max_diff, abs(new_v[s] - self.v[s]))self.v = new_vif max_diff < self.theta: breakcnt += 1print("价值迭代一共进行%d轮" % cnt)self.get_policy()def get_policy(self):  # 根据价值函数导出一个贪心策略for s in range(self.env.nrow * self.env.ncol):qsa_list = []for a in range(4):qsa = 0for res in self.env.P[s][a]:p, next_state, r, done = resqsa += r + p * self.gamma * self.v[next_state] * (1 - done)qsa_list.append(qsa)maxq = max(qsa_list)cntq = qsa_list.count(maxq)self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list]  # 让相同的动作价值均分概率
def print_agent(agent, action_meaning, disaster=[], end=[]):print("状态价值:")for i in range(agent.env.nrow):for j in range(agent.env.ncol):print('%6.6s' % ('%.3f' % agent.v[i * agent.env.ncol + j]), end=' ')  # 为了输出美观,保持输出6个字符print()print("策略:")for i in range(agent.env.nrow):for j in range(agent.env.ncol):if (i * agent.env.ncol + j) in disaster:  # 一些特殊的状态,例如Cliff Walking中的悬崖print('****', end=' ')elif (i * agent.env.ncol + j) in end:  # 终点print('EEEE', end=' ')else:a = agent.pi[i * agent.env.ncol + j]pi_str = ''for k in range(len(action_meaning)):pi_str += action_meaning[k] if a[k] > 0 else 'o'print(pi_str, end=' ')print()env = CliffWalkingEnv(ncol=5, nrow=5,  end=(3, 1))  # 横坐标3,纵坐标1
action_meaning = ['<', '>', 'v','^']
theta = 0.00001
gamma = 1
agent = ValueIteration(env, theta, gamma)
agent.value_iteration()
print_agent(agent, action_meaning, [], [8])  # 5, 7, 11, 12
===========================================================
价值迭代一共进行6轮
状态价值:
-4.000 -3.000 -2.000 -1.000 -2.000
-3.000 -2.000 -1.000  0.000 -1.000
-4.000 -3.000 -2.000 -1.000 -2.000
-5.000 -4.000 -3.000 -2.000 -3.000
-6.000 -5.000 -4.000 -3.000 -4.000
策略:
o>vo o>vo o>vo oovo <ovo
o>oo o>oo o>oo EEEE <ooo
o>o^ o>o^ o>o^ ooo^ <oo^
o>o^ o>o^ o>o^ ooo^ <oo^
o>o^ o>o^ o>o^ ooo^ <oo^ 

2.5,寻宝环境(障碍,仿Frozen Lake )

class CliffWalkingEnv:def __init__(self, ncol, nrow, block=[], end=()):self.ncol = ncol  # 定义环境的宽self.nrow = nrow  # 定义环境的高self.end = end  # 终点self.block = block #障碍self.P = self.createP()  # 转移矩阵P[state][action] = [(p, next_state, reward, done)],包含下一个状态和奖励def createP(self):P = [[[] for j in range(4)] for i in range(self.nrow * self.ncol)]  # 初始化# 原点(0,0)定义在左上角for i in range(self.nrow):for j in range(self.ncol):for a in range(4):if (j, i) == self.end:  # 位置在或者终点,因为无法继续交互,任何动作奖励都为0P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0, True)]continue# 其他位置next_x = jnext_y = iif a == 0:next_x -= 1  # leftelif a == 1:next_x += 1  # rightelif a == 2:next_y += 1  # downelif a == 3:next_y -= 1  # upif next_x < 0:next_x = 0if next_x >= self.nrow:next_x = self.nrow - 1if next_y < 0:next_y = 0if next_y >= self.ncol:next_y = self.ncol - 1next_state = next_y * self.ncol + next_xreward = -1done = Falseif (j, i) == self.end:  # Endreward = 0done = Trueelif (j,i) in self.block:reward = -10P[i * self.ncol + j][a] = [(1, next_state, reward, done)]return P
import copy
from environment import CliffWalkingEnvclass PolicyIteration:""" 策略迭代 """def __init__(self, env, theta, gamma):self.env = envself.v = [0] * self.env.ncol * self.env.nrow  # 初始化价值为0self.pi = [[0.25, 0.25, 0.25, 0.25] for i in range(self.env.ncol * self.env.nrow)]  # 初始化为均匀随机策略self.theta = theta  # 策略评估收敛阈值self.gamma = gamma  # 折扣因子def policy_evaluation(self):  # 策略评估cnt = 1  # 计数器while 1:max_diff = 0new_v = [0] * self.env.ncol * self.env.nrowfor s in range(self.env.ncol * self.env.nrow):qsa_list = []for a in range(4):qsa = 0for res in self.env.P[s][a]:p, next_state, r, done = resqsa += self.pi[s][a] * p * (r + self.gamma * self.v[next_state] * (1 - done)) # 本章节环境比较特殊,奖励和下一个状态有关,所以需要和状态转移概率相乘qsa_list.append(qsa)new_v[s] = sum(qsa_list)  # 状态价值函数和动作价值函数之间的关系max_diff = max(max_diff, abs(new_v[s] - self.v[s]))self.v[s] = sum(qsa_list)# print("状态价值:", cnt)# for i in range(self.env.nrow):#     for j in range(self.env.ncol):#         print('%6.6s' % ('%.3f' % self.v[i * self.env.ncol + j]), end=' ')  # 为了输出美观,保持输出6个字符#     print()if max_diff < self.theta: breakcnt += 1print("策略评估进行%d轮后完成" % cnt)def policy_improvement(self):  # 策略提升for s in range(self.env.nrow * self.env.ncol):qsa_list = []for a in range(4):qsa = 0for res in self.env.P[s][a]:p, next_state, r, done = resqsa += p * (r + self.gamma * self.v[next_state] * (1 - done))qsa_list.append(qsa)maxq = max(qsa_list)cntq = qsa_list.count(maxq)self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list]  # 让相同的动作价值均分概率print("策略提升完成")return self.pidef policy_iteration(self):  # 策略迭代while 1:self.policy_evaluation()old_pi = copy.deepcopy(self.pi)  # 将列表进行深拷贝,方便接下来进行比较new_pi = self.policy_improvement()if old_pi == new_pi: breakdef print_agent(agent, action_meaning, disaster=[], end=[]):print("状态价值:")for i in range(agent.env.nrow):for j in range(agent.env.ncol):print('%6.6s' % ('%.3f' % agent.v[i * agent.env.ncol + j]), end=' ')  # 为了输出美观,保持输出6个字符print()print("策略:")for i in range(agent.env.nrow):for j in range(agent.env.ncol):if (i * agent.env.ncol + j) in disaster:  # 一些特殊的状态,例如Cliff Walking中的悬崖print('****', end=' ')elif (i * agent.env.ncol + j) in end:  # 终点print('EEEE', end=' ')else:a = agent.pi[i * agent.env.ncol + j]pi_str = ''for k in range(len(action_meaning)):pi_str += action_meaning[k] if a[k] > 0 else 'o'print(pi_str, end=' ')print()env = CliffWalkingEnv(ncol=4, nrow=4, block=[(0,3),(1,1),(3,1),(3,2)], end=(3, 3))  # 横坐标3,纵坐标1
action_meaning = ['<', '>', 'v','^']
theta = 0.00001
gamma = 1
agent = PolicyIteration(env, theta, gamma)
agent.policy_iteration()
print_agent(agent, action_meaning, [5, 7, 11, 12], [15])  # 5, 7, 11, 12
=========================================================================
策略评估进行421轮后完成
策略提升完成
策略评估进行7轮后完成
策略提升完成
策略评估进行2轮后完成
策略提升完成
状态价值:
-6.000 -5.000 -4.000 -5.000
-5.000 -13.00 -3.000 -13.00
-4.000 -3.000 -2.000 -10.00
-12.00 -2.000 -1.000  0.000
策略:
o>vo o>oo oovo <ooo
oovo **** oovo ****
o>oo o>vo oovo ****
**** o>oo o>oo EEEE 

3,时序差分

3.1,Sarsa算法

我们仍然在 Cliff Walking 环境下来尝试一下 Sarsa 算法。首先我们先来看一下 Cliff Walking 环境的代码,这份环境代码和动态规划中的不一样,因为此时环境不需要提供奖励函数和状态转移函数,而是需要提供和智能体进行交互的函数 step,该函数将智能体的动作作为输入,输出奖励和下一个状态给智能体。

import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm # tqdm是显示循环进度条的库class CliffWalkingEnv:def __init__(self, ncol, nrow):self.nrow = nrowself.ncol = ncolself.x = 0 # 记录当前智能体位置的横坐标self.y = self.nrow - 1 # 记录当前智能体位置的纵坐标def step(self, action): # 外部调用这个函数来让当前位置改变change = [[0, -1], [0, 1], [-1, 0], [1, 0]] # 4 种动作, 0:上, 1:下, 2:左, 3:右。原点(0,0)定义在左上角self.x = min(self.ncol - 1, max(0, self.x + change[action][0]))self.y = min(self.nrow - 1, max(0, self.y + change[action][1]))next_state = self.y * self.ncol + self.xreward = -1done = Falseif self.y == self.nrow - 1 and self.x > 0: # 下一个位置在悬崖或者终点done = Trueif self.x != self.ncol - 1:reward = -100return next_state, reward, donedef reset(self): # 回归初始状态,坐标轴原点在左上角self.x = 0self.y = self.nrow - 1return self.y * self.ncol + self.x

然后我们来实现 Sarsa 算法,主要维护一个表格 Q_table 用来储存当前策略下所有状态动作对的价值,在用 Sarsa 算法和环境交互时,用 ϵ-greedy 策略进行采样,在更新 Sarsa 算法时,使用时序差分的公式。我们默认终止状态时所有动作的价值都是 0,这些价值在初始化为 0 后就不会进行更新。

class Sarsa:""" Sarsa算法 """def __init__(self, ncol, nrow, epsilon, alpha, gamma, n_action=4):self.Q_table = np.zeros([nrow * ncol, n_action]) # 初始化Q(s,a)表格self.n_action = n_action # 动作个数self.alpha = alpha # 学习率self.gamma = gamma # 折扣因子self.epsilon = epsilon # epsilon-greedy策略中的参数def take_action(self, state): #选取下一步的操作if np.random.random() < self.epsilon:action = np.random.randint(self.n_action)else:action = np.argmax(self.Q_table[state])return actiondef best_action(self, state): # 用于打印策略Q_max = np.max(self.Q_table[state])a = [0 for _ in range(self.n_action)]for i in range(self.n_action): # 若两个动作的价值一样,都会记录下来if self.Q_table[state, i] == Q_max:a[i] = 1return adef update(self, s0, a0, r, s1, a1):td_error = r + self.gamma * self.Q_table[s1, a1] - self.Q_table[s0, a0]self.Q_table[s0, a0] += self.alpha * td_error
ncol = 12
nrow = 4
env = CliffWalkingEnv(ncol, nrow)
np.random.seed(0)
epsilon = 0.1
alpha = 0.1
gamma = 0.9
agent = Sarsa(ncol, nrow, epsilon, alpha, gamma)
num_episodes = 500 # 智能体在环境中运行多少条序列return_list = [] # 记录每一条序列的回报
for i in range(10): # 显示10个进度条with tqdm(total=int(num_episodes/10), desc='Iteration %d' % i) as pbar: # tqdm的进度条功能for i_episode in range(int(num_episodes/10)): # 每个进度条的序列数episode_return = 0state = env.reset()action = agent.take_action(state)done = Falsewhile not done:next_state, reward, done = env.step(action)next_action = agent.take_action(next_state)episode_return += reward # 这里回报的计算不进行折扣因子衰减agent.update(state, action, reward, next_state, next_action)state = next_stateaction = next_actionreturn_list.append(episode_return)if (i_episode+1) % 10 == 0: # 每10条序列打印一下这10条序列的平均回报pbar.set_postfix({'episode': '%d' % (num_episodes / 10 * i + i_episode+1), 'return': '%.3f' % np.mean(return_list[-10:])})pbar.update(1)episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('Sarsa on {}'.format('Cliff Walking'))
plt.show()
Iteration 0: 100%|██████████| 50/50 [00:00<00:00, 335.12it/s, episode=50, return=-119.400]
Iteration 1: 100%|██████████| 50/50 [00:00<00:00, 602.44it/s, episode=100, return=-63.000]
Iteration 2: 100%|██████████| 50/50 [00:00<00:00, 1615.70it/s, episode=150, return=-51.200]
Iteration 3: 100%|██████████| 50/50 [00:00<00:00, 2177.05it/s, episode=200, return=-48.100]
Iteration 4: 100%|██████████| 50/50 [00:00<00:00, 2007.61it/s, episode=250, return=-35.700]
Iteration 5: 100%|██████████| 50/50 [00:00<00:00, 3121.74it/s, episode=300, return=-29.900]
Iteration 6: 100%|██████████| 50/50 [00:00<00:00, 3132.93it/s, episode=350, return=-28.300]
Iteration 7: 100%|██████████| 50/50 [00:00<00:00, 3311.73it/s, episode=400, return=-27.700]
Iteration 8: 100%|██████████| 50/50 [00:00<00:00, 2761.30it/s, episode=450, return=-28.500]
Iteration 9: 100%|██████████| 50/50 [00:00<00:00, 3127.60it/s, episode=500, return=-18.900]

我们发现 Sarsa 算法随着训练,获得的回报越来越高。在进行 500 条序列的学习后,可以获得负二十左右的回报,此时已经非常接近最优策略了。然后我们看一下 Sarsa 算法得到的策略在各个状态下会采取怎么样的动作。

def print_agent(agent, env, action_meaning, disaster=[], end=[]):for i in range(env.nrow):for j in range(env.ncol):if (i * env.ncol + j) in disaster:print('****', end=' ')elif (i * env.ncol + j) in end:print('EEEE', end=' ')else:a = agent.best_action(i * env.ncol + j)pi_str = ''for k in range(len(action_meaning)):pi_str += action_meaning[k] if a[k] > 0 else 'o'print(pi_str, end=' ')print()action_meaning = ['^', 'v', '<', '>']
print('Sarsa算法最终收敛得到的策略为:')
print_agent(agent, env, action_meaning, list(range(37, 47)), [47])
Sarsa算法最终收敛得到的策略为:
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo
^ooo ooo> ^ooo ooo> ooo> ooo> ooo> ^ooo ^ooo ooo> ooo> ovoo
^ooo **** **** **** **** **** **** **** **** **** **** EEEE 

发现 Sarsa 算法会采取比较远离悬崖的策略来抵达终点。

宝藏环境

import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm  # tqdm是显示循环进度条的库class CliffWalkingEnv:def __init__(self, ncol, nrow, end):self.nrow = nrowself.ncol = ncolself.x = 0  # 记录当前智能体位置的横坐标self.y = 0  # 记录当前智能体位置的纵坐标self.end = enddef step(self, action):  # 外部调用这个函数来让当前位置改变if action == 0:self.x -= 1  # leftelif action == 1:self.x += 1  # rightelif action == 2:self.y += 1  # downelif action == 3:self.y -= 1  # upreward = -1if self.x < 0:self.x = 0#reward = -2if self.x >= self.nrow:self.x = self.nrow - 1#reward = -2if self.y  < 0:self.y  = 0#reward = -2if self.y  >= self.ncol:self.y  = self.ncol - 1#reward = -2next_state = self.y  * self.ncol + self.xdone = False#print(self.x,self.y,self.end)if (self.x, self.y) == self.end:  # Endreward = 0done = Truereturn next_state, reward, donedef reset(self):  # 回归初始状态,坐标轴原点在左上角self.x = 0self.y = 0return self.y * self.ncol + self.xclass Sarsa:""" Sarsa算法 """def __init__(self, ncol, nrow, epsilon, alpha, gamma, n_action=4):self.Q_table = np.zeros([nrow * ncol, n_action])  # 初始化Q(s,a)表格self.n_action = n_action  # 动作个数self.alpha = alpha  # 学习率self.gamma = gamma  # 折扣因子self.epsilon = epsilon  # epsilon-greedy策略中的参数def take_action(self, state):  # 选取下一步的操作if np.random.random() < self.epsilon:action = np.random.randint(self.n_action)else:action = np.argmax(self.Q_table[state])return actiondef best_action(self, state):  # 用于打印策略Q_max = np.max(self.Q_table[state])a = [0 for _ in range(self.n_action)]for i in range(self.n_action):  # 若两个动作的价值一样,都会记录下来if self.Q_table[state, i] == Q_max:a[i] = 1return adef update(self, s0, a0, r, s1, a1):td_error = r + self.gamma * self.Q_table[s1, a1] - self.Q_table[s0, a0]self.Q_table[s0, a0] += self.alpha * td_errorncol = 5
nrow = 5
end = (3,1)
env = CliffWalkingEnv(ncol, nrow,end)
np.random.seed(0)
epsilon = 0.1
alpha = 0.1
gamma = 0.9
agent = Sarsa(ncol, nrow, epsilon, alpha, gamma)
num_episodes = 5000  # 智能体在环境中运行多少条序列return_list = []  # 记录每一条序列的回报
for i in range(10):  # 显示10个进度条with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:  # tqdm的进度条功能for i_episode in range(int(num_episodes / 10)):  # 每个进度条的序列数episode_return = 0state = env.reset()action = agent.take_action(state)done = Falsewhile not done:next_state, reward, done = env.step(action)next_action = agent.take_action(next_state)episode_return += reward  # 这里回报的计算不进行折扣因子衰减agent.update(state, action, reward, next_state, next_action)state = next_stateaction = next_actionreturn_list.append(episode_return)if (i_episode + 1) % 10 == 0:  # 每10条序列打印一下这10条序列的平均回报pbar.set_postfix({'episode': '%d' % (num_episodes / 10 * i + i_episode + 1),'return': '%.3f' % np.mean(return_list[-10:])})pbar.update(1)
print(max(return_list))
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('Sarsa on {}'.format('Cliff Walking'))
plt.show()def print_agent(agent, env, action_meaning, disaster=[], end=[]):for i in range(env.nrow):for j in range(env.ncol):if (i * env.ncol + j) in disaster:print('****', end=' ')elif (i * env.ncol + j) in end:print('EEEE', end=' ')else:a = agent.best_action(i * env.ncol + j)pi_str = ''for k in range(len(action_meaning)):pi_str += action_meaning[k] if a[k] > 0 else 'o'print(pi_str, end=' ')print()action_meaning = [ '<', '>','v','^', ]
print('Sarsa算法最终收敛得到的策略为:')
print_agent(agent, env, action_meaning,[], [8])

3.2,多步Sarsa算法

class nstep_Sarsa:""" n步Sarsa算法 """def __init__(self, n, ncol, nrow, epsilon, alpha, gamma, n_action=4):self.Q_table = np.zeros([nrow * ncol, n_action])self.n_action = n_actionself.alpha = alphaself.gamma = gammaself.epsilon = epsilonself.n = n # 采用n步Sarsaself.state_list = [] # 保存之前的状态self.action_list = [] # 保存之前的动作self.reward_list = [] # 保存之前的奖励def take_action(self, state):if np.random.random() < self.epsilon:action = np.random.randint(self.n_action)else:action = np.argmax(self.Q_table[state])return actiondef best_action(self, state): # 用于打印策略Q_max = np.max(self.Q_table[state])a = [0 for _ in range(self.n_action)]for i in range(self.n_action):if self.Q_table[state, i] == Q_max:a[i] = 1return adef update(self, s0, a0, r, s1, a1, done):self.state_list.append(s0)self.action_list.append(a0)self.reward_list.append(r)if len(self.state_list) == self.n: # 若保存的数据可以进行n步更新G = self.Q_table[s1, a1] # 得到Q(s_{t+n}, a_{t+n})for i in reversed(range(self.n)):G = self.gamma * G + self.reward_list[i] # 不断向前计算每一步的回报if done and i > 0: # 如果到达终止状态,最后几步虽然长度不够n步,我们也对其进行更新s = self.state_list[i]a = self.action_list[i]self.Q_table[s, a] += self.alpha * (G - self.Q_table[s, a])s = self.state_list.pop(0) # 需要更新的状态动作从列表中删除,下次不必更新a = self.action_list.pop(0)self.reward_list.pop(0)self.Q_table[s, a] += self.alpha * (G - self.Q_table[s, a]) # n-step sarsa的主要更新步骤if done: # 如果到达终止状态,即将开始下一条序列,则将列表全清空self.state_list = []self.action_list = []self.reward_list = []
ncol = 12
nrow = 4
env = CliffWalkingEnv(ncol, nrow)
np.random.seed(0)
n_step = 5 # 5步Sarsa算法
alpha = 0.1
epsilon = 0.1
gamma = 0.9
agent = nstep_Sarsa(n_step, ncol, nrow, epsilon, alpha, gamma)
num_episodes = 500 # 智能体在环境中运行多少条序列return_list = [] # 记录每一条序列的回报
for i in range(10): # 显示10个进度条with tqdm(total=int(num_episodes/10), desc='Iteration %d' % i) as pbar: # tqdm的进度条功能for i_episode in range(int(num_episodes/10)): # 每个进度条的序列数episode_return = 0state = env.reset()action = agent.take_action(state)done = Falsewhile not done:next_state, reward, done = env.step(action)next_action = agent.take_action(next_state)episode_return += reward # 这里回报的计算不进行折扣因子衰减agent.update(state, action, reward, next_state, next_action, done)state = next_stateaction = next_actionreturn_list.append(episode_return)if (i_episode+1) % 10 == 0: # 每10条序列打印一下这10条序列的平均回报pbar.set_postfix({'episode': '%d' % (num_episodes / 10 * i + i_episode+1), 'return': '%.3f' % np.mean(return_list[-10:])})pbar.update(1)episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('5-step Sarsa on {}'.format('Cliff Walking'))
plt.show()

通过实验结果我们发现,5 步 Sarsa 算法的收敛速度比单步 Sarsa 更快。我们来看一下此时的策略表现。

action_meaning = ['^', 'v', '<', '>']
print('5步Sarsa算法最终收敛得到的策略为:')
print_agent(agent, env, action_meaning, list(range(37, 47)), [47])
5步Sarsa算法最终收敛得到的策略为:
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo
^ooo ^ooo ^ooo oo<o ^ooo ^ooo ^ooo ^ooo ooo> ooo> ^ooo ovoo
ooo> ^ooo ^ooo ^ooo ^ooo ^ooo ^ooo ooo> ooo> ^ooo ooo> ovoo
^ooo **** **** **** **** **** **** **** **** **** **** EEEE 

我们发现此时多步 Sarsa 算法得到的策略会在最远离悬崖的一边行走,保证最大的安全性。

3.3,Q-Learning算法

class QLearning:""" Q-learning算法 """def __init__(self, ncol, nrow, epsilon, alpha, gamma, n_action=4):self.Q_table = np.zeros([nrow * ncol, n_action]) # 初始化Q(s,a)表格self.n_action = n_action # 动作个数self.alpha = alpha # 学习率self.gamma = gamma # 折扣因子self.epsilon = epsilon # epsilon-greedy策略中的参数def take_action(self, state): #选取下一步的操作if np.random.random() < self.epsilon:action = np.random.randint(self.n_action)else:action = np.argmax(self.Q_table[state])return actiondef best_action(self, state): # 用于打印策略Q_max = np.max(self.Q_table[state])a = [0 for _ in range(self.n_action)]for i in range(self.n_action):if self.Q_table[state, i] == Q_max:a[i] = 1return adef update(self, s0, a0, r, s1):td_error = r + self.gamma * self.Q_table[s1].max() - self.Q_table[s0, a0]self.Q_table[s0, a0] += self.alpha * td_error
ncol = 12
nrow = 4
env = CliffWalkingEnv(ncol, nrow)
np.random.seed(0)
epsilon = 0.1
alpha = 0.1
gamma = 0.9
agent = QLearning(ncol, nrow, epsilon, alpha, gamma)
num_episodes = 500 # 智能体在环境中运行多少条序列return_list = [] # 记录每一条序列的回报
for i in range(10): # 显示10个进度条with tqdm(total=int(num_episodes/10), desc='Iteration %d' % i) as pbar: # tqdm的进度条功能for i_episode in range(int(num_episodes/10)): # 每个进度条的序列数episode_return = 0state = env.reset()done = Falsewhile not done:action = agent.take_action(state)next_state, reward, done = env.step(action)episode_return += reward # 这里回报的计算不进行折扣因子衰减agent.update(state, action, reward, next_state)state = next_statereturn_list.append(episode_return)if (i_episode+1) % 10 == 0: # 每10条序列打印一下这10条序列的平均回报pbar.set_postfix({'episode': '%d' % (num_episodes / 10 * i + i_episode+1), 'return': '%.3f' % np.mean(return_list[-10:])})pbar.update(1)episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('Q-learning on {}'.format('Cliff Walking'))
plt.show()action_meaning = ['^', 'v', '<', '>']
print('Q-learning算法最终收敛得到的策略为:')
print_agent(agent, env, action_meaning, list(range(37, 47)), [47])
Q-learning算法最终收敛得到的策略为:
^ooo ovoo ovoo ^ooo ^ooo ovoo ooo> ^ooo ^ooo ooo> ooo> ovoo
ooo> ooo> ooo> ooo> ooo> ooo> ^ooo ooo> ooo> ooo> ooo> ovoo
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo
^ooo **** **** **** **** **** **** **** **** **** **** EEEE 

需要注意的是,打印出来的回报是行为策略在环境中交互得到的,而不是 Q-learning 算法在学习的目标策略的真实回报。把目标策略的行为打印出来后,发现其更偏向于走在悬崖边上,这与 Sarsa 算法得到的比较保守的策略相比是更优的。

强化学习(实践):多臂老虎机,动态规划,时序差分相关推荐

  1. ML之RL:基于MovieLens电影评分数据集利用强化学习算法(多臂老虎机+EpsilonGreedy策略)实现对用户进行Top电影推荐案例

    ML之RL:基于MovieLens电影评分数据集利用强化学习算法(多臂老虎机+EpsilonGreedy策略)实现对用户进行Top电影推荐案例 目录 基于MovieLens电影评分数据集利用强化学习算 ...

  2. 【科普】强化学习之多臂老虎机问题(bandit算法:softmax,random,UCB)

    本博客上的文章分为两类:一类是科普性文章,以通俗易懂的语言风格介绍专业性的概念及其应用场景(公式极少或没有),适合入门阶段.另一类是专业性文章,在科普阶段上做出详细的专业性数学推导,深刻理解其概念的内 ...

  3. 强化学习——day31 多臂老虎机MAB的代码实现(Python)

    多臂老虎机MAB的代码实现 2.3 算法基本框架搭建 2.4 epsilon贪心算法 2.4.1 参数为0.01的绘图 2.4.2 不同的参数 2.4.3 值随时间衰减的 epsilon-贪婪算法 2 ...

  4. 求助-强化学习基础-K-摇臂老虎机Python

    按照周志华西瓜书第16章K-摇臂赌博机的伪码编的程序: # -*- coding: utf-8 -*- """ e贪心和Softmax 2-摇臂赌博机 摇臂1:0.4概率 ...

  5. 强化学习——day12 多臂老虎机问题MAB

    在多臂老虎机(multi-armed bandit,MAB)问题(见图 2-1)中,有一个拥有 根拉杆的老虎机,拉动每一根拉杆都对应一个关于奖励的概率分布 .我们每次拉动其中一根拉杆,就可以从该拉杆对 ...

  6. 【强化学习】多臂老虎机——E_greedy、UCB、Gradient Bandit 算法 代码实现

    多臂老虎机 import numpy as np import matplotlib.pyplot as pltclass E_greedy:def __init__(self,arm_num=10, ...

  7. 强化学习之多臂老虎机(Multi-Armed-Bandit)问题

    一.问题背景 假设有一个老虎机有 nnn 个握把,每个握把 i" role="presentation" style="position: relative;& ...

  8. 【赠书】掌握人工智能重要主题,深度强化学习实践书籍推荐

    ‍‍ 今天要给大家介绍的书是深度强化学习实践的第二版,本书的主题是强化学习(Reinforcement Learning,RL),它是机器学习(Machine Learning,ML)的一个分支,强调 ...

  9. 动手学强化学习(三):动态规划算法 (Dynamic Programming)

    动手学强化学习(三):动态规划算法 (Dynamic Programming) 1. 简介 2. 悬崖漫步环境 3. 策略迭代算法 3.1 策略评估 3.2 策略提升 3.3 策略迭代算法 4.价值迭 ...

  10. 强化学习实践:DDQN—LunarLander月球登入初探

    强化学习实践:DDQN-月球登入LunarLander初探 算法DDQN 实践 环境准备 GYM及PARL+paddle parl的框架结构 agent构建 搭建神经网络 replay_memory经 ...

最新文章

  1. 【 MATLAB 】通过案例学会编写一个 matlab 函数(小猫掉进山洞问题)
  2. Java中的继承和接口
  3. 微信营销这么做,你就成功了 转载
  4. 什么是事务、事务特性、事务隔离级别、spring事务传播特性
  5. 如果你需要在 XHTML 中声明 DOCTYPE,必须使用到jsp:text动作元素
  6. aspose 转pdf表格大小乱了_自己写了一个小工具类:pdf转word,没有页数和大小限制,保真!...
  7. 图解TCPIP-以太网(物理层)
  8. UVA-572-搜索基础题
  9. 商业认知,近期与部分中小创业者一起吃饭,忽然有人谈到现在创业越来越难
  10. int与string互转
  11. 主机overlay和网络overlay_边缘计算中kubernetes网络能大一统吗?
  12. Excel VBA编程教程--excel录制宏做数据录入
  13. 琳琳冒险岛家族任务指南
  14. 二阶系统级联_一种高二阶级联结构Sigma-Delta调制器系统的制作方法
  15. BIOS的全局变量gST gBS gDS
  16. 特征值和特征向量的作用
  17. pdf阅读器,pdf阅读器大全,最好pdf阅读器排行,pdf阅读器下载
  18. ESP8266连接中国移动ONENET物联网平台TCP透传实现WIFI远程控制
  19. Latex/CTex/WinEdt 期刊双栏排版图表中英文标题走过的那些坑
  20. 亚马逊(Amazon)新接口SP-API和PII受限信息的开发者申请

热门文章

  1. Centos7+DockerCompose部署.NetCore3.1应用
  2. deepin php与nginx,深度Deepin20安装Nginx1.19+Php7.3+H5ai实践指南
  3. matlab rms数据滤波,与RMS相关的5个信息,如何通过RMS结果滤波来提高精度
  4. 【强大图片滤镜插件集】Nik Collection 2 by DxO for Mac 2019
  5. linux直接点击iso安装win10,iso安装win10,win10iso直接解压安装
  6. 注册测绘师-大地测量与平差-衡量精度的标准
  7. Vue 记录一次安装插件引起的项目崩溃(This is probably not a problem with npm,there is likely additional logging outp)
  8. css3 呼吸的莲花_心肾呼吸法—莲花能量冥想*
  9. 机票预订系统的数据流程图及实体联系图
  10. win10更新完提示未安装任何音频输出设备2019-11-13解决