mi-task/utils/base_line_handler.py
2025-05-31 12:33:28 +08:00

167 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
import cv2
def _are_lines_mergeable(line1_pts, line2_pts,
max_angle_diff_deg,
max_ep_gap_abs, max_ep_gap_factor,
max_p_dist_abs, max_p_dist_factor):
"""
判断两条线段是否可以合并。
lineX_pts: [x1, y1, x2, y2]
max_angle_diff_deg: 最大角度差 (度)
max_ep_gap_abs: 端点间最大绝对间隙
max_ep_gap_factor: 端点间最大相对间隙 (基于较短线段长度的因子)
max_p_dist_abs: 最大绝对垂直距离
max_p_dist_factor: 最大相对垂直距离 (基于较短线段长度的因子)
"""
x1a, y1a, x2a, y2a = line1_pts
x1b, y1b, x2b, y2b = line2_pts
p1a, p2a = np.array([x1a, y1a]), np.array([x2a, y2a])
p1b, p2b = np.array([x1b, y1b]), np.array([x2b, y2b])
len_a = np.linalg.norm(p2a - p1a)
len_b = np.linalg.norm(p2b - p1b)
if len_a < 1e-3 or len_b < 1e-3: return False # 避免零长度线段
shorter_len = min(len_a, len_b)
# 1. 角度相似性
angle_a = np.arctan2(y2a - y1a, x2a - x1a)
angle_b = np.arctan2(y2b - y1b, x2b - x1b)
angle_diff = np.degrees(angle_a - angle_b)
while angle_diff <= -180: angle_diff += 360
while angle_diff > 180: angle_diff -= 360
is_angle_similar = abs(angle_diff) < max_angle_diff_deg or \
abs(abs(angle_diff) - 180) < max_angle_diff_deg
if not is_angle_similar:
return False
# 2. 共线性: 检查一个线段的端点到另一个线段(无限延长)的垂直距离
current_max_allowed_perp_dist = max(max_p_dist_abs, max_p_dist_factor * shorter_len)
def get_perp_dist_point_to_inf_line(p, line_p1, line_p2, line_len):
if line_len < 1e-3: return np.linalg.norm(p - line_p1) # 线是点
return abs((line_p2[1]-line_p1[1])*p[0] - (line_p2[0]-line_p1[0])*p[1] + \
line_p2[0]*line_p1[1] - line_p2[1]*line_p1[0]) / line_len
dist_b1_to_a_inf = get_perp_dist_point_to_inf_line(p1b, p1a, p2a, len_a)
dist_b2_to_a_inf = get_perp_dist_point_to_inf_line(p2b, p1a, p2a, len_a)
dist_a1_to_b_inf = get_perp_dist_point_to_inf_line(p1a, p1b, p2b, len_b)
dist_a2_to_b_inf = get_perp_dist_point_to_inf_line(p2a, p1b, p2b, len_b)
# 如果两条线都很短,那么允许的垂直距离也应该小
# 如果一条线的端点离另一条无限延长的线太远,则不合并
if min(dist_b1_to_a_inf, dist_b2_to_a_inf) > current_max_allowed_perp_dist and \
min(dist_a1_to_b_inf, dist_a2_to_b_inf) > current_max_allowed_perp_dist:
return False
# 3. 邻近性/重叠性: 端点之间的最小距离
min_ep_dist = min(np.linalg.norm(p1a-p1b), np.linalg.norm(p1a-p2b),
np.linalg.norm(p2a-p1b), np.linalg.norm(p2a-p2b))
current_max_allowed_ep_gap = max(max_ep_gap_abs, max_ep_gap_factor * shorter_len)
if min_ep_dist > current_max_allowed_ep_gap:
return False
# 附加检查:确保线段确实是"连接的"或"重叠的"
# 计算一个端点到另一个 *线段* (非无限线) 的距离
def dist_point_to_segment(p, seg_a, seg_b):
ap = p - seg_a
ab = seg_b - seg_a
ab_len_sq = np.dot(ab, ab)
if ab_len_sq < 1e-6: return np.linalg.norm(ap)
t = np.dot(ap, ab) / ab_len_sq
if t < 0.0: closest_point = seg_a
elif t > 1.0: closest_point = seg_b
else: closest_point = seg_a + t * ab
return np.linalg.norm(p - closest_point)
dist_p1a_to_seg_b = dist_point_to_segment(p1a, p1b, p2b)
dist_p2a_to_seg_b = dist_point_to_segment(p2a, p1b, p2b)
dist_p1b_to_seg_a = dist_point_to_segment(p1b, p1a, p2a)
dist_p2b_to_seg_a = dist_point_to_segment(p2b, p1a, p2a)
if min(dist_p1a_to_seg_b, dist_p2a_to_seg_b, dist_p1b_to_seg_a, dist_p2b_to_seg_a) > current_max_allowed_ep_gap:
return False
return True
def _merge_two_lines(line1_pts, line2_pts):
points = np.array([
line1_pts[:2], line1_pts[2:],
line2_pts[:2], line2_pts[2:]
], dtype=np.float32)
vx, vy, x0, y0 = cv2.fitLine(points.reshape(-1,1,2), cv2.DIST_L2, 0, 0.01, 0.01)
vx, vy, x0, y0 = vx[0], vy[0], x0[0], y0[0]
projected_params = []
line_origin = np.array([x0, y0])
line_direction = np.array([vx, vy])
for i in range(4):
pt_vec = points[i] - line_origin
param = np.dot(pt_vec, line_direction)
projected_params.append(param)
min_param = min(projected_params)
max_param = max(projected_params)
p_start = line_origin + min_param * line_direction
p_end = line_origin + max_param * line_direction
return [int(round(p_start[0])), int(round(p_start[1])),
int(round(p_end[0])), int(round(p_end[1]))]
def _merge_collinear_lines_iterative(lines_from_hough,
min_initial_len,
max_angle_diff_deg,
max_ep_gap_abs, max_ep_gap_factor,
max_p_dist_abs, max_p_dist_factor):
if lines_from_hough is None:
return None
# 将HoughLinesP的输出转换为Python列表 [[x1,y1,x2,y2], ...]
py_lines = [list(line[0]) for line in lines_from_hough]
# 过滤掉初始太短的线
current_lines = [line for line in py_lines if np.linalg.norm(np.array(line[:2]) - np.array(line[2:])) >= min_initial_len]
if not current_lines or len(current_lines) < 2:
return np.array([[line] for line in current_lines]) if current_lines else np.array([])
made_change_in_pass = True
while made_change_in_pass:
made_change_in_pass = False
i = 0
while i < len(current_lines):
j = i + 1
merged_line_i = False
while j < len(current_lines):
if _are_lines_mergeable(current_lines[i], current_lines[j],
max_angle_diff_deg,
max_ep_gap_abs, max_ep_gap_factor,
max_p_dist_abs, max_p_dist_factor):
merged_line = _merge_two_lines(current_lines[i], current_lines[j])
current_lines[i] = merged_line # 更新第i条线
current_lines.pop(j) # 移除第j条线
made_change_in_pass = True
merged_line_i = True
# 由于current_lines[i]已改变,且列表长度已改变,
# 最安全的是从头开始或至少重新评估current_lines[i]
# 但对于这种贪心策略继续尝试将新的current_lines[i]与后续线合并是可以的
# j的位置不需要改变因为它指向被删除元素后的下一个元素
continue # j 现在指向下一条未检查的线相对于更新后的i
j += 1
# 如果 line_i 在内部循环中被修改过,则外部的 i 也应该重新评估
# 但由于我们是迭代地进行整个过程(while made_change_in_pass), 最终会收敛
i += 1
return np.array([[line] for line in current_lines]) if current_lines else np.array([])