"""Helpers for image tiling/merging and for preparing forecast/simulator data."""

import copy
import itertools
import os
import shutil
import uuid
from pathlib import Path

import cv2 as cv
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from PIL import Image
from tqdm import tqdm

from work_util.logger import logger
from work_util import params

# Edge length (pixels) of the square tiles written by cut_big_image and
# reassembled by merge_pic_binary.
_BLOCK_SIZE = 512
# Largest input file cut_big_image will process (100 MB).
_MAX_IMAGE_BYTES = 100 * 1024 * 1024
# Rolling statistics produced by data_feature, in output-column order.
_ROLLING_STATS = ('mean', 'max', 'min', 'median', 'std', 'var', 'skew', 'kurt')
# Fallback search ranges applied by get_params when a value is missing/None.
_PARAM_DEFAULTS = {
    # ash content
    'A_min': 4, 'A_max': 48, 'A_step': 4,
    # volatile matter
    'VM_min': 5, 'VM_max': 50, 'VM_step': 5,
    # activator ratio
    'KC_min': 1, 'KC_max': 4, 'KC_step': 0.5,
    # mixing method
    'MM_min': 0, 'MM_max': 1, 'MM_step': 1,
    # activation temperature
    'AT_min': 600, 'AT_max': 900, 'AT_step': 50,
    # activation time
    'At_min': 0.5, 'At_max': 2, 'At_step': 0.5,
    # heating rate
    'Rt_min': 5, 'Rt_max': 10, 'Rt_step': 5,
}


# ----------------------------- Image helpers -----------------------------

def _block_counts(width, height):
    """Return how many tiles are needed horizontally and vertically (ceil)."""
    return (
        (width + _BLOCK_SIZE - 1) // _BLOCK_SIZE,
        (height + _BLOCK_SIZE - 1) // _BLOCK_SIZE,
    )


def cut_big_image(path):
    """Cut the image at *path* into 512x512 tiles saved under ``<dir>/ori``.

    Tiles are written as ``block_{i}_{j}.png`` where ``i`` is the column
    index and ``j`` the row index. Tiles on the right/bottom edge are padded
    with white so every tile is exactly 512x512.

    Returns:
        False if the image is narrower or shorter than one tile, or if the
        file is larger than 100 MB; True once all tiles have been saved.
    """
    output_folder = os.path.join(os.path.dirname(path), 'ori')
    # The output folder is created even when the image is later rejected.
    os.makedirs(output_folder, exist_ok=True)

    with Image.open(path) as image:
        width, height = image.size
        if width < _BLOCK_SIZE or height < _BLOCK_SIZE:
            return False
        if os.path.getsize(path) > _MAX_IMAGE_BYTES:
            return False

        num_blocks_x, num_blocks_y = _block_counts(width, height)
        for i in range(num_blocks_x):
            for j in range(num_blocks_y):
                left = i * _BLOCK_SIZE
                upper = j * _BLOCK_SIZE
                right = min(left + _BLOCK_SIZE, width)
                lower = min(upper + _BLOCK_SIZE, height)
                block = image.crop((left, upper, right, lower))

                # Paste onto a white canvas so partial edge tiles are padded.
                new_block = Image.new(
                    'RGB', (_BLOCK_SIZE, _BLOCK_SIZE), (255, 255, 255))
                new_block.paste(block, (0, 0))
                new_block.save(
                    os.path.join(output_folder, f'block_{i}_{j}.png'))

    logger.info("裁剪完成,图像块已保存。")
    return True


def roof_pv_preprocess_input(image, md=False):
    """Scale *image* to [0, 1] in place and optionally standardise it.

    The array is modified in place and also returned.
    NOTE(review): assumes *image* is a float array whose last axis has three
    channels (required for the per-channel mean/std broadcast) - confirm
    against callers.

    Args:
        image: array to normalise.
        md: when True, additionally subtract the channel means and divide by
            the channel standard deviations.
    """
    mean = (0.231, 0.217, 0.22)
    std = (0.104, 0.086, 0.085)
    image /= 255.0
    if md:
        image -= mean
        image /= std
    return image


