基于DQN的机器人自动走迷宫

🔥概述

  • 本次任务是使用DQN算法,训练一个机器人自动走迷宫。
    Maze类会生成如下的随机迷宫 随机迷宫
    Maze类中有许多方法,这次有用到的如下:
  1. Maze(maze_size=size) 来随机生成一个 size * size 大小的迷宫
  2. print() 函数可以输出迷宫的 size 以及画出迷宫图
  3. sense_robot() :获取机器人在迷宫中目前的位置
  4. move_robot(direction) :根据输入方向移动默认机器人,若方向不合法则返回错误信息
  5. can_move_actions(position):获取当前机器人可以移动的方向

🔥DQN算法

DQN算法框架

  • 如图,每次智能体根据当前状态state通过epsilon-贪婪算法选择动作,环境给出奖励reward和下一个状态next_state,得到的s、a、r、s’、done(done表示是否到达终点),将这个五元组数据加入经验回放池。
  • 经验池累积足够多的数据后,再从中随机采样batch_size个数据进行一次训练(一个step),state和next_state分别输入main网络(也是评价网络)和目标网络进行前向传播,得到的输出即Q和Q_target放到公式当中计算MSE损失,然后就是反向传播更新main网络的参数。
  • 每进行n个step,就把main网络的权重复制给目标网络,完成更新。
  • 如此往复迭代,main网络就能有效地根据状态输出Q,智能体选取Q值最大的对应的动作,逐步完成走迷宫的任务。

🔥实现

首先是库调用:

import os
import random
import numpy as np
from Maze import Maze
from Runner import Runner
import matplotlib.pyplot as plt
from QRobot import QRobot
from torch import nn
import torch.optim as optim
import torch
import collections
import copy

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

经验回放池

然后是经验回放池,内置的build_full_view函数相当于金手指,将智能体放在每一个位置进行遍历,然后遍历其各个方向获取五元组,放入经验池,这样做可以使得经验池有很全面的数据集,这样训练的效率更高。

class ReplayBuffer:
    ''' 经验回放池 '''
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)  # 队列,先进先出

    def add(self, state, action, reward, next_state, done):  # 将数据加入buffer
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):       # 从buffer中采样数据,数量为batch_size
        transitions = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*transitions)
        return np.array(state), action, reward, np.array(next_state), done

    def size(self):  # 目前buffer中数据的数量
        return len(self.buffer)

    def build_full_view(self, maze: Maze):      # 获取视野
        maze_copy = copy.deepcopy(maze)
        maze_size = maze_copy.maze_size
        actions = ["u", "r", "d", "l"]
        for i in range(maze_size):
            for j in range(maze_size):
                state = (i, j)
                if state == maze_copy.destination:
                    continue
                for action_index, action in enumerate(actions):
                    maze_copy.robot["loc"] = state
                    reward = maze_copy.move_robot(action)
                    next_state = maze_copy.sense_robot()
                    is_terminal = 1 if next_state == maze_copy.destination or next_state == state else 0
                    self.add(state, action_index, reward, next_state, is_terminal)

DQN网络

然后是DQN网络的实现,很简单,就是很小的MLP

class DQN(nn.Module):
    ''' 定义Q网络 '''
    def __init__(self, state_size, action_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_size, 512)
        self.fc2 = nn.Linear(512, 512)
        self.fc3 = nn.Linear(512, action_size)
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

智能体

然后就是最主要智能体类,这里介绍一下各个函数

  • 首先初始化一下用到的参数和对象。首先设定了reward的规则,撞墙和普通移动都有负收益,到达终点的奖励根据地图大小而不同。
  • 折扣因子用于损失函数计算,epsilon用于智能体贪婪算法,其值越大越可能随机选取一个方向动作,反之则会更容易选取Q最大的那个动作。
  • 在初始化就可以直接开始调用金手指获得优质的经验回放池,然后调用训练函数(后续介绍)。
class Robot(QRobot):
    ''' 定义智能体 '''
    def __init__(self, maze):
        super(Robot, self).__init__(maze)
        maze.set_reward(reward={        # 回报规则
            "hit_wall": -5.0,
            "destination": maze.maze_size ** 2.0,
            "default": -1.0,
        })
        self.maze = maze
        self.gamma = 0.90                       # 折扣因子
        self.learning_rate = 0.001              # 学习率
        self.epsilon = 0.1                        # 探索率
        self.epsilon_decay = 0.9995             # 探索率衰减率
        self.epsilon_min = 0                    # 最小探索率
        self.replaybuffer = ReplayBuffer(2000)  # 经验回放池
        self.model = DQN(2, 4).to(device)       # Q网络
        self.target_model = DQN(2, 4).to(device)    # 目标网络
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        self.criterion = nn.MSELoss()
        self.count = 0
        self.action_dict = {"u":0, "r":1, "d":2, "l":3}     # 动作字典
        self.action_dict2 = {v: k for k, v in self.action_dict.items()}     # 反转字典

        self.replaybuffer.build_full_view(maze=maze)        # 开视野
        self.train()            # 训练

