import os

import geopandas as gpd
import numpy as np
import pandas as pd
import rasterio
from rasterio.features import rasterize
from rasterio.windows import Window
from shapely.geometry import box, mapping


def _iter_feature_chunks(vector_path, chunk_size):
    """Yield consecutive GeoDataFrames of at most ``chunk_size`` features.

    ``geopandas.read_file`` has no ``chunksize`` option, so the file is paged
    through with ``rows=slice(start, stop)`` instead.
    """
    start = 0
    while True:
        chunk = gpd.read_file(vector_path, rows=slice(start, start + chunk_size))
        if chunk.empty:
            return
        yield chunk
        # A short chunk means the end of the file was reached; skip the
        # extra (empty) read.
        if len(chunk) < chunk_size:
            return
        start += chunk_size


def _scan_crs_and_extent(vector_path, chunk_size):
    """Return ``(crs, (minx, miny, maxx, maxy))`` for the whole vector file.

    The extent is accumulated chunk by chunk so the full dataset never has to
    be held in memory. The extent is ``None`` when no feature has a usable
    geometry.
    """
    crs = None
    extent = None
    for chunk in _iter_feature_chunks(vector_path, chunk_size):
        if crs is None:
            crs = chunk.crs
        chunk_bounds = chunk.total_bounds
        # total_bounds is NaN when every geometry in the chunk is empty/missing.
        if np.isnan(chunk_bounds).any():
            continue
        if extent is None:
            extent = tuple(float(v) for v in chunk_bounds)
        else:
            extent = (
                min(extent[0], float(chunk_bounds[0])),
                min(extent[1], float(chunk_bounds[1])),
                max(extent[2], float(chunk_bounds[2])),
                max(extent[3], float(chunk_bounds[3])),
            )
    return crs, extent


def large_vector_to_raster(input_shp_path, output_raster_path, raster_resolution=30,
                           chunk_size=1000, window_size=1000,
                           category_field='value', category_value=1):
    """Rasterize a large polygon layer into a single-band uint8 GeoTIFF.

    The vector file is read in chunks of features and the raster is written
    window by window, so neither the full vector layer nor the full raster is
    held in memory at once. Background pixels are 0 (also declared as nodata).

    Any error is caught, reported with ``print`` and not re-raised; the
    function always returns ``None``.

    Args:
        input_shp_path: Path of the input vector file (e.g. a shapefile).
        output_raster_path: Path of the GeoTIFF to create.
        raster_resolution: Pixel size in the units of the layer's CRS
            (default 30).
        chunk_size: Number of features read from the vector file at a time.
        window_size: Edge length, in pixels, of each square raster window.
        category_field: Attribute whose value is burned into the raster.
        category_value: Value burned for every feature when ``category_field``
            is not present in the data.
    """
    try:
        if not os.path.exists(input_shp_path):
            raise FileNotFoundError(f"输入文件 {input_shp_path} 不存在")
        if raster_resolution <= 0:
            raise ValueError("raster_resolution 必须大于 0")
        if chunk_size < 1 or window_size < 1:
            raise ValueError("chunk_size 和 window_size 必须为正整数")

        # The extent must cover every feature, so the whole file is scanned
        # once; reading only the first row would give that row's bounds only.
        print("正在读取矢量元信息...")
        base_crs, extent = _scan_crs_and_extent(input_shp_path, chunk_size)
        if extent is None:
            raise ValueError(f"输入文件 {input_shp_path} 不包含有效的几何要素")
        min_x, min_y, max_x, max_y = extent

        # Round up so the raster covers the full extent, and never create a
        # zero-sized raster when the extent is smaller than one pixel.
        width = max(1, int(np.ceil((max_x - min_x) / raster_resolution)))
        height = max(1, int(np.ceil((max_y - min_y) / raster_resolution)))
        # Anchor at the upper-left corner with exactly the requested pixel size.
        transform = rasterio.transform.from_origin(
            min_x, max_y, raster_resolution, raster_resolution
        )

        print(f"初始化输出栅格文件: {output_raster_path}")
        with rasterio.open(
            output_raster_path,
            'w',
            driver='GTiff',
            height=height,
            width=width,
            count=1,
            dtype=rasterio.uint8,
            crs=base_crs,
            transform=transform,
            nodata=0,  # background value
        ) as dst:
            for row_offset in range(0, height, window_size):
                for col_offset in range(0, width, window_size):
                    window_height = min(window_size, height - row_offset)
                    window_width = min(window_size, width - col_offset)
                    window = Window(col_offset, row_offset, window_width, window_height)

                    # Georeferencing of this window, used both to select the
                    # intersecting features and to rasterize them in place.
                    window_transform = rasterio.windows.transform(window, transform)
                    window_poly = box(*rasterio.windows.bounds(window, transform))

                    print(f"处理窗口: row={row_offset}, col={col_offset}")
                    window_raster = np.zeros((window_height, window_width), dtype=rasterio.uint8)

                    # NOTE: the vector file is re-read for every window; this
                    # keeps memory bounded at the cost of repeated I/O.
                    for chunk_idx, chunk_gdf in enumerate(
                        _iter_feature_chunks(input_shp_path, chunk_size)
                    ):
                        window_gdf = chunk_gdf[chunk_gdf.intersects(window_poly)]
                        if window_gdf.empty:
                            continue

                        print(f"  - 处理分块 {chunk_idx},筛选出 {len(window_gdf)} 个特征")

                        # Fall back to the constant value when the requested
                        # attribute does not exist in the data.
                        if category_field in window_gdf.columns:
                            burn_values = window_gdf[category_field]
                        else:
                            burn_values = [category_value] * len(window_gdf)

                        shapes = (
                            (mapping(geom), value)
                            for geom, value in zip(window_gdf.geometry, burn_values)
                        )
                        temp_raster = rasterize(
                            shapes=shapes,
                            out_shape=(window_height, window_width),
                            transform=window_transform,
                            fill=0,
                            dtype=rasterio.uint8,
                        )

                        # Merge into the window, keeping earlier non-zero
                        # pixels wherever this chunk burned nothing.
                        burned = temp_raster > 0
                        window_raster[burned] = temp_raster[burned]

                    # Windows that stay all-background are left unwritten.
                    if np.any(window_raster > 0):
                        dst.write(window_raster, 1, window=window)

        print("处理完成!")
        print(f"栅格保存为: {output_raster_path}")
        print(f"栅格中类别值: 0=背景, 其他值由 {category_field} 字段或默认值 {category_value} 确定")

    except Exception as e:
        # Top-level boundary of the script: report the failure, do not re-raise.
        print(f"发生错误: {str(e)}")


# Usage example
if __name__ == "__main__":
    input_shp = r"E:\large_data\large_vector.shp"  # replace with the path of your large vector file
    output_raster = r"E:\large_data\large_raster.tif"  # output raster path

    large_vector_to_raster(
        input_shp_path=input_shp,
        output_raster_path=output_raster,
        raster_resolution=30,  # 30-unit pixel size
        chunk_size=1000,  # read 1000 features at a time
        window_size=1000,  # 1000x1000-pixel windows
        category_field='value',  # attribute to burn; falls back to category_value if absent
        category_value=1,  # default category value
    )