def merge_pic_binary(path, tpye_list):
    """Reassemble per-type tiles into full-size images next to *path*.

    For every ``name`` in *tpye_list* the tiles ``<dir>/<name>/block_i_j.png``
    are pasted into an image the size of the original and saved as
    ``<dir>/merge_<name>_<original file name>``. Missing tiles are left black.

    Returns:
        ``{"status": True, "path": [saved paths]}`` on success, or
        ``{"status": False, "path": None}`` if anything fails (the error is
        logged).
    """
    try:
        with Image.open(path) as image:
            width, height = image.size
        file_name = os.path.basename(path)
        file_directory = os.path.dirname(path)
        num_blocks_x, num_blocks_y = _block_counts(width, height)

        path_list = []
        for name in tpye_list:
            input_folder = os.path.join(file_directory, name)
            # A missing tile folder is an error, not an all-black result.
            if not os.path.isdir(input_folder):
                raise FileNotFoundError(input_folder)

            merged_image = Image.new('RGB', (width, height))
            for i in range(num_blocks_x):
                for j in range(num_blocks_y):
                    block_path = os.path.join(
                        input_folder, f'block_{i}_{j}.png')
                    logger.info(block_path)
                    if os.path.exists(block_path):
                        with Image.open(block_path) as block:
                            # Edge tiles overhang; paste clips them.
                            merged_image.paste(
                                block, (i * _BLOCK_SIZE, j * _BLOCK_SIZE))

            target = os.path.join(
                file_directory, "merge_" + name + "_" + file_name)
            merged_image.save(target)
            path_list.append(target)
            logger.info("合并完成,保存为:%s", target)
        return {"status": True, "path": path_list}
    except Exception as e:
        # The message needs a placeholder, otherwise logging itself fails
        # while formatting the record.
        logger.error("发生错误:%s", e)
        return {"status": False, "path": None}


def delete_folder(folder_path):
    """Recursively delete *folder_path*.

    Returns:
        True if the path existed and was removed; False if it did not exist
        or removal failed (the failure is logged).
    """
    try:
        if not os.path.exists(folder_path):
            return False
        shutil.rmtree(folder_path)
        return True
    except Exception as e:
        logger.error(f"删除文件夹时发生错误: {e}")
        return False


def merge_final(path_list):
    """Blend a binary mask over the original image and save the result.

    Args:
        path_list: ``path_list[0]`` is the binary image, ``path_list[1]`` the
            original image.

    Returns:
        Path of the saved ``fusion_<original file name>`` image.

    Raises:
        ValueError: if the two images differ in size.
    """
    ori_path = path_list[1]
    binary_path = path_list[0]

    # convert() returns an independent image, so the files can be closed.
    with Image.open(ori_path) as src:
        ori_image = src.convert('RGBA')
    with Image.open(binary_path) as src:
        binary_image = src.convert('RGBA')

    if ori_image.size != binary_image.size:
        raise ValueError("两张图像的尺寸必须相同!")

    # Blend ratio: 60% original, 40% binary mask.
    alpha = 0.4
    blended_image = Image.blend(ori_image, binary_image, alpha)

    final_path = os.path.join(
        os.path.dirname(ori_path), 'fusion_' + os.path.basename(ori_path))
    blended_image.save(final_path)
    return final_path


def replace_substring(filename, old_substring, new_substring):
    """Return *filename* with every *old_substring* replaced."""
    return filename.replace(old_substring, new_substring)


