mi-task/task_2/task_2.py

368 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import sys
import os
import math
import threading
import cv2
import queue
from threading import Thread, Lock
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from base_move.go_to_xy import go_to_xy, go_to_xy_v2
from base_move.turn_degree import turn_degree, turn_degree_twice, turn_degree_v2
from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing
from utils.decode_arrow import detect_arrow_direction, visualize_arrow_detection
from base_move.go_straight import go_straight, go_lateral
from base_move.move_base_hori_line import (
arc_turn_around_hori_line,
go_straight_by_hori_line,
move_to_hori_line,
align_to_horizontal_line
)
# 创建本模块特定的日志记录器
logger = get_logger("任务2-5")
observe = True
# 异步箭头检测器类
class AsyncArrowDetector:
def __init__(self, image_processor):
"""
初始化异步箭头检测器
参数:
image_processor: ImageProcessor实例
"""
self.image_processor = image_processor
self.detection_thread = None
self.running = False
self.lock = Lock()
self.arrow_result = None
self.result_time = 0
self.last_processed_image = None
# 用于保存可视化结果
self.save_dir = "logs/image"
os.makedirs(self.save_dir, exist_ok=True)
# 添加左右方向计数器
self.left_count = 0
self.right_count = 0
info("异步箭头检测器已初始化", "初始化")
def start_detection(self, interval=0.5):
"""
启动异步箭头检测
参数:
interval: 检测间隔,单位秒
"""
if self.detection_thread is not None and self.detection_thread.is_alive():
info("异步箭头检测已经在运行中", "警告")
return
self.running = True
self.detection_thread = Thread(target=self._detection_worker, args=(interval,))
self.detection_thread.daemon = True
self.detection_thread.start()
info("启动异步箭头检测线程", "启动")
def stop_detection(self):
"""停止异步箭头检测"""
self.running = False
if self.detection_thread and self.detection_thread.is_alive():
self.detection_thread.join(timeout=1.0)
info("异步箭头检测线程已停止", "停止")
def _detection_worker(self, interval):
"""异步箭头检测工作线程"""
last_detection_time = 0
while self.running:
current_time = time.time()
# 按指定间隔检测
if current_time - last_detection_time >= interval:
img = self.image_processor.get_current_image()
if img is not None:
try:
# 保存最后处理的图像
self.last_processed_image = img.copy()
# 检测箭头方向
direction = detect_arrow_direction(img)
with self.lock:
self.arrow_result = direction
self.result_time = current_time
# 更新方向计数
if direction == "left":
self.left_count += 1
elif direction == "right":
self.right_count += 1
if direction != "unknown":
info(f"异步检测到{direction}箭头 (left: {self.left_count}, right: {self.right_count})", "箭头检测")
# 保存检测结果的可视化图像
timestamp = time.strftime("%Y%m%d_%H%M%S")
save_path = f"{self.save_dir}/arrow_detection_{timestamp}.jpg"
# visualize_arrow_detection(img, save_path=save_path)
info(f"箭头检测可视化结果已保存至: {save_path}", "箭头检测")
except Exception as e:
error(f"异步箭头检测出错: {str(e)}", "错误")
last_detection_time = current_time
# 短暂休眠避免占用过多CPU
time.sleep(0.05)
def get_last_result(self):
"""获取最后一次成功的箭头检测结果和计数情况"""
with self.lock:
# 根据计数确定最终方向
final_direction = self.arrow_result
if self.left_count > self.right_count:
final_direction = "left"
elif self.right_count > self.left_count:
final_direction = "right"
return final_direction, self.result_time, self.last_processed_image
def get_counts(self):
"""获取当前的左右方向计数"""
with self.lock:
return self.left_count, self.right_count
def run_task_2(ctrl, msg, xy_flag=False):
# go_to_xy(ctrl, msg, 0.8, 5, speed=0.5, observe=True)
# return 'right'
# 微调 xy 和角度
go_to_xy_v2(ctrl, msg, 0.9, 0.25, speed=0.5, observe=True)
turn_degree_v2(ctrl, msg, 0.8, absolute=True)
print('角度为',ctrl.odo_msg.rpy[2])
#获取微调角度
text=["直线走一段","第一个圆弧","微调路径","第二个圆弧","微调路径","第三个圆弧","微调路径","第四个圆弧","绕出来","前走一点"]
# 连续三重弯道行走
directions = [
#直线走一段
(0.4,0,0,0,0,0,1.2),
#第一个圆弧
(0.45, 0, 0.77,0,0,0,6.5),
#微调路径
(0.3,0,0,0,0,0,0.4),
#第二个圆弧
(0.4,0,-0.73,0,0,0,7.9),
#微调路径
(0.3,0,0,0,0,0,1),
# 第三个圆弧
(0.4,0,0.73,0,0,0,6.4),
#微调路径
(0.3,0,0,0,0,0,1.3),
#第四个圆弧
(0.4,0,-0.72,0,0,0,3.6),
#绕出来
(0.3,0,0.6,0,0,0,1),
# #前走一点
# (0.3,0,0,0,0,0,0.5)
] # [vel_x, vel_z] 对应 [左转, 右转, 左转]
gotoxy=[
(0.87,0.23),
(1.19,0.22),
(0.91,1.55),
(0.81,1.46),
(0.67,2.59),
(0.92,2.43),
(1.53,3.62),
(1.15,3.70),
(0.63,4.73),
]
arrow_direction = None
# 创建异步箭头检测器
arrow_detector = AsyncArrowDetector(ctrl.image_processor)
# 在进入第四个圆弧前索引为7启动异步箭头检测
arrow_detection_started = False
for i, (vel_x, vel_y, vel_z ,rpy_x,rpy_y,rpy_z,t) in enumerate(directions):
# 在第四个圆弧开始前启动异步箭头检测
if i == 7 and not arrow_detection_started:
info("即将进入第四个圆弧,启动异步箭头检测", "箭头检测")
arrow_detector.start_detection(interval=0.3)
arrow_detection_started = True
# 设置转向命令
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST
msg.vel_des = [vel_x, vel_y, vel_z] # [x, y, z]y=0表示不侧移
msg.rpy_des = [rpy_x, rpy_y, rpy_z]
msg.duration = 0 # 连续运动
msg.step_height = [0.03, 0.03]
msg.life_count += 1
ctrl.Send_cmd(msg)
# 根据转弯方向调整持续时间假设半圆需要3秒
if xy_flag:
go_to_xy(ctrl, msg, gotoxy[i][0], gotoxy[i][1], speed=0.5, observe=True)
# 打印当前方向
print(f"开始 {text[i]} ")
print(f"{i}",'角度为',ctrl.odo_msg.rpy[2])
print(f"{i}",'x为',ctrl.odo_msg.xyz[0])
print(f"{i}",'y为',ctrl.odo_msg.xyz[1])
print(f"{i}",'z为',ctrl.odo_msg.xyz[2])
time.sleep(t) # 持续3秒
# 在第四个圆弧完成后索引为7获取箭头检测结果
if i == 7: # 最后一个弯道结束后
info("完成第四个圆弧,开始检测箭头方向", "箭头检测")
# 给异步检测一些时间来完成
wait_time = 0
max_wait = 2.0 # 最多等待2秒
while wait_time < max_wait:
# 获取异步检测结果
result, result_time, last_image = arrow_detector.get_last_result()
if result is not None and result != "unknown":
arrow_direction = result
left_count, right_count = arrow_detector.get_counts()
info(f"成功检测到箭头方向: {arrow_direction} (left: {left_count}, right: {right_count})", "箭头检测")
break
time.sleep(0.1)
wait_time += 0.1
# 如果未检测到或结果为unknown尝试在当前图像上再次检测
if arrow_direction is None or arrow_direction == "unknown":
info("异步检测未得到确定结果,尝试在当前图像上直接检测", "箭头检测")
# 获取当前图像
image = ctrl.image_processor.get_current_image()
if image is not None:
# 直接在当前图像上检测
arrow_direction = detect_arrow_direction(image, observe=False)
# 更新计数
if arrow_direction == "left":
arrow_detector.left_count += 1
elif arrow_direction == "right":
arrow_detector.right_count += 1
# 根据总计数确定最终方向
left_count, right_count = arrow_detector.get_counts()
if left_count > right_count:
arrow_direction = "left"
elif right_count > left_count:
arrow_direction = "right"
info(f"直接检测到箭头方向: {arrow_direction} (left: {left_count}, right: {right_count})", "箭头检测")
# 保存检测结果的可视化图像
timestamp = time.strftime("%Y%m%d_%H%M%S")
save_path = f"logs/image/arrow_detection_final_{timestamp}.jpg"
os.makedirs(os.path.dirname(save_path), exist_ok=True)
# visualize_arrow_detection(image, save_path=save_path)
info(f"最终箭头检测可视化结果已保存至: {save_path}", "箭头检测")
else:
warning("无法获取当前图像,箭头方向检测失败", "箭头检测")
arrow_direction = "unknown"
# 停止异步箭头检测
if arrow_detection_started:
arrow_detector.stop_detection()
print('角度为',ctrl.odo_msg.rpy[2])
print('x为',ctrl.odo_msg.xyz[0])
print('y为',ctrl.odo_msg.xyz[1])
print('z为',ctrl.odo_msg.xyz[2])
# 输出最终计数结果
left_count, right_count = arrow_detector.get_counts()
info(f"箭头检测最终结果: left={left_count}, right={right_count}, 选择={arrow_direction}", "箭头检测结果")
# 返回检测到的箭头方向
return arrow_direction
def run_task_2_back(ctrl, msg,xy_flag=False):
turn_degree_v2(ctrl, msg, 0, absolute=True)
go_straight(ctrl, msg, distance=0.9, speed=0.20, mode=11, gait_id=3, step_height=[0.21, 0.21])
go_to_xy(ctrl, msg, 0.72, -11.1, speed=0.5, observe=True)
print(ctrl.odo_msg.xyz[0], 'ctrl.odo_msg.xyz[0]')
print(ctrl.odo_msg.xyz[1], 'ctrl.odo_msg.xyz[1]')
print(ctrl.odo_msg.rpy[2], 'ctrl.odo_msg.rpy[2]')
turn_degree_v2(ctrl, msg, -106.5, absolute=True)
print('角度为',ctrl.odo_msg.rpy[2])
print('x为',ctrl.odo_msg.xyz[0])
print('y为',ctrl.odo_msg.xyz[1])
print('z为',ctrl.odo_msg.xyz[2])
text=["直线走一段","第一个圆弧","微调路径","第二个圆弧","微调路径","第三个圆弧","微调路径","第四个圆弧","绕出来","前走一点"]
# 连续三重弯道行走
directions = [
(0.3,0,0,0,0,0,0.6),
#第四个圆弧
(0.4,0,0.69,0,0,0,3),
#微调路径
(0.3,0,0,0,0,0,1.3),
# 第三个圆弧
(0.4,0,-0.73,0,0,0,6),
#微调路径
(0.3,0,0,0,0,0,1.4),
#第二个圆弧
(0.4,0,0.74,0,0,0,7.4),
#微调路径
(0.3,0,0,0,0,0,1.2),
#第一个圆弧
(0.45, 0, -0.78,0,0,0,5.8),
#直线走一段
(0.4,0.05,0,0,0,0,0.3),
(0.4,0,0,0,0,0,1.5),
] # [vel_x, vel_z] 对应 [左转, 右转, 左转]
gotoxy=[
(0.76,-11),
(0.71,-11),
(1.12,-12),
(1.49,-12.1),
(1.1,-13.5),
(0.74,-13.3),
(0.68,-14.5),
(0.97,-14.3),
(1.69,-15.5),
(1.59,-15.6),
]
for i, (vel_x, vel_y, vel_z ,rpy_x,rpy_y,rpy_z,t) in enumerate(directions):
# 设置转向命令
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST
msg.vel_des = [vel_x, vel_y, vel_z] # [x, y, z]y=0表示不侧移
msg.rpy_des = [rpy_x, rpy_y, rpy_z]
msg.duration = 0 # 连续运动
msg.step_height = [0.03, 0.03]
msg.life_count += 1
ctrl.Send_cmd(msg)
if xy_flag:
go_to_xy(ctrl, msg, gotoxy[i][0], gotoxy[i][1], speed=0.5, observe=True)
print('角度为',ctrl.odo_msg.rpy[2])
print('x为',ctrl.odo_msg.xyz[0])
print('y为',ctrl.odo_msg.xyz[1])
print('z为',ctrl.odo_msg.xyz[2])
# 打印当前方向
print(f"开始 {text[i]} ")
# 根据转弯方向调整持续时间假设半圆需要3秒
time.sleep(t) # 持续3秒