mi-task/utils/base_line_handler.py

169 lines
7.2 KiB
Python
Raw Normal View History

2025-05-31 12:33:28 +08:00
import numpy as np
import cv2
def _are_lines_mergeable(line1_pts, line2_pts,
max_angle_diff_deg,
max_ep_gap_abs, max_ep_gap_factor,
max_p_dist_abs, max_p_dist_factor):
"""
判断两条线段是否可以合并
lineX_pts: [x1, y1, x2, y2]
max_angle_diff_deg: 最大角度差 ()
max_ep_gap_abs: 端点间最大绝对间隙
max_ep_gap_factor: 端点间最大相对间隙 (基于较短线段长度的因子)
max_p_dist_abs: 最大绝对垂直距离
max_p_dist_factor: 最大相对垂直距离 (基于较短线段长度的因子)
"""
x1a, y1a, x2a, y2a = line1_pts
x1b, y1b, x2b, y2b = line2_pts
p1a, p2a = np.array([x1a, y1a]), np.array([x2a, y2a])
p1b, p2b = np.array([x1b, y1b]), np.array([x2b, y2b])
len_a = np.linalg.norm(p2a - p1a)
len_b = np.linalg.norm(p2b - p1b)
if len_a < 1e-3 or len_b < 1e-3: return False # 避免零长度线段
shorter_len = min(len_a, len_b)
# 1. 角度相似性
angle_a = np.arctan2(y2a - y1a, x2a - x1a)
angle_b = np.arctan2(y2b - y1b, x2b - x1b)
angle_diff = np.degrees(angle_a - angle_b)
while angle_diff <= -180: angle_diff += 360
while angle_diff > 180: angle_diff -= 360
is_angle_similar = abs(angle_diff) < max_angle_diff_deg or \
abs(abs(angle_diff) - 180) < max_angle_diff_deg
if not is_angle_similar:
return False
# 2. 共线性: 检查一个线段的端点到另一个线段(无限延长)的垂直距离
current_max_allowed_perp_dist = max(max_p_dist_abs, max_p_dist_factor * shorter_len)
def get_perp_dist_point_to_inf_line(p, line_p1, line_p2, line_len):
if line_len < 1e-3: return np.linalg.norm(p - line_p1) # 线是点
return abs((line_p2[1]-line_p1[1])*p[0] - (line_p2[0]-line_p1[0])*p[1] + \
line_p2[0]*line_p1[1] - line_p2[1]*line_p1[0]) / line_len
dist_b1_to_a_inf = get_perp_dist_point_to_inf_line(p1b, p1a, p2a, len_a)
dist_b2_to_a_inf = get_perp_dist_point_to_inf_line(p2b, p1a, p2a, len_a)
dist_a1_to_b_inf = get_perp_dist_point_to_inf_line(p1a, p1b, p2b, len_b)
dist_a2_to_b_inf = get_perp_dist_point_to_inf_line(p2a, p1b, p2b, len_b)
# 如果两条线都很短,那么允许的垂直距离也应该小
# 如果一条线的端点离另一条无限延长的线太远,则不合并
if min(dist_b1_to_a_inf, dist_b2_to_a_inf) > current_max_allowed_perp_dist and \
min(dist_a1_to_b_inf, dist_a2_to_b_inf) > current_max_allowed_perp_dist:
return False
# 3. 邻近性/重叠性: 端点之间的最小距离
min_ep_dist = min(np.linalg.norm(p1a-p1b), np.linalg.norm(p1a-p2b),
np.linalg.norm(p2a-p1b), np.linalg.norm(p2a-p2b))
current_max_allowed_ep_gap = max(max_ep_gap_abs, max_ep_gap_factor * shorter_len)
if min_ep_dist > current_max_allowed_ep_gap:
return False
# 附加检查:确保线段确实是"连接的"或"重叠的"
# 计算一个端点到另一个 *线段* (非无限线) 的距离
def dist_point_to_segment(p, seg_a, seg_b):
ap = p - seg_a
ab = seg_b - seg_a
ab_len_sq = np.dot(ab, ab)
if ab_len_sq < 1e-6: return np.linalg.norm(ap)
t = np.dot(ap, ab) / ab_len_sq
if t < 0.0: closest_point = seg_a
elif t > 1.0: closest_point = seg_b
else: closest_point = seg_a + t * ab
return np.linalg.norm(p - closest_point)
dist_p1a_to_seg_b = dist_point_to_segment(p1a, p1b, p2b)
dist_p2a_to_seg_b = dist_point_to_segment(p2a, p1b, p2b)
dist_p1b_to_seg_a = dist_point_to_segment(p1b, p1a, p2a)
dist_p2b_to_seg_a = dist_point_to_segment(p2b, p1a, p2a)
if min(dist_p1a_to_seg_b, dist_p2a_to_seg_b, dist_p1b_to_seg_a, dist_p2b_to_seg_a) > current_max_allowed_ep_gap:
return False
return True
def _merge_two_lines(line1_pts, line2_pts):
points = np.array([
line1_pts[:2], line1_pts[2:],
line2_pts[:2], line2_pts[2:]
], dtype=np.float32)
vx, vy, x0, y0 = cv2.fitLine(points.reshape(-1,1,2), cv2.DIST_L2, 0, 0.01, 0.01)
vx, vy, x0, y0 = vx[0], vy[0], x0[0], y0[0]
projected_params = []
line_origin = np.array([x0, y0])
line_direction = np.array([vx, vy])
for i in range(4):
pt_vec = points[i] - line_origin
param = np.dot(pt_vec, line_direction)
projected_params.append(param)
min_param = min(projected_params)
max_param = max(projected_params)
p_start = line_origin + min_param * line_direction
p_end = line_origin + max_param * line_direction
return [int(round(p_start[0])), int(round(p_start[1])),
int(round(p_end[0])), int(round(p_end[1]))]
def _merge_collinear_lines_iterative(lines_from_hough,
min_initial_len,
max_angle_diff_deg,
max_ep_gap_abs, max_ep_gap_factor,
max_p_dist_abs, max_p_dist_factor):
2025-05-31 12:49:01 +08:00
"""
"""
2025-05-31 12:33:28 +08:00
if lines_from_hough is None:
return None
# 将HoughLinesP的输出转换为Python列表 [[x1,y1,x2,y2], ...]
py_lines = [list(line[0]) for line in lines_from_hough]
# 过滤掉初始太短的线
current_lines = [line for line in py_lines if np.linalg.norm(np.array(line[:2]) - np.array(line[2:])) >= min_initial_len]
if not current_lines or len(current_lines) < 2:
return np.array([[line] for line in current_lines]) if current_lines else np.array([])
made_change_in_pass = True
while made_change_in_pass:
made_change_in_pass = False
i = 0
while i < len(current_lines):
j = i + 1
merged_line_i = False
while j < len(current_lines):
if _are_lines_mergeable(current_lines[i], current_lines[j],
max_angle_diff_deg,
max_ep_gap_abs, max_ep_gap_factor,
max_p_dist_abs, max_p_dist_factor):
merged_line = _merge_two_lines(current_lines[i], current_lines[j])
current_lines[i] = merged_line # 更新第i条线
current_lines.pop(j) # 移除第j条线
made_change_in_pass = True
merged_line_i = True
# 由于current_lines[i]已改变,且列表长度已改变,
# 最安全的是从头开始或至少重新评估current_lines[i]
# 但对于这种贪心策略继续尝试将新的current_lines[i]与后续线合并是可以的
# j的位置不需要改变因为它指向被删除元素后的下一个元素
continue # j 现在指向下一条未检查的线相对于更新后的i
j += 1
# 如果 line_i 在内部循环中被修改过,则外部的 i 也应该重新评估
# 但由于我们是迭代地进行整个过程(while made_change_in_pass), 最终会收敛
i += 1
return np.array([[line] for line in current_lines]) if current_lines else np.array([])