更新任务2和任务4,增强箭头检测功能

- 在task_2.py中引入异步箭头检测器,优化箭头方向检测流程,确保在关键路径点进行准确检测。
- 更新run_task_2函数以支持箭头方向的动态检测,并在任务执行中根据检测结果调整后续动作。
- 在task_4.py中添加run_task_4_back函数,确保机器人能够在不同方向的情况下顺利完成任务。
- 在main.py中整合新的任务调用,确保任务执行流程的连贯性和准确性。
This commit is contained in:
Havoc 2025-05-28 02:23:01 +08:00
parent 8e172759ac
commit 3f35af954f
3 changed files with 199 additions and 41 deletions

33
main.py
View File

@ -22,9 +22,10 @@ from utils.base_msg import BaseMsg
from task_1.task_1 import run_task_1, run_task_1_back
from task_2.task_2 import run_task_2, run_task_2_back
from task_2_5.task_2_5 import run_task_2_5
from task_4.task_4 import run_task_4
from task_2_5.task_2_5 import run_task_2_5, run_task_2_5_back
from task_4.task_4 import run_task_4, run_task_4_back
from task_test.task_left_line import run_task_test
from task_5.task_5 import run_task_5
# from task_test.go_to_xy_example import run_go_to_xy_example
from base_move.turn_degree import turn_degree
@ -45,22 +46,30 @@ def main():
# time.sleep(100) # TEST
run_task_1(Ctrl, msg)
res = run_task_2(Ctrl, msg)
arrow_direction = run_task_2(Ctrl, msg)
run_task_2_5(Ctrl, msg)
run_task_2_5(Ctrl, msg, direction=arrow_direction)
# run_task_4(Ctrl, msg)
if arrow_direction == 'left':
run_task_4(Ctrl, msg)
else:
# run_task_3(Ctrl, msg)
pass
# run_task_test(Ctrl, msg)
run_task_5(Ctrl, msg)
# run_task_2_back(Ctrl, msg)
if arrow_direction == 'left':
# run_task_3_back(Ctrl, msg)
pass
else:
run_task_4_back(Ctrl, msg)
# run_task_1_back(Ctrl, msg)
run_task_2_5_back(Ctrl, msg, direction=arrow_direction)
run_task_2_back(Ctrl, msg)
run_task_1_back(Ctrl, msg)
# 坐标导航示例
# time.sleep(100)
except KeyboardInterrupt:
print("\n程序被用户中断")
except Exception as e:

View File

