Compare commits

...

No commits in common. "master" and "adca4f89af1fd57239996a4af6697b36d975a16f" have entirely different histories.

31 changed files with 3183 additions and 1272 deletions

View File

@ -1,9 +0,0 @@
FROM python:3.10
WORKDIR /app
COPY . /app/
RUN pip install --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple --no-cache-dir
RUN pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple --no-cache-dir
CMD ["python3", "run.py"]

View File

@ -1,19 +0,0 @@
pv_product.xlsx
含有光伏组件设备信息,例如设备名称、组件尺寸、最大功率、组件效率等
倾角_峰值小时数.xlsx
照城市简称确定安装角度即倾角、峰值日照时数较之前做了变动增加了一些城市和市级的json相匹配一部分没添加数值
中国_市.geojson
市一级别行政json图
PV_total.py
通过城市找倾角和峰值日照时数对比与PV_total3.py的区别
PV_total2.py
通过nasa的api接口获取10年-23年平均峰值日照时数
倾角 = 纬度(弧度) × 0.86 + 24
主要修改47-97行部分。
!注意!:需要挂梯子;计算的倾角在高纬度地区较之前较小,峰值日照时数较大。详细对照最后的结果
PV_total3.py
思路通过经纬度查找市json图对应的所在市区再通过”倾角_峰值小时数.xlsx“表格查找倾角和峰值日照时数
主要修改51-10行部分。
【Photovoltaic Panel Evaluation System v2.zip】为韩耀朋做的最新一版。

View File

@ -0,0 +1,583 @@
import pandas as pd
import math
import numpy as np
import requests
from scipy.optimize import fsolve
from tabulate import tabulate
# 默认文件路径
PV_EXCEL_PATH = r"./pv_product.xlsx" # 请确保此文件存在或更改为正确路径
# 地形类型与复杂性因子范围
TERRAIN_COMPLEXITY_RANGES = {
"distributed": {
"耕地": (1.0, 1.2), "裸地": (1.0, 1.2), "草地": (1.1, 1.3),
"灌木": (1.3, 1.5), "湿地": (1.5, 1.8), "林地": (1.5, 1.8), "建筑": (1.2, 1.5)
},
"centralized": {
"耕地": (1.0, 1.2), "裸地": (1.0, 1.2), "草地": (1.1, 1.3),
"灌木": (1.3, 1.6), "湿地": (1.5, 1.8), "林地": (1.6, 2.0)
},
"floating": {"水域": (1.2, 1.5)}
}
# 地形类型与土地可用性、发电效率的映射
TERRAIN_ADJUSTMENTS = {
"耕地": {"land_availability": 0.85, "K": 0.8}, "裸地": {"land_availability": 0.85, "K": 0.8},
"草地": {"land_availability": 0.85, "K": 0.8}, "灌木": {"land_availability": 0.75, "K": 0.75},
"湿地": {"land_availability": 0.65, "K": 0.75}, "水域": {"land_availability": 0.85, "K": 0.8},
"林地": {"land_availability": 0.65, "K": 0.7}, "建筑": {"land_availability": 0.6, "K": 0.75}
}
# 光伏类型的装机容量上限 (MW/平方千米)
CAPACITY_LIMITS = {
"distributed": 25.0, "centralized": 50.0, "floating": 25.0
}
# 实际面板间距系数
PANEL_SPACING_FACTORS = {
"distributed": 1.5, "centralized": 1.2, "floating": 1.3
}
def calculate_psh_average(lat, lon, start_year=2010, end_year=2023):
"""
NASA POWER API 获取峰值日照小时数PSH
返回平均 PSH小时/失败时返回默认值 4.0
"""
print("DEBUG: Starting calculate_psh_average (version 2025-04-28)")
url = "https://power.larc.nasa.gov/api/temporal/monthly/point"
params = {
"parameters": "ALLSKY_SFC_SW_DWN",
"community": "RE",
"longitude": lon,
"latitude": lat,
"format": "JSON",
"start": str(start_year),
"end": str(end_year)
}
try:
print(f"DEBUG: Requesting NASA POWER API for lat={lat}, lon={lon}")
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
print("DEBUG: API response received")
data = response.json()
print("DEBUG: Validating API data")
if "properties" not in data or "parameter" not in data["properties"]:
print("ERROR: NASA POWER API returned invalid data format")
return 4.0
ghi_data = data["properties"]["parameter"].get("ALLSKY_SFC_SW_DWN", {})
if not ghi_data:
print("ERROR: No GHI data found in API response")
return 4.0
print("DEBUG: Filtering GHI data")
print(f"DEBUG: Raw GHI data keys: {list(ghi_data.keys())}")
ghi_data = {k: v for k, v in ghi_data.items() if not k.endswith("13")}
if not ghi_data:
print("ERROR: No valid GHI data after filtering")
return 4.0
print(f"DEBUG: Filtered GHI data keys: {list(ghi_data.keys())}")
print("DEBUG: Creating DataFrame")
df = pd.DataFrame.from_dict(ghi_data, orient="index", columns=["GHI (kWh/m²/day)"])
print(f"DEBUG: Original DataFrame index: {list(df.index)}")
print("DEBUG: Reformatting DataFrame index")
new_index = []
for k in df.index:
try:
# 验证索引格式
if not (isinstance(k, str) and len(k) == 6 and k.isdigit()):
print(f"ERROR: Invalid index format for {k}")
return 4.0
year = k[:4]
month = k[-2:]
formatted_index = f"{year}-{month:0>2}" # 使用 :0>2 确保两位数字
print(f"DEBUG: Formatting index {k} -> {formatted_index}")
new_index.append(formatted_index)
except Exception as e:
print(f"ERROR: Failed to format index {k}: {e}")
return 4.0
df.index = new_index
print(f"DEBUG: Reformatted DataFrame index: {list(df.index)}")
if df.empty:
print("ERROR: DataFrame is empty")
return 4.0
print("DEBUG: Calculating PSH")
df["PSH (hours/day)"] = df["GHI (kWh/m²/day)"]
if df["PSH (hours/day)"].isna().any():
print("ERROR: PSH data contains invalid values")
return 4.0
print("DEBUG: Grouping by year")
df['Year'] = df.index.str[:4]
print(f"DEBUG: Year column: {list(df['Year'])}")
annual_avg = df.groupby('Year')['PSH (hours/day)'].mean()
print(f"DEBUG: Annual averages: {annual_avg.to_dict()}")
if annual_avg.empty:
print("ERROR: Annual average PSH data is empty")
return 4.0
print("DEBUG: Calculating final PSH")
psh = annual_avg.mean()
if math.isnan(psh):
print("ERROR: PSH calculation resulted in NaN")
return 4.0
print(f"DEBUG: PSH calculated successfully, value={psh:.2f}")
print(f"获取成功平均PSH: {psh:.2f} 小时/天")
return psh
except requests.exceptions.RequestException as e:
print(f"ERROR: NASA POWER API request failed: {e}")
return 4.0
except Exception as e:
print(f"ERROR: Error processing API data: {e}")
return 4.0
def calculate_optimal_tilt(lat):
"""根据纬度计算最佳倾角(单位:度)"""
try:
lat_abs = abs(lat)
if lat_abs < 25:
optimal_tilt = lat_abs * 0.87
elif lat_abs <= 50:
optimal_tilt = lat_abs * 0.76 + 3.1
else:
optimal_tilt = lat_abs * 0.5 + 16.3
return optimal_tilt
except ValueError as e:
raise Exception(f"倾角计算错误: {e}")
def pv_area(panel_capacity, slope_deg, shading_factor=0.1, land_compactness=1.0, terrain_complexity=1.0):
"""计算单块光伏组件占地面积"""
base_area = panel_capacity * 6
slope_factor = 1 + (slope_deg / 50) if slope_deg <= 15 else 1.5
shade_factor = 1 + shading_factor * 2
compact_factor = 1 / land_compactness if land_compactness > 0 else 1.5
terrain_factor = terrain_complexity
return base_area * slope_factor * shade_factor * compact_factor * terrain_factor
def calculate_pv_potential(available_area_sq_km, component_name, longitude, latitude, slope_deg=10,
shading_factor=0.1, land_compactness=0.8, terrain_complexity=1.2,
terrain_type="耕地", pv_type="centralized", land_availability=0.85,
min_irradiance=800, max_slope=25, electricity_price=0.65, q=0.02,
is_fixed=True, optimize=True, peak_load_hour=16, cost_per_kw=3.4,
E_S=1.0, K=0.8, project_lifetime=25, discount_rate=0.06):
"""计算最小和最大组件数量的光伏系统潜力"""
# 输入验证
if available_area_sq_km <= 0:
raise ValueError("可用面积必须大于0")
if slope_deg < 0 or slope_deg > max_slope:
raise ValueError(f"坡度必须在0-{max_slope}度之间")
# 转换为公顷
available_area_hectares = available_area_sq_km * 100
# 验证光伏类型与地形类型
valid_terrains = TERRAIN_COMPLEXITY_RANGES.get(pv_type, {})
if terrain_type not in valid_terrains:
raise ValueError(f"{pv_type} 光伏不支持 {terrain_type} 地形。可选地形:{list(valid_terrains.keys())}")
# 获取地形调整参数
terrain_adjustments = TERRAIN_ADJUSTMENTS.get(terrain_type, {"land_availability": 0.85, "K": 0.8})
adjusted_land_availability = terrain_adjustments["land_availability"] / max(1.0, terrain_complexity)
adjusted_K = terrain_adjustments["K"] / max(1.0, terrain_complexity)
# 获取组件信息
pv_info = get_pv_product_info(component_name)
single_panel_capacity = pv_info["max_power"] / 1000 # kWp
pv_size = pv_info["pv_size"].split("×")
panel_length = float(pv_size[0]) / 1000 # 米
panel_width = float(pv_size[1]) / 1000 # 米
panel_area_sqm = panel_length * panel_width
# 获取阵列间距
tilt, azimuth = get_tilt_and_azimuth(is_fixed, optimize, longitude, latitude, peak_load_hour)
array_distance = calculate_array_distance(panel_width * 1.1, tilt, latitude)
spacing_factor = PANEL_SPACING_FACTORS.get(pv_type, 1.2)
adjusted_array_distance = array_distance * spacing_factor
# 计算有效面积
effective_area_hectares = available_area_hectares * adjusted_land_availability
effective_area_sqm = effective_area_hectares * 10000
# 计算每MW占地面积
area_per_mw = 10000 * (1 + slope_deg / 50 if slope_deg <= 15 else 1.5) * (
1 + shading_factor * 2) * terrain_complexity * spacing_factor
# 容量密度限制 (kW/m²)
capacity_density_limit = CAPACITY_LIMITS.get(pv_type, 5.0) / 1000
max_capacity_by_density = effective_area_sqm * capacity_density_limit
# 计算单块组件占地面积
row_spacing = panel_length * math.sin(math.radians(tilt)) + adjusted_array_distance
effective_panel_area = panel_area_sqm * (row_spacing / panel_length) * 1.2
# 调整 min/max 布局以确保差异
min_area_per_panel = effective_panel_area * 0.8 # 密集布局
max_area_per_panel = effective_panel_area * 1.5 # 稀疏布局
max_panels = math.floor(effective_area_sqm / min_area_per_panel)
min_panels = math.floor(effective_area_sqm / max_area_per_panel)
# 计算装机容量
max_capacity_raw = calculate_installed_capacity(pv_info["max_power"], max_panels)
min_capacity_raw = calculate_installed_capacity(pv_info["max_power"], min_panels)
# 应用容量密度限制
max_capacity = min(max_capacity_raw, max_capacity_by_density)
min_capacity = min(min_capacity_raw, max_capacity_by_density * 0.8) # 稀疏布局取80%
# 检查理论上限
theoretical_max_capacity_mw = available_area_sq_km * CAPACITY_LIMITS.get(pv_type, 5.0)
if max_capacity / 1000 > theoretical_max_capacity_mw:
max_capacity = theoretical_max_capacity_mw * 1000
max_panels = math.floor(max_capacity * 1000 / pv_info["max_power"])
if min_capacity / 1000 > theoretical_max_capacity_mw * 0.8:
min_capacity = theoretical_max_capacity_mw * 1000 * 0.8
min_panels = math.floor(min_capacity * 1000 / pv_info["max_power"])
# 计算指标
min_metrics = calculate_pv_metrics(
component_name=component_name, electricity_price=electricity_price, pv_number=min_panels,
q=q, longitude=longitude, latitude=latitude, is_fixed=is_fixed, optimize=optimize,
peak_load_hour=peak_load_hour, cost_per_kw=cost_per_kw * terrain_complexity, E_S=E_S, K=adjusted_K,
override_capacity=min_capacity
)
min_lcoe = calculate_lcoe(
capacity=min_metrics["capacity"], annual_energy=min_metrics["annual_energy"],
cost_per_kw=cost_per_kw * terrain_complexity, q=q, project_lifetime=project_lifetime,
discount_rate=discount_rate
)
max_metrics = calculate_pv_metrics(
component_name=component_name, electricity_price=electricity_price, pv_number=max_panels,
q=q, longitude=longitude, latitude=latitude, is_fixed=is_fixed, optimize=optimize,
peak_load_hour=peak_load_hour, cost_per_kw=cost_per_kw * terrain_complexity, E_S=E_S, K=adjusted_K,
override_capacity=max_capacity
)
max_lcoe = calculate_lcoe(
capacity=max_metrics["capacity"], annual_energy=max_metrics["annual_energy"],
cost_per_kw=cost_per_kw * terrain_complexity, q=q, project_lifetime=project_lifetime,
discount_rate=discount_rate
)
# 警告
if min_panels == max_panels:
print(f"警告:最小和最大组件数量相同 ({min_panels}),请检查地形复杂性或面积是否过小")
if min_panels == 0 or max_panels == 0:
print(f"警告组件数量为0请检查输入参数")
# 返回结果
return {
"min_case": {
**min_metrics, "lcoe": min_lcoe, "actual_panels": min_panels,
"available_area_sq_km": available_area_sq_km, "available_area_hectares": available_area_hectares,
"effective_area_hectares": effective_area_hectares, "panel_area_sqm": max_area_per_panel,
"terrain_type": terrain_type, "pv_type": pv_type, "theoretical_max_capacity_mw": theoretical_max_capacity_mw
},
"max_case": {
**max_metrics, "lcoe": max_lcoe, "actual_panels": max_panels,
"available_area_sq_km": available_area_sq_km, "available_area_hectares": available_area_hectares,
"effective_area_hectares": effective_area_hectares, "panel_area_sqm": min_area_per_panel,
"terrain_type": terrain_type, "pv_type": pv_type, "theoretical_max_capacity_mw": theoretical_max_capacity_mw
}
}
def calculate_lcoe(capacity, annual_energy, cost_per_kw, q, project_lifetime=25, discount_rate=0.06):
"""计算平准化度电成本(LCOE)"""
total_investment = capacity * cost_per_kw * 1000
annual_om_cost = total_investment * q
discount_factors = [(1 + discount_rate) ** -t for t in range(1, project_lifetime + 1)]
discounted_energy = sum(annual_energy * discount_factors[t] for t in range(project_lifetime))
discounted_costs = total_investment + sum(annual_om_cost * discount_factors[t] for t in range(project_lifetime))
if discounted_energy == 0:
return float('inf')
return discounted_costs / discounted_energy
def get_pv_product_info(component_name, excel_path=PV_EXCEL_PATH):
"""从Excel获取光伏组件信息"""
try:
df = pd.read_excel(excel_path)
if len(df.columns) < 10:
raise ValueError("Excel文件需包含至少10列组件名称、尺寸、功率等")
row = df[df.iloc[:, 1] == component_name]
if row.empty:
raise ValueError(f"未找到组件:{component_name}")
return {
"component_name": component_name,
"max_power": row.iloc[0, 5],
"efficiency": row.iloc[0, 9],
"pv_size": row.iloc[0, 3]
}
except FileNotFoundError:
raise FileNotFoundError(f"未找到Excel文件{excel_path}")
except Exception as e:
raise Exception(f"读取Excel出错{e}")
def get_tilt_and_azimuth(is_fixed=True, optimize=True, longitude=116, latitude=None, peak_load_hour=16):
"""计算光伏系统的倾角和方位角"""
if optimize and latitude is None:
raise ValueError("优化模式下需提供纬度")
if is_fixed:
if optimize:
tilt = calculate_optimal_tilt(latitude)
azimuth = (peak_load_hour - 12) * 15 + (longitude - 116)
azimuth = azimuth % 360 if azimuth >= 0 else azimuth + 360
else:
print("倾角0°(水平)-90°(垂直) | 方位角0°(正北)-180°(正南),顺时针")
tilt = float(input("请输入倾角(度)"))
azimuth = float(input("请输入方位角(度)"))
if not (0 <= tilt <= 90) or not (0 <= azimuth <= 360):
raise ValueError("倾角需在0-90°方位角需在0-360°")
else:
azimuth = 180
if optimize:
tilt = calculate_optimal_tilt(latitude)
else:
print("倾角0°(水平)-90°(垂直)")
tilt = float(input("请输入倾角(度)"))
if not (0 <= tilt <= 90):
raise ValueError("倾角需在0-90°")
return tilt, azimuth
def calculate_array_distance(L, tilt, latitude):
"""计算阵列间距"""
beta_rad = math.radians(tilt)
phi_rad = math.radians(latitude)
return (L * math.cos(beta_rad) +
L * math.sin(beta_rad) * 0.707 * math.tan(phi_rad) +
0.4338 * math.tan(phi_rad))
def calculate_equivalent_hours(P, P_r):
"""计算等效小时数"""
if P_r == 0:
raise ValueError("额定功率不能为 0")
return P / P_r
def calculate_installed_capacity(max_power, num_components):
"""计算装机容量"""
if max_power < 0 or num_components < 0 or not isinstance(num_components, int):
raise ValueError("功率和数量需为非负数,数量需为整数")
return (max_power * num_components) / 1000 # 单位kW
def calculate_annual_energy(peak_hours, capacity, E_S=1.0, K=0.8):
"""计算年发电量"""
if any(x < 0 for x in [peak_hours, capacity]) or E_S <= 0 or not 0 <= K <= 1:
raise ValueError("输入参数需满足辐射量、容量≥0E_S>0K∈[0,1]")
return peak_hours * 365 * (capacity / E_S) * K # 单位kWh
def calculate_environmental_benefits(E_p_million_kwh):
"""计算环境收益"""
if E_p_million_kwh < 0:
raise ValueError("年发电量需≥0")
return {
"coal_reduction": E_p_million_kwh * 0.404 * 10,
"CO2_reduction": E_p_million_kwh * 0.977 * 10,
"SO2_reduction": E_p_million_kwh * 0.03 * 10,
"NOX_reduction": E_p_million_kwh * 0.015 * 10
}
def calculate_reference_yield(E_p, electricity_price, IC, q, n=25):
"""计算净现值(NPV)和内部收益率(IRR)"""
if E_p < 0 or electricity_price < 0 or IC <= 0 or not 0 <= q <= 1:
raise ValueError("发电量、电价≥0投资成本>0回收比例∈[0,1]")
def npv_equation(irr, p, w, ic, q_val, n=n):
term1 = (1 + irr) ** (-1)
term2 = irr * (1 + irr) ** (-1) if irr != 0 else float('inf')
pv_revenue = p * w * (term1 / term2) * (1 - (1 + irr) ** (-n))
pv_salvage = q_val * ic * (term1 / term2) * (1 - (1 + irr) ** (-n))
return pv_revenue - ic + pv_salvage
irr_guess = 0.1
irr = float(fsolve(npv_equation, irr_guess, args=(E_p, electricity_price, IC, q))[0])
if not 0 <= irr <= 1:
raise ValueError(f"IRR计算结果{irr:.4f}不合理,请检查输入")
npv = npv_equation(irr, E_p, electricity_price, IC, q)
return {"NPV": npv, "IRR": irr * 100}
def calculate_pv_metrics(component_name, electricity_price, pv_number, q, longitude, latitude,
is_fixed=True, optimize=True, peak_load_hour=16, cost_per_kw=3.4, E_S=1.0, K=0.8,
override_capacity=None):
"""计算光伏项目的各项指标"""
try:
tilt, azimuth = get_tilt_and_azimuth(is_fixed, optimize, longitude, latitude, peak_load_hour)
pv_info = get_pv_product_info(component_name)
width_mm = float(pv_info["pv_size"].split("×")[1])
L = (width_mm / 1000) * 1.1
array_distance = calculate_array_distance(L, tilt, latitude)
max_power = pv_info["max_power"]
# 使用提供的容量或计算容量
if override_capacity is not None:
capacity = override_capacity
else:
capacity = calculate_installed_capacity(max_power, pv_number)
# 使用NASA API获取峰值日照小时数
peak_hours = calculate_psh_average(latitude, longitude)
single_daily_energy = peak_hours * (capacity / pv_number) * K if pv_number > 0 else 0
E_p = calculate_annual_energy(peak_hours, capacity, E_S, K)
h = calculate_equivalent_hours(E_p, capacity) if capacity > 0 else 0
E_p_million_kwh = E_p / 1000000
env_benefits = calculate_environmental_benefits(E_p_million_kwh)
IC = capacity * cost_per_kw * 1000
ref_yield = calculate_reference_yield(E_p, electricity_price, IC, q)
return {
"longitude": longitude,
"latitude": latitude,
"component_name": component_name,
"tilt": tilt,
"azimuth": azimuth,
"array_distance": array_distance,
"max_power": max_power,
"capacity": capacity,
"peak_sunshine_hours": peak_hours,
"single_daily_energy": single_daily_energy,
"annual_energy": E_p,
"equivalent_hours": h,
"coal_reduction": env_benefits["coal_reduction"],
"CO2_reduction": env_benefits["CO2_reduction"],
"SO2_reduction": env_benefits["SO2_reduction"],
"NOX_reduction": env_benefits["NOX_reduction"],
"IRR": ref_yield["IRR"]
}
except Exception as e:
raise Exception(f"计算过程中发生错误: {str(e)}")
def print_result(min_case, max_case):
"""优化输出格式,使用表格展示结果"""
headers = ["指标", "最小组件数量", "最大组件数量"]
table_data = [
["经度", f"{min_case['longitude']:.2f}", f"{max_case['longitude']:.2f}"],
["纬度", f"{min_case['latitude']:.2f}", f"{max_case['latitude']:.2f}"],
["光伏类型", min_case["pv_type"], max_case["pv_type"]],
["地形类型", min_case["terrain_type"], max_case["terrain_type"]],
["组件型号", min_case["component_name"], max_case["component_name"]],
["识别面积 (平方千米)", f"{min_case['available_area_sq_km']:.2f}", f"{max_case['available_area_sq_km']:.2f}"],
["识别面积 (公顷)", f"{min_case['available_area_hectares']:.2f}", f"{max_case['available_area_hectares']:.2f}"],
["有效面积 (公顷)", f"{min_case['effective_area_hectares']:.2f}", f"{max_case['effective_area_hectares']:.2f}"],
["理论最大容量 (MW)", f"{min_case['theoretical_max_capacity_mw']:.2f}",
f"{max_case['theoretical_max_capacity_mw']:.2f}"],
["单块组件占地 (m²)", f"{min_case['panel_area_sqm']:.2f}", f"{max_case['panel_area_sqm']:.2f}"],
["组件数量", f"{min_case['actual_panels']:,}", f"{max_case['actual_panels']:,}"],
["倾角 (度)", f"{min_case['tilt']:.2f}", f"{max_case['tilt']:.2f}"],
["方位角 (度)", f"{min_case['azimuth']:.2f}", f"{max_case['azimuth']:.2f}"],
["阵列间距 (m)", f"{min_case['array_distance']:.2f}", f"{max_case['array_distance']:.2f}"],
["单块功率 (Wp)", f"{min_case['max_power']}", f"{max_case['max_power']}"],
["装机容量 (MW)", f"{min_case['capacity'] / 1000:.2f}", f"{max_case['capacity'] / 1000:.2f}"],
["峰值日照 (小时/天)", f"{min_case['peak_sunshine_hours']:.2f}", f"{max_case['peak_sunshine_hours']:.2f}"],
["年发电量 (kWh)", f"{min_case['annual_energy']:,.0f}", f"{max_case['annual_energy']:,.0f}"],
["等效小时数", f"{min_case['equivalent_hours']:.2f}", f"{max_case['equivalent_hours']:.2f}"],
["LCOE (元/kWh)", f"{min_case['lcoe']:.4f}", f"{max_case['lcoe']:.4f}"],
["标准煤减排 (kg)", f"{min_case['coal_reduction']:,.0f}", f"{max_case['coal_reduction']:,.0f}"],
["CO₂减排 (kg)", f"{min_case['CO2_reduction']:,.0f}", f"{max_case['CO2_reduction']:,.0f}"],
["SO₂减排 (kg)", f"{min_case['SO2_reduction']:,.0f}", f"{max_case['SO2_reduction']:,.0f}"],
["NOx减排 (kg)", f"{min_case['NOX_reduction']:,.0f}", f"{max_case['NOX_reduction']:,.0f}"],
["IRR (%)", f"{min_case['IRR']:.2f}", f"{max_case['IRR']:.2f}"]
]
print("\n===== 光伏系统潜力评估结果 =====")
print(tabulate(table_data, headers=headers, tablefmt="grid"))
# 主程序
if __name__ == "__main__":
while True:
try:
# 输入参数
print("\n======= 光伏系统潜力评估 =======")
print("请输入以下必要参数:")
# 输入经纬度
latitude = float(input("请输入纬度(-90到90例如39.9"))
if not -90 <= latitude <= 90:
raise ValueError("纬度必须在-90到90之间")
longitude = float(input("请输入经度(-180到180例如116.4"))
if not -180 <= longitude <= 180:
raise ValueError("经度必须在-180到180之间")
# 输入可用面积
available_area_sq_km = float(input("请输入识别面积平方千米例如10"))
if available_area_sq_km <= 0:
raise ValueError("识别面积必须大于0")
# 输入光伏类型
pv_type = input("请输入光伏类型distributed, centralized, floating").lower()
if pv_type not in ["distributed", "centralized", "floating"]:
raise ValueError("光伏类型必须是 distributed, centralized 或 floating")
# 输入地形类型
valid_terrains = list(TERRAIN_COMPLEXITY_RANGES.get(pv_type, {}).keys())
print(f"支持的地形类型:{valid_terrains}")
terrain_type = input("请输入地形类型:")
if terrain_type not in valid_terrains:
raise ValueError(f"不支持的地形类型:{terrain_type}")
# 输入组件型号
component_name = input("请输入光伏组件型号需在Excel中存在例如M10-72H")
# 输入其他参数
slope_deg = float(input("请输入地形坡度0-25例如10"))
if not 0 <= slope_deg <= 25:
raise ValueError("坡度必须在0-25度之间")
terrain_complexity = float(input("请输入地形复杂性因子参考范围1.0-2.0例如1.2"))
min_complexity, max_complexity = TERRAIN_COMPLEXITY_RANGES[pv_type][terrain_type]
if not min_complexity <= terrain_complexity <= max_complexity:
raise ValueError(f"地形复杂性因子必须在 {min_complexity}-{max_complexity} 之间")
electricity_price = float(input("请输入电价(元/kWh例如0.65"))
if electricity_price < 0:
raise ValueError("电价必须非负")
# 计算光伏潜力
result = calculate_pv_potential(
available_area_sq_km=available_area_sq_km,
component_name=component_name,
longitude=longitude,
latitude=latitude,
slope_deg=slope_deg,
terrain_complexity=terrain_complexity,
terrain_type=terrain_type,
pv_type=pv_type,
electricity_price=electricity_price
)
# 输出结果
print_result(result["min_case"], result["max_case"])
# 询问是否继续
if input("\n是否继续评估?(y/n)").lower() != 'y':
break
except ValueError as ve:
print(f"输入错误:{ve}")
except FileNotFoundError as fe:
print(f"文件错误:{fe}")
except Exception as e:
print(f"发生错误:{e}")
print("请重新输入参数或检查错误。\n")

