feat(base_move): implement version 2 of horizontal line detection
- Add support for two versions of horizontal line detection functions - Update move_base_hori_line.py to use the selected detection version - Modify example_robot_log.py and test/main.py to use the new detection version - Update image saving path in rgb-camera/img-raw-get.py - Improve logging and error handling in detect_track.py
This commit is contained in:
		
							parent
							
								
									13f7e30e69
								
							
						
					
					
						commit
						85defc7811
					
				@ -59,7 +59,7 @@ def align_to_horizontal_line(ctrl, msg, observe=False, max_attempts=3, detect_fu
 | 
			
		||||
            error("未检测到横向线,无法进行校准", "失败")
 | 
			
		||||
            
 | 
			
		||||
            # 尝试小幅度摇头寻找横线
 | 
			
		||||
            # TODO 改成上下低头;
 | 
			
		||||
            # TODO 或者改成上下低头?
 | 
			
		||||
            if attempts < max_attempts - 1:
 | 
			
		||||
                small_angle = 5 * (1 if attempts % 2 == 0 else -1)
 | 
			
		||||
                info(f"尝试摇头 {small_angle}度 寻找横线", "校准")
 | 
			
		||||
@ -125,12 +125,20 @@ def align_to_horizontal_line(ctrl, msg, observe=False, max_attempts=3, detect_fu
 | 
			
		||||
        attempts += 1
 | 
			
		||||
        
 | 
			
		||||
        # 检查校准结果
 | 
			
		||||
        edge_point_after, edge_info_after = detect_horizontal_track_edge(
 | 
			
		||||
            ctrl.image_processor.get_current_image(), 
 | 
			
		||||
            observe=observe, 
 | 
			
		||||
            delay=1000 if observe else 0,
 | 
			
		||||
            save_log=True
 | 
			
		||||
        )
 | 
			
		||||
        if detect_func_version == 1:
 | 
			
		||||
            edge_point_after, edge_info_after = detect_horizontal_track_edge(
 | 
			
		||||
                ctrl.image_processor.get_current_image(), 
 | 
			
		||||
                observe=observe, 
 | 
			
		||||
                delay=1000 if observe else 0,
 | 
			
		||||
                save_log=True
 | 
			
		||||
            )
 | 
			
		||||
        elif detect_func_version == 2:
 | 
			
		||||
            edge_point_after, edge_info_after = detect_horizontal_track_edge_v2(
 | 
			
		||||
                ctrl.image_processor.get_current_image(), 
 | 
			
		||||
                observe=observe, 
 | 
			
		||||
                delay=1000 if observe else 0,
 | 
			
		||||
                save_log=True
 | 
			
		||||
            )
 | 
			
		||||
        
 | 
			
		||||
        if edge_info_after and "slope" in edge_info_after:
 | 
			
		||||
            current_slope = edge_info_after["slope"]
 | 
			
		||||
@ -437,11 +445,16 @@ def move_to_hori_line(ctrl, msg, target_distance=0.5, absolute_deg=200, observe=
 | 
			
		||||
 | 
			
		||||
def arc_turn_around_hori_line(ctrl, msg, angle_deg=90, target_distance=0.2, 
 | 
			
		||||
                              pass_align=False,
 | 
			
		||||
                              # radius
 | 
			
		||||
                              radius=None,
 | 
			
		||||
                              min_radius=0.1,
 | 
			
		||||
                              max_radius=5,
 | 
			
		||||
                              # mode
 | 
			
		||||
                              detect_func_version=1,
 | 
			
		||||
                              # Qrcode
 | 
			
		||||
                              scan_qrcode=False,
 | 
			
		||||
                              qr_check_interval=0.3, 
 | 
			
		||||
                              # additional
 | 
			
		||||
                              observe=False,
 | 
			
		||||
                              bad_big_angle_corret=False,
 | 
			
		||||
                              no_end_reset=False,):
 | 
			
		||||
@ -486,7 +499,7 @@ def arc_turn_around_hori_line(ctrl, msg, angle_deg=90, target_distance=0.2,
 | 
			
		||||
    # 1. 对准横线
 | 
			
		||||
    if not pass_align:
 | 
			
		||||
        section("校准到横向线水平", "校准")
 | 
			
		||||
        aligned = align_to_horizontal_line(ctrl, msg, observe=observe)
 | 
			
		||||
        aligned = align_to_horizontal_line(ctrl, msg, detect_func_version=detect_func_version, observe=observe)
 | 
			
		||||
        if not aligned:
 | 
			
		||||
            error("无法校准到横向线水平,停止动作", "停止")
 | 
			
		||||
            if scan_qrcode:
 | 
			
		||||
 | 
			
		||||
@ -21,6 +21,7 @@ from utils.marker_client import MarkerRunner
 | 
			
		||||
from utils.base_msg import BaseMsg
 | 
			
		||||
 | 
			
		||||
from task_1.task_1 import run_task_1
 | 
			
		||||
from task_2_5.task_2_5 import run_task_2_5
 | 
			
		||||
from task_5.task_5 import run_task_5
 | 
			
		||||
 | 
			
		||||
from task_test.task_test import run_task_test
 | 
			
		||||
@ -39,7 +40,9 @@ def main():
 | 
			
		||||
        Ctrl.base_msg.stop()  # BUG 垃圾指令 for eat
 | 
			
		||||
        
 | 
			
		||||
        # time.sleep(100) # TEST 
 | 
			
		||||
        run_task_1(Ctrl, msg)
 | 
			
		||||
        # run_task_1(Ctrl, msg)
 | 
			
		||||
 | 
			
		||||
        run_task_2_5(Ctrl, msg)
 | 
			
		||||
 | 
			
		||||
        # run_task_5(Ctrl, msg)
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										51
									
								
								task_2_5/task_2_5.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										51
									
								
								task_2_5/task_2_5.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,51 @@
 | 
			
		||||
import time
 | 
			
		||||
import sys
 | 
			
		||||
import os
 | 
			
		||||
 | 
			
		||||
# 添加父目录到路径,以便能够导入utils
 | 
			
		||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 | 
			
		||||
 | 
			
		||||
from base_move.turn_degree import turn_degree
 | 
			
		||||
from base_move.move_base_hori_line import arc_turn_around_hori_line, align_to_horizontal_line
 | 
			
		||||
from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing
 | 
			
		||||
 | 
			
		||||
observe = True
 | 
			
		||||
 | 
			
		||||
def run_task_2_5(Ctrl, msg):
 | 
			
		||||
    section('任务2.5:预备进入任务3', "启动")
 | 
			
		||||
 | 
			
		||||
    # TEST
 | 
			
		||||
    turn_degree(Ctrl, msg, 90, absolute=observe)
 | 
			
		||||
 | 
			
		||||
    align_to_horizontal_line(
 | 
			
		||||
        Ctrl,
 | 
			
		||||
        msg,
 | 
			
		||||
        detect_func_version=2,
 | 
			
		||||
        observe=observe,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    return
 | 
			
		||||
 | 
			
		||||
    section('任务2.5-1:第一次旋转', "移动")
 | 
			
		||||
 | 
			
		||||
    arc_turn_around_hori_line(
 | 
			
		||||
        Ctrl,
 | 
			
		||||
        msg,
 | 
			
		||||
        angle_deg=-90,
 | 
			
		||||
        target_distance=0.5,
 | 
			
		||||
        detect_func_version=2,
 | 
			
		||||
        observe=observe,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    section('任务2.5-2:第二次旋转', "移动")
 | 
			
		||||
 | 
			
		||||
    arc_turn_around_hori_line(
 | 
			
		||||
        Ctrl,
 | 
			
		||||
        msg,
 | 
			
		||||
        angle_deg=90,
 | 
			
		||||
        target_distance=0.5,
 | 
			
		||||
        detect_func_version=2,
 | 
			
		||||
        observe=observe,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    
 | 
			
		||||
@ -51,7 +51,7 @@ class ImageSubscriber(Node):
 | 
			
		||||
        if self.cv_image is not None:
 | 
			
		||||
            # Generate a timestamped filename
 | 
			
		||||
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
 | 
			
		||||
            filename = f"captured_images/arrow-right/image_{timestamp}.png"
 | 
			
		||||
            filename = f"captured_images/path/image_{timestamp}.png"
 | 
			
		||||
            
 | 
			
		||||
            cv2.imwrite(filename, self.cv_image)
 | 
			
		||||
            self.get_logger().info(f"Saved image as {filename}")
 | 
			
		||||
 | 
			
		||||
@ -373,6 +373,12 @@ def detect_horizontal_track_edge_v2(image, observe=False, delay=1000, save_log=T
 | 
			
		||||
        error("无法加载图像", "失败")
 | 
			
		||||
        return None, None
 | 
			
		||||
    
 | 
			
		||||
    if save_log:
 | 
			
		||||
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
 | 
			
		||||
        origin_image_path = os.path.join("logs/image", f"origin_horizontal_edge_{timestamp}.jpg")
 | 
			
		||||
        cv2.imwrite(origin_image_path, img)
 | 
			
		||||
        info(f"保存原始图像到: {origin_image_path}", "日志")
 | 
			
		||||
    
 | 
			
		||||
    # 获取图像尺寸
 | 
			
		||||
    height, width = img.shape[:2]
 | 
			
		||||
    
 | 
			
		||||
@ -753,7 +759,7 @@ def detect_horizontal_track_edge_v2(image, observe=False, delay=1000, save_log=T
 | 
			
		||||
        result_img = slope_img
 | 
			
		||||
    
 | 
			
		||||
    # 保存日志图像
 | 
			
		||||
    if save_log and result_img is not None:
 | 
			
		||||
    if save_log:
 | 
			
		||||
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
 | 
			
		||||
        log_dir = "logs/image"
 | 
			
		||||
        os.makedirs(log_dir, exist_ok=True)
 | 
			
		||||
@ -763,24 +769,25 @@ def detect_horizontal_track_edge_v2(image, observe=False, delay=1000, save_log=T
 | 
			
		||||
        cv2.imwrite(origin_image_path, img)
 | 
			
		||||
        info(f"保存原始图像到: {origin_image_path}", "日志")
 | 
			
		||||
 | 
			
		||||
        img_path = os.path.join(log_dir, f"horizontal_edge_{timestamp}.jpg")
 | 
			
		||||
        cv2.imwrite(img_path, result_img)
 | 
			
		||||
        info(f"保存横向边缘检测结果图像到: {img_path}", "日志")
 | 
			
		||||
        
 | 
			
		||||
        # 保存文本日志信息
 | 
			
		||||
        log_info = {
 | 
			
		||||
            "timestamp": timestamp,
 | 
			
		||||
            "edge_point": bottom_edge_point,
 | 
			
		||||
            "distance_to_center": distance_to_center,
 | 
			
		||||
            "slope": selected_slope,
 | 
			
		||||
            "distance_to_bottom": distance_to_bottom,
 | 
			
		||||
            "intersection_point": intersection_point,
 | 
			
		||||
            "score": selected_score,
 | 
			
		||||
            "valid": valid_result,
 | 
			
		||||
            "reason": reason if not valid_result else "",
 | 
			
		||||
            "is_lower_line": len(lower_horizontal_lines) > 0 and selected_line == lower_horizontal_lines[0][0]
 | 
			
		||||
        }
 | 
			
		||||
        info(f"横向边缘检测结果: {log_info}", "日志")
 | 
			
		||||
        if result_img is not None:
 | 
			
		||||
            img_path = os.path.join(log_dir, f"horizontal_edge_{timestamp}.jpg")
 | 
			
		||||
            cv2.imwrite(img_path, result_img)
 | 
			
		||||
            info(f"保存横向边缘检测结果图像到: {img_path}", "日志")
 | 
			
		||||
            
 | 
			
		||||
            # 保存文本日志信息
 | 
			
		||||
            log_info = {
 | 
			
		||||
                "timestamp": timestamp,
 | 
			
		||||
                "edge_point": bottom_edge_point,
 | 
			
		||||
                "distance_to_center": distance_to_center,
 | 
			
		||||
                "slope": selected_slope,
 | 
			
		||||
                "distance_to_bottom": distance_to_bottom,
 | 
			
		||||
                "intersection_point": intersection_point,
 | 
			
		||||
                "score": selected_score,
 | 
			
		||||
                "valid": valid_result,
 | 
			
		||||
                "reason": reason if not valid_result else "",
 | 
			
		||||
                "is_lower_line": len(lower_horizontal_lines) > 0 and selected_line == lower_horizontal_lines[0][0]
 | 
			
		||||
            }
 | 
			
		||||
            info(f"横向边缘检测结果: {log_info}", "日志")
 | 
			
		||||
    
 | 
			
		||||
    # 如果结果无效,可能需要返回失败
 | 
			
		||||
    if not valid_result:
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user