"""Grid-search regression experiments for the SSA data set.

The module provides three helpers (data loading, metric computation and a
predicted-vs-actual plot).  When executed as a script it tunes a
GradientBoostingRegressor pipeline with 5-fold cross-validation, persists the
best pipeline and evaluates it on the validation sheet.  Earlier experiments
with other regressors are kept, commented out, inside the script section.
"""
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import xgboost as xgb
from joblib import dump, load
from sklearn.ensemble import (
    AdaBoostRegressor,
    GradientBoostingRegressor,
    RandomForestRegressor,
)
from sklearn.feature_selection import SelectKBest, f_regression
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import (
    RBF,
    ConstantKernel,
    DotProduct,
    Matern,
    WhiteKernel,
)
from sklearn.linear_model import (
    ElasticNet,
    Lasso,
    LinearRegression,
    LogisticRegression,
    Ridge,
)
from sklearn.metrics import (
    mean_absolute_error,
    mean_absolute_percentage_error,
    mean_squared_error,
    r2_score,
)
from sklearn.model_selection import (
    GridSearchCV,
    KFold,
    cross_val_score,
    train_test_split,
)
from sklearn.neighbors import KNeighborsRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler
from sklearn.svm import SVR
from sklearn.tree import DecisionTreeRegressor


def get_ssa_data(path='./data/SSA.xlsx'):
    """Load the training and validation splits from the SSA workbook.

    Sheet ``sheet1`` is used as the training set and ``sheet2`` as the
    validation set; the ``SSA`` column is the target in both.

    Args:
        path: Location of the Excel workbook.  Defaults to the path the
            script has always used.

    Returns:
        Tuple ``(train_x, train_y, valid_x, valid_y)`` where the ``*_x``
        items are DataFrames without the ``SSA`` column and the ``*_y`` items
        are the ``SSA`` Series.
    """
    # Passing a list of sheet names parses the workbook once and returns a
    # dict keyed by sheet name, instead of opening the file once per sheet.
    sheets = pd.read_excel(path, sheet_name=['sheet1', 'sheet2'])
    train_data = sheets['sheet1']
    valid_data = sheets['sheet2']

    train_x = train_data.drop('SSA', axis=1)
    train_y = train_data['SSA']
    valid_x = valid_data.drop('SSA', axis=1)
    valid_y = valid_data['SSA']
    return train_x, train_y, valid_x, valid_y


def evaluate_model_accuracy(predict, real):
    """Compute regression error metrics for a set of predictions.

    Args:
        predict: Predicted values (any array-like of numbers).
        real: Ground-truth values, same number of elements as ``predict``.

    Returns:
        Dict with keys ``'MAE'``, ``'MSE'``, ``'RMSE'``, ``'MAPE'`` (in
        percent) and ``'R_2'``.  MAPE is averaged over the samples whose true
        value is non-zero; it is ``nan`` when every true value is zero.
    """
    predict = np.asarray(predict, dtype=float)
    real = np.asarray(real, dtype=float)

    mae = mean_absolute_error(real, predict)
    mse = mean_squared_error(real, predict)
    rmse = np.sqrt(mse)

    # Flatten before the element-wise arithmetic so that an (n, 1) prediction
    # paired with an (n,) target is compared element by element rather than
    # being broadcast to an (n, n) matrix.
    real_flat = real.ravel()
    predict_flat = predict.ravel()
    # A zero target makes the relative error undefined (division by zero),
    # so those samples are left out of the average.
    nonzero = real_flat != 0
    if nonzero.any():
        relative_error = (real_flat[nonzero] - predict_flat[nonzero]) / real_flat[nonzero]
        mape = np.mean(np.abs(relative_error)) * 100
    else:
        mape = float('nan')

    r2 = r2_score(real, predict)

    return {
        'MAE': mae,
        'MSE': mse,
        'RMSE': rmse,
        'MAPE': mape,
        'R_2': r2
    }


