feat(robot): add odometry functionality and update control logic

- Add odometry thread and message handler for localization data
- Update main control loop to incorporate odometry information
- Remove task 5 execution from main function
- Refactor task 5 implementation to be more modular
This commit is contained in:
havoc420ubuntu 2025-05-13 10:12:57 +00:00
parent 8914565e7c
commit fb95c8ae45
4 changed files with 106 additions and 75 deletions

View File

@ -188,6 +188,7 @@ class Robot_Ctrl(object):
self.send_thread.start()
self.rec_thread.start()
self.odo_thread.start() ###启动里程计
print("odo 启动---")
self.laser_thread.start()
def msg_handler(self, channel, data):
@ -199,6 +200,7 @@ class Robot_Ctrl(object):
def msg_handler_o(self, channel, data): ###里程计解码函数,被按照频率调用
self.odo_msg = localization_lcmt().decode(data)
print(self.odo_msg.timestamp, self.odo_msg.xyz, self.odo_msg.vxyz, self.odo_msg.rpy, self.odo_msg.vBody)
def rec_responce(self):
while self.runing:
@ -214,10 +216,10 @@ class Robot_Ctrl(object):
try:
self.send_lock.acquire()
if mode == 0: # 石子路+上下坡
steps = toml.load("/home/mi/Downloads/123/Gait_Params_walk.toml")
steps = toml.load("./Gait_Params_walk.toml")
elif mode == 1: # 待定
steps = toml.load(
"/home/mi/Downloads/123/Gait_Params_scopewalk.toml")
"./Gait_Params_scopewalk.toml")
full_steps = {'step': [robot_cmd]}
k = 0
for i in steps['step']:
@ -247,10 +249,10 @@ class Robot_Ctrl(object):
full_steps['step'].append(cmd)
k = k + 1
if mode == 0: # 石子路+上下坡
f = open("/home/mi/Downloads/123/Gait_Params_walk_full.toml",
f = open("./Gait_Params_walk_full.toml",
'w')
elif mode == 1: # 待定
f = open("/home/mi/Downloads/123/Gait_Params_scopewalk_full.toml",
f = open("./Gait_Params_scopewalk_full.toml",
'w')
f.write("# Gait Params\n")
f.writelines(toml.dumps(full_steps))
@ -258,14 +260,14 @@ class Robot_Ctrl(object):
if mode == 0: # 石子路+上下坡
file_obj_gait_def = open(
"/home/mi/Downloads/123/Gait_Def_walk.toml", 'r')
"./Gait_Def_walk.toml", 'r')
file_obj_gait_params = open(
"/home/mi/Downloads/123/Gait_Params_walk_full.toml", 'r')
"./Gait_Params_walk_full.toml", 'r')
elif mode == 1: # 待定
file_obj_gait_def = open(
"/home/mi/Downloads/123/Gait_Def_scopewalk.toml", 'r')
"./Gait_Def_scopewalk.toml", 'r')
file_obj_gait_params = open(
"/home/mi/Downloads/123/Gait_Params_scopewalk_full.toml", 'r')
"./Gait_Params_scopewalk_full.toml", 'r')
self.usergait_msg.data = file_obj_gait_def.read()
self.lcm_usergait.publish("user_gait_file", self.usergait_msg.encode())
@ -278,10 +280,10 @@ class Robot_Ctrl(object):
file_obj_gait_params.close()
if mode == 0: # 石子路+上下坡
user_gait_list = open(
"/home/mi/Downloads/123/Usergait_List.toml", 'r')
"./Usergait_List.toml", 'r')
if mode == 1: # 石子路+上下坡
user_gait_list = open(
"/home/mi/Downloads/123/Usergait_List.toml", 'r')
"./Usergait_List.toml", 'r')
steps = toml.load(user_gait_list)
for step in steps['step']:
self.cmd_msg.mode = step['mode']

18
main.py
View File

