building-agents/environment_llm.py

177 lines
8.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import gym
import json
import numpy as np
import pandas as pd
from module import *
from parameters import *
from data_manager import *
class ESSEnv(gym.Env):
    """Energy-system scheduling environment built on ``gym.Env``.

    The environment owns a battery, three ``DG`` units, a solar unit, a wind
    unit and a grid object. ``reset`` draws a random (month, day) and ``step``
    advances one time index within that day until ``episode_length`` steps
    have been taken.

    Observation layout produced by ``_build_state`` (all cast to float32):

        0 time step        1 price           2 battery SOC    3 net load
        4 dg1 output       5 dg2 output      6 dg3 output
        7 temperature      8 irradiance      9 wind speed
    """

    def __init__(self, **kwargs):
        """Create the components and load the yearly time series.

        Keyword Args:
            episode_length: number of steps per episode (default 24).
            battery_parameters, dg_parameters, solar_parameters,
            wind_parameters: component configuration; each falls back to the
                module-level object of the same name.
        """
        super().__init__()

        # Time-series store; it has to be filled before any state is built.
        # (A TimeCounter-based alternative was left commented out originally.)
        self.data_manager = DataManager()
        self._load_year_data()

        # Per-step bookkeeping, filled in by step().
        self.excess = None
        self.shedding = None
        self.unbalance = None
        self.real_unbalance = None
        self.operation_cost = None
        self.current_output = None
        self.final_step_outputs = None

        # Episode position, chosen in reset().
        self.month = None
        self.day = None
        self.current_time = None
        self.TRAIN = True  # selects which day range reset() samples from

        # Configuration, overridable through kwargs.
        self.episode_length = kwargs.get('episode_length', 24)
        self.battery_parameters = kwargs.get('battery_parameters', battery_parameters)
        self.dg_parameters = kwargs.get('dg_parameters', dg_parameters)
        self.solar_parameters = kwargs.get('solar_parameters', solar_parameters)
        self.wind_parameters = kwargs.get('wind_parameters', wind_parameters)
        self.penalty_coefficient = 50  # constraint-violation penalty coefficient
        self.sell_coefficient = 0.5  # profit coefficient applied when selling

        # Physical components.
        self.grid = Grid()
        self.battery = Battery(self.battery_parameters)
        self.dg1, self.dg2, self.dg3 = (
            DG(self.dg_parameters[key]) for key in ('gen_1', 'gen_2', 'gen_3')
        )
        self.solar = Solar(self.solar_parameters)
        self.wind = Wind(self.wind_parameters)

        # Five action components; per the original comment the fifth one was
        # added for voltage regulation (it is passed to solar.step in step()).
        self.action_space = gym.spaces.Box(low=-1, high=1, shape=(5,), dtype=np.float32)
        # NOTE(review): _build_state does not rescale its values into [0, 1];
        # confirm these bounds are what consumers of state_space expect.
        self.state_space = gym.spaces.Box(low=0, high=1, shape=(10,), dtype=np.float32)

    def reset(self, *args):
        """Pick a random day, reset every component and return the first observation.

        The month is drawn from 1..12. In training mode the day is drawn from
        1..19; otherwise from 20 up to (but excluding) the upper bound derived
        from ``Constant.MONTHS_LEN``. Positional arguments are accepted and
        ignored.
        """
        self.month = np.random.randint(1, 13)  # any of the 12 months
        if self.TRAIN:
            day_low, day_high = 1, 20
        else:
            # NOTE(review): MONTHS_LEN is indexed with a 1-based month here;
            # confirm against its definition that this is the intended entry.
            day_low, day_high = 20, Constant.MONTHS_LEN[self.month] - 1
        self.day = np.random.randint(day_low, day_high)
        # (A sequential TimeCounter-based date was left commented out originally.)
        self.current_time = 0

        for unit in (self.battery, self.dg1, self.dg2, self.dg3, self.solar, self.wind):
            unit.reset()
        return self._build_state()

    def _build_state(self):
        """Assemble the 10-element float32 observation for the current time index.

        Note that this calls ``solar.step`` and ``wind.step`` to obtain the
        generation used for the net load, so it is not a pure read.
        """
        soc = self.battery.SOC()
        dg_outputs = (self.dg1.current_output, self.dg2.current_output, self.dg3.current_output)
        time_step = self.current_time

        when = (self.month, self.day, self.current_time)
        price = self.data_manager.get_price_data(*when)
        houseload = self.data_manager.get_load_cons_data(*when)
        temperature = self.data_manager.get_temperature_data(*when)
        irradiance = self.data_manager.get_irradiance_data(*when)
        windspeed = self.data_manager.get_wind_data(*when)

        pv_gen = self.solar.step(temperature, irradiance)
        wind_gen = self.wind.step(windspeed)
        netload = houseload - (pv_gen + wind_gen)

        fields = (time_step, price, soc, netload, *dg_outputs, temperature, irradiance, windspeed)
        return np.concatenate(tuple(np.float32(value) for value in fields), axis=None)

    def _settle_grid_exchange(self, price, unbalance):
        """Settle a power imbalance with the grid and record excess/shedding.

        A non-negative ``unbalance`` is sold, a negative one is bought. Only
        ``grid.exchange_ability`` can be traded; whatever lies beyond that is
        stored in ``self.excess`` (surplus) or ``self.shedding`` (deficit) and
        charged at ``penalty_coefficient``.

        Returns:
            ``(sell_benefit, buy_cost, excess_penalty, deficient_penalty)``.
        """
        sell_benefit = buy_cost = 0
        excess_penalty = deficient_penalty = 0
        self.excess = 0
        self.shedding = 0

        limit = self.grid.exchange_ability
        surplus = unbalance >= 0
        magnitude = unbalance if surplus else abs(unbalance)

        if magnitude <= limit:
            traded, beyond_limit, penalty = magnitude, 0, 0
        else:
            # The grid cannot absorb/supply the whole imbalance.
            traded = limit
            beyond_limit = magnitude - limit
            penalty = beyond_limit * self.penalty_coefficient

        grid_cost = self.grid.get_cost(price, traded)
        if surplus:
            # Selling to the grid earns only a fraction of the price.
            sell_benefit = grid_cost * self.sell_coefficient
            self.excess = beyond_limit
            excess_penalty = penalty
        else:
            buy_cost = grid_cost
            self.shedding = beyond_limit
            deficient_penalty = penalty
        return sell_benefit, buy_cost, excess_penalty, deficient_penalty

    def step(self, action):
        """Apply ``action`` for one time step.

        Flow: current observation -> apply action to each component ->
        compute cost/reward -> check for episode end -> next observation.

        Args:
            action: indexable with five entries: battery, dg1, dg2, dg3 and a
                fifth value that is forwarded to ``solar.step``.

        Returns:
            ``(current_obs, next_obs, reward, finish)``. When the episode
            finishes, the environment is reset and ``next_obs`` is the first
            observation of the new episode.
            NOTE(review): this is not the standard gym
            ``(obs, reward, done, info)`` tuple; callers must unpack accordingly.
        """
        current_obs = self._build_state()
        price, netload = current_obs[1], current_obs[3]
        temperature, irradiance, wind_speed = current_obs[7], current_obs[8], current_obs[9]

        # Apply the action to every component. The battery step performs the
        # state transition, so its current capacity changes as well.
        self.battery.step(action[0])
        self.dg1.step(action[1])
        self.dg2.step(action[2])
        self.dg3.step(action[3])
        self.solar.step(temperature, irradiance, action[4])
        self.wind.step(wind_speed)

        self.current_output = np.array((
            self.dg1.current_output,
            self.dg2.current_output,
            self.dg3.current_output,
            -self.battery.energy_change,
            self.solar.current_power,
            self.wind.current_power,
        ))
        # NOTE(review): solar and wind power are part of this production sum,
        # while netload (from the observation) already has generation
        # subtracted; confirm that counting both is intended.
        actual_production = sum(self.current_output)
        unbalance = actual_production - netload

        sell_benefit, buy_cost, excess_penalty, deficient_penalty = (
            self._settle_grid_exchange(price, unbalance)
        )

        battery_cost = self.battery.get_cost(self.battery.energy_change, self.battery.current_capacity)
        dg1_cost = self.dg1.get_cost(self.dg1.current_output)
        dg2_cost = self.dg2.get_cost(self.dg2.current_output)
        dg3_cost = self.dg3.get_cost(self.dg3.current_output)
        solar_cost = self.solar.get_cost(self.solar.current_power)
        # NOTE(review): every other component uses get_cost; confirm that
        # Wind really names this method gen_cost.
        wind_cost = self.wind.gen_cost(self.wind.current_power)

        self.operation_cost = (battery_cost + dg1_cost + dg2_cost + dg3_cost + solar_cost + wind_cost
                               + excess_penalty + deficient_penalty - sell_benefit + buy_cost)
        # Reward is the negated operation cost, scaled down by 1e3.
        reward = 0 - self.operation_cost / 1e3

        self.unbalance = unbalance
        self.real_unbalance = self.shedding + self.excess

        self.current_time += 1
        finish = (self.current_time == self.episode_length)
        if not finish:
            next_obs = self._build_state()
        else:
            # Keep the last outputs of the finished episode, then start over.
            self.final_step_outputs = [
                self.dg1.current_output,
                self.dg2.current_output,
                self.dg3.current_output,
                self.battery.current_capacity,
                self.solar.current_power,
                self.wind.current_power,
            ]
            self.current_time = 0
            next_obs = self.reset()
        return current_obs, next_obs, float(reward), finish

    def _load_year_data(self):
        """Read the CSV time series and push every value into the data manager.

        Paths are relative to the current working directory. Each series is
        passed through a per-element transform before being added.
        """
        # Rescale the magnitudes of price, generation and demand.
        # (path, column, per-element transform, data-manager sink)
        sources = (
            ('data/prices.csv', 'price',
             lambda x: max(x / 10, 0.5), self.data_manager.add_price_element),
            ('data/houseload.csv', 'houseload',
             lambda x: x * 3, self.data_manager.add_load_element),
            ('data/irradiance.csv', 'irradiance',
             lambda x: x, self.data_manager.add_irradiance_element),
            # presumably Kelvin -> Celsius; verify the unit of the 't2m' column
            ('data/temper.csv', 't2m',
             lambda x: x - 273.15, self.data_manager.add_temperature_element),
            ('data/wind.csv', 'wind_speed',
             lambda x: x, self.data_manager.add_wind_element),
        )

        # Read every file, then extract every column, before anything is
        # added, so a missing file or column leaves the data manager untouched.
        frames = [pd.read_csv(path, sep=',') for path, _, _, _ in sources]
        series = [
            frame[column].to_numpy(dtype=float)
            for frame, (_, column, _, _) in zip(frames, sources)
        ]

        for values, (_, _, transform, add_element) in zip(series, sources):
            for value in values:
                add_element(transform(value))