mi-task/turndemo.py
2025-08-21 11:25:38 +08:00

334 lines
12 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

'''
This demo show the communication interface of MR813 motion control board based on Lcm.
Dependency:
- robot_control_cmd_lcmt.py
- robot_control_response_lcmt.py
'''
# sudo chown -R $USER:$USER /home/mi-task/
import lcm
import sys
import os
import time
from threading import Thread, Lock
import sys
import string
from datetime import datetime
import rclpy
import cv2
from utils.robot_control_cmd_lcmt import robot_control_cmd_lcmt
from utils.robot_control_response_lcmt import robot_control_response_lcmt
from utils.localization_lcmt import localization_lcmt
from utils.image_raw import ImageProcessor
from utils.base_msg import BaseMsg
from utils.speech_demo import speak
# from utils.marker_client import MarkerRunner
from task_1.task_1 import run_task_1, run_task_1_back
from task_2.task_2 import run_task_2, run_task_2_back, run_task_2_demo
from task_2_5.task_2_5 import run_task_2_5, run_task_2_5_back
from task_3.task_3 import run_task_3, run_task_3_back
from task_4.task_4 import run_task_4, run_task_4_back
from task_5.task_5 import run_task_5
from base_move.turn_degree import turn_degree_v2
from base_move.go_to_xy import go_to_x_v2, go_to_y_v2
from utils.log_helper import info
pass_marker = True
TIME_SLEEP = 3000 # TODO 比赛时改成 5000
from enum import Enum, auto
class TaskType(Enum):
TASK = auto() # 默认任务
PASS_BAR = auto() # TODO 测试低头高度;可以修改代码,和 RED-BAR 一块测试。
YELLOW_LIGHT = auto() # TODO 设定 yellow threshold
RED_BAR = auto() # TODO 设定 red threshold
UP_AND_DOWN = auto() # TODO 调整其完成从上坡到下坡
STONE_ROAD = auto() # TODO 调整其刚好走完石板路,不浪费时间。目前估测 distance 就是 4m
#
MOVE_TO_LINE = auto() # TODO 直走逼近直线测试
CENTER_ON_DUAL_TRACKS = auto() # TODO 双轨道居中测试
TASK = ''
def main():
rclpy.init() # 新增:在主程序中统一初始化 ROS 2 上下文
Ctrl = Robot_Ctrl()
Ctrl.run()
print('1')
msg = robot_control_cmd_lcmt()
print('2')
try:
# TEST
# print('yuyin')
# cv_image = Ctrl.image_processor.get_current_image('ai')
# print('111')
# cv2.imwrite(f"saved_images/firstai.jpg", cv_image)
# print('try out')
# speak('nihao')
# TEST
# go_straight_with_enhanced_calibration(Ctrl, msg, distance = 5, speed=0.5, observe=False, mode=11, gait_id=3, step_height=[0.21, 0.21])
# pass_up_down(Ctrl, msg)
# pass_bar(Ctrl, msg)
# run_task_2(Ctrl, msg)
# INFO Real Task
# TAG task - 0
info("Recovery stand", "info")
Ctrl.base_msg.stand_up()
Ctrl.base_msg.stop() # BUG 垃圾指令 for eat # INFO 但是正式比赛或许也有用
# image = Ctrl.image_processor.get_current_image('ai')
# timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
# filename = f"saved_images/rgb_{timestamp}.jpg"
# cv2.imwrite(filename, image)
# if TASK == TaskType.PASS_BAR:
# from task_4.pass_bar import pass_bar
# # TEST #1: pass-bar
# pass_bar(Ctrl, msg)
# elif TASK == TaskType.YELLOW_LIGHT: # TODO image
# from task_3.task_3 import go_until_yellow_area
# turn_degree_v2(Ctrl, msg, degree=-90, absolute=True)
# go_until_yellow_area(Ctrl, msg)
# elif TASK == TaskType.RED_BAR:
# from task_4.task_4 import go_straight_until_red_bar
# go_straight_until_red_bar(Ctrl, msg)
# elif TASK == TaskType.UP_AND_DOWN:
# from task_3.task_3 import go_straight_with_enhanced_calibration
# go_straight_with_enhanced_calibration(Ctrl, msg, distance = 5, speed=0.5, observe=False, mode=11, gait_id=3, step_height=[0.21, 0.21])
# elif TASK == TaskType.STONE_ROAD:
# from task_3.task_3 import pass_stone
# pass_stone(Ctrl, msg, distance = 4, observe=False)
# elif TASK == TaskType.MOVE_TO_LINE:
# from base_move.move_base_hori_line import move_to_hori_line
# move_to_hori_line(Ctrl, msg, target_distance = 1.1, observe=False)
# elif TASK == TaskType.CENTER_ON_DUAL_TRACKS:
# from base_move.center_on_dual_tracks import center_on_dual_tracks
# center_on_dual_tracks(Ctrl, msg, max_deviation=10.0, observe=False, detect_height=0.3)
# else:
# pass
# if TASK != TaskType.TASK:
# # 如果不是 task 类型,直接返回
# return
# TAG task - 1
# run_task_1(Ctrl, msg, time_sleep=TIME_SLEEP)
# TAG task - 2
# arrow_direction = run_task_2_demo(Ctrl, msg, xy_flag=False) # TEST
# arrow_direction = run_task_2(Ctrl, msg, xy_flag=False)
# print('🏹 arrow_direction: ', arrow_direction)
arrow_direction='left'
# TAG task - 2.5
# run_task_2_5(Ctrl, msg, direction=arrow_direction)
# TAG task - 3 / 4 - part I
# if arrow_direction == 'left':
# run_task_4(Ctrl, msg)
# else:
# run_task_3(Ctrl, msg, time_sleep=TIME_SLEEP) # 直接上下坡,然后走到 黄灯前原地踏步的部分
# TAG task - 5
turn_degree_v2(Ctrl, msg, degree=90, absolute=True)
info("90", "info")
time.sleep(2)
turn_degree_v2(Ctrl, msg, degree=90, absolute=True)
info("90", "info")
time.sleep(2)
turn_degree_v2(Ctrl, msg, degree=180, absolute=True)
info("180", "info")
time.sleep(2)
turn_degree_v2(Ctrl, msg, degree=-30, absolute=True)
info("-30", "info")
#success, qr=run_task_5(Ctrl, msg, direction=arrow_direction, observe=True, time_sleep=TIME_SLEEP) # B区任务
#print(success)
#print(qr)
# TAG task - 3 / 4 - part II
# if arrow_direction == 'left':
# run_task_4_back(Ctrl, msg)
# else:
# run_task_3_back(Ctrl, msg)
# TAG task - 2.5 - back
# run_task_2_5_back(Ctrl, msg, direction=arrow_direction)
# TAG task - 2 - back
# run_task_2_back(Ctrl, msg)
# TAG task - 1 - back
# run_task_1_back(Ctrl, msg, time_sleep=TIME_SLEEP)
print('🏁 Task finished.')
try:
while rclpy.ok():
executor.spin_once(timeout_sec=0.1)
if getattr(node, '_shutdown_flag', False):
break
except KeyboardInterrupt:
pass
except KeyboardInterrupt:
print("\n程序被用户中断")
except Exception as e:
import traceback
import sys
exc_type, exc_value, exc_tb = sys.exc_info()
tb = traceback.extract_tb(exc_tb)
if tb:
last_call = tb[-1]
print(f"发生错误: {e}(文件: {last_call.filename}, 行号: {last_call.lineno}, 函数: {last_call.name}")
else:
print(f"发生错误: {e}")
traceback.print_exc()
finally:
print("正在清理资源...")
Ctrl.quit()
print('3')
rclpy.shutdown() # 新增:在主程序中统一销毁 ROS 2 上下文
print("程序已退出")
sys.exit()
class Robot_Ctrl(object):
def __init__(self):
self.rec_thread = Thread(target=self.rec_responce)
self.send_thread = Thread(target=self.send_publish)
self.odo_thread = Thread(target=self.rec_responce_o)
self.lc_r = lcm.LCM("udpm://239.255.76.67:7670?ttl=255")
self.lc_s = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
self.lc_o = lcm.LCM("udpm://239.255.76.67:7667?ttl=255")
self.cmd_msg = robot_control_cmd_lcmt()
self.rec_msg = robot_control_response_lcmt()
self.odo_msg = localization_lcmt()
#self.image_processor = ImageProcessor()
# DEBUG
# self.marker_runner = MarkerRunner(pass_flag=pass_marker)
self.send_lock = Lock()
self.delay_cnt = 0
self.mode_ok = 0
self.gait_ok = 0
self.runing = 1
# 新增: 校准相关变量
self.is_calibrated = False # 是否已完成校准
self.calibration_offset = [0, 0, 0] # 校准偏移量
# 新增: 基础消息
self.base_msg = BaseMsg(self, self.cmd_msg)
def run(self):
self.lc_r.subscribe("robot_control_response", self.msg_handler)
self.lc_o.subscribe("global_to_robot", self.msg_handler_o)
self.send_thread.start()
self.rec_thread.start()
self.odo_thread.start()
#self.image_processor.run()
#self.marker_runner.run()
def msg_handler(self, channel, data):
self.rec_msg = robot_control_response_lcmt().decode(data)
if self.rec_msg.order_process_bar >= 95:
self.mode_ok = self.rec_msg.mode
else:
self.mode_ok = 0
def msg_handler_o(self, channel, data):
self.odo_msg = localization_lcmt().decode(data)
# 如果尚未校准,记录第一帧数据作为校准基准
if not self.is_calibrated:
self.calibration_offset = self.odo_msg.xyz
self.is_calibrated = True
print(f"校准完成,基准值: {self.calibration_offset}")
# 将接收到的 odo 数据减去校准基准值
self.odo_msg.xyz = [
self.odo_msg.xyz[0] - self.calibration_offset[0],
self.odo_msg.xyz[1] - self.calibration_offset[1],
self.odo_msg.xyz[2] - self.calibration_offset[2]
]
# if self.odo_msg.timestamp % 100 == 0:
# print(self.odo_msg.xyz, self.odo_msg.rpy, self.odo_msg.vxyz, self.odo_msg.omegaBody, self.odo_msg.vBody)
def odo_reset(self):
self.calibration_offset = self.odo_msg.xyz
def rec_responce(self):
while self.runing:
self.lc_r.handle()
time.sleep(0.002)
def rec_responce_o(self):
while self.runing:
self.lc_o.handle()
time.sleep(0.002)
def Wait_finish(self, mode, gait_id):
count = 0
while self.runing and count < 2000: # 10s
if self.mode_ok == mode and self.gait_ok == gait_id:
return True
else:
time.sleep(0.005)
count += 1
print("Wait_finish timeout.")
return False
def send_publish(self):
while self.runing:
self.send_lock.acquire()
if self.delay_cnt > 20: # Heartbeat signal 10HZ
self.lc_s.publish("robot_control_cmd", self.cmd_msg.encode())
self.delay_cnt = 0
self.delay_cnt += 1
self.send_lock.release()
time.sleep(0.005)
def Send_cmd(self, msg):
self.send_lock.acquire()
self.delay_cnt = 50
self.cmd_msg = msg
self.cmd_msg.life_count %= 127
self.send_lock.release()
def place_marker(self, x, y, z, color, observe=False):
return None
"""调用 MarkerRunner 放置标志物"""
if self.marker_runner is None or self.marker_runner.marker_client is None:
print("MarkerRunner 未初始化,无法放置标志物")
return None
try:
response = self.marker_runner.send_request(x, y, z, color, observe=observe)
print(f"放置标志物结果: {response.success}, 消息: {response.message}")
return response
except Exception as e:
print(f"放置标志物时发生异常: {e}")
return None
def quit(self):
self.runing = 0
self.rec_thread.join()
self.send_thread.join()
self.image_processor.destroy()
# 销毁 MarkerRunner
if hasattr(self, 'marker_runner') and self.marker_runner is not None:
try:
self.marker_runner.destroy()
except Exception as e:
print(f"MarkerRunner 销毁失败: {e}")
# Main function
if __name__ == '__main__':
main()