@ -13,6 +13,7 @@ import sys
from utils.robot_control_cmd_lcmt import robot_control_cmd_lcmt
from utils.robot_control_response_lcmt import robot_control_response_lcmt
from utils.localization_lcmt import localization_lcmt
from utils.image_raw import ImageProcessor
from task_1.task_1 import run_task_1
@ -36,7 +37,7 @@ def main():
# run_task_1(Ctrl, msg, image_processor)
run_task_5(Ctrl, msg)
# run_task_5(Ctrl, msg)
# time.sleep(100)
@ -56,10 +57,13 @@ class Robot_Ctrl(object):
def __init__(self):
self.rec_thread = Thread(target=self.rec_responce)
self.send_thread = Thread(target=self.send_publish)
self.odo_thread = Thread(target=self.rec_responce_o)
self.lc_r = lcm.LCM("udpm://239.255.76.67:7670?ttl=255")
self.lc_s = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
self.lc_o = lcm.LCM("udpm://239.255.76.67:7667?ttl=255")
self.cmd_msg = robot_control_cmd_lcmt()
self.rec_msg = robot_control_response_lcmt()
self.odo_msg = localization_lcmt()
self.send_lock = Lock()
self.delay_cnt = 0
self.mode_ok = 0
@ -68,8 +72,10 @@ class Robot_Ctrl(object):
def run(self):
self.lc_r.subscribe("robot_control_response", self.msg_handler)
self.lc_o.subscribe("global_to_robot", self.msg_handler_o)
self.send_thread.start()
self.rec_thread.start()
self.odo_thread.start()
def msg_handler(self, channel, data):
self.rec_msg = robot_control_response_lcmt().decode(data)
@ -78,11 +84,21 @@ class Robot_Ctrl(object):
else:
self.mode_ok = 0
def msg_handler_o(self, channel, data):
self.odo_msg = localization_lcmt().decode(data)
if self.odo_msg.timestamp % 100 == 0:
print(self.odo_msg.rpy)
def rec_responce(self):
while self.runing:
self.lc_r.handle()
time.sleep(0.002)
def rec_responce_o(self):
while self.runing:
self.lc_o.handle()
time.sleep(0.002)
def Wait_finish(self, mode, gait_id):
count = 0
while self.runing and count < 2000: # 10s

View File

@ -8,67 +8,4 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from task_5.detect_arrow_direction import ArrowDetector
def run_task_5(ctrl, msg, image_processor=None):
# 初始化箭头检测器
detector = ArrowDetector() if image_processor is None else ArrowDetector(image_processor)
try:
# 设置俯身姿态
msg.mode = 3 # 姿态控制模式
msg.gait_id = 0
msg.pos_des = [0, 0, 0.15] # 设置较低的姿态高度
msg.life_count += 1
ctrl.Send_cmd(msg)
ctrl.Wait_finish(3, msg.gait_id)
# 等待姿态稳定
time.sleep(1.0)
# 等待获取图像和检测箭头方向
print("正在检测箭头方向...")
time.sleep(2.0) # 给检测器一些时间来获取图像
# 检测箭头方向
direction = detector.get_arrow_direction()
print(f"检测到的箭头方向: {direction}")
# 保存检测结果的可视化图像
detector.visualize_current_detection("arrow_detection_result.jpg")
# 根据箭头方向决定移动方向
vel_x = 0.5 # 前进速度
vel_y = 0.0 # 初始侧向速度为0
if direction == "left":
# 如果是左箭头,向左移动
vel_y = 0.3 # 向左移动的速度
print("根据箭头方向,向左移动")
elif direction == "right":
# 如果是右箭头,向右移动
vel_y = -0.3 # 向右移动的速度
print("根据箭头方向,向右移动")
else:
# 如果无法确定方向,直接前进
print("无法确定箭头方向,直接前进")
# 开始运动
msg.mode = 11 # 运动控制模式
msg.gait_id = 3 # 使用 trot 步态
msg.vel_des = [vel_x, vel_y, 0] # 设置移动速度x和y方向
msg.duration = 3000 # 运动持续3秒
msg.step_height = [0.03, 0.03] # 设置步高
msg.life_count += 1
ctrl.Send_cmd(msg)
ctrl.Wait_finish(11, msg.gait_id)
# 恢复站立姿态
msg.mode = 3
msg.gait_id = 0
msg.pos_des = [0, 0, 0.25] # 恢复到正常站立高度
msg.life_count += 1
ctrl.Send_cmd(msg)
ctrl.Wait_finish(3, msg.gait_id)
finally:
# 清理资源
detector.destroy()
pass

