mi-task/task_2/xiesi2.py
2025-08-21 22:12:07 +08:00

369 lines
12 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import sys
import os
import math
import threading
import cv2
import queue
from threading import Thread, Lock
from datetime import datetime
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from base_move.go_to_xy import go_to_xy, go_to_xy_v2
from base_move.turn_degree import turn_degree, turn_degree_twice, turn_degree_v2
from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing
from utils.decode_arrow import detect_arrow_direction, visualize_arrow_detection
from base_move.move_base_hori_line import (
arc_turn_around_hori_line,
go_straight_by_hori_line,
move_to_hori_line,
align_to_horizontal_line
)
# 导入低头匍匐步态模块
from task_2.crawl_gait import (
low_crawl_gait,
transition_to_crawl_position,
recover_from_crawl_position,
run_low_crawl_demo
)
# 创建本模块特定的日志记录器
logger = get_logger("任务2-5")
observe = False
# 异步箭头检测器类
class AsyncArrowDetector:
def __init__(self, image_processor):
"""
初始化异步箭头检测器
参数:
image_processor: ImageProcessor实例
"""
self.image_processor = image_processor
self.detection_thread = None
self.running = False
self.lock = Lock()
self.arrow_result = None
self.result_time = 0
self.last_processed_image = None
# 用于保存可视化结果
self.save_dir = "logs/image"
os.makedirs(self.save_dir, exist_ok=True)
# 添加左右方向计数器
self.left_count = 0
self.right_count = 0
info("异步箭头检测器已初始化", "初始化")
def start_detection(self, interval=0.5):
"""
启动异步箭头检测
参数:
interval: 检测间隔,单位秒
"""
if self.detection_thread is not None and self.detection_thread.is_alive():
info("异步箭头检测已经在运行中", "警告")
return
self.running = True
self.detection_thread = Thread(target=self._detection_worker, args=(interval,))
self.detection_thread.daemon = True
self.detection_thread.start()
info("启动异步箭头检测线程", "启动")
def stop_detection(self):
"""停止异步箭头检测"""
self.running = False
if self.detection_thread and self.detection_thread.is_alive():
self.detection_thread.join(timeout=1.0)
info("异步箭头检测线程已停止", "停止")
def _detection_worker(self, interval):
"""异步箭头检测工作线程"""
last_detection_time = 0
while self.running:
current_time = time.time()
# 按指定间隔检测
if current_time - last_detection_time >= interval:
img = self.image_processor.get_current_image('ai')
if img is not None:
try:
# 保存最后处理的图像
self.last_processed_image = img.copy()
# 检测箭头方向
direction = detect_arrow_direction(img)
with self.lock:
self.arrow_result = direction
self.result_time = current_time
# 更新方向计数
if direction == "left":
self.left_count += 1
elif direction == "right":
self.right_count += 1
if direction != "unknown":
info(f"异步检测到{direction}箭头 (left: {self.left_count}, right: {self.right_count})", "箭头检测")
# 保存检测结果的可视化图像
timestamp = time.strftime("%Y%m%d_%H%M%S")
save_path = f"{self.save_dir}/arrow_detection_{timestamp}.jpg"
# visualize_arrow_detection(img, save_path=save_path)
info(f"箭头检测可视化结果已保存至: {save_path}", "箭头检测")
except Exception as e:
error(f"异步箭头检测出错: {str(e)}", "错误")
last_detection_time = current_time
# 短暂休眠避免占用过多CPU
time.sleep(0.05)
def get_last_result(self):
"""获取最后一次成功的箭头检测结果和计数情况"""
with self.lock:
# 根据计数确定最终方向
final_direction = self.arrow_result
if self.left_count > self.right_count:
final_direction = "left"
elif self.right_count > self.left_count:
final_direction = "right"
return final_direction, self.result_time, self.last_processed_image
def get_counts(self):
"""获取当前的左右方向计数"""
with self.lock:
return self.left_count, self.right_count
def xiesi2(ctrl, msg):
xy_flag=False
# qreader = QReader()
try:
#x=-0.118246,y=0.011020,z=0.066921,yaw=-0.000436
msg.mode = 12 # Recovery stand
msg.gait_id = 0
msg.life_count += 1 # Command will take effect when life_count update
ctrl.Send_cmd(msg)
ctrl.Wait_finish(12, 0)
msg.mode = 11
msg.gait_id = 3
msg.step_height = [0.06, 0.06]
msg.vel_des = [0.1,0.0,0.0]
msg.duration = 2000
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep(5)
###################### S 弯 ###########################
# msg.vel_des = [0.3, 0.02, 0.05] #转向
# msg.duration = 3000 # Zero duration means continuous motion until a new command is used.
# # Continuous motion can interrupt non-zero duration interpolation motion
# msg.life_count += 1
# ctrl.Send_cmd(msg)
# time.sleep( 5 )
msg.vel_des = [0.3, 0, 0.42]
msg.duration = 10300
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep( 15 )
# msg.vel_des = [0.3, 0, 0]
# msg.duration = 200
# msg.life_count += 1
# ctrl.Send_cmd(msg)
# time.sleep( 2 )
msg.vel_des = [0.3, 0, -0.43]
msg.duration = 12200
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep( 15 )
# msg.vel_des = [0.3, 0, 0]
# msg.duration = 100
# msg.life_count += 1
# ctrl.Send_cmd(msg)
# time.sleep( 2 )
msg.vel_des = [0.3, 0, 0.43]
msg.duration = 10200
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep( 15 )
msg.vel_des = [0.3, 0, -0.5]
msg.duration = 3500
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep( 10 )
arrow_direction = None
# 创建异步箭头检测器
arrow_detector = AsyncArrowDetector(ctrl.image_processor)
# 在进入第四个圆弧前索引为7启动异步箭头检测
arrow_detection_started = False
info("完成第四个圆弧,开始检测箭头方向", "箭头检测")
# 给异步检测一些时间来完成
wait_time = 0
max_wait = 2.0 # 最多等待2秒
while wait_time < max_wait:
# 获取异步检测结果
result, result_time, last_image = arrow_detector.get_last_result()
if result is not None and result != "unknown":
arrow_direction = result
left_count, right_count = arrow_detector.get_counts()
info(f"成功检测到箭头方向: {arrow_direction} (left: {left_count}, right: {right_count})", "箭头检测")
break
time.sleep(0.1)
wait_time += 0.1
# 如果未检测到或结果为unknown尝试在当前图像上再次检测
if arrow_direction is None or arrow_direction == "unknown":
info("异步检测未得到确定结果,尝试在当前图像上直接检测", "箭头检测")
# 获取当前图像
image = ctrl.image_processor.get_current_image('ai')
if image is not None:
# 直接在当前图像上检测
arrow_direction = detect_arrow_direction(image, observe=False)
# 更新计数
if arrow_direction == "left":
arrow_detector.left_count += 1
elif arrow_direction == "right":
arrow_detector.right_count += 1
# 根据总计数确定最终方向
left_count, right_count = arrow_detector.get_counts()
if left_count > right_count:
arrow_direction = "left"
elif right_count > left_count:
arrow_direction = "right"
info(f"直接检测到箭头方向: {arrow_direction} (left: {left_count}, right: {right_count})", "箭头检测")
# 保存检测结果的可视化图像
timestamp = time.strftime("%Y%m%d_%H%M%S")
save_path = f"logs/image/arrow_detection_final_{timestamp}.jpg"
os.makedirs(os.path.dirname(save_path), exist_ok=True)
# visualize_arrow_detection(image, save_path=save_path)
info(f"最终箭头检测可视化结果已保存至: {save_path}", "箭头检测")
else:
warning("无法获取当前图像,箭头方向检测失败", "箭头检测")
arrow_direction = "unknown"
# 停止异步箭头检测
if arrow_detection_started:
arrow_detector.stop_detection()
print('角度为',ctrl.odo_msg.rpy[2])
print('x为',ctrl.odo_msg.xyz[0])
print('y为',ctrl.odo_msg.xyz[1])
print('z为',ctrl.odo_msg.xyz[2])
# 输出最终计数结果
left_count, right_count = arrow_detector.get_counts()
info(f"箭头检测最终结果: left={left_count}, right={right_count}, 选择={arrow_direction}", "箭头检测结果")
# 返回检测到的箭头方向
msg.vel_des = [0, 0.3, 0]
msg.duration = 1000
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep( 10 )
msg.vel_des = [0.3, 0, 0]
msg.duration = 3000
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep( 10 )
return arrow_direction
###################### S 弯 ###########################
except KeyboardInterrupt:
pass
ctrl.quit()
sys.exit()
def xiesi2_back(ctrl, msg):
try:
msg.mode = 12 # Recovery stand
msg.gait_id = 0
msg.life_count += 1 # Command will take effect when life_count update
ctrl.Send_cmd(msg)
ctrl.Wait_finish(12, 0)
msg.mode = 11
msg.gait_id = 3
msg.step_height = [0.06, 0.06]
msg.vel_des = [0.2, 0, 0]
msg.duration = 2500
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep(4)
msg.vel_des = [0.3,0,0.41]
msg.duration = 4000
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep(5)
msg.vel_des = [0.3,0,-0.43]
msg.duration = 10300
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep(12)
msg.vel_des = [0.3,0,0.44]
msg.duration = 12000
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep(14)
msg.vel_des = [0.3,0,-0.44]
msg.duration = 11000
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep(13)
msg.vel_des = [0.3,0,0]
msg.duration = 2000
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep(3)
except KeyboardInterrupt:
pass
ctrl.quit()
sys.exit()