加载数据集、训练、评估模型。

This commit is contained in:
林林 2025-03-19 14:32:38 +08:00
parent eb1987cf9e
commit 7573a11ef5
1 changed files with 180 additions and 87 deletions

View File

@@ -1,134 +1,227 @@
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils import weight_norm # 用于权重归一化的工具
from torch.nn.utils import weight_norm
import pandas as pd
from sklearn import preprocessing
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
from sklearn.feature_selection import SelectKBest, chi2
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 1. Load the dataset; the first CSV column becomes the row index.
df = pd.read_csv('sensor.csv', index_col=0)

# 2. Preprocess the feature columns.
df.drop(columns=['sensor_50', 'sensor_51', 'sensor_15'], inplace=True)
feature_frame = df.iloc[:, 1:50]
# `fillna(method='ffill')` is deprecated in pandas 2.x; `.ffill()` is the
# supported spelling. The trailing `.bfill()` only touches gaps at the very
# start of a column, which forward fill cannot reach and which chi2 below
# would otherwise reject as NaN.
x = feature_frame.ffill().bfill()
# Min-max scaling keeps every feature non-negative, which chi2 requires.
scaler = preprocessing.MinMaxScaler()
x = scaler.fit_transform(x)
x = pd.DataFrame(x, columns=feature_frame.columns)

# 3. Encode the label: BROKEN -> 0, NORMAL -> 1, RECOVERING -> 2.
# Any other status value falls through to the default, 0.
conditions = [
    (df['machine_status'] == 'NORMAL'),
    (df['machine_status'] == 'BROKEN'),
    (df['machine_status'] == 'RECOVERING'),
]
choices = [1, 0, 2]
df['Operation'] = np.select(conditions, choices, default=0)
df.drop(['machine_status'], axis=1, inplace=True)

# 4. Feature selection: keep the 20 features with the highest chi2 score.
# NOTE(review): the scaler and the selector are both fitted on the full
# dataset before the train/test split further down, so test rows influence
# preprocessing - confirm this is acceptable for the reported accuracy.
y = df['Operation']
selector = SelectKBest(score_func=chi2, k=20)
x_new = selector.fit_transform(x, y)
# Build the sliding-window model inputs.
def create_sequences(data, target, time_steps=24):
    """Cut a time series into overlapping windows with next-step labels.

    Window ``i`` holds rows ``i .. i + time_steps - 1`` of ``data`` and is
    labelled with ``target[i + time_steps]``, i.e. the value immediately
    after the window.

    Args:
        data: Array-like whose first axis is time.
        target: Array-like of per-step labels, at least as long as ``data``.
            Converted to a NumPy array so lookups are always positional,
            even for a pandas Series whose index is not ``0..n-1``.
        time_steps: Number of consecutive rows in each window.

    Returns:
        Tuple ``(X, y)``: ``X`` has shape
        ``(len(data) - time_steps, time_steps, *data.shape[1:])`` and ``y``
        has shape ``(len(data) - time_steps,)``. Both are empty (with the
        same trailing dimensions) when ``data`` has no more than
        ``time_steps`` rows.

    Raises:
        ValueError: If windows can be built but ``target`` is shorter than
            ``data``.
    """
    data = np.asarray(data)
    target = np.asarray(target)

    n_windows = len(data) - time_steps
    if n_windows <= 0:
        # Not enough rows for a single window: return correctly shaped empties.
        empty_X = np.empty((0, time_steps) + data.shape[1:], dtype=data.dtype)
        empty_y = np.empty((0,), dtype=target.dtype)
        return empty_X, empty_y

    if len(target) < len(data):
        raise ValueError(
            f"target has {len(target)} entries but data has {len(data)} rows"
        )

    X = np.stack([data[start:start + time_steps] for start in range(n_windows)])
    # The label of window i sits at position i + time_steps.
    y = target[time_steps:time_steps + n_windows].copy()
    return X, y
# Build windows. The labels are handed over as a plain array so that the
# lookup inside create_sequences is positional regardless of df's index.
X, y = create_sequences(x_new, np.asarray(y))

# 5. Split into training and test sets (80 / 20, fixed seed).
# NOTE(review): this is a shuffled split over overlapping windows, so
# neighbouring (almost identical) windows can land on both sides of the
# split - confirm whether a chronological split is wanted instead.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Convert to tensors and move them to the selected device.
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(device)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test_tensor = torch.tensor(y_test, dtype=torch.long).to(device)

