mi-task/task_3/task_3.py

278 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import sys
import os
import toml
import copy
import math
import lcm
# 添加父目录到路径以便能够导入utils
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 添加当前目录到路径确保可以找到local文件
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing
from base_move.turn_degree import turn_degree, turn_degree_v2
from base_move.go_straight import go_straight
from file_send_lcmt import file_send_lcmt
# 创建本模块特定的日志记录器
logger = get_logger("任务3")
observe = True
robot_cmd = {
'mode':0, 'gait_id':0, 'contact':0, 'life_count':0,
'vel_des':[0.0, 0.0, 0.0],
'rpy_des':[0.0, 0.0, 0.0],
'pos_des':[0.0, 0.0, 0.0],
'acc_des':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
'ctrl_point':[0.0, 0.0, 0.0],
'foot_pose':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
'step_height':[0.0, 0.0],
'value':0, 'duration':0
}
def run_task_3(ctrl, msg):
section('任务3步态切换', "启动")
info('开始执行任务3...', "启动")
turn_degree_v2(ctrl, msg, 90, absolute=True)
usergait_msg = file_send_lcmt()
lcm_usergait = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
try:
steps = toml.load("./task_3/Gait_Params_up.toml")
full_steps = {'step':[robot_cmd]}
k =0
for i in steps['step']:
cmd = copy.deepcopy(robot_cmd)
cmd['duration'] = i['duration']
if i['type'] == 'usergait':
cmd['mode'] = 11 # LOCOMOTION
cmd['gait_id'] = 110 # USERGAIT
cmd['vel_des'] = i['body_vel_des']
cmd['rpy_des'] = i['body_pos_des'][0:3]
cmd['pos_des'] = i['body_pos_des'][3:6]
cmd['foot_pose'][0:2] = i['landing_pos_des'][0:2]
cmd['foot_pose'][2:4] = i['landing_pos_des'][3:5]
cmd['foot_pose'][4:6] = i['landing_pos_des'][6:8]
cmd['ctrl_point'][0:2] = i['landing_pos_des'][9:11]
cmd['step_height'][0] = math.ceil(i['step_height'][0] * 1e3) + math.ceil(i['step_height'][1] * 1e3) * 1e3
cmd['step_height'][1] = math.ceil(i['step_height'][2] * 1e3) + math.ceil(i['step_height'][3] * 1e3) * 1e3
cmd['acc_des'] = i['weight']
cmd['value'] = i['use_mpc_traj']
cmd['contact'] = math.floor(i['landing_gain'] * 1e1)
cmd['ctrl_point'][2] = i['mu']
if k == 0:
full_steps['step'] = [cmd]
else:
full_steps['step'].append(cmd)
k=k+1
f = open("./task_3/Gait_Params_up_full.toml", 'w')
f.write("# Gait Params\n")
f.writelines(toml.dumps(full_steps))
f.close()
# pre
file_obj_gait_def = open("./task_3/Gait_Def_up.toml",'r')
file_obj_gait_params = open("./task_3/Gait_Params_up_full.toml",'r')
usergait_msg.data = file_obj_gait_def.read()
lcm_usergait.publish("user_gait_file",usergait_msg.encode())
time.sleep(0.5)
usergait_msg.data = file_obj_gait_params.read()
lcm_usergait.publish("user_gait_file",usergait_msg.encode())
time.sleep(0.1)
file_obj_gait_def.close()
file_obj_gait_params.close()
msg.mode = 62
msg.value = 0
msg.contact = 15
msg.gait_id = 110
msg.duration = 1000
msg.life_count += 1
# 参数设置
stable_count = 0 # 用于计数z轴稳定的次数
stable_threshold = 10 # 连续15次检测z轴不再增加则认为已经停止
z_speed_threshold = 0.01 # z轴速度阈值小于这个值认为已经停止爬升
climb_speed_threshold = 0.05 # 检测到开始爬坡的速度阈值
max_iterations = 600 # 最大循环次数,作为安全保障
min_iterations = 200 # 最小循环次数,作为安全保障
start_height = ctrl.odo_msg.xyz[2] # 记录起始高度
# 阶段控制
climbing_detected = False # 是否检测到正在爬坡
info(f"开始监测里程计Z轴速度初始高度: {start_height}", "监测")
for i in range(max_iterations):
# 发送控制命令维持心跳
ctrl.Send_cmd(msg)
# 每10次迭代打印一次当前信息
if i % 10 == 0:
# 获取当前Z轴位置和速度
current_vz = ctrl.odo_msg.vxyz[2] # z轴速度
info(f"当前Z轴速度={current_vz:.3f}", "监测")
# 获取z轴速度
vz = ctrl.odo_msg.vxyz[2]
# 检测是否开始爬坡阶段 - 使用z轴速度判断
if not climbing_detected and vz > climb_speed_threshold:
climbing_detected = True
info(f"检测到开始爬坡Z轴速度: {vz:.3f}, 当前高度: {ctrl.odo_msg.xyz[2]:.3f}", "监测")
# 只有在检测到爬坡后才开始监控Z轴是否停止增加
if i > min_iterations and climbing_detected:
# 如果Z轴速度接近于0或者为负表示已经停止爬升或开始下降
if abs(vz) < z_speed_threshold or vz < 0:
stable_count += 1
if stable_count >= stable_threshold:
current_height = ctrl.odo_msg.xyz[2]
info(f"Z轴速度趋近于0停止循环。当前速度: {vz:.3f}, 当前高度: {current_height:.3f}", "监测")
break
else:
# 如果Z轴仍有明显上升速度重置稳定计数
stable_count = 0
time.sleep(0.2)
except KeyboardInterrupt:
msg.mode = 7 #PureDamper before KeyboardInterrupt:
msg.gait_id = 0
msg.duration = 0
msg.life_count += 1
ctrl.Send_cmd(msg)
pass
section('任务3-2直线行走', "开始")
msg.mode = 11 # Locomotion模式
msg.gait_id = 26 # 自变频步态
msg.duration = 0 # wait next cmd
msg.step_height = [0.06, 0.06] # 抬腿高度
msg.vel_des = [0, 0.2, 0] # [前进速度, 侧向速度, 角速度]
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep(0.3)
return
section('任务3-3down', "完成")
try:
steps = toml.load("./task_3/Gait_Params_up.toml")
full_steps = {'step':[robot_cmd]}
k = 0
for i in steps['step']:
cmd = copy.deepcopy(robot_cmd)
cmd['duration'] = i['duration']
if i['type'] == 'usergait':
cmd['mode'] = 11 # LOCOMOTION
cmd['gait_id'] = 110 # USERGAIT
cmd['vel_des'] = i['body_vel_des']
cmd['rpy_des'] = i['body_pos_des'][0:3]
cmd['pos_des'] = i['body_pos_des'][3:6]
cmd['foot_pose'][0:2] = i['landing_pos_des'][0:2]
cmd['foot_pose'][2:4] = i['landing_pos_des'][3:5]
cmd['foot_pose'][4:6] = i['landing_pos_des'][6:8]
cmd['ctrl_point'][0:2] = i['landing_pos_des'][9:11]
cmd['step_height'][0] = math.ceil(i['step_height'][0] * 1e3) + math.ceil(i['step_height'][1] * 1e3) * 1e3
cmd['step_height'][1] = math.ceil(i['step_height'][2] * 1e3) + math.ceil(i['step_height'][3] * 1e3) * 1e3
cmd['acc_des'] = i['weight']
cmd['value'] = i['use_mpc_traj']
cmd['contact'] = math.floor(i['landing_gain'] * 1e1)
cmd['ctrl_point'][2] = i['mu']
if k == 0:
full_steps['step'] = [cmd]
else:
full_steps['step'].append(cmd)
k=k+1
f = open("./task_3/Gait_Params_up_full.toml", 'w')
f.write("# Gait Params\n")
f.writelines(toml.dumps(full_steps))
f.close()
# pre
file_obj_gait_def = open("./task_3/Gait_Def_up.toml",'r')
file_obj_gait_params = open("./task_3/Gait_Params_up_full.toml",'r')
usergait_msg.data = file_obj_gait_def.read()
lcm_usergait.publish("user_gait_file",usergait_msg.encode())
time.sleep(0.5)
usergait_msg.data = file_obj_gait_params.read()
lcm_usergait.publish("user_gait_file",usergait_msg.encode())
time.sleep(0.1)
file_obj_gait_def.close()
file_obj_gait_params.close()
file_obj_gait_params = open("./task_3/Gait_Params_up_full.toml",'r')
usergait_msg.data = file_obj_gait_params.read()
lcm_usergait.publish("user_gait_file", usergait_msg.encode())
time.sleep(0.5)
file_obj_gait_params.close()
msg.mode = 62
msg.value = 0
msg.contact = 15
msg.gait_id = 110
msg.duration = 1000
msg.life_count += 1
# 参数设置
stable_count = 0 # 用于计数z轴稳定的次数
stable_threshold = 8 # 连续10次检测z轴速度接近零则认为已经到达平地
z_speed_threshold = 0.01 # z轴速度阈值小于这个值认为已经停止下降
descent_speed_threshold = -0.05 # 检测到开始下坡的速度阈值(负值表示下降)
max_iterations = 600 # 最大循环次数,作为安全保障
min_iterations = 100 # 最小循环次数,确保有足够的时间开始动作
start_height = ctrl.odo_msg.xyz[2] # 记录起始高度
# 阶段控制
descending_detected = False # 是否检测到正在下坡
flat_ground_detected = False # 是否检测到已到达平地
info(f"开始监测下坡过程,初始高度: {start_height}", "监测")
for i in range(max_iterations):
# 发送控制命令维持心跳
ctrl.Send_cmd(msg)
# 获取z轴速度和当前高度
vz = ctrl.odo_msg.vxyz[2]
current_height = ctrl.odo_msg.xyz[2]
# 每10次迭代打印一次当前信息
if observe and i % 10 == 0:
info(f"当前Z轴速度={vz:.3f}, 当前高度={current_height:.3f}", "监测")
# 检测是否开始下坡阶段 - 使用z轴速度判断负值表示下降
if not descending_detected and vz < descent_speed_threshold:
descending_detected = True
info(f"检测到开始下坡Z轴速度: {vz:.3f}, 当前高度: {current_height:.3f}", "监测")
# 只有在检测到下坡后,才开始监控是否到达平地
if i > min_iterations and descending_detected:
# 如果Z轴速度接近于0表示已经停止下降到达平地
if abs(vz) < z_speed_threshold:
stable_count += 1
if stable_count >= stable_threshold:
info(f"检测到已到达平地Z轴速度趋近于0停止循环。当前速度: {vz:.3f}, 当前高度: {current_height:.3f}, 下降了: {start_height - current_height:.3f}", "监测")
flat_ground_detected = True
break
else:
# 如果Z轴仍有明显下降速度重置稳定计数
stable_count = 0
time.sleep(0.2)
if not flat_ground_detected:
info(f"达到最大循环次数,未能明确检测到到达平地。当前高度: {ctrl.odo_msg.xyz[2]:.3f}", "警告")
except KeyboardInterrupt:
msg.mode = 7 #PureDamper before KeyboardInterrupt:
msg.gait_id = 0
msg.duration = 0
msg.life_count += 1
ctrl.Send_cmd(msg)
pass