# students_git_repo/周家林/TCN.py


import torch
import torch.nn as nn
from torch.nn.utils import weight_norm
import pandas as pd
from sklearn import preprocessing
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
from sklearn.feature_selection import SelectKBest, chi2
# Check GPU availability
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# 1. Load the dataset
df = pd.read_csv('sensor.csv', index_col=0)
# 2. Data preprocessing
df.drop(columns=['sensor_50', 'sensor_51', 'sensor_15'], inplace=True)
x = df.iloc[:, 1:50].ffill()  # forward-fill missing sensor readings
scaler = preprocessing.MinMaxScaler()
x = scaler.fit_transform(x)
x = pd.DataFrame(x, columns=df.iloc[:, 1:50].columns)
# Encode machine_status: BROKEN=0, NORMAL=1, RECOVERING=2
conditions = [(df['machine_status'] == 'NORMAL'), (df['machine_status'] == 'BROKEN'), (df['machine_status'] == 'RECOVERING')]
choices = [1, 0, 2]
df['Operation'] = np.select(conditions, choices, default=0)
df.drop(['machine_status'], axis=1, inplace=True)
# 3. Feature selection (chi2 requires non-negative inputs, which the MinMax scaling above guarantees)
y = df['Operation']
selector = SelectKBest(score_func=chi2, k=20)
x_new = selector.fit_transform(x, y)
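# Hedged addition (not in the original script): inspect which of x's columns SelectKBest
# kept; get_support(indices=True) returns the positions of the k selected features.
selected_columns = x.columns[selector.get_support(indices=True)]
print(f"Selected features: {list(selected_columns)}")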
# 4. Build sliding-window input sequences
def create_sequences(data, target, time_steps=24):
    target = np.asarray(target)  # positional indexing, independent of the Series' timestamp index
    X, y = [], []
    for i in range(len(data) - time_steps):
        X.append(data[i:i + time_steps, :])   # window of the previous time_steps readings
        y.append(target[i + time_steps])      # label of the step right after the window
    return np.array(X), np.array(y)
X, y = create_sequences(x_new, y)
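# Hedged addition: a quick shape check; with time_steps=24 and k=20 selected features,
# X should be (num_windows, 24, 20) and y should be (num_windows,).
print(f"Sequence tensor shape: {X.shape}, label shape: {y.shape}")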
# 5. Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Convert to tensors and move them to the device (GPU if available)
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(device)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test_tensor = torch.tensor(y_test, dtype=torch.long).to(device)
# Create DataLoaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
# Dilated causal convolution module
class DilatedCausalConv1d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, dilation):
        super().__init__()
        self.padding = (kernel_size - 1) * dilation  # enough padding to preserve causality (no leakage of future information)
        self.conv = weight_norm(
            nn.Conv1d(in_channels,
                      out_channels,
                      kernel_size,
                      padding=self.padding,
                      dilation=dilation)
        )

    def forward(self, x):
        x = self.conv(x)
        return x[:, :, :-self.padding]  # trim the right-side padding so the output stays causal
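# Hedged addition: a minimal causality/shape sanity check on CPU; the padded-then-trimmed
# convolution should preserve the time dimension (24 steps here, matching time_steps above).
# The channel sizes are illustrative only.
_causal_check = DilatedCausalConv1d(in_channels=20, out_channels=4, kernel_size=3, dilation=2)
with torch.no_grad():
    _out = _causal_check(torch.randn(1, 20, 24))
assert _out.shape == (1, 4, 24), "causal conv should preserve the time dimension"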
# Residual block module
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, dilation, dropout=0.2):
        super().__init__()
        self.conv1 = DilatedCausalConv1d(in_channels, out_channels, kernel_size, dilation)
        self.conv2 = DilatedCausalConv1d(out_channels, out_channels, kernel_size, dilation)
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()
        # 1x1 convolution to match channel counts on the skip connection
        self.downsample = nn.Conv1d(in_channels, out_channels, 1) if in_channels != out_channels else None

    def forward(self, x):
        residual = x
        x = self.conv1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.dropout(x)
        if self.downsample is not None:
            residual = self.downsample(residual)
        return residual + x
# Full TCN backbone
class TCN(nn.Module):
    def __init__(self, input_size, num_channels, kernel_size=3, dropout=0.2):
        super().__init__()
        layers = []
        num_levels = len(num_channels)
        for i in range(num_levels):
            dilation = 2 ** i  # dilation doubles at every level
            in_channels = input_size if i == 0 else num_channels[i - 1]
            out_channels = num_channels[i]
            layers += [
                ResidualBlock(in_channels, out_channels, kernel_size, dilation, dropout)
            ]
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)
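# Hedged addition: a small helper (not in the original) for the receptive field of this
# stack. Each ResidualBlock contributes two dilated convolutions, so the receptive field is
# 1 + 2*(kernel_size - 1)*(2**num_levels - 1). With kernel_size=3 and the three levels
# configured below, that is 29 steps, which covers the 24-step input windows.
def tcn_receptive_field(kernel_size, num_levels):
    return 1 + 2 * (kernel_size - 1) * (2 ** num_levels - 1)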
# Classification model: TCN backbone plus a linear classifier head
class TCNClassifier(nn.Module):
    def __init__(self, input_size, num_channels, num_classes, kernel_size=3, dropout=0.2):
        super().__init__()
        self.tcn = TCN(input_size, num_channels, kernel_size, dropout)
        self.linear = nn.Linear(num_channels[-1], num_classes)

    def forward(self, x):
        # Rearrange input: (batch_size, seq_len, features) -> (batch_size, features, seq_len)
        x = x.permute(0, 2, 1)
        tcn_output = self.tcn(x)  # (batch_size, num_channels[-1], seq_len)
        # Use the features of the last time step for classification
        last_time_step = tcn_output[:, :, -1]
        return self.linear(last_time_step)
# Initialize the model
input_size = x_new.shape[1]   # number of selected features (20)
num_channels = [64, 64, 64]   # channels per TCN level
num_classes = 3               # number of output classes
model = TCNClassifier(input_size, num_channels, num_classes).to(device)
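# Hedged addition: a quick shape check with a dummy batch of two 24-step windows
# (matching time_steps above); the classifier should return logits of shape (2, num_classes).
with torch.no_grad():
    _logits = model(torch.randn(2, 24, input_size, device=device))
assert _logits.shape == (2, num_classes), "classifier head should output one logit per class"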
# Compute class weights (to handle the imbalanced classes)
y_train_np = y_train.cpu().numpy() if isinstance(y_train, torch.Tensor) else y_train
class_counts = np.bincount(y_train_np)
class_weights = 1. / torch.tensor(class_counts, dtype=torch.float32)
class_weights = class_weights / class_weights.sum()
class_weights = class_weights.to(device)
# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3)
# Training parameters
num_epochs = 5
best_accuracy = 0
train_losses = []
val_accuracies = []
# Training loop
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    for batch_X, batch_y in train_loader:
        optimizer.zero_grad()
        # Forward pass
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        # Backward pass
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # gradient clipping
        optimizer.step()
        epoch_loss += loss.item() * batch_X.size(0)
    # Average training loss over the epoch
    avg_loss = epoch_loss / len(train_loader.dataset)
    train_losses.append(avg_loss)
    # Validation phase
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_X, batch_y in test_loader:
            outputs = model(batch_X)
            _, predicted = torch.max(outputs.data, 1)
            total += batch_y.size(0)
            correct += (predicted == batch_y).sum().item()
    accuracy = correct / total
    val_accuracies.append(accuracy)
    scheduler.step(avg_loss)  # reduce the learning rate when the training loss plateaus
    print(f"Epoch [{epoch + 1}/{num_epochs}] | "
          f"Loss: {avg_loss:.4f} | "
          f"Val Acc: {accuracy * 100:.2f}% | "
          f"LR: {optimizer.param_groups[0]['lr']:.6f}")
# Evaluate and plot the confusion matrix
model.eval()
all_preds = []
all_labels = []
with torch.no_grad():
    for batch_X, batch_y in test_loader:
        outputs = model(batch_X)
        _, predicted = torch.max(outputs, 1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(batch_y.cpu().numpy())
# Plot the confusion matrix (rows: true class, columns: predicted class)
cm = confusion_matrix(all_labels, all_preds)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=['BROKEN', 'NORMAL', 'RECOVERING'],
            yticklabels=['BROKEN', 'NORMAL', 'RECOVERING'])
plt.title("Confusion Matrix")
plt.show()
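# Hedged addition: an optional per-class precision/recall/F1 summary to complement the
# confusion matrix; the target_names order matches the label encoding used above
# (0=BROKEN, 1=NORMAL, 2=RECOVERING).
from sklearn.metrics import classification_report
print(classification_report(all_labels, all_preds, labels=[0, 1, 2],
                            target_names=['BROKEN', 'NORMAL', 'RECOVERING'], digits=4))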