Compare commits
No commits in common. "main" and "3bb8c38050cb8f0265cc0f2b5c87f635e45369e6" have entirely different histories.
main
...
3bb8c38050
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1,583 @@
|
|||
import pandas as pd
|
||||
import math
|
||||
import numpy as np
|
||||
import requests
|
||||
from scipy.optimize import fsolve
|
||||
from tabulate import tabulate
|
||||
|
||||
# 默认文件路径
|
||||
PV_EXCEL_PATH = r"./pv_product.xlsx" # 请确保此文件存在或更改为正确路径
|
||||
|
||||
# 地形类型与复杂性因子范围
|
||||
TERRAIN_COMPLEXITY_RANGES = {
|
||||
"distributed": {
|
||||
"耕地": (1.0, 1.2), "裸地": (1.0, 1.2), "草地": (1.1, 1.3),
|
||||
"灌木": (1.3, 1.5), "湿地": (1.5, 1.8), "林地": (1.5, 1.8), "建筑": (1.2, 1.5)
|
||||
},
|
||||
"centralized": {
|
||||
"耕地": (1.0, 1.2), "裸地": (1.0, 1.2), "草地": (1.1, 1.3),
|
||||
"灌木": (1.3, 1.6), "湿地": (1.5, 1.8), "林地": (1.6, 2.0)
|
||||
},
|
||||
"floating": {"水域": (1.2, 1.5)}
|
||||
}
|
||||
|
||||
# 地形类型与土地可用性、发电效率的映射
|
||||
TERRAIN_ADJUSTMENTS = {
|
||||
"耕地": {"land_availability": 0.85, "K": 0.8}, "裸地": {"land_availability": 0.85, "K": 0.8},
|
||||
"草地": {"land_availability": 0.85, "K": 0.8}, "灌木": {"land_availability": 0.75, "K": 0.75},
|
||||
"湿地": {"land_availability": 0.65, "K": 0.75}, "水域": {"land_availability": 0.85, "K": 0.8},
|
||||
"林地": {"land_availability": 0.65, "K": 0.7}, "建筑": {"land_availability": 0.6, "K": 0.75}
|
||||
}
|
||||
|
||||
# 光伏类型的装机容量上限 (MW/平方千米)
|
||||
CAPACITY_LIMITS = {
|
||||
"distributed": 25.0, "centralized": 50.0, "floating": 25.0
|
||||
}
|
||||
|
||||
# 实际面板间距系数
|
||||
PANEL_SPACING_FACTORS = {
|
||||
"distributed": 1.5, "centralized": 1.2, "floating": 1.3
|
||||
}
|
||||
|
||||
|
||||
def calculate_psh_average(lat, lon, start_year=2010, end_year=2023):
|
||||
"""
|
||||
从 NASA POWER API 获取峰值日照小时数(PSH)。
|
||||
返回:平均 PSH(小时/天),失败时返回默认值 4.0
|
||||
"""
|
||||
print("DEBUG: Starting calculate_psh_average (version 2025-04-28)")
|
||||
url = "https://power.larc.nasa.gov/api/temporal/monthly/point"
|
||||
params = {
|
||||
"parameters": "ALLSKY_SFC_SW_DWN",
|
||||
"community": "RE",
|
||||
"longitude": lon,
|
||||
"latitude": lat,
|
||||
"format": "JSON",
|
||||
"start": str(start_year),
|
||||
"end": str(end_year)
|
||||
}
|
||||
try:
|
||||
print(f"DEBUG: Requesting NASA POWER API for lat={lat}, lon={lon}")
|
||||
response = requests.get(url, params=params, timeout=10)
|
||||
response.raise_for_status()
|
||||
print("DEBUG: API response received")
|
||||
data = response.json()
|
||||
|
||||
print("DEBUG: Validating API data")
|
||||
if "properties" not in data or "parameter" not in data["properties"]:
|
||||
print("ERROR: NASA POWER API returned invalid data format")
|
||||
return 4.0
|
||||
|
||||
ghi_data = data["properties"]["parameter"].get("ALLSKY_SFC_SW_DWN", {})
|
||||
if not ghi_data:
|
||||
print("ERROR: No GHI data found in API response")
|
||||
return 4.0
|
||||
|
||||
print("DEBUG: Filtering GHI data")
|
||||
print(f"DEBUG: Raw GHI data keys: {list(ghi_data.keys())}")
|
||||
ghi_data = {k: v for k, v in ghi_data.items() if not k.endswith("13")}
|
||||
if not ghi_data:
|
||||
print("ERROR: No valid GHI data after filtering")
|
||||
return 4.0
|
||||
print(f"DEBUG: Filtered GHI data keys: {list(ghi_data.keys())}")
|
||||
|
||||
print("DEBUG: Creating DataFrame")
|
||||
df = pd.DataFrame.from_dict(ghi_data, orient="index", columns=["GHI (kWh/m²/day)"])
|
||||
print(f"DEBUG: Original DataFrame index: {list(df.index)}")
|
||||
|
||||
print("DEBUG: Reformatting DataFrame index")
|
||||
new_index = []
|
||||
for k in df.index:
|
||||
try:
|
||||
# 验证索引格式
|
||||
if not (isinstance(k, str) and len(k) == 6 and k.isdigit()):
|
||||
print(f"ERROR: Invalid index format for {k}")
|
||||
return 4.0
|
||||
year = k[:4]
|
||||
month = k[-2:]
|
||||
formatted_index = f"{year}-{month:0>2}" # 使用 :0>2 确保两位数字
|
||||
print(f"DEBUG: Formatting index {k} -> {formatted_index}")
|
||||
new_index.append(formatted_index)
|
||||
except Exception as e:
|
||||
print(f"ERROR: Failed to format index {k}: {e}")
|
||||
return 4.0
|
||||
df.index = new_index
|
||||
print(f"DEBUG: Reformatted DataFrame index: {list(df.index)}")
|
||||
|
||||
if df.empty:
|
||||
print("ERROR: DataFrame is empty")
|
||||
return 4.0
|
||||
|
||||
print("DEBUG: Calculating PSH")
|
||||
df["PSH (hours/day)"] = df["GHI (kWh/m²/day)"]
|
||||
if df["PSH (hours/day)"].isna().any():
|
||||
print("ERROR: PSH data contains invalid values")
|
||||
return 4.0
|
||||
|
||||
print("DEBUG: Grouping by year")
|
||||
df['Year'] = df.index.str[:4]
|
||||
print(f"DEBUG: Year column: {list(df['Year'])}")
|
||||
annual_avg = df.groupby('Year')['PSH (hours/day)'].mean()
|
||||
print(f"DEBUG: Annual averages: {annual_avg.to_dict()}")
|
||||
|
||||
if annual_avg.empty:
|
||||
print("ERROR: Annual average PSH data is empty")
|
||||
return 4.0
|
||||
|
||||
print("DEBUG: Calculating final PSH")
|
||||
psh = annual_avg.mean()
|
||||
if math.isnan(psh):
|
||||
print("ERROR: PSH calculation resulted in NaN")
|
||||
return 4.0
|
||||
|
||||
print(f"DEBUG: PSH calculated successfully, value={psh:.2f}")
|
||||
print(f"获取成功!平均PSH: {psh:.2f} 小时/天")
|
||||
return psh
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"ERROR: NASA POWER API request failed: {e}")
|
||||
return 4.0
|
||||
except Exception as e:
|
||||
print(f"ERROR: Error processing API data: {e}")
|
||||
return 4.0
|
||||
|
||||
|
||||
def calculate_optimal_tilt(lat):
|
||||
"""根据纬度计算最佳倾角(单位:度)"""
|
||||
try:
|
||||
lat_abs = abs(lat)
|
||||
if lat_abs < 25:
|
||||
optimal_tilt = lat_abs * 0.87
|
||||
elif lat_abs <= 50:
|
||||
optimal_tilt = lat_abs * 0.76 + 3.1
|
||||
else:
|
||||
optimal_tilt = lat_abs * 0.5 + 16.3
|
||||
return optimal_tilt
|
||||
except ValueError as e:
|
||||
raise Exception(f"倾角计算错误: {e}")
|
||||
|
||||
|
||||
def pv_area(panel_capacity, slope_deg, shading_factor=0.1, land_compactness=1.0, terrain_complexity=1.0):
|
||||
"""计算单块光伏组件占地面积"""
|
||||
base_area = panel_capacity * 6
|
||||
slope_factor = 1 + (slope_deg / 50) if slope_deg <= 15 else 1.5
|
||||
shade_factor = 1 + shading_factor * 2
|
||||
compact_factor = 1 / land_compactness if land_compactness > 0 else 1.5
|
||||
terrain_factor = terrain_complexity
|
||||
return base_area * slope_factor * shade_factor * compact_factor * terrain_factor
|
||||
|
||||
|
||||
def calculate_pv_potential(available_area_sq_km, component_name, longitude, latitude, slope_deg=10,
|
||||
shading_factor=0.1, land_compactness=0.8, terrain_complexity=1.2,
|
||||
terrain_type="耕地", pv_type="centralized", land_availability=0.85,
|
||||
min_irradiance=800, max_slope=25, electricity_price=0.65, q=0.02,
|
||||
is_fixed=True, optimize=True, peak_load_hour=16, cost_per_kw=3.4,
|
||||
E_S=1.0, K=0.8, project_lifetime=25, discount_rate=0.06):
|
||||
"""计算最小和最大组件数量的光伏系统潜力"""
|
||||
# 输入验证
|
||||
if available_area_sq_km <= 0:
|
||||
raise ValueError("可用面积必须大于0")
|
||||
if slope_deg < 0 or slope_deg > max_slope:
|
||||
raise ValueError(f"坡度必须在0-{max_slope}度之间")
|
||||
|
||||
# 转换为公顷
|
||||
available_area_hectares = available_area_sq_km * 100
|
||||
|
||||
# 验证光伏类型与地形类型
|
||||
valid_terrains = TERRAIN_COMPLEXITY_RANGES.get(pv_type, {})
|
||||
if terrain_type not in valid_terrains:
|
||||
raise ValueError(f"{pv_type} 光伏不支持 {terrain_type} 地形。可选地形:{list(valid_terrains.keys())}")
|
||||
|
||||
# 获取地形调整参数
|
||||
terrain_adjustments = TERRAIN_ADJUSTMENTS.get(terrain_type, {"land_availability": 0.85, "K": 0.8})
|
||||
adjusted_land_availability = terrain_adjustments["land_availability"] / max(1.0, terrain_complexity)
|
||||
adjusted_K = terrain_adjustments["K"] / max(1.0, terrain_complexity)
|
||||
|
||||
# 获取组件信息
|
||||
pv_info = get_pv_product_info(component_name)
|
||||
single_panel_capacity = pv_info["max_power"] / 1000 # kWp
|
||||
pv_size = pv_info["pv_size"].split("×")
|
||||
panel_length = float(pv_size[0]) / 1000 # 米
|
||||
panel_width = float(pv_size[1]) / 1000 # 米
|
||||
panel_area_sqm = panel_length * panel_width
|
||||
|
||||
# 获取阵列间距
|
||||
tilt, azimuth = get_tilt_and_azimuth(is_fixed, optimize, longitude, latitude, peak_load_hour)
|
||||
array_distance = calculate_array_distance(panel_width * 1.1, tilt, latitude)
|
||||
spacing_factor = PANEL_SPACING_FACTORS.get(pv_type, 1.2)
|
||||
adjusted_array_distance = array_distance * spacing_factor
|
||||
|
||||
# 计算有效面积
|
||||
effective_area_hectares = available_area_hectares * adjusted_land_availability
|
||||
effective_area_sqm = effective_area_hectares * 10000
|
||||
|
||||
# 计算每MW占地面积
|
||||
area_per_mw = 10000 * (1 + slope_deg / 50 if slope_deg <= 15 else 1.5) * (
|
||||
1 + shading_factor * 2) * terrain_complexity * spacing_factor
|
||||
|
||||
# 容量密度限制 (kW/m²)
|
||||
capacity_density_limit = CAPACITY_LIMITS.get(pv_type, 5.0) / 1000
|
||||
max_capacity_by_density = effective_area_sqm * capacity_density_limit
|
||||
|
||||
# 计算单块组件占地面积
|
||||
row_spacing = panel_length * math.sin(math.radians(tilt)) + adjusted_array_distance
|
||||
effective_panel_area = panel_area_sqm * (row_spacing / panel_length) * 1.2
|
||||
|
||||
# 调整 min/max 布局以确保差异
|
||||
min_area_per_panel = effective_panel_area * 0.8 # 密集布局
|
||||
max_area_per_panel = effective_panel_area * 1.5 # 稀疏布局
|
||||
|
||||
max_panels = math.floor(effective_area_sqm / min_area_per_panel)
|
||||
min_panels = math.floor(effective_area_sqm / max_area_per_panel)
|
||||
|
||||
# 计算装机容量
|
||||
max_capacity_raw = calculate_installed_capacity(pv_info["max_power"], max_panels)
|
||||
min_capacity_raw = calculate_installed_capacity(pv_info["max_power"], min_panels)
|
||||
|
||||
# 应用容量密度限制
|
||||
max_capacity = min(max_capacity_raw, max_capacity_by_density)
|
||||
min_capacity = min(min_capacity_raw, max_capacity_by_density * 0.8) # 稀疏布局取80%
|
||||
|
||||
# 检查理论上限
|
||||
theoretical_max_capacity_mw = available_area_sq_km * CAPACITY_LIMITS.get(pv_type, 5.0)
|
||||
if max_capacity / 1000 > theoretical_max_capacity_mw:
|
||||
max_capacity = theoretical_max_capacity_mw * 1000
|
||||
max_panels = math.floor(max_capacity * 1000 / pv_info["max_power"])
|
||||
if min_capacity / 1000 > theoretical_max_capacity_mw * 0.8:
|
||||
min_capacity = theoretical_max_capacity_mw * 1000 * 0.8
|
||||
min_panels = math.floor(min_capacity * 1000 / pv_info["max_power"])
|
||||
|
||||
# 计算指标
|
||||
min_metrics = calculate_pv_metrics(
|
||||
component_name=component_name, electricity_price=electricity_price, pv_number=min_panels,
|
||||
q=q, longitude=longitude, latitude=latitude, is_fixed=is_fixed, optimize=optimize,
|
||||
peak_load_hour=peak_load_hour, cost_per_kw=cost_per_kw * terrain_complexity, E_S=E_S, K=adjusted_K,
|
||||
override_capacity=min_capacity
|
||||
)
|
||||
min_lcoe = calculate_lcoe(
|
||||
capacity=min_metrics["capacity"], annual_energy=min_metrics["annual_energy"],
|
||||
cost_per_kw=cost_per_kw * terrain_complexity, q=q, project_lifetime=project_lifetime,
|
||||
discount_rate=discount_rate
|
||||
)
|
||||
|
||||
max_metrics = calculate_pv_metrics(
|
||||
component_name=component_name, electricity_price=electricity_price, pv_number=max_panels,
|
||||
q=q, longitude=longitude, latitude=latitude, is_fixed=is_fixed, optimize=optimize,
|
||||
peak_load_hour=peak_load_hour, cost_per_kw=cost_per_kw * terrain_complexity, E_S=E_S, K=adjusted_K,
|
||||
override_capacity=max_capacity
|
||||
)
|
||||
max_lcoe = calculate_lcoe(
|
||||
capacity=max_metrics["capacity"], annual_energy=max_metrics["annual_energy"],
|
||||
cost_per_kw=cost_per_kw * terrain_complexity, q=q, project_lifetime=project_lifetime,
|
||||
discount_rate=discount_rate
|
||||
)
|
||||
|
||||
# 警告
|
||||
if min_panels == max_panels:
|
||||
print(f"警告:最小和最大组件数量相同 ({min_panels}),请检查地形复杂性或面积是否过小")
|
||||
if min_panels == 0 or max_panels == 0:
|
||||
print(f"警告:组件数量为0,请检查输入参数")
|
||||
|
||||
# 返回结果
|
||||
return {
|
||||
"min_case": {
|
||||
**min_metrics, "lcoe": min_lcoe, "actual_panels": min_panels,
|
||||
"available_area_sq_km": available_area_sq_km, "available_area_hectares": available_area_hectares,
|
||||
"effective_area_hectares": effective_area_hectares, "panel_area_sqm": max_area_per_panel,
|
||||
"terrain_type": terrain_type, "pv_type": pv_type, "theoretical_max_capacity_mw": theoretical_max_capacity_mw
|
||||
},
|
||||
"max_case": {
|
||||
**max_metrics, "lcoe": max_lcoe, "actual_panels": max_panels,
|
||||
"available_area_sq_km": available_area_sq_km, "available_area_hectares": available_area_hectares,
|
||||
"effective_area_hectares": effective_area_hectares, "panel_area_sqm": min_area_per_panel,
|
||||
"terrain_type": terrain_type, "pv_type": pv_type, "theoretical_max_capacity_mw": theoretical_max_capacity_mw
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def calculate_lcoe(capacity, annual_energy, cost_per_kw, q, project_lifetime=25, discount_rate=0.06):
|
||||
"""计算平准化度电成本(LCOE)"""
|
||||
total_investment = capacity * cost_per_kw * 1000
|
||||
annual_om_cost = total_investment * q
|
||||
discount_factors = [(1 + discount_rate) ** -t for t in range(1, project_lifetime + 1)]
|
||||
discounted_energy = sum(annual_energy * discount_factors[t] for t in range(project_lifetime))
|
||||
discounted_costs = total_investment + sum(annual_om_cost * discount_factors[t] for t in range(project_lifetime))
|
||||
if discounted_energy == 0:
|
||||
return float('inf')
|
||||
return discounted_costs / discounted_energy
|
||||
|
||||
|
||||
def get_pv_product_info(component_name, excel_path=PV_EXCEL_PATH):
|
||||
"""从Excel获取光伏组件信息"""
|
||||
try:
|
||||
df = pd.read_excel(excel_path)
|
||||
if len(df.columns) < 10:
|
||||
raise ValueError("Excel文件需包含至少10列:组件名称、尺寸、功率等")
|
||||
row = df[df.iloc[:, 1] == component_name]
|
||||
if row.empty:
|
||||
raise ValueError(f"未找到组件:{component_name}")
|
||||
return {
|
||||
"component_name": component_name,
|
||||
"max_power": row.iloc[0, 5],
|
||||
"efficiency": row.iloc[0, 9],
|
||||
"pv_size": row.iloc[0, 3]
|
||||
}
|
||||
except FileNotFoundError:
|
||||
raise FileNotFoundError(f"未找到Excel文件:{excel_path}")
|
||||
except Exception as e:
|
||||
raise Exception(f"读取Excel出错:{e}")
|
||||
|
||||
|
||||
def get_tilt_and_azimuth(is_fixed=True, optimize=True, longitude=116, latitude=None, peak_load_hour=16):
|
||||
"""计算光伏系统的倾角和方位角"""
|
||||
if optimize and latitude is None:
|
||||
raise ValueError("优化模式下需提供纬度")
|
||||
|
||||
if is_fixed:
|
||||
if optimize:
|
||||
tilt = calculate_optimal_tilt(latitude)
|
||||
azimuth = (peak_load_hour - 12) * 15 + (longitude - 116)
|
||||
azimuth = azimuth % 360 if azimuth >= 0 else azimuth + 360
|
||||
else:
|
||||
print("倾角:0°(水平)-90°(垂直) | 方位角:0°(正北)-180°(正南),顺时针")
|
||||
tilt = float(input("请输入倾角(度):"))
|
||||
azimuth = float(input("请输入方位角(度):"))
|
||||
if not (0 <= tilt <= 90) or not (0 <= azimuth <= 360):
|
||||
raise ValueError("倾角需在0-90°,方位角需在0-360°")
|
||||
else:
|
||||
azimuth = 180
|
||||
if optimize:
|
||||
tilt = calculate_optimal_tilt(latitude)
|
||||
else:
|
||||
print("倾角:0°(水平)-90°(垂直)")
|
||||
tilt = float(input("请输入倾角(度):"))
|
||||
if not (0 <= tilt <= 90):
|
||||
raise ValueError("倾角需在0-90°")
|
||||
return tilt, azimuth
|
||||
|
||||
|
||||
def calculate_array_distance(L, tilt, latitude):
|
||||
"""计算阵列间距"""
|
||||
beta_rad = math.radians(tilt)
|
||||
phi_rad = math.radians(latitude)
|
||||
return (L * math.cos(beta_rad) +
|
||||
L * math.sin(beta_rad) * 0.707 * math.tan(phi_rad) +
|
||||
0.4338 * math.tan(phi_rad))
|
||||
|
||||
|
||||
def calculate_equivalent_hours(P, P_r):
|
||||
"""计算等效小时数"""
|
||||
if P_r == 0:
|
||||
raise ValueError("额定功率不能为 0")
|
||||
return P / P_r
|
||||
|
||||
|
||||
def calculate_installed_capacity(max_power, num_components):
|
||||
"""计算装机容量"""
|
||||
if max_power < 0 or num_components < 0 or not isinstance(num_components, int):
|
||||
raise ValueError("功率和数量需为非负数,数量需为整数")
|
||||
return (max_power * num_components) / 1000 # 单位:kW
|
||||
|
||||
|
||||
def calculate_annual_energy(peak_hours, capacity, E_S=1.0, K=0.8):
|
||||
"""计算年发电量"""
|
||||
if any(x < 0 for x in [peak_hours, capacity]) or E_S <= 0 or not 0 <= K <= 1:
|
||||
raise ValueError("输入参数需满足:辐射量、容量≥0,E_S>0,K∈[0,1]")
|
||||
return peak_hours * 365 * (capacity / E_S) * K # 单位:kWh
|
||||
|
||||
|
||||
def calculate_environmental_benefits(E_p_million_kwh):
|
||||
"""计算环境收益"""
|
||||
if E_p_million_kwh < 0:
|
||||
raise ValueError("年发电量需≥0")
|
||||
return {
|
||||
"coal_reduction": E_p_million_kwh * 0.404 * 10,
|
||||
"CO2_reduction": E_p_million_kwh * 0.977 * 10,
|
||||
"SO2_reduction": E_p_million_kwh * 0.03 * 10,
|
||||
"NOX_reduction": E_p_million_kwh * 0.015 * 10
|
||||
}
|
||||
|
||||
|
||||
def calculate_reference_yield(E_p, electricity_price, IC, q, n=25):
|
||||
"""计算净现值(NPV)和内部收益率(IRR)"""
|
||||
if E_p < 0 or electricity_price < 0 or IC <= 0 or not 0 <= q <= 1:
|
||||
raise ValueError("发电量、电价≥0,投资成本>0,回收比例∈[0,1]")
|
||||
|
||||
def npv_equation(irr, p, w, ic, q_val, n=n):
|
||||
term1 = (1 + irr) ** (-1)
|
||||
term2 = irr * (1 + irr) ** (-1) if irr != 0 else float('inf')
|
||||
pv_revenue = p * w * (term1 / term2) * (1 - (1 + irr) ** (-n))
|
||||
pv_salvage = q_val * ic * (term1 / term2) * (1 - (1 + irr) ** (-n))
|
||||
return pv_revenue - ic + pv_salvage
|
||||
|
||||
irr_guess = 0.1
|
||||
irr = float(fsolve(npv_equation, irr_guess, args=(E_p, electricity_price, IC, q))[0])
|
||||
if not 0 <= irr <= 1:
|
||||
raise ValueError(f"IRR计算结果{irr:.4f}不合理,请检查输入")
|
||||
npv = npv_equation(irr, E_p, electricity_price, IC, q)
|
||||
return {"NPV": npv, "IRR": irr * 100}
|
||||
|
||||
|
||||
def calculate_pv_metrics(component_name, electricity_price, pv_number, q, longitude, latitude,
|
||||
is_fixed=True, optimize=True, peak_load_hour=16, cost_per_kw=3.4, E_S=1.0, K=0.8,
|
||||
override_capacity=None):
|
||||
"""计算光伏项目的各项指标"""
|
||||
try:
|
||||
tilt, azimuth = get_tilt_and_azimuth(is_fixed, optimize, longitude, latitude, peak_load_hour)
|
||||
pv_info = get_pv_product_info(component_name)
|
||||
width_mm = float(pv_info["pv_size"].split("×")[1])
|
||||
L = (width_mm / 1000) * 1.1
|
||||
array_distance = calculate_array_distance(L, tilt, latitude)
|
||||
max_power = pv_info["max_power"]
|
||||
|
||||
# 使用提供的容量或计算容量
|
||||
if override_capacity is not None:
|
||||
capacity = override_capacity
|
||||
else:
|
||||
capacity = calculate_installed_capacity(max_power, pv_number)
|
||||
|
||||
# 使用NASA API获取峰值日照小时数
|
||||
peak_hours = calculate_psh_average(latitude, longitude)
|
||||
single_daily_energy = peak_hours * (capacity / pv_number) * K if pv_number > 0 else 0
|
||||
E_p = calculate_annual_energy(peak_hours, capacity, E_S, K)
|
||||
h = calculate_equivalent_hours(E_p, capacity) if capacity > 0 else 0
|
||||
E_p_million_kwh = E_p / 1000000
|
||||
env_benefits = calculate_environmental_benefits(E_p_million_kwh)
|
||||
IC = capacity * cost_per_kw * 1000
|
||||
ref_yield = calculate_reference_yield(E_p, electricity_price, IC, q)
|
||||
return {
|
||||
"longitude": longitude,
|
||||
"latitude": latitude,
|
||||
"component_name": component_name,
|
||||
"tilt": tilt,
|
||||
"azimuth": azimuth,
|
||||
"array_distance": array_distance,
|
||||
"max_power": max_power,
|
||||
"capacity": capacity,
|
||||
"peak_sunshine_hours": peak_hours,
|
||||
"single_daily_energy": single_daily_energy,
|
||||
"annual_energy": E_p,
|
||||
"equivalent_hours": h,
|
||||
"coal_reduction": env_benefits["coal_reduction"],
|
||||
"CO2_reduction": env_benefits["CO2_reduction"],
|
||||
"SO2_reduction": env_benefits["SO2_reduction"],
|
||||
"NOX_reduction": env_benefits["NOX_reduction"],
|
||||
"IRR": ref_yield["IRR"]
|
||||
}
|
||||
except Exception as e:
|
||||
raise Exception(f"计算过程中发生错误: {str(e)}")
|
||||
|
||||
|
||||
def print_result(min_case, max_case):
|
||||
"""优化输出格式,使用表格展示结果"""
|
||||
headers = ["指标", "最小组件数量", "最大组件数量"]
|
||||
table_data = [
|
||||
["经度", f"{min_case['longitude']:.2f}", f"{max_case['longitude']:.2f}"],
|
||||
["纬度", f"{min_case['latitude']:.2f}", f"{max_case['latitude']:.2f}"],
|
||||
["光伏类型", min_case["pv_type"], max_case["pv_type"]],
|
||||
["地形类型", min_case["terrain_type"], max_case["terrain_type"]],
|
||||
["组件型号", min_case["component_name"], max_case["component_name"]],
|
||||
["识别面积 (平方千米)", f"{min_case['available_area_sq_km']:.2f}", f"{max_case['available_area_sq_km']:.2f}"],
|
||||
["识别面积 (公顷)", f"{min_case['available_area_hectares']:.2f}", f"{max_case['available_area_hectares']:.2f}"],
|
||||
["有效面积 (公顷)", f"{min_case['effective_area_hectares']:.2f}", f"{max_case['effective_area_hectares']:.2f}"],
|
||||
["理论最大容量 (MW)", f"{min_case['theoretical_max_capacity_mw']:.2f}",
|
||||
f"{max_case['theoretical_max_capacity_mw']:.2f}"],
|
||||
["单块组件占地 (m²)", f"{min_case['panel_area_sqm']:.2f}", f"{max_case['panel_area_sqm']:.2f}"],
|
||||
["组件数量", f"{min_case['actual_panels']:,}", f"{max_case['actual_panels']:,}"],
|
||||
["倾角 (度)", f"{min_case['tilt']:.2f}", f"{max_case['tilt']:.2f}"],
|
||||
["方位角 (度)", f"{min_case['azimuth']:.2f}", f"{max_case['azimuth']:.2f}"],
|
||||
["阵列间距 (m)", f"{min_case['array_distance']:.2f}", f"{max_case['array_distance']:.2f}"],
|
||||
["单块功率 (Wp)", f"{min_case['max_power']}", f"{max_case['max_power']}"],
|
||||
["装机容量 (MW)", f"{min_case['capacity'] / 1000:.2f}", f"{max_case['capacity'] / 1000:.2f}"],
|
||||
["峰值日照 (小时/天)", f"{min_case['peak_sunshine_hours']:.2f}", f"{max_case['peak_sunshine_hours']:.2f}"],
|
||||
["年发电量 (kWh)", f"{min_case['annual_energy']:,.0f}", f"{max_case['annual_energy']:,.0f}"],
|
||||
["等效小时数", f"{min_case['equivalent_hours']:.2f}", f"{max_case['equivalent_hours']:.2f}"],
|
||||
["LCOE (元/kWh)", f"{min_case['lcoe']:.4f}", f"{max_case['lcoe']:.4f}"],
|
||||
["标准煤减排 (kg)", f"{min_case['coal_reduction']:,.0f}", f"{max_case['coal_reduction']:,.0f}"],
|
||||
["CO₂减排 (kg)", f"{min_case['CO2_reduction']:,.0f}", f"{max_case['CO2_reduction']:,.0f}"],
|
||||
["SO₂减排 (kg)", f"{min_case['SO2_reduction']:,.0f}", f"{max_case['SO2_reduction']:,.0f}"],
|
||||
["NOx减排 (kg)", f"{min_case['NOX_reduction']:,.0f}", f"{max_case['NOX_reduction']:,.0f}"],
|
||||
["IRR (%)", f"{min_case['IRR']:.2f}", f"{max_case['IRR']:.2f}"]
|
||||
]
|
||||
print("\n===== 光伏系统潜力评估结果 =====")
|
||||
print(tabulate(table_data, headers=headers, tablefmt="grid"))
|
||||
|
||||
|
||||
# 主程序
|
||||
if __name__ == "__main__":
|
||||
while True:
|
||||
try:
|
||||
# 输入参数
|
||||
print("\n======= 光伏系统潜力评估 =======")
|
||||
print("请输入以下必要参数:")
|
||||
|
||||
# 输入经纬度
|
||||
latitude = float(input("请输入纬度(-90到90,例如39.9):"))
|
||||
if not -90 <= latitude <= 90:
|
||||
raise ValueError("纬度必须在-90到90之间")
|
||||
|
||||
longitude = float(input("请输入经度(-180到180,例如116.4):"))
|
||||
if not -180 <= longitude <= 180:
|
||||
raise ValueError("经度必须在-180到180之间")
|
||||
|
||||
# 输入可用面积
|
||||
available_area_sq_km = float(input("请输入识别面积(平方千米,例如10):"))
|
||||
if available_area_sq_km <= 0:
|
||||
raise ValueError("识别面积必须大于0")
|
||||
|
||||
# 输入光伏类型
|
||||
pv_type = input("请输入光伏类型(distributed, centralized, floating):").lower()
|
||||
if pv_type not in ["distributed", "centralized", "floating"]:
|
||||
raise ValueError("光伏类型必须是 distributed, centralized 或 floating")
|
||||
|
||||
# 输入地形类型
|
||||
valid_terrains = list(TERRAIN_COMPLEXITY_RANGES.get(pv_type, {}).keys())
|
||||
print(f"支持的地形类型:{valid_terrains}")
|
||||
terrain_type = input("请输入地形类型:")
|
||||
if terrain_type not in valid_terrains:
|
||||
raise ValueError(f"不支持的地形类型:{terrain_type}")
|
||||
|
||||
# 输入组件型号
|
||||
component_name = input("请输入光伏组件型号(需在Excel中存在,例如M10-72H):")
|
||||
|
||||
# 输入其他参数
|
||||
slope_deg = float(input("请输入地形坡度(度,0-25,例如10):"))
|
||||
if not 0 <= slope_deg <= 25:
|
||||
raise ValueError("坡度必须在0-25度之间")
|
||||
|
||||
terrain_complexity = float(input("请输入地形复杂性因子(参考范围1.0-2.0,例如1.2):"))
|
||||
min_complexity, max_complexity = TERRAIN_COMPLEXITY_RANGES[pv_type][terrain_type]
|
||||
if not min_complexity <= terrain_complexity <= max_complexity:
|
||||
raise ValueError(f"地形复杂性因子必须在 {min_complexity}-{max_complexity} 之间")
|
||||
|
||||
electricity_price = float(input("请输入电价(元/kWh,例如0.65):"))
|
||||
if electricity_price < 0:
|
||||
raise ValueError("电价必须非负")
|
||||
|
||||
# 计算光伏潜力
|
||||
result = calculate_pv_potential(
|
||||
available_area_sq_km=available_area_sq_km,
|
||||
component_name=component_name,
|
||||
longitude=longitude,
|
||||
latitude=latitude,
|
||||
slope_deg=slope_deg,
|
||||
terrain_complexity=terrain_complexity,
|
||||
terrain_type=terrain_type,
|
||||
pv_type=pv_type,
|
||||
electricity_price=electricity_price
|
||||
)
|
||||
|
||||
# 输出结果
|
||||
print_result(result["min_case"], result["max_case"])
|
||||
|
||||
# 询问是否继续
|
||||
if input("\n是否继续评估?(y/n):").lower() != 'y':
|
||||
break
|
||||
|
||||
except ValueError as ve:
|
||||
print(f"输入错误:{ve}")
|
||||
except FileNotFoundError as fe:
|
||||
print(f"文件错误:{fe}")
|
||||
except Exception as e:
|
||||
print(f"发生错误:{e}")
|
||||
print("请重新输入参数或检查错误。\n")
|
|
@ -0,0 +1,585 @@
|
|||
import pandas as pd
|
||||
import math
|
||||
import numpy as np
|
||||
import requests
|
||||
from scipy.optimize import fsolve
|
||||
from tabulate import tabulate
|
||||
import json
|
||||
|
||||
# 默认文件路径
|
||||
PV_EXCEL_PATH = r"./pv_product.xlsx" # 请确保此文件存在或更改为正确路径
|
||||
CONFIG_PATH = r"./config.json" # 配置文件路径
|
||||
|
||||
# 地形类型与复杂性因子范围
|
||||
TERRAIN_COMPLEXITY_RANGES = {
|
||||
"distributed": {
|
||||
"耕地": (1.0, 1.2), "裸地": (1.0, 1.2), "草地": (1.1, 1.3),
|
||||
"灌木": (1.3, 1.5), "湿地": (1.5, 1.8), "林地": (1.5, 1.8), "建筑": (1.2, 1.5)
|
||||
},
|
||||
"centralized": {
|
||||
"耕地": (1.0, 1.2), "裸地": (1.0, 1.2), "草地": (1.1, 1.3),
|
||||
"灌木": (1.3, 1.6), "湿地": (1.5, 1.8), "林地": (1.6, 2.0)
|
||||
},
|
||||
"floating": {"水域": (1.2, 1.5)}
|
||||
}
|
||||
|
||||
# 地形类型与土地可用性、发电效率的映射
|
||||
TERRAIN_ADJUSTMENTS = {
|
||||
"耕地": {"land_availability": 0.85, "K": 0.8}, "裸地": {"land_availability": 0.85, "K": 0.8},
|
||||
"草地": {"land_availability": 0.85, "K": 0.8}, "灌木": {"land_availability": 0.75, "K": 0.75},
|
||||
"湿地": {"land_availability": 0.65, "K": 0.75}, "水域": {"land_availability": 0.85, "K": 0.8},
|
||||
"林地": {"land_availability": 0.65, "K": 0.7}, "建筑": {"land_availability": 0.6, "K": 0.75}
|
||||
}
|
||||
|
||||
# 光伏类型的装机容量上限 (MW/平方千米)
|
||||
CAPACITY_LIMITS = {
|
||||
"distributed": 25.0, "centralized": 50.0, "floating": 25.0
|
||||
}
|
||||
|
||||
# 实际面板间距系数
|
||||
PANEL_SPACING_FACTORS = {
|
||||
"distributed": 1.5, "centralized": 1.2, "floating": 1.3
|
||||
}
|
||||
|
||||
|
||||
def get_slope_from_api(lat, lon):
|
||||
"""
|
||||
通过 OpenTopoData API 获取地形坡度(单位:度)。
|
||||
返回:坡度(0-25°),失败时返回 None(由调用者处理)
|
||||
"""
|
||||
if not isinstance(lat, (int, float)) or not isinstance(lon, (int, float)):
|
||||
print("警告:经纬度必须是数值")
|
||||
return None
|
||||
if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
|
||||
print(f"警告:经纬度超出范围 (lat={lat}, lon={lon})")
|
||||
return None
|
||||
|
||||
# 尝试多个数据集
|
||||
datasets = ["srtm30m", "etopo1"]
|
||||
step = 0.001 # 约100米
|
||||
points = [
|
||||
f"{lat:.6f},{lon:.6f}",
|
||||
f"{lat + step:.6f},{lon:.6f}",
|
||||
f"{lat - step:.6f},{lon:.6f}",
|
||||
f"{lat:.6f},{lon + step:.6f}",
|
||||
f"{lat:.6f},{lon - step:.6f}"
|
||||
]
|
||||
locations = "|".join(points)
|
||||
|
||||
for dataset in datasets:
|
||||
url = f"https://api.opentopodata.org/v1/{dataset}"
|
||||
params = {
|
||||
"locations": locations,
|
||||
"interpolation": "cubic"
|
||||
}
|
||||
print(f"发送请求:dataset={dataset}, locations={locations}")
|
||||
|
||||
try:
|
||||
response = requests.get(url, params=params, timeout=10)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
if "results" not in data or len(data["results"]) != 5:
|
||||
print(f"警告:{dataset} 返回无效数据: {data.get('error', '无错误信息')}")
|
||||
continue
|
||||
|
||||
elevations = [result["elevation"] for result in data["results"]]
|
||||
if any(elev is None for elev in elevations):
|
||||
print(f"警告:{dataset} 高程数据包含空值")
|
||||
continue
|
||||
|
||||
distance = 100
|
||||
height_diffs = [
|
||||
abs(elevations[1] - elevations[0]),
|
||||
abs(elevations[2] - elevations[0]),
|
||||
abs(elevations[3] - elevations[0]),
|
||||
abs(elevations[4] - elevations[0])
|
||||
]
|
||||
|
||||
avg_height_diff = sum(height_diffs) / len(height_diffs)
|
||||
slope_rad = math.atan2(avg_height_diff, distance)
|
||||
slope_deg = math.degrees(slope_rad)
|
||||
slope_deg = min(max(slope_deg, 0), 25)
|
||||
|
||||
print(f"获取成功!坡度: {slope_deg:.2f}° (dataset={dataset})")
|
||||
return slope_deg
|
||||
|
||||
except requests.exceptions.HTTPError as e:
|
||||
print(f"警告:{dataset} 请求失败 (HTTP {e.response.status_code}): {e.response.text}")
|
||||
continue
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"警告:{dataset} 请求失败: {e}")
|
||||
continue
|
||||
except Exception as e:
|
||||
print(f"警告:处理 {dataset} 数据出错: {e}")
|
||||
continue
|
||||
|
||||
print("警告:所有数据集均失败")
|
||||
return None
|
||||
|
||||
|
||||
def calculate_psh_average(lat, lon, start_year=2010, end_year=2023):
|
||||
"""
|
||||
从 NASA POWER API 获取峰值日照小时数(PSH)。
|
||||
返回:平均 PSH(小时/天),失败时返回默认值 4.0
|
||||
"""
|
||||
url = "https://power.larc.nasa.gov/api/temporal/monthly/point"
|
||||
params = {
|
||||
"parameters": "ALLSKY_SFC_SW_DWN",
|
||||
"community": "RE",
|
||||
"longitude": lon,
|
||||
"latitude": lat,
|
||||
"format": "JSON",
|
||||
"start": str(start_year),
|
||||
"end": str(end_year)
|
||||
}
|
||||
try:
|
||||
response = requests.get(url, params=params, timeout=10)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
if "properties" not in data or "parameter" not in data["properties"]:
|
||||
return 4.0
|
||||
ghi_data = data["properties"]["parameter"].get("ALLSKY_SFC_SW_DWN", {})
|
||||
if not ghi_data:
|
||||
return 4.0
|
||||
ghi_data = {k: v for k, v in ghi_data.items() if not k.endswith("13")}
|
||||
if not ghi_data:
|
||||
return 4.0
|
||||
df = pd.DataFrame.from_dict(ghi_data, orient="index", columns=["GHI (kWh/m²/day)"])
|
||||
new_index = [f"{k[:4]}-{k[-2:]:0>2}" for k in df.index]
|
||||
df.index = new_index
|
||||
if df.empty:
|
||||
return 4.0
|
||||
df["PSH (hours/day)"] = df["GHI (kWh/m²/day)"]
|
||||
if df["PSH (hours/day)"].isna().any():
|
||||
return 4.0
|
||||
df['Year'] = df.index.str[:4]
|
||||
annual_avg = df.groupby('Year')['PSH (hours/day)'].mean()
|
||||
if annual_avg.empty:
|
||||
return 4.0
|
||||
psh = annual_avg.mean()
|
||||
if math.isnan(psh):
|
||||
return 4.0
|
||||
print(f"获取成功!平均PSH: {psh:.2f} 小时/天")
|
||||
return psh
|
||||
except requests.exceptions.RequestException:
|
||||
return 4.0
|
||||
except Exception:
|
||||
return 4.0
|
||||
|
||||
|
||||
def calculate_optimal_tilt(lat):
|
||||
"""根据纬度计算最佳倾角(单位:度)"""
|
||||
try:
|
||||
lat_abs = abs(lat)
|
||||
if lat_abs < 25:
|
||||
optimal_tilt = lat_abs * 0.87
|
||||
elif lat_abs <= 50:
|
||||
optimal_tilt = lat_abs * 0.76 + 3.1
|
||||
else:
|
||||
optimal_tilt = lat_abs * 0.5 + 16.3
|
||||
return optimal_tilt
|
||||
except ValueError as e:
|
||||
raise Exception(f"倾角计算错误: {e}")
|
||||
|
||||
|
||||
def pv_area(panel_capacity, slope_deg, shading_factor=0.1, land_compactness=1.0, terrain_complexity=1.0):
|
||||
"""计算单块光伏组件占地面积"""
|
||||
base_area = panel_capacity * 6
|
||||
slope_factor = 1 + (slope_deg / 50) if slope_deg <= 15 else 1.5
|
||||
shade_factor = 1 + shading_factor * 2
|
||||
compact_factor = 1 / land_compactness if land_compactness > 0 else 1.5
|
||||
terrain_factor = terrain_complexity
|
||||
return base_area * slope_factor * shade_factor * compact_factor * terrain_factor
|
||||
|
||||
|
||||
def calculate_pv_potential(available_area_sq_km, component_name, longitude, latitude, slope_deg=10,
|
||||
shading_factor=0.1, land_compactness=0.8, terrain_complexity=1.2,
|
||||
terrain_type="耕地", pv_type="centralized", land_availability=0.85,
|
||||
min_irradiance=800, max_slope=25, electricity_price=0.65, q=0.02,
|
||||
is_fixed=True, optimize=True, peak_load_hour=16, cost_per_kw=3.4,
|
||||
E_S=1.0, K=0.8, project_lifetime=25, discount_rate=0.06):
|
||||
"""计算最小和最大组件数量的光伏系统潜力"""
|
||||
if available_area_sq_km <= 0:
|
||||
raise ValueError("可用面积必须大于0")
|
||||
if slope_deg < 0 or slope_deg > max_slope:
|
||||
raise ValueError(f"坡度必须在0-{max_slope}度之间")
|
||||
available_area_hectares = available_area_sq_km * 100
|
||||
valid_terrains = TERRAIN_COMPLEXITY_RANGES.get(pv_type, {})
|
||||
if terrain_type not in valid_terrains:
|
||||
raise ValueError(f"{pv_type} 光伏不支持 {terrain_type} 地形。可选地形:{list(valid_terrains.keys())}")
|
||||
terrain_adjustments = TERRAIN_ADJUSTMENTS.get(terrain_type, {"land_availability": 0.85, "K": 0.8})
|
||||
adjusted_land_availability = terrain_adjustments["land_availability"] / max(1.0, terrain_complexity)
|
||||
adjusted_K = terrain_adjustments["K"] / max(1.0, terrain_complexity)
|
||||
pv_info = get_pv_product_info(component_name)
|
||||
single_panel_capacity = pv_info["max_power"] / 1000
|
||||
pv_size = pv_info["pv_size"].split("×")
|
||||
panel_length = float(pv_size[0]) / 1000
|
||||
panel_width = float(pv_size[1]) / 1000
|
||||
panel_area_sqm = panel_length * panel_width
|
||||
tilt, azimuth = get_tilt_and_azimuth(is_fixed, optimize, longitude, latitude, peak_load_hour)
|
||||
array_distance = calculate_array_distance(panel_width * 1.1, tilt, latitude)
|
||||
spacing_factor = PANEL_SPACING_FACTORS.get(pv_type, 1.2)
|
||||
adjusted_array_distance = array_distance * spacing_factor
|
||||
effective_area_hectares = available_area_hectares * adjusted_land_availability
|
||||
effective_area_sqm = effective_area_hectares * 10000
|
||||
area_per_mw = 10000 * (1 + slope_deg / 50 if slope_deg <= 15 else 1.5) * (
|
||||
1 + shading_factor * 2) * terrain_complexity * spacing_factor
|
||||
capacity_density_limit = CAPACITY_LIMITS.get(pv_type, 5.0) / 1000
|
||||
max_capacity_by_density = effective_area_sqm * capacity_density_limit
|
||||
row_spacing = panel_length * math.sin(math.radians(tilt)) + adjusted_array_distance
|
||||
effective_panel_area = panel_area_sqm * (row_spacing / panel_length) * 1.2
|
||||
min_area_per_panel = effective_panel_area * 0.8
|
||||
max_area_per_panel = effective_panel_area * 1.5
|
||||
max_panels = math.floor(effective_area_sqm / min_area_per_panel)
|
||||
min_panels = math.floor(effective_area_sqm / max_area_per_panel)
|
||||
max_capacity_raw = calculate_installed_capacity(pv_info["max_power"], max_panels)
|
||||
min_capacity_raw = calculate_installed_capacity(pv_info["max_power"], min_panels)
|
||||
max_capacity = min(max_capacity_raw, max_capacity_by_density)
|
||||
min_capacity = min(min_capacity_raw, max_capacity_by_density * 0.8)
|
||||
theoretical_max_capacity_mw = available_area_sq_km * CAPACITY_LIMITS.get(pv_type, 5.0)
|
||||
if max_capacity / 1000 > theoretical_max_capacity_mw:
|
||||
max_capacity = theoretical_max_capacity_mw * 1000
|
||||
max_panels = math.floor(max_capacity * 1000 / pv_info["max_power"])
|
||||
if min_capacity / 1000 > theoretical_max_capacity_mw * 0.8:
|
||||
min_capacity = theoretical_max_capacity_mw * 1000 * 0.8
|
||||
min_panels = math.floor(min_capacity * 1000 / pv_info["max_power"])
|
||||
min_metrics = calculate_pv_metrics(
|
||||
component_name=component_name, electricity_price=electricity_price, pv_number=min_panels,
|
||||
q=q, longitude=longitude, latitude=latitude, is_fixed=is_fixed, optimize=optimize,
|
||||
peak_load_hour=peak_load_hour, cost_per_kw=cost_per_kw * terrain_complexity, E_S=E_S, K=adjusted_K,
|
||||
override_capacity=min_capacity
|
||||
)
|
||||
min_lcoe = calculate_lcoe(
|
||||
capacity=min_metrics["capacity"], annual_energy=min_metrics["annual_energy"],
|
||||
cost_per_kw=cost_per_kw * terrain_complexity, q=q, project_lifetime=project_lifetime,
|
||||
discount_rate=discount_rate
|
||||
)
|
||||
max_metrics = calculate_pv_metrics(
|
||||
component_name=component_name, electricity_price=electricity_price, pv_number=max_panels,
|
||||
q=q, longitude=longitude, latitude=latitude, is_fixed=is_fixed, optimize=optimize,
|
||||
peak_load_hour=peak_load_hour, cost_per_kw=cost_per_kw * terrain_complexity, E_S=E_S, K=adjusted_K,
|
||||
override_capacity=max_capacity
|
||||
)
|
||||
max_lcoe = calculate_lcoe(
|
||||
capacity=max_metrics["capacity"], annual_energy=max_metrics["annual_energy"],
|
||||
cost_per_kw=cost_per_kw * terrain_complexity, q=q, project_lifetime=project_lifetime,
|
||||
discount_rate=discount_rate
|
||||
)
|
||||
if min_panels == max_panels:
|
||||
print(f"警告:最小和最大组件数量相同 ({min_panels}),请检查地形复杂性或面积是否过小")
|
||||
if min_panels == 0 or max_panels == 0:
|
||||
print(f"警告:组件数量为0,请检查输入参数")
|
||||
return {
|
||||
"min_case": {
|
||||
**min_metrics, "lcoe": min_lcoe, "actual_panels": min_panels,
|
||||
"available_area_sq_km": available_area_sq_km, "available_area_hectares": available_area_hectares,
|
||||
"effective_area_hectares": effective_area_hectares, "panel_area_sqm": max_area_per_panel,
|
||||
"terrain_type": terrain_type, "pv_type": pv_type, "theoretical_max_capacity_mw": theoretical_max_capacity_mw
|
||||
},
|
||||
"max_case": {
|
||||
**max_metrics, "lcoe": max_lcoe, "actual_panels": max_panels,
|
||||
"available_area_sq_km": available_area_sq_km, "available_area_hectares": available_area_hectares,
|
||||
"effective_area_hectares": effective_area_hectares, "panel_area_sqm": min_area_per_panel,
|
||||
"terrain_type": terrain_type, "pv_type": pv_type, "theoretical_max_capacity_mw": theoretical_max_capacity_mw
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def calculate_lcoe(capacity, annual_energy, cost_per_kw, q, project_lifetime=25, discount_rate=0.06):
|
||||
"""计算平准化度电成本(LCOE)"""
|
||||
total_investment = capacity * cost_per_kw * 1000
|
||||
annual_om_cost = total_investment * q
|
||||
discount_factors = [(1 + discount_rate) ** -t for t in range(1, project_lifetime + 1)]
|
||||
discounted_energy = sum(annual_energy * discount_factors[t] for t in range(project_lifetime))
|
||||
discounted_costs = total_investment + sum(annual_om_cost * discount_factors[t] for t in range(project_lifetime))
|
||||
if discounted_energy == 0:
|
||||
return float('inf')
|
||||
return discounted_costs / discounted_energy
|
||||
|
||||
|
||||
def get_pv_product_info(component_name, excel_path=PV_EXCEL_PATH):
|
||||
"""从Excel获取光伏组件信息"""
|
||||
INDIAN_SITES = {
|
||||
"Gujarat": {"latitude": 22.2587, "longitude": 71.1924},
|
||||
"Rajasthan": {"latitude": 27.0238, "longitude": 74.2179},
|
||||
"Tamil Nadu": {"latitude": 11.1271, "longitude": 78.6569}
|
||||
}
|
||||
try:
|
||||
df = pd.read_excel(excel_path)
|
||||
if len(df.columns) < 10:
|
||||
raise ValueError("Excel文件需包含至少10列:组件名称、尺寸、功率等")
|
||||
row = df[df.iloc[:, 1] == component_name]
|
||||
if row.empty:
|
||||
raise ValueError(f"未找到组件:{component_name}")
|
||||
return {
|
||||
"component_name": component_name,
|
||||
"max_power": row.iloc[0, 5],
|
||||
"efficiency": row.iloc[0, 9],
|
||||
"pv_size": row.iloc[0, 3]
|
||||
}
|
||||
except FileNotFoundError:
|
||||
raise FileNotFoundError(f"未找到Excel文件:{excel_path}")
|
||||
except Exception as e:
|
||||
raise Exception(f"读取Excel出错:{e}")
|
||||
|
||||
|
||||
def get_tilt_and_azimuth(is_fixed=True, optimize=True, longitude=116, latitude=None, peak_load_hour=16):
|
||||
"""计算光伏系统的倾角和方位角"""
|
||||
if optimize and latitude is None:
|
||||
raise ValueError("优化模式下需提供纬度")
|
||||
if is_fixed:
|
||||
if optimize:
|
||||
tilt = calculate_optimal_tilt(latitude)
|
||||
azimuth = (peak_load_hour - 12) * 15 + (longitude - 116)
|
||||
azimuth = azimuth % 360 if azimuth >= 0 else azimuth + 360
|
||||
else:
|
||||
print("倾角:0°(水平)-90°(垂直) | 方位角:0°(正北)-180°(正南),顺时针")
|
||||
tilt = float(input("请输入倾角(度):"))
|
||||
azimuth = float(input("请输入方位角(度):"))
|
||||
if not (0 <= tilt <= 90) or not (0 <= azimuth <= 360):
|
||||
raise ValueError("倾角需在0-90°,方位角需在0-360°")
|
||||
else:
|
||||
azimuth = 180
|
||||
if optimize:
|
||||
tilt = calculate_optimal_tilt(latitude)
|
||||
else:
|
||||
print("倾角:0°(水平)-90°(垂直)")
|
||||
tilt = float(input("请输入倾角(度):"))
|
||||
if not (0 <= tilt <= 90):
|
||||
raise ValueError("倾角需在0-90°")
|
||||
return tilt, azimuth
|
||||
|
||||
|
||||
def calculate_array_distance(L, tilt, latitude):
|
||||
"""计算阵列间距"""
|
||||
beta_rad = math.radians(tilt)
|
||||
phi_rad = math.radians(latitude)
|
||||
return (L * math.cos(beta_rad) +
|
||||
L * math.sin(beta_rad) * 0.707 * math.tan(phi_rad) +
|
||||
0.4338 * math.tan(phi_rad))
|
||||
|
||||
|
||||
def calculate_equivalent_hours(P, P_r):
|
||||
"""计算等效小时数"""
|
||||
if P_r == 0:
|
||||
raise ValueError("额定功率不能为 0")
|
||||
return P / P_r
|
||||
|
||||
|
||||
def calculate_installed_capacity(max_power, num_components):
|
||||
"""计算装机容量"""
|
||||
if max_power < 0 or num_components < 0 or not isinstance(num_components, int):
|
||||
raise ValueError("功率和数量需为非负数,数量需为整数")
|
||||
return (max_power * num_components) / 1000
|
||||
|
||||
|
||||
def calculate_annual_energy(peak_hours, capacity, E_S=1.0, K=0.8):
|
||||
"""计算年发电量"""
|
||||
if any(x < 0 for x in [peak_hours, capacity]) or E_S <= 0 or not 0 <= K <= 1:
|
||||
raise ValueError("输入参数需满足:辐射量、容量≥0,E_S>0,K∈[0,1]")
|
||||
return peak_hours * 365 * (capacity / E_S) * K
|
||||
|
||||
|
||||
def calculate_environmental_benefits(E_p_million_kwh):
|
||||
"""计算环境收益"""
|
||||
if E_p_million_kwh < 0:
|
||||
raise ValueError("年发电量需≥0")
|
||||
return {
|
||||
"coal_reduction": E_p_million_kwh * 0.404 * 10,
|
||||
"CO2_reduction": E_p_million_kwh * 0.977 * 10,
|
||||
"SO2_reduction": E_p_million_kwh * 0.03 * 10,
|
||||
"NOX_reduction": E_p_million_kwh * 0.015 * 10
|
||||
}
|
||||
|
||||
|
||||
def calculate_reference_yield(E_p, electricity_price, IC, q, n=25):
|
||||
"""计算净现值(NPV)和内部收益率(IRR)"""
|
||||
if E_p < 0 or electricity_price < 0 or IC <= 0 or not 0 <= q <= 1:
|
||||
raise ValueError("发电量、电价≥0,投资成本>0,回收比例∈[0,1]")
|
||||
|
||||
def npv_equation(irr, p, w, ic, q_val, n=n):
|
||||
term1 = (1 + irr) ** (-1)
|
||||
term2 = irr * (1 + irr) ** (-1) if irr != 0 else float('inf')
|
||||
pv_revenue = p * w * (term1 / term2) * (1 - (1 + irr) ** (-n))
|
||||
pv_salvage = q_val * ic * (term1 / term2) * (1 - (1 + irr) ** (-n))
|
||||
return pv_revenue - ic + pv_salvage
|
||||
|
||||
irr_guess = 0.1
|
||||
irr = float(fsolve(npv_equation, irr_guess, args=(E_p, electricity_price, IC, q))[0])
|
||||
if not 0 <= irr <= 1:
|
||||
raise ValueError(f"IRR计算结果{irr:.4f}不合理,请检查输入")
|
||||
npv = npv_equation(irr, E_p, electricity_price, IC, q)
|
||||
return {"NPV": npv, "IRR": irr * 100}
|
||||
|
||||
|
||||
def calculate_pv_metrics(component_name, electricity_price, pv_number, q, longitude, latitude,
|
||||
is_fixed=True, optimize=True, peak_load_hour=16, cost_per_kw=3.4, E_S=1.0, K=0.8,
|
||||
override_capacity=None):
|
||||
"""计算光伏项目的各项指标"""
|
||||
try:
|
||||
tilt, azimuth = get_tilt_and_azimuth(is_fixed, optimize, longitude, latitude, peak_load_hour)
|
||||
pv_info = get_pv_product_info(component_name)
|
||||
width_mm = float(pv_info["pv_size"].split("×")[1])
|
||||
L = (width_mm / 1000) * 1.1
|
||||
array_distance = calculate_array_distance(L, tilt, latitude)
|
||||
max_power = pv_info["max_power"]
|
||||
if override_capacity is not None:
|
||||
capacity = override_capacity
|
||||
else:
|
||||
capacity = calculate_installed_capacity(max_power, pv_number)
|
||||
peak_hours = calculate_psh_average(latitude, longitude)
|
||||
single_daily_energy = peak_hours * (capacity / pv_number) * K if pv_number > 0 else 0
|
||||
E_p = calculate_annual_energy(peak_hours, capacity, E_S, K)
|
||||
h = calculate_equivalent_hours(E_p, capacity) if capacity > 0 else 0
|
||||
E_p_million_kwh = E_p / 1000000
|
||||
env_benefits = calculate_environmental_benefits(E_p_million_kwh)
|
||||
IC = capacity * cost_per_kw * 1000
|
||||
ref_yield = calculate_reference_yield(E_p, electricity_price, IC, q)
|
||||
return {
|
||||
"longitude": longitude,
|
||||
"latitude": latitude,
|
||||
"component_name": component_name,
|
||||
"tilt": tilt,
|
||||
"azimuth": azimuth,
|
||||
"array_distance": array_distance,
|
||||
"max_power": max_power,
|
||||
"capacity": capacity,
|
||||
"peak_sunshine_hours": peak_hours,
|
||||
"single_daily_energy": single_daily_energy,
|
||||
"annual_energy": E_p,
|
||||
"equivalent_hours": h,
|
||||
"coal_reduction": env_benefits["coal_reduction"],
|
||||
"CO2_reduction": env_benefits["CO2_reduction"],
|
||||
"SO2_reduction": env_benefits["SO2_reduction"],
|
||||
"NOX_reduction": env_benefits["NOX_reduction"],
|
||||
"IRR": ref_yield["IRR"]
|
||||
}
|
||||
except Exception as e:
|
||||
raise Exception(f"计算过程中发生错误: {str(e)}")
|
||||
|
||||
|
||||
def print_result(min_case, max_case):
|
||||
"""优化输出格式,使用表格展示结果"""
|
||||
headers = ["指标", "最小组件数量", "最大组件数量"]
|
||||
table_data = [
|
||||
["经度", f"{min_case['longitude']:.2f}", f"{max_case['longitude']:.2f}"],
|
||||
["纬度", f"{min_case['latitude']:.2f}", f"{max_case['latitude']:.2f}"],
|
||||
["光伏类型", min_case["pv_type"], max_case["pv_type"]],
|
||||
["地形类型", min_case["terrain_type"], max_case["terrain_type"]],
|
||||
["组件型号", min_case["component_name"], max_case["component_name"]],
|
||||
["识别面积 (平方千米)", f"{min_case['available_area_sq_km']:.2f}", f"{max_case['available_area_sq_km']:.2f}"],
|
||||
["识别面积 (公顷)", f"{min_case['available_area_hectares']:.2f}", f"{max_case['available_area_hectares']:.2f}"],
|
||||
["有效面积 (公顷)", f"{min_case['effective_area_hectares']:.2f}", f"{max_case['effective_area_hectares']:.2f}"],
|
||||
["理论最大容量 (MW)", f"{min_case['theoretical_max_capacity_mw']:.2f}",
|
||||
f"{max_case['theoretical_max_capacity_mw']:.2f}"],
|
||||
["单块组件占地 (m²)", f"{min_case['panel_area_sqm']:.2f}", f"{max_case['panel_area_sqm']:.2f}"],
|
||||
["组件数量", f"{min_case['actual_panels']:,}", f"{max_case['actual_panels']:,}"],
|
||||
["倾角 (度)", f"{min_case['tilt']:.2f}", f"{max_case['tilt']:.2f}"],
|
||||
["方位角 (度)", f"{min_case['azimuth']:.2f}", f"{max_case['azimuth']:.2f}"],
|
||||
["阵列间距 (m)", f"{min_case['array_distance']:.2f}", f"{max_case['array_distance']:.2f}"],
|
||||
["单块功率 (Wp)", f"{min_case['max_power']}", f"{max_case['max_power']}"],
|
||||
["装机容量 (MW)", f"{min_case['capacity'] / 1000:.2f}", f"{max_case['capacity'] / 1000:.2f}"],
|
||||
["峰值日照 (小时/天)", f"{min_case['peak_sunshine_hours']:.2f}", f"{max_case['peak_sunshine_hours']:.2f}"],
|
||||
["年发电量 (kWh)", f"{min_case['annual_energy']:,.0f}", f"{max_case['annual_energy']:,.0f}"],
|
||||
["等效小时数", f"{min_case['equivalent_hours']:.2f}", f"{max_case['equivalent_hours']:.2f}"],
|
||||
["LCOE (元/kWh)", f"{min_case['lcoe']:.4f}", f"{max_case['lcoe']:.4f}"],
|
||||
["标准煤减排 (kg)", f"{min_case['coal_reduction']:,.0f}", f"{max_case['coal_reduction']:,.0f}"],
|
||||
["CO₂减排 (kg)", f"{min_case['CO2_reduction']:,.0f}", f"{max_case['CO2_reduction']:,.0f}"],
|
||||
["SO₂减排 (kg)", f"{min_case['SO2_reduction']:,.0f}", f"{max_case['SO2_reduction']:,.0f}"],
|
||||
["NOx减排 (kg)", f"{min_case['NOX_reduction']:,.0f}", f"{max_case['NOX_reduction']:,.0f}"],
|
||||
["IRR (%)", f"{min_case['IRR']:.2f}", f"{max_case['IRR']:.2f}"]
|
||||
]
|
||||
print("\n===== 光伏系统潜力评估结果 =====")
|
||||
print(tabulate(table_data, headers=headers, tablefmt="grid"))
|
||||
|
||||
|
||||
# 主程序
|
||||
if __name__ == "__main__":
|
||||
while True:
|
||||
try:
|
||||
# 加载配置文件
|
||||
try:
|
||||
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
|
||||
config = json.load(f)
|
||||
except FileNotFoundError:
|
||||
print(f"错误:未找到配置文件 {CONFIG_PATH},使用默认值")
|
||||
config = {
|
||||
"default_terrain_complexity": {
|
||||
"耕地": 1.1, "裸地": 1.1, "草地": 1.2, "灌木": 1.4,
|
||||
"湿地": 1.65, "林地": 1.65, "建筑": 1.35, "水域": 1.35
|
||||
}
|
||||
}
|
||||
except json.JSONDecodeError:
|
||||
print(f"错误:配置文件 {CONFIG_PATH} 格式错误,使用默认值")
|
||||
config = {
|
||||
"default_terrain_complexity": {
|
||||
"耕地": 1.1, "裸地": 1.1, "草地": 1.2, "灌木": 1.4,
|
||||
"湿地": 1.65, "林地": 1.65, "建筑": 1.35, "水域": 1.35
|
||||
}
|
||||
}
|
||||
|
||||
print("\n======= 光伏系统潜力评估 =======")
|
||||
print("请输入以下必要参数:")
|
||||
latitude = float(input("请输入纬度(-90到90,例如39.9):"))
|
||||
if not -90 <= latitude <= 90:
|
||||
raise ValueError("纬度必须在-90到90之间")
|
||||
longitude = float(input("请输入经度(-180到180,例如116.4):"))
|
||||
if not -180 <= longitude <= 180:
|
||||
raise ValueError("经度必须在-180到180之间")
|
||||
available_area_sq_km = float(input("请输入识别面积(平方千米,例如10):"))
|
||||
if available_area_sq_km <= 0:
|
||||
raise ValueError("识别面积必须大于0")
|
||||
pv_type = input("请输入光伏类型(distributed, centralized, floating):").lower()
|
||||
if pv_type not in ["distributed", "centralized", "floating"]:
|
||||
raise ValueError("光伏类型必须是 distributed, centralized 或 floating")
|
||||
valid_terrains = list(TERRAIN_COMPLEXITY_RANGES.get(pv_type, {}).keys())
|
||||
print(f"支持的地形类型:{valid_terrains}")
|
||||
terrain_type = input("请输入地形类型:")
|
||||
if terrain_type not in valid_terrains:
|
||||
raise ValueError(f"不支持的地形类型:{terrain_type}")
|
||||
component_name = input("请输入光伏组件型号(需在Excel中存在,例如M10-72H):")
|
||||
electricity_price = float(input("请输入电价(元/kWh,例如0.65):"))
|
||||
if electricity_price < 0:
|
||||
raise ValueError("电价必须非负")
|
||||
|
||||
# 从配置文件获取地形复杂性因子
|
||||
terrain_complexity = config["default_terrain_complexity"].get(terrain_type, 1.2)
|
||||
|
||||
# 验证复杂性因子范围
|
||||
min_complexity, max_complexity = TERRAIN_COMPLEXITY_RANGES[pv_type][terrain_type]
|
||||
if not min_complexity <= terrain_complexity <= max_complexity:
|
||||
raise ValueError(f"地形复杂性因子 {terrain_complexity} 超出范围 [{min_complexity}, {max_complexity}]")
|
||||
|
||||
# 通过 API 获取坡度
|
||||
slope_deg = get_slope_from_api(latitude, longitude)
|
||||
|
||||
print(f"使用参数:坡度={slope_deg:.2f}°,地形复杂性因子={terrain_complexity}")
|
||||
|
||||
# 计算光伏潜力
|
||||
result = calculate_pv_potential(
|
||||
available_area_sq_km=available_area_sq_km,
|
||||
component_name=component_name,
|
||||
longitude=longitude,
|
||||
latitude=latitude,
|
||||
slope_deg=slope_deg,
|
||||
terrain_complexity=terrain_complexity,
|
||||
terrain_type=terrain_type,
|
||||
pv_type=pv_type,
|
||||
electricity_price=electricity_price
|
||||
)
|
||||
|
||||
# 输出结果
|
||||
print_result(result["min_case"], result["max_case"])
|
||||
|
||||
# 询问是否继续
|
||||
if input("\n是否继续评估?(y/n):").lower() != 'y':
|
||||
break
|
||||
|
||||
except ValueError as ve:
|
||||
print(f"输入错误:{ve}")
|
||||
except FileNotFoundError as fe:
|
||||
print(f"文件错误:{fe}")
|
||||
except Exception as e:
|
||||
print(f"发生错误:{e}")
|
||||
print("请重新输入参数或检查错误。\n")
|
|
@ -0,0 +1,19 @@
|
|||
目的:修改之前代码,其中输入经纬度进行倾角和峰值日照时数的查询(不通过城市)。
|
||||
|
||||
【1】文件夹:
|
||||
pv_product.xlsx
|
||||
为组件,相较之前没有改变
|
||||
倾角_峰值小时数.xlsx:
|
||||
为通过城市查的倾角和峰值时数,做了变动,增加了一些城市(和市级的json相匹配)一部分没添加数值
|
||||
中国_市.geojson:
|
||||
市一级别行政json图
|
||||
PV_total3.py:
|
||||
思路:通过经纬度查找市json图对应的所在市区,再通过”倾角_峰值小时数.xlsx“表格查找倾角和峰值日照时数
|
||||
主要修改:51-10行部分。
|
||||
|
||||
【2】文件夹:
|
||||
PV_total2.py:
|
||||
通过nasa的api接口获取10年-23年平均峰值日照时数
|
||||
倾角 = 纬度(弧度) × 0.86 + 24
|
||||
主要修改:47-97行部分。
|
||||
!注意!:需要挂梯子;计算的倾角在高纬度地区较之前较小,峰值日照时数较大。详细对照最后的结果
|
Loading…
Reference in New Issue