基于DQN的机器人自动走迷宫(2)

github仓库
需要先阅读本文所基于的上一个工作:基于DQN的机器人自动走迷宫

  • 上次使用已有的库完成走迷宫任务的方案,需要通过获取全图视野来生成完美轨迹样本来进行训练,但是效果也是立竿见影。
  • 但是很多时候没有办法把获取这样的完美样本,需要智能体从起点开始完全自主摸索获取轨迹样本,然后训练,所以本文尝试不使用全图视野获取来训练。
  • 注意:整体的DQN算法机制和上一篇是一样的,只是有很多改进用来适应没有完美样本的情况,具体实现方法也不一样。

🔥实现

本次通过PyQT5搭建可视化界面显示迷宫,然后手动实现了迷宫环境、智能体、训练函数等全方面内容。并不会展示所有代码,详见github仓库。

迷宫环境及奖励机制

  • 首先加载迷宫,迷宫本身是用二维矩阵表示的,存于txt当中,0表示可以走的路,1表示障碍物,2表示起点。
  • 状态空间是xy坐标,也就是2个,动作空间就是上下左右4个。
  • 奖励机制:
    • 首先就是基本的移动操作判断,撞墙是会发生的,暂时不采取直接筛选非撞墙方向的方案。
    • 接着是探索奖励,采用了寻访计数,用二维数组self.explored记录到达地图所有位置的次数,到达新位置给一些奖励,重复到达给惩罚,注意这种方法相当于把撞墙检测和反复横跳的情况统一起来,防止出现智能体为了不撞墙而反复横跳保持局部最优的问题,使之能够尽可能多探索。
    • 然后检测完终点后,到达终点给予大额奖励,反之给予小的常规惩罚。
class MazeEnv(gym.Env):
    def __init__(self):
        super(MazeEnv, self).__init__()
        self.maze = utils.load_map('map.txt')           # 加载迷宫地图
        self.start = (0, 0)                             # 起点位置
        self.end = (2, 5)                               # 终点位置
        self.agent_position = list(self.start)          # 当前智能体位置
        self.action_space = gym.spaces.Discrete(4)      # 动作空间 上下左右 4个动作
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=(1, 2), dtype=np.float32)  # 状态空间 xy坐标 2
        self.explored = torch.zeros(10, 10)             # 访问计数(到过的位置就+1)

    def reset(self):                                    # 重置智能体位置、访问计数清零
        self.agent_position = list(self.start)
        self.explored = torch.zeros(10, 10)
        return self._get_observation()

    def step(self, action):                             # 动作并获取回报
        col, row = [x * 2 + 1 for x in self.agent_position]
        reward = 0
        ''' 移动操作 '''
        if action == 0 and self.maze[row - 1][col] == 0:    # 向上
            self.agent_position[1] -= 1
        elif action == 1 and self.maze[row + 1][col] == 0:  # 向下
            self.agent_position[1] += 1
        elif action == 2 and self.maze[row][col - 1] == 0:  # 向左
            self.agent_position[0] -= 1
        elif action == 3 and self.maze[row][col + 1] == 0:  # 向右
            self.agent_position[0] += 1
        ''' 探索奖励 '''
        if not self.explored[self.agent_position[0], self.agent_position[1]]:   # 到新位置给小奖励
            reward += 20.0
        else:                       # 重复到达给惩罚
            reward -= 10.0 + 5.0 * self.explored[self.agent_position[0], self.agent_position[1]]
        self.explored[self.agent_position[0], self.agent_position[1]] += 1      # 寻访计数
        ''' 终点检测 '''
        done = tuple(self.agent_position) == self.end
        ''' 常规奖励 '''
        if done:
            reward += 20000         # 到达终点给予大量正奖励
        else:
            reward -= 5.0           # 基础的每步惩罚,鼓励尽快找到终点
        return self._get_observation(), reward, done, {}