View File

@ -0,0 +1,76 @@
"""LCM type definitions
This file automatically generated by lcm.
DO NOT MODIFY BY HAND!!!!
"""
from io import BytesIO
import struct
class localization_lcmt(object):
__slots__ = ["xyz", "vxyz", "rpy", "omegaBody", "vBody", "timestamp"]
__typenames__ = ["float", "float", "float", "float", "float", "int64_t"]
__dimensions__ = [[3], [3], [3], [3], [3], None]
def __init__(self):
self.xyz = [ 0.0 for dim0 in range(3) ] # 三维世界坐标
self.vxyz = [ 0.0 for dim0 in range(3) ] # 线速度
self.rpy = [ 0.0 for dim0 in range(3) ] # 设备方向
self.omegaBody = [ 0.0 for dim0 in range(3) ]
self.vBody = [ 0.0 for dim0 in range(3) ]
self.timestamp = 0
def encode(self):
buf = BytesIO()
buf.write(localization_lcmt._get_packed_fingerprint())
self._encode_one(buf)
return buf.getvalue()
def _encode_one(self, buf):
buf.write(struct.pack('>3f', *self.xyz[:3]))
buf.write(struct.pack('>3f', *self.vxyz[:3]))
buf.write(struct.pack('>3f', *self.rpy[:3]))
buf.write(struct.pack('>3f', *self.omegaBody[:3]))
buf.write(struct.pack('>3f', *self.vBody[:3]))
buf.write(struct.pack(">q", self.timestamp))
def decode(data):
if hasattr(data, 'read'):
buf = data
else:
buf = BytesIO(data)
if buf.read(8) != localization_lcmt._get_packed_fingerprint():
raise ValueError("Decode error")
return localization_lcmt._decode_one(buf)
decode = staticmethod(decode)
def _decode_one(buf):
self = localization_lcmt()
self.xyz = struct.unpack('>3f', buf.read(12))
self.vxyz = struct.unpack('>3f', buf.read(12))
self.rpy = struct.unpack('>3f', buf.read(12))
self.omegaBody = struct.unpack('>3f', buf.read(12))
self.vBody = struct.unpack('>3f', buf.read(12))
self.timestamp = struct.unpack(">q", buf.read(8))[0]
return self
_decode_one = staticmethod(_decode_one)
def _get_hash_recursive(parents):
if localization_lcmt in parents: return 0
tmphash = (0x7e246f0371a27d89) & 0xffffffffffffffff
tmphash = (((tmphash<<1)&0xffffffffffffffff) + (tmphash>>63)) & 0xffffffffffffffff
return tmphash
_get_hash_recursive = staticmethod(_get_hash_recursive)
_packed_fingerprint = None
def _get_packed_fingerprint():
if localization_lcmt._packed_fingerprint is None:
localization_lcmt._packed_fingerprint = struct.pack(">Q", localization_lcmt._get_hash_recursive([]))
return localization_lcmt._packed_fingerprint
_get_packed_fingerprint = staticmethod(_get_packed_fingerprint)
def get_hash(self):
"""Get the LCM hash of the struct"""
return struct.unpack(">Q", localization_lcmt._get_packed_fingerprint())[0]