基于Pytorch的LSTM网络时序模型训练及预测

🔥概述

使用LSTM神经网络来预测风场发电量,输入特征是气象数据和预测信息等,输出则为两种测量方法下的功率。项目基于Pytorch框架。

🔥数据结构

DATATIME WINDSPEED PREPOWER WINDDIRECTION TEMPERATURE HUMIDITY PRESSURE
2/11/2021 00:00:00 1.2 8765 99 13.8 93 899
2/11/2021 00:00:00 1.2 8847 99 13.8 93 899
2/11/2021 00:15:00 1.1 8208 100 13.9 93 899
2/11/2021 00:15:00 1.1 8421 100 13.9 93 899
2/11/2021 00:30:00 1.2 7795 95 14.1 92 899
  • 续表,在上表右边续
“ROUND(A.WS | 1)” “ROUND(A.POWER | 0)” YD15
5.5 14247 13913
5.5 14247 13913
5 12543 12823
5 12543 12823
4.2 9177 8678
  • 数据第一列是日期及时间,每隔 15 分钟一行到两行数据 “ROUND(A.POWER | 0)” 和 YD15,最后两列是预测对象(输出特征),数据有上万条,汇集一整年的数据,数据末尾有一整天的数据缺少最后两列,这就是我们要最终预测的数据内容。中间理论上应该是输入特征,但是由于一些奇奇怪怪的原因倒数第三列不用,只用第二列开始的6个特征

🔥LSTM模型

模型介绍

LSTM(Long Short-Term Memory,长短时记忆网络)是一种递归式神经网络模型,广泛用于处理时间序列数据和自然语言处理任务。LSTM模型通过将特定的控制单元添加到普通的循环神经网络 (RNN) 模型中,来缓解简单 RNN 中出现的梯度消失问题。

与简单 RNN 不同,LSTM 模型包含三个门(input gate、forget gate 和 output gate),这些门帮助 LSTM 记住或遗忘先前的状态信息并输出新的隐藏状态;还有一个细胞状态,从而使得对时间序列的建模更加准确与可靠。

LSTM 模型的每一个 timestep 都涉及到四个变量:输入 (input)、遗忘 (forget)、更新 (update) 和输出 (output)。模型通过运行各种阶段组成的逻辑电路(logic circuits),以一系列操作对这些变量进行计算:

  • 输入门:该门控制着当前输入信息在什么程度上会影响当前时刻的细胞状态,进而影响后续状态的计算。
  • 遗忘门:该门控制着从之前细胞状态应当被遗忘多少比例。
  • 更新门:该门计算了当前输入及前一个时刻的隐藏状态对细胞状态的更新量,并结合输入门和遗忘门计算出当前时刻的细胞状态。
  • 输出门:该门控制着当前状态下,模型会在多大程度上输出当前时刻的隐藏状态。

LSTM 模型广泛应用于时间序列数据分析(如股票价格预测和天气预报等),自然语言处理(如机器翻译、情感分析等)以及其他需要对序列进行建模的任务中。

实现 LSTM 模型可以使用 TensorFlow 或 PyTorch 等深度学习框架,在过去几年中,由于 LSTM 模型具有较高的鲁棒性和良好的效果,已经被广泛应用于各种实际场景中进行预测和分类等工作。

搭建模型

# LSTM.py
import torch
import torch.nn as nn
import numpy as np

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 定义LSTM模型
class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super(LSTMModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).requires_grad_().to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).requires_grad_().to(device)
        out, (hn, cn) = self.lstm(x, (h0.detach(), c0.detach()))        # lstm层
        out = self.fc(out[:, -1, :])            # 全连接层
        return out

Pytorch定义一个模型非常简单,继承nn.Module类后就能继承神经网络大部分模型的功能,比如反向传播,权值计算等,我们只要设定好隐藏层维度、LSTM层数、LSTM参数、全连接层