Q 网络

  • 结构很简单,就是多层全连接网络。输入状态空间,输出动作空间。
    self.input_hidden = nn.Sequential(
        nn.Linear(state_size, 128),
        nn.ReLU(False),
        nn.Linear(128, 256),
        nn.ReLU(False),
        nn.Linear(256, 128),
        nn.ReLU(False),
        nn.Linear(128, 128),
        nn.ReLU(False),
    )
    self.final_fc = nn.Linear(128, action_size)

智能体

  • 首先是各种超参数,注释中有说明,要调主要还是探索率衰减、学习率为主。
  • 探索率衰减值得足够接近1,毕竟每走一步都要衰减,不能衰减太快。
class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=1000)    # 经验回放池
        self.gamma = 0.90                   # 折扣因子
        self.epsilon = 1                    # 探索率
        self.epsilon_min = 0.1              # 最小探索率
        self.epsilon_decay = 0.9995         # 探索率衰减
        self.learning_rate = 0.002          # 网络的学习率
        self.model = DQN(state_size, action_size, 42).cuda()            # Q网络
        self.target_model = DQN(state_size, action_size, 42).cuda()     # 目标网络
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        self.criterion = nn.MSELoss()
        self.count = 0                      # 计数,用于更新目标网络
        self.update_rate = 10               # 更新目标网络频率
  • 然后是回放池用的收集数据的函数,就是把状态、动作、奖励、是否结束、下一状态都保存起来。
  • 以及选取动作的ε-贪婪算法,由于测试环节是不需要ε的,所以可以选择是否使用ε。
    def remember(self, state, action, reward, next_state, done):    # 新的数据加入经验回放池
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state, epsilon=True):                         # ε-贪婪算法选取动作
        if np.random.rand() <= self.epsilon and epsilon:        # 随机
            return np.random.randint(self.action_size)
        self.model.eval()
        state = torch.tensor([state], dtype=torch.float)
        with torch.no_grad():
            q_values = self.model(state.cuda())                 # 贪婪
        return q_values.argmax().item()
  • 然后是学习,也就是训练的核心部分,首先从回放池采batch数据,然后变一下形状等,就可以前向传播、计算Q值,然后求得损失反向传播等操作和深度学习常规流程就一样了。
  • 最后还有注意更新目标网络,每隔一段时间就复制一次参数到目标网络。
    def replay(self, batch_size):                   # 训练学习
        if len(self.memory) < batch_size:
            return
        ''' 采样 + 数据处理 '''
        minibatch = random.sample(self.memory, batch_size)
        states, next_states, rewards, actions, dones = [], [], [], [], []
        for state, action, reward, next_state, done in minibatch:
            states.append(torch.FloatTensor(state).cuda())
            next_states.append(torch.FloatTensor(next_state).cuda())
            rewards.append(torch.FloatTensor([reward]).cuda())
            actions.append(torch.FloatTensor([action]).cuda())
            dones.append(torch.FloatTensor([done]).cuda())
        states = torch.stack(states)
        next_states = torch.stack(next_states)
        rewards = torch.stack(rewards)
        actions = torch.stack(actions)
        dones = torch.stack(dones)
        ''' 前向传播 + Q值及其目标值计算 '''
        self.model.train()
        self.target_model.eval()
        Q_expect = self.model(states).gather(1, actions.to(torch.int64))
        Q_targets = rewards + self.gamma * self.target_model(next_states).max(1)[0].view(-1, 1) * (1 - dones)
        ''' 反向传播 + 参数更新 '''
        self.optimizer.zero_grad()                      # 梯度清零
        loss = self.criterion(Q_expect, Q_targets)      # 损失
        loss.backward()                                 # 反向传播
        self.optimizer.step()                           # 参数更新
        if self.count % self.update_rate == 0:          # 目标网络参数更新
            self.target_model.load_state_dict(self.model.state_dict())
        self.count += 1
        if self.epsilon > self.epsilon_min:             # epsilon衰减
            self.epsilon *= self.epsilon_decay