@ -2,11 +2,16 @@ import time
import sys
import os
import math
import threading
import cv2
import queue
from threading import Thread, Lock
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from base_move.go_to_xy import go_to_xy
from base_move.turn_degree import turn_degree, turn_degree_twice, turn_degree_v2
from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing
from utils.decode_arrow import detect_arrow_direction, visualize_arrow_detection
from base_move.move_base_hori_line import (
arc_turn_around_hori_line,
@ -16,10 +21,100 @@ from base_move.move_base_hori_line import (
)
# 创建本模块特定的日志记录器
logger = get_logger("任务1")
logger = get_logger("任务2-5")
observe = True
# 异步箭头检测器类
class AsyncArrowDetector:
def __init__(self, image_processor):
"""
初始化异步箭头检测器
参数:
image_processor: ImageProcessor实例
"""
self.image_processor = image_processor
self.detection_thread = None
self.running = False
self.lock = Lock()
self.arrow_result = None
self.result_time = 0
self.last_processed_image = None
# 用于保存可视化结果
self.save_dir = "logs/image"
os.makedirs(self.save_dir, exist_ok=True)
info("异步箭头检测器已初始化", "初始化")
def start_detection(self, interval=0.5):
"""
启动异步箭头检测
参数:
interval: 检测间隔单位秒
"""
if self.detection_thread is not None and self.detection_thread.is_alive():
info("异步箭头检测已经在运行中", "警告")
return
self.running = True
self.detection_thread = Thread(target=self._detection_worker, args=(interval,))
self.detection_thread.daemon = True
self.detection_thread.start()
info("启动异步箭头检测线程", "启动")
def stop_detection(self):
"""停止异步箭头检测"""
self.running = False
if self.detection_thread and self.detection_thread.is_alive():
self.detection_thread.join(timeout=1.0)
info("异步箭头检测线程已停止", "停止")
def _detection_worker(self, interval):
"""异步箭头检测工作线程"""
last_detection_time = 0
while self.running:
current_time = time.time()
# 按指定间隔检测
if current_time - last_detection_time >= interval:
img = self.image_processor.get_current_image()
if img is not None:
try:
# 保存最后处理的图像
self.last_processed_image = img.copy()
# 检测箭头方向
direction = detect_arrow_direction(img)
with self.lock:
self.arrow_result = direction
self.result_time = current_time
if direction != "unknown":
info(f"异步检测到{direction}箭头", "箭头检测")
# 保存检测结果的可视化图像
timestamp = time.strftime("%Y%m%d_%H%M%S")
save_path = f"{self.save_dir}/arrow_detection_{timestamp}.jpg"
visualize_arrow_detection(img, save_path=save_path)
info(f"箭头检测可视化结果已保存至: {save_path}", "箭头检测")
except Exception as e:
error(f"异步箭头检测出错: {str(e)}", "错误")
last_detection_time = current_time
# 短暂休眠避免占用过多CPU
time.sleep(0.05)
def get_last_result(self):
"""获取最后一次成功的箭头检测结果"""
with self.lock:
return self.arrow_result, self.result_time, self.last_processed_image
def run_task_2(ctrl, msg):
# 微调 xy 和角度
go_to_xy(ctrl, msg, 0.9, 0.25, speed=0.5, observe=True)
@ -64,7 +159,21 @@ def run_task_2(ctrl, msg):
(0.63,4.73),
]
arrow_direction = None
# 创建异步箭头检测器
arrow_detector = AsyncArrowDetector(ctrl.image_processor)
# 在进入第四个圆弧前索引为7启动异步箭头检测
arrow_detection_started = False
for i, (vel_x, vel_y, vel_z ,rpy_x,rpy_y,rpy_z,t) in enumerate(directions):
# 在第四个圆弧开始前启动异步箭头检测
if i == 7 and not arrow_detection_started:
info("即将进入第四个圆弧,启动异步箭头检测", "箭头检测")
arrow_detector.start_detection(interval=0.3)
arrow_detection_started = True
# 设置转向命令
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST
@ -84,17 +193,59 @@ def run_task_2(ctrl, msg):
print(f"{i}",'y为',ctrl.odo_msg.xyz[1])
print(f"{i}",'z为',ctrl.odo_msg.xyz[2])
time.sleep(t) # 持续3秒
# 在第四个圆弧完成后索引为7获取箭头检测结果
if i == 7: # 最后一个弯道结束后
info("完成第四个圆弧,开始检测箭头方向", "箭头检测")
# 给异步检测一些时间来完成
wait_time = 0
max_wait = 2.0 # 最多等待2秒
while wait_time < max_wait:
# 获取异步检测结果
result, result_time, last_image = arrow_detector.get_last_result()
if result is not None and result != "unknown":
arrow_direction = result
info(f"成功检测到箭头方向: {arrow_direction}", "箭头检测")
break
time.sleep(0.1)
wait_time += 0.1
# 如果未检测到或结果为unknown尝试在当前图像上再次检测
if arrow_direction is None or arrow_direction == "unknown":
info("异步检测未得到确定结果,尝试在当前图像上直接检测", "箭头检测")
# 获取当前图像
image = ctrl.image_processor.get_current_image()
if image is not None:
# 直接在当前图像上检测
arrow_direction = detect_arrow_direction(image, observe=True)
info(f"直接检测到箭头方向: {arrow_direction}", "箭头检测")
# 保存检测结果的可视化图像
timestamp = time.strftime("%Y%m%d_%H%M%S")
save_path = f"logs/image/arrow_detection_final_{timestamp}.jpg"
os.makedirs(os.path.dirname(save_path), exist_ok=True)
visualize_arrow_detection(image, save_path=save_path)
info(f"最终箭头检测可视化结果已保存至: {save_path}", "箭头检测")
else:
warning("无法获取当前图像,箭头方向检测失败", "箭头检测")
arrow_direction = "unknown"
# 停止异步箭头检测
if arrow_detection_started:
arrow_detector.stop_detection()
print('角度为',ctrl.odo_msg.rpy[2])
print('x为',ctrl.odo_msg.xyz[0])
print('y为',ctrl.odo_msg.xyz[1])
print('z为',ctrl.odo_msg.xyz[2])
# # 停止
# msg.mode = 7 # PureDamper
# msg.gait_id = 0
# msg.life_count += 1
# Ctrl.Send_cmd(msg)
# Ctrl.Wait_finish(7, 0)
# 返回检测到的箭头方向
return arrow_direction
def run_task_2_back(ctrl, msg):
# go_to_xy(ctrl, msg, 0.2, 1, speed=0.5, observe=True)

View File

@ -19,6 +19,26 @@ from task_4.pass_bar import pass_bar
logger = get_logger("任务4")
def run_task_4(ctrl, msg):
section('任务4-1直线移动', "移动")
# 设置机器人运动模式为快步跑
msg.mode = 11 # 运动模式
msg.gait_id = 3 # 步态ID快步跑
msg.vel_des = [0.35, 0, 0] # 期望速度
msg.pos_des = [ 0, 0, 0]
msg.duration = 0 # 零时长表示持续运动,直到接收到新命令
msg.step_height = [0.21, 0.21] # 持续运动时摆动腿的离地高度
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep(5) # 持续5秒钟
section('任务4-2移动直到灰色天空比例小于阈值', "天空检测")
go_straight_until_sky_ratio_below(ctrl, msg, sky_ratio_threshold=0.2)
section('任务4-3通过栏杆', "移动")
pass_bar(ctrl, msg)
def run_task_4_back(ctrl, msg):
"""
参数:
ctrl: Robot_Ctrl对象
@ -124,25 +144,3 @@ def go_straight_until_sky_ratio_below(ctrl, msg, sky_ratio_threshold=0.2, step_d
warning(f"已达到最大移动距离 {max_distance} 米,但未找到天空比例小于 {sky_ratio_threshold:.2%} 的位置", "超时")
return success_flag
def run_task_4_back(ctrl, msg):
section('任务4-1直线移动', "移动")
# 设置机器人运动模式为快步跑
msg.mode = 11 # 运动模式
msg.gait_id = 3 # 步态ID快步跑
msg.vel_des = [0.35, 0, 0] # 期望速度
msg.pos_des = [ 0, 0, 0]
msg.duration = 0 # 零时长表示持续运动,直到接收到新命令
msg.step_height = [0.21, 0.21] # 持续运动时摆动腿的离地高度
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep(5) # 持续5秒钟
section('任务4-2移动直到灰色天空比例小于阈值', "天空检测")
go_straight_until_sky_ratio_below(ctrl, msg, sky_ratio_threshold=0.2)
section('任务4-3通过栏杆', "移动")
pass_bar(ctrl, msg)