View File

@ -0,0 +1,583 @@
import pandas as pd
import math
import numpy as np
import requests
from scipy.optimize import fsolve
from tabulate import tabulate
# 默认文件路径
PV_EXCEL_PATH = r"./pv_product.xlsx" # 请确保此文件存在或更改为正确路径
# 地形类型与复杂性因子范围
TERRAIN_COMPLEXITY_RANGES = {
"distributed": {
"耕地": (1.0, 1.2), "裸地": (1.0, 1.2), "草地": (1.1, 1.3),
"灌木": (1.3, 1.5), "湿地": (1.5, 1.8), "林地": (1.5, 1.8), "建筑": (1.2, 1.5)
},
"centralized": {
"耕地": (1.0, 1.2), "裸地": (1.0, 1.2), "草地": (1.1, 1.3),
"灌木": (1.3, 1.6), "湿地": (1.5, 1.8), "林地": (1.6, 2.0)
},
"floating": {"水域": (1.2, 1.5)}
}
# 地形类型与土地可用性、发电效率的映射
TERRAIN_ADJUSTMENTS = {
"耕地": {"land_availability": 0.85, "K": 0.8}, "裸地": {"land_availability": 0.85, "K": 0.8},
"草地": {"land_availability": 0.85, "K": 0.8}, "灌木": {"land_availability": 0.75, "K": 0.75},
"湿地": {"land_availability": 0.65, "K": 0.75}, "水域": {"land_availability": 0.85, "K": 0.8},
"林地": {"land_availability": 0.65, "K": 0.7}, "建筑": {"land_availability": 0.6, "K": 0.75}
}
# 光伏类型的装机容量上限 (MW/平方千米)
CAPACITY_LIMITS = {
"distributed": 25.0, "centralized": 50.0, "floating": 25.0
}
# 实际面板间距系数
PANEL_SPACING_FACTORS = {
"distributed": 1.5, "centralized": 1.2, "floating": 1.3
}
def calculate_psh_average(lat, lon, start_year=2010, end_year=2023):
"""
NASA POWER API 获取峰值日照小时数PSH
返回平均 PSH小时/失败时返回默认值 4.0
"""
print("DEBUG: Starting calculate_psh_average (version 2025-04-28)")
url = "https://power.larc.nasa.gov/api/temporal/monthly/point"
params = {
"parameters": "ALLSKY_SFC_SW_DWN",
"community": "RE",
"longitude": lon,
"latitude": lat,
"format": "JSON",
"start": str(start_year),
"end": str(end_year)
}
try:
print(f"DEBUG: Requesting NASA POWER API for lat={lat}, lon={lon}")
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
print("DEBUG: API response received")
data = response.json()
print("DEBUG: Validating API data")
if "properties" not in data or "parameter" not in data["properties"]:
print("ERROR: NASA POWER API returned invalid data format")
return 4.0
ghi_data = data["properties"]["parameter"].get("ALLSKY_SFC_SW_DWN", {})
if not ghi_data:
print("ERROR: No GHI data found in API response")
return 4.0
print("DEBUG: Filtering GHI data")
print(f"DEBUG: Raw GHI data keys: {list(ghi_data.keys())}")
ghi_data = {k: v for k, v in ghi_data.items() if not k.endswith("13")}
if not ghi_data:
print("ERROR: No valid GHI data after filtering")
return 4.0
print(f"DEBUG: Filtered GHI data keys: {list(ghi_data.keys())}")
print("DEBUG: Creating DataFrame")
df = pd.DataFrame.from_dict(ghi_data, orient="index", columns=["GHI (kWh/m²/day)"])
print(f"DEBUG: Original DataFrame index: {list(df.index)}")
print("DEBUG: Reformatting DataFrame index")
new_index = []
for k in df.index:
try:
# 验证索引格式
if not (isinstance(k, str) and len(k) == 6 and k.isdigit()):
print(f"ERROR: Invalid index format for {k}")
return 4.0
year = k[:4]
month = k[-2:]
formatted_index = f"{year}-{month:0>2}" # 使用 :0>2 确保两位数字
print(f"DEBUG: Formatting index {k} -> {formatted_index}")
new_index.append(formatted_index)
except Exception as e:
print(f"ERROR: Failed to format index {k}: {e}")
return 4.0
df.index = new_index
print(f"DEBUG: Reformatted DataFrame index: {list(df.index)}")
if df.empty:
print("ERROR: DataFrame is empty")
return 4.0
print("DEBUG: Calculating PSH")
df["PSH (hours/day)"] = df["GHI (kWh/m²/day)"]
if df["PSH (hours/day)"].isna().any():
print("ERROR: PSH data contains invalid values")
return 4.0
print("DEBUG: Grouping by year")
df['Year'] = df.index.str[:4]
print(f"DEBUG: Year column: {list(df['Year'])}")
annual_avg = df.groupby('Year')['PSH (hours/day)'].mean()
print(f"DEBUG: Annual averages: {annual_avg.to_dict()}")
if annual_avg.empty:
print("ERROR: Annual average PSH data is empty")
return 4.0
print("DEBUG: Calculating final PSH")
psh = annual_avg.mean()
if math.isnan(psh):
print("ERROR: PSH calculation resulted in NaN")
return 4.0
print(f"DEBUG: PSH calculated successfully, value={psh:.2f}")
print(f"获取成功平均PSH: {psh:.2f} 小时/天")
return psh
except requests.exceptions.RequestException as e:
print(f"ERROR: NASA POWER API request failed: {e}")
return 4.0
except Exception as e:
print(f"ERROR: Error processing API data: {e}")
return 4.0
def calculate_optimal_tilt(lat):
"""根据纬度计算最佳倾角(单位:度)"""
try:
lat_abs = abs(lat)
if lat_abs < 25:
optimal_tilt = lat_abs * 0.87
elif lat_abs <= 50:
optimal_tilt = lat_abs * 0.76 + 3.1
else:
optimal_tilt = lat_abs * 0.5 + 16.3
return optimal_tilt
except ValueError as e:
raise Exception(f"倾角计算错误: {e}")
def pv_area(panel_capacity, slope_deg, shading_factor=0.1, land_compactness=1.0, terrain_complexity=1.0):
"""计算单块光伏组件占地面积"""
base_area = panel_capacity * 6
slope_factor = 1 + (slope_deg / 50) if slope_deg <= 15 else 1.5
shade_factor = 1 + shading_factor * 2
compact_factor = 1 / land_compactness if land_compactness > 0 else 1.5
terrain_factor = terrain_complexity
return base_area * slope_factor * shade_factor * compact_factor * terrain_factor
def calculate_pv_potential(available_area_sq_km, component_name, longitude, latitude, slope_deg=10,
shading_factor=0.1, land_compactness=0.8, terrain_complexity=1.2,
terrain_type="耕地", pv_type="centralized", land_availability=0.85,
min_irradiance=800, max_slope=25, electricity_price=0.65, q=0.02,
is_fixed=True, optimize=True, peak_load_hour=16, cost_per_kw=3.4,
E_S=1.0, K=0.8, project_lifetime=25, discount_rate=0.06):
"""计算最小和最大组件数量的光伏系统潜力"""
# 输入验证
if available_area_sq_km <= 0:
raise ValueError("可用面积必须大于0")
if slope_deg < 0 or slope_deg > max_slope:
raise ValueError(f"坡度必须在0-{max_slope}度之间")
# 转换为公顷
available_area_hectares = available_area_sq_km * 100
# 验证光伏类型与地形类型
valid_terrains = TERRAIN_COMPLEXITY_RANGES.get(pv_type, {})
if terrain_type not in valid_terrains:
raise ValueError(f"{pv_type} 光伏不支持 {terrain_type} 地形。可选地形:{list(valid_terrains.keys())}")
# 获取地形调整参数
terrain_adjustments = TERRAIN_ADJUSTMENTS.get(terrain_type, {"land_availability": 0.85, "K": 0.8})
adjusted_land_availability = terrain_adjustments["land_availability"] / max(1.0, terrain_complexity)
adjusted_K = terrain_adjustments["K"] / max(1.0, terrain_complexity)
# 获取组件信息
pv_info = get_pv_product_info(component_name)
single_panel_capacity = pv_info["max_power"] / 1000 # kWp
pv_size = pv_info["pv_size"].split("×")
panel_length = float(pv_size[0]) / 1000 # 米
panel_width = float(pv_size[1]) / 1000 # 米
panel_area_sqm = panel_length * panel_width
# 获取阵列间距
tilt, azimuth = get_tilt_and_azimuth(is_fixed, optimize, longitude, latitude, peak_load_hour)
array_distance = calculate_array_distance(panel_width * 1.1, tilt, latitude)
spacing_factor = PANEL_SPACING_FACTORS.get(pv_type, 1.2)
adjusted_array_distance = array_distance * spacing_factor
# 计算有效面积
effective_area_hectares = available_area_hectares * adjusted_land_availability
effective_area_sqm = effective_area_hectares * 10000
# 计算每MW占地面积
area_per_mw = 10000 * (1 + slope_deg / 50 if slope_deg <= 15 else 1.5) * (
1 + shading_factor * 2) * terrain_complexity * spacing_factor
# 容量密度限制 (kW/m²)
capacity_density_limit = CAPACITY_LIMITS.get(pv_type, 5.0) / 1000
max_capacity_by_density = effective_area_sqm * capacity_density_limit
# 计算单块组件占地面积
row_spacing = panel_length * math.sin(math.radians(tilt)) + adjusted_array_distance
effective_panel_area = panel_area_sqm * (row_spacing / panel_length) * 1.2
# 调整 min/max 布局以确保差异
min_area_per_panel = effective_panel_area * 0.8 # 密集布局
max_area_per_panel = effective_panel_area * 1.5 # 稀疏布局
max_panels = math.floor(effective_area_sqm / min_area_per_panel)
min_panels = math.floor(effective_area_sqm / max_area_per_panel)
# 计算装机容量
max_capacity_raw = calculate_installed_capacity(pv_info["max_power"], max_panels)
min_capacity_raw = calculate_installed_capacity(pv_info["max_power"], min_panels)
# 应用容量密度限制
max_capacity = min(max_capacity_raw, max_capacity_by_density)
min_capacity = min(min_capacity_raw, max_capacity_by_density * 0.8) # 稀疏布局取80%
# 检查理论上限
theoretical_max_capacity_mw = available_area_sq_km * CAPACITY_LIMITS.get(pv_type, 5.0)
if max_capacity / 1000 > theoretical_max_capacity_mw:
max_capacity = theoretical_max_capacity_mw * 1000
max_panels = math.floor(max_capacity * 1000 / pv_info["max_power"])
if min_capacity / 1000 > theoretical_max_capacity_mw * 0.8:
min_capacity = theoretical_max_capacity_mw * 1000 * 0.8
min_panels = math.floor(min_capacity * 1000 / pv_info["max_power"])
# 计算指标
min_metrics = calculate_pv_metrics(
component_name=component_name, electricity_price=electricity_price, pv_number=min_panels,
q=q, longitude=longitude, latitude=latitude, is_fixed=is_fixed, optimize=optimize,
peak_load_hour=peak_load_hour, cost_per_kw=cost_per_kw * terrain_complexity, E_S=E_S, K=adjusted_K,
override_capacity=min_capacity
)
min_lcoe = calculate_lcoe(
capacity=min_metrics["capacity"], annual_energy=min_metrics["annual_energy"],
cost_per_kw=cost_per_kw * terrain_complexity, q=q, project_lifetime=project_lifetime,
discount_rate=discount_rate
)
max_metrics = calculate_pv_metrics(
component_name=component_name, electricity_price=electricity_price, pv_number=max_panels,
q=q, longitude=longitude, latitude=latitude, is_fixed=is_fixed, optimize=optimize,
peak_load_hour=peak_load_hour, cost_per_kw=cost_per_kw * terrain_complexity, E_S=E_S, K=adjusted_K,
override_capacity=max_capacity
)
max_lcoe = calculate_lcoe(
capacity=max_metrics["capacity"], annual_energy=max_metrics["annual_energy"],
cost_per_kw=cost_per_kw * terrain_complexity, q=q, project_lifetime=project_lifetime,
discount_rate=discount_rate
)
# 警告
if min_panels == max_panels:
print(f"警告:最小和最大组件数量相同 ({min_panels}),请检查地形复杂性或面积是否过小")
if min_panels == 0 or max_panels == 0:
print(f"警告组件数量为0请检查输入参数")
# 返回结果
return {
"min_case": {
**min_metrics, "lcoe": min_lcoe, "actual_panels": min_panels,
"available_area_sq_km": available_area_sq_km, "available_area_hectares": available_area_hectares,
"effective_area_hectares": effective_area_hectares, "panel_area_sqm": max_area_per_panel,
"terrain_type": terrain_type, "pv_type": pv_type, "theoretical_max_capacity_mw": theoretical_max_capacity_mw
},
"max_case": {
**max_metrics, "lcoe": max_lcoe, "actual_panels": max_panels,
"available_area_sq_km": available_area_sq_km, "available_area_hectares": available_area_hectares,
"effective_area_hectares": effective_area_hectares, "panel_area_sqm": min_area_per_panel,
"terrain_type": terrain_type, "pv_type": pv_type, "theoretical_max_capacity_mw": theoretical_max_capacity_mw
}
}
def calculate_lcoe(capacity, annual_energy, cost_per_kw, q, project_lifetime=25, discount_rate=0.06):
"""计算平准化度电成本(LCOE)"""
total_investment = capacity * cost_per_kw * 1000
annual_om_cost = total_investment * q
discount_factors = [(1 + discount_rate) ** -t for t in range(1, project_lifetime + 1)]
discounted_energy = sum(annual_energy * discount_factors[t] for t in range(project_lifetime))
discounted_costs = total_investment + sum(annual_om_cost * discount_factors[t] for t in range(project_lifetime))
if discounted_energy == 0:
return float('inf')
return discounted_costs / discounted_energy
def get_pv_product_info(component_name, excel_path=PV_EXCEL_PATH):
"""从Excel获取光伏组件信息"""
try:
df = pd.read_excel(excel_path)
if len(df.columns) < 10:
raise ValueError("Excel文件需包含至少10列组件名称、尺寸、功率等")
row = df[df.iloc[:, 1] == component_name]
if row.empty:
raise ValueError(f"未找到组件:{component_name}")
return {
"component_name": component_name,
"max_power": row.iloc[0, 5],
"efficiency": row.iloc[0, 9],
"pv_size": row.iloc[0, 3]
}
except FileNotFoundError:
raise FileNotFoundError(f"未找到Excel文件{excel_path}")
except Exception as e:
raise Exception(f"读取Excel出错{e}")
def get_tilt_and_azimuth(is_fixed=True, optimize=True, longitude=116, latitude=None, peak_load_hour=16):
"""计算光伏系统的倾角和方位角"""
if optimize and latitude is None:
raise ValueError("优化模式下需提供纬度")
if is_fixed:
if optimize:
tilt = calculate_optimal_tilt(latitude)
azimuth = (peak_load_hour - 12) * 15 + (longitude - 116)
azimuth = azimuth % 360 if azimuth >= 0 else azimuth + 360
else:
print("倾角0°(水平)-90°(垂直) | 方位角0°(正北)-180°(正南),顺时针")
tilt = float(input("请输入倾角(度)"))
azimuth = float(input("请输入方位角(度)"))
if not (0 <= tilt <= 90) or not (0 <= azimuth <= 360):
raise ValueError("倾角需在0-90°方位角需在0-360°")
else:
azimuth = 180
if optimize:
tilt = calculate_optimal_tilt(latitude)
else:
print("倾角0°(水平)-90°(垂直)")
tilt = float(input("请输入倾角(度)"))
if not (0 <= tilt <= 90):
raise ValueError("倾角需在0-90°")
return tilt, azimuth
def calculate_array_distance(L, tilt, latitude):
"""计算阵列间距"""
beta_rad = math.radians(tilt)
phi_rad = math.radians(latitude)
return (L * math.cos(beta_rad) +
L * math.sin(beta_rad) * 0.707 * math.tan(phi_rad) +
0.4338 * math.tan(phi_rad))
def calculate_equivalent_hours(P, P_r):
"""计算等效小时数"""
if P_r == 0:
raise ValueError("额定功率不能为 0")
return P / P_r
def calculate_installed_capacity(max_power, num_components):
"""计算装机容量"""
if max_power < 0 or num_components < 0 or not isinstance(num_components, int):
raise ValueError("功率和数量需为非负数,数量需为整数")
return (max_power * num_components) / 1000 # 单位kW
def calculate_annual_energy(peak_hours, capacity, E_S=1.0, K=0.8):
"""计算年发电量"""
if any(x < 0 for x in [peak_hours, capacity]) or E_S <= 0 or not 0 <= K <= 1:
raise ValueError("输入参数需满足辐射量、容量≥0E_S>0K∈[0,1]")
return peak_hours * 365 * (capacity / E_S) * K # 单位kWh
def calculate_environmental_benefits(E_p_million_kwh):
"""计算环境收益"""
if E_p_million_kwh < 0:
raise ValueError("年发电量需≥0")
return {
"coal_reduction": E_p_million_kwh * 0.404 * 10,
"CO2_reduction": E_p_million_kwh * 0.977 * 10,
"SO2_reduction": E_p_million_kwh * 0.03 * 10,
"NOX_reduction": E_p_million_kwh * 0.015 * 10
}
def calculate_reference_yield(E_p, electricity_price, IC, q, n=25):
"""计算净现值(NPV)和内部收益率(IRR)"""
if E_p < 0 or electricity_price < 0 or IC <= 0 or not 0 <= q <= 1:
raise ValueError("发电量、电价≥0投资成本>0回收比例∈[0,1]")
def npv_equation(irr, p, w, ic, q_val, n=n):
term1 = (1 + irr) ** (-1)
term2 = irr * (1 + irr) ** (-1) if irr != 0 else float('inf')
pv_revenue = p * w * (term1 / term2) * (1 - (1 + irr) ** (-n))
pv_salvage = q_val * ic * (term1 / term2) * (1 - (1 + irr) ** (-n))
return pv_revenue - ic + pv_salvage
irr_guess = 0.1
irr = float(fsolve(npv_equation, irr_guess, args=(E_p, electricity_price, IC, q))[0])
if not 0 <= irr <= 1:
raise ValueError(f"IRR计算结果{irr:.4f}不合理,请检查输入")
npv = npv_equation(irr, E_p, electricity_price, IC, q)
return {"NPV": npv, "IRR": irr * 100}
def calculate_pv_metrics(component_name, electricity_price, pv_number, q, longitude, latitude,
is_fixed=True, optimize=True, peak_load_hour=16, cost_per_kw=3.4, E_S=1.0, K=0.8,
override_capacity=None):
"""计算光伏项目的各项指标"""
try:
tilt, azimuth = get_tilt_and_azimuth(is_fixed, optimize, longitude, latitude, peak_load_hour)
pv_info = get_pv_product_info(component_name)
width_mm = float(pv_info["pv_size"].split("×")[1])
L = (width_mm / 1000) * 1.1
array_distance = calculate_array_distance(L, tilt, latitude)
max_power = pv_info["max_power"]
# 使用提供的容量或计算容量
if override_capacity is not None:
capacity = override_capacity
else:
capacity = calculate_installed_capacity(max_power, pv_number)
# 使用NASA API获取峰值日照小时数
peak_hours = calculate_psh_average(latitude, longitude)
single_daily_energy = peak_hours * (capacity / pv_number) * K if pv_number > 0 else 0
E_p = calculate_annual_energy(peak_hours, capacity, E_S, K)
h = calculate_equivalent_hours(E_p, capacity) if capacity > 0 else 0
E_p_million_kwh = E_p / 1000000
env_benefits = calculate_environmental_benefits(E_p_million_kwh)
IC = capacity * cost_per_kw * 1000
ref_yield = calculate_reference_yield(E_p, electricity_price, IC, q)
return {
"longitude": longitude,
"latitude": latitude,
"component_name": component_name,
"tilt": tilt,
"azimuth": azimuth,
"array_distance": array_distance,
"max_power": max_power,
"capacity": capacity,
"peak_sunshine_hours": peak_hours,
"single_daily_energy": single_daily_energy,
"annual_energy": E_p,
"equivalent_hours": h,
"coal_reduction": env_benefits["coal_reduction"],
"CO2_reduction": env_benefits["CO2_reduction"],
"SO2_reduction": env_benefits["SO2_reduction"],
"NOX_reduction": env_benefits["NOX_reduction"],
"IRR": ref_yield["IRR"]
}
except Exception as e:
raise Exception(f"计算过程中发生错误: {str(e)}")
def print_result(min_case, max_case):
"""优化输出格式,使用表格展示结果"""
headers = ["指标", "最小组件数量", "最大组件数量"]
table_data = [
["经度", f"{min_case['longitude']:.2f}", f"{max_case['longitude']:.2f}"],
["纬度", f"{min_case['latitude']:.2f}", f"{max_case['latitude']:.2f}"],
["光伏类型", min_case["pv_type"], max_case["pv_type"]],
["地形类型", min_case["terrain_type"], max_case["terrain_type"]],
["组件型号", min_case["component_name"], max_case["component_name"]],
["识别面积 (平方千米)", f"{min_case['available_area_sq_km']:.2f}", f"{max_case['available_area_sq_km']:.2f}"],
["识别面积 (公顷)", f"{min_case['available_area_hectares']:.2f}", f"{max_case['available_area_hectares']:.2f}"],
["有效面积 (公顷)", f"{min_case['effective_area_hectares']:.2f}", f"{max_case['effective_area_hectares']:.2f}"],
["理论最大容量 (MW)", f"{min_case['theoretical_max_capacity_mw']:.2f}",
f"{max_case['theoretical_max_capacity_mw']:.2f}"],
["单块组件占地 (m²)", f"{min_case['panel_area_sqm']:.2f}", f"{max_case['panel_area_sqm']:.2f}"],
["组件数量", f"{min_case['actual_panels']:,}", f"{max_case['actual_panels']:,}"],
["倾角 (度)", f"{min_case['tilt']:.2f}", f"{max_case['tilt']:.2f}"],
["方位角 (度)", f"{min_case['azimuth']:.2f}", f"{max_case['azimuth']:.2f}"],
["阵列间距 (m)", f"{min_case['array_distance']:.2f}", f"{max_case['array_distance']:.2f}"],
["单块功率 (Wp)", f"{min_case['max_power']}", f"{max_case['max_power']}"],
["装机容量 (MW)", f"{min_case['capacity'] / 1000:.2f}", f"{max_case['capacity'] / 1000:.2f}"],
["峰值日照 (小时/天)", f"{min_case['peak_sunshine_hours']:.2f}", f"{max_case['peak_sunshine_hours']:.2f}"],
["年发电量 (kWh)", f"{min_case['annual_energy']:,.0f}", f"{max_case['annual_energy']:,.0f}"],
["等效小时数", f"{min_case['equivalent_hours']:.2f}", f"{max_case['equivalent_hours']:.2f}"],
["LCOE (元/kWh)", f"{min_case['lcoe']:.4f}", f"{max_case['lcoe']:.4f}"],
["标准煤减排 (kg)", f"{min_case['coal_reduction']:,.0f}", f"{max_case['coal_reduction']:,.0f}"],
["CO₂减排 (kg)", f"{min_case['CO2_reduction']:,.0f}", f"{max_case['CO2_reduction']:,.0f}"],
["SO₂减排 (kg)", f"{min_case['SO2_reduction']:,.0f}", f"{max_case['SO2_reduction']:,.0f}"],
["NOx减排 (kg)", f"{min_case['NOX_reduction']:,.0f}", f"{max_case['NOX_reduction']:,.0f}"],
["IRR (%)", f"{min_case['IRR']:.2f}", f"{max_case['IRR']:.2f}"]
]
print("\n===== 光伏系统潜力评估结果 =====")
print(tabulate(table_data, headers=headers, tablefmt="grid"))
# 主程序
if __name__ == "__main__":
while True:
try:
# 输入参数
print("\n======= 光伏系统潜力评估 =======")
print("请输入以下必要参数:")
# 输入经纬度
latitude = float(input("请输入纬度(-90到90例如39.9"))
if not -90 <= latitude <= 90:
raise ValueError("纬度必须在-90到90之间")
longitude = float(input("请输入经度(-180到180例如116.4"))
if not -180 <= longitude <= 180:
raise ValueError("经度必须在-180到180之间")
# 输入可用面积
available_area_sq_km = float(input("请输入识别面积平方千米例如10"))
if available_area_sq_km <= 0:
raise ValueError("识别面积必须大于0")
# 输入光伏类型
pv_type = input("请输入光伏类型distributed, centralized, floating").lower()
if pv_type not in ["distributed", "centralized", "floating"]:
raise ValueError("光伏类型必须是 distributed, centralized 或 floating")
# 输入地形类型
valid_terrains = list(TERRAIN_COMPLEXITY_RANGES.get(pv_type, {}).keys())
print(f"支持的地形类型:{valid_terrains}")
terrain_type = input("请输入地形类型:")
if terrain_type not in valid_terrains:
raise ValueError(f"不支持的地形类型:{terrain_type}")
# 输入组件型号
component_name = input("请输入光伏组件型号需在Excel中存在例如M10-72H")
# 输入其他参数
slope_deg = float(input("请输入地形坡度0-25例如10"))
if not 0 <= slope_deg <= 25:
raise ValueError("坡度必须在0-25度之间")
terrain_complexity = float(input("请输入地形复杂性因子参考范围1.0-2.0例如1.2"))
min_complexity, max_complexity = TERRAIN_COMPLEXITY_RANGES[pv_type][terrain_type]
if not min_complexity <= terrain_complexity <= max_complexity:
raise ValueError(f"地形复杂性因子必须在 {min_complexity}-{max_complexity} 之间")
electricity_price = float(input("请输入电价(元/kWh例如0.65"))
if electricity_price < 0:
raise ValueError("电价必须非负")
# 计算光伏潜力
result = calculate_pv_potential(
available_area_sq_km=available_area_sq_km,
component_name=component_name,
longitude=longitude,
latitude=latitude,
slope_deg=slope_deg,
terrain_complexity=terrain_complexity,
terrain_type=terrain_type,
pv_type=pv_type,
electricity_price=electricity_price
)
# 输出结果
print_result(result["min_case"], result["max_case"])
# 询问是否继续
if input("\n是否继续评估?(y/n)").lower() != 'y':
break
except ValueError as ve:
print(f"输入错误:{ve}")
except FileNotFoundError as fe:
print(f"文件错误:{fe}")
except Exception as e:
print(f"发生错误:{e}")
print("请重新输入参数或检查错误。\n")

