基于图神经网络的金融异常检测

🔥概述

  • 本次任务使用图神经网络,以金融中异常检测为例,实现大规模图数据结构的分类预测。本任务基于的数据集DGraph,DGraph 是大规模动态图数据集的集合,由真实金融场景中随着时间演变事件和标签构成。
  • DGraph-Fin 是一个由数百万个节点和边组成的有向无边权的动态图。它代表了Finvolution Group用户之间的社交网络,其中一个节点对应一个Finvolution 用户,从一个用户到另一个用户的边表示该用户将另一个用户视为紧急联系人。 下面是位于dataset/DGraphFin目录的DGraphFin数据集的描述:

x: 20维节点特征向量
y: 节点对应标签,一共包含四类。其中类1代表欺诈用户而类0代表正常用户(实验中需要进行预测的两类标签),类2和类3则是背景用户,即无需预测其标签。
edge_index: 图数据边集,每条边的形式(id_a,id_b),其中ids是x中的索引
edge_type: 共11种类型的边
edge_timestamp: 脱敏后的时间戳
train_mask, valid_mask, test_mask: 训练集,验证集和测试集掩码

  • 本预测任务为识别欺诈用户的节点预测任务,只需要将欺诈用户(Class 1)从正常用户(Class 0)中区分出来。需要注意的是,其中测试集中样本对应的label均被标记为-100。

🔥图神经网络——SAGE

本次使用的SAGE主要思想是根据自身节点及其邻居节点的信息,通过某种聚合方式来更新自身节点信息,适用于动态图和大规模图场景。
主要特点
1. 采样策略:与传统的GNN方法需要在每个节点的所有邻居上进行计算不同,SAGE通过随机采样来选择一定数量的邻居节点,从而降低计算复杂度。
2. 聚合函数:SAGE采用多种聚合函数(如均值、LSTM、池化等)来合并邻居节点的信息,这样可以灵活地适应不同的应用场景。
3. 可扩展性:由于SAGE不依赖于整个图的结构,可以在训练过程中动态处理新节点和边,具有较强的可扩展性。
4. 半监督学习:SAGE支持半监督学习,可以在有标签和无标签的数据上进行训练,有助于提升模型性能。
具体流程
1. 节点采样:对于每个节点,从其邻居中随机选择一部分进行后续处理。
2. 信息聚合:使用选定的聚合函数整合邻居节点的特征信息。
3. 节点更新:将聚合后的信息与节点自身的特征结合,更新节点的表示。
4. 多层堆叠:通过多层的采样和聚合,逐层更新节点表示,最终得到节点的高维特征向量。

🔥实现

首先调库

from utils.dgraphfin import DGraphFin
from utils.utils import prepare_folder
from utils.evaluator import Evaluator

import torch
import torch.nn.functional as F
import torch.nn as nn

import torch_geometric.transforms as T
from torch_geometric.nn import GATConv, GCNConv, SAGEConv
import numpy as np
from torch_geometric.data import Data
import os

#设置gpu设备
device = 0
device = f'cuda:{device}' if torch.cuda.is_available() else 'cpu'
device = torch.device(device)

数据处理与概览

然后对数据集进行概览,首先读取数据集为DGraphFin模式,注意remove_edge_index=False是为了保留数据当中的边的信息

path='./datasets/'          # 数据保存路径
save_dir='./results/'       # 模型保存路径
dataset_name='DGraph'
dataset = DGraphFin(root=path, name=dataset_name, transform=T.ToSparseTensor(remove_edge_index=False))
nlabels = dataset.num_classes       # 分类数2
if dataset_name in ['DGraph']:
    nlabels = 2    # 本实验中仅需预测类0和类1

data = dataset[0]
# data.adj_t = data.adj_t.to_symmetric() # 将有向图转化为无向图

if dataset_name in ['DGraph']:      # 归一化
    x = data.x
    x = (x - x.mean(0)) / x.std(0)
    data.x = x
if data.y.dim() == 2:
    data.y = data.y.squeeze(1)

split_idx = {'train': data.train_mask, 'valid': data.valid_mask, 'test': data.test_mask}  #划分训练集,验证集

train_idx = split_idx['train']
result_dir = prepare_folder(dataset_name,'mlp')

print(data)
print(data.x.shape)  #feature
print(data.y.shape)  #label
print(type(data.adj_t))

预处理后得到数据data = dataset[0],再把数据进行归一化后打印出内容:

Data(x=[3700550,20],edge_index=[2,4300999], edge_attr=[4300999], y=[3700550],train_mask=[857899],valid_mask=[183862],test_mask=[183840], adj_t=[3700550, 3700550])

其中x是输入,20个特征维度,整个图有370多万个节点;430多万条边(edge_index);edge_attr代表边的类型(有11种关系);y仅1维度即类别编号;三个mask代表数据集被分为训练、验证、测试三个部分,分别用索引tensor存放;adj_t是邻接矩阵,包含了所有节点的连接情况的稀疏阵。
可以看到数据集异常庞大,进行训练对资源的压力也不言而喻。选用SAGE模型也是因为其对于大规模图的学习能力很强的同时又能降低复杂度。

SAGE网络

使用torch_geometric库当中的SAGEConv实现,其参数为输入特征维度和输出类别维度,很容易使用。

class SAGE(nn.Module):
    def __init__(self, in_feats, h_feats, num_classes):
        super(SAGE, self).__init__()
        self.conv1 = SAGEConv(in_feats, h_feats)
        self.dropout = nn.Dropout(0.5)
        self.conv2 = SAGEConv(h_feats, h_feats)
        self.conv3 = SAGEConv(h_feats, num_classes)

    def forward(self, x, edge_index):
        h = self.conv1(x, edge_index)
        h = F.relu(self.dropout(h))
        h = self.conv2(h, edge_index)
        h = F.relu(self.dropout(h))
        h = self.conv3(h, edge_index)
        return h

    def reset(self):
        self.conv1.reset_parameters()
        self.conv2.reset_parameters()
        self.conv3.reset_parameters()

