mi-task/task_3/task_3.py
havoc420ubuntu 7b106c03dc refactor(task_3): update gait processing and enhance control logic
- Refactored gait processing in main.py to load parameters from Gait_Params_up.toml and publish user gait files.
- Improved control logic in task_3.py to monitor Z-axis speed and detect climbing phases.
- Added functionality for straight movement after task completion.
- Cleaned up commented-out code and ensured consistent message handling.
2025-05-26 06:11:09 +00:00

223 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import sys
import os
import toml
import copy
import math
import lcm
# 添加父目录到路径以便能够导入utils
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 添加当前目录到路径确保可以找到local文件
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing
from base_move.turn_degree import turn_degree
from base_move.go_straight import go_straight
from file_send_lcmt import file_send_lcmt
# 创建本模块特定的日志记录器
logger = get_logger("任务3")
observe = True
robot_cmd = {
'mode':0, 'gait_id':0, 'contact':0, 'life_count':0,
'vel_des':[0.0, 0.0, 0.0],
'rpy_des':[0.0, 0.0, 0.0],
'pos_des':[0.0, 0.0, 0.0],
'acc_des':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
'ctrl_point':[0.0, 0.0, 0.0],
'foot_pose':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
'step_height':[0.0, 0.0],
'value':0, 'duration':0
}
def load_gait_file(file_path):
"""加载步态文件"""
try:
with open(file_path, 'r') as f:
return f.read()
except Exception as e:
error(f"加载步态文件失败: {e}", "文件")
return None
def run_task_3(ctrl, msg):
section('任务3步态切换', "启动")
info('开始执行任务3...', "启动")
turn_degree(ctrl, msg, 90, absolute=True)
turn_degree(ctrl, msg, 90, absolute=True)
usergait_msg = file_send_lcmt()
lcm_usergait = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
try:
steps = toml.load("./task_3/Gait_Params_up.toml")
full_steps = {'step':[robot_cmd]}
k =0
for i in steps['step']:
cmd = copy.deepcopy(robot_cmd)
cmd['duration'] = i['duration']
if i['type'] == 'usergait':
cmd['mode'] = 11 # LOCOMOTION
cmd['gait_id'] = 110 # USERGAIT
cmd['vel_des'] = i['body_vel_des']
cmd['rpy_des'] = i['body_pos_des'][0:3]
cmd['pos_des'] = i['body_pos_des'][3:6]
cmd['foot_pose'][0:2] = i['landing_pos_des'][0:2]
cmd['foot_pose'][2:4] = i['landing_pos_des'][3:5]
cmd['foot_pose'][4:6] = i['landing_pos_des'][6:8]
cmd['ctrl_point'][0:2] = i['landing_pos_des'][9:11]
cmd['step_height'][0] = math.ceil(i['step_height'][0] * 1e3) + math.ceil(i['step_height'][1] * 1e3) * 1e3
cmd['step_height'][1] = math.ceil(i['step_height'][2] * 1e3) + math.ceil(i['step_height'][3] * 1e3) * 1e3
cmd['acc_des'] = i['weight']
cmd['value'] = i['use_mpc_traj']
cmd['contact'] = math.floor(i['landing_gain'] * 1e1)
cmd['ctrl_point'][2] = i['mu']
if k == 0:
full_steps['step'] = [cmd]
else:
full_steps['step'].append(cmd)
k=k+1
f = open("./task_3/Gait_Params_up_full.toml", 'w')
f.write("# Gait Params\n")
f.writelines(toml.dumps(full_steps))
f.close()
file_obj_gait_params = open("./task_3/Gait_Params_up_full.toml",'r')
usergait_msg.data = file_obj_gait_params.read()
lcm_usergait.publish("user_gait_file", usergait_msg.encode())
time.sleep(0.5)
file_obj_gait_params.close()
msg.mode = 62
msg.value = 0
msg.contact = 15
msg.gait_id = 110
msg.duration = 1000
msg.life_count += 1
# 参数设置
stable_count = 0 # 用于计数z轴稳定的次数
stable_threshold = 10 # 连续5次检测z轴不再增加则认为已经停止
z_speed_threshold = 0.01 # z轴速度阈值小于这个值认为已经停止爬升
climb_speed_threshold = 0.05 # 检测到开始爬坡的速度阈值
max_iterations = 600 # 最大循环次数,作为安全保障
start_height = ctrl.odo_msg.xyz[2] # 记录起始高度
# 阶段控制
climbing_detected = False # 是否检测到正在爬坡
initial_flat_stage = True # 是否在初始平路阶段
info(f"开始监测里程计Z轴速度初始高度: {start_height}", "监测")
for i in range(max_iterations):
# 发送控制命令维持心跳
ctrl.Send_cmd(msg)
# 每10次迭代打印一次当前信息
if i % 10 == 0:
# 获取当前Z轴位置和速度
current_vz = ctrl.odo_msg.vxyz[2] # z轴速度
info(f"当前Z轴速度={current_vz:.3f}", "监测")
# 获取z轴速度
vz = ctrl.odo_msg.vxyz[2]
# 检测是否开始爬坡阶段 - 使用z轴速度判断
if not climbing_detected and vz > climb_speed_threshold:
climbing_detected = True
initial_flat_stage = False
info(f"检测到开始爬坡Z轴速度: {vz:.3f}, 当前高度: {ctrl.odo_msg.xyz[2]:.3f}", "监测")
# 只有在检测到爬坡后才开始监控Z轴是否停止增加
if climbing_detected:
# 如果Z轴速度接近于0或者为负表示已经停止爬升或开始下降
if abs(vz) < z_speed_threshold or vz < 0:
stable_count += 1
if stable_count >= stable_threshold:
current_height = ctrl.odo_msg.xyz[2]
info(f"Z轴速度趋近于0停止循环。当前速度: {vz:.3f}, 当前高度: {current_height:.3f}", "监测")
break
else:
# 如果Z轴仍有明显上升速度重置稳定计数
stable_count = 0
time.sleep(0.2)
except KeyboardInterrupt:
msg.mode = 7 #PureDamper before KeyboardInterrupt:
msg.gait_id = 0
msg.duration = 0
msg.life_count += 1
ctrl.Send_cmd(msg)
pass
section('任务3-2直线行走', "开始")
go_straight(ctrl, msg, distance=1)
section('任务3-3down', "完成")
try:
msg.mode = 62
msg.value = 0
msg.contact = 15
msg.gait_id = 110
msg.duration = 1000
msg.life_count += 1
# 参数设置
stable_count = 0 # 用于计数z轴稳定的次数
stable_threshold = 10 # 连续5次检测z轴不再增加则认为已经停止
z_speed_threshold = 0.01 # z轴速度阈值小于这个值认为已经停止爬升
climb_speed_threshold = 0.05 # 检测到开始爬坡的速度阈值
max_iterations = 600 # 最大循环次数,作为安全保障
start_height = ctrl.odo_msg.xyz[2] # 记录起始高度
# 阶段控制
climbing_detected = False # 是否检测到正在爬坡
initial_flat_stage = True # 是否在初始平路阶段
info(f"开始监测里程计Z轴速度初始高度: {start_height}", "监测")
for i in range(max_iterations):
# 发送控制命令维持心跳
ctrl.Send_cmd(msg)
# 每10次迭代打印一次当前信息
if i % 10 == 0:
# 获取当前Z轴位置和速度
current_vz = ctrl.odo_msg.vxyz[2] # z轴速度
info(f"当前Z轴速度={current_vz:.3f}", "监测")
# 获取z轴速度
vz = ctrl.odo_msg.vxyz[2]
# 检测是否开始爬坡阶段 - 使用z轴速度判断
if not climbing_detected and vz < -climb_speed_threshold:
climbing_detected = True
initial_flat_stage = False
info(f"检测到开始爬坡Z轴速度: {vz:.3f}, 当前高度: {ctrl.odo_msg.xyz[2]:.3f}", "监测")
# 只有在检测到爬坡后才开始监控Z轴是否停止增加
if climbing_detected:
# 如果Z轴速度接近于0或者为正表示已经停止爬升或开始下降
if abs(vz) < z_speed_threshold or vz > 0:
stable_count += 1
if stable_count >= stable_threshold:
current_height = ctrl.odo_msg.xyz[2]
info(f"Z轴速度趋近于0停止循环。当前速度: {vz:.3f}, 当前高度: {current_height:.3f}", "监测")
break
else:
# 如果Z轴仍有明显上升速度重置稳定计数
stable_count = 0
time.sleep(0.2)
except KeyboardInterrupt:
msg.mode = 7 #PureDamper before KeyboardInterrupt:
msg.gait_id = 0
msg.duration = 0
msg.life_count += 1
ctrl.Send_cmd(msg)
pass