self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True) 是一个在 PyTorch 中定义 LSTM 模型的语句,其中:

  • input_dim 是输入数据的特征维度(input dimension);
  • hidden_dim 是 LSTM 模型中隐藏层单元的数量(hidden dimension),也可以理解为每个时间步长的输出维度;
  • num_layers 是 LSTM 模型中堆叠的层数(number of layers),可以使模型更加复杂;
  • batch_first=True 表示输入张量的第一维度是 batch size,即输入数据的形状应该为 (batch_size, sequence_length, input_dim)

这条语句使用 nn.LSTM 类来定义 LSTM 模型。在 PyTorch 中,nn.LSTM 类实现了一个带有可训练参数的 LSTM 模型,可以用于处理序列样本,例如时间序列或序列化的文本数据。

self.lstm 在实例化后成为神经网络模型的一个属性,可以在 forward 方法中被调用来处理输入数据。可使用 input_tensor, _ = self.lstm(input_tensor) 这种格式对数据进行 LSTM 处理。

早停策略

  • 当模型收敛时,训练的损失将长期不发生明显下降,这时我们要设置早停策略来提前终止训练
# 早停策略
class EarlyStopping:
    def __init__(self, patience=10, delta=0):
        self.patience = patience
        self.delta = delta
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.Inf

    def __call__(self, val_loss):       # 每当被调用就进行
        if self.best_score is None:         # 第一次最优损失
            self.best_score = val_loss
        elif val_loss > self.best_score + self.delta:       # 若新的损失值比原来还大了delta就计1次
            self.counter += 1
            if self.counter >= self.patience:       # 容忍次数达到就停止,标志位置true
                self.early_stop = True
        else:                               # 如果出现新的最优损失值就记录,次数归零重新来
            self.best_score = val_loss
            self.counter = 0
            self.val_loss_min = val_loss
  • 我们每次迭代都会将损失值输入到这个类进行判断,如果长期没发生改善即可终止迭代,delta是指损失值高于最优损失值多少时是可以容忍的,patience则是这种容忍的次数,到达次数即终止训练

🔥参数配置文件

  • 一个json文件(我命名为config),写着训练模型的必要参数,便于多文件读取并统一使用,不需要更改
  • 依次是 隐藏层神经元(维度)数hidden_dim、LSTM层数num_layers、读取的文件路径file_csv、单个batch大小batch_size、学习率learning_rate、训练世代数num_epochs、日志存放间隔log_interval、日志路径文件夹log_dir、输入特征数input_dim
{
    "hidden_dim" : 32,
    "num_layers" : 2,
    "file_csv" : "data/01.csv",
    "batch_size" : 256,
    "learning_rate" : 0.01,
    "num_epochs" : 500,
    "log_interval" : 10,
    "log_dir" : "./log",
    "input_dim" : 6
}

🔥训练模型

模块引用

import matplotlib.pyplot as plt
import json
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.tensorboard import SummaryWriter
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from LSTM import *

LSTM就是上面放模型的文件

训练函数