# Wrap the tensors in DataLoaders; only the training data is shuffled.
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
# Dilated causal convolution module
class DilatedCausalConv1d(nn.Module):
    """Weight-normalised 1-D dilated convolution that never looks ahead.

    The output at time step ``t`` depends only on inputs at steps ``<= t``,
    and the output sequence has the same length as the input sequence.

    Args:
        in_channels: Number of input channels.
        out_channels: Number of output channels.
        kernel_size: Size of the convolution kernel.
        dilation: Dilation factor; larger values widen the receptive field.
    """

    def __init__(self, in_channels, out_channels, kernel_size, dilation):
        super().__init__()
        # nn.Conv1d pads both ends by the same amount, so the output is
        # `self.padding` steps too long on the right; forward() trims that
        # tail, which is what makes the convolution causal.
        self.padding = (kernel_size - 1) * dilation
        self.conv = weight_norm(
            nn.Conv1d(in_channels,
                      out_channels,
                      kernel_size,
                      padding=self.padding,
                      dilation=dilation)
        )

    def forward(self, x):
        """Apply the convolution.

        Args:
            x: Tensor of shape (batch_size, in_channels, seq_len).

        Returns:
            Tensor of shape (batch_size, out_channels, seq_len).
        """
        x = self.conv(x)
        if self.padding == 0:
            # kernel_size == 1: nothing was padded. Slicing with `:-0`
            # would return an empty tensor, so skip the trim entirely.
            return x
        # Drop the surplus right-hand steps introduced by symmetric padding.
        return x[:, :, :-self.padding]
