import time import sys import os import math import threading import cv2 import queue from threading import Thread, Lock sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from base_move.go_to_xy import go_to_xy, go_to_xy_v2 from base_move.turn_degree import turn_degree, turn_degree_twice, turn_degree_v2 from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing from utils.decode_arrow import detect_arrow_direction, visualize_arrow_detection from base_move.move_base_hori_line import ( arc_turn_around_hori_line, go_straight_by_hori_line, move_to_hori_line, align_to_horizontal_line ) # 创建本模块特定的日志记录器 logger = get_logger("任务2-5") observe = True # 异步箭头检测器类 class AsyncArrowDetector: def __init__(self, image_processor): """ 初始化异步箭头检测器 参数: image_processor: ImageProcessor实例 """ self.image_processor = image_processor self.detection_thread = None self.running = False self.lock = Lock() self.arrow_result = None self.result_time = 0 self.last_processed_image = None # 用于保存可视化结果 self.save_dir = "logs/image" os.makedirs(self.save_dir, exist_ok=True) info("异步箭头检测器已初始化", "初始化") def start_detection(self, interval=0.5): """ 启动异步箭头检测 参数: interval: 检测间隔,单位秒 """ if self.detection_thread is not None and self.detection_thread.is_alive(): info("异步箭头检测已经在运行中", "警告") return self.running = True self.detection_thread = Thread(target=self._detection_worker, args=(interval,)) self.detection_thread.daemon = True self.detection_thread.start() info("启动异步箭头检测线程", "启动") def stop_detection(self): """停止异步箭头检测""" self.running = False if self.detection_thread and self.detection_thread.is_alive(): self.detection_thread.join(timeout=1.0) info("异步箭头检测线程已停止", "停止") def _detection_worker(self, interval): """异步箭头检测工作线程""" last_detection_time = 0 while self.running: current_time = time.time() # 按指定间隔检测 if current_time - last_detection_time >= interval: img = self.image_processor.get_current_image() if img is not None: try: # 保存最后处理的图像 self.last_processed_image = img.copy() # 检测箭头方向 direction = detect_arrow_direction(img) with self.lock: self.arrow_result = direction self.result_time = current_time if direction != "unknown": info(f"异步检测到{direction}箭头", "箭头检测") # 保存检测结果的可视化图像 timestamp = time.strftime("%Y%m%d_%H%M%S") save_path = f"{self.save_dir}/arrow_detection_{timestamp}.jpg" # visualize_arrow_detection(img, save_path=save_path) info(f"箭头检测可视化结果已保存至: {save_path}", "箭头检测") except Exception as e: error(f"异步箭头检测出错: {str(e)}", "错误") last_detection_time = current_time # 短暂休眠避免占用过多CPU time.sleep(0.05) def get_last_result(self): """获取最后一次成功的箭头检测结果""" with self.lock: return self.arrow_result, self.result_time, self.last_processed_image def run_task_2(ctrl, msg, xy_flag=False): # 微调 xy 和角度 go_to_xy_v2(ctrl, msg, 0.9, 0.25, speed=0.5, observe=True) turn_degree(ctrl, msg, 0.8, absolute=True) print('角度为',ctrl.odo_msg.rpy[2]) #获取微调角度 text=["直线走一段","第一个圆弧","微调路径","第二个圆弧","微调路径","第三个圆弧","微调路径","第四个圆弧","绕出来","前走一点"] # 连续三重弯道行走 directions = [ #直线走一段 (0.4,0,0,0,0,0,1.2), #第一个圆弧 (0.45, 0, 0.77,0,0,0,6.5), #微调路径 (0.3,0,0,0,0,0,0.4), #第二个圆弧 (0.4,0,-0.73,0,0,0,7.9), #微调路径 (0.3,0,0,0,0,0,1), # 第三个圆弧 (0.4,0,0.73,0,0,0,6.4), #微调路径 (0.3,0,0,0,0,0,1.3), #第四个圆弧 (0.4,0,-0.72,0,0,0,3.6), #绕出来 (0.3,0,0.6,0,0,0,1), # #前走一点 # (0.3,0,0,0,0,0,0.5) ] # [vel_x, vel_z] 对应 [左转, 右转, 左转] gotoxy=[ (0.87,0.23), (1.19,0.22), (0.91,1.55), (0.81,1.46), (0.67,2.59), (0.92,2.43), (1.53,3.62), (1.15,3.70), (0.63,4.73), ] arrow_direction = None # 创建异步箭头检测器 arrow_detector = AsyncArrowDetector(ctrl.image_processor) # 在进入第四个圆弧前(索引为7)启动异步箭头检测 arrow_detection_started = False for i, (vel_x, vel_y, vel_z ,rpy_x,rpy_y,rpy_z,t) in enumerate(directions): # 在第四个圆弧开始前启动异步箭头检测 if i == 7 and not arrow_detection_started: info("即将进入第四个圆弧,启动异步箭头检测", "箭头检测") arrow_detector.start_detection(interval=0.3) arrow_detection_started = True # 设置转向命令 msg.mode = 11 # Locomotion msg.gait_id = 26 # TROT_FAST msg.vel_des = [vel_x, vel_y, vel_z] # [x, y, z],y=0表示不侧移 msg.rpy_des = [rpy_x, rpy_y, rpy_z] msg.duration = 0 # 连续运动 msg.step_height = [0.03, 0.03] msg.life_count += 1 ctrl.Send_cmd(msg) # 根据转弯方向调整持续时间(假设半圆需要3秒 if xy_flag: go_to_xy(ctrl, msg, gotoxy[i][0], gotoxy[i][1], speed=0.5, observe=True) # 打印当前方向 print(f"开始 {text[i]} ") print(f"第{i}次",'角度为',ctrl.odo_msg.rpy[2]) print(f"第{i}次",'x为',ctrl.odo_msg.xyz[0]) print(f"第{i}次",'y为',ctrl.odo_msg.xyz[1]) print(f"第{i}次",'z为',ctrl.odo_msg.xyz[2]) time.sleep(t) # 持续3秒 # 在第四个圆弧完成后(索引为7)获取箭头检测结果 if i == 7: # 最后一个弯道结束后 info("完成第四个圆弧,开始检测箭头方向", "箭头检测") # 给异步检测一些时间来完成 wait_time = 0 max_wait = 2.0 # 最多等待2秒 while wait_time < max_wait: # 获取异步检测结果 result, result_time, last_image = arrow_detector.get_last_result() if result is not None and result != "unknown": arrow_direction = result info(f"成功检测到箭头方向: {arrow_direction}", "箭头检测") break time.sleep(0.1) wait_time += 0.1 # 如果未检测到或结果为unknown,尝试在当前图像上再次检测 if arrow_direction is None or arrow_direction == "unknown": info("异步检测未得到确定结果,尝试在当前图像上直接检测", "箭头检测") # 获取当前图像 image = ctrl.image_processor.get_current_image() if image is not None: # 直接在当前图像上检测 arrow_direction = detect_arrow_direction(image, observe=False) info(f"直接检测到箭头方向: {arrow_direction}", "箭头检测") # 保存检测结果的可视化图像 timestamp = time.strftime("%Y%m%d_%H%M%S") save_path = f"logs/image/arrow_detection_final_{timestamp}.jpg" os.makedirs(os.path.dirname(save_path), exist_ok=True) # visualize_arrow_detection(image, save_path=save_path) info(f"最终箭头检测可视化结果已保存至: {save_path}", "箭头检测") else: warning("无法获取当前图像,箭头方向检测失败", "箭头检测") arrow_direction = "unknown" # 停止异步箭头检测 if arrow_detection_started: arrow_detector.stop_detection() print('角度为',ctrl.odo_msg.rpy[2]) print('x为',ctrl.odo_msg.xyz[0]) print('y为',ctrl.odo_msg.xyz[1]) print('z为',ctrl.odo_msg.xyz[2]) # 返回检测到的箭头方向 return arrow_direction def run_task_2_back(ctrl, msg): # go_to_xy(ctrl, msg, 0.2, 1, speed=0.5, observe=True) go_to_xy(ctrl, msg, 0.7, 5, speed=0.5, observe=True) turn_degree_v2(ctrl, msg, -106.5, absolute=True) print('角度为',ctrl.odo_msg.rpy[2]) print('x为',ctrl.odo_msg.xyz[0]) print('y为',ctrl.odo_msg.xyz[1]) print('z为',ctrl.odo_msg.xyz[2]) text=["直线走一段","第一个圆弧","微调路径","第二个圆弧","微调路径","第三个圆弧","微调路径","第四个圆弧","绕出来","前走一点"] # 连续三重弯道行走 directions = [ (0.3,0,0,0,0,0,0.6), #第四个圆弧 (0.4,0,0.69,0,0,0,3), #微调路径 (0.3,0,0,0,0,0,1.3), # 第三个圆弧 (0.4,0,-0.73,0,0,0,6), #微调路径 (0.3,0,0,0,0,0,1.4), #第二个圆弧 (0.4,0,0.74,0,0,0,7.4), #微调路径 (0.3,0,0,0,0,0,1.2), #第一个圆弧 (0.45, 0, -0.78,0,0,0,5.8), #直线走一段 (0.4,0.05,0,0,0,0,0.3), (0.4,0,0,0,0,0,1.5), ] # [vel_x, vel_z] 对应 [左转, 右转, 左转] for i, (vel_x, vel_y, vel_z ,rpy_x,rpy_y,rpy_z,t) in enumerate(directions): # 设置转向命令 msg.mode = 11 # Locomotion msg.gait_id = 26 # TROT_FAST msg.vel_des = [vel_x, vel_y, vel_z] # [x, y, z],y=0表示不侧移 msg.rpy_des = [rpy_x, rpy_y, rpy_z] msg.duration = 0 # 连续运动 msg.step_height = [0.03, 0.03] msg.life_count += 1 ctrl.Send_cmd(msg) # 打印当前方向 print(f"开始 {text[i]} ") # 根据转弯方向调整持续时间(假设半圆需要3秒) time.sleep(t) # 持续3秒