def merge_binary(path_list):
    """Intersect two red-on-black masks and save the result.

    A pixel of the output is pure red ``(255, 0, 0)`` only where both input
    images are pure red; every other pixel is black. The output is saved in
    the directory of ``path_list[1]`` under that file's name with ``_pv_``
    replaced by ``_roofpv_``.

    Args:
        path_list: paths of the two mask images (indices 0 and 1).

    Returns:
        Path of the saved image.

    Raises:
        ValueError: if ``path_list[0]`` is smaller than ``path_list[1]``.
    """
    image1_path = path_list[1]
    image2_path = path_list[0]

    with Image.open(image1_path) as src:
        first = np.asarray(src.convert('RGB'))
    with Image.open(image2_path) as src:
        second = np.asarray(src.convert('RGB'))

    # Only the extent of the first image is compared.
    rows, cols = first.shape[:2]
    second = second[:rows, :cols]
    if second.shape != first.shape:
        raise ValueError("两张图像的尺寸必须相同!")

    red = np.array((255, 0, 0), dtype=np.uint8)
    both_red = np.all(first == red, axis=-1) & np.all(second == red, axis=-1)
    merged = np.zeros_like(first)
    merged[both_red] = red
    output_image = Image.fromarray(merged)

    # Rename the file only; running replace() on the whole path and joining
    # it with the directory again duplicated the directory for relative
    # paths and could rewrite "_pv_" inside a directory name.
    new_filename = replace_substring(
        os.path.basename(image1_path), "_pv_", "_roofpv_")
    final_path = os.path.join(os.path.dirname(image1_path), new_filename)
    output_image.save(final_path)
    return final_path


# --------------------------- Tabular data helpers ---------------------------

def _columns_as_records(path, time_col, value_col):
    """Read the CSV at *path* and return two columns as a list of dicts."""
    data = pd.read_csv(path)
    return data[[time_col, value_col]].to_dict(orient='records')


def _future_timestamps(last_time, count):
    """Return *count* timestamps following *last_time* in 15-minute steps."""
    return [last_time + pd.Timedelta(minutes=15 * (i + 1))
            for i in range(count)]


def show_data_pvfd(path, type):
    """Return the ``date_time`` column and column *type* of a CSV as records."""
    return _columns_as_records(path, 'date_time', type)


def show_testdata_pvfd(test_data_path):
    """Load the CSV at *test_data_path* into a DataFrame."""
    return pd.read_csv(test_data_path)


def data_complement(data, time_col='date_time', value_col='power'):
    """Pad *data* at the front so that it has 192 rows.

    The added rows step back from the first row in 15-minute intervals and
    repeat the first row's value.

    Args:
        data: DataFrame containing *time_col* (datetime) and *value_col*.
        time_col: name of the timestamp column.
        value_col: name of the value column.

    Returns:
        A new DataFrame with the padding rows followed by *data*, re-indexed
        from 0.
    """
    rows_to_add = 192 - len(data)
    first_row = data.iloc[0]
    earlier_times = [
        first_row[time_col] - pd.Timedelta(minutes=15 * (i + 1))
        for i in range(rows_to_add)
    ]
    new_rows = pd.DataFrame({
        # Reversed so timestamps run from earliest to latest.
        time_col: earlier_times[::-1],
        value_col: [first_row[value_col]] * rows_to_add,
    })
    return pd.concat([new_rows, data], ignore_index=True)


def result_merge_guangfufadian(data_path, predictions):
    """Pair PV power predictions with timestamps and the observed series.

    Args:
        data_path: CSV with ``date_time`` and ``power`` columns.
        predictions: 96 predicted power values.

    Returns:
        ``(pred_json, true_json)`` - both JSON strings in ``records``
        orientation. Negative predictions are clipped to 0.
    """
    history, horizon = 192, 96
    data = pd.read_csv(data_path)
    data['date_time'] = pd.to_datetime(data['date_time'])
    length = data.shape[0]

    if length >= history + horizon:
        # NOTE(review): on the default RangeIndex this label slice selects
        # every row (it is not "the last 288 rows"). Kept as-is because the
        # prediction timestamps below then line up with rows 192..287, which
        # matches the other branches - confirm the intended windowing before
        # switching to .iloc.
        data = data.loc[-(history + horizon):]
        true_data = data[['date_time', 'power']]
        pred_time = np.array(
            true_data['date_time'].iloc[history:history + horizon])
    elif length < history:
        # Fewer than 192 observations: pad the history at the front.
        true_data = data_complement(data[['date_time', 'power']])
        pred_time = _future_timestamps(data['date_time'].iloc[-1], horizon)
    else:
        # Between 192 and 287 observations: predict after the 192nd row.
        true_data = data[['date_time', 'power']]
        pred_time = _future_timestamps(
            data['date_time'].iloc[history - 1], horizon)

    pred_data = pd.DataFrame({
        'date_time': pred_time,
        'pred_power': predictions,
    })
    pred_data['pred_power'] = pred_data['pred_power'].clip(lower=0)
    return (pred_data.to_json(orient='records'),
            true_data.to_json(orient='records'))


