From 3f35af954f2d83e7a88e08e2df7a6eaa6b3577ba Mon Sep 17 00:00:00 2001 From: Havoc <2993167370@qq.com> Date: Wed, 28 May 2025 02:23:01 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=BB=BB=E5=8A=A12=E5=92=8C?= =?UTF-8?q?=E4=BB=BB=E5=8A=A14=EF=BC=8C=E5=A2=9E=E5=BC=BA=E7=AE=AD?= =?UTF-8?q?=E5=A4=B4=E6=A3=80=E6=B5=8B=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在task_2.py中引入异步箭头检测器,优化箭头方向检测流程,确保在关键路径点进行准确检测。 - 更新run_task_2函数以支持箭头方向的动态检测,并在任务执行中根据检测结果调整后续动作。 - 在task_4.py中添加run_task_4_back函数,确保机器人能够在不同方向的情况下顺利完成任务。 - 在main.py中整合新的任务调用,确保任务执行流程的连贯性和准确性。 --- main.py | 33 ++++++---- task_2/task_2.py | 165 +++++++++++++++++++++++++++++++++++++++++++++-- task_4/task_4.py | 42 ++++++------ 3 files changed, 199 insertions(+), 41 deletions(-) diff --git a/main.py b/main.py index 773438f..c4d8495 100644 --- a/main.py +++ b/main.py @@ -22,9 +22,10 @@ from utils.base_msg import BaseMsg from task_1.task_1 import run_task_1, run_task_1_back from task_2.task_2 import run_task_2, run_task_2_back -from task_2_5.task_2_5 import run_task_2_5 -from task_4.task_4 import run_task_4 +from task_2_5.task_2_5 import run_task_2_5, run_task_2_5_back +from task_4.task_4 import run_task_4, run_task_4_back from task_test.task_left_line import run_task_test +from task_5.task_5 import run_task_5 # from task_test.go_to_xy_example import run_go_to_xy_example from base_move.turn_degree import turn_degree @@ -45,22 +46,30 @@ def main(): # time.sleep(100) # TEST run_task_1(Ctrl, msg) - res = run_task_2(Ctrl, msg) + arrow_direction = run_task_2(Ctrl, msg) - run_task_2_5(Ctrl, msg) + run_task_2_5(Ctrl, msg, direction=arrow_direction) - # run_task_4(Ctrl, msg) + if arrow_direction == 'left': + run_task_4(Ctrl, msg) + else: + # run_task_3(Ctrl, msg) + pass - # run_task_test(Ctrl, msg) + run_task_5(Ctrl, msg) - # run_task_2_back(Ctrl, msg) + if arrow_direction == 'left': + # run_task_3_back(Ctrl, msg) + pass + else: + run_task_4_back(Ctrl, msg) - # run_task_1_back(Ctrl, msg) + run_task_2_5_back(Ctrl, msg, direction=arrow_direction) + + run_task_2_back(Ctrl, msg) + + run_task_1_back(Ctrl, msg) - # 坐标导航示例 - - # time.sleep(100) - except KeyboardInterrupt: print("\n程序被用户中断") except Exception as e: diff --git a/task_2/task_2.py b/task_2/task_2.py index fdb9152..148c556 100644 --- a/task_2/task_2.py +++ b/task_2/task_2.py @@ -2,11 +2,16 @@ import time import sys import os import math +import threading +import cv2 +import queue +from threading import Thread, Lock sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from base_move.go_to_xy import go_to_xy from base_move.turn_degree import turn_degree, turn_degree_twice, turn_degree_v2 from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing +from utils.decode_arrow import detect_arrow_direction, visualize_arrow_detection from base_move.move_base_hori_line import ( arc_turn_around_hori_line, @@ -16,10 +21,100 @@ from base_move.move_base_hori_line import ( ) # 创建本模块特定的日志记录器 -logger = get_logger("任务1") +logger = get_logger("任务2-5") observe = True +# 异步箭头检测器类 +class AsyncArrowDetector: + def __init__(self, image_processor): + """ + 初始化异步箭头检测器 + + 参数: + image_processor: ImageProcessor实例 + """ + self.image_processor = image_processor + self.detection_thread = None + self.running = False + self.lock = Lock() + self.arrow_result = None + self.result_time = 0 + self.last_processed_image = None + + # 用于保存可视化结果 + self.save_dir = "logs/image" + os.makedirs(self.save_dir, exist_ok=True) + + info("异步箭头检测器已初始化", "初始化") + + def start_detection(self, interval=0.5): + """ + 启动异步箭头检测 + + 参数: + interval: 检测间隔,单位秒 + """ + if self.detection_thread is not None and self.detection_thread.is_alive(): + info("异步箭头检测已经在运行中", "警告") + return + + self.running = True + self.detection_thread = Thread(target=self._detection_worker, args=(interval,)) + self.detection_thread.daemon = True + self.detection_thread.start() + info("启动异步箭头检测线程", "启动") + + def stop_detection(self): + """停止异步箭头检测""" + self.running = False + if self.detection_thread and self.detection_thread.is_alive(): + self.detection_thread.join(timeout=1.0) + info("异步箭头检测线程已停止", "停止") + + def _detection_worker(self, interval): + """异步箭头检测工作线程""" + last_detection_time = 0 + + while self.running: + current_time = time.time() + + # 按指定间隔检测 + if current_time - last_detection_time >= interval: + img = self.image_processor.get_current_image() + if img is not None: + try: + # 保存最后处理的图像 + self.last_processed_image = img.copy() + + # 检测箭头方向 + direction = detect_arrow_direction(img) + + with self.lock: + self.arrow_result = direction + self.result_time = current_time + + if direction != "unknown": + info(f"异步检测到{direction}箭头", "箭头检测") + + # 保存检测结果的可视化图像 + timestamp = time.strftime("%Y%m%d_%H%M%S") + save_path = f"{self.save_dir}/arrow_detection_{timestamp}.jpg" + visualize_arrow_detection(img, save_path=save_path) + info(f"箭头检测可视化结果已保存至: {save_path}", "箭头检测") + except Exception as e: + error(f"异步箭头检测出错: {str(e)}", "错误") + + last_detection_time = current_time + + # 短暂休眠避免占用过多CPU + time.sleep(0.05) + + def get_last_result(self): + """获取最后一次成功的箭头检测结果""" + with self.lock: + return self.arrow_result, self.result_time, self.last_processed_image + def run_task_2(ctrl, msg): # 微调 xy 和角度 go_to_xy(ctrl, msg, 0.9, 0.25, speed=0.5, observe=True) @@ -64,7 +159,21 @@ def run_task_2(ctrl, msg): (0.63,4.73), ] + arrow_direction = None + + # 创建异步箭头检测器 + arrow_detector = AsyncArrowDetector(ctrl.image_processor) + + # 在进入第四个圆弧前(索引为7)启动异步箭头检测 + arrow_detection_started = False + for i, (vel_x, vel_y, vel_z ,rpy_x,rpy_y,rpy_z,t) in enumerate(directions): + # 在第四个圆弧开始前启动异步箭头检测 + if i == 7 and not arrow_detection_started: + info("即将进入第四个圆弧,启动异步箭头检测", "箭头检测") + arrow_detector.start_detection(interval=0.3) + arrow_detection_started = True + # 设置转向命令 msg.mode = 11 # Locomotion msg.gait_id = 26 # TROT_FAST @@ -84,17 +193,59 @@ def run_task_2(ctrl, msg): print(f"第{i}次",'y为',ctrl.odo_msg.xyz[1]) print(f"第{i}次",'z为',ctrl.odo_msg.xyz[2]) time.sleep(t) # 持续3秒 + + # 在第四个圆弧完成后(索引为7)获取箭头检测结果 + if i == 7: # 最后一个弯道结束后 + info("完成第四个圆弧,开始检测箭头方向", "箭头检测") + + # 给异步检测一些时间来完成 + wait_time = 0 + max_wait = 2.0 # 最多等待2秒 + + while wait_time < max_wait: + # 获取异步检测结果 + result, result_time, last_image = arrow_detector.get_last_result() + + if result is not None and result != "unknown": + arrow_direction = result + info(f"成功检测到箭头方向: {arrow_direction}", "箭头检测") + break + + time.sleep(0.1) + wait_time += 0.1 + + # 如果未检测到或结果为unknown,尝试在当前图像上再次检测 + if arrow_direction is None or arrow_direction == "unknown": + info("异步检测未得到确定结果,尝试在当前图像上直接检测", "箭头检测") + + # 获取当前图像 + image = ctrl.image_processor.get_current_image() + if image is not None: + # 直接在当前图像上检测 + arrow_direction = detect_arrow_direction(image, observe=True) + info(f"直接检测到箭头方向: {arrow_direction}", "箭头检测") + + # 保存检测结果的可视化图像 + timestamp = time.strftime("%Y%m%d_%H%M%S") + save_path = f"logs/image/arrow_detection_final_{timestamp}.jpg" + os.makedirs(os.path.dirname(save_path), exist_ok=True) + visualize_arrow_detection(image, save_path=save_path) + info(f"最终箭头检测可视化结果已保存至: {save_path}", "箭头检测") + else: + warning("无法获取当前图像,箭头方向检测失败", "箭头检测") + arrow_direction = "unknown" + + # 停止异步箭头检测 + if arrow_detection_started: + arrow_detector.stop_detection() print('角度为',ctrl.odo_msg.rpy[2]) print('x为',ctrl.odo_msg.xyz[0]) print('y为',ctrl.odo_msg.xyz[1]) print('z为',ctrl.odo_msg.xyz[2]) - # # 停止 - # msg.mode = 7 # PureDamper - # msg.gait_id = 0 - # msg.life_count += 1 - # Ctrl.Send_cmd(msg) - # Ctrl.Wait_finish(7, 0) + + # 返回检测到的箭头方向 + return arrow_direction def run_task_2_back(ctrl, msg): # go_to_xy(ctrl, msg, 0.2, 1, speed=0.5, observe=True) diff --git a/task_4/task_4.py b/task_4/task_4.py index e7f58e0..64d23fd 100644 --- a/task_4/task_4.py +++ b/task_4/task_4.py @@ -19,6 +19,26 @@ from task_4.pass_bar import pass_bar logger = get_logger("任务4") def run_task_4(ctrl, msg): + section('任务4-1:直线移动', "移动") + # 设置机器人运动模式为快步跑 + msg.mode = 11 # 运动模式 + msg.gait_id = 3 # 步态ID(快步跑) + msg.vel_des = [0.35, 0, 0] # 期望速度 + msg.pos_des = [ 0, 0, 0] + msg.duration = 0 # 零时长表示持续运动,直到接收到新命令 + msg.step_height = [0.21, 0.21] # 持续运动时摆动腿的离地高度 + msg.life_count += 1 + ctrl.Send_cmd(msg) + time.sleep(5) # 持续5秒钟 + + section('任务4-2:移动直到灰色天空比例小于阈值', "天空检测") + go_straight_until_sky_ratio_below(ctrl, msg, sky_ratio_threshold=0.2) + + section('任务4-3:通过栏杆', "移动") + pass_bar(ctrl, msg) + + +def run_task_4_back(ctrl, msg): """ 参数: ctrl: Robot_Ctrl对象 @@ -124,25 +144,3 @@ def go_straight_until_sky_ratio_below(ctrl, msg, sky_ratio_threshold=0.2, step_d warning(f"已达到最大移动距离 {max_distance} 米,但未找到天空比例小于 {sky_ratio_threshold:.2%} 的位置", "超时") return success_flag - - -def run_task_4_back(ctrl, msg): - section('任务4-1:直线移动', "移动") - # 设置机器人运动模式为快步跑 - msg.mode = 11 # 运动模式 - msg.gait_id = 3 # 步态ID(快步跑) - msg.vel_des = [0.35, 0, 0] # 期望速度 - msg.pos_des = [ 0, 0, 0] - msg.duration = 0 # 零时长表示持续运动,直到接收到新命令 - msg.step_height = [0.21, 0.21] # 持续运动时摆动腿的离地高度 - msg.life_count += 1 - ctrl.Send_cmd(msg) - time.sleep(5) # 持续5秒钟 - - section('任务4-2:移动直到灰色天空比例小于阈值', "天空检测") - go_straight_until_sky_ratio_below(ctrl, msg, sky_ratio_threshold=0.2) - - section('任务4-3:通过栏杆', "移动") - pass_bar(ctrl, msg) - - \ No newline at end of file