重构任务4和任务5,更新任务流程和参数
- 在main.py中注释掉run_task_2函数,确保任务执行流程的简化。 - 在task_4.py中添加go_straight_until_sky_ratio_below函数以实现基于灰色天空比例的直线移动。 - 更新README.md以反映任务4和任务5的最新描述,合并相关内容。 - 删除task_5中的多个不再使用的文件,优化代码结构。
This commit is contained in:
parent
7517ce61f0
commit
c2b29f18f3
@ -42,18 +42,16 @@ RGB 摄像机所在位置的确定:
|
||||
|
||||
以及反方向。
|
||||
|
||||
## Task-4:石板路
|
||||
## Task-4:石板路 & 过栅栏
|
||||
|
||||

|
||||

|
||||

|
||||
|
||||
## Task-5:过栅栏
|
||||

|
||||
|
||||
## Task-5.5:走向卸货
|
||||
## Task-5:走向卸货 & 卸货
|
||||
从上一个赛道结束到 B 二维码。
|
||||
|
||||
## Task-6:卸货
|
||||
也可能在另一边。
|
||||
这里是感觉在走过去的过程中就能判断二维码。
|
||||
|
||||
|
4
main.py
4
main.py
@ -43,11 +43,11 @@ def main():
|
||||
# time.sleep(100) # TEST
|
||||
# run_task_1(Ctrl, msg)
|
||||
|
||||
run_task_2(Ctrl, msg)
|
||||
# run_task_2(Ctrl, msg)
|
||||
|
||||
# run_task_2_5(Ctrl, msg)
|
||||
|
||||
# run_task_4(Ctrl, msg)
|
||||
run_task_4(Ctrl, msg)
|
||||
|
||||
# run_task_test(Ctrl, msg)
|
||||
|
||||
|
File diff suppressed because it is too large
Load Diff
98
task_5/main.py → task_4/pass_bar.py
Executable file → Normal file
98
task_5/main.py → task_4/pass_bar.py
Executable file → Normal file
@ -1,20 +1,26 @@
|
||||
'''
|
||||
This demo show the communication interface of MR813 motion control board based on Lcm
|
||||
- robot_control_cmd_lcmt.py
|
||||
- file_send_lcmt.py
|
||||
- Gait_Def_moonwalk.toml
|
||||
- Gait_Params_moonwalk.toml
|
||||
- Usergait_List.toml
|
||||
'''
|
||||
import lcm
|
||||
import sys
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
import toml
|
||||
import copy
|
||||
import math
|
||||
from robot_control_cmd_lcmt import robot_control_cmd_lcmt
|
||||
import lcm
|
||||
|
||||
# 添加父目录到路径,以便能够导入utils
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
# 添加当前目录到路径,确保可以找到local文件
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing
|
||||
from base_move.turn_degree import turn_degree
|
||||
from base_move.go_straight import go_straight
|
||||
from file_send_lcmt import file_send_lcmt
|
||||
|
||||
# 创建本模块特定的日志记录器
|
||||
logger = get_logger("任务3")
|
||||
|
||||
observe = True
|
||||
|
||||
robot_cmd = {
|
||||
'mode':0, 'gait_id':0, 'contact':0, 'life_count':0,
|
||||
'vel_des':[0.0, 0.0, 0.0],
|
||||
@ -25,15 +31,24 @@ robot_cmd = {
|
||||
'foot_pose':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
|
||||
'step_height':[0.0, 0.0],
|
||||
'value':0, 'duration':0
|
||||
}
|
||||
}
|
||||
|
||||
def main():
|
||||
lcm_cmd = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
|
||||
lcm_usergait = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
|
||||
def run_task_5(ctrl, msg):
|
||||
"""
|
||||
俯身通过一个栅栏
|
||||
"""
|
||||
section('任务5:步态切换', "启动")
|
||||
info('开始执行任务5...', "启动")
|
||||
|
||||
turn_degree(ctrl, msg, 90, absolute=True)
|
||||
|
||||
section('任务5-1:切换步态', "启动")
|
||||
# 切换步态
|
||||
usergait_msg = file_send_lcmt()
|
||||
cmd_msg = robot_control_cmd_lcmt()
|
||||
lcm_usergait = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
|
||||
|
||||
try:
|
||||
steps = toml.load("Gait_Params_up.toml")
|
||||
steps = toml.load("./task_5/Gait_Params_up.toml")
|
||||
full_steps = {'step':[robot_cmd]}
|
||||
k =0
|
||||
for i in steps['step']:
|
||||
@ -60,13 +75,14 @@ def main():
|
||||
else:
|
||||
full_steps['step'].append(cmd)
|
||||
k=k+1
|
||||
f = open("Gait_Params_up_full.toml", 'w')
|
||||
f = open("./task_5/Gait_Params_up_full.toml", 'w')
|
||||
f.write("# Gait Params\n")
|
||||
f.writelines(toml.dumps(full_steps))
|
||||
f.close()
|
||||
|
||||
file_obj_gait_def = open("Gait_Def_up.toml",'r')
|
||||
file_obj_gait_params = open("Gait_Params_up_full.toml",'r')
|
||||
# pre
|
||||
file_obj_gait_def = open("./task_5/Gait_Def_up.toml",'r')
|
||||
file_obj_gait_params = open("./task_5/Gait_Params_up_full.toml",'r')
|
||||
usergait_msg.data = file_obj_gait_def.read()
|
||||
lcm_usergait.publish("user_gait_file",usergait_msg.encode())
|
||||
time.sleep(0.5)
|
||||
@ -76,23 +92,27 @@ def main():
|
||||
file_obj_gait_def.close()
|
||||
file_obj_gait_params.close()
|
||||
|
||||
cmd_msg.mode = 62
|
||||
cmd_msg.value = 0
|
||||
cmd_msg.contact = 15
|
||||
cmd_msg.gait_id = 110
|
||||
cmd_msg.duration = 1000
|
||||
cmd_msg.life_count += 1
|
||||
for i in range(325): #15s Heat beat It is used to maintain the heartbeat when life count is not updated
|
||||
lcm_cmd.publish("robot_control_cmd",cmd_msg.encode())
|
||||
time.sleep( 0.2 )
|
||||
except KeyboardInterrupt:
|
||||
cmd_msg.mode = 7 #PureDamper before KeyboardInterrupt:
|
||||
cmd_msg.gait_id = 0
|
||||
cmd_msg.duration = 0
|
||||
cmd_msg.life_count += 1
|
||||
lcm_cmd.publish("robot_control_cmd",cmd_msg.encode())
|
||||
pass
|
||||
sys.exit()
|
||||
file_obj_gait_params = open("./task_5/Gait_Params_up_full.toml",'r')
|
||||
usergait_msg.data = file_obj_gait_params.read()
|
||||
lcm_usergait.publish("user_gait_file", usergait_msg.encode())
|
||||
time.sleep(0.5)
|
||||
file_obj_gait_params.close()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
msg.mode = 62
|
||||
msg.value = 0
|
||||
msg.contact = 15
|
||||
msg.gait_id = 110
|
||||
msg.duration = 1000
|
||||
msg.life_count += 1
|
||||
|
||||
for i in range(50):
|
||||
ctrl.Send_cmd(msg)
|
||||
time.sleep(0.2)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
msg.mode = 7 #PureDamper before KeyboardInterrupt:
|
||||
msg.gait_id = 0
|
||||
msg.duration = 0
|
||||
msg.life_count += 1
|
||||
ctrl.Send_cmd(msg)
|
||||
pass
|
@ -1,6 +1,8 @@
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
# 添加父目录到路径,以便能够导入utils
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
@ -8,9 +10,10 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
from base_move.turn_degree import turn_degree
|
||||
from base_move.go_straight import go_straight
|
||||
from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing
|
||||
from utils.gray_sky_analyzer import analyze_gray_sky_ratio
|
||||
|
||||
# 创建本模块特定的日志记录器
|
||||
logger = get_logger("任务5")
|
||||
logger = get_logger("任务4")
|
||||
|
||||
def run_task_4(ctrl, msg):
|
||||
"""
|
||||
@ -20,9 +23,91 @@ def run_task_4(ctrl, msg):
|
||||
image_processor: 可选的图像处理器实例
|
||||
"""
|
||||
|
||||
turn_degree(ctrl, msg, 90, absolute=90)
|
||||
section('任务4-1:直线移动', "移动")
|
||||
go_straight(ctrl, msg, distance=6)
|
||||
|
||||
section('任务4-2:移动直到灰色天空比例小于阈值', "天空检测")
|
||||
go_straight_until_sky_ratio_below(ctrl, msg, sky_ratio_threshold=0.2)
|
||||
|
||||
time.sleep(100)
|
||||
|
||||
# go_straight(ctrl, msg, distance=6)
|
||||
def go_straight_until_sky_ratio_below(ctrl, msg, sky_ratio_threshold=0.2, step_distance=0.5, max_distance=10, speed=0.3):
|
||||
"""
|
||||
控制机器人沿直线行走,直到灰色天空比例小于指定阈值
|
||||
|
||||
参数:
|
||||
ctrl: Robot_Ctrl对象
|
||||
msg: 控制命令消息对象
|
||||
sky_ratio_threshold: 灰色天空比例阈值,当检测到的比例小于此值时停止
|
||||
step_distance: 每次移动的步长(米)
|
||||
max_distance: 最大移动距离(米),防止无限前进
|
||||
speed: 移动速度(米/秒)
|
||||
|
||||
返回:
|
||||
bool: 是否成功找到天空比例小于阈值的位置
|
||||
"""
|
||||
total_distance = 0
|
||||
success_flag = False
|
||||
|
||||
# 设置移动命令
|
||||
msg.mode = 11 # Locomotion模式
|
||||
msg.gait_id = 26 # 自变频步态
|
||||
msg.step_height = [0.06, 0.06] # 抬腿高度
|
||||
|
||||
while total_distance < max_distance:
|
||||
# 获取当前图像
|
||||
current_image = ctrl.image_processor.get_current_image()
|
||||
if current_image is None:
|
||||
warning("无法获取图像,等待...", "图像")
|
||||
time.sleep(0.5)
|
||||
continue
|
||||
|
||||
# 保存当前图像用于分析
|
||||
temp_image_path = "/tmp/current_sky_image.jpg"
|
||||
cv2.imwrite(temp_image_path, current_image)
|
||||
|
||||
# 分析灰色天空比例
|
||||
try:
|
||||
sky_ratio = analyze_gray_sky_ratio(temp_image_path)
|
||||
info(f"当前灰色天空比例: {sky_ratio:.2%}", "分析")
|
||||
|
||||
# 如果天空比例小于阈值,停止移动
|
||||
if sky_ratio < sky_ratio_threshold:
|
||||
success(f"检测到灰色天空比例({sky_ratio:.2%})小于阈值({sky_ratio_threshold:.2%}),停止移动", "完成")
|
||||
success_flag = True
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
error(f"分析图像时出错: {e}", "错误")
|
||||
|
||||
# 继续前进一段距离
|
||||
info(f"继续前进 {step_distance} 米...", "移动")
|
||||
|
||||
# 设置移动速度和方向
|
||||
msg.vel_des = [speed, 0, 0] # [前进速度, 侧向速度, 角速度]
|
||||
msg.duration = 0 # wait next cmd
|
||||
msg.life_count += 1
|
||||
|
||||
# 发送命令
|
||||
ctrl.Send_cmd(msg)
|
||||
|
||||
# 估算前进时间
|
||||
move_time = step_distance / speed
|
||||
time.sleep(move_time)
|
||||
|
||||
# 累计移动距离
|
||||
total_distance += step_distance
|
||||
info(f"已移动总距离: {total_distance:.2f} 米", "距离")
|
||||
|
||||
# 平滑停止
|
||||
if hasattr(ctrl.base_msg, 'stop_smooth'):
|
||||
ctrl.base_msg.stop_smooth()
|
||||
else:
|
||||
ctrl.base_msg.stop()
|
||||
|
||||
if not success_flag and total_distance >= max_distance:
|
||||
warning(f"已达到最大移动距离 {max_distance} 米,但未找到天空比例小于 {sky_ratio_threshold:.2%} 的位置", "超时")
|
||||
|
||||
return success_flag
|
||||
|
||||
|
||||
|
||||
|
@ -1,44 +0,0 @@
|
||||
[[step]]
|
||||
acc_des = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||||
contact = 0
|
||||
ctrl_point = [0.0, 0.0, 0.0]
|
||||
duration = 0 # Expected execution time of Position interpolation control, For recovery stand need > 5.0S
|
||||
foot_pose = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||||
gait_id = 0
|
||||
life_count = 0 #Fake value
|
||||
mode = 12
|
||||
pos_des = [0.0, 0.0, 0.0]
|
||||
rpy_des = [0.0, 0.0, 0.0]
|
||||
step_height = [0.0, 0.0]
|
||||
value = 0
|
||||
vel_des = [0.0, 0.0, 0.0]
|
||||
|
||||
[[step]]
|
||||
acc_des = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||||
contact = 15
|
||||
ctrl_point = [0.0, 0.0, 0.0]
|
||||
duration = 0
|
||||
foot_pose = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||||
gait_id = 110
|
||||
life_count = 0
|
||||
mode = 62 # User define gait
|
||||
pos_des = [0.0, 0.0, 0.0]
|
||||
rpy_des = [0.0, 0.0, 0.0]
|
||||
step_height = [0.0, 0.0]
|
||||
value = 0
|
||||
vel_des = [0.0, 0.0, 0.0] # velocity of x y yaw
|
||||
|
||||
# [[step]]
|
||||
# acc_des = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||||
# contact = 0
|
||||
# ctrl_point = [0.0, 0.0, 0.0]
|
||||
# duration = 1000
|
||||
# foot_pose = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||||
# gait_id = 1 #采用受控趴下
|
||||
# life_count = 0
|
||||
# mode = 7 #Puredamper
|
||||
# pos_des = [0.0, 0.0, 0.0]
|
||||
# rpy_des = [0.0, 0.0, 0.0]
|
||||
# step_height = [0.0, 0.0]
|
||||
# value = 0
|
||||
# vel_des = [0.0, 0.0, 0.0]
|
283
task_5/task_3.py
283
task_5/task_3.py
@ -1,283 +0,0 @@
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
import toml
|
||||
import copy
|
||||
import math
|
||||
import lcm
|
||||
|
||||
# 添加父目录到路径,以便能够导入utils
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
# 添加当前目录到路径,确保可以找到local文件
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing
|
||||
from base_move.turn_degree import turn_degree
|
||||
from base_move.go_straight import go_straight
|
||||
from file_send_lcmt import file_send_lcmt
|
||||
|
||||
# 创建本模块特定的日志记录器
|
||||
logger = get_logger("任务3")
|
||||
|
||||
observe = True
|
||||
|
||||
robot_cmd = {
|
||||
'mode':0, 'gait_id':0, 'contact':0, 'life_count':0,
|
||||
'vel_des':[0.0, 0.0, 0.0],
|
||||
'rpy_des':[0.0, 0.0, 0.0],
|
||||
'pos_des':[0.0, 0.0, 0.0],
|
||||
'acc_des':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
|
||||
'ctrl_point':[0.0, 0.0, 0.0],
|
||||
'foot_pose':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
|
||||
'step_height':[0.0, 0.0],
|
||||
'value':0, 'duration':0
|
||||
}
|
||||
|
||||
def run_task_3(ctrl, msg):
|
||||
section('任务3:步态切换', "启动")
|
||||
info('开始执行任务3...', "启动")
|
||||
|
||||
turn_degree(ctrl, msg, 90, absolute=True)
|
||||
turn_degree(ctrl, msg, 90, absolute=True)
|
||||
|
||||
usergait_msg = file_send_lcmt()
|
||||
lcm_usergait = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
|
||||
|
||||
try:
|
||||
steps = toml.load("./task_3/Gait_Params_up.toml")
|
||||
full_steps = {'step':[robot_cmd]}
|
||||
k =0
|
||||
for i in steps['step']:
|
||||
cmd = copy.deepcopy(robot_cmd)
|
||||
cmd['duration'] = i['duration']
|
||||
if i['type'] == 'usergait':
|
||||
cmd['mode'] = 11 # LOCOMOTION
|
||||
cmd['gait_id'] = 110 # USERGAIT
|
||||
cmd['vel_des'] = i['body_vel_des']
|
||||
cmd['rpy_des'] = i['body_pos_des'][0:3]
|
||||
cmd['pos_des'] = i['body_pos_des'][3:6]
|
||||
cmd['foot_pose'][0:2] = i['landing_pos_des'][0:2]
|
||||
cmd['foot_pose'][2:4] = i['landing_pos_des'][3:5]
|
||||
cmd['foot_pose'][4:6] = i['landing_pos_des'][6:8]
|
||||
cmd['ctrl_point'][0:2] = i['landing_pos_des'][9:11]
|
||||
cmd['step_height'][0] = math.ceil(i['step_height'][0] * 1e3) + math.ceil(i['step_height'][1] * 1e3) * 1e3
|
||||
cmd['step_height'][1] = math.ceil(i['step_height'][2] * 1e3) + math.ceil(i['step_height'][3] * 1e3) * 1e3
|
||||
cmd['acc_des'] = i['weight']
|
||||
cmd['value'] = i['use_mpc_traj']
|
||||
cmd['contact'] = math.floor(i['landing_gain'] * 1e1)
|
||||
cmd['ctrl_point'][2] = i['mu']
|
||||
if k == 0:
|
||||
full_steps['step'] = [cmd]
|
||||
else:
|
||||
full_steps['step'].append(cmd)
|
||||
k=k+1
|
||||
f = open("./task_3/Gait_Params_up_full.toml", 'w')
|
||||
f.write("# Gait Params\n")
|
||||
f.writelines(toml.dumps(full_steps))
|
||||
f.close()
|
||||
|
||||
# pre
|
||||
file_obj_gait_def = open("./task_3/Gait_Def_up.toml",'r')
|
||||
file_obj_gait_params = open("./task_3/Gait_Params_up_full.toml",'r')
|
||||
usergait_msg.data = file_obj_gait_def.read()
|
||||
lcm_usergait.publish("user_gait_file",usergait_msg.encode())
|
||||
time.sleep(0.5)
|
||||
usergait_msg.data = file_obj_gait_params.read()
|
||||
lcm_usergait.publish("user_gait_file",usergait_msg.encode())
|
||||
time.sleep(0.1)
|
||||
file_obj_gait_def.close()
|
||||
file_obj_gait_params.close()
|
||||
|
||||
file_obj_gait_params = open("./task_3/Gait_Params_up_full.toml",'r')
|
||||
usergait_msg.data = file_obj_gait_params.read()
|
||||
lcm_usergait.publish("user_gait_file", usergait_msg.encode())
|
||||
time.sleep(0.5)
|
||||
file_obj_gait_params.close()
|
||||
|
||||
msg.mode = 62
|
||||
msg.value = 0
|
||||
msg.contact = 15
|
||||
msg.gait_id = 110
|
||||
msg.duration = 1000
|
||||
msg.life_count += 1
|
||||
|
||||
# 参数设置
|
||||
stable_count = 0 # 用于计数z轴稳定的次数
|
||||
stable_threshold = 10 # 连续15次检测z轴不再增加则认为已经停止
|
||||
z_speed_threshold = 0.01 # z轴速度阈值,小于这个值认为已经停止爬升
|
||||
climb_speed_threshold = 0.05 # 检测到开始爬坡的速度阈值
|
||||
max_iterations = 600 # 最大循环次数,作为安全保障
|
||||
min_iterations = 200 # 最小循环次数,作为安全保障
|
||||
start_height = ctrl.odo_msg.xyz[2] # 记录起始高度
|
||||
|
||||
# 阶段控制
|
||||
climbing_detected = False # 是否检测到正在爬坡
|
||||
|
||||
info(f"开始监测里程计Z轴速度,初始高度: {start_height}", "监测")
|
||||
|
||||
for i in range(max_iterations):
|
||||
# 发送控制命令维持心跳
|
||||
ctrl.Send_cmd(msg)
|
||||
|
||||
# 每10次迭代打印一次当前信息
|
||||
if i % 10 == 0:
|
||||
# 获取当前Z轴位置和速度
|
||||
current_vz = ctrl.odo_msg.vxyz[2] # z轴速度
|
||||
info(f"当前Z轴速度={current_vz:.3f}", "监测")
|
||||
|
||||
# 获取z轴速度
|
||||
vz = ctrl.odo_msg.vxyz[2]
|
||||
|
||||
# 检测是否开始爬坡阶段 - 使用z轴速度判断
|
||||
if not climbing_detected and vz > climb_speed_threshold:
|
||||
climbing_detected = True
|
||||
info(f"检测到开始爬坡,Z轴速度: {vz:.3f}, 当前高度: {ctrl.odo_msg.xyz[2]:.3f}", "监测")
|
||||
|
||||
# 只有在检测到爬坡后,才开始监控Z轴是否停止增加
|
||||
if i > min_iterations and climbing_detected:
|
||||
# 如果Z轴速度接近于0或者为负,表示已经停止爬升或开始下降
|
||||
if abs(vz) < z_speed_threshold or vz < 0:
|
||||
stable_count += 1
|
||||
if stable_count >= stable_threshold:
|
||||
current_height = ctrl.odo_msg.xyz[2]
|
||||
info(f"Z轴速度趋近于0,停止循环。当前速度: {vz:.3f}, 当前高度: {current_height:.3f}", "监测")
|
||||
break
|
||||
else:
|
||||
# 如果Z轴仍有明显上升速度,重置稳定计数
|
||||
stable_count = 0
|
||||
|
||||
time.sleep(0.2)
|
||||
except KeyboardInterrupt:
|
||||
msg.mode = 7 #PureDamper before KeyboardInterrupt:
|
||||
msg.gait_id = 0
|
||||
msg.duration = 0
|
||||
msg.life_count += 1
|
||||
ctrl.Send_cmd(msg)
|
||||
pass
|
||||
|
||||
section('任务3-2:直线行走', "开始")
|
||||
# go_straight(ctrl, msg, distance=1)
|
||||
msg.mode = 11 # Locomotion模式
|
||||
msg.gait_id = 26 # 自变频步态
|
||||
msg.duration = 0 # wait next cmd
|
||||
msg.step_height = [0.06, 0.06] # 抬腿高度
|
||||
msg.vel_des = [0, 0.2, 0] # [前进速度, 侧向速度, 角速度]
|
||||
msg.life_count += 1
|
||||
ctrl.Send_cmd(msg)
|
||||
time.sleep(0.3)
|
||||
|
||||
section('任务3-3:down', "完成")
|
||||
try:
|
||||
steps = toml.load("./task_3/Gait_Params_up.toml")
|
||||
full_steps = {'step':[robot_cmd]}
|
||||
k = 0
|
||||
for i in steps['step']:
|
||||
cmd = copy.deepcopy(robot_cmd)
|
||||
cmd['duration'] = i['duration']
|
||||
if i['type'] == 'usergait':
|
||||
cmd['mode'] = 11 # LOCOMOTION
|
||||
cmd['gait_id'] = 110 # USERGAIT
|
||||
cmd['vel_des'] = i['body_vel_des']
|
||||
cmd['rpy_des'] = i['body_pos_des'][0:3]
|
||||
cmd['pos_des'] = i['body_pos_des'][3:6]
|
||||
cmd['foot_pose'][0:2] = i['landing_pos_des'][0:2]
|
||||
cmd['foot_pose'][2:4] = i['landing_pos_des'][3:5]
|
||||
cmd['foot_pose'][4:6] = i['landing_pos_des'][6:8]
|
||||
cmd['ctrl_point'][0:2] = i['landing_pos_des'][9:11]
|
||||
cmd['step_height'][0] = math.ceil(i['step_height'][0] * 1e3) + math.ceil(i['step_height'][1] * 1e3) * 1e3
|
||||
cmd['step_height'][1] = math.ceil(i['step_height'][2] * 1e3) + math.ceil(i['step_height'][3] * 1e3) * 1e3
|
||||
cmd['acc_des'] = i['weight']
|
||||
cmd['value'] = i['use_mpc_traj']
|
||||
cmd['contact'] = math.floor(i['landing_gain'] * 1e1)
|
||||
cmd['ctrl_point'][2] = i['mu']
|
||||
if k == 0:
|
||||
full_steps['step'] = [cmd]
|
||||
else:
|
||||
full_steps['step'].append(cmd)
|
||||
k=k+1
|
||||
f = open("./task_3/Gait_Params_up_full.toml", 'w')
|
||||
f.write("# Gait Params\n")
|
||||
f.writelines(toml.dumps(full_steps))
|
||||
f.close()
|
||||
|
||||
# pre
|
||||
file_obj_gait_def = open("./task_3/Gait_Def_up.toml",'r')
|
||||
file_obj_gait_params = open("./task_3/Gait_Params_up_full.toml",'r')
|
||||
usergait_msg.data = file_obj_gait_def.read()
|
||||
lcm_usergait.publish("user_gait_file",usergait_msg.encode())
|
||||
time.sleep(0.5)
|
||||
usergait_msg.data = file_obj_gait_params.read()
|
||||
lcm_usergait.publish("user_gait_file",usergait_msg.encode())
|
||||
time.sleep(0.1)
|
||||
file_obj_gait_def.close()
|
||||
file_obj_gait_params.close()
|
||||
|
||||
file_obj_gait_params = open("./task_3/Gait_Params_up_full.toml",'r')
|
||||
usergait_msg.data = file_obj_gait_params.read()
|
||||
lcm_usergait.publish("user_gait_file", usergait_msg.encode())
|
||||
time.sleep(0.5)
|
||||
file_obj_gait_params.close()
|
||||
|
||||
msg.mode = 62
|
||||
msg.value = 0
|
||||
msg.contact = 15
|
||||
msg.gait_id = 110
|
||||
msg.duration = 1000
|
||||
msg.life_count += 1
|
||||
|
||||
# 参数设置
|
||||
stable_count = 0 # 用于计数z轴稳定的次数
|
||||
stable_threshold = 8 # 连续10次检测z轴速度接近零则认为已经到达平地
|
||||
z_speed_threshold = 0.01 # z轴速度阈值,小于这个值认为已经停止下降
|
||||
descent_speed_threshold = -0.05 # 检测到开始下坡的速度阈值(负值表示下降)
|
||||
max_iterations = 600 # 最大循环次数,作为安全保障
|
||||
min_iterations = 100 # 最小循环次数,确保有足够的时间开始动作
|
||||
start_height = ctrl.odo_msg.xyz[2] # 记录起始高度
|
||||
|
||||
# 阶段控制
|
||||
descending_detected = False # 是否检测到正在下坡
|
||||
flat_ground_detected = False # 是否检测到已到达平地
|
||||
|
||||
info(f"开始监测下坡过程,初始高度: {start_height}", "监测")
|
||||
|
||||
for i in range(max_iterations):
|
||||
# 发送控制命令维持心跳
|
||||
ctrl.Send_cmd(msg)
|
||||
|
||||
# 获取z轴速度和当前高度
|
||||
vz = ctrl.odo_msg.vxyz[2]
|
||||
current_height = ctrl.odo_msg.xyz[2]
|
||||
|
||||
# 每10次迭代打印一次当前信息
|
||||
if observe and i % 10 == 0:
|
||||
info(f"当前Z轴速度={vz:.3f}, 当前高度={current_height:.3f}", "监测")
|
||||
|
||||
# 检测是否开始下坡阶段 - 使用z轴速度判断(负值表示下降)
|
||||
if not descending_detected and vz < descent_speed_threshold:
|
||||
descending_detected = True
|
||||
info(f"检测到开始下坡,Z轴速度: {vz:.3f}, 当前高度: {current_height:.3f}", "监测")
|
||||
|
||||
# 只有在检测到下坡后,才开始监控是否到达平地
|
||||
if i > min_iterations and descending_detected:
|
||||
# 如果Z轴速度接近于0,表示已经停止下降(到达平地)
|
||||
if abs(vz) < z_speed_threshold:
|
||||
stable_count += 1
|
||||
if stable_count >= stable_threshold:
|
||||
info(f"检测到已到达平地,Z轴速度趋近于0,停止循环。当前速度: {vz:.3f}, 当前高度: {current_height:.3f}, 下降了: {start_height - current_height:.3f}米", "监测")
|
||||
flat_ground_detected = True
|
||||
break
|
||||
else:
|
||||
# 如果Z轴仍有明显下降速度,重置稳定计数
|
||||
stable_count = 0
|
||||
|
||||
time.sleep(0.2)
|
||||
|
||||
if not flat_ground_detected:
|
||||
info(f"达到最大循环次数,未能明确检测到到达平地。当前高度: {ctrl.odo_msg.xyz[2]:.3f}", "警告")
|
||||
except KeyboardInterrupt:
|
||||
msg.mode = 7 #PureDamper before KeyboardInterrupt:
|
||||
msg.gait_id = 0
|
||||
msg.duration = 0
|
||||
msg.life_count += 1
|
||||
ctrl.Send_cmd(msg)
|
||||
pass
|
239
task_5/task_5.py
Normal file
239
task_5/task_5.py
Normal file
@ -0,0 +1,239 @@
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
import cv2
|
||||
import numpy as np
|
||||
import math
|
||||
|
||||
# 添加父目录到路径,以便能够导入utils
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from base_move.turn_degree import turn_degree
|
||||
from base_move.go_straight import go_straight
|
||||
from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing
|
||||
from utils.gray_sky_analyzer import analyze_gray_sky_ratio
|
||||
from base_move.move_base_hori_line import detect_horizontal_track_edge, detect_horizontal_track_edge_v2, detect_horizontal_track_edge_v3, calculate_distance_to_line
|
||||
|
||||
|
||||
def go_straight_to_horizontal_line_with_qr(ctrl, msg, target_distance=0.5, speed=0.5,
|
||||
max_distance=10, detect_func_version=2,
|
||||
qr_check_interval=0.3, observe=False):
|
||||
"""
|
||||
控制机器人直线行走,直到与横向线距离为指定值,同时识别路径上的二维码
|
||||
|
||||
参数:
|
||||
ctrl: Robot_Ctrl 对象,包含里程计信息
|
||||
msg: robot_control_cmd_lcmt 对象,用于发送命令
|
||||
target_distance: 与横向线的目标距离(米),默认为0.5米
|
||||
speed: 行走速度(米/秒),默认为0.5米/秒
|
||||
max_distance: 最大行走距离(米),超过此距离将停止,默认为10米
|
||||
detect_func_version: 检测横线的函数版本,默认为2
|
||||
qr_check_interval: 检查二维码的时间间隔(秒),默认为0.3秒
|
||||
observe: 是否输出中间状态信息,默认为False
|
||||
|
||||
返回:
|
||||
tuple: (是否成功, 二维码结果, 额外信息字典)
|
||||
"""
|
||||
# 返回结果字典,包含过程中的状态信息
|
||||
res = {
|
||||
'qr_result': None,
|
||||
'success': False,
|
||||
'distance_moved': 0,
|
||||
'target_reached': False
|
||||
}
|
||||
|
||||
# 启动异步QR码扫描
|
||||
qr_result = None
|
||||
try:
|
||||
ctrl.image_processor.start_async_scan(interval=qr_check_interval)
|
||||
if observe:
|
||||
info("已启动异步QR码扫描", "扫描")
|
||||
except Exception as e:
|
||||
if observe:
|
||||
error(f"启动QR码扫描失败: {e}", "失败")
|
||||
|
||||
# 获取起始位置
|
||||
start_position = list(ctrl.odo_msg.xyz)
|
||||
if observe:
|
||||
debug(f"起始位置: {start_position}", "位置")
|
||||
# 在起点放置绿色标记
|
||||
if hasattr(ctrl, 'place_marker'):
|
||||
ctrl.place_marker(start_position[0], start_position[1], start_position[2] if len(start_position) > 2 else 0.0, 'green', observe=True)
|
||||
|
||||
# 设置移动命令
|
||||
msg.mode = 11 # Locomotion模式
|
||||
msg.gait_id = 26 # 自变频步态
|
||||
msg.vel_des = [speed, 0, 0] # [前进速度, 侧向速度, 角速度]
|
||||
msg.duration = 0 # wait next cmd
|
||||
msg.step_height = [0.06, 0.06] # 抬腿高度
|
||||
|
||||
# 开始移动
|
||||
msg.life_count += 1
|
||||
ctrl.Send_cmd(msg)
|
||||
|
||||
# 记录起始时间和上次QR码检查时间
|
||||
start_time = time.time()
|
||||
last_qr_check_time = start_time
|
||||
|
||||
# 相机高度 (单位: 米)
|
||||
camera_height = 0.355
|
||||
|
||||
# 存储上一次检测到的横线距离
|
||||
last_hori_line_distance = None
|
||||
|
||||
# 主循环:持续移动直到达到目标或最大距离
|
||||
distance_moved = 0
|
||||
timeout = max_distance / speed + 5 # 超时时间
|
||||
|
||||
while distance_moved < max_distance and time.time() - start_time < timeout:
|
||||
# 计算已移动距离
|
||||
current_position = ctrl.odo_msg.xyz
|
||||
dx = current_position[0] - start_position[0]
|
||||
dy = current_position[1] - start_position[1]
|
||||
distance_moved = math.sqrt(dx*dx + dy*dy)
|
||||
|
||||
# 检测横向线
|
||||
image = ctrl.image_processor.get_current_image()
|
||||
|
||||
if detect_func_version == 1:
|
||||
edge_point, edge_info = detect_horizontal_track_edge(image, observe=False, save_log=True)
|
||||
elif detect_func_version == 2:
|
||||
edge_point, edge_info = detect_horizontal_track_edge_v2(image, observe=False, save_log=True)
|
||||
elif detect_func_version == 3:
|
||||
edge_point, edge_info = detect_horizontal_track_edge_v3(image, observe=False, save_log=True)
|
||||
else:
|
||||
edge_point, edge_info = detect_horizontal_track_edge_v2(image, observe=False, save_log=True)
|
||||
|
||||
# 如果检测到横向线
|
||||
if edge_point is not None and edge_info is not None:
|
||||
# 计算到横向线的距离
|
||||
current_distance = calculate_distance_to_line(edge_info, camera_height, observe=False)
|
||||
last_hori_line_distance = current_distance
|
||||
|
||||
if observe and time.time() % 0.5 < 0.02: # 每0.5秒左右打印一次
|
||||
info(f"检测到横向线,距离: {current_distance:.3f}米,目标距离: {target_distance:.3f}米", "检测")
|
||||
|
||||
# 判断是否达到目标距离
|
||||
if current_distance <= target_distance + 0.1:
|
||||
if observe:
|
||||
success(f"已达到目标距离,当前距离: {current_distance:.3f}米", "完成")
|
||||
|
||||
# 平滑停止
|
||||
if hasattr(ctrl.base_msg, 'stop_smooth'):
|
||||
ctrl.base_msg.stop_smooth()
|
||||
else:
|
||||
ctrl.base_msg.stop()
|
||||
|
||||
res['success'] = True
|
||||
res['target_reached'] = True
|
||||
res['final_distance'] = current_distance
|
||||
break
|
||||
|
||||
# 根据距离动态调整速度
|
||||
# 离目标越近,速度越慢
|
||||
if current_distance - target_distance < 2.0:
|
||||
# 线性降低速度
|
||||
speed_factor = min(1.0, max(0.3, (current_distance - target_distance) / 2.0))
|
||||
new_speed = speed * speed_factor
|
||||
|
||||
if observe and abs(new_speed - msg.vel_des[0]) > 0.05:
|
||||
info(f"调整速度: {msg.vel_des[0]:.2f} -> {new_speed:.2f} 米/秒", "速度")
|
||||
|
||||
msg.vel_des[0] = new_speed
|
||||
msg.life_count += 1
|
||||
ctrl.Send_cmd(msg)
|
||||
|
||||
# 检查QR码扫描结果
|
||||
current_time = time.time()
|
||||
if current_time - last_qr_check_time >= qr_check_interval:
|
||||
qr_data, scan_time = ctrl.image_processor.get_last_qr_result()
|
||||
if qr_data and scan_time > start_time:
|
||||
qr_result = qr_data
|
||||
res['qr_result'] = qr_result
|
||||
if observe:
|
||||
success(f"扫描到QR码: {qr_data}", "扫描")
|
||||
last_qr_check_time = current_time
|
||||
|
||||
# 输出调试信息
|
||||
if observe and time.time() % 0.5 < 0.02: # 每0.5秒左右打印一次
|
||||
debug(f"已移动: {distance_moved:.3f}米, 最大距离: {max_distance:.3f}米", "移动")
|
||||
|
||||
time.sleep(0.05) # 控制循环频率
|
||||
|
||||
# 停止移动
|
||||
if not res['target_reached']:
|
||||
if observe:
|
||||
if distance_moved >= max_distance:
|
||||
warning(f"已达到最大移动距离 {max_distance:.3f}米,但未检测到横向线", "停止")
|
||||
else:
|
||||
warning("超时停止", "停止")
|
||||
ctrl.base_msg.stop()
|
||||
res['success'] = False
|
||||
|
||||
# 停止异步QR码扫描
|
||||
ctrl.image_processor.stop_async_scan()
|
||||
|
||||
# 最后一次检查QR码结果
|
||||
qr_data, scan_time = ctrl.image_processor.get_last_qr_result()
|
||||
if qr_data and (qr_result is None or scan_time > last_qr_check_time):
|
||||
qr_result = qr_data
|
||||
res['qr_result'] = qr_result
|
||||
if observe:
|
||||
success(f"最终扫描到QR码: {qr_data}", "扫描")
|
||||
|
||||
# 记录最终位置和移动距离
|
||||
final_position = ctrl.odo_msg.xyz
|
||||
dx = final_position[0] - start_position[0]
|
||||
dy = final_position[1] - start_position[1]
|
||||
final_distance_moved = math.sqrt(dx*dx + dy*dy)
|
||||
res['distance_moved'] = final_distance_moved
|
||||
|
||||
if observe:
|
||||
# 在终点放置红色标记
|
||||
if hasattr(ctrl, 'place_marker'):
|
||||
ctrl.place_marker(final_position[0], final_position[1],
|
||||
final_position[2] if len(final_position) > 2 else 0.0,
|
||||
'red', observe=True)
|
||||
info(f"移动完成,总距离: {final_distance_moved:.3f}米", "完成")
|
||||
if qr_result:
|
||||
info(f"扫描到的QR码: {qr_result}", "结果")
|
||||
|
||||
return res['success'], qr_result, res
|
||||
|
||||
|
||||
def run_task_5(ctrl, msg):
|
||||
"""
|
||||
走向卸货
|
||||
"""
|
||||
section('任务5-1:直线移动并扫描二维码', "移动")
|
||||
|
||||
# 目标与横线的距离为0.5米
|
||||
target_distance = 0.5
|
||||
# 前进速度为0.5米/秒
|
||||
speed = 0.5
|
||||
# 最大移动距离为8米
|
||||
max_distance = 8
|
||||
# 启用观察模式
|
||||
observe = True
|
||||
|
||||
# 开始移动并扫描二维码
|
||||
success, qr_result, move_info = go_straight_to_horizontal_line_with_qr(
|
||||
ctrl, msg,
|
||||
target_distance=target_distance,
|
||||
speed=speed,
|
||||
max_distance=max_distance,
|
||||
observe=observe
|
||||
)
|
||||
|
||||
# 输出结果
|
||||
if success:
|
||||
success("成功到达横线前指定距离", "完成")
|
||||
if qr_result:
|
||||
info(f"扫描到二维码: {qr_result}", "二维码")
|
||||
else:
|
||||
warning("未扫描到二维码", "二维码")
|
||||
else:
|
||||
error("未能成功到达横线前指定距离", "失败")
|
||||
|
||||
# 返回移动和扫描结果
|
||||
return success, qr_result, move_info
|
Loading…
x
Reference in New Issue
Block a user