# 训练模型操作
def train_model(model, train_loader, val_loader, criterion, optimizer, device, num_epochs, log_interval, log_dir):
    writer = SummaryWriter(log_dir=log_dir)         # 创建一个TensorBoard的SummaryWriter对象,用于将训练过程中的指标写入事件文件
    early_stopping = EarlyStopping(patience=50, delta=0)    # 早停策略对象,容忍次数和容忍度
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=10, factor=0.5, min_lr=1e-5)   # 学习率调节器
    for epoch in range(num_epochs):
        model.train()       # 训练模式
        train_loss = 0.0
        for i, (inputs, targets) in enumerate(train_loader):    # 对每个batch分别进行的操作
            inputs = inputs.to(device)          # 放到GPU
            targets = targets.to(device)
            optimizer.zero_grad()               # 清零梯度,否则会累加
            outputs = model(inputs)             # 前向传播
            loss = criterion(outputs, targets)  # 损失计算
            loss.backward()                     # 反向传播
            optimizer.step()                    # 优化器迭代权值
            train_loss += loss.item()
        train_loss /= len(train_loader)             # 平均损失计算
        writer.add_scalar('train_loss', train_loss, epoch)      # 训练记录写入TensorBoard
        model.eval()                # 进入验证模式
        val_loss = 0.0
        with torch.no_grad():
            for inputs, targets in val_loader:      # 操作类似训练集,只不过不进行反向传播
                inputs = inputs.to(device)
                targets = targets.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                val_loss += loss.item()
            val_loss /= len(val_loader)
            writer.add_scalar('val_loss', val_loss, epoch)      # 写入验证记录
            early_stopping(val_loss)                # 早停判断
            scheduler.step(val_loss)                # 学习率更新
            lr = optimizer.param_groups[0]['lr']
        if epoch % log_interval == 0:
            print(f'Epoch [{epoch}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Lr: {lr}')
        if early_stopping.early_stop:
            print('Early stopping')
            break
    writer.close()      # 关闭记录
  • 如上我们先设定一个TensorBoard事件总结记录的实例,我们可以通过TensorBoard查看训练时的一些结果(不演示)
  • 然后我们设定早停器学习自动下降的调节器,前者不再赘述,后者是为了防止出现因学习率过大导致损失值长期不变(局部最优)的情况出现,适当下降学习率可以提升训练效果,patience 类似早停策略的patience也是容忍损失值的次数, factor 是下降时的比例,即新的学习率是旧的学习率的factor倍。min_lr 是最小的学习率。还有一些参数不在此展示
  • 之后我们就可以开始训练,训练伴随着验证,model.train()model.eval() 使模型分别进入训练和评估(验证)阶段,两者的操作类似,只不过验证不进行反向传播。记得判断早停。训练结束后关闭记录器

测试函数

# 测试模型操作
def evaluate_model(model, test_loader, scaler, device):
    model.eval()            # 进入测试模式(也就是验证模式)
    y_true = []         # 实际值
    y_pred = []         # 预测值
    with torch.no_grad():       # 和验证操作一样,没有反向传播
        for inputs, targets in test_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            outputs = model(inputs)
            y_true.append(targets.cpu().numpy())
            y_pred.append(outputs.cpu().numpy())

    y_true = scaler.inverse_transform(np.concatenate(y_true))       # 反归一化真实结果
    y_pred = scaler.inverse_transform(np.concatenate(y_pred))       # 反归一化预测结果

    mse = mean_squared_error(y_true, y_pred)            # 计算方差(均方根误差)MSE
    mae = mean_absolute_error(y_true, y_pred)           # 计算平均绝对误差MAE
    print(f'方差: {mse:.4f}, 绝对误差: {mae:.4f}')
    return y_true, y_pred
  • 这次我们的验证集和测试集是一样的,所有测试操作其实就是和验证操作一样,只不过我们最后计算了方差和绝对误差,以便对模型的性能进行了解。

作图函数

# 画图
def plot_results(y_true, y_pred):
    plt.plot(y_true[:, 0], label='True1')
    plt.plot(y_pred[:, 0], label='Predicted1')
    plt.plot(y_true[:, 1], label='True2')
    plt.plot(y_pred[:, 1], label='Predicted2')
    plt.legend()
    plt.show()
  • 很简单的基于matplotlib的作图,分别画出两个功率的真实值和预测值曲线

主函数

