339 lines
15 KiB
Python
339 lines
15 KiB
Python
import json
|
||
import pandas as pd
|
||
import math
|
||
from scipy.optimize import fsolve
|
||
from shapely.geometry import Point, shape
|
||
|
||
# 默认文件路径
|
||
PV_EXCEL_PATH = r"./pv_product.xlsx" # 光伏组件数值
|
||
TILT_EXCEL_PATH = r"./倾角_峰值小时数.xlsx" # 倾角和峰值小时数
|
||
GEOJSON_PATH = r"./中国_市.geojson" # GeoJSON 文件
|
||
|
||
|
||
def calculate_pv_metrics(component_name, electricity_price, pv_number, q, longitude, latitude,
|
||
is_fixed=True, optimize=True, peak_load_hour=16, cost_per_kw=3.4, E_S=1.0, K=0.8):
|
||
"""
|
||
计算光伏项目的各项指标,包括装机容量、年发电量、等效小时数、环境收益和内部收益率(IRR)。
|
||
|
||
参数:
|
||
component_name (str): 光伏组件名称,例如 "TWMHF-66HD715"
|
||
electricity_price (float): 电价(元/kWh)
|
||
pv_number (int): 光伏组件数量
|
||
q (float): 运维成本占初始投资成本的比例(例如 0.02 表示 2%)
|
||
longitude (float): 经度
|
||
latitude (float): 纬度
|
||
is_fixed (bool): 是否为固定式支架,True 为固定式,False 为跟踪式
|
||
optimize (bool): 是否优化倾角和方位角
|
||
peak_load_hour (int): 峰值负荷小时,默认 16
|
||
cost_per_kw (float): 每 kW 投资成本(元/kW),默认 3.4 元/kW
|
||
E_S (float): 标准辐射量,默认 1.0
|
||
K (float): 系统效率,默认 0.8
|
||
|
||
返回:
|
||
dict: 包含以下结果的字典:
|
||
- component_name: 组件名称
|
||
- tilt: 倾角 (度)
|
||
- azimuth: 方位角 (度)
|
||
- array_distance: 阵列间距 (米)
|
||
- max_power: 单组件最大功率 (Wp)
|
||
- capacity: 装机容量 (kW)
|
||
- peak_sunshine_hours: 峰值日照小时数 (小时/天)
|
||
- single_daily_energy: 一天单个组件发电量 (kWh)
|
||
- annual_energy: 年发电量 (kWh)
|
||
- equivalent_hours: 等效小时数 (小时)
|
||
- coal_reduction: 标准煤减排量 (kg)
|
||
- CO2_reduction: CO₂ 减排量 (kg)
|
||
- SO2_reduction: SO₂ 减排量 (kg)
|
||
- NOX_reduction: NOx 减排量 (kg)
|
||
- IRR: 内部收益率 (%)
|
||
"""
|
||
|
||
# 1. 加载 GeoJSON 数据
|
||
def load_geojson(file_path):
|
||
try:
|
||
with open(file_path, 'r', encoding='utf-8') as file:
|
||
data = json.load(file)
|
||
return data
|
||
except FileNotFoundError:
|
||
raise FileNotFoundError(f"未找到GeoJSON文件:{file_path}")
|
||
except Exception as e:
|
||
raise Exception(f"加载GeoJSON出错:{e}")
|
||
|
||
# 2. 根据经纬度查找城市
|
||
def find_city_by_coordinates(lat, lon, geojson_data):
|
||
point = Point(lon, lat)
|
||
for feature in geojson_data['features']:
|
||
polygon = shape(feature['geometry'])
|
||
if polygon.contains(point):
|
||
city_name = feature['properties'].get('name', '未知城市')
|
||
# 规范化城市名称(去除“市”或“地区”等后缀)
|
||
if city_name.endswith('市') or city_name.endswith('地区'):
|
||
city_name = city_name[:-1]
|
||
return city_name
|
||
return '未知城市'
|
||
|
||
# 3. 从 Excel 获取倾角和峰值日照小时数
|
||
def get_tilt_and_peak_hours(lat, lon, excel_path=TILT_EXCEL_PATH, geojson_path=GEOJSON_PATH):
|
||
try:
|
||
# 加载 GeoJSON 数据
|
||
geojson_data = load_geojson(geojson_path)
|
||
# 查找城市
|
||
city = find_city_by_coordinates(lat, lon, geojson_data)
|
||
# 读取 Excel 文件
|
||
df = pd.read_excel(excel_path)
|
||
if len(df.columns) < 5:
|
||
raise ValueError("Excel文件需包含至少5列:城市、倾角、峰值日照小时数等")
|
||
# 匹配城市名称(前两个字)
|
||
matched_row = df[df.iloc[:, 2].str.startswith(city, na=False)]
|
||
if matched_row.empty:
|
||
raise ValueError(f"未找到匹配的城市:{city}")
|
||
# 提取并转换为浮点数
|
||
tilt = float(matched_row.iloc[0, 3]) # 第四列:倾角
|
||
peak_hours = float(matched_row.iloc[0, 4]) # 第五列:PSH
|
||
return {
|
||
"tilt": tilt,
|
||
"peak_sunshine_hours": peak_hours
|
||
}
|
||
except FileNotFoundError:
|
||
raise FileNotFoundError(f"未找到Excel文件:{excel_path}")
|
||
except ValueError as e:
|
||
raise ValueError(f"Excel数据转换错误:{e}")
|
||
except Exception as e:
|
||
raise Exception(f"读取Excel或GeoJSON出错:{e}")
|
||
|
||
# 4. 计算倾角和方位角
|
||
def get_tilt_and_azimuth(is_fixed=True, optimize=True, longitude=116, latitude=None, peak_load_hour=16):
|
||
if optimize and latitude is None:
|
||
raise ValueError("优化模式下需提供纬度")
|
||
|
||
if is_fixed:
|
||
if optimize:
|
||
# 从 Excel 获取倾角
|
||
tilt_data = get_tilt_and_peak_hours(latitude, longitude)
|
||
tilt = tilt_data["tilt"]
|
||
if not (0 <= tilt <= 90):
|
||
raise ValueError(f"Excel中的倾角 {tilt:.2f}° 超出合理范围(0°-90°)")
|
||
azimuth = (peak_load_hour - 12) * 15 + (longitude - 116) # 方位角公式
|
||
azimuth = azimuth % 360 if azimuth >= 0 else azimuth + 360
|
||
else:
|
||
print("倾角:0°(水平)-90°(垂直) | 方位角:0°(正北)-180°(正南),顺时针")
|
||
tilt = float(input("请输入倾角(度):"))
|
||
azimuth = float(input("请输入方位角(度):"))
|
||
if not (0 <= tilt <= 90) or not (0 <= azimuth <= 360):
|
||
raise ValueError("倾角需在0-90°,方位角需在0-360°")
|
||
else: # 跟踪式
|
||
azimuth = 180 # 固定朝南
|
||
if optimize:
|
||
tilt_data = get_tilt_and_peak_hours(latitude, longitude)
|
||
tilt = tilt_data["tilt"]
|
||
if not (0 <= tilt <= 90):
|
||
raise ValueError(f"Excel中的倾角 {tilt:.2f}° 超出合理范围(0°-90°)")
|
||
else:
|
||
print("倾角:0°(水平)-90°(垂直)")
|
||
tilt = float(input("请输入倾角(度):"))
|
||
if not (0 <= tilt <= 90):
|
||
raise ValueError("倾角需在0-90°")
|
||
return tilt, azimuth
|
||
|
||
# 5. 获取光伏组件信息
|
||
def get_pv_product_info(component_name, excel_path=PV_EXCEL_PATH):
|
||
try:
|
||
df = pd.read_excel(excel_path)
|
||
if len(df.columns) < 10:
|
||
raise ValueError("Excel文件需包含至少10列:组件名称、尺寸、功率等")
|
||
row = df[df.iloc[:, 1] == component_name]
|
||
if row.empty:
|
||
raise ValueError(f"未找到组件:{component_name}")
|
||
return {
|
||
"component_name": component_name,
|
||
"max_power": row.iloc[0, 5],
|
||
"efficiency": row.iloc[0, 9],
|
||
"pv_size": row.iloc[0, 3]
|
||
}
|
||
except FileNotFoundError:
|
||
raise FileNotFoundError(f"未找到Excel文件:{excel_path}")
|
||
except Exception as e:
|
||
raise Exception(f"读取Excel出错:{e}")
|
||
|
||
# 6. 计算光伏阵列间距
|
||
def calculate_array_distance(L, tilt, latitude):
|
||
beta_rad = math.radians(tilt)
|
||
phi_rad = math.radians(latitude)
|
||
return (L * math.cos(beta_rad) +
|
||
L * math.sin(beta_rad) * 0.707 * math.tan(phi_rad) +
|
||
0.4338 * math.tan(phi_rad))
|
||
|
||
# 7. 计算等效小时数
|
||
def calculate_equivalent_hours(P, P_r):
|
||
if P_r == 0:
|
||
raise ValueError("额定功率不能为 0")
|
||
h = P / P_r # 单位换算
|
||
return h
|
||
|
||
# 8. 计算装机容量
|
||
def calculate_installed_capacity(max_power, num_components):
|
||
if max_power < 0 or num_components < 0 or not isinstance(num_components, int):
|
||
raise ValueError("功率和数量需为非负数,数量需为整数")
|
||
return (max_power * num_components) / 1000 # 单位:kW
|
||
|
||
# 9. 计算年发电量
|
||
def calculate_annual_energy(peak_hours, capacity, E_S=E_S, K=K):
|
||
if any(x < 0 for x in [peak_hours, capacity]) or E_S <= 0 or not 0 <= K <= 1:
|
||
raise ValueError("输入参数需满足:辐射量、容量≥0,E_S>0,K∈[0,1]")
|
||
return peak_hours * 365 * (capacity / E_S) * K # 单位:kWh
|
||
|
||
# 10. 计算环境收益
|
||
def calculate_environmental_benefits(E_p_million_kwh):
|
||
if E_p_million_kwh < 0:
|
||
raise ValueError("年发电量需≥0")
|
||
return {
|
||
"coal_reduction": E_p_million_kwh * 0.404 * 10,
|
||
"CO2_reduction": E_p_million_kwh * 0.977 * 10,
|
||
"SO2_reduction": E_p_million_kwh * 0.03 * 10,
|
||
"NOX_reduction": E_p_million_kwh * 0.015 * 10
|
||
}
|
||
|
||
# 11. 计算净现值和内部收益率
|
||
def calculate_reference_yield(E_p, electricity_price, IC, q, n=25):
|
||
if E_p < 0 or electricity_price < 0 or IC <= 0 or not 0 <= q <= 1:
|
||
raise ValueError("发电量、电价≥0,投资成本>0,回收比例∈[0,1]")
|
||
|
||
def npv_equation(irr, p, w, ic, q_val, n=n):
|
||
term1 = (1 + irr) ** (-1)
|
||
term2 = irr * (1 + irr) ** (-1) if irr != 0 else float('inf')
|
||
pv_revenue = p * w * (term1 / term2) * (1 - (1 + irr) ** (-n))
|
||
pv_salvage = q_val * ic * (term1 / term2) * (1 - (1 + irr) ** (-n))
|
||
return pv_revenue - ic + pv_salvage
|
||
|
||
irr_guess = 0.1
|
||
irr = float(fsolve(npv_equation, irr_guess, args=(E_p, electricity_price, IC, q))[0])
|
||
if not 0 <= irr <= 1:
|
||
raise ValueError(f"IRR计算结果{irr:.4f}不合理,请检查输入")
|
||
npv = npv_equation(irr, E_p, electricity_price, IC, q)
|
||
return {"NPV": npv, "IRR": irr * 100}
|
||
|
||
# 主计算流程
|
||
try:
|
||
# 验证经纬度
|
||
if not (-90 <= latitude <= 90):
|
||
raise ValueError("纬度必须在 -90 到 90 之间")
|
||
if not (-180 <= longitude <= 180):
|
||
raise ValueError("经度必须在 -180 到 180 之间")
|
||
|
||
# 获取倾角和方位角
|
||
tilt, azimuth = get_tilt_and_azimuth(
|
||
is_fixed=is_fixed,
|
||
optimize=optimize,
|
||
longitude=longitude,
|
||
latitude=latitude,
|
||
peak_load_hour=peak_load_hour
|
||
)
|
||
|
||
# 获取峰值日照小时数
|
||
peak_hours = get_tilt_and_peak_hours(latitude, longitude)["peak_sunshine_hours"]
|
||
|
||
# 获取组件信息
|
||
pv_info = get_pv_product_info(component_name)
|
||
width_mm = float(pv_info["pv_size"].split("×")[1])
|
||
L = (width_mm / 1000) * 4
|
||
array_distance = calculate_array_distance(L, tilt, latitude)
|
||
|
||
# 计算装机容量
|
||
max_power = pv_info["max_power"]
|
||
capacity = calculate_installed_capacity(max_power, pv_number) # 单位:kW
|
||
|
||
# 计算一天单个组件发电量
|
||
single_daily_energy = peak_hours * (capacity / pv_number) * K # 单位:kWh
|
||
|
||
# 计算年发电量
|
||
E_p = calculate_annual_energy(peak_hours, capacity, E_S, K) # 单位:kWh
|
||
|
||
# 计算等效小时数
|
||
h = calculate_equivalent_hours(E_p, capacity) # P_r 单位为 kW,E_p 单位为 kWh
|
||
|
||
# 计算环境收益(转换为百万 kWh)
|
||
E_p_million_kwh = E_p / 1000000 # 转换为百万 kWh
|
||
env_benefits = calculate_environmental_benefits(E_p_million_kwh)
|
||
|
||
# 计算初始投资成本
|
||
IC = capacity * cost_per_kw * 1000 # 单位:元
|
||
|
||
# 计算 IRR
|
||
ref_yield = calculate_reference_yield(E_p, electricity_price, IC, q)
|
||
|
||
# 返回结果
|
||
return {
|
||
"component_name": component_name,
|
||
"tilt": tilt,
|
||
"azimuth": azimuth,
|
||
"array_distance": array_distance,
|
||
"max_power": max_power,
|
||
"capacity": capacity,
|
||
"peak_sunshine_hours": peak_hours,
|
||
"single_daily_energy": single_daily_energy,
|
||
"annual_energy": E_p,
|
||
"equivalent_hours": h,
|
||
"coal_reduction": env_benefits["coal_reduction"],
|
||
"CO2_reduction": env_benefits["CO2_reduction"],
|
||
"SO2_reduction": env_benefits["SO2_reduction"],
|
||
"NOX_reduction": env_benefits["NOX_reduction"],
|
||
"IRR": ref_yield["IRR"]
|
||
}
|
||
|
||
except Exception as e:
|
||
raise Exception(f"计算过程中发生错误: {str(e)}")
|
||
|
||
|
||
# 示例用法
|
||
if __name__ == "__main__":
|
||
try:
|
||
# 输入并验证经纬度
|
||
latitude = float(input("请输入纬度(-90 到 90,例如 39.9042):"))
|
||
if not (-90 <= latitude <= 90):
|
||
raise ValueError("纬度必须在 -90 到 90 之间")
|
||
longitude = float(input("请输入经度(-180 到 180,例如 116.4074):"))
|
||
if not (-180 <= longitude <= 180):
|
||
raise ValueError("经度必须在 -180 到 180 之间")
|
||
|
||
component_name = input("请输入组件名称:")
|
||
electricity_price = float(input("请输入电价(元/kWh):"))
|
||
pv_number = 1200 # 固定组件数量
|
||
q = 0.02 # 运维成本占初始投资成本的比例
|
||
choice = input("选择光伏支架(固定式/跟踪式):")
|
||
optimize = input("是否优化倾角和方位角(是/否):")
|
||
|
||
# 调用主函数
|
||
result = calculate_pv_metrics(
|
||
component_name=component_name,
|
||
electricity_price=electricity_price,
|
||
pv_number=pv_number,
|
||
q=q,
|
||
is_fixed=(choice == "固定式"),
|
||
optimize=(optimize.lower() == "是"),
|
||
longitude=longitude,
|
||
latitude=latitude
|
||
)
|
||
|
||
# 打印结果
|
||
print("\n")
|
||
print(f"组件名称:{result['component_name']}")
|
||
print(f"倾角:{result['tilt']:.2f}° | 方位角:{result['azimuth']:.2f}°")
|
||
print(f"阵列间距:{result['array_distance']:.2f} 米")
|
||
print(f"单个组件最大功率(Wp):{result['max_power']}")
|
||
print(f"装机容量:{result['capacity']:.2f} kW")
|
||
print(f"峰值日照小时数:{result['peak_sunshine_hours']:.2f} 小时/天")
|
||
print(f"一天单个组件发电量:{result['single_daily_energy']:.0f} kWh")
|
||
print(f"年发电量:{result['annual_energy']:,.0f} kWh")
|
||
print(f"等效小时数:{result['equivalent_hours']:.0f} 小时")
|
||
print("环境收益:")
|
||
print(f"标准煤减排量:{result['coal_reduction']:,.0f} kg")
|
||
print(f"CO₂减排量:{result['CO2_reduction']:,.0f} kg")
|
||
print(f"SO₂减排量:{result['SO2_reduction']:,.0f} kg")
|
||
print(f"NOx减排量:{result['NOX_reduction']:,.0f} kg")
|
||
print(f"内部收益率 IRR:{result['IRR']:.2f}%")
|
||
|
||
|
||
except ValueError as e:
|
||
print(f"输入错误:{e}")
|
||
except Exception as e:
|
||
print(f"错误:{e}") |