def draw_picture(y_test, y_pred, model_acc, save_path, title):
    """Plot predicted against actual values, save the figure and show it.

    The plot contains the scatter of ``(y_test, y_pred)``, the ``y = x``
    reference line and a text box with the metrics from ``model_acc``.  The
    image is written to ``<save_path>/<title>.png``; ``save_path`` is created
    if it does not exist.

    Args:
        y_test: Actual target values.
        y_pred: Predicted target values.
        model_acc: Metrics dict as returned by ``evaluate_model_accuracy``
            (keys ``'MSE'``, ``'RMSE'``, ``'MAE'``, ``'MAPE'``, ``'R_2'``).
        save_path: Directory in which the PNG is stored.
        title: Plot title, also used as the file name stem.
    """
    # Remember the figure being drawn on so it can be released at the end.
    fig = plt.gcf()

    actual = np.asarray(y_test)
    lower, upper = actual.min(), actual.max()

    plt.scatter(y_test, y_pred, c='blue', marker='o', label='Predicted vs Actual')
    plt.plot([lower, upper], [lower, upper], color='red', lw=2, label='y = x')
    plt.xlabel('Actual values', fontsize=16)
    plt.ylabel('Predicted values', fontsize=16)
    plt.title(title)
    plt.legend(loc='best')

    metrics_text = (f"MSE: {round(model_acc['MSE'], 2)}\n"
                    f"RMSE: {round(model_acc['RMSE'], 2)}\n"
                    f"MAE: {round(model_acc['MAE'], 2)}\n"
                    f"MAPE: {round(model_acc['MAPE'], 2)}%\n"
                    f"R_square: {round(model_acc['R_2'], 2)}")
    # Axes-relative coordinates keep the text box in the lower-right area
    # regardless of the data range.
    plt.text(0.75, 0.25, metrics_text, transform=plt.gca().transAxes,
             fontsize=10, verticalalignment='top')

    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(save_path, exist_ok=True)
    name = title + '.png'
    path = os.path.join(save_path, name)
    plt.savefig(path)
    print(f"图形已保存到: {path}")
    plt.show()
    # Close the figure so a later call starts from a clean canvas instead of
    # drawing on top of this plot (matters on non-interactive backends where
    # show() does not destroy the figure).
    plt.close(fig)


