# -*- coding:utf-8 -*-
import json
import rasterio
from uuid import uuid1
from rasterio.crs import CRS
from rasterio.transform import from_origin
from rasterio.features import shapes
from rasterio.mask import mask
from typing import Union, List, Dict
import shapely.ops
from shapely.geometry import Polygon, MultiPoint, box
import geopandas as gpd
import random
import numpy as np
class PolygonValPixelCounter:
"""一個用于統(tǒng)計多邊形范圍內柵格圖像象元個數的類"""
def __init__(self, raster_path: str):
"""
初始化類
Args:
raster_path (str): 柵格文件路徑
"""
self.raster_path = raster_path
self.raster = None
self.nodata = 255 # 明確指定nodata值為255
self._load_raster()
def _load_raster(self) -> None:
"""加載柵格數據"""
try:
self.raster = rasterio.open(self.raster_path)
except Exception as e:
raise ValueError(f"無法加載柵格文件: {str(e)}")
def count_pixels(self,
polygons: Union[Dict, List, gpd.GeoDataFrame],
band: int = 1) -> int:
"""
統(tǒng)計多邊形范圍內的有效象元個數
Args:
polygon: 多邊形幾何
band (int): 要統(tǒng)計的波段,默認為1
Returns:
int: 有效象元個數
"""
if self.raster is None:
raise ValueError("柵格數據未加載")
try:
# 裁剪多邊形范圍內的柵格數據,指定nodata為255
res = []
for ig in polygons:
out_image, _ = mask(
self.raster,
[ig],
crop=True,
nodata=self.nodata # 使用指定的nodata值255
)
# 獲取指定波段數據并統(tǒng)計有效象元個數
data = out_image[band-1]
valid_pixels = data[data != self.nodata] # 使用255作為nodata值
res.append(int(valid_pixels.size))
return res
except Exception as e:
raise ValueError(f"統(tǒng)計失敗: {str(e)}")
def close(self) -> None:
"""關閉柵格文件"""
if self.raster is not None:
self.raster.close()
self.raster = None
def __del__(self):
"""析構函數,確保文件被關閉"""
self.close()
def split_polygon(polygon, num_parts):
"""
將一個 polygon 隨機拆分為指定數量的小塊。
參數:
polygon (shapely.geometry.Polygon): 要拆分的多邊形。
num_parts (int): 拆分的小塊數量。
返回:
list: 包含拆分后的多邊形的列表。
"""
# 獲取多邊形的邊界框
min_x, min_y, max_x, max_y = polygon.bounds
# 生成隨機點
points = []
for _ in range(num_parts):
while True:
x = random.uniform(min_x, max_x)
y = random.uniform(min_y, max_y)
point = shapely.geometry.Point(x, y)
if polygon.contains(point):
points.append(point)
break
# 添加多邊形的中心點
points.append(polygon.centroid)
# 使用 Voronoi 圖將多邊形拆分為小塊
voronoi_diagram = shapely.ops.voronoi_diagram(shapely.geometry.MultiPoint(points))
polygons = [polygon.intersection(cell) for cell in voronoi_diagram.geoms]
return polygons
def raster_to_polygon(raster_file):
"""
提取柵格圖像的范圍
參數:raster_file (str): 柵格文件路徑。
返回:list: 包含多邊形特征的列表。
"""
with rasterio.open(raster_file) as src:
result = box(src.bounds.left, src.bounds.bottom, src.bounds.right, src.bounds.top)
return result
def create_geotiff(data, output_file, crs_epsg, origin_x, origin_y, pixel_size, nodata=None):
"""
創(chuàng)建一個帶空間參考的 GeoTIFF 文件。
參數:
data (numpy.ndarray): 柵格數據,二維數組。
output_file (str): 輸出的 GeoTIFF 文件路徑。
crs_epsg (int): 坐標系的 EPSG 代碼。
origin_x (float): 左上角的 X 坐標。
origin_y (float): 左上角的 Y 坐標。
pixel_size (float): 像元大小。
nodata (float, optional): 無數據值,默認為 None。
"""
# 設置空間參考信息
crs = CRS.from_epsg(crs_epsg)
# 設置仿射變換
transform = from_origin(origin_x, origin_y, pixel_size, pixel_size)
# 設置其他元數據
metadata = {
'driver': 'GTiff', # 驅動程序,指定為 GeoTIFF
'dtype': data.dtype, # 數據類型
'nodata': nodata, # 無數據值
'width': data.shape[1], # 寬度(列數)
'height': data.shape[0], # 高度(行數)
'count': 1, # 波段數
'crs': crs, # 坐標參考系統(tǒng)
'transform': transform # 仿射變換
}
# 保存到 GeoTIFF 文件
with rasterio.open(output_file, 'w', **metadata) as dst:
dst.write(data, 1) # 寫入數據到第一個波段
print(f'GeoTIFF 文件已保存到 {output_file}')
# 示例調用
if __name__ == "__main__":
# 示例數據:一個簡單的二維數組
data = np.random.randint(1, 100, (100,100)) # 隨機生成一個 100x100 的數組,值范圍為 0 到 100
out_file = F'tmpfile/example_{str(uuid1())[:8]}.tif'
# 調用函數創(chuàng)建 GeoTIFF 文件
create_geotiff(
data=data,
output_file=out_file,
crs_epsg=4326, # WGS84
origin_x=113.475, # 左上角 X 坐標
origin_y=34.85, # 左上角 Y 坐標
pixel_size=0.001, # 像元大小
nodata=0 # 無數據值
)
gg = raster_to_polygon(out_file)
gs = split_polygon(gg, num_parts=5)
# gs = gpd.GeoSeries(gs, crs=4326).to_file(out_file.replace('.tif', '.geojson'))
pvpc = PolygonValPixelCounter(out_file)
res = pvpc.count_pixels(gs)
print(res)