UI界面

  • 用PyQT5实现,就是实时显示绘制迷宫,然后显示智能体的位置。这方面是专门的知识,以下展示部分函数。
  • replace_position:更新智能体位置,要抹除迷宫中智能体之前的位置。
  • drawLines:绘制迷宫、终点、智能体。
    def replace_position(self, list1, pos):
        # 找到迷宫中智能体的位置
        pos1 = None
        for i in range(len(list1)):
            for j in range(len(list1[0])):
                if list1[i][j] == 2:
                    pos1 = (i, j)
                    break
            if pos1:
                break
        list1[pos1[0]][pos1[1]] = 0                     # 抹除旧的智能体位置
        list1[pos[1] * 2 + 1][pos[0] * 2 + 1] = 2       # 新的智能体位置
        return list1
    def drawLines(self, qp, maze):          # 具体绘制
        cell_size = 60                      # 每个格子的宽和高
        # 终点
        qp.setPen(Qt.NoPen)
        qp.setBrush(QBrush(QColor(0, 220, 0), Qt.SolidPattern))
        qp.drawRect(int(end[0] * cell_size), int(end[1] * cell_size), cell_size, cell_size)
        ''' 遍历迷宫矩阵 '''
        for j, row in enumerate(maze):
            for i, cell in enumerate(row):
                if cell == 1:  # 墙体绘制,实心线
                    if j % 2 == 0 and i % 2 == 0:
                        continue
                    elif j % 2 == 0:                # 横墙
                        qp.setPen(QPen(QColor(0, 0, 0), 5))
                        qp.drawLine(int(i / 2 * cell_size - 0.5 * cell_size), int(j / 2 * cell_size), int(i / 2 * cell_size + 0.5 * cell_size), int(j / 2 * cell_size))
                    elif i % 2 == 0:                # 竖墙
                        qp.setBrush(QBrush(QColor(0, 0, 0), Qt.SolidPattern))
                        qp.drawLine(int(i / 2 * cell_size), int(j / 2 * cell_size - 0.5 * cell_size), int(i / 2 * cell_size), int(j / 2 * cell_size + 0.5 * cell_size))
                elif cell == 2:                     # 智能体,实心圆形
                    qp.setBrush(QBrush(QColor(100, 0, 100), Qt.SolidPattern))
                    qp.drawEllipse(int((i - 1) / 2 * cell_size), int((j - 1) / 2 * cell_size), cell_size, cell_size)

训练主函数

  • 前面主要是参数设置、定义对象、设置一些记录用的变量。
  • 然后迭代时,将迭代数episodes 分为若干波rounds方便进度条一波一波地显示。
  • 每一代都让智能体最多走30格,因为当前设置的目的地走7步正常就能到了。
  • 每一代核心流程就是动作(act函数)、获取回报(step函数)、存入经验池(remember函数)、策略网络学习(replay函数),中间保持UI的更新、进度条的更新还有一些结果的记录就行了。
  • 每代训练完都进行测试,看看能不能在最少的步数到达目的地,成功则记录次数,当次数到达足够多就可以停止,绘制平均奖励曲线,如果奖励曲线收敛,则可以比较有把握地认为任务收敛。