# Wind power forecasting

def show_data_windfd(path, type):
    """Return the ``date`` column and column *type* of a CSV as records."""
    return _columns_as_records(path, 'date', type)


def show_testdata_windfd(test_data_path):
    """Load the CSV at *test_data_path* into a DataFrame."""
    return pd.read_csv(test_data_path)


def result_merge_fenglifadian(data_path, predictions):
    """Pair wind power predictions with timestamps and the observed series.

    Args:
        data_path: CSV with ``date`` and ``Power(MW)`` columns.
        predictions: predicted power values (12 when at least 204 rows of
            data are available).

    Returns:
        ``(pred_json, true_json)`` - both JSON strings in ``records``
        orientation. The prediction records use the key ``date_time``.
    """
    history, horizon = 192, 12
    data = pd.read_csv(data_path)
    data['date'] = pd.to_datetime(data['date'])
    length = data.shape[0]

    if length >= history + horizon:
        # NOTE(review): on the default RangeIndex this label slice selects
        # every row; see result_merge_guangfufadian for the same pattern.
        data = data.loc[-(history + horizon):]
        true_data = data[['date', 'Power(MW)']]
        pred_time = np.array(
            true_data['date'].iloc[history:history + horizon])
    else:
        if length < history:
            # Pass the wind column names; the defaults are the PV ones.
            true_data = data_complement(
                data[['date', 'Power(MW)']],
                time_col='date', value_col='Power(MW)')
            last_time = data['date'].iloc[-1]
        else:
            true_data = data[['date', 'Power(MW)']]
            last_time = data['date'].iloc[history - 1]
        # One timestamp per prediction so the columns always align.
        # NOTE(review): the 15-minute step is inherited from the PV code -
        # confirm it matches the wind data's sampling interval.
        pred_time = _future_timestamps(last_time, len(predictions))

    pred_data = pd.DataFrame({
        'date_time': pred_time,
        'pred_power': predictions,
    })
    return (pred_data.to_json(orient='records'),
            true_data.to_json(orient='records'))


def show_data_jiawanyuce(col, path):
    """Return the ``date_time`` column and column *col* of a CSV as records."""
    return _columns_as_records(path, 'date_time', col)


def get_jiawanyuce_data(path):
    """Load the CSV at *path* into a DataFrame."""
    return pd.read_csv(path)


def data_feature(data, x_cols):
    """Add lag, difference and rolling-statistic columns for each *x_cols*.

    New columns are appended to *data* in place (and *data* is returned):
    four lags, 4-row ("1h") and 8-row ("2h") rolling statistics and first /
    second differences per column, followed by 96-row ("d") rolling
    statistics for all columns. Column order is significant for consumers
    that select features by position.
    """
    for name in x_cols:
        series = data[name]
        for lag, label in enumerate(('15', '30', '45', '60'), start=1):
            data[f"{name}_{label}_first"] = series.shift(lag)
        for stat in _ROLLING_STATS:
            data[f"{name}_1h_{stat}"] = getattr(series.rolling(4), stat)()
        data[name + "_1_diff"] = series.diff(periods=1)
        data[name + "_2_diff"] = series.diff(periods=2)
        for stat in _ROLLING_STATS:
            data[f"{name}_2h_{stat}"] = getattr(series.rolling(8), stat)()

    # Daily statistics are added after all short-window features.
    for name in x_cols:
        series = data[name]
        for stat in _ROLLING_STATS:
            # NOTE(review): the daily median uses a 4-row window, unlike the
            # other daily statistics (96 rows). This looks like a copy/paste
            # slip but is preserved because a fitted model may depend on it.
            window = 4 if stat == 'median' else 4 * 24
            data[f"{name}_d_{stat}"] = getattr(series.rolling(window), stat)()
    return data