if __name__ == '__main__':
    # 读取配置文件
    with open('config.json', 'r') as f:
        config = json.load(f)
    for key, value in config.items():
        print(f'{key}: {value}', end=", ")

    # 设置设备
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print('设备:', device)

    # 读取CSV数据
    data = pd.read_csv(config["file_csv"], header=0, index_col=0)
    data = data.dropna()            # 去掉空数据
    features = data.iloc[:, :config["input_dim"]].values         # 输入特征
    targets = data.iloc[:, -2:].values          # 预测特征,后两列
  • 初始化及数据处理,读取config配置参数,设置GPU训练(没有就是用CPU),读取制作数据集,这里涉及pandas的df使用
    # 归一化数据
    scaler = StandardScaler()           # 标准正态分布归一化
    data_scaled = scaler.fit_transform(np.concatenate([features, targets], axis=1))     # axis为1代表沿行合并输入特征和预测特征,进行归一化
    features_scaled = data_scaled[:, :-2]       # concatenate合起来的再拆开
    targets_scaled = data_scaled[:, -2:]

    # 划分训练集和测试集,随机划分
    X_train, X_test, y_train, y_test = train_test_split(features_scaled, targets_scaled, test_size=0.01, random_state=22)

    # dataset和dataloader
    train_dataset = TensorDataset(torch.Tensor(X_train).unsqueeze(1), torch.Tensor(y_train))    # 输入特征、预测特征打包成Dataset
    train_loader = DataLoader(train_dataset, batch_size=config['batch_size'], shuffle=True)     # 数据集放进Loader
    val_dataset = TensorDataset(torch.Tensor(X_test).unsqueeze(1), torch.Tensor(y_test))        # 验证集也同理,但不洗牌方便看结果
    val_loader = DataLoader(val_dataset, batch_size=config['batch_size'], shuffle=False)
  • 上面完成了数据的基本处理,包括归一化、数据集划分、dataset和dataloader的生产
  • 归一化这里我们用标准正态分布,将输入输出合并起来归一化,这样比分开归一化更好,归一化后再把数据拆开成输入输出
  • 划分我们用自带的train_test_split,随机划分
  • dataset和dataloader部分:

    创建Dataset数据集:
    torch.Tensor(X_train)将输入数据转换为PyTorch张量。
    unsqueeze(1)将输入数据张量的维度从(N, )变为(N, 1),这是因为在PyTorch中,通常将数据视为一个张量序列,其中第一个维度是样本数量,第二个维度是特征数量。这里,我们将每个样本视为一个特征,因此需要在张量中添加一个新的维度。
    torch.Tensor(y_train)将标签数据转换为PyTorch张量。
    TensorDataset将两个张量打包成一个数据集对象,使得在训练时可以按照指定的batch_size逐批次进行训练
    数据集(Dataset)转换为一个可迭代的数据加载器DataLoader:
    batch_size是指每个批次的样本数量,即每次迭代从数据集中取出的样本数量。
    shuffle=True表示每个epoch开始时是否对数据进行洗牌,即将数据集中的样本随机打乱,以使每个批次包含不同的样本,减少模型对数据的依赖性。

    # 创建模型,放入GPU
    model = LSTMModel(input_dim=features_scaled.shape[1], hidden_dim=config['hidden_dim'],
                      num_layers=config['num_layers'], output_dim=2).to(device)

    # 定义损失函数和优化器
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=config['learning_rate'])
  • 以上是训练必要准备,首先实例化模型,定义损失函数和优化器,这种模型用Adam比较合适,输入初始学习率
    # 训练模型
    train_model(model, train_loader, val_loader, criterion, optimizer, device, config['num_epochs'],
                config['log_interval'], config['log_dir'])

    # 评估模型
    scaler2 = StandardScaler()  # 只属于预测特征的归一化,标准正态分布归一化
    targets_scaled2 = scaler2.fit_transform(targets)
    test_dataset = TensorDataset(torch.Tensor(X_test).unsqueeze(1), torch.Tensor(y_test))   # 本项目,测试集和验证集其实是一样的
    test_loader = DataLoader(test_dataset, batch_size=config['batch_size'], shuffle=False)
    y_true, y_pred = evaluate_model(model, test_loader, scaler2, device)

    # 绘制预测结果
    plot_results(y_true, y_pred)

    # 保存模型
    torch.save(model.state_dict(), 'lstm_model_v2_0.pt')
  • 最后进行模型训练,调用训练函数,传入参数。训练完进行评估(测试),这里测试我有定义一个新的归一化scaler2,因为原先是输入输出合并进行归一化,结构和这个预测的不一样,预测的只有两列结果而已。这个scaler2是用于预测结果反归一化。
  • 预测结果最后画出来,进行可视化,保存模型即可。