def train():
    global next_state
    env = MazeEnv()         # 定义迷宫环境
    state_size = np.prod(env.observation_space.shape)   # 状态空间,xy坐标,即2
    action_size = env.action_space.n                    # 动作空间,上下左右,即4
    agent = DQNAgent(state_size, action_size)           # 定义智能体
    batch_size = 64         # 批次大小
    episodes = 1000         # 总迭代数
    rounds = 20             # 迭代数平均分为若干波
    epoch = 0               # 记录当前迭代数
    return_list = []        # 记录每代回报
    success = 0             # 成功次数(训练)
    succ_test = 0           # 成功次数(测试)

    ''' 迭代次数分为 rounds 波统计 '''
    for i in range(rounds):
        with tqdm(total=int(episodes / rounds), desc=f'波数 {i + 1} / {rounds}') as pbar:
            ''' 每一波(一代代循环) '''
            for e in range(int(episodes / rounds)):
                epoch += 1
                episode_return = 0
                state = env.reset()
                ''' 采样和训练环节  每一代(一步步循环) '''
                for time in range(0, 30):
                    action = agent.act(state)                           # 动作
                    next_state, reward, done, _ = env.step(action)      # 回报、下一个状态、是否到达
                    episode_return += reward                            # 本次迭代总回报
                    pw.wid3.update_maze(next_state)                     # 更新迷宫UI
                    action_dict = ["↑", "↓", "←", "→"]
                    # print(f'{action_dict[action]}', end="")
                    agent.remember(state, action, reward, next_state, done)     # 更新经验回放池
                    if done:                                    # 到达终点,记录成功次数
                        success += 1
                        break
                    if len(agent.memory) > batch_size:          # 训练
                        agent.replay(batch_size)

                return_list.append(episode_return)              # 记录每代回报
                ''' 测试环节 '''
                state = env.reset()
                path = []                                       # 单次测试的路径
                for t in range(0, 8):                           # 最短步数内能否到达终点
                    action = agent.act(state, False)
                    state, _, done, _ = env.step(action)
                    if done:
                        succ_test += 1
                        if succ_test == 8:                      # 累计到达若干次,则结束训练退出
                            torch.save(agent.model.state_dict(), 'DQN.pth')         # 保存权重
                            torch.save(agent.target_model.state_dict(), 'DQN-target.pth')
                            print(f"训练完成!成功找到路径!权重已保存。步数:{t},路径为{path}")
                            utils.draw_image(return_list)                           # 绘制回报曲线
                            return
                    path.append(action)
                ''' 更新进度条和UI显示(UI更新开了可能会卡死,所以注释了) '''
                recent_mean = int(np.mean(return_list[-20:]))                       # 近期回报均值
                # pw.set_data(r=i + 1, rounds=rounds, epoch=epoch,
                #             result=recent_mean, success=success, success_test=succ_test)
                pbar.set_postfix({'迭代数': '%d' % epoch, '平均回报': '%d' % recent_mean,
                                  '训练成功次数': '%d' % success, '测试成功次数': '%d' % succ_test})
                pbar.update(1)
    print(f"训练结束,但未找到路径。")
    utils.draw_image(return_list)                   # 绘制回报曲线

🔥效果

  • 训练过程(最终完成收敛的情况)演示:
  • 训练结果,这次训练明显有很不错的收敛。
    训练结果

🔥总结和思考

  • DQN在不获取全局视野作为训练样本的情况下,纯靠从起点开始移动探索,很容易不收敛,需要对奖励模型、超参数等做优化。
  • 在前期ε很大的情况下如果能多碰巧到达终点累积成功样本,会增加收敛的可能性,反之前期很少成功则,大概率不收敛。这也意味着在终点比较远或者地图大且复杂时很难收敛,或者至少时间成本会很高。展示的结果算是比较理想的情况,虽然不算少见但是不能像训练纯深度学习那样稳健。
  • 对于奖励规则,应该尽可能引导智能体不去陷入撞墙、反复横跳等局部最优,所以应该对这些情况给予惩罚,当前本项目用了寻访计数来存储已经到过的位置,用来奖励新的探索并惩罚到旧的位置,效果还是有提升的。而奖励规则设计本身还是挺靠想象、主观经验感受的。
  • 经验回放池使用队列存储,而大部分的样本都是不成功的,好的样本终究会出列,所以经验回访池里面的样本中保持的正样本量太少,抽取batch时不容易抽到,这也是收敛速度低迷的原因,所以后续还可以改进机制,尽可能多存成功的样本。
  • 所以是否能获取全局视野,在训练开始时就能获取很好的样本至关重要,这可能与实际应用场景的要求有关,能获取应该尽量获取。
  • 再有就是可能DQN本身还是优先的,可以用更复杂的改进版本或者Actor-Critic等框架来提升效果。
  • 本工作算是对以DQN为代表的离散任务的强化学习的一个尝试,能够初步地全面地认识强化学习算法。
  • Copyrights © 2023-2025 LegendLeo Chen
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信