# wgz_decision/models/env.py
#
# 160 lines
# 6.5 KiB
# Python

import gym
import pandas as pd
from data_manager import *
from module import *
from parameters import *
class WgzGym(gym.Env):
    """Gym environment coupling an electrolyzer, a hydrogen storage tank and the grid.

    Each episode lasts ``episode_length`` steps.  An action is a 3-vector whose
    entries are forwarded, in order, to ``EC.step``, ``HST.step`` and
    ``Grid.step``.  The observation is a 10-vector:

        0 time step            5 heat demand
        1 electricity price    6 head count
        2 temperature          7 value of ``EC.get_hydrogen()``
        3 solar (PV) power     8 hydrogen tank state of charge
        4 electricity demand   9 grid power

    NOTE: ``step`` returns ``(current_obs, next_obs, reward, finish)``, which is
    not the standard Gym ``(obs, reward, done, info)`` tuple.
    """

    def __init__(self, **kwargs):
        """Create the components, load the yearly data and define the spaces.

        Keyword Args:
            EC_parameters: electrolyzer parameters; defaults to the
                module-level ``EC_parameters``.
            HST_parameters: hydrogen storage tank parameters.  The legacy key
                ``dg_parameters`` is still accepted; when neither is given the
                module-level ``dg_parameters`` is used.
        """
        super().__init__()
        self.rec_data = None
        self.unbalance = None
        self.reward = None
        self.current_output = None
        self.final_step_outputs = None

        self.data_manager = DataManager()
        self._load_year_data()

        self.month = 1
        self.day = 1
        self.TRAIN = True
        self.current_time = None
        self.episode_length = 24

        self.penalty_coefficient = 10  # constraint (unmet demand) penalty coefficient
        self.sell_coefficient = 0.5  # fraction of the grid price earned when selling

        # Reward weights: demand penalty, economic cost, environmental benefit.
        self.a = 0.5
        self.b = 0.3
        self.c = 0.2
        # Weights of the heat / power shortfall inside the demand penalty.
        self.heat_a = 0.6
        self.power_a = 0.4

        # Electrolyzer (water electrolysis hydrogen production).
        self.EC_parameters = kwargs.get('EC_parameters', EC_parameters)
        # Hydrogen storage tank.  The original only honoured the key
        # 'dg_parameters'; 'HST_parameters' is now preferred, with the old key
        # kept as a fallback so existing callers are unaffected.
        self.HST_parameters = kwargs.get(
            'HST_parameters', kwargs.get('dg_parameters', dg_parameters)
        )

        self.grid = Grid()
        self.EC = EC(self.EC_parameters)
        self.HST = HST(self.HST_parameters)

        self.action_space = gym.spaces.Box(low=-1, high=1, shape=(3,), dtype=np.float32)

        # State layout (10 values):
        #   7 exogenous: time, PV, temperature (humidity not considered yet),
        #     electricity demand, heat demand (converted to the wattage needed
        #     for the corresponding hot water), head count, electricity price;
        #   3 internal: electrolyzer hydrogen-production power, hydrogen tank
        #     fill ratio, grid power (mind the normalisation).
        # NOTE(review): the bounds are declared as [0, 1] but nothing in this
        # class normalises the values -- confirm it happens upstream.
        self.state_space = gym.spaces.Box(low=0, high=1, shape=(10,), dtype=np.float32)
        # Gym tooling looks for `observation_space`; expose the same Box under
        # that name while keeping `state_space` for existing callers.
        self.observation_space = self.state_space

    def reset(self, *args):
        """Pick a random (month, day), reset EC and HST, return the first state.

        ``np.random.randint`` excludes its upper bound, so training draws days
        1..19 and evaluation draws days 20..(month length - 1).
        NOTE(review): the last day of each month is therefore never sampled in
        evaluation mode -- confirm this is intended.
        """
        self.month = np.random.randint(1, 13)  # one of the 12 months
        if self.TRAIN:
            self.day = np.random.randint(1, 20)
        else:
            self.day = np.random.randint(20, Constant.MONTHS_LEN[self.month - 1])
        self.current_time = 0
        self.EC.reset()
        self.HST.reset()
        return self._build_state()

    def _build_state(self):
        """Assemble the flat float32 observation for the current time step.

        In training mode the exogenous values are read from ``data_manager``
        for (month, day, current_time); otherwise they are taken from the
        first six entries of ``rec_data``.

        Raises:
            RuntimeError: if ``TRAIN`` is False and ``rec_data`` has not been set.
        """
        hst_soc = self.HST.current_soc
        ec_out = self.EC.get_hydrogen()
        # The original passed the Grid object itself, which cannot be converted
        # by np.float32; the state is documented to hold the grid power.
        grid_ex = self.grid.current_power
        time_step = self.current_time

        if self.TRAIN:
            when = (self.month, self.day, self.current_time)
            source = self.data_manager
            price = source.get_price_data(*when)
            temper = source.get_temperature_data(*when)
            solar = source.get_solar_data(*when)
            load = source.get_load_data(*when)
            heat = source.get_heat_data(*when)
            people = source.get_people_data(*when)
        else:
            if self.rec_data is None:
                raise RuntimeError(
                    "rec_data must be set before building a state when TRAIN is False"
                )
            price, temper, solar, load, heat, people = (
                self.rec_data[i] for i in range(6)
            )

        fields = (time_step, price, temper, solar, load, heat, people,
                  ec_out, hst_soc, grid_ex)
        # axis=None flattens every entry, so scalars are concatenated as-is.
        return np.concatenate(tuple(np.float32(v) for v in fields), axis=None)

    def step(self, action):
        """Apply ``action`` for one time step and compute the reward.

        Args:
            action: indexable with three entries, forwarded to ``EC.step``,
                ``HST.step`` and ``grid.step`` respectively.

        Returns:
            Tuple ``(current_obs, next_obs, reward, finish)`` where
            ``current_obs`` is the state before acting, ``next_obs`` the state
            afterwards (a freshly reset state when the episode ends),
            ``reward`` a float and ``finish`` whether the episode ended.
        """
        current_obs = self._build_state()

        # Every component executes its action for one step.
        self.EC.step(action[0])
        self.HST.step(action[1])
        self.grid.step(action[2])

        # Temperature (index 2) and head count (index 6) are not used yet.
        price = current_obs[1]
        solar = current_obs[3]
        load = current_obs[4]
        heat = current_obs[5]

        power_gap = solar + self.HST.get_power() - self.EC.current_power - load
        heat_gap = self.HST.get_heat() + self.EC.get_heat() - heat

        sell_benefit, buy_cost = 0, 0
        if power_gap >= 0:
            # Surplus: sold to the grid at a discounted price, no penalty.
            sell_benefit = self.grid.get_cost(price, power_gap) * self.sell_coefficient
            power_gap = 0
            power_penalty = 0
        else:
            # Shortage: bought from the grid and penalised.
            power_gap = abs(power_gap)
            buy_cost = self.grid.get_cost(price, power_gap)
            power_penalty = power_gap * self.penalty_coefficient

        if heat_gap >= 0:
            heat_gap = 0
            heat_penalty = 0
        else:
            heat_gap = abs(heat_gap)
            heat_penalty = heat_gap * self.penalty_coefficient

        hst_cost = self.HST.get_cost()
        ec_cost = self.EC.get_cost(price)
        solar_cost = solar  # placeholder, to be filled in

        economic_cost = hst_cost + ec_cost + solar_cost - sell_benefit + buy_cost
        demand_cost = self.heat_a * heat_penalty + self.power_a * power_penalty
        eco_benefit = self.EC.less_carbon() - self.grid.get_carbon()
        reward = - self.a * demand_cost - self.b * economic_cost + self.c * eco_benefit

        # Total unmet demand (both gaps are clamped to >= 0 above).
        self.unbalance = power_gap + heat_gap
        final_step_outputs = [self.HST.current_soc, self.EC.current_power,
                              self.grid.current_power]

        self.current_time += 1
        finish = (self.current_time == self.episode_length)
        if finish:
            self.final_step_outputs = final_step_outputs
            self.current_time = 0
            next_obs = self.reset()
        else:
            next_obs = self._build_state()
        return current_obs, next_obs, float(reward), finish

    def _load_year_data(self):
        """Read ``data/all_data.csv`` and feed each column into ``data_manager``.

        Every value of a column is passed, one at a time and in file order, to
        the paired ``data_manager`` adder.  The amount of training data can be
        redesigned here as needed.
        """
        data_df = pd.read_csv('data/all_data.csv', sep=',')

        # NOTE(review): the column -> adder pairing is kept exactly as found,
        # but it looks inconsistent (solar and temperature both go to
        # add_load_element, energy demand to add_irradiance_element, ...).
        # Verify against DataManager before relying on the loaded data.
        routing = (
            ('solar_power', 'add_load_element'),
            ('temper', 'add_load_element'),
            ('energy_demand', 'add_irradiance_element'),
            ('water_demand', 'add_temperature_element'),
            ('people_count', 'add_wind_element'),
            ('price', 'add_price_element'),
        )

        # Extract every column first so a missing column fails before any
        # element has been added.
        series = [data_df[column].to_numpy(dtype=float) for column, _ in routing]

        for values, (_, adder_name) in zip(series, routing):
            add_element = getattr(self.data_manager, adder_name)
            for value in values:
                add_element(value)