def get_pred_data(data, start_index, end_index):
    """Build the feature row used for prediction.

    Converts ``data['date_time']`` to datetime (in place), keeps the rows
    with ``start_index <= date_time <= end_index``, computes the features
    from data_feature and returns the last row as a one-row DataFrame.
    """
    data['date_time'] = pd.to_datetime(data['date_time'])
    in_range = (data['date_time'] >= start_index) & (data['date_time'] <= end_index)
    # Copy so the feature columns are not written onto a slice of `data`.
    filtered_data = data[in_range].copy()
    columns = ['X_ch', 'X_pr', 'X_li', 'X_I', 'Q', 'pH', 'Nm3d-1-ch4', 'S_gas_ch4']
    featured = data_feature(filtered_data, columns)
    # Only the most recent row is needed to produce a prediction.
    return featured.iloc[-1:, :]


def float_range(start, end, step):
    """Yield ``start, start + step, ...`` up to and including *end*.

    Values are rounded to 10 decimals. Each value is computed from the step
    index rather than by repeated addition, so accumulated floating-point
    error cannot drop the end point (e.g. 0.1 -> 0.3 in steps of 0.1).

    Raises:
        ValueError: if *step* is not positive (the range would never end).
    """
    if step <= 0:
        raise ValueError("step must be positive")
    steps = 0
    value = start
    while round(value, 10) <= end:
        yield round(value, 10)
        steps += 1
        value = start + steps * step


def get_params(params):
    """Fill missing or ``None`` range settings in *params* with defaults.

    *params* is updated in place and returned. Each of the seven factors
    (``A``, ``VM``, ``KC``, ``MM``, ``AT``, ``At``, ``Rt``) has ``_min``,
    ``_max`` and ``_step`` entries.
    """
    for key, default in _PARAM_DEFAULTS.items():
        if params.get(key) is None:
            params[key] = default
    return params


def create_pred_data(params):
    """Return every combination of the seven factor ranges in *params*.

    Each factor's values come from ``float_range(min, max, step)``; the
    result is a list of 7-tuples ordered (A, VM, KC, MM, AT, At, Rt).
    """
    sequences = [
        list(float_range(params[f'{prefix}_min'],
                         params[f'{prefix}_max'],
                         params[f'{prefix}_step']))
        for prefix in ('A', 'VM', 'KC', 'MM', 'AT', 'At', 'Rt')
    ]
    return list(itertools.product(*sequences))


def moniqi_data_prepare(model, path, type, scale, num):
    """Find the simulator rows whose SSA/TPV value is closest to *scale*.

    Args:
        model: model name; must occur in *path*, otherwise no data is read.
        path: CSV of simulator results.
        type: 1 selects the ``SSA`` column, anything else ``TPV``.
        scale: target value.
        num: number of closest rows to return.

    Returns:
        ``{"status": True, "reason": [row dicts]}`` on success, otherwise
        ``{"status": False, "reason": <message>}``.
    """
    if model not in path:
        return {
            "status": False,
            "reason": "请先运行选择模型响应的正向模拟,生成范围数据"
        }

    data = pd.read_csv(path)
    col = 'SSA' if type == 1 else "TPV"
    max_data = data[col].max()
    min_data = data[col].min()

    if scale < min_data:
        return {
            "status": False,
            "reason": "您输入的" + col + '取值范围过小,模拟器运行数据结果最小值结果为' + str(min_data)
        }
    if scale > max_data:
        return {
            "status": False,
            "reason": "您输入的" + col + '取值范围过大,模拟器运行数据结果最大值结果为' + str(max_data)
        }

    if type != 1:
        # Copy so the new column is not assigned onto a filtered slice.
        data = data[data['TPV'] > 0].copy()
    data['difference'] = (data[col] - scale).abs()
    closest_row = data.nsmallest(num, 'difference')
    return {
        "status": True,
        "reason": closest_row.to_dict(orient="records")
    }