运行结果

训练过程
如上,显示打印了配置参数,然后每个特定代就打印出训练状态,过一段时间损失变化不大,学习率下降
早停
发生了早停,输出了最终的测试误差和方差,损失值下降到0.0797
训练结果
结果打印出来,可以看到预测和真实值差别不大,可以调节更改画图函数的输入数据量,以更细致地观察结果

  • 改变学习率等各种参数可以进行更多的调试,训练出不同效果的模型,我这并不是最优的配置

🔥测试模型

  • 如果要将数据集所有数据进行预测并直观查看可以用运行下面这个文件
import torch
import torch.nn as nn
import pandas as pd
from LSTM import LSTMModel
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import numpy as np
import json

# 加载配置
with open('config.json', 'r') as f:
    config = json.load(f)
    print(config)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("设备:", device)

# 读取csv数据
data = pd.read_csv(config["file_csv"])
data = data.dropna()  # 排除空数据
data = data.drop(columns=[data.columns[0]])

# 定义特征和预测对象,最后两列是预测对象
features = data.iloc[:, :config["input_dim"]].values
target = data.iloc[:, -2:].values

# 归一化特征和预测对象
scaler = StandardScaler()  # 归一化   均值为0,标准差为1的正态分布
features_scaled = scaler.fit_transform(features)
target_scaled = scaler.fit_transform(target)

# 训练参数
input_dim = features_scaled.shape[1]  # 输入参数维度,特征维度
hidden_dim = config["hidden_dim"]  # 特征隐藏层神经元
num_layers = config["num_layers"]  # LSTM模型中LSTM层数的数量
output_dim = 2  # 输出数据的维度,即LSTM模型的输出特征(预测对象)数量

# 实例化LSTM模型
model = LSTMModel(input_dim, hidden_dim, num_layers, output_dim).to(device)  # 放到GPU

model.load_state_dict(torch.load('lstm_model_v2_0.pt'))

# 预测测试集
test_inputs = torch.autograd.Variable(torch.Tensor(features_scaled).view(-1, 1, input_dim)).to(device)  # 测试集张量
predicted_targets = model(test_inputs)  # 对目标模型走一次前向传播,即预测
predicted_targets = scaler.inverse_transform(predicted_targets.cpu().detach().numpy())  # 反归一化得到真实大小的数据

# 画图,部分区间
begin = 34000
end = 34096
plt.plot(np.arange(begin, end, 1), target[begin:end, 0], label='Actual1')  # 两个输出分别画出来
plt.plot(np.arange(begin, end, 1), predicted_targets[begin:end, 0], label='Predicted1')
plt.plot(np.arange(begin, end, 1), target[begin:end, 1], label='Actual2')
plt.plot(np.arange(begin, end, 1), predicted_targets[begin:end, 1], label='Predicted2')
plt.plot(np.arange(begin, end, 1), predicted_targets[begin:end, 0] - target[begin:end, 0], label='Loss1')
plt.plot(np.arange(begin, end, 1), predicted_targets[begin:end, 1] - target[begin:end, 1], label='Loss2')
plt.legend()
plt.show()
  • 操作很简单,初始化一些配置,加载训练过的模型,然后把这个数据集丢进去进行一次前向传播得到数据,反归一化得到结果,绘图(设置好begin和end就可以观察不同区间点)。
    测试结果
  • 如图,红色调的两条预测曲线和蓝绿色两条实际曲线很相近,趋势上得到比较好的结果,而且没有过拟合的效果。紫色的是两条误差曲线