训练

训练步骤和常规的深度学习网络(如MLP、卷积神经网络等)的训练流程上是一致的,但是有以下几点需要注意。

  • 最重要的是和x一起输入模型前向传播的既可以是邻接矩阵也可以是边的索引,显然后者数据量更小更合适。
  • 评估采用AUC,即通过对ROC曲线下各部分的面积求和而得。其中ROC曲线的横坐标是伪阳性率FP(也叫假正类率,False Positive Rate),纵坐标是真阳性率TP(真正类率,True Positive Rate)。
  • 计算损失时一定要使用mask去访问训练集部分的节点,否则模型会找到标号为-100的测试集无法计算,而且验证集的节点也不应该被模型学到。
  • 验证时,由于之前前向传播输入了整个图,所以只需要也通过mask访问验证集获得结果即可,此时模型进入评估状态,不会更新参数。
def train(data, model):
    model.reset()       # 重置参数
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)       # 优化器
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=6, factor=0.5, verbose=True)
    loss_fn = nn.CrossEntropyLoss()         # 交叉熵损失
    min_valid_loss = 1e8

    train_mask = data.train_mask            # 训练数据掩码

    features = data.x.to(device)
    labels = data.y.to(device)
    adj = data.adj_t.to(device)             # 邻接矩阵(可以代替edge输入到模型,模型也会识别)
    edge = data.edge_index.to(device)       # 边

    eval_metric = 'auc'                     # 使用AUC衡量指标
    evaluator = Evaluator(eval_metric)      # 评估器
    for epoch in range(500):
        model.train()                       # 训练环节
        out = model(features, edge)

        loss = loss_fn(out[train_mask], labels[train_mask])
        # Compute accuracy on training/validation/test
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        with torch.no_grad():               # 验证环节
            model.eval()
            losses, eval_results = dict(), dict()
            for key in ['train', 'valid']:
                node_id = split_idx[key]
                # out = model(features, edge)
                y_pred = out.exp()          # 分类概率
                losses[key] = loss_fn(out[node_id], labels[node_id]).item()
                eval_results[key] = evaluator.eval(labels[node_id], y_pred[node_id])[eval_metric]

        scheduler.step(losses['valid'])     # 学习率下降策略
        train_eval, valid_eval = eval_results['train'], eval_results['valid']
        train_loss, valid_loss = losses['train'], losses['valid']
        if valid_loss < min_valid_loss:
            min_valid_loss = valid_loss
            torch.save(model.state_dict(), save_dir+'/model.pt') # 最优模型保存

        if epoch % 5 == 0:
            print(f'Epoch: {epoch:02d}, '
                  f'Loss: {loss:.4f}, '
                  f'Train: {100 * train_eval:.3f}, ' # 我们将AUC(百分制)
                  f'Valid: {100 * valid_eval:.3f} ')

model = SAGE(data.x.size(-1), 128, nlabels).to(device)
train(data, model.to(device))

训练过程记录如下图,是纵轴值是AUC*100后的分数,可以看到过拟合状况不算很明显,收敛较为平滑。
训练曲线

预测

由于测试系统用的是CPU进行预测,资源相当有限,而题目又要求预测是对每个节点逐个预测,也就是如果有数万个测试节点,就需要调用数万次predict函数,这意味着常规预测下要将整个图的x和边输入到模型数万次,将是相当夸张的时间开销。所以本次测试采用的是在训练完之后调用最优模型,将整个图在设备上提前进行一次推理,得到的结果保存在pt文件当中,上传pt文件:

model = SAGE(data.x.size(-1), 128, nlabels).to(device)
model.load_state_dict(torch.load('./results/model.pt'))
with torch.no_grad():
    model.eval()
    out = model(data.x, data.edge_index)
    y_pred = out.exp()
torch.save(y_pred, 'pred_result.pt')

预测时只要以如下按节点索引访问数据即可,只需要读取一次文件,每次预测节点只要从pred张量中访问对应预测结果即可,将测试速度降低到极致。

from utils.evaluator import Evaluator
import torch
import torch.nn.functional as F
import torch.nn as nn
from torch_geometric.nn import GATConv, GCNConv, SAGEConv

pred = torch.load('./pred_result.pt', map_location=torch.device('cpu'))     # 直接读取一次
def predict(data,node_id):
    # 模型预测时,测试数据已经进行了归一化处理
    y_pred = pred[node_id]              # 根据索引快速访问结果
    return y_pred

测试得到0.754的评分,相当可以。
测试结果

  • 当然,这里也提供常规预测的方法供参考:
## 生成 main.py 时请勾选此 cell
def predict(data,node_id):
    """
    加载模型和模型预测
    :param node_id: int, 需要进行预测节点的下标
    :return: tensor, 类0以及类1的概率, torch.size[1,2]
    """
    model = SAGE(data.x.size(-1), 128, nlabels)
    model.load_state_dict(torch.load('./results/model.pt', map_location=torch.device('cpu')))
    # 模型预测时,测试数据已经进行了归一化处理
    # -------------------------- 实现模型预测部分的代码 ---------------------------
    with torch.no_grad():
        model.eval()
        out = model(data.x, data.edge_index)
        y_pred = out.exp()[node_id]  # (N,num_classes)

    return y_pred
  • Copyrights © 2023-2025 LegendLeo Chen
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信