389 lines
17 KiB
Python
389 lines
17 KiB
Python
|
import numpy as np
|
|||
|
import copy
|
|||
|
import cv2 as cv
|
|||
|
from tqdm import tqdm
|
|||
|
import os
|
|||
|
import torch
|
|||
|
import torch.nn.functional as F
|
|||
|
from PIL import Image
|
|||
|
import uuid
|
|||
|
from pathlib import Path
|
|||
|
|
|||
|
class segmentation():
|
|||
|
|
|||
|
def __init__(self, model_name="xx"):
|
|||
|
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
|
|||
|
self.base_dir = os.path.dirname(os.path.abspath(__file__))
|
|||
|
self.model_name = model_name
|
|||
|
self.palette = [0, 0, 0, 255, 0, 0, 0, 255, 0]
|
|||
|
self.fusin = True
|
|||
|
|
|||
|
def create_tmp_path(self, path):
|
|||
|
folder_name = str(uuid.uuid4())
|
|||
|
folder_path = os.path.join(path,'tmp', folder_name)
|
|||
|
os.makedirs(folder_path)
|
|||
|
return folder_path
|
|||
|
|
|||
|
|
|||
|
|
|||
|
# 图片分割识别结果
|
|||
|
def start_segmentation(self,path,net):
|
|||
|
assert os.path.isfile(path)
|
|||
|
image = Image.open(path)
|
|||
|
orininal_h = image.size[1]
|
|||
|
orininal_w = image.size[0]
|
|||
|
item = {"orininal_h": orininal_h, "orininal_w": orininal_w, "image": image}
|
|||
|
image = item["image"]
|
|||
|
orininal_w = item["orininal_w"]
|
|||
|
orininal_h = item["orininal_h"]
|
|||
|
old_img = copy.deepcopy(image)
|
|||
|
imaged = cv.resize(np.array(image), dsize=(512, 512), interpolation=cv.INTER_LINEAR)
|
|||
|
image_data = np.expand_dims(
|
|||
|
np.transpose(self.preprocess_input(np.array(imaged, np.float32), md=False), (2, 0, 1)), 0)
|
|||
|
|
|||
|
with torch.no_grad():
|
|||
|
images = torch.from_numpy(image_data)
|
|||
|
image = images.to(device="cuda", dtype=torch.float32)
|
|||
|
model = net.to(device="cuda")
|
|||
|
out = model(image) # batch_size, 2, 512, 512
|
|||
|
if isinstance(out, list) or isinstance(out, tuple): # 可能有多个输出(这里把辅助解码头的也输出的所以是多个)
|
|||
|
out = out[0] # 就取第一个
|
|||
|
out = out[0] # 去掉batch
|
|||
|
pr = F.softmax(out.permute(1, 2, 0), dim=-1).cpu().numpy()
|
|||
|
result = pr.argmax(axis=-1)
|
|||
|
# 二值图像
|
|||
|
output_im = Image.fromarray(np.uint8(result)).convert('P')
|
|||
|
output_im.putpalette(self.palette)
|
|||
|
file_name = os.path.basename(path)
|
|||
|
file_directory = os.path.dirname(path)
|
|||
|
file_name_ = "binary_"+ file_name
|
|||
|
file_path = os.path.join(file_directory,file_name_)
|
|||
|
output_im.save(file_path)
|
|||
|
# 融合图像
|
|||
|
PALETTE = [(0, 0, 0), (255, 0, 0), (0, 255, 0)] # 这里主动改分类了,将识别图像与底图融合
|
|||
|
|
|||
|
seg_img0 = np.reshape(np.array(PALETTE, np.uint8)[np.reshape(result, [-1])],
|
|||
|
[orininal_h, orininal_w, -1])
|
|||
|
|
|||
|
image0 = Image.fromarray(np.uint8(seg_img0))
|
|||
|
|
|||
|
fusion = Image.blend(old_img, image0, 0.4) # 图像融合
|
|||
|
file_name_ = "segment_"+ file_name
|
|||
|
file_path = os.path.join(file_directory,file_name_)
|
|||
|
fusion.save(file_path)
|
|||
|
|
|||
|
|
|||
|
# 获取大型图片,需要进行裁剪
|
|||
|
def cut_big_image(self,path):
|
|||
|
file_name = os.path.basename(path)
|
|||
|
file_directory = os.path.dirname(path)
|
|||
|
output_floder_normal = os.path.join(file_directory,'ori')
|
|||
|
# 创建输出文件夹(如果不存在)
|
|||
|
os.makedirs(output_floder_normal, exist_ok=True)
|
|||
|
# 打开图像
|
|||
|
image = Image.open(path)
|
|||
|
width, height = image.size
|
|||
|
# 定义块的大小
|
|||
|
block_size = 512
|
|||
|
|
|||
|
# 计算需要的块数
|
|||
|
num_blocks_x = (width + block_size - 1) // block_size # 向上取整
|
|||
|
num_blocks_y = (height + block_size - 1) // block_size # 向上取整
|
|||
|
# 裁剪并保存图像块
|
|||
|
for i in range(num_blocks_x):
|
|||
|
for j in range(num_blocks_y):
|
|||
|
# 计算裁剪区域
|
|||
|
left = i * block_size
|
|||
|
upper = j * block_size
|
|||
|
right = min(left + block_size, width)
|
|||
|
lower = min(upper + block_size, height)
|
|||
|
|
|||
|
# 裁剪图像
|
|||
|
block = image.crop((left, upper, right, lower))
|
|||
|
|
|||
|
# 创建一个新的512x512白色图像
|
|||
|
new_block = Image.new('RGB', (block_size, block_size), (255, 255, 255))
|
|||
|
# 将裁剪的图像粘贴到白色图像上
|
|||
|
new_block.paste(block, (0, 0))
|
|||
|
|
|||
|
# 保存图像块
|
|||
|
block_filename = f'block_{i}_{j}.png'
|
|||
|
new_block.save(os.path.join(output_floder_normal, block_filename))
|
|||
|
|
|||
|
print("裁剪完成,图像块已保存。")
|
|||
|
|
|||
|
|
|||
|
def get_images(self, path):
|
|||
|
imglist = []
|
|||
|
lines = os.listdir(path)
|
|||
|
# print(lines)
|
|||
|
for ii, line in enumerate(lines):
|
|||
|
name = line
|
|||
|
_imagepath = os.path.join(path, name)
|
|||
|
assert os.path.isfile(_imagepath)
|
|||
|
image = Image.open(_imagepath)
|
|||
|
orininal_h = image.size[1]
|
|||
|
orininal_w = image.size[0]
|
|||
|
item = {"name": name, "orininal_h": orininal_h, "orininal_w": orininal_w, "image": image}
|
|||
|
imglist.append(item)
|
|||
|
# print("共监测到{}张原始图像和标签".format(len(imglist)))
|
|||
|
return imglist
|
|||
|
|
|||
|
def folder_segment(self,path,net):
|
|||
|
# file_name = os.path.basename(path)
|
|||
|
# file_directory = os.path.dirname(path)
|
|||
|
# ori_path = os.path.join(file_directory,'ori')
|
|||
|
imglist = self.get_images(path)
|
|||
|
assert len(imglist) != 0
|
|||
|
for i in tqdm(imglist):
|
|||
|
image = i["image"]
|
|||
|
name = i["name"]
|
|||
|
orininal_w = i["orininal_w"]
|
|||
|
orininal_h = i["orininal_h"]
|
|||
|
old_img = copy.deepcopy(image)
|
|||
|
imaged = cv.resize(np.array(image), dsize=(512, 512), interpolation=cv.INTER_LINEAR)
|
|||
|
image_data = np.expand_dims(
|
|||
|
np.transpose(self.preprocess_input(np.array(imaged, np.float32), md=False), (2, 0, 1)), 0)
|
|||
|
with torch.no_grad():
|
|||
|
images = torch.from_numpy(image_data)
|
|||
|
image = images.to(device="cuda", dtype=torch.float32)
|
|||
|
model = net.to(device="cuda")
|
|||
|
out = model(image) # batch_size, 2, 512, 512
|
|||
|
if isinstance(out, list) or isinstance(out, tuple): # 可能有多个输出(这里把辅助解码头的也输出的所以是多个)
|
|||
|
out = out[0] # 就取第一个
|
|||
|
out = out[0] # 去掉batch
|
|||
|
|
|||
|
pr = F.softmax(out.permute(1, 2, 0), dim=-1).cpu().numpy()
|
|||
|
result = pr.argmax(axis=-1)
|
|||
|
# 结果图
|
|||
|
path_ = Path(path)
|
|||
|
parent_directory = path_.parent
|
|||
|
directory = os.path.join(parent_directory,'binary')
|
|||
|
if not os.path.exists(directory):
|
|||
|
os.makedirs(directory)
|
|||
|
output_im = Image.fromarray(np.uint8(result)).convert('P')
|
|||
|
|
|||
|
output_im.putpalette(self.palette)
|
|||
|
tmp_path_binary= os.path.join(directory,name)
|
|||
|
output_im.save(tmp_path_binary)
|
|||
|
|
|||
|
directory = os.path.join(parent_directory,'fusion')
|
|||
|
if not os.path.exists(directory):
|
|||
|
os.makedirs(directory)
|
|||
|
PALETTE = [(0, 0, 0), (255, 0, 0), (0, 255, 0)] # 这里主动改分类了,将识别图像与底图融合
|
|||
|
seg_img0 = np.reshape(np.array(PALETTE, np.uint8)[np.reshape(result, [-1])],
|
|||
|
[orininal_h, orininal_w, -1])
|
|||
|
|
|||
|
image0 = Image.fromarray(np.uint8(seg_img0))
|
|||
|
|
|||
|
fusion = Image.blend(old_img, image0, 0.4) # 图像融合
|
|||
|
tmp_path_fusion= os.path.join(directory,name)
|
|||
|
fusion.save(tmp_path_fusion)
|
|||
|
|
|||
|
|
|||
|
def folder_segment_all(self,path,net,mode):
|
|||
|
# file_name = os.path.basename(path)
|
|||
|
# file_directory = os.path.dirname(path)
|
|||
|
# ori_path = os.path.join(file_directory,'ori')
|
|||
|
imglist = self.get_images(path)
|
|||
|
assert len(imglist) != 0
|
|||
|
for i in tqdm(imglist):
|
|||
|
image = i["image"]
|
|||
|
name = i["name"]
|
|||
|
orininal_w = i["orininal_w"]
|
|||
|
orininal_h = i["orininal_h"]
|
|||
|
old_img = copy.deepcopy(image)
|
|||
|
imaged = cv.resize(np.array(image), dsize=(512, 512), interpolation=cv.INTER_LINEAR)
|
|||
|
image_data = np.expand_dims(
|
|||
|
np.transpose(self.preprocess_input(np.array(imaged, np.float32), md=False), (2, 0, 1)), 0)
|
|||
|
with torch.no_grad():
|
|||
|
images = torch.from_numpy(image_data)
|
|||
|
image = images.to(device="cuda", dtype=torch.float32)
|
|||
|
model = net.to(device="cuda")
|
|||
|
out = model(image) # batch_size, 2, 512, 512
|
|||
|
if isinstance(out, list) or isinstance(out, tuple): # 可能有多个输出(这里把辅助解码头的也输出的所以是多个)
|
|||
|
out = out[0] # 就取第一个
|
|||
|
out = out[0] # 去掉batch
|
|||
|
|
|||
|
pr = F.softmax(out.permute(1, 2, 0), dim=-1).cpu().numpy()
|
|||
|
result = pr.argmax(axis=-1)
|
|||
|
# 结果图
|
|||
|
path_ = Path(path)
|
|||
|
parent_directory = path_.parent
|
|||
|
directory = os.path.join(parent_directory, mode+'_binary')
|
|||
|
if not os.path.exists(directory):
|
|||
|
os.makedirs(directory)
|
|||
|
output_im = Image.fromarray(np.uint8(result)).convert('P')
|
|||
|
|
|||
|
output_im.putpalette(self.palette)
|
|||
|
tmp_path_binary= os.path.join(directory,name)
|
|||
|
output_im.save(tmp_path_binary)
|
|||
|
|
|||
|
# directory = os.path.join(parent_directory,mode + '_fusion')
|
|||
|
# if not os.path.exists(directory):
|
|||
|
# os.makedirs(directory)
|
|||
|
# PALETTE = [(0, 0, 0), (255, 0, 0), (0, 255, 0)] # 这里主动改分类了,将识别图像与底图融合
|
|||
|
# seg_img0 = np.reshape(np.array(PALETTE, np.uint8)[np.reshape(result, [-1])],
|
|||
|
# [orininal_h, orininal_w, -1])
|
|||
|
|
|||
|
# image0 = Image.fromarray(np.uint8(seg_img0))
|
|||
|
|
|||
|
# fusion = Image.blend(old_img, image0, 0.4) # 图像融合
|
|||
|
# tmp_path_fusion= os.path.join(directory,name)
|
|||
|
# fusion.save(tmp_path_fusion)
|
|||
|
|
|||
|
|
|||
|
def merge_pic(self,path):
|
|||
|
image = Image.open(path)
|
|||
|
width, height = image.size
|
|||
|
file_name = os.path.basename(path)
|
|||
|
file_directory = os.path.dirname(path)
|
|||
|
output_image_path = os.path.join(file_directory,file_name)
|
|||
|
for name in ['fusion','binary']:
|
|||
|
file_name_ = "merge_" +name+ "_" + file_name
|
|||
|
input_folder = os.path.join(file_directory,name)
|
|||
|
# 获取所有小图片文件名
|
|||
|
image_files = [f for f in os.listdir(input_folder) if f.endswith('.png')]
|
|||
|
image_files.sort() # 按文件名排序,确保按顺序合并
|
|||
|
# 假设小图的尺寸是512x512
|
|||
|
block_size = 512
|
|||
|
# 计算需要的块数
|
|||
|
num_blocks_x = (width + block_size - 1) // block_size # 向上取整
|
|||
|
num_blocks_y = (height + block_size - 1) // block_size # 向上取整
|
|||
|
|
|||
|
# 创建一个新的空白图像,用于合并块
|
|||
|
merged_image = Image.new('RGB', (width, height))
|
|||
|
|
|||
|
# 遍历所有块并将它们粘贴到合并图像中
|
|||
|
for i in range(num_blocks_x):
|
|||
|
for j in range(num_blocks_y):
|
|||
|
# 计算块的文件名
|
|||
|
block_filename = f'block_{i}_{j}.png'
|
|||
|
block_path = os.path.join(input_folder, block_filename)
|
|||
|
|
|||
|
# 打开块图像
|
|||
|
if os.path.exists(block_path):
|
|||
|
block = Image.open(block_path)
|
|||
|
# 计算粘贴位置
|
|||
|
left = i * block_size
|
|||
|
upper = j * block_size
|
|||
|
merged_image.paste(block, (left, upper))
|
|||
|
# 保存合并后的图像
|
|||
|
target = os.path.join(file_directory,file_name_)
|
|||
|
merged_image.save(target) # 替换为你想保存的路径
|
|||
|
print("合并完成,保存为:", target)
|
|||
|
|
|||
|
|
|||
|
def merge_pic_all(self,path):
|
|||
|
image = Image.open(path)
|
|||
|
width, height = image.size
|
|||
|
file_name = os.path.basename(path)
|
|||
|
file_directory = os.path.dirname(path)
|
|||
|
path_list = []
|
|||
|
for name in ['pv_binary','roof_binary']:
|
|||
|
file_name_ = "merge_" +name+ "_" + file_name
|
|||
|
input_folder = os.path.join(file_directory,name)
|
|||
|
# 获取所有小图片文件名
|
|||
|
image_files = [f for f in os.listdir(input_folder) if f.endswith('.png')]
|
|||
|
image_files.sort() # 按文件名排序,确保按顺序合并
|
|||
|
# 假设小图的尺寸是512x512
|
|||
|
block_size = 512
|
|||
|
# 计算需要的块数
|
|||
|
num_blocks_x = (width + block_size - 1) // block_size # 向上取整
|
|||
|
num_blocks_y = (height + block_size - 1) // block_size # 向上取整
|
|||
|
|
|||
|
# 创建一个新的空白图像,用于合并块
|
|||
|
merged_image = Image.new('RGB', (width, height))
|
|||
|
|
|||
|
# 遍历所有块并将它们粘贴到合并图像中
|
|||
|
for i in range(num_blocks_x):
|
|||
|
for j in range(num_blocks_y):
|
|||
|
# 计算块的文件名
|
|||
|
block_filename = f'block_{i}_{j}.png'
|
|||
|
block_path = os.path.join(input_folder, block_filename)
|
|||
|
|
|||
|
# 打开块图像
|
|||
|
if os.path.exists(block_path):
|
|||
|
block = Image.open(block_path)
|
|||
|
# 计算粘贴位置
|
|||
|
left = i * block_size
|
|||
|
upper = j * block_size
|
|||
|
merged_image.paste(block, (left, upper))
|
|||
|
# 保存合并后的图像
|
|||
|
target = os.path.join(file_directory,file_name_)
|
|||
|
merged_image.save(target) # 替换为你想保存的路径
|
|||
|
print("合并完成,保存为:", target)
|
|||
|
path_list.append(target)
|
|||
|
return path_list
|
|||
|
|
|||
|
|
|||
|
|
|||
|
def preprocess_input(self, image, md=False):
|
|||
|
mean = (0.231, 0.217, 0.22) # 针对北京的数据集
|
|||
|
std = (0.104, 0.086, 0.085)
|
|||
|
if md:
|
|||
|
image /= 255.0
|
|||
|
image -= mean
|
|||
|
image /= std
|
|||
|
return image
|
|||
|
else:
|
|||
|
image /= 255.0
|
|||
|
return image
|
|||
|
|
|||
|
|
|||
|
# 将屋顶与光伏的二值图进行合并
|
|||
|
def merge_binary(self, path_list):
|
|||
|
|
|||
|
# 打开二值图像
|
|||
|
image1_path = path_list[1] # 替换为你的屋顶图像路径
|
|||
|
image2_path = path_list[0] # 替换为你的光伏图像路径
|
|||
|
file_directory = os.path.dirname(image1_path)
|
|||
|
# 加载图像并确保它们是RGB模式
|
|||
|
image1 = Image.open(image1_path).convert('RGB')
|
|||
|
image2 = Image.open(image2_path).convert('RGB')
|
|||
|
|
|||
|
# 创建一个新的图像,RGB模式,初始为黑色
|
|||
|
output_image = Image.new('RGB', image1.size, (0, 0, 0))
|
|||
|
|
|||
|
# 获取像素数据
|
|||
|
pixels1 = image1.load()
|
|||
|
pixels2 = image2.load()
|
|||
|
output_pixels = output_image.load()
|
|||
|
|
|||
|
# 遍历所有像素点
|
|||
|
for x in range(image1.width):
|
|||
|
for y in range(image1.height):
|
|||
|
# 检查两个图像在当前像素点上的颜色
|
|||
|
if pixels1[x, y] == (255, 0, 0) and pixels2[x, y] == (255, 0, 0): # 红色
|
|||
|
output_pixels[x, y] = (255, 0, 0) # 设置为红色
|
|||
|
else:
|
|||
|
output_pixels[x, y] = (0, 0, 0) # 设置为黑色
|
|||
|
|
|||
|
# 保存输出图像
|
|||
|
final_path = os.path.join(file_directory, 'merge_pvroof_bianry.png')
|
|||
|
output_image.save(final_path) # 替换为你想保存的路径
|
|||
|
return final_path
|
|||
|
|
|||
|
# 将二值图与原始图合并
|
|||
|
def merge_final(self,path_list):
|
|||
|
ori_path = path_list[1] # 原始图像
|
|||
|
binary_path = path_list[0] # 二值图像
|
|||
|
# 加载图像并确保它们是RGBA模式(如果需要透明度)
|
|||
|
ori_image = Image.open(ori_path).convert('RGBA')
|
|||
|
binary_image = Image.open(binary_path).convert('RGBA')
|
|||
|
|
|||
|
# 确保两张图像的大小相同
|
|||
|
if ori_image.size != binary_image.size:
|
|||
|
raise ValueError("两张图像的尺寸必须相同!")
|
|||
|
|
|||
|
# 设置混合比例,0.5表示两张图像各占50%
|
|||
|
alpha = 0.4
|
|||
|
|
|||
|
# 融合图像
|
|||
|
blended_image = Image.blend(ori_image, binary_image, alpha)
|
|||
|
file_directory = os.path.dirname(ori_path)
|
|||
|
file_name = os.path.basename(ori_path)
|
|||
|
final_path = os.path.join(file_directory,'fusion_'+file_name)
|
|||
|
|
|||
|
# 保存融合后的图像
|
|||
|
blended_image.save(final_path) # 替换为你想保存的路径
|