View File

@ -0,0 +1,19 @@
目的:修改之前代码,其中输入经纬度进行倾角和峰值日照时数的查询(不通过城市)。
【1】文件夹
pv_product.xlsx
为组件,相较之前没有改变
倾角_峰值小时数.xlsx
为通过城市查的倾角和峰值时数做了变动增加了一些城市和市级的json相匹配一部分没添加数值
中国_市.geojson
市一级别行政json图
PV_total3.py
思路通过经纬度查找市json图对应的所在市区再通过”倾角_峰值小时数.xlsx“表格查找倾角和峰值日照时数
主要修改51-10行部分。
【2】文件夹
PV_total2.py
通过nasa的api接口获取10年-23年平均峰值日照时数
倾角 = 纬度(弧度) × 0.86 + 24
主要修改47-97行部分。
!注意!:需要挂梯子;计算的倾角在高纬度地区较之前较小,峰值日照时数较大。详细对照最后的结果

View File

@ -0,0 +1,189 @@
import geopandas as gpd
import pandas as pd
import rasterio
from rasterio.features import rasterize
import numpy as np
from shapely.geometry import mapping, box
from tqdm import tqdm
import os
def merge_and_rasterize_vectors_seven_categories(file_paths, output_vector_path, output_raster_path,
raster_resolution=10, block_size=2000, chunk_size=10000):
"""
合并7个地貌类型矢量图层转为栅格并显示7个类别保留0值作为背景优化合并策略去掉cropland
参数:
file_paths: 包含7个矢量文件路径的列表 (按顺序: forest, water, shrub, wetland, jianzhu, grass, bareland)
output_vector_path: 输出合并后的矢量文件路径建议使用.gpkg格式
output_raster_path: 输出栅格文件路径
raster_resolution: 输出栅格的分辨率默认10米
block_size: 每个分块的宽度和高度像素单位默认2000
chunk_size: 每次处理矢量数据的块大小默认10000条记录
"""
try:
# 检查输入文件数量
if len(file_paths) != 7:
raise ValueError("请提供正好7个矢量文件路径")
# 定义类别映射按照指定顺序去掉cropland
categories = ['forest', 'water', 'shrub', 'wetland', 'jianzhu', 'grass', 'bareland']
category_values = {cat: i + 1 for i, cat in enumerate(categories)} # forest=1, water=2, ..., bareland=7
# 逐块读取矢量文件,合并到内存中
gdfs = []
base_crs = None
for i, (path, category) in enumerate(tqdm(zip(file_paths, categories), total=7, desc="读取矢量文件进度"), 1):
print(f"正在处理第{i}个矢量文件: {path}")
gdf = gpd.read_file(path, engine="pyogrio")
gdf['category'] = category # 添加类别字段
gdf['value'] = category_values[category] # 添加数值字段用于栅格化
gdf['value'] = gdf['value'].astype(int) # 确保 value 为整数类型
if 'Shape_Area' in gdf.columns:
gdf['Shape_Area'] = gdf['Shape_Area'].clip(upper=1e9) # 限制最大值,避免溢出
# 统一坐标系(以第一个图层为基准)
if i == 1:
base_crs = gdf.crs
else:
if gdf.crs != base_crs:
print(f"{i}个图层坐标系不同,正在转换为第一个图层的坐标系...")
gdf = gdf.to_crs(base_crs)
print(f"{i}个图层 value 字段分布:", gdf['value'].value_counts(dropna=False))
gdfs.append(gdf)
# 在内存中合并所有 gdf
print("正在合并所有图层...")
merged_gdf = gpd.GeoDataFrame(pd.concat(gdfs, ignore_index=True), crs=base_crs)
print("合并后 value 字段分布:", merged_gdf['value'].value_counts(dropna=False))
# 保存合并后的矢量文件
print(f"正在保存合并矢量结果到: {output_vector_path}")
merged_gdf.to_file(output_vector_path, driver='GPKG', engine="pyogrio")
# 重新加载合并后的文件,检查 value 字段
print("正在重新加载合并后的文件...")
merged_gdf = gpd.read_file(output_vector_path, engine="pyogrio")
print("重新加载后 value 字段分布:", merged_gdf['value'].value_counts(dropna=False))
# 数据清洗:处理 value 字段
print("正在清洗 'value' 字段数据...")
# 处理 nan 值
if merged_gdf['value'].isna().any():
print("警告: 'value' 字段包含 nan 值,将替换为 0")
merged_gdf['value'] = merged_gdf['value'].fillna(0)
# 转换为整数类型
merged_gdf['value'] = merged_gdf['value'].astype(float).astype(int)
# 确保值在 0-7 范围内
valid_values = set(range(0, 8)) # 0=背景, 1=forest, ..., 7=bareland
if not merged_gdf['value'].isin(valid_values).all():
print(f"警告: 'value' 字段包含无效值: {set(merged_gdf['value'].unique()) - valid_values},将替换为 0")
merged_gdf['value'] = merged_gdf['value'].apply(lambda x: x if x in valid_values else 0)
print("清洗后 value 字段分布:", merged_gdf['value'].value_counts(dropna=False))
# 构建空间索引以加速后续栅格化
print("正在构建空间索引以加速栅格化...")
sindex = merged_gdf.sindex
# 计算栅格化的范围
bounds = merged_gdf.total_bounds # [minx, miny, maxx, maxy]
width = int((bounds[2] - bounds[0]) / raster_resolution)
height = int((bounds[3] - bounds[1]) / raster_resolution)
# 检查栅格尺寸是否有效
if width <= 0 or height <= 0:
raise ValueError(f"栅格尺寸无效: 宽度={width}, 高度={height},检查范围或分辨率")
# 创建栅格化变换
transform = rasterio.transform.from_bounds(
bounds[0], bounds[1], bounds[2], bounds[3], width, height
)
# 计算总分块数以设置进度条
total_blocks = ((height + block_size - 1) // block_size) * ((width + block_size - 1) // block_size)
print(f"总分块数: {total_blocks}")
# 创建并写入栅格文件(逐块写入,避免一次性占用大量内存)
with rasterio.open(
output_raster_path,
'w',
driver='GTiff',
height=height,
width=width,
count=1,
dtype=rasterio.uint8,
crs=base_crs,
transform=transform,
nodata=0
) as dst:
# 分块处理并添加进度条
with tqdm(total=total_blocks, desc="栅格化进度") as pbar:
for y in range(0, height, block_size):
for x in range(0, width, block_size):
block_height = min(block_size, height - y)
block_width = min(block_size, width - x)
# 计算当前分块的地理范围
block_minx = bounds[0] + x * raster_resolution
block_maxy = bounds[3] - y * raster_resolution
block_maxx = block_minx + block_width * raster_resolution
block_miny = block_maxy - block_height * raster_resolution
# 创建分块边界框并使用空间索引查询
block_bounds = box(block_minx, block_miny, block_maxx, block_maxy)
indices = sindex.intersection(block_bounds.bounds)
block_gdf = merged_gdf.iloc[list(indices)].copy()
if block_gdf.empty:
pbar.update(1)
continue
# 打印分块中的 value 分布
print(f"分块 (x={x}, y={y}) value 字段分布:", block_gdf['value'].value_counts(dropna=False))
# 分块栅格化
block_transform = rasterio.transform.from_bounds(
block_minx, block_miny, block_maxx, block_maxy, block_width, block_height
)
shapes = [(mapping(geom), value) for geom, value in zip(block_gdf.geometry, block_gdf['value'])]
block_raster = rasterize(
shapes=shapes,
out_shape=(block_height, block_width),
transform=block_transform,
fill=0,
dtype=rasterio.uint8
)
# 直接写入当前分块到文件中
dst.write(block_raster, 1, window=((y, y + block_height), (x, x + block_width)))
pbar.update(1)
print("处理完成!")
print(f"合并矢量保存为: {output_vector_path}")
print(f"栅格保存为: {output_raster_path}")
print(
f"栅格中类别值: 0=背景, 1=forest, 2=water, 3=shrub, 4=wetland, 5=jianzhu, 6=grass, 7=bareland")
except Exception as e:
print(f"发生错误: {str(e)}")
# 使用示例
if __name__ == "__main__":
# 输入7个矢量文件路径按顺序: forest, water, shrub, wetland, jianzhu, grass, bareland去掉cropland
input_files = [
r"weifang/weifang/forest/forest31.shp",
r"weifang/weifang/water/water1.shp",
r"weifang/weifang/shrub/shrub.shp",
r"weifang/weifang/wetland/wetland.shp",
r"weifang/weifang/jianzhu/jianzhu1.shp",
r"weifang/weifang/grass/grass.shp",
r"weifang/weifang/bareland/bareland.shp"
]
output_vector_file = r"weifang/weifang/weifanghebing/weifangdimiao.gpkg" # 改为.gpkg
output_raster_file = r"weifang/weifang/weifanghebing/weifangdimao.tif"
# 执行合并和栅格化
merge_and_rasterize_vectors_seven_categories(input_files, output_vector_file, output_raster_file,
raster_resolution=10, block_size=2000, chunk_size=10000)

View File

@ -0,0 +1,229 @@
import geopandas as gpd
import pandas as pd
import rasterio
from rasterio.features import rasterize
import numpy as np
from shapely.geometry import mapping, box
import os
from tqdm import tqdm # 导入 tqdm 用于进度条
def merge_vectors_seven_categories(file_paths, output_vector_path):
"""
合并7个地貌类型矢量图层并保存为GeoPackage格式去掉cropland
参数:
file_paths: 包含7个矢量文件路径的列表 (按顺序: forest, water, shrub, wetland, jianzhu, grass, bareland)
output_vector_path: 输出合并后的矢量文件路径使用.gpkg格式
"""
try:
# 检查输入文件数量是否为7
if len(file_paths) != 7:
raise ValueError("请提供正好7个矢量文件路径")
for path in file_paths:
if not os.path.exists(path):
raise FileNotFoundError(f"文件不存在: {path}")
# 定义7个类别去掉cropland
categories = ['forest', 'water', 'shrub', 'wetland', 'jianzhu', 'grass', 'bareland']
category_values = {cat: i + 1 for i, cat in enumerate(categories)} # forest=1, water=2, ..., bareland=7
gdfs = []
for i, (path, category) in enumerate(zip(file_paths, categories), 1):
print(f"正在读取第{i}个矢量文件: {path}")
gdf = gpd.read_file(path, engine="pyogrio")
gdf['category'] = category
gdf['value'] = category_values[category]
if 'Shape_Area' in gdf.columns:
gdf['Shape_Area'] = gdf['Shape_Area'].clip(upper=1e9)
gdfs.append(gdf)
# 统一坐标系
base_crs = gdfs[0].crs
for i, gdf in enumerate(gdfs[1:], 2):
if gdf.crs != base_crs:
print(f"{i}个图层坐标系不同,正在转换为第一个图层的坐标系...")
gdfs[i - 1] = gdf.to_crs(base_crs)
print("正在合并图层...")
merged_gdf = gpd.GeoDataFrame(pd.concat(gdfs, ignore_index=True))
merged_gdf = merged_gdf.set_geometry('geometry')
if not output_vector_path.endswith('.gpkg'):
output_vector_path = output_vector_path.replace('.shp', '.gpkg')
print("输出格式已更改为GeoPackage以支持大文件")
print(f"正在保存合并矢量结果到: {output_vector_path}")
merged_gdf.to_file(output_vector_path, driver='GPKG', engine="pyogrio")
print("矢量合并完成!")
print(f"合并矢量保存为: {output_vector_path}")
return output_vector_path
except Exception as e:
print(f"发生错误: {str(e)}")
return None
def rasterize_vector_by_blocks(input_vector_path, output_raster_path, raster_resolution=30, block_size=5000):
"""
将合并后的矢量文件分块转为栅格显示7个类别背景值设为 value=8添加进度条和像素统计
参数:
input_vector_path: 输入的合并矢量文件路径.gpkg格式
output_raster_path: 输出栅格文件路径
raster_resolution: 输出栅格的分辨率默认30米
block_size: 每个分块的宽度和高度像素单位默认5000
"""
try:
# 读取合并后的矢量文件
print(f"正在读取合并矢量文件: {input_vector_path}")
merged_gdf = gpd.read_file(input_vector_path, engine="pyogrio")
# 检查 value 字段是否有效
if 'value' not in merged_gdf.columns:
raise ValueError("矢量文件中缺少 'value' 字段,请确保文件包含正确的分类值")
print("合并矢量文件中的 value 字段分布:")
print(merged_gdf['value'].value_counts(dropna=False))
if not merged_gdf['value'].between(1, 7).all():
print("警告: 'value' 字段包含无效值(应在 1-7 之间),可能导致数据丢失")
merged_gdf = merged_gdf[merged_gdf['value'].between(1, 7)]
# 检查合并后的矢量数据是否为空
if merged_gdf.empty:
raise ValueError("合并后的矢量数据为空,请检查输入数据是否有效")
# 按 value 字段排序,优先级高的类别(例如 value 较大)后处理,避免被覆盖
print("正在按 'value' 字段排序以确保优先级...")
merged_gdf = merged_gdf.sort_values(by='value', ascending=True)
# 计算总范围和栅格尺寸
bounds = merged_gdf.total_bounds # [minx, miny, maxx, maxy]
total_width = int((bounds[2] - bounds[0]) / raster_resolution)
total_height = int((bounds[3] - bounds[1]) / raster_resolution)
if total_width <= 0 or total_height <= 0:
raise ValueError(f"栅格尺寸无效: 宽度={total_width}, 高度={total_height}")
print(f"栅格范围: minx={bounds[0]}, miny={bounds[1]}, maxx={bounds[2]}, maxy={bounds[3]}")
print(f"栅格尺寸: 宽度={total_width}, 高度={total_height}")
# 创建栅格变换
transform = rasterio.transform.from_bounds(bounds[0], bounds[1], bounds[2], bounds[3], total_width,
total_height)
# 计算总分块数以设置进度条
total_blocks = ((total_height + block_size - 1) // block_size) * ((total_width + block_size - 1) // block_size)
print(f"总分块数: {total_blocks}")
# 创建并写入栅格文件,逐块写入以减少内存占用
with rasterio.open(
output_raster_path,
'w',
driver='GTiff',
height=total_height,
width=total_width,
count=1,
dtype=rasterio.uint8,
crs=merged_gdf.crs,
transform=transform,
nodata=255 # 使用 255 作为 nodata 值,区分 value=8 的背景
) as dst:
# 分块处理并添加进度条
with tqdm(total=total_blocks, desc="栅格化进度") as pbar:
for y in range(0, total_height, block_size):
for x in range(0, total_width, block_size):
block_height = min(block_size, total_height - y)
block_width = min(block_size, total_width - x)
# 计算当前分块的地理范围
block_minx = bounds[0] + x * raster_resolution
block_maxy = bounds[3] - y * raster_resolution
block_maxx = block_minx + block_width * raster_resolution
block_miny = block_maxy - block_height * raster_resolution
# 创建分块边界框
block_bounds = box(block_minx, block_miny, block_maxx, block_maxy)
block_gdf = merged_gdf[merged_gdf.geometry.intersects(block_bounds)].copy()
# 初始化分块为背景值 8
block_raster = np.full((block_height, block_width), 8, dtype=np.uint8)
if not block_gdf.empty:
# 打印分块中的 value 字段分布
print(f"分块 (x={x}, y={y}) 中的 value 字段分布:")
print(block_gdf['value'].value_counts(dropna=False))
# 分块栅格化
block_transform = rasterio.transform.from_bounds(
block_minx, block_miny, block_maxx, block_maxy, block_width, block_height
)
shapes = ((mapping(geom), value) for geom, value in
zip(block_gdf.geometry, block_gdf['value']))
block_raster = rasterize(
shapes=shapes,
out=block_raster, # 使用预初始化的数组
transform=block_transform,
fill=8, # 未覆盖区域为 8
dtype=rasterio.uint8,
all_touched=True # 确保所有触及的像素都被计入
)
# 统计分块中 value=1 到 value=7 的像素点个数
print(f"分块 (x={x}, y={y}) 中像素值统计:")
unique, counts = np.unique(block_raster, return_counts=True)
value_counts = dict(zip(unique, counts))
for val in range(1, 8): # 检查 value=1 到 value=7
count = value_counts.get(val, 0)
print(f"Value={val}: {count} 像素")
# 单独打印 value=8背景值
background_count = value_counts.get(8, 0)
print(f"Value=8 (background): {background_count} 像素")
# 直接写入当前分块到文件中
dst.write(block_raster, 1, window=((y, y + block_height), (x, x + block_width)))
print(f"已完成分块栅格化: x={x}, y={y}, 宽度={block_width}, 高度={block_height}")
pbar.update(1)
# 读取整个栅格文件,统计最终像素分布
with rasterio.open(output_raster_path) as src:
final_raster = src.read(1)
print("最终栅格文件中的像素值统计:")
unique, counts = np.unique(final_raster, return_counts=True)
value_counts = dict(zip(unique, counts))
for val in range(1, 9): # 检查 value=1 到 value=8
count = value_counts.get(val, 0)
print(f"Value={val}: {count} 像素")
# 打印 nodata 值255
nodata_count = value_counts.get(255, 0)
print(f"Nodata (255): {nodata_count} 像素")
print("栅格化完成!")
print(f"栅格保存为: {output_raster_path}")
print(f"栅格中类别值: 1=forest, 2=water, 3=shrub, 4=wetland, 5=jianzhu, 6=grass, 7=bareland, 8=background")
except Exception as e:
print(f"发生错误: {str(e)}")
# 使用示例
if __name__ == "__main__":
input_files = [
r"sddimaoshp/forest/sdforest.shp",
r"sddimaoshp/water/shandongsuiyu3.shp",
r"sddimaoshp/shrub/sdshrub.shp",
r"sddimaoshp/wetland/sdwetland.shp",
r"sddimaoshp/jianzhu/shandongjianzhu3.shp",
r"sddimaoshp/grass/sdgrass.shp",
r"sddimaoshp/bareland/sdbareland.shp"
]
output_vector_file = r"sddimaoshp/hebingtif/shandongdimiao_10_1m.gpkg"
output_raster_file = r"sddimaoshp/hebingtif/sddimao_10_1m.tif"
# Step 1: 合并矢量
vector_path = merge_vectors_seven_categories(input_files, output_vector_file)
# Step 2: 如果合并成功,进行分块栅格化
if vector_path:
rasterize_vector_by_blocks(vector_path, output_raster_file, raster_resolution=10, block_size=5000)

View File

@ -0,0 +1,191 @@
import geopandas as gpd
import pandas as pd
import rasterio
from rasterio.features import rasterize
import numpy as np
from shapely.geometry import mapping, box
import os
from tqdm import tqdm # 导入 tqdm 用于进度条
def merge_vectors_seven_categories(file_paths, output_vector_path):
"""
合并7个地貌类型矢量图层并保存为GeoPackage格式去掉cropland
参数:
file_paths: 包含7个矢量文件路径的列表 (按顺序: forest, water, shrub, wetland, jianzhu, grass, bareland)
output_vector_path: 输出合并后的矢量文件路径使用.gpkg格式
"""
try:
# 检查输入文件数量是否为7
if len(file_paths) != 7:
raise ValueError("请提供正好7个矢量文件路径")
for path in file_paths:
if not os.path.exists(path):
raise FileNotFoundError(f"文件不存在: {path}")
# 定义7个类别去掉cropland
categories = ['forest', 'water', 'shrub', 'wetland', 'jianzhu', 'grass', 'bareland']
category_values = {cat: i + 1 for i, cat in enumerate(categories)} # forest=1, water=2, ..., bareland=7
gdfs = []
for i, (path, category) in enumerate(zip(file_paths, categories), 1):
print(f"正在读取第{i}个矢量文件: {path}")
gdf = gpd.read_file(path, engine="pyogrio")
gdf['category'] = category
gdf['value'] = category_values[category]
if 'Shape_Area' in gdf.columns:
gdf['Shape_Area'] = gdf['Shape_Area'].clip(upper=1e9)
gdfs.append(gdf)
# 统一坐标系
base_crs = gdfs[0].crs
for i, gdf in enumerate(gdfs[1:], 2):
if gdf.crs != base_crs:
print(f"{i}个图层坐标系不同,正在转换为第一个图层的坐标系...")
gdfs[i - 1] = gdf.to_crs(base_crs)
print("正在合并图层...")
merged_gdf = gpd.GeoDataFrame(pd.concat(gdfs, ignore_index=True))
merged_gdf = merged_gdf.set_geometry('geometry')
if not output_vector_path.endswith('.gpkg'):
output_vector_path = output_vector_path.replace('.shp', '.gpkg')
print("输出格式已更改为GeoPackage以支持大文件")
print(f"正在保存合并矢量结果到: {output_vector_path}")
merged_gdf.to_file(output_vector_path, driver='GPKG', engine="pyogrio")
print("矢量合并完成!")
print(f"合并矢量保存为: {output_vector_path}")
return output_vector_path
except Exception as e:
print(f"发生错误: {str(e)}")
return None
def rasterize_vector_by_blocks(input_vector_path, output_raster_path, raster_resolution=30, block_size=5000):
"""
将合并后的矢量文件分块转为栅格显示7个类别背景值设为 value=8添加进度条不清理数据
参数:
input_vector_path: 输入的合并矢量文件路径.gpkg格式
output_raster_path: 输出栅格文件路径
raster_resolution: 输出栅格的分辨率默认30米
block_size: 每个分块的宽度和高度像素单位默认5000
"""
try:
# 读取合并后的矢量文件
print(f"正在读取合并矢量文件: {input_vector_path}")
merged_gdf = gpd.read_file(input_vector_path, engine="pyogrio")
# 检查 value 字段是否有效
if 'value' not in merged_gdf.columns:
raise ValueError("矢量文件中缺少 'value' 字段,请确保文件包含正确的分类值")
if not merged_gdf['value'].between(1, 7).all():
print("警告: 'value' 字段包含无效值(应在 1-7 之间),可能导致数据丢失")
merged_gdf = merged_gdf[merged_gdf['value'].between(1, 7)]
# 按 value 字段排序,优先级高的类别(例如 value 较大)后处理,避免被覆盖
print("正在按 'value' 字段排序以确保优先级...")
merged_gdf = merged_gdf.sort_values(by='value', ascending=True)
# 计算总范围和栅格尺寸
bounds = merged_gdf.total_bounds # [minx, miny, maxx, maxy]
total_width = int((bounds[2] - bounds[0]) / raster_resolution)
total_height = int((bounds[3] - bounds[1]) / raster_resolution)
if total_width <= 0 or total_height <= 0:
raise ValueError(f"栅格尺寸无效: 宽度={total_width}, 高度={total_height}")
# 创建栅格变换
transform = rasterio.transform.from_bounds(bounds[0], bounds[1], bounds[2], bounds[3], total_width,
total_height)
# 计算总分块数以设置进度条
total_blocks = ((total_height + block_size - 1) // block_size) * ((total_width + block_size - 1) // block_size)
print(f"总分块数: {total_blocks}")
# 创建并写入栅格文件,逐块写入以减少内存占用
with rasterio.open(
output_raster_path,
'w',
driver='GTiff',
height=total_height,
width=total_width,
count=1,
dtype=rasterio.uint8,
crs=merged_gdf.crs,
transform=transform,
nodata=255 # 使用 255 作为 nodata 值,区分 value=8 的背景
) as dst:
# 分块处理并添加进度条
with tqdm(total=total_blocks, desc="栅格化进度") as pbar:
for y in range(0, total_height, block_size):
for x in range(0, total_width, block_size):
block_height = min(block_size, total_height - y)
block_width = min(block_size, total_width - x)
# 计算当前分块的地理范围
block_minx = bounds[0] + x * raster_resolution
block_maxy = bounds[3] - y * raster_resolution
block_maxx = block_minx + block_width * raster_resolution
block_miny = block_maxy - block_height * raster_resolution
# 创建分块边界框
block_bounds = box(block_minx, block_miny, block_maxx, block_maxy)
block_gdf = merged_gdf[merged_gdf.geometry.intersects(block_bounds)].copy()
# 初始化分块为背景值 8
block_raster = np.full((block_height, block_width), 8, dtype=np.uint8)
if not block_gdf.empty:
# 分块栅格化
block_transform = rasterio.transform.from_bounds(
block_minx, block_miny, block_maxx, block_miny, block_width, block_height
)
shapes = ((mapping(geom), value) for geom, value in
zip(block_gdf.geometry, block_gdf['value']))
block_raster = rasterize(
shapes=shapes,
out=block_raster, # 使用预初始化的数组
transform=block_transform,
fill=8, # 未覆盖区域为 8
dtype=rasterio.uint8,
all_touched=True # 确保所有触及的像素都被计入
)
# 直接写入当前分块到文件中
dst.write(block_raster, 1, window=((y, y + block_height), (x, x + block_width)))
print(f"已完成分块栅格化: x={x}, y={y}, 宽度={block_width}, 高度={block_height}")
pbar.update(1)
print("栅格化完成!")
print(f"栅格保存为: {output_raster_path}")
print(f"栅格中类别值: 1=forest, 2=water, 3=shrub, 4=wetland, 5=jianzhu, 6=grass, 7=bareland, 8=background")
except Exception as e:
print(f"发生错误: {str(e)}")
# 使用示例
if __name__ == "__main__":
input_files = [
r"sddimaoshp/forest/sdforest.shp",
r"sddimaoshp/water/sdwater.shp",
r"sddimaoshp/shrub/sdshrub.shp",
r"sddimaoshp/wetland/sdwetland.shp",
r"sddimaoshp/jianzhu/jianzhu2.shp",
r"sddimaoshp/grass/sdgrass.shp",
r"sddimaoshp/bareland/sdbareland.shp"
]
output_vector_file = r"sddimaoshp/hebingtif/shandongdimiao31.gpkg"
output_raster_file = r"sddimaoshp/hebingtif/sddimao31.tif"
# Step 1: 合并矢量
vector_path = merge_vectors_seven_categories(input_files, output_vector_file)
# Step 2: 如果合并成功,进行分块栅格化
if vector_path:
rasterize_vector_by_blocks(vector_path, output_raster_file, raster_resolution=10, block_size=5000)

View File

@ -0,0 +1,111 @@
import geopandas as gpd
import pandas as pd
import rasterio
from rasterio.features import rasterize
import numpy as np
from shapely.geometry import mapping
def merge_and_rasterize_vectors_two_categories(file_paths, output_vector_path, output_raster_path,
raster_resolution=30):
"""
合并2个矢量图层forest, water转为栅格并显示2个类别保留0值作为背景
参数:
file_paths: 包含2个矢量文件路径的列表 (按顺序: forest, water)
output_vector_path: 输出合并后的矢量文件路径
output_raster_path: 输出栅格文件路径
raster_resolution: 输出栅格的分辨率默认30米
"""
try:
# 检查输入文件数量
if len(file_paths) != 2:
raise ValueError("请提供正好2个矢量文件路径")
# 定义类别映射,按照指定顺序
categories = ['forest', 'water']
category_values = {'forest': 1, 'water': 2} # forest=1, water=2
# 读取所有矢量文件并添加分类
gdfs = []
for i, (path, category) in enumerate(zip(file_paths, categories), 1):
print(f"正在读取第{i}个矢量文件: {path}")
gdf = gpd.read_file(path)
gdf['category'] = category # 添加类别字段
gdf['value'] = category_values[category] # 添加数值字段用于栅格化
gdfs.append(gdf)
# 检查和统一坐标系(以第一个图层为基准)
base_crs = gdfs[0].crs
for i, gdf in enumerate(gdfs[1:], 2):
if gdf.crs != base_crs:
print(f"{i}个图层坐标系不同,正在转换为第一个图层的坐标系...")
gdfs[i - 1] = gdf.to_crs(base_crs)
# 合并所有GeoDataFrame
print("正在合并图层...")
merged_gdf = gpd.GeoDataFrame(pd.concat(gdfs, ignore_index=True))
merged_gdf = merged_gdf.set_geometry('geometry')
# 保存合并后的矢量文件
print(f"正在保存合并矢量结果到: {output_vector_path}")
merged_gdf.to_file(output_vector_path)
# 计算栅格化的范围
bounds = merged_gdf.total_bounds # [minx, miny, maxx, maxy]
width = int((bounds[2] - bounds[0]) / raster_resolution)
height = int((bounds[3] - bounds[1]) / raster_resolution)
# 创建栅格化变换
transform = rasterio.transform.from_bounds(
bounds[0], bounds[1], bounds[2], bounds[3], width, height
)
# 栅格化:将几何对象转为栅格,使用'value'字段作为像素值
print("正在将矢量转为栅格...")
shapes = ((mapping(geom), value) for geom, value in zip(merged_gdf.geometry, merged_gdf['value']))
raster = rasterize(
shapes=shapes,
out_shape=(height, width),
transform=transform,
fill=0, # 背景值为0
dtype=rasterio.uint8
)
# 保存栅格文件
with rasterio.open(
output_raster_path,
'w',
driver='GTiff',
height=height,
width=width,
count=1,
dtype=rasterio.uint8,
crs=base_crs,
transform=transform,
nodata=0 # 设置nodata值为0表示背景
) as dst:
dst.write(raster, 1)
print("处理完成!")
print(f"合并矢量保存为: {output_vector_path}")
print(f"栅格保存为: {output_raster_path}")
print(f"栅格中类别值: 0=背景, 1=forest, 2=water")
except Exception as e:
print(f"发生错误: {str(e)}")
# 使用示例
if __name__ == "__main__":
# 输入2个矢量文件路径按顺序: forest, water
input_files = [
r"E:\Data\z18\sd\Total_sd\shandonghebingshp\sdshp\forest\sdforest.shp",
r"E:\Data\z18\sd\Total_sd\shandonghebingshp\sdshp\water\true\shandongsuiyu3.shp"
]
output_vector_file = r"E:\njds\shenyang\shengyangshp\kejianshe\merged_vector_forest_water.shp"
output_raster_file = r"E:\njds\shenyang\shengyangshp\kejianshe\merged_raster_forest_water.tif"
# 执行合并和栅格化
merge_and_rasterize_vectors_two_categories(input_files, output_vector_file, output_raster_file,
raster_resolution=10)

View File

@ -0,0 +1,113 @@
import geopandas as gpd
import pandas as pd
import rasterio
from rasterio.features import rasterize
import numpy as np
from shapely.geometry import mapping
def merge_and_rasterize_vectors_four_categories(file_paths, output_vector_path, output_raster_path,
raster_resolution=30):
"""
合并4个矢量图层bareland, grass, shrub, building转为栅格并显示4个类别保留0值作为背景
参数:
file_paths: 包含4个矢量文件路径的列表 (按顺序: bareland, grass, shrub, building)
output_vector_path: 输出合并后的矢量文件路径
output_raster_path: 输出栅格文件路径
raster_resolution: 输出栅格的分辨率默认30米
"""
try:
# 检查输入文件数量
if len(file_paths) != 4:
raise ValueError("请提供正好4个矢量文件路径")
# 定义类别映射,按照指定顺序
categories = ['bareland', 'grass', 'shrub', 'building']
category_values = {'bareland': 1, 'grass': 2, 'shrub': 3, 'building': 4} # bareland=1, grass=2, shrub=3, building=4
# 读取所有矢量文件并添加分类
gdfs = []
for i, (path, category) in enumerate(zip(file_paths, categories), 1):
print(f"正在读取第{i}个矢量文件: {path}")
gdf = gpd.read_file(path)
gdf['category'] = category # 添加类别字段
gdf['value'] = category_values[category] # 添加数值字段用于栅格化
gdfs.append(gdf)
# 检查和统一坐标系(以第一个图层为基准)
base_crs = gdfs[0].crs
for i, gdf in enumerate(gdfs[1:], 2):
if gdf.crs != base_crs:
print(f"{i}个图层坐标系不同,正在转换为第一个图层的坐标系...")
gdfs[i - 1] = gdf.to_crs(base_crs)
# 合并所有GeoDataFrame
print("正在合并图层...")
merged_gdf = gpd.GeoDataFrame(pd.concat(gdfs, ignore_index=True))
merged_gdf = merged_gdf.set_geometry('geometry')
# 保存合并后的矢量文件
print(f"正在保存合并矢量结果到: {output_vector_path}")
merged_gdf.to_file(output_vector_path)
# 计算栅格化的范围
bounds = merged_gdf.total_bounds # [minx, miny, maxx, maxy]
width = int((bounds[2] - bounds[0]) / raster_resolution)
height = int((bounds[3] - bounds[1]) / raster_resolution)
# 创建栅格化变换
transform = rasterio.transform.from_bounds(
bounds[0], bounds[1], bounds[2], bounds[3], width, height
)
# 栅格化:将几何对象转为栅格,使用'value'字段作为像素值
print("正在将矢量转为栅格...")
shapes = ((mapping(geom), value) for geom, value in zip(merged_gdf.geometry, merged_gdf['value']))
raster = rasterize(
shapes=shapes,
out_shape=(height, width),
transform=transform,
fill=0, # 背景值为0
dtype=rasterio.uint8
)
# 保存栅格文件
with rasterio.open(
output_raster_path,
'w',
driver='GTiff',
height=height,
width=width,
count=1,
dtype=rasterio.uint8,
crs=base_crs,
transform=transform,
nodata=0 # 设置nodata值为0表示背景
) as dst:
dst.write(raster, 1)
print("处理完成!")
print(f"合并矢量保存为: {output_vector_path}")
print(f"栅格保存为: {output_raster_path}")
print(f"栅格中类别值: 0=背景, 1=bareland, 2=grass, 3=shrub, 4=building")
except Exception as e:
print(f"发生错误: {str(e)}")
# 使用示例
if __name__ == "__main__":
# 输入4个矢量文件路径按顺序: bareland, grass, shrub, building
input_files = [
r"E:\njds\shenyang\shengyangshp\bareland\bareland.shp",
r"E:\njds\shenyang\shengyangshp\grass\grass2.shp",
r"E:\njds\shenyang\shengyangshp\shrub\shrub.shp",
r"E:\Data\z18\sd\Total_sd\shandonghebingshp\sdshp\jianzhu\true\shandongjianzhu3.shp"
]
output_vector_file = r"E:\njds\shenyang\shengyangshp\kejianshe\merged_vector_four.shp"
output_raster_file = r"E:\njds\shenyang\shengyangshp\kejianshe\merged_raster_four.tif"
# 执行合并和栅格化
merge_and_rasterize_vectors_four_categories(input_files, output_vector_file, output_raster_file,
raster_resolution=10)

View File

@ -0,0 +1,112 @@
import geopandas as gpd
import pandas as pd
import rasterio
from rasterio.features import rasterize
import numpy as np
from shapely.geometry import mapping
def merge_and_rasterize_vectors_three_categories(file_paths, output_vector_path, output_raster_path,
raster_resolution=30):
"""
合并3个矢量图层bareland, grass, shrub转为栅格并显示3个类别保留0值作为背景
参数:
file_paths: 包含3个矢量文件路径的列表 (按顺序: bareland, grass, shrub)
output_vector_path: 输出合并后的矢量文件路径
output_raster_path: 输出栅格文件路径
raster_resolution: 输出栅格的分辨率默认30米
"""
try:
# 检查输入文件数量
if len(file_paths) != 3:
raise ValueError("请提供正好3个矢量文件路径")
# 定义类别映射,按照指定顺序
categories = ['bareland', 'grass', 'shrub']
category_values = {'bareland': 1, 'grass': 2, 'shrub': 3} # bareland=1, grass=2, shrub=3
# 读取所有矢量文件并添加分类
gdfs = []
for i, (path, category) in enumerate(zip(file_paths, categories), 1):
print(f"正在读取第{i}个矢量文件: {path}")
gdf = gpd.read_file(path)
gdf['category'] = category # 添加类别字段
gdf['value'] = category_values[category] # 添加数值字段用于栅格化
gdfs.append(gdf)
# 检查和统一坐标系(以第一个图层为基准)
base_crs = gdfs[0].crs
for i, gdf in enumerate(gdfs[1:], 2):
if gdf.crs != base_crs:
print(f"{i}个图层坐标系不同,正在转换为第一个图层的坐标系...")
gdfs[i - 1] = gdf.to_crs(base_crs)
# 合并所有GeoDataFrame
print("正在合并图层...")
merged_gdf = gpd.GeoDataFrame(pd.concat(gdfs, ignore_index=True))
merged_gdf = merged_gdf.set_geometry('geometry')
# 保存合并后的矢量文件
print(f"正在保存合并矢量结果到: {output_vector_path}")
merged_gdf.to_file(output_vector_path)
# 计算栅格化的范围
bounds = merged_gdf.total_bounds # [minx, miny, maxx, maxy]
width = int((bounds[2] - bounds[0]) / raster_resolution)
height = int((bounds[3] - bounds[1]) / raster_resolution)
# 创建栅格化变换
transform = rasterio.transform.from_bounds(
bounds[0], bounds[1], bounds[2], bounds[3], width, height
)
# 栅格化:将几何对象转为栅格,使用'value'字段作为像素值
print("正在将矢量转为栅格...")
shapes = ((mapping(geom), value) for geom, value in zip(merged_gdf.geometry, merged_gdf['value']))
raster = rasterize(
shapes=shapes,
out_shape=(height, width),
transform=transform,
fill=0, # 背景值为0
dtype=rasterio.uint8
)
# 保存栅格文件
with rasterio.open(
output_raster_path,
'w',
driver='GTiff',
height=height,
width=width,
count=1,
dtype=rasterio.uint8,
crs=base_crs,
transform=transform,
nodata=0 # 设置nodata值为0表示背景
) as dst:
dst.write(raster, 1)
print("处理完成!")
print(f"合并矢量保存为: {output_vector_path}")
print(f"栅格保存为: {output_raster_path}")
print(f"栅格中类别值: 0=背景, 1=bareland, 2=grass, 3=shrub")
except Exception as e:
print(f"发生错误: {str(e)}")
# 使用示例
if __name__ == "__main__":
# 输入3个矢量文件路径按顺序: bareland, grass, shrub
input_files = [
r"E:\njds\shenyang\shengyangshp\bareland\bareland.shp",
r"E:\njds\shenyang\shengyangshp\grass\grass2.shp",
r"E:\njds\shenyang\shengyangshp\shrub\shrub.shp"
]
output_vector_file = r"E:\njds\shenyang\shengyangshp\kejianshe\merged_vector_three.shp"
output_raster_file = r"E:\njds\shenyang\shengyangshp\kejianshe\merged_raster_three.tif"
# 执行合并和栅格化
merge_and_rasterize_vectors_three_categories(input_files, output_vector_file, output_raster_file,
raster_resolution=30)

View File

@ -0,0 +1,190 @@
import geopandas as gpd
import pandas as pd
import rasterio
from rasterio.features import rasterize
import numpy as np
from shapely.geometry import mapping, box
import os
from tqdm import tqdm # 导入 tqdm 用于进度条
def merge_vectors_seven_categories(file_paths, output_vector_path):
"""
合并7个地貌类型矢量图层并保存为GeoPackage格式去掉cropland
参数:
file_paths: 包含7个矢量文件路径的列表 (按顺序: forest, water, shrub, wetland, jianzhu, grass, bareland)
output_vector_path: 输出合并后的矢量文件路径使用.gpkg格式
"""
try:
# 检查输入文件数量是否为7
if len(file_paths) != 7:
raise ValueError("请提供正好7个矢量文件路径")
for path in file_paths:
if not os.path.exists(path):
raise FileNotFoundError(f"文件不存在: {path}")
# 定义7个类别去掉cropland
categories = ['forest', 'water', 'shrub', 'wetland', 'jianzhu', 'grass', 'bareland']
category_values = {cat: i + 1 for i, cat in enumerate(categories)} # forest=1, water=2, ..., bareland=7
gdfs = []
for i, (path, category) in enumerate(zip(file_paths, categories), 1):
print(f"正在读取第{i}个矢量文件: {path}")
gdf = gpd.read_file(path, engine="pyogrio")
gdf['category'] = category
gdf['value'] = category_values[category]
if 'Shape_Area' in gdf.columns:
gdf['Shape_Area'] = gdf['Shape_Area'].clip(upper=1e9)
gdfs.append(gdf)
# 统一坐标系
base_crs = gdfs[0].crs
for i, gdf in enumerate(gdfs[1:], 2):
if gdf.crs != base_crs:
print(f"{i}个图层坐标系不同,正在转换为第一个图层的坐标系...")
gdfs[i - 1] = gdf.to_crs(base_crs)
print("正在合并图层...")
merged_gdf = gpd.GeoDataFrame(pd.concat(gdfs, ignore_index=True))
merged_gdf = merged_gdf.set_geometry('geometry')
if not output_vector_path.endswith('.gpkg'):
output_vector_path = output_vector_path.replace('.shp', '.gpkg')
print("输出格式已更改为GeoPackage以支持大文件")
print(f"正在保存合并矢量结果到: {output_vector_path}")
merged_gdf.to_file(output_vector_path, driver='GPKG', engine="pyogrio")
print("矢量合并完成!")
print(f"合并矢量保存为: {output_vector_path}")
return output_vector_path
except Exception as e:
print(f"发生错误: {str(e)}")
return None
def rasterize_vector_by_blocks(input_vector_path, output_raster_path, raster_resolution=30, block_size=5000):
"""
将合并后的矢量文件分块转为栅格显示7个类别保留0值作为背景添加进度条不清理数据
参数:
input_vector_path: 输入的合并矢量文件路径.gpkg格式
output_raster_path: 输出栅格文件路径
raster_resolution: 输出栅格的分辨率默认30米
block_size: 每个分块的宽度和高度像素单位默认5000
"""
try:
# 读取合并后的矢量文件
print(f"正在读取合并矢量文件: {input_vector_path}")
merged_gdf = gpd.read_file(input_vector_path, engine="pyogrio")
# 检查 value 字段是否有效
if 'value' not in merged_gdf.columns:
raise ValueError("矢量文件中缺少 'value' 字段,请确保文件包含正确的分类值")
if not merged_gdf['value'].between(1, 7).all():
print("警告: 'value' 字段包含无效值(应在 1-7 之间),可能导致数据丢失")
merged_gdf = merged_gdf[merged_gdf['value'].between(1, 7)]
# 按 value 字段排序,优先级高的类别(例如 value 较大)后处理,避免被覆盖
print("正在按 'value' 字段排序以确保优先级...")
merged_gdf = merged_gdf.sort_values(by='value', ascending=True)
# 计算总范围和栅格尺寸
bounds = merged_gdf.total_bounds # [minx, miny, maxx, maxy]
total_width = int((bounds[2] - bounds[0]) / raster_resolution)
total_height = int((bounds[3] - bounds[1]) / raster_resolution)
if total_width <= 0 or total_height <= 0:
raise ValueError(f"栅格尺寸无效: 宽度={total_width}, 高度={total_height}")
# 创建栅格变换
transform = rasterio.transform.from_bounds(bounds[0], bounds[1], bounds[2], bounds[3], total_width,
total_height)
# 计算总分块数以设置进度条
total_blocks = ((total_height + block_size - 1) // block_size) * ((total_width + block_size - 1) // block_size)
print(f"总分块数: {total_blocks}")
# 创建并写入栅格文件,逐块写入以减少内存占用
with rasterio.open(
output_raster_path,
'w',
driver='GTiff',
height=total_height,
width=total_width,
count=1,
dtype=rasterio.uint8,
crs=merged_gdf.crs,
transform=transform,
nodata=0
) as dst:
# 分块处理并添加进度条
with tqdm(total=total_blocks, desc="栅格化进度") as pbar:
for y in range(0, total_height, block_size):
for x in range(0, total_width, block_size):
block_height = min(block_size, total_height - y)
block_width = min(block_size, total_width - x)
# 计算当前分块的地理范围
block_minx = bounds[0] + x * raster_resolution
block_maxy = bounds[3] - y * raster_resolution
block_maxx = block_minx + block_width * raster_resolution
block_miny = block_maxy - block_height * raster_resolution
# 创建分块边界框
block_bounds = box(block_minx, block_miny, block_maxx, block_maxy)
block_gdf = merged_gdf[merged_gdf.geometry.intersects(block_bounds)].copy()
if block_gdf.empty:
pbar.update(1)
continue
# 分块栅格化
block_transform = rasterio.transform.from_bounds(
block_minx, block_miny, block_maxx, block_maxy, block_width, block_height
)
shapes = ((mapping(geom), value) for geom, value in zip(block_gdf.geometry, block_gdf['value']))
block_raster = rasterize(
shapes=shapes,
out_shape=(block_height, block_width),
transform=block_transform,
fill=0,
dtype=rasterio.uint8,
all_touched=True # 确保所有触及的像素都被计入,避免小对象丢失
)
# 直接写入当前分块到文件中
dst.write(block_raster, 1, window=((y, y + block_height), (x, x + block_width)))
print(f"已完成分块栅格化: x={x}, y={y}, 宽度={block_width}, 高度={block_height}")
pbar.update(1)
print("栅格化完成!")
print(f"栅格保存为: {output_raster_path}")
print(f"栅格中类别值: 0=背景, 1=forest, 2=water, 3=shrub, 4=wetland, 5=jianzhu, 6=grass, 7=bareland")
except Exception as e:
print(f"发生错误: {str(e)}")
# 使用示例
if __name__ == "__main__":
input_files = [
r"sddimaoshp/forest/sdforest.shp",
r"sddimaoshp/water/sdwater.shp",
r"sddimaoshp/shrub/sdshrub.shp",
r"sddimaoshp/wetland/sdwetland.shp",
r"sddimaoshp/jianzhu/jianzhu2.shp",
r"sddimaoshp/grass/sdgrass.shp",
r"sddimaoshp/bareland/sdbareland.shp"
]
output_vector_file = r"sddimaoshp/hebingtif/shandongdimiao_30m.gpkg"
output_raster_file = r"sddimaoshp/hebingtif/sddimao_30m.tif"
# Step 1: 合并矢量
vector_path = merge_vectors_seven_categories(input_files, output_vector_file)
# Step 2: 如果合并成功,进行分块栅格化
if vector_path:
rasterize_vector_by_blocks(vector_path, output_raster_file, raster_resolution=30, block_size=5000)

View File

@ -0,0 +1,153 @@
import numpy as np
import sys
import os
# 尝试多种方式导入GDAL
try:
# 尝试直接导入GDAL
import gdal
import ogr
import osr
print("成功导入GDAL库")
except ImportError:
try:
# 尝试从osgeo导入
from osgeo import gdal, ogr, osr
print("从osgeo成功导入GDAL库")
except ImportError:
print("无法导入GDAL库请确保已安装GDAL及其Python绑定")
sys.exit(1)
def process_rasters(shp_path, raster_path, output_path):
"""
使用shp文件裁剪栅格保留不重叠部分设为value=8然后与原栅格合并
参数:
shp_path: shapefile的路径
raster_path: 原始栅格文件路径 (value 1-7)
output_path: 输出栅格文件路径
"""
try:
# 设置GDAL错误处理
gdal.UseExceptions()
# 打开栅格文件
raster = gdal.Open(raster_path)
if raster is None:
print(f"无法打开栅格文件: {raster_path}")
return None
# 获取栅格信息
print(f"栅格大小: {raster.RasterXSize} x {raster.RasterYSize}, {raster.RasterCount} 波段")
geo_transform = raster.GetGeoTransform()
projection = raster.GetProjection()
print(f"栅格投影: {projection}")
# 读取栅格数据
band = raster.GetRasterBand(1)
raster_data = band.ReadAsArray()
nodata = band.GetNoDataValue()
if nodata is None:
nodata = 0
# 打开shapefile
driver = ogr.GetDriverByName("ESRI Shapefile")
try:
vector = driver.Open(shp_path, 0) # 0表示只读模式
except Exception as e:
print(f"无法打开shapefile: {e}")
return None
if vector is None:
print(f"无法打开shapefile: {shp_path}")
return None
# 获取shapefile信息
layer = vector.GetLayer()
feature_count = layer.GetFeatureCount()
print(f"Shapefile有 {feature_count} 个要素")
# 创建栅格掩码
# 使用GDAL内置的栅格化功能将shapefile转换为栅格
print("将shapefile栅格化...")
memory_driver = gdal.GetDriverByName('MEM')
mask_ds = memory_driver.Create('', raster.RasterXSize, raster.RasterYSize, 1, gdal.GDT_Byte)
mask_ds.SetGeoTransform(geo_transform)
mask_ds.SetProjection(projection)
# 栅格化
mask_band = mask_ds.GetRasterBand(1)
mask_band.Fill(0) # 初始化为0
# 将shapefile栅格化到掩码上
gdal.RasterizeLayer(mask_ds, [1], layer, burn_values=[1])
# 读取掩码数据
mask_data = mask_band.ReadAsArray()
# 栅格有值区域的掩码 (value 1-7)
valid_data_mask = (raster_data >= 1) & (raster_data <= 7)
# 获取被shapefile覆盖但不与原始有效栅格区域重叠的部分
shp_mask = mask_data > 0 # shapefile 覆盖区域
non_overlapping_mask = shp_mask & (~valid_data_mask) # shapefile 覆盖但不与原始栅格重叠区域
# 合并两个栅格
merged_raster = raster_data.copy()
merged_raster[non_overlapping_mask] = 8
# 创建输出栅格
driver = gdal.GetDriverByName('GTiff')
out_ds = driver.Create(output_path, raster.RasterXSize, raster.RasterYSize, 1, band.DataType)
out_ds.SetGeoTransform(geo_transform)
out_ds.SetProjection(projection)
# 写入数据
out_band = out_ds.GetRasterBand(1)
out_band.WriteArray(merged_raster)
out_band.SetNoDataValue(nodata)
# 关闭数据集
out_ds = None
mask_ds = None
raster = None
vector = None
print(f"处理完成,结果保存至: {output_path}")
# 返回处理后的统计信息
stats = {
"原始栅格有效区域(value 1-7)像素数": int(np.sum(valid_data_mask)),
"shapefile覆盖区域像素数": int(np.sum(shp_mask)),
"不重叠区域(value=8)像素数": int(np.sum(non_overlapping_mask)),
"结果栅格总有效像素数": int(np.sum(merged_raster > 0))
}
return stats
except Exception as e:
print(f"发生错误: {e}")
import traceback
traceback.print_exc()
return None
# 使用示例
if __name__ == "__main__":
# 输入文件路径
shp_path = r"E:\Data\z18\shandongshp\sd.shp"
raster_path = r"E:\Data\z18\sd\sd_y10m\sddimao_10m.tif"
output_path = r"E:\Data\z18\sd\sd_10m\sddimao_10mmerged.tif"
# 执行处理
stats = process_rasters(shp_path, raster_path, output_path)
# 打印统计信息
if stats:
for key, value in stats.items():
print(f"{key}: {value}")
else:
print("处理失败,无统计信息可显示")

View File

@ -0,0 +1,125 @@
import geopandas as gpd
import pandas as pd
import rasterio
from rasterio.features import rasterize
from rasterio.windows import Window
import numpy as np
from shapely.geometry import mapping
import os
def large_vector_to_raster(input_shp_path, output_raster_path, raster_resolution=30, chunk_size=1000, window_size=1000, category_field='value', category_value=1):
"""
将大型面要素转为栅格数据分块读取矢量并按窗口写入栅格适用于大文件例如2GB
参数:
input_shp_path: 输入shapefile路径
output_raster_path: 输出栅格文件路径
raster_resolution: 输出栅格分辨率单位默认30
chunk_size: 每次读取的矢量特征数量默认1000
window_size: 每个栅格窗口的像素大小默认1000x1000
category_field: 矢量数据中用于栅格化的字段名默认'value'
category_value: 默认类别值若无字段则所有特征赋此值默认1
"""
try:
# 检查输入文件是否存在
if not os.path.exists(input_shp_path):
raise FileNotFoundError(f"输入文件 {input_shp_path} 不存在")
# 读取矢量文件的元信息以确定范围和CRS
print("正在读取矢量元信息...")
gdf_meta = gpd.read_file(input_shp_path, rows=1) # 只读一行获取元数据
base_crs = gdf_meta.crs
bounds = gdf_meta.total_bounds # [minx, miny, maxx, maxy]
# 计算栅格尺寸
width = int((bounds[2] - bounds[0]) / raster_resolution)
height = int((bounds[3] - bounds[1]) / raster_resolution)
transform = rasterio.transform.from_bounds(bounds[0], bounds[1], bounds[2], bounds[3], width, height)
# 初始化输出栅格文件
print(f"初始化输出栅格文件: {output_raster_path}")
with rasterio.open(
output_raster_path,
'w',
driver='GTiff',
height=height,
width=width,
count=1,
dtype=rasterio.uint8,
crs=base_crs,
transform=transform,
nodata=0 # 背景值为0
) as dst:
# 分窗口处理
for row_offset in range(0, height, window_size):
for col_offset in range(0, width, window_size):
window_height = min(window_size, height - row_offset)
window_width = min(window_size, width - col_offset)
window = Window(col_offset, row_offset, window_width, window_height)
# 计算当前窗口的地理范围
window_transform = rasterio.windows.transform(window, transform)
window_bounds = rasterio.windows.bounds(window, transform)
# 创建窗口的边界几何,用于筛选矢量数据
window_poly = gpd.GeoSeries.from_wkt([f"POLYGON(({window_bounds[0]} {window_bounds[1]}, "
f"{window_bounds[2]} {window_bounds[1]}, "
f"{window_bounds[2]} {window_bounds[3]}, "
f"{window_bounds[0]} {window_bounds[3]}, "
f"{window_bounds[0]} {window_bounds[1]}))"]).iloc[0]
# 分块读取矢量数据并筛选当前窗口内的特征
print(f"处理窗口: row={row_offset}, col={col_offset}")
chunk_raster = np.zeros((window_height, window_width), dtype=rasterio.uint8)
chunk_iterator = gpd.read_file(input_shp_path, chunksize=chunk_size)
for chunk_idx, chunk_gdf in enumerate(chunk_iterator):
# 筛选与当前窗口相交的特征
window_gdf = chunk_gdf[chunk_gdf.intersects(window_poly)]
if len(window_gdf) == 0:
continue
print(f" - 处理分块 {chunk_idx},筛选出 {len(window_gdf)} 个特征")
# 如果没有指定category_field则赋默认值
if category_field not in window_gdf.columns:
window_gdf['value'] = category_value
# 栅格化当前分块
shapes = ((mapping(geom), value) for geom, value in zip(window_gdf.geometry, window_gdf[category_field]))
temp_raster = rasterize(
shapes=shapes,
out_shape=(window_height, window_width),
transform=window_transform,
fill=0,
dtype=rasterio.uint8
)
# 更新窗口栅格保留非0值
mask = temp_raster > 0
chunk_raster[mask] = temp_raster[mask]
# 写入当前窗口到栅格文件
if np.any(chunk_raster > 0):
dst.write(chunk_raster, 1, window=window)
print("处理完成!")
print(f"栅格保存为: {output_raster_path}")
print(f"栅格中类别值: 0=背景, 其他值由 {category_field} 字段或默认值 {category_value} 确定")
except Exception as e:
print(f"发生错误: {str(e)}")
# 使用示例
if __name__ == "__main__":
input_shp = r"E:\large_data\large_vector.shp" # 替换为您的2GB矢量文件路径
output_raster = r"E:\large_data\large_raster.tif" # 输出栅格路径
large_vector_to_raster(
input_shp_path=input_shp,
output_raster_path=output_raster,
raster_resolution=30, # 分辨率30米
chunk_size=1000, # 每次读取1000个特征
window_size=1000, # 每个窗口1000x1000像素
category_field='value', # 假设矢量数据中有'value'字段,若无则用默认值
category_value=1 # 默认类别值
)

View File

View File

@ -1,12 +0,0 @@
{
"default_terrain_complexity": {
"耕地": 1.1,
"裸地": 1.1,
"草地": 1.2,
"灌木": 1.4,
"湿地": 1.65,
"林地": 1.65,
"建筑": 1.35,
"水域": 1.35
}
}

102
readme.md
View File

@ -1,102 +0,0 @@
# 文件夹说明
### 【PV文件夹】
- 分为了**两个运行代码**pv_total2.py和pv_total3.py查看说明.txt文件。
#### **pv_total2.py**
- **输入参数**
- 请输入纬度(-90 到 90例如 39.904239
- 请输入经度(-180 到 180例如 116.4074116
- 组件名称TWMND-72HD580
- 电价0.6 元/kWh
- 光伏支架:固定式
- 是否优化倾角和方位角:是
- **输出结果**
- 组件名称TWMND-72HD580
- 倾角24.59° | 方位角60.00°
- 阵列间距5.56 米
- 单个组件最大功率Wp580
- 装机容量696.00 kW
- 峰值日照小时数3.99 小时/天
- 一天单个组件发电量2 kWh
- 年发电量810,000 kWh
- 等效小时数1164 小时
- **环境收益**
- 标准煤减排量3 kg
- CO₂减排量8 kg
- SO₂减排量0 kg
- NOx减排量0 kg
- 内部收益率 IRR22.39%
#### **pv_total3.py**
- **输入参数**
- 请输入纬度(-90 到 90例如 39.904239
- 请输入经度(-180 到 180例如 116.4074116
- 组件名称TWMND-72HD580
- 电价0.6 元/kWh
- 光伏支架:固定式
- 是否优化倾角和方位角:是
- **输出结果**
- 组件名称TWMND-72HD580
- 倾角32.00° | 方位角60.00°
- 阵列间距5.57 米
- 单个组件最大功率Wp580
- 装机容量696.00 kW
- 峰值日照小时数4.10 小时/天
- 一天单个组件发电量2 kWh
- 年发电量33,251 kWh
- 等效小时数1197 小时
- **环境收益**
- 标准煤减排量3 kg
- CO₂减排量8 kg
- SO₂减排量0 kg
- NOx减排量0 kg
- 内部收益率 IRR23.00%
- **解释说明**
-两个代码执行看文件夹中的说明(根据需要使用)。倾角和峰值日照时数不同。
- 组件名称:从 `pv_product.xlsx` 中获取
- 光伏支架:分为固定式和跟踪式
- 是否优化:如果优化,则按照 `倾角_峰值小时数.xlsx` 中的数据
- 固定式/跟踪式 + 优化/不优化可以随意组合
- 光伏个数:暂时按照默认值 1200区域内可安装光伏个数后续韩耀朋优化
---
### 【Wind文件夹】
#### **temperature.txt**
- 某地区12个月份的平均气温后续需要替换为实际温度
#### **wind_product.xlsx**
- 含有风机设备信息,例如设备名称、额定功率、叶轮直径、扫风面积和轮毂高度等
#### **wind_speed.txt**
- 某地区12个月的平均风速后续需要替换为实际风速
#### **wind_total.py**
- **风机相关封装函数**
- **注意**
- 需要输入:设备名称、风电场面积(后续可替换为预测值)、当地电价
- **示例**
- 输入:
- 请输入设备名称: GWH191-4.0
- 请输入风电场面积(平方公里): 10
- 请输入电价(元/kWh: 0.6
- 输出:
- 找到设备 'GWH191-4.0',额定功率: 4.0 MW, 扫风面积: 28652 m², 叶片直径: 191 米
- 单台风机占地面积: 1,824,050 平方米 (横向间距: 955 米, 纵向间距: 1910 米)
- 估算风机数量: 5 台
- 设备: GWH191-4.0
- 额定功率: 4.00 MW
- 扫风面积: 28652.00 m^2
- 叶片直径: 191.00 m
- 风机数量: 5 台
- 平均空气密度: 1.205 kg/m^3
- 风功率密度: 63.93 W/m^2
- 项目装机容量: 20.00 MW
- 年发电量: 2888.404 万 kWh
- 等效小时数: 7221.01 小时
- 内部收益率 IRR: 18.70%

View File

@ -1,9 +0,0 @@
numpy==1.22.0
pandas==1.5.3
scikit_learn==1.2.1
xlrd==2.0.1
logzero==1.7.0
scipy==1.11.4
flask==3.1.0
requests==2.31.0
openpyxl==3.1.2

90
run.py
View File

@ -1,90 +0,0 @@
# -*-coding:utf-8-*-
import os
import json
from flask import Flask, request, make_response, jsonify
from logzero import logger
current_path = os.path.dirname(os.path.abspath(__file__)) # for local
wind_product_path = f"{current_path}/wind/wind_product.xlsx"
# current_path = "/app" # for docker
logger.info(f"{current_path}")
app = Flask(__name__)
from pv.eva_pv import calculate_pv_potential, get_slope_from_api
from wind.wind_total import wind_farm_analysis
terrain_config = {
"耕地": 1.1, "裸地": 1.1, "草地": 1.2, "灌木": 1.4,
"湿地": 1.65, "林地": 1.65, "建筑": 1.35, "水域": 1.35
}
@app.route('/pv_power/', methods=["POST"])
def get_pv_potential():
resp_info = dict()
if request.method == "POST":
logger.info(request.get_json())
latitude = request.json.get('latitude')
longitude = request.json.get('longitude')
available_area_sq_km = float(request.json.get('available_area_sq_km'))
pv_type = request.json.get('pv_type')
terrain_type = request.json.get('terrain_type')
component_name = request.json.get('component_name')
electricity_price = float(request.json.get('electricity_price'))
terrain_complexity = terrain_config.get(terrain_type)
try:
# 通过 API 获取坡度
slope_deg = get_slope_from_api(latitude, longitude)
logger.info(f"使用参数:坡度={slope_deg:.2f}°,地形复杂性因子={terrain_complexity}")
pv_potential = calculate_pv_potential(
available_area_sq_km=available_area_sq_km,
component_name=component_name,
longitude=longitude,
latitude=latitude,
slope_deg=slope_deg,
terrain_complexity=terrain_complexity,
pv_type=pv_type,
terrain_type=terrain_type,
electricity_price=electricity_price
)
resp_info["code"] = 200
resp_info["data"] = pv_potential
except Exception as e:
logger.info(e)
resp_info["code"] = 406
resp_info["data"] = str(e)
resp = make_response(json.dumps(resp_info))
resp.status_code = 200
return resp
@app.route('/wind_power/', methods=["POST"])
def get_wind_potential():
resp_info = dict()
if request.method == "POST":
area_km2 = float(request.json.get('available_area_sq_km'))
device_name = request.json.get('component_name')
electricity_price = float(request.json.get('electricity_price'))
v_avg = float(request.json.get("velocity_avg"))
t_avg = float(request.json.get("temp_avg"))
try:
wind_potential = wind_farm_analysis(
device_name=device_name,
area_km2=area_km2,
electricity_price=electricity_price,
file_path=wind_product_path,
velocity_avg=v_avg,
T_avg=t_avg
)
resp_info["code"] = 200
resp_info["data"] = wind_potential
except Exception as e:
logger.info(e)
resp_info["code"] = 406
resp_info["data"] = str(e)
resp = make_response(json.dumps(resp_info))
resp.status_code = 200
return resp
if __name__ == '__main__':
app.run(port=12123, host="0.0.0.0", debug=False)

View File

View File

@ -1,254 +0,0 @@
import pandas as pd
import math
def wind_farm_analysis(device_name, area_km2, file_path, avg_temp, avg_wind_speed,
lateral_spacing_factor=5, longitudinal_spacing_factor=10, altitude=11,
hub_height=100, Cp=0.45, eta=0.8):
"""
封装函数分析风电场的风机数量及各项经济和技术指标直接输入年平均气温和年平均风速
参数
device_name (str): 风力发电机型号名称
area_km2 (float): 风电场面积平方公里
file_path (str): 包含风机参数的Excel文件路径
avg_temp (float): 年平均气温摄氏度
avg_wind_speed (float): 年平均风速m/s
lateral_spacing_factor (float): 横向间距因子默认为5倍叶片直径5D
longitudinal_spacing_factor (float): 纵向间距因子默认为10倍叶片直径10D
altitude (float): 海拔高度m默认11m
hub_height (float): 轮毂高度m默认100m
Cp (float): 风能利用系数功率系数默认0.45反映风能转换效率
eta (float): 总系统效率包括机械和电气效率默认0.8
返回
dict: 包含风电场分析结果的字典包括装机容量发电量环境效益等
"""
def estimate_wind_turbine_count(area_km2, blade_diameter):
"""
估算风电场可容纳的风机数量基于面积和风机间距
参数
area_km2 (float): 风电场面积平方公里
blade_diameter (float): 风机叶片直径m
返回
int: 估算的风机数量
"""
# 将面积从平方公里转换为平方米
area_m2 = area_km2 * 1_000_000
# 计算横向和纵向间距(以叶片直径为单位)
lateral_spacing = lateral_spacing_factor * blade_diameter
longitudinal_spacing = longitudinal_spacing_factor * blade_diameter
# 单台风机占地面积 = 横向间距 * 纵向间距
turbine_area = lateral_spacing * longitudinal_spacing
# 风机数量 = 总面积 / 单台风机占地面积(取整数)
turbine_count = int(area_m2 / turbine_area)
print(f"单台风机占地面积: {turbine_area:,} 平方米 "
f"(横向间距: {lateral_spacing} 米, 纵向间距: {longitudinal_spacing} 米)")
print(f"估算风机数量: {turbine_count}")
return turbine_count
def get_wind_turbine_specs(device_name, file_path):
"""
从Excel文件中获取指定风机的参数
参数
device_name (str): 风机型号名称
file_path (str): Excel文件路径
返回
tuple: 额定功率kW扫风面积叶片直径m
"""
try:
# 读取Excel文件
df = pd.read_excel(file_path)
# 查找匹配的设备名称
match = df[df.iloc[:, 0] == device_name]
if not match.empty:
rated_power = match.iloc[0, 1] # 额定功率kW
swept_area = match.iloc[0, 7] # 扫风面积
blade_diameter = match.iloc[0, 6] # 叶片直径m
print(f"找到设备 '{device_name}',额定功率: {rated_power} KW, "
f"扫风面积: {swept_area} m², 叶片直径: {blade_diameter}")
return rated_power, swept_area, blade_diameter
else:
raise ValueError(f"未找到设备名称: {device_name}")
except FileNotFoundError:
raise FileNotFoundError(f"文件未找到: {file_path}")
except Exception as e:
raise Exception(f"发生错误: {str(e)}")
def air_density(altitude, hub_height, T0):
"""
计算空气密度考虑海拔和轮毂高度的影响
参数
altitude (float): 海拔高度m
hub_height (float): 轮毂高度m
T0 (float): 地面平均气温摄氏度
返回
float: 空气密度kg/
公式
ρ = (353.05 / T) * exp(-0.034 * (z / T))
其中 T = T0 - LR * z + 273.15T0为地面温度LR为温度递减率z为总高度
"""
# 计算总高度(海拔 + 轮毂高度)
z = altitude + hub_height
# 温度递减率lapse rate每升高1米温度降低0.0065°C
LR = 0.0065
# 计算绝对温度K考虑高度引起的温度变化
T = T0 - LR * z + 273.15
# 计算空气密度
return (353.05 / T) * math.exp(-0.034 * (z / T))
def wind_power_density(density, velocity_avg):
"""
计算风功率密度单位面积的风能功率
参数
density (float): 空气密度kg/
velocity_avg (float): 平均风速m/s
返回
float: 风功率密度W/
公式
P = 0.5 * ρ * 按照年平均风速来算
"""
return 0.5 * density * velocity_avg**3
def estimated_wind_power(num_turbines, rated_power):
"""
计算风电场总装机容量
参数
num_turbines (int): 风机数量
rated_power (float): 单台风机额定功率kW
返回
float: 总装机容量kW
"""
if not isinstance(num_turbines, int) or num_turbines < 0:
raise ValueError("风机数量必须为非负整数")
return rated_power * num_turbines
def calculate_power_output(S, w, Cp, eta, num_turbines):
"""
计算风电场年发电量
参数
S (float): 扫风面积
w (float): 风功率密度W/
Cp (float): 风能利用系数
eta (float): 系统效率
num_turbines (int): 风机数量
返回
float: 年发电量Wh
公式
E = w * S * Cp * 8760 * η * N (N为风机个数
其中 8760 为一年小时数
"""
return w * S * Cp * 8760 * eta * num_turbines
def calculate_equivalent_hours(P, P_r):
"""
计算等效满负荷小时数
参数
P (float): 年发电量Wh
P_r (float): 单台风机额定功率kW
返回
float: 等效小时数小时
"""
if P_r == 0:
raise ValueError("额定功率不能为 0")
return (P / 1000) / P_r
def calculate_environmental_benefits(E_p_million_kwh):
"""
计算环境效益减排量
参数
E_p_million_kwh (float): 年发电量万kWh
返回
dict: 包含标准煤CO₂SO₂NOx减排量的字典
假设
每万kWh可节约标准煤0.404减排CO₂ 0.977SO₂ 0.03NOx 0.015
"""
if E_p_million_kwh < 0:
raise ValueError("年发电量需≥0")
return {
"coal_reduction": E_p_million_kwh * 0.404 * 10, # kg
"CO2_reduction": E_p_million_kwh * 0.977 * 10, # kg
"SO2_reduction": E_p_million_kwh * 0.03 * 10, # kg
"NOX_reduction": E_p_million_kwh * 0.015 * 10 # kg
}
# 获取风机参数
rated_power, swept_area, blade_diameter = get_wind_turbine_specs(device_name, file_path)
# 估算风机数量
num_turbines = estimate_wind_turbine_count(area_km2, blade_diameter)
# 计算空气密度
avg_density = air_density(altitude, hub_height, avg_temp)
# 计算风功率密度W/m²
wpd = wind_power_density(avg_density, avg_wind_speed)
# 计算总装机容量kW
total_power = estimated_wind_power(num_turbines, rated_power)
# 计算年发电量Wh
P_test = calculate_power_output(swept_area, wpd, Cp, eta, num_turbines)
# 计算等效满负荷小时数
h = calculate_equivalent_hours(P_test, rated_power)
# 转换为万kWh以计算环境效益
E_p_million_kwh = P_test / 10000000
env_benefits = calculate_environmental_benefits(E_p_million_kwh)
# 返回结果字典
return {
"device": device_name,
"rated_power": rated_power,
"swept_area": swept_area,
"blade_diameter": blade_diameter,
"num_turbines": num_turbines,
"avg_density": avg_density,
"wpd": wpd,
"total_power": total_power / 1000, # 转换为MW
"annual_power_output": P_test / 10000000, # 转换为万kWh
"equivalent_hours": h,
"coal_reduction": env_benefits["coal_reduction"],
"CO2_reduction": env_benefits["CO2_reduction"],
"SO2_reduction": env_benefits["SO2_reduction"],
"NOX_reduction": env_benefits["NOX_reduction"]
}
# 主程序
if __name__ == "__main__":
# 定义输入参数
file_path = r".\wind_product.xlsx" # 风机参数文件路径
device_name = 'GW165-4.0' # 风机型号
area_km2 = 10 # 风电场面积(平方公里)
avg_temp = 13.0 # 年平均气温(摄氏度)
avg_wind_speed = 6 # 年平均风速m/s
# 调用风电场分析函数
result = wind_farm_analysis(
device_name=device_name,
area_km2=area_km2,
file_path=file_path,
avg_temp=avg_temp,
avg_wind_speed=avg_wind_speed
)
# 输出结果
print(f"\n设备: {result['device']}")
print(f"额定功率: {result['rated_power']:.2f} KW")
print(f"扫风面积: {result['swept_area']:.2f} m^2")
print(f"叶片直径: {result['blade_diameter']:.2f} m")
print(f"风机数量: {result['num_turbines']}")
print(f"平均空气密度: {result['avg_density']:.3f} kg/m^3")
print(f"风功率密度: {result['wpd']:.2f} W/m^2")
print(f"项目装机容量: {result['total_power']:.2f} MW")
print(f"年发电量: {result['annual_power_output']:.3f} 万 kWh")
print(f"等效小时数: {result['equivalent_hours']:.2f} 小时")
print(f"标准煤减排量:{result['coal_reduction']:,.0f} kg")
print(f"CO₂减排量{result['CO2_reduction']:,.0f} kg")
print(f"SO₂减排量{result['SO2_reduction']:,.0f} kg")
print(f"NOx减排量{result['NOX_reduction']:,.0f} kg")

Binary file not shown.

View File

@ -1,204 +0,0 @@
import pandas as pd
import math
from scipy.optimize import fsolve
import os
import sys
# 获取当前文件的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
print(current_dir)
# 添加当前目录到sys.path
sys.path.append(current_dir)
def wind_farm_analysis(device_name, area_km2, electricity_price, file_path, velocity_avg, T_avg,
lateral_spacing_factor=5, longitudinal_spacing_factor=10, q=0.02, altitude=11,
hub_height=100, Cp=0.45, eta=0.8, cost_per_mw=5000):
"""
封装函数分析风电场的风机数量及各项经济和技术指标
参数
device_name (str): 设备名称
area_km2 (float): 风电场面积平方公里
electricity_price (float): 电价/kWh
file_path (str): 风机参数 Excel 文件路径
velocity_avg (float): 全年平均风速
T_path (str): 全年平均温度
lateral_spacing_factor (float): 横向间距因子默认为 5D
longitudinal_spacing_factor (float): 纵向间距因子默认为 10D
q (float): 运维成本占初始投资成本的比例默认 0.02 表示 2%
altitude (float): 海拔高度m默认 11m
hub_height (float): 轮毂高度m默认 100m
Cp (float): 风能利用系数默认 0.45
eta (float): 总系统效率默认 0.8
cost_per_mw (float): MW 投资成本万元/MW默认 5000 万元/MW
返回
dict: 包含风电场分析结果的字典
"""
def estimate_wind_turbine_count(area_km2, blade_diameter):
area_m2 = area_km2 * 1_000_000
lateral_spacing = lateral_spacing_factor * blade_diameter
longitudinal_spacing = longitudinal_spacing_factor * blade_diameter
turbine_area = lateral_spacing * longitudinal_spacing
turbine_count = int(area_m2 / turbine_area)
print(f"单台风机占地面积: {turbine_area:,} 平方米 "
f"(横向间距: {lateral_spacing} 米, 纵向间距: {longitudinal_spacing} 米)")
print(f"估算风机数量: {turbine_count}")
return turbine_count
def get_wind_turbine_specs(device_name, file_path):
try:
df = pd.read_excel(file_path)
match = df[df.iloc[:, 0] == device_name]
if not match.empty:
rated_power = match.iloc[0, 1]
swept_area = match.iloc[0, 7] # 扫风面积
blade_diameter = match.iloc[0, 6] # 叶片直径
print(f"找到设备 '{device_name}',额定功率: {rated_power} kW, "
f"扫风面积: {swept_area} m², 叶片直径: {blade_diameter}")
return rated_power, swept_area, blade_diameter
else:
raise ValueError(f"未找到设备名称: {device_name}")
except FileNotFoundError:
raise FileNotFoundError(f"文件未找到: {file_path}")
except Exception as e:
raise Exception(f"发生错误: {str(e)}")
def air_density(altitude, hub_height, T0):
z = altitude + hub_height
LR = 0.0065
T = T0 - LR * z + 273.15
return (353.05 / T) * math.exp(-0.034 * (z / T))
def wind_power_density(densities, velocity_avg):
rho_v3 = densities * velocity_avg
return 0.5 * rho_v3
def estimated_wind_power(num_turbines, rated_power):
if not isinstance(num_turbines, int) or num_turbines < 0:
raise ValueError("风机数量必须为非负整数")
return rated_power * num_turbines
def calculate_power_output(S, w, Cp, eta):
# 瓦时
return w * S * Cp * 8760 * eta
def calculate_equivalent_hours(P, P_r):
if P_r == 0:
raise ValueError("额定功率不能为 0")
return (P / 1000) / P_r
def calculate_environmental_benefits(E_p_million_kwh):
"""计算环境收益"""
if E_p_million_kwh < 0:
raise ValueError("年发电量需≥0")
return {
"coal_reduction": E_p_million_kwh * 0.404 * 10,
"CO2_reduction": E_p_million_kwh * 0.977 * 10,
"SO2_reduction": E_p_million_kwh * 0.03 * 10,
"NOX_reduction": E_p_million_kwh * 0.015 * 10
}
def calculate_reference_yield(E_p, electricity_price, IC, q, n=20):
def npv_equation(irr, p, w, ic, q_val, n=n):
term1 = (1 + irr) ** (-1)
term2 = irr * (1 + irr) ** (-1) if irr != 0 else float('inf')
pv_revenue = p * w * (term1 / term2) * (1 - (1 + irr) ** (-n))
pv_salvage = q_val * ic * (term1 / term2) * (1 - (1 + irr) ** (-n))
return pv_revenue - ic + pv_salvage
irr_guess = 0.1
irr = float(fsolve(npv_equation, irr_guess, args=(E_p, electricity_price, IC, q))[0])
if not 0 <= irr <= 1:
raise ValueError(f"IRR计算结果{irr:.4f}不合理")
return irr * 100
# 获取设备信息
rated_power, swept_area, blade_diameter = get_wind_turbine_specs(device_name, file_path)
# 估算风机数量
num_turbines = estimate_wind_turbine_count(area_km2, blade_diameter)
# 读取温度数据并计算空气密度
avg_density = air_density(altitude, hub_height, T_avg)
# 计算风功率密度
wpd = wind_power_density(avg_density, velocity_avg)
# 计算装机容量
total_power = estimated_wind_power(num_turbines, rated_power)
# 计算初始投资成本
IC = total_power * cost_per_mw * 1000000
# 计算年发电量 kwh
P_test = calculate_power_output(swept_area, wpd, Cp, eta) * num_turbines
# 计算等效小时数
h = calculate_equivalent_hours(P_test, rated_power)
# 计算 IRR
P_test_IRR = P_test/1000
irr = calculate_reference_yield(P_test_IRR, electricity_price, IC, q)
env_benefits = calculate_environmental_benefits((P_test / 10000000))
# 返回结果
out = {
"device": device_name,
"rated_power": rated_power,
"swept_area": swept_area,
"blade_diameter": blade_diameter,
"num_turbines": num_turbines,
"avg_density": avg_density,
"wpd": wpd,
"total_power": total_power,
"annual_power_output": P_test / 10000000, # 万 kWh
"equivalent_hours": h,
"IRR": irr
}
out.update(env_benefits)
return out
# 主程序
if __name__ == "__main__":
file_path = f"{current_dir}/wind_product.xlsx"
v_avg = 4.2
tavg = 15
device_name = "GW165-5.2"
area_km2 = 23.2
electricity_price = 0.45
result = wind_farm_analysis(
device_name=device_name,
area_km2=area_km2,
electricity_price=electricity_price,
file_path=file_path,
velocity_avg=v_avg,
T_avg=tavg
)
print(result)
"""
{
"code": 200,
"data": {
"device": "GW165-5.2",
"rated_power": 5.2,
"swept_area": 21382,
"blade_diameter": 165,
"num_turbines": 23,
"avg_density": 1.2118668826686871,
"wpd": 1.9995803564033336,
"total_power": 119.60000000000001,
"annual_power_output": 310.11418354861905,
"equivalent_hours": 596.3734299011904,
"IRR": 9.985793133871693,
"coal_reduction": 12528.61301536421,
"CO2_reduction": 30298.155732700077,
"SO2_reduction": 930.342550645857,
"NOX_reduction": 465.1712753229285
}
}
"""