The algorithm and example used in this article come from the course "Mathematical Foundations of Reinforcement Learning" taught by Prof. Shiyu Zhao (赵世钰) of Westlake University on bilibili. Link: 第5课-蒙特卡洛方法(MC Basic算法例子)_哔哩哔哩_bilibili
Table of Contents
1. Task Objective
2. Implementation Details
3. Code Demonstration
1. Task Objective
1. The initial policy is given.
2. Use the MC Basic algorithm to update the policy.
3. The reward for stepping out of bounds is -1, the reward for entering a forbidden area is -10, and the reward for reaching the target area is +1; the discount factor for the cumulative reward is 0.9.
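To make the reward setup concrete: the discounted return of a trajectory is r1 + 0.9*r2 + 0.9^2*r3 + ... . Below is a minimal sketch of this computation for a short hypothetical trajectory; the trajectory itself is invented purely for illustration, and only the reward values and the discount factor follow the setup above.

# Minimal sketch: discounted return of a short hypothetical trajectory
# under the reward setup above (out of bounds: -1, forbidden area: -10, target: +1, otherwise 0).
gamma = 0.9
rewards = [0, -10, 1]  # e.g. empty cell -> forbidden cell -> target cell
discounted_return = sum(gamma ** t * r for t, r in enumerate(rewards))
print(discounted_return)  # 0 + 0.9*(-10) + 0.81*1 = -8.19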
2. Implementation Details
1. To allow the number of steps to be adjusted dynamically, the recursive function get_expected_reward() is used (see the sketch after this list).
2. The iteration terminates when the newly updated policy is identical to the policy before the update, i.e. the policy no longer changes.
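Here is a minimal sketch of these two ideas, kept separate from the full implementation in the next section. Note that step() and policy below are hypothetical stand-ins: a transition function returning (next_state, reward) and a mapping from state to action.

gamma = 0.9

def truncated_return(state, action, steps, step, policy):
    # Recursively accumulate the discounted reward of the first `steps` steps,
    # following `policy` after the first action.
    if steps == 0:
        return 0.0
    next_state, reward = step(state, action)
    return reward + gamma * truncated_return(next_state, policy[next_state],
                                             steps - 1, step, policy)

def has_converged(old_policy, new_policy):
    # Termination check: the improved policy equals the previous one.
    return old_policy == new_policy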
3. Code Demonstration
import numpy as np


class Behrman:
    # Attributes
    n, m = None, None  # number of rows and columns of the grid
    xx, yy = [-1, 0, 1, 0, 0], [0, 1, 0, -1, 0]  # five actions (up, right, down, left, stay)
    s = "↑→↓←O"  # action symbols used for display
    _lambda = 0.9  # discount factor
    # Matrices
    chessboard = [["***"],
                  ["**#"],
                  ["#*x"]]
    policy = [[1, 3, 2],
              [2, 3, 3],
              [2, 2, 0]]
    Punishment = None

    def __init__(self):
        self.n, self.m = len(self.chessboard), len(self.chessboard[0][0])
        self.init_matrix()

    def init_matrix(self):
        """Initialize the grid matrix, the reward matrix, and the state matrix."""
        self.chessboard = [list(row[0]) for row in self.chessboard]
        self.states = [[0] * self.m for _ in range(self.n)]
        self.Punishment = [[0 if cell == "*" else (-10 if cell == "#" else 1)
                            for cell in row] for row in self.chessboard]

    def next_point(self, x, y, i):
        """Compute the next position; if it is out of bounds, stay in place and set the wall flag."""
        wall = 0
        nx, ny = x + self.xx[i], y + self.yy[i]
        if 0 <= nx < self.n and 0 <= ny < self.m:
            return nx, ny, wall
        else:
            wall = 1
            return x, y, wall

    def action(self, x, y, i):
        """Return the immediate reward of the given action."""
        next_pos = self.next_point(x, y, i)
        if next_pos[2] == 1:
            return -1
        else:
            return self.Punishment[next_pos[0]][next_pos[1]]

    def get_expected_reward(self, x, y, action, step_count):
        """Recursively compute the discounted return of the first `step_count` steps."""
        if step_count == 0:
            return 0
        next_pos = self.next_point(x, y, action)
        immediate_reward = self.action(x, y, action)
        future_reward = self._lambda * self.get_expected_reward(
            next_pos[0], next_pos[1], self.policy[next_pos[0]][next_pos[1]], step_count - 1)
        return immediate_reward + future_reward

    def get_MCpolicy(self, step_count=3):
        """Policy update step with a configurable number of steps."""
        new_policy = [[0 for _ in range(self.m)] for _ in range(self.n)]
        for x in range(self.n):
            for y in range(self.m):
                best_action = max(range(5), key=lambda i: self.get_expected_reward(x, y, i, step_count))
                new_policy[x][y] = best_action
        return new_policy

    def show_policy(self):
        """Display the final policy matrix."""
        policy_display = "\n".join("\t".join(self.s[self.policy[x][y]] for y in range(self.m))
                                   for x in range(self.n))
        print(policy_display)

    def run_policy_iteration(self, step_count=2):
        """Monte Carlo (MC Basic) policy iteration with a configurable number of steps."""
        count = 0
        flag = 1
        while flag:
            count += 1
            new_policy = self.get_MCpolicy(step_count)
            if np.array_equal(new_policy, self.policy):
                flag = 0
            else:
                for x in range(self.n):
                    for y in range(self.m):
                        self.policy[x][y] = new_policy[x][y]
        print(f"Policy iteration converged after {count} iterations")
        self.show_policy()


if __name__ == '__main__':
    behrman = Behrman()
    behrman.run_policy_iteration(step_count=3)  # adjust step_count to choose a different number of steps
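As a small usage sketch (assuming the class above has been defined), the truncated action values of a single state can also be inspected after the policy has converged; the state (0, 0) below is an arbitrary choice for illustration.

# Usage sketch: inspect the truncated action values q(s, a) of one state
# after the policy iteration above has converged.
behrman = Behrman()
behrman.run_policy_iteration(step_count=3)
x, y = 0, 0  # arbitrary example state
for a in range(5):
    q = behrman.get_expected_reward(x, y, a, step_count=3)
    print(f"state ({x}, {y}), action {behrman.s[a]}: q = {q:.3f}")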