epsilon-贪婪算法如下,该任务中随机选择动作可以使用给的can_move_actions函数来排除撞墙方向,也可以就纯随机选取,都是可以收敛的。

    def take_action(self, state):       # epsilon-贪心策略获取动作
        if random.uniform(0, 1) < self.epsilon:        # 随机选择动作
            # action = random.choice(self.maze.can_move_actions(self.maze.sense_robot()))
            action = random.choice(['u','r','d','l'])
        else:                                          # 选择具有最高Q值的动作
            self.model.eval()
            with torch.no_grad():
                q_values = self.model(state.to(device))
            action = self.action_dict2.get(torch.argmax(q_values).item())
            self.model.train()
        return action

训练函数如下,不断地进行循环,进行网络前向传播参数更新,然后测试智能体是否能在指定步数内到达终点,否则就继续训练。

    def train(self):        # 训练函数
        while True:
            self.learn(self.replaybuffer.size())
            self.reset()
            for _ in range(self.maze.maze_size ** 2 - 1):
                a, r = self.test_update()
                if r == self.maze.reward["destination"]:
                    return

参数更新函数如下,流程就是从经验池获取数据,然后输入两个网络进行前向传播,然后计算损失,更新参数,就是很深度学习的一个流程,这里更新目标网络的频率可以自主设置。

    def learn(self, batch_size=32):         # 更新网络
        if self.replaybuffer.size() < batch_size:
            return
        # 从经验池抽样s,a,r,s’,done
        b_s, b_a, b_r, b_ns, b_d = self.replaybuffer.sample(batch_size)
        transition_dict = {'states': b_s, 'actions': b_a, 'next_states': b_ns,
            'rewards': b_r, 'dones': b_d}
        states = torch.tensor(transition_dict['states'],
                              dtype=torch.float).to(device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(device)
        rewards = torch.tensor(transition_dict['rewards'],
                               dtype=torch.float).view(-1, 1).to(device)
        next_states = torch.tensor(transition_dict['next_states'],
                                   dtype=torch.float).to(device)
        dones = torch.tensor(transition_dict['dones'],
                             dtype=torch.float).view(-1, 1).to(device)

        self.model.train()
        self.target_model.eval()
        # 计算Q值及其目标值
        Q_targets = rewards + self.gamma * self.target_model(next_states).reshape(batch_size, 4).max(1)[0].view(-1, 1) * (1-dones)
        Q_expect = self.model(states).reshape(batch_size, 4).gather(1, actions.to(torch.int64))
        # 求损失
        self.optimizer.zero_grad()  # 梯度清零
        loss = self.criterion(Q_expect, Q_targets)  # 损失
        loss.backward()             # 反向传播
        self.optimizer.step()       # 更新Q网络参数

        # if self.count % 10 == 0:
        self.target_model.load_state_dict(self.model.state_dict())  # 目标网络参数更新
            # self.count += 1

然后就是上传测试系统测试的几个函数,是为了符合测试要求的,不用管太多,注意测试时就不用epsilon-贪婪算法,直接选Q值最大的动作。

    def train_update(self):
        """
        以训练状态选择动作并更新Deep Q network的相关参数
        :return :action, reward 如:"u", -1
        """
        # -----------------请实现你的算法代码--------------------------------------
        state = torch.tensor(self.maze.sense_robot(), dtype=torch.float32)
        action = self.take_action(state)
        reward = self.maze.move_robot(action)
        # -----------------------------------------------------------------------
        return action, reward

    def test_update(self):
        """
        以测试状态选择动作并更新Deep Q network的相关参数
        :return : action, reward 如:"u", -1
        """
        # -----------------请实现你的算法代码--------------------------------------
        state = torch.tensor(self.maze.sense_robot(), dtype=torch.float32)
        self.model.eval()
        with torch.no_grad():
            q_values = self.model(state.to(device))
        action = self.action_dict2.get(torch.argmax(q_values).item())
        reward = self.maze.move_robot(action)
        # -----------------------------------------------------------------------

        return action, reward

🔥测试

接下来在本地进行一下调试,代码如下Runner就是给的一个跑通流程的代码,用就行了,也可以自己实现,大体上就是调用之前的update函数。

from QRobot import QRobot
from Maze import Maze
from Runner import Runner

random.seed(42)
np.random.seed(42)
torch.manual_seed(42)
"""  Deep Qlearning 算法相关参数: """

epoch = 10  # 训练轮数
maze_size = 5  # 迷宫size
training_per_epoch = 50

""" 使用 DQN 算法训练 """

g = Maze(maze_size=maze_size)
print(g)
r = Robot(g)
runner = Runner(r)
runner.run_training(epoch, training_per_epoch)
# runner.run_testing()
runner.plot_results()
r.reset()
for _ in range(25):
    a, re = r.test_update()
    if re == maze.reward["destination"]:
        print("success")
        break

得到训练结果,直接上11×11大小的迷宫

11乘11迷宫训练结果

输入到系统测试,有3×3、5×5、11×11的迷宫,都测试通过,这里展示11×11的结果
11乘11迷宫结果

  • Copyrights © 2023-2025 LegendLeo Chen
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信