diff --git a/周家林/TCN.py b/周家林/TCN.py
index 4c0d7e2..e52e79a 100644
--- a/周家林/TCN.py
+++ b/周家林/TCN.py
@@ -1,134 +1,227 @@
 import torch
 import torch.nn as nn
-import torch.nn.functional as F
-from torch.nn.utils import weight_norm  # weight normalization utility
+from torch.nn.utils import weight_norm
+import pandas as pd
+from sklearn import preprocessing
+import numpy as np
+from sklearn.model_selection import train_test_split
+from torch.utils.data import TensorDataset, DataLoader
+import matplotlib.pyplot as plt
+import seaborn as sns
+from sklearn.metrics import confusion_matrix
+from sklearn.feature_selection import SelectKBest, chi2
+
+# Check GPU availability
+device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+print(f"Using device: {device}")
+
+# 1. Load the dataset
+df = pd.read_csv('sensor.csv', index_col=0)
+
+# 2. Preprocessing
+df.drop(columns=['sensor_50', 'sensor_51', 'sensor_15'], inplace=True)
+x = df.iloc[:, 1:50].ffill()  # forward-fill missing sensor readings
+
+scaler = preprocessing.MinMaxScaler()
+x = scaler.fit_transform(x)
+x = pd.DataFrame(x, columns=df.iloc[:, 1:50].columns)
+
+# Encode the labels: BROKEN=0, NORMAL=1, RECOVERING=2
+conditions = [(df['machine_status'] == 'NORMAL'), (df['machine_status'] == 'BROKEN'), (df['machine_status'] == 'RECOVERING')]
+choices = [1, 0, 2]
+df['Operation'] = np.select(conditions, choices, default=0)
+df.drop(['machine_status'], axis=1, inplace=True)
+
+# 3. Feature selection
+y = df['Operation']
+
+selector = SelectKBest(score_func=chi2, k=20)
+x_new = selector.fit_transform(x, y)
+
+# 4. Build input sequences
+def create_sequences(data, target, time_steps=24):
+    X, y = [], []
+    for i in range(len(data) - time_steps):
+        X.append(data[i:i + time_steps, :])
+        y.append(target[i + time_steps])
+    return np.array(X), np.array(y)
+
+X, y = create_sequences(x_new, y.values)  # .values: positional indexing inside create_sequences
+
+# 5. Split the dataset
+X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
+
+# Convert to tensors and move them to the GPU
+X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
+y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(device)
+X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
+y_test_tensor = torch.tensor(y_test, dtype=torch.long).to(device)
+
+# Create the DataLoaders
+train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
+train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
+test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
+test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
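+
+# Illustrative sanity check (editor's sketch, not part of the original patch):
+# each window should cover 24 time steps of the 20 selected features, and
+# each label should be the machine status immediately after its window.
+assert X.shape[1:] == (24, x_new.shape[1])
+assert len(X) == len(y) == len(x_new) - 24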
 
 # Dilated causal convolution module
 class DilatedCausalConv1d(nn.Module):
     def __init__(self, in_channels, out_channels, kernel_size, dilation):
-        """
-        in_channels: number of input channels
-        out_channels: number of output channels
-        kernel_size: convolution kernel size
-        dilation: dilation factor (controls the receptive field)
-        """
         super().__init__()
-        # Left padding needed for a causal convolution: (kernel_size - 1) * dilation
         self.padding = (kernel_size - 1) * dilation  # preserves causality (no leakage of future information)
-
-        # 1D convolution with weight normalization
         self.conv = weight_norm(
             nn.Conv1d(in_channels, out_channels, kernel_size,
-                      padding=self.padding,  # pad on the left only
-                      dilation=dilation)  # set the dilation rate
+                      padding=self.padding,
+                      dilation=dilation)
         )
 
     def forward(self, x):
-        """
-        Input shape: (batch_size, in_channels, seq_len)
-        Output shape: (batch_size, out_channels, seq_len)
-        """
         x = self.conv(x)
-        # Trim the surplus right padding so the output length matches the input
         return x[:, :, :-self.padding]  # slice off the right-side padding
-
 
 # Residual block module
 class ResidualBlock(nn.Module):
     def __init__(self, in_channels, out_channels, kernel_size, dilation, dropout=0.2):
         super().__init__()
-        # First convolution layer (with all normalization ops)
         self.conv1 = DilatedCausalConv1d(in_channels, out_channels, kernel_size, dilation)
-        # Second convolution layer
         self.conv2 = DilatedCausalConv1d(out_channels, out_channels, kernel_size, dilation)
-
-        # Shared components
-        self.dropout = nn.Dropout(dropout)  # dropout layer
-        self.relu = nn.ReLU()  # activation function
-
-        # 1x1 convolution to match channel counts when they differ
+        self.dropout = nn.Dropout(dropout)
+        self.relu = nn.ReLU()
         self.downsample = nn.Conv1d(in_channels, out_channels, 1) if in_channels != out_channels else None
 
     def forward(self, x):
-        residual = x  # keep the original input for the residual connection
-
-        # First stage
-        x = self.dropout(x)  # apply dropout
-        x = self.relu(x)  # nonlinear activation
-        x = self.conv1(x)  # dilated causal convolution
-
-        # Second stage
-        x = self.dropout(x)  # apply dropout again
-        x = self.relu(x)  # nonlinear activation
-        x = self.conv2(x)  # dilated causal convolution
-
-        # Residual connection
+        residual = x
+        x = self.conv1(x)
+        x = self.relu(x)
+        x = self.dropout(x)
+        x = self.conv2(x)
+        x = self.relu(x)
+        x = self.dropout(x)
         if self.downsample is not None:
-            residual = self.downsample(residual)  # match channels via 1x1 convolution
-        return residual + x  # add the residual
-
+            residual = self.downsample(residual)
+        return residual + x
 
 # Full TCN model
 class TCN(nn.Module):
     def __init__(self, input_size, num_channels, kernel_size=3, dropout=0.2):
-        """
-        input_size: input feature dimension (channels)
-        num_channels: output channels per layer (sets the network depth)
-        kernel_size: convolution kernel size
-        """
         super().__init__()
-        layers = []  # holds all residual blocks
-        num_levels = len(num_channels)  # number of levels
-
-        # Build the network level by level
+        layers = []
+        num_levels = len(num_channels)
         for i in range(num_levels):
-            dilation = 2 ** i  # dilation grows exponentially (2^0, 2^1, 2^2, ...)
-            in_channels = input_size if i == 0 else num_channels[i - 1]  # input channels for this level
-            out_channels = num_channels[i]  # output channels for this level
-
-            # Append a residual block
+            dilation = 2 ** i
+            in_channels = input_size if i == 0 else num_channels[i - 1]
+            out_channels = num_channels[i]
             layers += [
-                ResidualBlock(
-                    in_channels,
-                    out_channels,
-                    kernel_size=kernel_size,
-                    dilation=dilation,
-                    dropout=dropout
-                )
+                ResidualBlock(in_channels, out_channels, kernel_size, dilation, dropout)
             ]
-
-        # Chain all residual blocks sequentially
         self.network = nn.Sequential(*layers)
 
     def forward(self, x):
-        """
-        Input shape: (batch_size, input_size, seq_len)
-        Output shape: (batch_size, num_channels[-1], seq_len)
-        """
         return self.network(x)
 
-# Example usage
-if __name__ == "__main__":
-    # Configuration
-    batch_size = 32  # batch size
-    seq_len = 100  # sequence length
-    input_size = 64  # input feature dimension
-    num_channels = [64, 64, 64]  # output channels per layer (3 layers of 64 channels here)
-    kernel_size = 3  # kernel size
+# Classification model (TCN backbone plus a classifier head)
+class TCNClassifier(nn.Module):
+    def __init__(self, input_size, num_channels, num_classes, kernel_size=3, dropout=0.2):
+        super().__init__()
+        self.tcn = TCN(input_size, num_channels, kernel_size, dropout)
+        self.linear = nn.Linear(num_channels[-1], num_classes)
 
-    # Initialize the model
-    model = TCN(input_size, num_channels, kernel_size)
+    def forward(self, x):
+        # Rearrange the input: (batch_size, seq_len, features) -> (batch_size, features, seq_len)
+        x = x.permute(0, 2, 1)
+        tcn_output = self.tcn(x)  # (batch_size, num_channels[-1], seq_len)
 
-    # Generate test data
-    x = torch.randn(batch_size, input_size, seq_len)  # random input
+        # Use the last time step's features for classification
+        last_time_step = tcn_output[:, :, -1]
+        return self.linear(last_time_step)
 
-    # Forward pass
-    output = model(x)
-    # Check the output shape (sequence length should match the input)
-    print(f"Input shape: {x.shape}")  # (32, 64, 100)
-    print(f"Output shape: {output.shape}")  # (32, 64, 100)
\ No newline at end of file
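+
+# Illustrative smoke test (editor's sketch, not part of the original patch):
+# with kernel_size=3 and dilations 1, 2, 4 (two causal convs per residual
+# block), the receptive field is 1 + 2*(3-1)*(1+2+4) = 29 steps, which
+# covers the 24-step windows built above. A dummy CPU batch checks shapes:
+_logits = TCNClassifier(20, [64, 64, 64], 3)(torch.randn(4, 24, 20))
+assert _logits.shape == (4, 3)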
+# Initialize the model
+input_size = x_new.shape[1]  # number of features (20)
+num_channels = [64, 64, 64]  # channels per layer
+num_classes = 3  # number of output classes
+
+model = TCNClassifier(input_size, num_channels, num_classes).to(device)
+
+# Class weights to counter the imbalanced data
+y_train_np = y_train.cpu().numpy() if isinstance(y_train, torch.Tensor) else y_train
+class_counts = np.bincount(y_train_np)
+class_weights = 1. / torch.tensor(class_counts, dtype=torch.float32)
+class_weights = class_weights / class_weights.sum()
+class_weights = class_weights.to(device)
+
+# Loss function and optimizer
+criterion = nn.CrossEntropyLoss(weight=class_weights)
+optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
+scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3)
+
+# Training parameters
+num_epochs = 5
+best_accuracy = 0
+train_losses = []
+val_accuracies = []
+
+# Training loop
+for epoch in range(num_epochs):
+    model.train()
+    epoch_loss = 0
+
+    for batch_X, batch_y in train_loader:
+        optimizer.zero_grad()
+
+        # Forward pass
+        outputs = model(batch_X)
+        loss = criterion(outputs, batch_y)
+
+        # Backward pass
+        loss.backward()
+        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # gradient clipping
+        optimizer.step()
+
+        epoch_loss += loss.item() * batch_X.size(0)
+
+    # Average loss over the epoch
+    avg_loss = epoch_loss / len(train_loader.dataset)
+    train_losses.append(avg_loss)
+
+    # Validation phase
+    model.eval()
+    correct = 0
+    total = 0
+    with torch.no_grad():
+        for batch_X, batch_y in test_loader:
+            outputs = model(batch_X)
+            _, predicted = torch.max(outputs, 1)
+            total += batch_y.size(0)
+            correct += (predicted == batch_y).sum().item()
+
+    accuracy = correct / total
+    val_accuracies.append(accuracy)
+    scheduler.step(avg_loss)  # adjust the learning rate on the training loss
+
+    print(f"Epoch [{epoch + 1}/{num_epochs}] | "
+          f"Loss: {avg_loss:.4f} | "
+          f"Val Acc: {accuracy * 100:.2f}% | "
+          f"LR: {optimizer.param_groups[0]['lr']:.6f}")
+
+
+# Evaluate and plot the confusion matrix
+model.eval()
+all_preds = []
+all_labels = []
+
+with torch.no_grad():
+    for batch_X, batch_y in test_loader:
+        outputs = model(batch_X)
+        _, predicted = torch.max(outputs, 1)
+        all_preds.extend(predicted.cpu().numpy())
+        all_labels.extend(batch_y.cpu().numpy())
+
+# Confusion matrix (rows: actual, columns: predicted)
+cm = confusion_matrix(all_labels, all_preds)
+sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
+            xticklabels=['BROKEN', 'NORMAL', 'RECOVERING'],
+            yticklabels=['BROKEN', 'NORMAL', 'RECOVERING'])
+plt.title("Confusion Matrix")
+plt.show()
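+
+# Illustrative addition (editor's sketch, not part of the original patch):
+# given the class imbalance the loss weights above compensate for, per-class
+# precision/recall is more informative than overall accuracy.
+from sklearn.metrics import classification_report
+print(classification_report(all_labels, all_preds,
+                            target_names=['BROKEN', 'NORMAL', 'RECOVERING']))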