mi-task/task_3/task_3.py
2025-08-21 12:51:13 +08:00

684 lines
28 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import sys
import os
import toml
import copy
import math
import lcm
import numpy as np
import cv2
import tempfile
# 添加父目录到路径以便能够导入utils
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 添加当前目录到路径确保可以找到local文件
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing
from base_move.turn_degree import turn_degree, turn_degree_v2
from base_move.go_straight import go_straight, go_lateral
from base_move.go_to_xy import go_to_x_v2
from base_move.center_on_dual_tracks import center_on_dual_tracks
from file_send_lcmt import file_send_lcmt
from utils.yellow_area_analyzer import analyze_yellow_area_ratio
from utils.detect_dual_track_lines import detect_dual_track_lines
from utils.speech_demo import speak
# 创建本模块特定的日志记录器
logger = get_logger("任务3")
observe = False
YELLOW_RATIO_THRESHOLD = 0.04 # TODO 黄色区域比例阈值
def run_task_3(ctrl, msg, time_sleep=5000):
section('任务3上下坡', "启动")
info('开始执行任务3...', "启动")
turn_degree_v2(ctrl, msg, 90, absolute=True)
section('任务3-1up and down', "开始")
go_straight_with_enhanced_calibration(ctrl, msg, distance = 5, speed=0.5, observe=False, mode=11, gait_id=3, step_height=[0.21, 0.21])
# pass_up_down(ctrl, msg)
turn_degree_v2(ctrl, msg, 90, absolute=True)
center_on_dual_tracks(ctrl, msg, max_deviation=10.0, observe=False)
section('任务3-2yellow stop', "开始")
go_until_yellow_area(ctrl, msg, yellow_ratio_threshold=YELLOW_RATIO_THRESHOLD, speed=0.3)
# 原地站立3秒
section("原地站立3秒", "站立")
msg.mode = 12
msg.gait_id = 0
msg.duration = 0
msg.step_height = [0.06, 0.06]
msg.vel_des = [0, 0, 0]
msg.life_count += 1
ctrl.Send_cmd(msg)
speak("检测到黄灯")
info("开始原地站立3秒", "站立")
speak('5')
time.sleep(time_sleep / 1000)
speak('4')
time.sleep(time_sleep / 1000)
speak('3')
time.sleep(time_sleep / 1000)
speak('2')
time.sleep(time_sleep / 1000)
speak('1')
time.sleep(time_sleep / 1000)
info("完成原地站立", "站立")
def run_task_3_back(ctrl, msg, time_sleep=5000):
section('任务3-1up', "开始")
section('任务3-2yellow stop', "开始")
go_until_yellow_area(ctrl, msg, yellow_ratio_threshold=YELLOW_RATIO_THRESHOLD, speed=0.3)
# 原地站立3秒
section("原地站立3秒", "站立")
msg.mode = 12
msg.gait_id = 0
msg.duration = 0
msg.step_height = [0.06, 0.06]
msg.vel_des = [0, 0, 0]
msg.life_count += 1
ctrl.Send_cmd(msg)
info("开始原地站立3秒", "站立")
speak()
time.sleep(time_sleep / 5000)
info("完成原地站立", "站立")
section('任务3-3up and down', "开始")
go_straight_with_enhanced_calibration(ctrl, msg, distance = 5, speed=0.5, observe=False, mode=11, gait_id=3, step_height=[0.21, 0.21])
# TODO 调优达到一个合适的位置,准备 task 2-5 的返回
# 或许用到 move-to-line 函数
def go_until_yellow_area(ctrl, msg, yellow_ratio_threshold=0.15, speed=0.3, max_time=30, observe=True):
"""
控制机器人直走,直到摄像头检测到黄色区域比例超过指定阈值后开始下降时停止
参数:
ctrl: Robot_Ctrl 对象,包含里程计信息
msg: robot_control_cmd_lcmt 对象,用于发送命令
yellow_ratio_threshold: 黄色区域占比阈值(0-1之间的浮点数)默认为0.15
speed: 前进速度(米/秒)默认为0.3米/秒
max_time: 最大行走时间(秒)默认为30秒
observe: 是否输出中间状态信息和可视化结果默认为True
返回:
bool: 是否成功检测到黄色区域并停止
"""
section("开始直行寻找黄色区域", "黄色检测")
# 设置移动命令基本参数
msg.mode = 11 # Locomotion模式
msg.gait_id = 26 # 自变频步态
msg.duration = 0 # wait next cmd
msg.step_height = [0.06, 0.06] # 抬腿高度
msg.vel_des = [speed, 0, 0] # [前进速度, 侧向速度, 角速度]
# 记录起始时间和位置
start_time = time.time()
start_position = list(ctrl.odo_msg.xyz)
if observe:
info(f"开始寻找黄色区域,阈值: {yellow_ratio_threshold:.2%}", "启动")
debug(f"起始位置: {start_position}", "位置")
# 检测间隔
check_interval = 0.3 # 秒
last_check_time = 0
# 黄色区域监测变量
yellow_peak_detected = False # 是否检测到峰值
yellow_decreasing = False # 是否开始下降
max_yellow_ratio = 0.0 # 记录最大黄色区域占比
yellow_ratio_history = [] # 记录黄色区域占比历史
history_window_size = 5 # 历史窗口大小,用于平滑处理
try:
# 开始移动
msg.life_count += 1
ctrl.Send_cmd(msg)
# 持续检测黄色区域
while time.time() - start_time < max_time and not yellow_decreasing:
current_time = time.time()
# 定期发送移动命令保持移动状态
if current_time - last_check_time >= check_interval:
# 获取当前图像并保存到临时文件
current_image = ctrl.image_processor.get_current_image('ai') # INFO 默认采用 ai 相机
# 创建临时文件保存图像
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as temp_file:
temp_filename = temp_file.name
cv2.imwrite(temp_filename, current_image)
try:
# 分析图像中的黄色区域
yellow_ratio = analyze_yellow_area_ratio(temp_filename, debug=False, save_result=False)
# 添加到历史记录
yellow_ratio_history.append(yellow_ratio)
if len(yellow_ratio_history) > history_window_size:
yellow_ratio_history.pop(0)
# 计算平滑后的当前黄色占比(使用最近几次的平均值以减少噪声)
current_smooth_ratio = sum(yellow_ratio_history) / len(yellow_ratio_history)
# 计算已移动距离(仅用于显示)
current_position = ctrl.odo_msg.xyz
dx = current_position[0] - start_position[0]
dy = current_position[1] - start_position[1]
distance_moved = math.sqrt(dx*dx + dy*dy)
if observe:
info(f"当前黄色区域占比: {yellow_ratio:.2%}, 平滑值: {current_smooth_ratio:.2%}, 已移动: {distance_moved:.2f}", "检测")
# 检测是否达到阈值(开始监测峰值)
if current_smooth_ratio >= yellow_ratio_threshold:
# 更新最大值
if current_smooth_ratio > max_yellow_ratio:
max_yellow_ratio = current_smooth_ratio
if not yellow_peak_detected:
yellow_peak_detected = True
if observe:
info(f"黄色区域占比超过阈值,开始监测峰值", "检测")
# 检测是否开始下降
elif yellow_peak_detected and current_smooth_ratio < max_yellow_ratio * 0.9: # 下降到峰值的90%以下认为开始下降
yellow_decreasing = True
if observe:
success(f"检测到黄色区域占比开始下降,峰值: {max_yellow_ratio:.2%}, 当前: {current_smooth_ratio:.2%}", "检测")
finally:
# 删除临时文件
try:
os.unlink(temp_filename)
except:
pass
# 更新心跳
msg.life_count += 1
ctrl.Send_cmd(msg)
last_check_time = current_time
# 小间隔
time.sleep(0.05)
# 平滑停止
if yellow_decreasing:
if observe:
info("开始平滑停止", "停止")
# 先降低速度再停止,实现平滑停止
slowdown_steps = 5
for i in range(slowdown_steps, 0, -1):
slowdown_factor = i / slowdown_steps
msg.vel_des = [speed * slowdown_factor, 0, 0]
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep(0.1)
# 完全停止
ctrl.base_msg.stop()
# 计算最终移动距离
final_position = ctrl.odo_msg.xyz
dx = final_position[0] - start_position[0]
dy = final_position[1] - start_position[1]
final_distance = math.sqrt(dx*dx + dy*dy)
if observe:
if yellow_decreasing:
success(f"成功检测到黄色区域峰值并停止,峰值占比: {max_yellow_ratio:.2%}, 总移动距离: {final_distance:.2f}", "完成")
else:
warning(f"未能在限定时间内检测到黄色区域峰值,总移动距离: {final_distance:.2f}", "超时")
return yellow_decreasing
except KeyboardInterrupt:
# 处理键盘中断
ctrl.base_msg.stop()
if observe:
warning("操作被用户中断", "中断")
return False
except Exception as e:
# 处理其他异常
ctrl.base_msg.stop()
if observe:
error(f"发生错误: {str(e)}", "错误")
return False
def go_straight_with_enhanced_calibration(ctrl, msg, distance, speed=0.5, observe=False,
mode=11, gait_id=3, step_height=[0.21, 0.21]):
"""
控制机器人在石板路上沿直线行走,使用视觉校准和姿态传感器融合来保持直线
现在用于 task-3 的上下坡
参数:
ctrl: Robot_Ctrl 对象
msg: 控制消息对象
distance: 要行走的距离(米)
speed: 行走速度(米/秒)
observe: 是否输出调试信息
mode: 运动模式
gait_id: 步态ID
step_height: 摆动腿高度
返回:
bool: 是否成功完成
"""
section("开始石板路增强直线移动", "石板路移动")
# 参数验证
if abs(distance) < 0.01:
info("距离太短,无需移动", "信息")
return True
# 检查相机是否可用
if not hasattr(ctrl, 'image_processor') or not hasattr(ctrl.image_processor, 'get_current_image'):
warning("机器人控制器没有提供图像处理器,将使用姿态传感器辅助校准", "警告")
# 限制速度范围
speed = min(max(abs(speed), 0.1), 1.0)
# 确定前进或后退方向
forward = distance > 0
move_speed = speed if forward else -speed
abs_distance = abs(distance)
# 获取起始位置和姿态
start_position = list(ctrl.odo_msg.xyz)
start_yaw = ctrl.odo_msg.rpy[2] # 记录起始朝向
if observe:
debug(f"起始位置: {start_position}", "位置")
info(f"开始石板路增强直线移动,距离: {abs_distance:.3f}米,速度: {abs(move_speed):.2f}米/秒", "移动")
# 设置移动命令
msg.mode = mode
msg.gait_id = gait_id
msg.step_height = step_height
msg.duration = 0 # wait next cmd
# 根据需要移动的距离动态调整移动速度
if abs_distance > 1.0:
actual_speed = move_speed # 距离较远时用设定速度
else:
actual_speed = move_speed * 0.8 # 较近距离略微降速
# 设置移动速度和方向
msg.vel_des = [actual_speed, 0, 0] # [前进速度, 侧向速度, 角速度]
msg.life_count += 1
# 发送命令
ctrl.Send_cmd(msg)
# 估算移动时间,但实际上会通过里程计控制
estimated_time = abs_distance / abs(actual_speed)
timeout = estimated_time + 5 # 增加超时时间
# 使用里程计进行实时监控移动距离
distance_moved = 0
start_time = time.time()
last_check_time = start_time
position_check_interval = 0.1 # 位置检查间隔(秒)
# 计算移动方向单位向量(世界坐标系下)
direction_vector = [math.cos(start_yaw), math.sin(start_yaw)]
if not forward:
direction_vector = [-direction_vector[0], -direction_vector[1]]
# 视觉跟踪相关变量
vision_check_interval = 0.2 # 视觉检查间隔(秒)
last_vision_check = start_time
vision_correction_gain = 0.006 # 视觉修正增益系数
# 用于滤波的队列
deviation_queue = []
filter_size = 5
last_valid_deviation = 0
# PID控制参数 - 用于角度修正
kp_angle = 0.6 # 比例系数
ki_angle = 0.02 # 积分系数
kd_angle = 0.1 # 微分系数
# PID控制变量
angle_error_integral = 0
last_angle_error = 0
# 偏移量累计 - 用于检测持续偏移
y_offset_accumulator = 0
# 动态调整参数
slow_down_ratio = 0.85 # 当移动到目标距离的85%时开始减速
completion_threshold = 0.95 # 当移动到目标距离的95%时停止
# 监控移动过程
while distance_moved < abs_distance * completion_threshold and time.time() - start_time < timeout:
current_time = time.time()
# 按固定间隔检查位置
if current_time - last_check_time >= position_check_interval:
# 获取当前位置和朝向
current_position = ctrl.odo_msg.xyz
current_yaw = ctrl.odo_msg.rpy[2]
# 计算在移动方向上的位移
dx = current_position[0] - start_position[0]
dy = current_position[1] - start_position[1]
# 计算在初始方向上的投影距离(实际前进距离)
distance_moved = dx * direction_vector[0] + dy * direction_vector[1]
distance_moved = abs(distance_moved) # 确保距离为正值
# 计算垂直于移动方向的偏移量y方向偏移
y_offset = -dx * direction_vector[1] + dy * direction_vector[0]
# 累积y方向偏移量检测持续偏移趋势
y_offset_accumulator = y_offset_accumulator * 0.8 + y_offset * 0.2
# 根据前进或后退确定期望方向
expected_direction = start_yaw if forward else (start_yaw + math.pi) % (2 * math.pi)
# 使用IMU朝向数据计算角度偏差
yaw_error = current_yaw - expected_direction
# 角度归一化
while yaw_error > math.pi:
yaw_error -= 2 * math.pi
while yaw_error < -math.pi:
yaw_error += 2 * math.pi
# 使用PID控制计算角速度修正
# 比例项
p_control = kp_angle * yaw_error
# 积分项 (带衰减)
angle_error_integral = angle_error_integral * 0.9 + yaw_error
angle_error_integral = max(-1.0, min(1.0, angle_error_integral)) # 限制积分范围
i_control = ki_angle * angle_error_integral
# 微分项
d_control = kd_angle * (yaw_error - last_angle_error)
last_angle_error = yaw_error
# 计算总的角速度控制量
angular_correction = -(p_control + i_control + d_control)
# 限制最大角速度修正
angular_correction = max(min(angular_correction, 0.3), -0.3)
# 根据持续的y偏移趋势增加侧向校正
lateral_correction = 0
if abs(y_offset_accumulator) > 0.05: # 如果累积偏移超过5厘米
lateral_correction = -y_offset_accumulator * 0.8 # 反向校正
lateral_correction = max(min(lateral_correction, 0.15), -0.15) # 限制最大侧向速度
if observe and abs(lateral_correction) > 0.05:
warning(f"累积Y偏移校正: {y_offset_accumulator:.3f}米,应用侧向速度 {lateral_correction:.3f}m/s", "偏移")
# 计算完成比例
completion_ratio = distance_moved / abs_distance
# 根据距离完成情况调整速度
if completion_ratio > slow_down_ratio:
# 计算减速系数
slow_factor = 1.0 - (completion_ratio - slow_down_ratio) / (1.0 - slow_down_ratio)
# 确保不会减速太多
slow_factor = max(0.3, slow_factor)
new_speed = actual_speed * slow_factor
if observe and abs(new_speed - msg.vel_des[0]) > 0.05:
info(f"减速: {msg.vel_des[0]:.2f} -> {new_speed:.2f} 米/秒 (完成: {completion_ratio*100:.1f}%)", "移动")
actual_speed = new_speed
# 应用修正 - 同时应用角速度和侧向速度修正
msg.vel_des = [actual_speed, lateral_correction, angular_correction]
msg.life_count += 1
ctrl.Send_cmd(msg)
if observe and current_time % 1.0 < position_check_interval:
debug(f"已移动: {distance_moved:.3f}米, 目标: {abs_distance:.3f}米 (完成: {completion_ratio*100:.1f}%)", "距离")
debug(f"Y偏移: {y_offset:.3f}米, 累积偏移: {y_offset_accumulator:.3f}", "偏移")
debug(f"朝向偏差: {math.degrees(yaw_error):.1f}度, 角速度修正: {angular_correction:.3f}rad/s", "角度")
debug(f"PID: P={p_control:.3f}, I={i_control:.3f}, D={d_control:.3f}", "控制")
# 更新检查时间
last_check_time = current_time
# 定期进行视觉检查和修正 # TODO 这个函数如果用于 上下坡的话,感觉或许就不需要了,如果能保证 task 2-5 的效果。
if hasattr(ctrl, 'image_processor') and current_time - last_vision_check >= vision_check_interval:
try:
# 获取当前相机帧
frame = ctrl.image_processor.get_current_image()
if frame is not None:
# 检测轨道线 - 使用特定的石板路模式
center_info, _, _ = detect_dual_track_lines(
frame, observe=False, save_log=False)
# 如果成功检测到轨道线,使用它进行修正
if center_info is not None:
# 获取当前偏差
current_deviation = center_info["deviation"]
last_valid_deviation = current_deviation
# 添加到队列进行滤波
deviation_queue.append(current_deviation)
if len(deviation_queue) > filter_size:
deviation_queue.pop(0)
# 计算滤波后的偏差值 (去除最大和最小值后的平均)
if len(deviation_queue) >= 3:
filtered_deviations = sorted(deviation_queue)[1:-1] if len(deviation_queue) > 2 else deviation_queue
filtered_deviation = sum(filtered_deviations) / len(filtered_deviations)
else:
filtered_deviation = current_deviation
# 计算视觉侧向修正速度
vision_lateral_correction = -filtered_deviation * vision_correction_gain
# 限制最大侧向速度修正
vision_lateral_correction = max(min(vision_lateral_correction, 0.2), -0.2)
# 与当前的侧向校正进行融合 (加权平均)
if msg.vel_des[1] != 0:
# 如果已经有侧向校正,与视觉校正进行融合
fused_lateral = msg.vel_des[1] * 0.3 + vision_lateral_correction * 0.7
else:
# 如果没有侧向校正,直接使用视觉校正
fused_lateral = vision_lateral_correction
if observe and abs(vision_lateral_correction) > 0.05:
warning(f"视觉修正: 偏差 {filtered_deviation:.1f}像素,应用侧向速度 {vision_lateral_correction:.3f}m/s", "视觉")
# 应用视觉修正,保留当前前进速度和角速度
msg.vel_des = [msg.vel_des[0], fused_lateral, msg.vel_des[2]]
msg.life_count += 1
ctrl.Send_cmd(msg)
if observe and current_time % 1.0 < vision_check_interval:
debug(f"视觉检测: 原始偏差 {current_deviation:.1f}像素, 滤波后 {filtered_deviation:.1f}像素", "视觉")
debug(f"融合侧向速度: {fused_lateral:.3f}m/s", "视觉")
except Exception as e:
if observe:
error(f"视觉检测异常: {e}", "错误")
# 更新视觉检查时间
last_vision_check = current_time
# 短暂延时
time.sleep(0.01)
# 平滑停止
slowdown_steps = 5
for i in range(slowdown_steps, 0, -1):
slowdown_factor = i / slowdown_steps
msg.vel_des = [actual_speed * slowdown_factor, 0, 0]
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep(0.1)
# 最后完全停止
if hasattr(ctrl.base_msg, 'stop_smooth'):
ctrl.base_msg.stop_smooth()
else:
ctrl.base_msg.stop()
# 获取最终位置和实际移动距离
final_position = ctrl.odo_msg.xyz
dx = final_position[0] - start_position[0]
dy = final_position[1] - start_position[1]
actual_distance = math.sqrt(dx*dx + dy*dy)
# 计算最终y方向偏移
final_y_offset = -dx * direction_vector[1] + dy * direction_vector[0]
if observe:
success(f"石板路增强直线移动完成,实际移动距离: {actual_distance:.3f}", "完成")
info(f"最终Y方向偏移: {final_y_offset:.3f}", "偏移")
# 判断是否成功完成
distance_error = abs(actual_distance - abs_distance)
y_offset_error = abs(final_y_offset)
go_success = distance_error < 0.1 and y_offset_error < 0.1 # 距离误差和y偏移都小于10厘米视为成功
if observe:
if go_success:
success(f"移动成功", "成功")
else:
warning(f"移动不够精确,距离误差: {distance_error:.3f}米, Y偏移: {y_offset_error:.3f}", "警告")
return go_success
# INFO 保持文件相对路径,直接从 task-4中调用
robot_cmd = {
'mode':0, 'gait_id':0, 'contact':0, 'life_count':0,
'vel_des':[0.0, 0.0, 0.0],
'rpy_des':[0.0, 0.0, 0.0],
'pos_des':[0.0, 0.0, 0.0],
'acc_des':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
'ctrl_point':[0.0, 0.0, 0.0],
'foot_pose':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
'step_height':[0.0, 0.0],
'value':0, 'duration':0
}
def pass_stone(ctrl, msg, distance=5.0, observe=True):
"""
通过里程计设定distance执行上坡gait。
:param ctrl: 控制器对象
:param msg: 控制消息
:param distance: 期望移动距离(米)
:param observe: 是否打印过程信息
"""
usergait_msg = file_send_lcmt()
try:
# 1. 生成gait参数文件
steps = toml.load("./task_3/Gait_Params_up.toml")
full_steps = {'step':[robot_cmd]}
k = 0
for i in steps['step']:
cmd = copy.deepcopy(robot_cmd)
cmd['duration'] = i['duration']
if i['type'] == 'usergait':
cmd['mode'] = 11 # LOCOMOTION
cmd['gait_id'] = 110 # USERGAIT
cmd['vel_des'] = i['body_vel_des']
cmd['rpy_des'] = i['body_pos_des'][0:3]
cmd['pos_des'] = i['body_pos_des'][3:6]
cmd['foot_pose'][0:2] = i['landing_pos_des'][0:2]
cmd['foot_pose'][2:4] = i['landing_pos_des'][3:5]
cmd['foot_pose'][4:6] = i['landing_pos_des'][6:8]
cmd['ctrl_point'][0:2] = i['landing_pos_des'][9:11]
cmd['step_height'][0] = math.ceil(i['step_height'][0] * 1e3) + math.ceil(i['step_height'][1] * 1e3) * 1e3
cmd['step_height'][1] = math.ceil(i['step_height'][2] * 1e3) + math.ceil(i['step_height'][3] * 1e3) * 1e3
cmd['acc_des'] = i['weight']
cmd['value'] = i['use_mpc_traj']
cmd['contact'] = math.floor(i['landing_gain'] * 1e1)
cmd['ctrl_point'][2] = i['mu']
if k == 0:
full_steps['step'] = [cmd]
else:
full_steps['step'].append(cmd)
k += 1
with open("./task_3/Gait_Params_up_full.toml", 'w') as f:
f.write("# Gait Params\n")
f.writelines(toml.dumps(full_steps))
# 2. 发送gait定义和参数
with open("./task_3/Gait_Def_up.toml",'r') as file_obj_gait_def, \
open("./task_3/Gait_Params_up_full.toml",'r') as file_obj_gait_params:
usergait_msg.data = file_obj_gait_def.read()
ctrl.lc_s.publish("user_gait_file", usergait_msg.encode())
time.sleep(0.5)
usergait_msg.data = file_obj_gait_params.read()
ctrl.lc_s.publish("user_gait_file", usergait_msg.encode())
time.sleep(0.1)
# 3. 记录起始位置
start_xyz = ctrl.odo_msg.xyz[:2] # 只取x, y
if observe:
info(f"pass_up_down: 期望移动距离 {distance:.3f} 米,起始位置: {start_xyz}", "监测")
# 4. 设置gait执行参数
msg.mode = 62
msg.value = 0
msg.contact = 15
msg.gait_id = 110
msg.duration = 1000
msg.life_count += 1
# 5. 控制循环按里程计判断是否到达去除max-iteration直接while判断
arrived = False
step = 0
while True:
ctrl.Send_cmd(msg)
cur_xyz = ctrl.odo_msg.xyz[:2]
dx = cur_xyz[0] - start_xyz[0]
dy = cur_xyz[1] - start_xyz[1]
moved = (dx**2 + dy**2) ** 0.5
if observe and step % 10 == 0:
info(f"Step:{step} 已移动距离={moved:.3f}m, 目标={distance:.3f}m, 当前xy={cur_xyz}", "监测")
if moved >= distance:
arrived = True
if observe:
success(f"pass_up_down: 已到达设定距离 {distance:.3f}m (实际: {moved:.3f}m)", "完成")
break
step += 1
time.sleep(0.1)
# 6. 完全停止
if hasattr(ctrl.base_msg, 'stop_smooth'):
ctrl.base_msg.stop_smooth()
else:
ctrl.base_msg.stop()
ctrl.Send_cmd(msg)
time.sleep(0.1)
# 7. 反馈最终位置
final_xyz = ctrl.odo_msg.xyz[:2]
dx = final_xyz[0] - start_xyz[0]
dy = final_xyz[1] - start_xyz[1]
actual_distance = (dx**2 + dy**2) ** 0.5
if observe:
info(f"pass_up_down: 最终位置 {final_xyz}, 实际移动距离 {actual_distance:.3f}m", "监测")
return arrived
except KeyboardInterrupt:
msg.mode = 7 #PureDamper before KeyboardInterrupt:
msg.gait_id = 0
msg.duration = 0
msg.life_count += 1
ctrl.Send_cmd(msg)
pass