# Residual block module
class ResidualBlock(nn.Module):
    """Two dilated causal convolutions wrapped in a skip connection.

    Each convolution is followed by ReLU and then dropout. The block input
    is added to the result; when the channel counts differ, the input is
    first projected with a 1x1 convolution so the shapes match.

    Args:
        in_channels: Number of input channels.
        out_channels: Number of output channels of both convolutions.
        kernel_size: Kernel size passed to both convolutions.
        dilation: Dilation factor passed to both convolutions.
        dropout: Dropout probability.
    """

    def __init__(self, in_channels, out_channels, kernel_size, dilation, dropout=0.2):
        super().__init__()
        self.conv1 = DilatedCausalConv1d(in_channels, out_channels, kernel_size, dilation)
        self.conv2 = DilatedCausalConv1d(out_channels, out_channels, kernel_size, dilation)
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()
        # 1x1 convolution on the skip path, needed only when the channel
        # count changes across the block.
        self.downsample = (
            nn.Conv1d(in_channels, out_channels, 1)
            if in_channels != out_channels
            else None
        )

    def forward(self, x):
        """Run conv -> ReLU -> dropout twice, then add the skip connection."""
        # Keep the untouched input for the skip connection.
        residual = x

        x = self.conv1(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.conv2(x)
        x = self.relu(x)
        x = self.dropout(x)

        if self.downsample is not None:
            residual = self.downsample(residual)
        return residual + x
# Full TCN model
class TCN(nn.Module):
    """Stack of residual blocks with exponentially growing dilation.

    Args:
        input_size: Number of input channels (features per time step).
        num_channels: Output channel count of each residual block; its
            length sets the depth of the network.
        kernel_size: Kernel size used by every block.
        dropout: Dropout probability used by every block.
    """

    def __init__(self, input_size, num_channels, kernel_size=3, dropout=0.2):
        super().__init__()
        layers = []
        in_channels = input_size
        for level, out_channels in enumerate(num_channels):
            # Dilation doubles per level: 1, 2, 4, ...
            dilation = 2 ** level
            layers.append(
                ResidualBlock(in_channels, out_channels, kernel_size, dilation, dropout)
            )
            # The next block consumes what this one produced.
            in_channels = out_channels
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Apply all residual blocks in order.

        Args:
            x: Tensor of shape (batch_size, input_size, seq_len).

        Returns:
            Tensor of shape (batch_size, num_channels[-1], seq_len).
        """
        return self.network(x)
# Classification model: TCN feature extractor followed by a linear classifier
class TCNClassifier(nn.Module):
    """Sequence classifier built from a TCN and a linear output layer.

    Args:
        input_size: Number of features per time step.
        num_channels: Output channel count of each TCN level.
        num_classes: Number of output classes.
        kernel_size: Kernel size used by the TCN.
        dropout: Dropout probability used by the TCN.
    """

    def __init__(self, input_size, num_channels, num_classes, kernel_size=3, dropout=0.2):
        super().__init__()
        self.tcn = TCN(input_size, num_channels, kernel_size, dropout)
        self.linear = nn.Linear(num_channels[-1], num_classes)

    def forward(self, x):
        """Return class logits for a batch of sequences.

        Args:
            x: Tensor of shape (batch_size, seq_len, features).

        Returns:
            Tensor of shape (batch_size, num_classes) holding raw logits.
        """
        # Conv1d expects channels first:
        # (batch_size, seq_len, features) -> (batch_size, features, seq_len)
        x = x.permute(0, 2, 1)
        tcn_output = self.tcn(x)  # (batch_size, num_channels[-1], seq_len)
        # Classify from the features of the last time step only.
        last_time_step = tcn_output[:, :, -1]
        return self.linear(last_time_step)
# Build the model
input_size = x_new.shape[1]  # number of selected features
num_channels = [64, 64, 64]  # output channels of each TCN level
num_classes = 3  # number of output classes
model = TCNClassifier(input_size, num_channels, num_classes).to(device)

# Inverse-frequency class weights to counter class imbalance.
y_train_np = y_train.cpu().numpy() if isinstance(y_train, torch.Tensor) else y_train
# `minlength` keeps one entry per class even when the highest label is
# missing from the training split; without it the weight vector would be
# shorter than num_classes and CrossEntropyLoss would reject it.
class_counts = np.bincount(y_train_np, minlength=num_classes)
class_counts_t = torch.tensor(class_counts, dtype=torch.float32)
# A class with no training samples gets weight 0 instead of 1/0 = inf,
# which would turn every weight into NaN after normalisation.
class_weights = torch.where(
    class_counts_t > 0,
    1.0 / class_counts_t.clamp(min=1.0),
    torch.zeros_like(class_counts_t),
)
class_weights = class_weights / class_weights.sum()
class_weights = class_weights.to(device)

# Loss function, optimizer and learning-rate scheduler
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3)
# Training parameters and per-epoch history
num_epochs = 5
best_accuracy = 0
train_losses = []
val_accuracies = []

# Training loop
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    for batch_X, batch_y in train_loader:
        optimizer.zero_grad()
        # Forward pass
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        # Backward pass with gradient-norm clipping at 1.0
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        # Weight the batch loss by batch size so the epoch figure is a
        # per-sample average even when the last batch is smaller.
        epoch_loss += loss.item() * batch_X.size(0)

    # Average training loss for this epoch
    avg_loss = epoch_loss / len(train_loader.dataset)
    train_losses.append(avg_loss)

    # Validation pass (runs on test_loader)
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_X, batch_y in test_loader:
            outputs = model(batch_X)
            predicted = outputs.argmax(dim=1)
            total += batch_y.size(0)
            correct += (predicted == batch_y).sum().item()
    accuracy = correct / total
    val_accuracies.append(accuracy)
    # Record the best validation accuracy seen so far; the variable was
    # previously initialised but never updated.
    best_accuracy = max(best_accuracy, accuracy)

    # NOTE(review): the scheduler is driven by the training loss, not a
    # validation metric - confirm that this is intended.
    scheduler.step(avg_loss)

    print(f"Epoch [{epoch + 1}/{num_epochs}] | "
          f"Loss: {avg_loss:.4f} | "
          f"Val Acc: {accuracy * 100:.2f}% | "
          f"LR: {optimizer.param_groups[0]['lr']:.6f}")
# Final evaluation: collect predictions on the test set
model.eval()
all_preds = []
all_labels = []
with torch.no_grad():
    for batch_X, batch_y in test_loader:
        outputs = model(batch_X)
        predicted = outputs.argmax(dim=1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(batch_y.cpu().numpy())

# Plot the confusion matrix.
# Class ids follow the label encoding: 0 = BROKEN, 1 = NORMAL, 2 = RECOVERING.
class_names = ['BROKEN', 'NORMAL', 'RECOVERING']
# Passing `labels` pins the matrix to one row/column per class. Without it,
# a class that appears in neither the labels nor the predictions is dropped
# and the tick labels below would no longer line up with the cells.
cm = confusion_matrix(all_labels, all_preds, labels=list(range(len(class_names))))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names,
            yticklabels=class_names)
# confusion_matrix puts true classes on rows and predictions on columns.
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Confusion Matrix")
plt.show()