mi-task/base_move/follow_dual_tracks.py

584 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import math
import time
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing
from utils.detect_dual_track_lines import detect_dual_track_lines, auto_detect_dual_track_lines
def follow_dual_tracks(ctrl, msg, speed=0.5, max_time=30, target_distance=None, observe=False,
mode=11,
gait_id=26,
step_height=[0.06, 0.06],
):
"""
控制机器狗在双轨道线中间行走
参数:
ctrl: Robot_Ctrl 对象,包含里程计信息
msg: robot_control_cmd_lcmt 对象,用于发送命令
speed: 前进速度(米/秒)默认为0.5米/秒
max_time: 最大行走时间(秒)默认为30秒
target_distance: 目标移动距离(米),如果设置,则达到此距离后停止
observe: 是否输出中间状态信息和可视化结果默认为False
返回:
bool: 是否成功完成双轨道跟随
"""
section("开始双轨道跟随", "轨道跟随")
# 设置移动命令基本参数
msg.mode = mode # Locomotion模式
msg.gait_id = gait_id # 自变频步态
msg.duration = 0 # wait next cmd
msg.step_height = step_height # 抬腿高度
# 记录起始时间
start_time = time.time()
# 记录起始位置
start_position = list(ctrl.odo_msg.xyz)
if observe:
debug(f"起始位置: {start_position}", "位置")
if target_distance:
info(f"目标距离: {target_distance}", "目标")
# 在起点放置绿色标记
if hasattr(ctrl, 'place_marker'):
ctrl.place_marker(start_position[0], start_position[1], start_position[2] if len(start_position) > 2 else 0.0, 'green', observe=True)
# PID控制参数
kp = 0.005 # 比例系数
ki = 0.0002 # 积分系数
kd = 0.001 # 微分系数
# PID控制变量
previous_error = 0
integral = 0
# 最大角速度限制
max_angular_velocity = 0.5 # rad/s
# 记录成功检测到双轨道的次数和总次数,用于计算检测成功率
detection_success_count = 0
detection_total_count = 0
# 帧间滤波参数
filter_size = 5 # 滤波队列大小
deviation_queue = [] # 偏差值队列
last_valid_deviation = 0 # 上一次有效的偏差值
last_valid_center_info = None # 上一次有效的中心信息
# 开始跟随循环
distance_moved = 0
while time.time() - start_time < max_time:
# 检查距离条件
if target_distance and distance_moved >= target_distance:
if observe:
success(f"已达到目标距离 {target_distance}米,停止移动", "完成")
break
# 获取当前图像
image = ctrl.image_processor.get_current_image()
# 检测双轨道线
detection_total_count += 1
center_info, left_info, right_info = detect_dual_track_lines(image, observe=observe, save_log=True)
# 使用帧间滤波增强稳定性
if center_info is not None:
detection_success_count += 1
# 保存有效的中心信息
last_valid_center_info = center_info
# 获取当前偏差
current_deviation = center_info["deviation"]
last_valid_deviation = current_deviation
# 添加到队列
deviation_queue.append(current_deviation)
if len(deviation_queue) > filter_size:
deviation_queue.pop(0)
# 计算滤波后的偏差值 (去除最大和最小值后的平均)
if len(deviation_queue) >= 3:
filtered_deviations = sorted(deviation_queue)[1:-1] if len(deviation_queue) > 2 else deviation_queue
filtered_deviation = sum(filtered_deviations) / len(filtered_deviations)
else:
filtered_deviation = current_deviation
if observe and time.time() % 0.5 < 0.02:
debug(f"原始偏差: {current_deviation}px, 滤波后: {filtered_deviation}px", "滤波")
# 计算PID控制
error = -filtered_deviation # 负值表示需要向右转,正值表示需要向左转
# 比例项
p_control = kp * error
# 积分项 (添加积分限制以防止积分饱和)
integral += error
integral = max(-1000, min(1000, integral)) # 限制积分范围
i_control = ki * integral
# 微分项
derivative = error - previous_error
d_control = kd * derivative
previous_error = error
# 计算总的角速度控制量
angular_velocity = p_control + i_control + d_control
# 限制角速度范围
angular_velocity = max(-max_angular_velocity, min(max_angular_velocity, angular_velocity))
# 动态调整前进速度 - 偏差越大,速度越慢
adaptive_speed = speed
if abs(filtered_deviation) > 100: # 如果偏差较大
# 根据偏差大小动态调整速度最低降至60%
deviation_factor = max(0.6, 1.0 - abs(filtered_deviation) / 1000.0)
adaptive_speed = speed * deviation_factor
if observe and time.time() % 0.5 < 0.02:
debug(f"偏差较大({filtered_deviation:.1f}px),调整速度至 {adaptive_speed:.2f}m/s", "速度")
if observe and time.time() % 0.5 < 0.02:
debug(f"偏差: {filtered_deviation:.1f}px, P: {p_control:.4f}, I: {i_control:.4f}, D: {d_control:.4f}, 角速度: {angular_velocity:.4f}", "控制")
# 设置速度命令
msg.vel_des = [adaptive_speed, 0, angular_velocity]
else:
warning("未检测到双轨道线", "警告")
# 如果有之前的有效检测结果,使用上一次的控制值来平滑过渡
if last_valid_center_info is not None and len(deviation_queue) > 0:
# 使用上一次的有效偏差,但降低其影响(乘以衰减因子)
decay_factor = 0.8
decayed_deviation = last_valid_deviation * decay_factor
# 计算PID控制
error = -decayed_deviation
# 更新PID控制值但不更新积分项
p_control = kp * error
i_control = ki * integral
d_control = 0 # 不使用微分项以避免抖动
# 计算总的角速度控制量,但降低其幅度
angular_velocity = (p_control + i_control) * 0.7
# 限制角速度范围
angular_velocity = max(-max_angular_velocity/2, min(max_angular_velocity/2, angular_velocity))
# 降低速度以增加安全性
reduced_speed = speed * 0.8
if observe:
warning(f"使用上一帧数据,偏差: {decayed_deviation:.1f}px, 角速度: {angular_velocity:.4f}, 速度: {reduced_speed:.2f}m/s", "恢复")
# 设置速度命令
msg.vel_des = [reduced_speed, 0, angular_velocity]
else:
# 如果连续多次无法检测到轨道,降低速度直线行走
if detection_total_count > 5 and detection_success_count / detection_total_count < 0.3:
reduced_speed = speed * 0.6 # 降低至60%的速度
msg.vel_des = [reduced_speed, 0, 0]
if observe:
warning(f"检测成功率低,降低速度至 {reduced_speed:.2f}m/s", "警告")
else:
# 如果刚开始无法检测,使用原速度直线行走
msg.vel_des = [speed, 0, 0]
# 发送命令
msg.life_count += 1
ctrl.Send_cmd(msg)
# 短暂延时
time.sleep(0.05)
# 计算已移动距离
current_position = ctrl.odo_msg.xyz
dx = current_position[0] - start_position[0]
dy = current_position[1] - start_position[1]
distance_moved = math.sqrt(dx*dx + dy*dy)
if observe and time.time() % 0.5 < 0.02: # 每0.5秒左右打印一次
info(f"已移动: {distance_moved:.3f}", "移动")
if target_distance:
progress = (distance_moved / target_distance) * 100
info(f"进度: {progress:.1f}%", "进度")
# 平滑停止
if observe:
info("开始平滑停止", "停止")
# 先降低速度再停止,实现平滑停止
slowdown_steps = 5
for i in range(slowdown_steps, 0, -1):
slowdown_factor = i / slowdown_steps
msg.vel_des = [speed * slowdown_factor, 0, 0]
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep(0.1)
# 最后完全停止
ctrl.base_msg.stop()
# 计算最终移动距离
final_position = ctrl.odo_msg.xyz
dx = final_position[0] - start_position[0]
dy = final_position[1] - start_position[1]
final_distance = math.sqrt(dx*dx + dy*dy)
if observe:
# 在终点放置红色标记
end_position = list(final_position)
if hasattr(ctrl, 'place_marker'):
ctrl.place_marker(end_position[0], end_position[1], end_position[2] if len(end_position) > 2 else 0.0, 'red', observe=True)
success(f"双轨道跟随完成,总移动距离: {final_distance:.3f}", "完成")
# 显示检测成功率
if detection_total_count > 0:
detection_rate = (detection_success_count / detection_total_count) * 100
info(f"轨迹检测成功率: {detection_rate:.1f}% ({detection_success_count}/{detection_total_count})", "统计")
# 如果有目标距离,判断是否达到目标
if target_distance:
is_success = abs(final_distance - target_distance) < 0.2 # 误差在20cm内算成功
if observe:
if is_success:
success(f"成功到达目标距离,误差: {abs(final_distance - target_distance):.2f}", "成功")
else:
warning(f"未能精确到达目标距离,误差: {abs(final_distance - target_distance):.2f}", "警告")
return is_success
return True
def go_straight_with_visual_track(ctrl, msg, distance, speed=0.5, observe=False,
mode=11, gait_id=26, step_height=[0.06, 0.06],
stone_path_mode=False):
"""
控制机器人沿直线行走指定距离,同时利用视觉检测黄色轨道线进行自动校准
参数:
ctrl: Robot_Ctrl 对象,包含里程计信息和相机
msg: robot_control_cmd_lcmt 对象,用于发送命令
distance: 要行走的距离(米),正值为前进,负值为后退
speed: 行走速度(米/秒)范围0.1~1.0默认为0.5
observe: 是否输出中间状态信息默认为False
mode: 控制模式默认为11
gait_id: 步态ID默认为26
step_height: 抬腿高度,默认为[0.06, 0.06]
stone_path_mode: 是否为石板路模式默认为False
返回:
bool: 是否成功完成行走
"""
section("开始视觉引导直线移动", "视觉直行")
# 参数验证
if abs(distance) < 0.01:
info("距离太短,无需移动", "信息")
return True
# 检查相机是否可用
if not hasattr(ctrl, 'image_processor') or not hasattr(ctrl.image_processor, 'get_current_image'):
error("机器人控制器没有提供图像处理器,无法进行视觉跟踪", "错误")
return False
# 限制速度范围
speed = min(max(abs(speed), 0.1), 1.0)
# 确定前进或后退方向
forward = distance > 0
move_speed = speed if forward else -speed
abs_distance = abs(distance)
# 获取起始位置
start_position = list(ctrl.odo_msg.xyz)
start_yaw = ctrl.odo_msg.rpy[2] # 记录起始朝向,用于保持直线
if observe:
debug(f"起始位置: {start_position}", "位置")
info(f"开始{'前进' if forward else '后退'} {abs_distance:.3f}米,速度: {abs(move_speed):.2f}米/秒", "移动")
# 在起点放置标记
if hasattr(ctrl, 'place_marker'):
ctrl.place_marker(start_position[0], start_position[1],
start_position[2] if len(start_position) > 2 else 0.0,
'green', observe=True)
# 设置移动命令
msg.mode = mode # 控制模式
msg.gait_id = gait_id # 步态ID
# 根据需要移动的距离动态调整移动速度
if abs_distance > 1.0:
actual_speed = move_speed # 距离较远时用设定速度
elif abs_distance > 0.5:
actual_speed = move_speed * 0.8 # 中等距离略微降速
elif abs_distance > 0.2:
actual_speed = move_speed * 0.6 # 较近距离降低速度
else:
actual_speed = move_speed * 0.4 # 非常接近时用更慢速度
# 设置移动速度和方向
msg.vel_des = [actual_speed, 0, 0] # [前进速度, 侧向速度, 角速度]
msg.duration = 0 # wait next cmd
msg.step_height = step_height # 抬腿高度
msg.life_count += 1
# 发送命令
ctrl.Send_cmd(msg)
# 估算移动时间,但实际上会通过里程计控制
estimated_time = abs_distance / abs(actual_speed)
timeout = estimated_time + 3 # 增加超时时间为预计移动时间加3秒
# 使用里程计进行实时监控移动距离
distance_moved = 0
start_time = time.time()
last_position = start_position
# 动态调整参数
angle_correction_threshold = 0.05 # 角度偏差超过多少弧度开始修正
vision_check_interval = 0.3 # 视觉检查间隔(秒)
last_vision_check = start_time
vision_correction_gain = 0.005 # 视觉修正增益系数,根据像素偏差换算
slow_down_ratio = 0.85 # 当移动到目标距离的85%时开始减速
completion_threshold = 0.95 # 当移动到目标距离的95%时停止
position_check_interval = 0.1 # 位置检查间隔(秒)
last_check_time = start_time
# 计算移动方向单位向量(世界坐标系下)
direction_vector = [math.cos(start_yaw), math.sin(start_yaw)]
if not forward:
direction_vector = [-direction_vector[0], -direction_vector[1]]
# 视觉跟踪相关变量
last_valid_center_info = None
last_valid_deviation = 0
deviation_queue = [] # 用于滤波
filter_size = 5
detection_success_count = 0
detection_total_count = 0
# 监控移动距离和视觉轨迹
while distance_moved < abs_distance * completion_threshold and time.time() - start_time < timeout:
current_time = time.time()
# 按固定间隔检查位置,减少计算负担
if current_time - last_check_time >= position_check_interval:
# 获取当前位置和朝向
current_position = ctrl.odo_msg.xyz
current_yaw = ctrl.odo_msg.rpy[2]
# 计算在移动方向上的位移
dx = current_position[0] - start_position[0]
dy = current_position[1] - start_position[1]
# 计算在初始方向上的投影距离(实际前进距离)
distance_moved = dx * direction_vector[0] + dy * direction_vector[1]
distance_moved = abs(distance_moved) # 确保距离为正值
# 计算垂直于移动方向的偏移量y方向偏移
y_offset = -dx * direction_vector[1] + dy * direction_vector[0]
# 根据前进或后退确定期望方向
expected_direction = start_yaw if forward else (start_yaw + math.pi) % (2 * math.pi)
# 使用IMU朝向数据计算角度偏差
yaw_error = current_yaw - expected_direction
# 角度归一化
while yaw_error > math.pi:
yaw_error -= 2 * math.pi
while yaw_error < -math.pi:
yaw_error += 2 * math.pi
# 计算完成比例
completion_ratio = distance_moved / abs_distance
# 根据距离完成情况调整速度
if completion_ratio > slow_down_ratio:
# 计算减速系数
slow_factor = 1.0 - (completion_ratio - slow_down_ratio) / (1.0 - slow_down_ratio)
# 确保不会减速太多
slow_factor = max(0.2, slow_factor)
new_speed = actual_speed * slow_factor
if observe and abs(new_speed - msg.vel_des[0]) > 0.05:
info(f"减速: {msg.vel_des[0]:.2f} -> {new_speed:.2f} 米/秒 (完成: {completion_ratio*100:.1f}%)", "移动")
msg.vel_des[0] = new_speed
# 根据角度偏差进行角速度修正
angular_correction = 0
if abs(yaw_error) > angle_correction_threshold:
# 计算角速度修正值,偏差越大修正越强
angular_correction = -yaw_error * 0.5 # 比例系数可调整
# 限制最大角速度修正
angular_correction = max(min(angular_correction, 0.2), -0.2)
if observe and abs(angular_correction) > 0.05:
warning(f"方向修正: 偏差 {math.degrees(yaw_error):.1f}度,应用角速度 {angular_correction:.3f}rad/s", "角度")
# 更新最后检查时间和位置
last_check_time = current_time
last_position = current_position
if observe and current_time - start_time > 1 and (current_time % 1.0 < position_check_interval):
debug(f"已移动: {distance_moved:.3f}米, 目标: {abs_distance:.3f}米 (完成: {completion_ratio*100:.1f}%)", "距离")
debug(f"Y偏移: {y_offset:.3f}米, 朝向偏差: {math.degrees(yaw_error):.1f}", "偏移")
# 定期进行视觉检查和修正
if current_time - last_vision_check >= vision_check_interval:
try:
# 获取当前相机帧
frame = ctrl.image_processor.get_current_image()
if frame is not None:
# 检测轨道线
detection_total_count += 1
if stone_path_mode:
# 使用特定的石板路模式
center_info, left_info, right_info = detect_dual_track_lines(
frame, observe=False, save_log=False, stone_path_mode=True)
else:
# 使用自动检测模式
center_info, left_info, right_info = auto_detect_dual_track_lines(
frame, observe=False, save_log=False)
# 如果成功检测到轨道线,使用它进行修正
if center_info is not None:
detection_success_count += 1
# 保存有效的中心信息
last_valid_center_info = center_info
# 获取当前偏差
current_deviation = center_info["deviation"]
last_valid_deviation = current_deviation
# 添加到队列进行滤波
deviation_queue.append(current_deviation)
if len(deviation_queue) > filter_size:
deviation_queue.pop(0)
# 计算滤波后的偏差值 (去除最大和最小值后的平均)
if len(deviation_queue) >= 3:
filtered_deviations = sorted(deviation_queue)[1:-1] if len(deviation_queue) > 2 else deviation_queue
filtered_deviation = sum(filtered_deviations) / len(filtered_deviations)
else:
filtered_deviation = current_deviation
# 计算侧向修正速度
lateral_correction = -filtered_deviation * vision_correction_gain
# 限制最大侧向速度修正
lateral_correction = max(min(lateral_correction, 0.2), -0.2)
if observe and abs(lateral_correction) > 0.05:
warning(f"视觉修正: 偏差 {filtered_deviation:.1f}像素,应用侧向速度 {lateral_correction:.3f}m/s", "视觉")
# 应用视觉修正,保留当前前进速度和角速度修正
msg.vel_des = [msg.vel_des[0], lateral_correction, msg.vel_des[2]]
msg.life_count += 1
ctrl.Send_cmd(msg)
if observe and current_time % 0.5 < 0.05:
debug(f"视觉检测: 原始偏差 {current_deviation:.1f}像素, 滤波后 {filtered_deviation:.1f}像素", "视觉")
debug(f"轨道宽度: {center_info['track_width']:.1f}像素, 石板路模式: {center_info.get('stone_path_mode', False)}", "视觉")
debug(f"修正后速度: [{msg.vel_des[0]:.2f}, {msg.vel_des[1]:.2f}, {msg.vel_des[2]:.2f}]", "移动")
else:
# 如果有之前的有效检测结果,使用上一次的控制值来平滑过渡
if last_valid_center_info is not None and len(deviation_queue) > 0:
# 使用上一次的有效偏差,但降低其影响(乘以衰减因子)
decay_factor = 0.8
decayed_deviation = last_valid_deviation * decay_factor
# 计算侧向修正速度
lateral_correction = -decayed_deviation * vision_correction_gain * 0.7
# 限制最大侧向速度修正
lateral_correction = max(min(lateral_correction, 0.15), -0.15)
# 应用衰减后的视觉修正
msg.vel_des = [msg.vel_des[0], lateral_correction, msg.vel_des[2]]
msg.life_count += 1
ctrl.Send_cmd(msg)
if observe:
warning(f"使用上一帧数据,偏差: {decayed_deviation:.1f}像素, 侧向速度: {lateral_correction:.3f}m/s", "恢复")
elif observe:
warning("未检测到轨道线", "视觉")
except Exception as e:
error(f"视觉处理异常: {str(e)}", "错误")
last_vision_check = current_time
time.sleep(0.01) # 小间隔检查位置
# 平滑停止
if observe:
info("开始平滑停止", "停止")
# 先降低速度再停止,实现平滑停止
slowdown_steps = 5
for i in range(slowdown_steps, 0, -1):
slowdown_factor = i / slowdown_steps
msg.vel_des = [actual_speed * slowdown_factor, 0, 0]
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep(0.1)
# 最后完全停止
if hasattr(ctrl.base_msg, 'stop_smooth'):
ctrl.base_msg.stop_smooth()
else:
ctrl.base_msg.stop()
# 获取最终位置和实际移动距离
final_position = ctrl.odo_msg.xyz
dx = final_position[0] - start_position[0]
dy = final_position[1] - start_position[1]
# 计算在初始方向上的投影距离(实际前进距离)
actual_distance = dx * direction_vector[0] + dy * direction_vector[1]
actual_distance = abs(actual_distance) # 确保距离为正值
# 计算垂直于移动方向的最终偏移量y方向偏移
final_y_offset = -dx * direction_vector[1] + dy * direction_vector[0]
# 计算总移动距离(直线距离)
total_distance = math.sqrt(dx*dx + dy*dy)
if observe:
success(f"移动完成,从里程计计算的移动距离: {actual_distance:.3f}", "完成")
info(f"最终Y偏移: {final_y_offset:.3f}", "偏移")
# 在终点放置标记
if hasattr(ctrl, 'place_marker'):
ctrl.place_marker(final_position[0], final_position[1],
final_position[2] if len(final_position) > 2 else 0.0,
'red', observe=True)
# 显示检测成功率
if detection_total_count > 0:
detection_rate = (detection_success_count / detection_total_count) * 100
info(f"轨迹检测成功率: {detection_rate:.1f}% ({detection_success_count}/{detection_total_count})", "统计")
# 判断是否成功完成
distance_error = abs(actual_distance - abs_distance)
y_error = abs(final_y_offset)
go_success = distance_error < 0.1 and y_error < 0.1 # 如果距离误差和y偏移都小于10厘米则认为成功
if observe:
info(f"目标距离: {abs_distance:.3f}米, 实际距离: {actual_distance:.3f}米, 误差: {distance_error:.3f}", "距离")
info(f"Y偏移: {final_y_offset:.3f}", "偏移")
if go_success:
success(f"移动成功", "成功")
else:
warning(f"移动{'部分成功' if distance_error < 0.1 or y_error < 0.1 else '失败'}", "结果")
if distance_error >= 0.1:
warning(f"距离误差过大: {distance_error:.3f}", "距离")
if y_error >= 0.1:
warning(f"Y偏移过大: {y_error:.3f}", "偏移")
return go_success