🔥预测数据

  • 跑下面文件即可预测数据文件空缺功率的日期的全部数据
import csv
import json

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from sklearn.preprocessing import StandardScaler

from LSTM import LSTMModel

# 加载配置
with open('config.json', 'r') as f:
    config = json.load(f)
    print(config)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("设备:", device)

data = pd.read_csv(config["file_csv"])

# 找到日期最靠后的数据对应的日期,即为预测日期
dates = pd.to_datetime(data['DATATIME'], format='%d/%m/%Y %H:%M:%S')
latest_date = dates.nlargest(1).iloc[0].strftime('%d/%m/%Y')
latest_date = latest_date.split("/")
if latest_date[0][0] == '0':  # 个位数的日期不带0
    latest_date[0] = latest_date[0][1:]
if latest_date[1][0] == '0':  # 个位数的月份不带0
    latest_date[1] = latest_date[1][1:]
latest_date = "/".join(latest_date)
print('被预测日期为:', latest_date)  # 输出结果

data = data.dropna()  # 排除空数据的数据用于归一化
data = data.drop(columns=[data.columns[0]])  # 除去第一列,即时间列

# 空数据作为预测
predict_data = pd.read_csv(config["file_csv"])
predict_data = predict_data[~predict_data.index.isin(data.index)]
predict_data = predict_data[predict_data.iloc[:, 0].str.contains(latest_date)]  # 预测最后日期
predict_data = predict_data.drop(columns=[predict_data.columns[0]])
print(f'预测数量:{len(predict_data)}')

# 定义特征和预测对象,最后两列是预测对象
features = data.iloc[:, :config["input_dim"]].values
target = data.iloc[:, -2:].values
predicts = predict_data.iloc[:, :config["input_dim"]].values

# 归一化特征和预测对象
scaler = StandardScaler()  # 归一化   均值为0,标准差为1的正态分布
features_scaled = scaler.fit_transform(features)
predicts_scaled = scaler.transform(predicts)
target_scaled = scaler.fit_transform(target)

# 训练参数
input_dim = features_scaled.shape[1]  # 输入参数维度,特征维度
hidden_dim = config["hidden_dim"]  # 特征隐藏层神经元
num_layers = config["num_layers"]  # LSTM模型中LSTM层数的数量
output_dim = 2  # 输出数据的维度,即LSTM模型的输出特征(预测对象)数量

# 实例化LSTM模型
model = LSTMModel(input_dim, hidden_dim, num_layers, output_dim).to(device)  # 放到GPU

model.load_state_dict(torch.load('lstm_model_v2_0.pt'))

# 预测集
predict_inputs = torch.autograd.Variable(torch.Tensor(predicts_scaled).view(-1, 1, input_dim)).to(device)  # 预测集张量
predicted_targets = model(predict_inputs)  # 对目标模型走一次前向传播,即预测
predicted_targets = scaler.inverse_transform(predicted_targets.cpu().detach().numpy())  # 反归一化得到真实大小的数据

# 输出csv文件
with open('output/output.csv', 'w', newline='', encoding="utf-8") as csvfile:
    writer = csv.writer(csvfile)
    writer.writerows(predicted_targets)

# 画图,部分区间
begin = 0
end = len(predicted_targets)
plt.plot(np.arange(begin, end, 1), predicted_targets[begin:end, 0], label='Predicted1')
plt.plot(np.arange(begin, end, 1), predicted_targets[begin:end, 1], label='Predicted2')
plt.legend()
plt.show()
  • 和测试类似,只不过预测数据是没有实际值的,预测前我们分析csv数据找到日期最后的数据,把这些数据传进model进行前向传播,再反归一化就是预测结果,我们作图并且存入新的csv文件,结果如下:
    预测结果
  • Copyrights © 2023-2025 LegendLeo Chen
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信