mi-task/main.py

397 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

'''
This demo show the communication interface of MR813 motion control board based on Lcm.
Dependency:
- robot_control_cmd_lcmt.py
- robot_control_response_lcmt.py
'''
import lcm
import sys
import os
import time
from threading import Thread, Lock
import sys
import rclpy
from utils.robot_control_cmd_lcmt import robot_control_cmd_lcmt
from utils.robot_control_response_lcmt import robot_control_response_lcmt
from utils.localization_lcmt import localization_lcmt
from utils.image_raw import ImageProcessor
from utils.base_msg import BaseMsg
# from utils.marker_client import MarkerRunner
from task_1.task_1 import run_task_1
from task_2_5.task_2_5 import run_task_2_5
from task_3.task_3 import run_task_3, run_task_3_back
from task_1.task_1 import run_task_1, run_task_1_back
from task_2.task_2 import run_task_2, run_task_2_back
from task_2_5.task_2_5 import run_task_2_5, run_task_2_5_back
from task_4.task_4 import run_task_4, run_task_4_back
from task_test.task_left_line import run_task_test
from task_5.task_5 import run_task_5
from base_move.turn_degree import turn_degree, turn_degree_v2
from base_move.center_on_dual_tracks import center_on_dual_tracks
from base_move.go_to_xy import go_to_x_v2, go_to_y_v2
from utils.log_helper import info
# 新增:导入配置文件
from config import get_config, SYSTEM_CONFIG
# 新增导入single-test测试模块
try:
from single_test.test_main import main_menu as test_main_menu
from single_test.test_main import run_interactive_test_selection
SINGLE_TEST_AVAILABLE = get_config('SYSTEM', 'ENABLE_SINGLE_TEST', True)
except ImportError:
print("警告single-test模块未找到测试功能将不可用")
SINGLE_TEST_AVAILABLE = False
pass_marker = True
TIME_SLEEP = get_config('SYSTEM', 'TIME_SLEEP', 5000)
def print_main_menu():
"""打印主程序菜单"""
print("\n" + "="*70)
print(" 机器狗控制系统")
print("="*70)
print("🎯 运行模式选择:")
print(" 1. 完整任务模式 - 运行完整的任务1-5流程")
print(" 2. 测试模式 - 进入子运动函数测试")
print(" 3. 交互式测试 - 快速测试单个功能")
print(" 0. 退出程序")
print("="*70)
print("💡 提示:")
print(" - 模式1执行完整比赛任务流程")
print(" - 模式2菜单式测试各种子运动函数")
print(" - 模式3快速命令行测试界面")
print("="*70)
def run_complete_task_mode():
"""运行完整任务模式原main函数逻辑"""
rclpy.init()
Ctrl = Robot_Ctrl()
Ctrl.run()
msg = robot_control_cmd_lcmt()
try:
info("Recovery stand", "info")
Ctrl.base_msg.stand_up()
Ctrl.base_msg.stop() # BUG 垃圾指令 for eat
run_task_1(Ctrl, msg, time_sleep=TIME_SLEEP)
arrow_direction = run_task_2(Ctrl, msg, xy_flag=False)
info(f"识别到箭头方向: {arrow_direction}", "info")
info(f"识别到箭头方向: {arrow_direction}", "info")
run_task_2_5(Ctrl, msg, direction=arrow_direction)
if arrow_direction == 'left':
run_task_4(Ctrl, msg)
else:
run_task_3(Ctrl, msg, time_sleep=TIME_SLEEP)
turn_degree_v2(Ctrl, msg, degree=90, absolute=True)
run_task_5(Ctrl, msg, direction=arrow_direction, time_sleep=TIME_SLEEP)
if arrow_direction == 'left':
run_task_3_back(Ctrl, msg, time_sleep=TIME_SLEEP)
else:
run_task_4_back(Ctrl, msg)
run_task_2_5_back(Ctrl, msg, direction=arrow_direction)
run_task_2_back(Ctrl, msg, xy_flag=False)
run_task_1_back(Ctrl, msg, time_sleep=TIME_SLEEP)
except KeyboardInterrupt:
print("\n程序被用户中断")
except Exception as e:
print(f"发生错误: {e}")
finally:
print("正在清理资源...")
Ctrl.quit()
rclpy.shutdown()
print("程序已退出")
def run_test_mode():
"""运行测试模式"""
if not SINGLE_TEST_AVAILABLE:
print("❌ 测试模块不可用请检查single-test目录是否存在")
return
rclpy.init()
Ctrl = Robot_Ctrl()
Ctrl.run()
msg = robot_control_cmd_lcmt()
try:
info("初始化测试环境", "info")
Ctrl.base_msg.stand_up()
Ctrl.base_msg.stop()
# 进入测试主菜单
test_main_menu(Ctrl, msg)
except KeyboardInterrupt:
print("\n测试被用户中断")
except Exception as e:
print(f"测试过程中发生错误: {e}")
finally:
print("正在清理测试环境...")
Ctrl.quit()
rclpy.shutdown()
print("测试环境已清理")
def run_interactive_test_mode():
"""运行交互式测试模式"""
if not SINGLE_TEST_AVAILABLE:
print("❌ 测试模块不可用请检查single-test目录是否存在")
return
rclpy.init()
Ctrl = Robot_Ctrl()
Ctrl.run()
msg = robot_control_cmd_lcmt()
try:
info("初始化交互式测试环境", "info")
Ctrl.base_msg.stand_up()
Ctrl.base_msg.stop()
# 进入交互式测试
run_interactive_test_selection(Ctrl, msg)
except KeyboardInterrupt:
print("\n交互式测试被用户中断")
except Exception as e:
print(f"交互式测试过程中发生错误: {e}")
finally:
print("正在清理交互式测试环境...")
Ctrl.quit()
rclpy.shutdown()
print("交互式测试环境已清理")
def main():
"""主函数 - 支持模式选择"""
while True:
print_main_menu()
try:
choice = input("请选择运行模式 (输入编号): ").strip()
if choice == '0':
print("👋 感谢使用机器狗控制系统,再见!")
break
elif choice == '1':
print("🚀 启动完整任务模式...")
run_complete_task_mode()
print("\n✅ 完整任务模式执行完毕")
input("按回车键返回主菜单...")
elif choice == '2':
print("🔧 启动测试模式...")
run_test_mode()
print("\n✅ 测试模式执行完毕")
input("按回车键返回主菜单...")
elif choice == '3':
print("⚡ 启动交互式测试模式...")
run_interactive_test_mode()
print("\n✅ 交互式测试模式执行完毕")
input("按回车键返回主菜单...")
else:
print("❌ 无效的选择,请输入 0-3 之间的数字")
input("按回车键继续...")
except KeyboardInterrupt:
print("\n👋 程序被用户中断,再见!")
break
except Exception as e:
print(f"❌ 发生错误: {e}")
input("按回车键继续...")
# 保留原来的main函数作为legacy_main以防需要
def legacy_main():
"""原始的main函数保留作为备用"""
rclpy.init()
Ctrl = Robot_Ctrl()
Ctrl.run()
msg = robot_control_cmd_lcmt()
try:
info("Recovery stand", "info")
Ctrl.base_msg.stand_up()
Ctrl.base_msg.stop() # BUG 垃圾指令 for eat
run_task_1(Ctrl, msg, time_sleep=TIME_SLEEP)
arrow_direction = run_task_2(Ctrl, msg, xy_flag=False)
info(f"识别到箭头方向: {arrow_direction}", "info")
info(f"识别到箭头方向: {arrow_direction}", "info")
run_task_2_5(Ctrl, msg, direction=arrow_direction)
if arrow_direction == 'left':
run_task_4(Ctrl, msg)
else:
run_task_3(Ctrl, msg, time_sleep=TIME_SLEEP)
turn_degree_v2(Ctrl, msg, degree=90, absolute=True)
run_task_5(Ctrl, msg, direction=arrow_direction, time_sleep=TIME_SLEEP)
if arrow_direction == 'left':
run_task_3_back(Ctrl, msg, time_sleep=TIME_SLEEP)
else:
run_task_4_back(Ctrl, msg)
run_task_2_5_back(Ctrl, msg, direction=arrow_direction)
run_task_2_back(Ctrl, msg, xy_flag=False)
run_task_1_back(Ctrl, msg, time_sleep=TIME_SLEEP)
except KeyboardInterrupt:
print("\n程序被用户中断")
except Exception as e:
print(f"发生错误: {e}")
finally:
print("正在清理资源...")
Ctrl.quit()
rclpy.shutdown()
print("程序已退出")
sys.exit()
class Robot_Ctrl(object):
def __init__(self):
self.rec_thread = Thread(target=self.rec_responce)
self.send_thread = Thread(target=self.send_publish)
self.odo_thread = Thread(target=self.rec_responce_o)
self.lc_r = lcm.LCM("udpm://239.255.76.67:7670?ttl=255") # 控制指令
self.lc_s = lcm.LCM("udpm://239.255.76.67:7671?ttl=255") # 状态反馈
self.lc_o = lcm.LCM("udpm://239.255.76.67:7667?ttl=255") # 里程计
self.cmd_msg = robot_control_cmd_lcmt()
self.rec_msg = robot_control_response_lcmt()
self.odo_msg = localization_lcmt()
self.image_processor = ImageProcessor()
# DEBUG
# self.marker_runner = MarkerRunner(pass_flag=pass_marker)
self.send_lock = Lock()
self.delay_cnt = 0
self.mode_ok = 0
self.gait_ok = 0
self.runing = 1
# 新增: 校准相关变量
self.is_calibrated = False # 是否已完成校准
self.calibration_offset = [0, 0, 0] # 校准偏移量
# 新增: 基础消息
self.base_msg = BaseMsg(self, self.cmd_msg)
def run(self):
self.lc_r.subscribe("robot_control_response", self.msg_handler)
self.lc_o.subscribe("global_to_robot", self.msg_handler_o)
self.send_thread.start()
self.rec_thread.start()
self.odo_thread.start()
self.image_processor.run()
# self.marker_runner.run()
def msg_handler(self, channel, data):
self.rec_msg = robot_control_response_lcmt().decode(data)
if self.rec_msg.order_process_bar >= 95:
self.mode_ok = self.rec_msg.mode
else:
self.mode_ok = 0
def reset_offset(self):
self.is_calibrated = False
def msg_handler_o(self, channel, data):
self.odo_msg = localization_lcmt().decode(data)
# 如果尚未校准,记录第一帧数据作为校准基准
if not self.is_calibrated:
self.calibration_offset = self.odo_msg.xyz
self.is_calibrated = True
print(f"校准完成,基准值: {self.calibration_offset}")
# 将接收到的 odo 数据减去校准基准值
self.odo_msg.xyz = [
self.odo_msg.xyz[0] - self.calibration_offset[0],
self.odo_msg.xyz[1] - self.calibration_offset[1],
self.odo_msg.xyz[2] - self.calibration_offset[2]
]
# if self.odo_msg.timestamp % 100 == 0:
# print(self.odo_msg.xyz, self.odo_msg.rpy, self.odo_msg.vxyz, self.odo_msg.omegaBody, self.odo_msg.vBody)
def odo_reset(self):
self.calibration_offset = self.odo_msg.xyz
def rec_responce(self):
while self.runing:
self.lc_r.handle()
time.sleep(0.002)
def rec_responce_o(self):
while self.runing:
self.lc_o.handle()
time.sleep(0.002)
def Wait_finish(self, mode, gait_id):
count = 0
while self.runing and count < 2000: # 10s
if self.mode_ok == mode and self.gait_ok == gait_id:
return True
else:
time.sleep(0.005)
count += 1
print("Wait_finish timeout.")
return False
def send_publish(self):
while self.runing:
self.send_lock.acquire()
if self.delay_cnt > 20: # Heartbeat signal 10HZ
self.lc_s.publish("robot_control_cmd", self.cmd_msg.encode())
self.delay_cnt = 0
self.delay_cnt += 1
self.send_lock.release()
time.sleep(0.005)
def Send_cmd(self, msg):
self.send_lock.acquire()
self.delay_cnt = 50
self.cmd_msg = msg
self.cmd_msg.life_count %= 127
self.send_lock.release()
def place_marker(self, x, y, z, color, observe=False):
return None
return None
"""调用 MarkerRunner 放置标志物"""
if self.marker_runner is None or self.marker_runner.marker_client is None:
print("MarkerRunner 未初始化,无法放置标志物")
return None
try:
response = self.marker_runner.send_request(x, y, z, color, observe=observe)
print(f"放置标志物结果: {response.success}, 消息: {response.message}")
return response
except Exception as e:
print(f"放置标志物时发生异常: {e}")
return None
def quit(self):
self.runing = 0
self.rec_thread.join()
self.send_thread.join()
self.image_processor.destroy()
# 销毁 MarkerRunner
if hasattr(self, 'marker_runner') and self.marker_runner is not None:
try:
self.marker_runner.destroy()
except Exception as e:
print(f"MarkerRunner 销毁失败: {e}")
# Main function
if __name__ == '__main__':
main()