# The experiment only runs when the file is executed as a script, so the
# helpers above can be imported without triggering a grid search.
if __name__ == "__main__":
    train_x, train_y, valid_x, valid_y = get_ssa_data()

    # joblib.dump does not create missing directories; make sure the target
    # exists up front so a long search is not lost to a FileNotFoundError.
    os.makedirs('./model', exist_ok=True)

    # ------------------------------------------------------------------
    # Archived experiments.  Each commented-out section below is a complete
    # run for one model family; uncomment a section to execute it.  The code
    # is kept as it was, only the explanatory comments were translated.
    # ------------------------------------------------------------------

    # ---- Linear regression ----
    # # Pipeline as assumed at training time
    # pipeline = Pipeline([
    #     ('scaler', StandardScaler()),
    #     ('regressor', LinearRegression())
    # ])
    # pipeline.fit(train_x, train_y)
    # # Save the model
    # dump(pipeline, './model/SSA_LinearRegression.joblib')
    # # Predict
    # pred_y = pipeline.predict(valid_x)
    # # Evaluate
    # acc = evaluate_model_accuracy(pred_y,valid_y)
    # print(acc)
    # # Plot and save
    # draw_picture(valid_y,pred_y,acc,'./pic/ssa','LinearRegression')
    # # Load the model
    # # loaded_pipeline = load('./model/SSA_LinearRegression.joblib')
    # # # Predict with the loaded model
    # # pred_y = loaded_pipeline.predict(valid_x)
    # # print(pred_y)

    # ---- Ridge regression ----
    # pipeline = Pipeline([
    #     ('scaler', StandardScaler()),  # standardize the features
    #     ('ridge', Ridge())  # ridge regression model
    # ])
    # # Hyper-parameter grid
    # param_grid = {
    #     'ridge__alpha': [0.001, 0.01, 0.1, 1, 10, 100],  # regularization strength
    #     'ridge__solver': ['svd', 'cholesky', 'lsqr', 'sag'],  # solver
    #     'ridge__max_iter': [100, 500, 1000],  # maximum number of iterations
    #     'ridge__tol': [1e-4, 1e-3]  # convergence tolerance
    # }
    # # K-fold cross-validation
    # kfold = KFold(n_splits=5, shuffle=True, random_state=42)
    # # Create the GridSearchCV object
    # grid_search = GridSearchCV(
    #     estimator=pipeline,
    #     param_grid=param_grid,
    #     cv=kfold,
    #     scoring='neg_mean_squared_error',  # score by negative mean squared error
    #     n_jobs=-1,  # use all available CPU cores
    #     verbose=1  # show progress details
    # )
    # # Train (hyper-parameter search and cross-validation happen automatically)
    # print("开始训练和超参数优化...")
    # grid_search.fit(train_x, train_y)
    # print("\n最佳参数组合:", grid_search.best_params_)
    # # Predict with the best model
    # best_model = grid_search.best_estimator_
    # # Save the model
    # dump(best_model, './model/SSA_Ridge.joblib')
    # pred_y = best_model.predict(valid_x)
    # # Evaluate
    # acc = evaluate_model_accuracy(pred_y,valid_y)
    # print(acc)
    # # Plot and save
    # draw_picture(valid_y,pred_y,acc,'./pic/ssa','Ridge')
    # # Load the model
    # loaded_pipeline = load('./model/SSA_Ridge.joblib')
    # # Predict with the loaded model
    # pred_y = loaded_pipeline.predict(valid_x)
    # print(pred_y)

    # ---- Gaussian process regression ----
    # # Define a richer set of kernel combinations
    # base_kernels = [
    #     # RBF kernel + noise
    #     ConstantKernel(1.0, (1e-3, 1e3)) * RBF(1.0, (1e-2, 1e2)) + WhiteKernel(1e-5, (1e-8, 1e-1)),
    #     # Matern kernel (nu=1.5, suited to moderately smooth data)
    #     ConstantKernel(1.0, (1e-3, 1e3)) * Matern(length_scale=1.0, length_scale_bounds=(1e-2, 1e2), nu=1.5) + WhiteKernel(1e-5, (1e-8, 1e-1)),
    #     # Combined kernel: RBF + linear kernel
    #     ConstantKernel(1.0, (1e-3, 1e3)) * (RBF(1.0, (1e-2, 1e2)) + DotProduct(sigma_0=1.0, sigma_0_bounds=(1e-2, 1e2))) + WhiteKernel(1e-5, (1e-8, 1e-1))
    # ]
    # # Build the pipeline
    # pipeline = Pipeline([
    #     ('scaler', StandardScaler()),  # can be swapped for MinMaxScaler()
    #     ('gpr', GaussianProcessRegressor(n_restarts_optimizer=20))
    # ])
    # # Hyper-parameter grid (wider ranges)
    # param_grid = {
    #     'gpr__kernel': base_kernels,
    #     'gpr__alpha': [1e-6, 1e-5, 1e-4, 1e-3],
    #     'gpr__normalize_y': [True, False]
    # }
    # # K-fold cross-validation and grid search
    # kf = KFold(n_splits=5, shuffle=True, random_state=42)
    # grid_search = GridSearchCV(pipeline, param_grid, cv=kf, scoring='neg_mean_squared_error',
    #                            n_jobs=-1, verbose=2)
    # # Train
    # print("开始网格搜索...")
    # grid_search.fit(train_x, train_y)
    # print("\n最佳参数组合:", grid_search.best_params_)
    # # Predict with the best model
    # best_model = grid_search.best_estimator_
    # # Save the model
    # dump(best_model, './model/SSA_GaussianProcessRegressor.joblib')
    # pred_y = best_model.predict(valid_x)
    # # Evaluate
    # acc = evaluate_model_accuracy(pred_y,valid_y)
    # print(acc)
    # # Plot and save
    # draw_picture(valid_y,pred_y,acc,'./pic/ssa','GaussianProcessRegressor')
    # # Load the model
    # loaded_pipeline = load('./model/SSA_GaussianProcessRegressor.joblib')
    # # Predict with the loaded model
    # pred_y = loaded_pipeline.predict(valid_x)
    # print(pred_y)

    # ---- Lasso regression ----
    # # Build the pipeline (standardization + Lasso)
    # pipeline = Pipeline([
    #     ('scaler', StandardScaler()),  # required: Lasso is sensitive to feature scale
    #     ('lasso', Lasso(max_iter=10000))  # more iterations to ensure convergence
    # ])
    # # 4. Hyper-parameter grid
    # param_grid = {
    #     'lasso__alpha': np.logspace(-6, 3, 50),  # regularization strength, 1e-6 to 1e3
    #     'lasso__selection': ['cyclic', 'random']  # coordinate selection strategy
    # }
    # # 5. 5-fold cross-validation
    # kf = KFold(n_splits=5, shuffle=True, random_state=42)
    # # 6. Grid search (hyper-parameter optimization)
    # grid_search = GridSearchCV(
    #     pipeline,
    #     param_grid,
    #     cv=kf,
    #     scoring='neg_mean_squared_error',  # minimize MSE
    #     n_jobs=-1,  # use all CPU cores
    #     verbose=2  # print progress
    # )
    # # 7. Train
    # print("开始网格搜索优化...")
    # grid_search.fit(train_x, train_y)
    # print("\n最佳参数组合:", grid_search.best_params_)
    # # Predict with the best model
    # best_model = grid_search.best_estimator_
    # # Save the model
    # dump(best_model, './model/SSA_Lasso.joblib')
    # pred_y = best_model.predict(valid_x)
    # # Evaluate
    # acc = evaluate_model_accuracy(pred_y,valid_y)
    # print(acc)
    # # Plot and save
    # draw_picture(valid_y,pred_y,acc,'./pic/ssa','Lasso')
    # # Load the model
    # loaded_pipeline = load('./model/SSA_Lasso.joblib')
    # # Predict with the loaded model
    # pred_y = loaded_pipeline.predict(valid_x)
    # print(pred_y)

    # ---- ElasticNet ----
    # # Build the pipeline (standardization + ElasticNet)
    # pipeline = Pipeline([
    #     ('scaler', StandardScaler()),  # required: the penalty is sensitive to feature scale
    #     ('model', ElasticNet(max_iter=10000))  # more iterations to ensure convergence
    # ])
    # # Hyper-parameter grid
    # param_grid = {
    #     'model__alpha': np.logspace(-4, 2, 50),
    #     'model__l1_ratio': [0.1, 0.5, 0.7, 0.9, 0.95]  # L1/L2 mixing ratio
    # }
    # # 5-fold cross-validation
    # kf = KFold(n_splits=5, shuffle=True, random_state=42)
    # # Grid search (hyper-parameter optimization)
    # grid_search = GridSearchCV(
    #     pipeline,
    #     param_grid,
    #     cv=kf,
    #     scoring='neg_mean_squared_error',  # minimize MSE
    #     n_jobs=-1,  # use all CPU cores
    #     verbose=2  # print progress
    # )
    # # Train
    # print("开始网格搜索优化...")
    # grid_search.fit(train_x, train_y)
    # print("\n最佳参数组合:", grid_search.best_params_)
    # # Predict with the best model
    # best_model = grid_search.best_estimator_
    # # Save the model
    # dump(best_model, './model/SSA_ElasticNet.joblib')
    # pred_y = best_model.predict(valid_x)
    # # Evaluate
    # acc = evaluate_model_accuracy(pred_y,valid_y)
    # print(acc)
    # # Plot and save
    # draw_picture(valid_y,pred_y,acc,'./pic/ssa','ElasticNet')
    # # Load the model
    # loaded_pipeline = load('./model/SSA_ElasticNet.joblib')
    # # Predict with the loaded model
    # pred_y = loaded_pipeline.predict(valid_x)
    # print(pred_y)

    # ---- K-nearest-neighbours regression ----
    # # Build the pipeline
    # pipeline = Pipeline([
    #     ('scaler', StandardScaler()),  # required: KNN is sensitive to feature scale
    #     ('knn', KNeighborsRegressor())
    # ])
    # # Hyper-parameter grid
    # param_grid = {
    #     'knn__n_neighbors': np.arange(1, 20),  # number of neighbours
    #     # 'knn__weights': ['uniform', 'distance'],  # weighting function
    #     'knn__algorithm': ['auto', 'ball_tree', 'kd_tree', 'brute'],  # search algorithm
    #     # 'knn__leaf_size': np.arange(1, 20),  # leaf size
    # }
    # # 5-fold cross-validation
    # kf = KFold(n_splits=5, shuffle=True, random_state=42)
    # # Grid search (hyper-parameter optimization)
    # grid_search = GridSearchCV(
    #     pipeline,
    #     param_grid,
    #     cv=kf,
    #     scoring='neg_mean_squared_error',  # minimize MSE
    #     n_jobs=-1,  # use all CPU cores
    #     verbose=2  # print progress
    # )
    # # Train
    # print("开始网格搜索优化...")
    # grid_search.fit(train_x, train_y)
    # print("\n最佳参数组合:", grid_search.best_params_)
    # # Predict with the best model
    # best_model = grid_search.best_estimator_
    # # Save the model
    # dump(best_model, './model/SSA_KNeighborsRegressor.joblib')
    # pred_y = best_model.predict(valid_x)
    # # Evaluate
    # acc = evaluate_model_accuracy(pred_y,valid_y)
    # print(acc)
    # # Plot and save
    # draw_picture(valid_y,pred_y,acc,'./pic/ssa','KNeighborsRegressor')
    # # Load the model
    # loaded_pipeline = load('./model/SSA_KNeighborsRegressor.joblib')
    # # Predict with the loaded model
    # pred_y = loaded_pipeline.predict(valid_x)
    # print(pred_y)

    # ---- SVR ----
    # # Build the pipeline
    # pipeline = Pipeline([
    #     ('scaler', StandardScaler()),  # try StandardScaler first
    #     ('svr', SVR(max_iter=10000))  # more iterations to ensure convergence
    # ])
    # # Hyper-parameter grid
    # param_grid = {
    #     'svr__kernel': ['rbf', 'poly', 'sigmoid'],  # kernel type
    #     'svr__C': np.logspace(-2, 4, 20),  # regularization parameter (0.01 to 10000)
    #     'svr__gamma': ['scale', 'auto'] + list(np.logspace(-3, 1, 10)),  # kernel coefficient
    #     'svr__epsilon': [0.01, 0.1, 0.5]  # tolerance to noise
    # }
    # # 5-fold cross-validation
    # kf = KFold(n_splits=5, shuffle=True, random_state=42)
    # # Grid search (hyper-parameter optimization)
    # grid_search = GridSearchCV(
    #     pipeline,
    #     param_grid,
    #     cv=kf,
    #     scoring='neg_mean_squared_error',  # minimize MSE
    #     n_jobs=-1,  # use all CPU cores
    #     verbose=2  # print progress
    # )
    # # Train
    # print("开始网格搜索优化...")
    # grid_search.fit(train_x, train_y)
    # print("\n最佳参数组合:", grid_search.best_params_)
    # # Predict with the best model
    # best_model = grid_search.best_estimator_
    # # Save the model
    # dump(best_model, './model/SSA_SVR.joblib')
    # pred_y = best_model.predict(valid_x)
    # # Evaluate
    # acc = evaluate_model_accuracy(pred_y,valid_y)
    # print(acc)
    # # Plot and save
    # draw_picture(valid_y,pred_y,acc,'./pic/ssa','SVR')
    # # Load the model
    # loaded_pipeline = load('./model/SSA_SVR.joblib')
    # # Predict with the loaded model
    # pred_y = loaded_pipeline.predict(valid_x)
    # print(pred_y)

    # ---- Decision tree ----
    # NOTE(review): 'auto' for max_features appears to be rejected by recent
    # scikit-learn releases - confirm before re-enabling this section.
    # # Build the pipeline
    # pipeline = Pipeline([
    #     ('scaler', StandardScaler()),  # not needed by trees, kept for comparability
    #     ('dtr', DecisionTreeRegressor(random_state=42))
    # ])
    # # Hyper-parameter grid
    # param_grid = {
    #     'dtr__criterion': ['squared_error', 'friedman_mse'],  # split criterion
    #     'dtr__max_depth': [None, 3, 5, 7, 10, 15, 20],  # maximum tree depth
    #     'dtr__min_samples_split': [2, 5, 10],  # minimum samples to split a node
    #     'dtr__min_samples_leaf': [1, 2, 4],  # minimum samples per leaf
    #     'dtr__max_features': ['auto', 'sqrt', 'log2', None]  # features considered per split
    # }
    # # 5-fold cross-validation
    # kf = KFold(n_splits=5, shuffle=True, random_state=42)
    # # Grid search (hyper-parameter optimization)
    # grid_search = GridSearchCV(
    #     pipeline,
    #     param_grid,
    #     cv=kf,
    #     scoring='neg_mean_squared_error',  # minimize MSE
    #     n_jobs=-1,  # use all CPU cores
    #     verbose=2  # print progress
    # )
    # # Train
    # print("开始网格搜索优化...")
    # grid_search.fit(train_x, train_y)
    # print("\n最佳参数组合:", grid_search.best_params_)
    # # Predict with the best model
    # best_model = grid_search.best_estimator_
    # # Save the model
    # dump(best_model, './model/SSA_DTR.joblib')
    # pred_y = best_model.predict(valid_x)
    # # Evaluate
    # acc = evaluate_model_accuracy(pred_y,valid_y)
    # print(acc)
    # # Plot and save
    # draw_picture(valid_y,pred_y,acc,'./pic/ssa','DTR')
    # # Load the model
    # loaded_pipeline = load('./model/SSA_DTR.joblib')
    # # Predict with the loaded model
    # pred_y = loaded_pipeline.predict(valid_x)
    # print(pred_y)

    # ---- Random forest ----
    # # Build the pipeline
    # pipeline = Pipeline([
    #     ('scaler', StandardScaler()),  # optional, random forests are scale-insensitive
    #     ('rfr', RandomForestRegressor(random_state=42, n_jobs=-1))
    # ])
    # # Hyper-parameter grid
    # param_grid = {
    #     'rfr__n_estimators': np.arange(1, 50),  # number of trees
    #     'rfr__max_depth': np.arange(1, 50),  # maximum tree depth
    #     # 'rfr__min_samples_split': [2, 5, 10],  # minimum samples to split a node
    #     # 'rfr__min_samples_leaf': [1, 2, 4],  # minimum samples per leaf
    #     # 'rfr__max_features': ['auto', 'sqrt', 'log2', 0.5, 0.8],  # share of features considered
    #     # 'rfr__bootstrap': [True, False]  # whether to use bootstrap sampling
    # }
    # # 5-fold cross-validation
    # kf = KFold(n_splits=5, shuffle=True, random_state=42)
    # # Grid search (hyper-parameter optimization)
    # grid_search = GridSearchCV(
    #     pipeline,
    #     param_grid,
    #     cv=kf,
    #     scoring='neg_mean_squared_error',  # minimize MSE
    #     n_jobs=-1,  # use all CPU cores
    #     verbose=2  # print progress
    # )
    # # Train
    # print("开始网格搜索优化...")
    # grid_search.fit(train_x, train_y)
    # print("\n最佳参数组合:", grid_search.best_params_)
    # # Predict with the best model
    # best_model = grid_search.best_estimator_
    # # Save the model
    # dump(best_model, './model/SSA_RFR.joblib')
    # pred_y = best_model.predict(valid_x)
    # # Evaluate
    # acc = evaluate_model_accuracy(pred_y,valid_y)
    # print(acc)
    # # Plot and save
    # draw_picture(valid_y,pred_y,acc,'./pic/ssa','RFR')
    # # Load the model
    # loaded_pipeline = load('./model/SSA_RFR.joblib')
    # # Predict with the loaded model
    # pred_y = loaded_pipeline.predict(valid_x)
    # print(pred_y)

    # ---- AdaBoost ----
    # # Build the pipeline
    # base_estimator = DecisionTreeRegressor(max_depth=3)
    # pipeline = Pipeline([
    #     ('scaler', StandardScaler()),
    #     ('adb', AdaBoostRegressor(
    #         estimator=base_estimator,  # pass the base estimator explicitly
    #         random_state=42
    #     ))
    # ])
    # # Hyper-parameter grid
    # param_grid = {
    #     'adb__n_estimators': [50, 100, 200],
    #     'adb__learning_rate': [0.01, 0.1, 0.5, 1.0],
    #     'adb__loss': ['linear', 'square', 'exponential'],
    #     # vary the tree depth through the estimator parameter
    #     'adb__estimator': [DecisionTreeRegressor(max_depth=d) for d in [1, 2, 3, 4]]
    # }
    # # 5-fold cross-validation
    # kf = KFold(n_splits=5, shuffle=True, random_state=42)
    # # Grid search (hyper-parameter optimization)
    # grid_search = GridSearchCV(
    #     pipeline,
    #     param_grid,
    #     cv=kf,
    #     scoring='neg_mean_squared_error',  # minimize MSE
    #     n_jobs=-1,  # use all CPU cores
    #     verbose=2  # print progress
    # )
    # # Train
    # print("开始网格搜索优化...")
    # grid_search.fit(train_x, train_y)
    # print("\n最佳参数组合:", grid_search.best_params_)
    # # Predict with the best model
    # best_model = grid_search.best_estimator_
    # # Save the model
    # dump(best_model, './model/SSA_ADB.joblib')
    # pred_y = best_model.predict(valid_x)
    # # Evaluate
    # acc = evaluate_model_accuracy(pred_y,valid_y)
    # print(acc)
    # # Plot and save
    # draw_picture(valid_y,pred_y,acc,'./pic/ssa','ADB')
    # # Load the model
    # loaded_pipeline = load('./model/SSA_ADB.joblib')
    # # Predict with the loaded model
    # pred_y = loaded_pipeline.predict(valid_x)
    # print(pred_y)

    # ---- XGBoost ----
    # # Build the pipeline
    # base_estimator = DecisionTreeRegressor(max_depth=3)
    # pipeline = Pipeline([
    #     ('scaler', StandardScaler()),  # optional
    #     ('xgb', xgb.XGBRegressor(objective='reg:squarederror',
    #                              random_state=42,
    #                              n_jobs=-1))
    # ])
    # # Hyper-parameter grid
    # param_grid = {
    #     'xgb__n_estimators': np.arange(1,150),  # number of trees
    #     # 'xgb__max_depth': [3, 4, 5],  # maximum tree depth
    #     # 'xgb__learning_rate': [0.01, 0.05, 0.1],  # learning rate
    #     # 'xgb__subsample': [0.6, 0.8, 1.0],  # row subsampling ratio
    #     # 'xgb__colsample_bytree': [0.6, 0.8, 1.0],  # feature subsampling ratio
    #     # 'xgb__gamma': [0, 0.1, 0.2],  # minimum split loss
    #     # 'xgb__reg_alpha': [0, 0.1, 1],  # L1 regularization
    #     # 'xgb__reg_lambda': [0.1, 1, 10]  # L2 regularization
    # }
    # # 5-fold cross-validation
    # kf = KFold(n_splits=5, shuffle=True, random_state=42)
    # # Grid search (hyper-parameter optimization)
    # grid_search = GridSearchCV(
    #     pipeline,
    #     param_grid,
    #     cv=kf,
    #     scoring='neg_mean_squared_error',  # minimize MSE
    #     n_jobs=-1,  # use all CPU cores
    #     verbose=2  # print progress
    # )
    # # Train
    # print("开始网格搜索优化...")
    # grid_search.fit(train_x, train_y)
    # print("\n最佳参数组合:", grid_search.best_params_)
    # # Predict with the best model
    # best_model = grid_search.best_estimator_
    # # Save the model
    # dump(best_model, './model/SSA_XGB.joblib')
    # pred_y = best_model.predict(valid_x)
    # # Evaluate
    # acc = evaluate_model_accuracy(pred_y,valid_y)
    # print(acc)
    # # Plot and save
    # draw_picture(valid_y,pred_y,acc,'./pic/ssa','XGB')
    # # Load the model
    # loaded_pipeline = load('./model/SSA_XGB.joblib')
    # # Predict with the loaded model
    # pred_y = loaded_pipeline.predict(valid_x)
    # print(pred_y)

    # ---- GBDT (active experiment) ----
    pipeline = Pipeline([
        ('scaler', StandardScaler()),  # can be swapped for MinMaxScaler()
        ('gbdt', GradientBoostingRegressor(random_state=42))
    ])
    param_grid = {
        'gbdt__n_estimators': [20, 30, 40, 50, 60, 70, 80, 100],  # number of trees
        'gbdt__learning_rate': [0.01, 0.1, 0.2],  # learning rate
        'gbdt__max_depth': [3, 5, 7],  # maximum tree depth
        'gbdt__min_samples_split': [2, 5],  # minimum samples to split a node
        'gbdt__min_samples_leaf': [1, 2, 3, 4, 5],  # minimum samples per leaf
        'gbdt__max_features': ['sqrt', 'log2'],  # features considered per split
    }

    # 5-fold cross-validation wrapped in an exhaustive grid search.
    kf = KFold(n_splits=5, shuffle=True, random_state=42)
    grid_search = GridSearchCV(pipeline, param_grid, cv=kf,
                               scoring='neg_mean_squared_error',
                               n_jobs=-1, verbose=2)

    # Train
    print("开始网格搜索...")
    grid_search.fit(train_x, train_y)
    print("\n最佳参数组合:", grid_search.best_params_)

    # Persist the refitted best pipeline, then evaluate it on the validation set.
    best_model = grid_search.best_estimator_
    dump(best_model, './model/SSA_GDBT.joblib')
    pred_y = best_model.predict(valid_x)
    acc = evaluate_model_accuracy(pred_y, valid_y)
    print(pred_y)
    print("\n")
    print(acc)
    draw_picture(valid_y, pred_y, acc, './pic/SSA', 'GDBT')

    # Reload the saved pipeline and predict again to confirm the file on disk
    # is usable.
    loaded_pipeline = load('./model/SSA_GDBT.joblib')
    pred_y = loaded_pipeline.predict(valid_x)
    print(pred_y)