import math import time import sys import os sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing from utils.detect_dual_track_lines import detect_dual_track_lines, auto_detect_dual_track_lines def follow_dual_tracks(ctrl, msg, speed=0.5, max_time=30, target_distance=None, observe=False, mode=11, gait_id=26, step_height=[0.06, 0.06], ): """ 控制机器狗在双轨道线中间行走 参数: ctrl: Robot_Ctrl 对象,包含里程计信息 msg: robot_control_cmd_lcmt 对象,用于发送命令 speed: 前进速度(米/秒),默认为0.5米/秒 max_time: 最大行走时间(秒),默认为30秒 target_distance: 目标移动距离(米),如果设置,则达到此距离后停止 observe: 是否输出中间状态信息和可视化结果,默认为False 返回: bool: 是否成功完成双轨道跟随 """ section("开始双轨道跟随", "轨道跟随") # 设置移动命令基本参数 msg.mode = mode # Locomotion模式 msg.gait_id = gait_id # 自变频步态 msg.duration = 0 # wait next cmd msg.step_height = step_height # 抬腿高度 # 记录起始时间 start_time = time.time() # 记录起始位置 start_position = list(ctrl.odo_msg.xyz) if observe: debug(f"起始位置: {start_position}", "位置") if target_distance: info(f"目标距离: {target_distance}米", "目标") # 在起点放置绿色标记 if hasattr(ctrl, 'place_marker'): ctrl.place_marker(start_position[0], start_position[1], start_position[2] if len(start_position) > 2 else 0.0, 'green', observe=True) # PID控制参数 kp = 0.005 # 比例系数 ki = 0.0002 # 积分系数 kd = 0.001 # 微分系数 # PID控制变量 previous_error = 0 integral = 0 # 最大角速度限制 max_angular_velocity = 0.5 # rad/s # 记录成功检测到双轨道的次数和总次数,用于计算检测成功率 detection_success_count = 0 detection_total_count = 0 # 帧间滤波参数 filter_size = 5 # 滤波队列大小 deviation_queue = [] # 偏差值队列 last_valid_deviation = 0 # 上一次有效的偏差值 last_valid_center_info = None # 上一次有效的中心信息 # 开始跟随循环 distance_moved = 0 while time.time() - start_time < max_time: # 检查距离条件 if target_distance and distance_moved >= target_distance: if observe: success(f"已达到目标距离 {target_distance}米,停止移动", "完成") break # 获取当前图像 image = ctrl.image_processor.get_current_image() # 检测双轨道线 detection_total_count += 1 center_info, left_info, right_info = detect_dual_track_lines(image, observe=observe, save_log=True) # 使用帧间滤波增强稳定性 if center_info is not None: detection_success_count += 1 # 保存有效的中心信息 last_valid_center_info = center_info # 获取当前偏差 current_deviation = center_info["deviation"] last_valid_deviation = current_deviation # 添加到队列 deviation_queue.append(current_deviation) if len(deviation_queue) > filter_size: deviation_queue.pop(0) # 计算滤波后的偏差值 (去除最大和最小值后的平均) if len(deviation_queue) >= 3: filtered_deviations = sorted(deviation_queue)[1:-1] if len(deviation_queue) > 2 else deviation_queue filtered_deviation = sum(filtered_deviations) / len(filtered_deviations) else: filtered_deviation = current_deviation if observe and time.time() % 0.5 < 0.02: debug(f"原始偏差: {current_deviation}px, 滤波后: {filtered_deviation}px", "滤波") # 计算PID控制 error = -filtered_deviation # 负值表示需要向右转,正值表示需要向左转 # 比例项 p_control = kp * error # 积分项 (添加积分限制以防止积分饱和) integral += error integral = max(-1000, min(1000, integral)) # 限制积分范围 i_control = ki * integral # 微分项 derivative = error - previous_error d_control = kd * derivative previous_error = error # 计算总的角速度控制量 angular_velocity = p_control + i_control + d_control # 限制角速度范围 angular_velocity = max(-max_angular_velocity, min(max_angular_velocity, angular_velocity)) # 动态调整前进速度 - 偏差越大,速度越慢 adaptive_speed = speed if abs(filtered_deviation) > 100: # 如果偏差较大 # 根据偏差大小动态调整速度,最低降至60% deviation_factor = max(0.6, 1.0 - abs(filtered_deviation) / 1000.0) adaptive_speed = speed * deviation_factor if observe and time.time() % 0.5 < 0.02: debug(f"偏差较大({filtered_deviation:.1f}px),调整速度至 {adaptive_speed:.2f}m/s", "速度") if observe and time.time() % 0.5 < 0.02: debug(f"偏差: {filtered_deviation:.1f}px, P: {p_control:.4f}, I: {i_control:.4f}, D: {d_control:.4f}, 角速度: {angular_velocity:.4f}", "控制") # 设置速度命令 msg.vel_des = [adaptive_speed, 0, angular_velocity] else: warning("未检测到双轨道线", "警告") # 如果有之前的有效检测结果,使用上一次的控制值来平滑过渡 if last_valid_center_info is not None and len(deviation_queue) > 0: # 使用上一次的有效偏差,但降低其影响(乘以衰减因子) decay_factor = 0.8 decayed_deviation = last_valid_deviation * decay_factor # 计算PID控制 error = -decayed_deviation # 更新PID控制值但不更新积分项 p_control = kp * error i_control = ki * integral d_control = 0 # 不使用微分项以避免抖动 # 计算总的角速度控制量,但降低其幅度 angular_velocity = (p_control + i_control) * 0.7 # 限制角速度范围 angular_velocity = max(-max_angular_velocity/2, min(max_angular_velocity/2, angular_velocity)) # 降低速度以增加安全性 reduced_speed = speed * 0.8 if observe: warning(f"使用上一帧数据,偏差: {decayed_deviation:.1f}px, 角速度: {angular_velocity:.4f}, 速度: {reduced_speed:.2f}m/s", "恢复") # 设置速度命令 msg.vel_des = [reduced_speed, 0, angular_velocity] else: # 如果连续多次无法检测到轨道,降低速度直线行走 if detection_total_count > 5 and detection_success_count / detection_total_count < 0.3: reduced_speed = speed * 0.6 # 降低至60%的速度 msg.vel_des = [reduced_speed, 0, 0] if observe: warning(f"检测成功率低,降低速度至 {reduced_speed:.2f}m/s", "警告") else: # 如果刚开始无法检测,使用原速度直线行走 msg.vel_des = [speed, 0, 0] # 发送命令 msg.life_count += 1 ctrl.Send_cmd(msg) # 短暂延时 time.sleep(0.05) # 计算已移动距离 current_position = ctrl.odo_msg.xyz dx = current_position[0] - start_position[0] dy = current_position[1] - start_position[1] distance_moved = math.sqrt(dx*dx + dy*dy) if observe and time.time() % 0.5 < 0.02: # 每0.5秒左右打印一次 info(f"已移动: {distance_moved:.3f}米", "移动") if target_distance: progress = (distance_moved / target_distance) * 100 info(f"进度: {progress:.1f}%", "进度") # 平滑停止 if observe: info("开始平滑停止", "停止") # 先降低速度再停止,实现平滑停止 slowdown_steps = 5 for i in range(slowdown_steps, 0, -1): slowdown_factor = i / slowdown_steps msg.vel_des = [speed * slowdown_factor, 0, 0] msg.life_count += 1 ctrl.Send_cmd(msg) time.sleep(0.1) # 最后完全停止 ctrl.base_msg.stop() # 计算最终移动距离 final_position = ctrl.odo_msg.xyz dx = final_position[0] - start_position[0] dy = final_position[1] - start_position[1] final_distance = math.sqrt(dx*dx + dy*dy) if observe: # 在终点放置红色标记 end_position = list(final_position) if hasattr(ctrl, 'place_marker'): ctrl.place_marker(end_position[0], end_position[1], end_position[2] if len(end_position) > 2 else 0.0, 'red', observe=True) success(f"双轨道跟随完成,总移动距离: {final_distance:.3f}米", "完成") # 显示检测成功率 if detection_total_count > 0: detection_rate = (detection_success_count / detection_total_count) * 100 info(f"轨迹检测成功率: {detection_rate:.1f}% ({detection_success_count}/{detection_total_count})", "统计") # 如果有目标距离,判断是否达到目标 if target_distance: is_success = abs(final_distance - target_distance) < 0.2 # 误差在20cm内算成功 if observe: if is_success: success(f"成功到达目标距离,误差: {abs(final_distance - target_distance):.2f}米", "成功") else: warning(f"未能精确到达目标距离,误差: {abs(final_distance - target_distance):.2f}米", "警告") return is_success return True def go_straight_with_visual_track(ctrl, msg, distance, speed=0.5, observe=False, mode=11, gait_id=26, step_height=[0.06, 0.06], stone_path_mode=False): """ 控制机器人沿直线行走指定距离,同时利用视觉检测黄色轨道线进行自动校准 参数: ctrl: Robot_Ctrl 对象,包含里程计信息和相机 msg: robot_control_cmd_lcmt 对象,用于发送命令 distance: 要行走的距离(米),正值为前进,负值为后退 speed: 行走速度(米/秒),范围0.1~1.0,默认为0.5 observe: 是否输出中间状态信息,默认为False mode: 控制模式,默认为11 gait_id: 步态ID,默认为26 step_height: 抬腿高度,默认为[0.06, 0.06] stone_path_mode: 是否为石板路模式,默认为False 返回: bool: 是否成功完成行走 """ section("开始视觉引导直线移动", "视觉直行") # 参数验证 if abs(distance) < 0.01: info("距离太短,无需移动", "信息") return True # 检查相机是否可用 if not hasattr(ctrl, 'image_processor') or not hasattr(ctrl.image_processor, 'get_current_image'): error("机器人控制器没有提供图像处理器,无法进行视觉跟踪", "错误") return False # 限制速度范围 speed = min(max(abs(speed), 0.1), 1.0) # 确定前进或后退方向 forward = distance > 0 move_speed = speed if forward else -speed abs_distance = abs(distance) # 获取起始位置 start_position = list(ctrl.odo_msg.xyz) start_yaw = ctrl.odo_msg.rpy[2] # 记录起始朝向,用于保持直线 if observe: debug(f"起始位置: {start_position}", "位置") info(f"开始{'前进' if forward else '后退'} {abs_distance:.3f}米,速度: {abs(move_speed):.2f}米/秒", "移动") # 在起点放置标记 if hasattr(ctrl, 'place_marker'): ctrl.place_marker(start_position[0], start_position[1], start_position[2] if len(start_position) > 2 else 0.0, 'green', observe=True) # 设置移动命令 msg.mode = mode # 控制模式 msg.gait_id = gait_id # 步态ID # 根据需要移动的距离动态调整移动速度 if abs_distance > 1.0: actual_speed = move_speed # 距离较远时用设定速度 elif abs_distance > 0.5: actual_speed = move_speed * 0.8 # 中等距离略微降速 elif abs_distance > 0.2: actual_speed = move_speed * 0.6 # 较近距离降低速度 else: actual_speed = move_speed * 0.4 # 非常接近时用更慢速度 # 设置移动速度和方向 msg.vel_des = [actual_speed, 0, 0] # [前进速度, 侧向速度, 角速度] msg.duration = 0 # wait next cmd msg.step_height = step_height # 抬腿高度 msg.life_count += 1 # 发送命令 ctrl.Send_cmd(msg) # 估算移动时间,但实际上会通过里程计控制 estimated_time = abs_distance / abs(actual_speed) timeout = estimated_time + 3 # 增加超时时间为预计移动时间加3秒 # 使用里程计进行实时监控移动距离 distance_moved = 0 start_time = time.time() last_position = start_position # 动态调整参数 angle_correction_threshold = 0.05 # 角度偏差超过多少弧度开始修正 vision_check_interval = 0.3 # 视觉检查间隔(秒) last_vision_check = start_time vision_correction_gain = 0.005 # 视觉修正增益系数,根据像素偏差换算 slow_down_ratio = 0.85 # 当移动到目标距离的85%时开始减速 completion_threshold = 0.95 # 当移动到目标距离的95%时停止 position_check_interval = 0.1 # 位置检查间隔(秒) last_check_time = start_time # 计算移动方向单位向量(世界坐标系下) direction_vector = [math.cos(start_yaw), math.sin(start_yaw)] if not forward: direction_vector = [-direction_vector[0], -direction_vector[1]] # 视觉跟踪相关变量 last_valid_center_info = None last_valid_deviation = 0 deviation_queue = [] # 用于滤波 filter_size = 5 detection_success_count = 0 detection_total_count = 0 # 监控移动距离和视觉轨迹 while distance_moved < abs_distance * completion_threshold and time.time() - start_time < timeout: current_time = time.time() # 按固定间隔检查位置,减少计算负担 if current_time - last_check_time >= position_check_interval: # 获取当前位置和朝向 current_position = ctrl.odo_msg.xyz current_yaw = ctrl.odo_msg.rpy[2] # 计算在移动方向上的位移 dx = current_position[0] - start_position[0] dy = current_position[1] - start_position[1] # 计算在初始方向上的投影距离(实际前进距离) distance_moved = dx * direction_vector[0] + dy * direction_vector[1] distance_moved = abs(distance_moved) # 确保距离为正值 # 计算垂直于移动方向的偏移量(y方向偏移) y_offset = -dx * direction_vector[1] + dy * direction_vector[0] # 根据前进或后退确定期望方向 expected_direction = start_yaw if forward else (start_yaw + math.pi) % (2 * math.pi) # 使用IMU朝向数据计算角度偏差 yaw_error = current_yaw - expected_direction # 角度归一化 while yaw_error > math.pi: yaw_error -= 2 * math.pi while yaw_error < -math.pi: yaw_error += 2 * math.pi # 计算完成比例 completion_ratio = distance_moved / abs_distance # 根据距离完成情况调整速度 if completion_ratio > slow_down_ratio: # 计算减速系数 slow_factor = 1.0 - (completion_ratio - slow_down_ratio) / (1.0 - slow_down_ratio) # 确保不会减速太多 slow_factor = max(0.2, slow_factor) new_speed = actual_speed * slow_factor if observe and abs(new_speed - msg.vel_des[0]) > 0.05: info(f"减速: {msg.vel_des[0]:.2f} -> {new_speed:.2f} 米/秒 (完成: {completion_ratio*100:.1f}%)", "移动") msg.vel_des[0] = new_speed # 根据角度偏差进行角速度修正 angular_correction = 0 if abs(yaw_error) > angle_correction_threshold: # 计算角速度修正值,偏差越大修正越强 angular_correction = -yaw_error * 0.5 # 比例系数可调整 # 限制最大角速度修正 angular_correction = max(min(angular_correction, 0.2), -0.2) if observe and abs(angular_correction) > 0.05: warning(f"方向修正: 偏差 {math.degrees(yaw_error):.1f}度,应用角速度 {angular_correction:.3f}rad/s", "角度") # 更新最后检查时间和位置 last_check_time = current_time last_position = current_position if observe and current_time - start_time > 1 and (current_time % 1.0 < position_check_interval): debug(f"已移动: {distance_moved:.3f}米, 目标: {abs_distance:.3f}米 (完成: {completion_ratio*100:.1f}%)", "距离") debug(f"Y偏移: {y_offset:.3f}米, 朝向偏差: {math.degrees(yaw_error):.1f}度", "偏移") # 定期进行视觉检查和修正 if current_time - last_vision_check >= vision_check_interval: try: # 获取当前相机帧 frame = ctrl.image_processor.get_current_image() if frame is not None: # 检测轨道线 detection_total_count += 1 if stone_path_mode: # 使用特定的石板路模式 center_info, left_info, right_info = detect_dual_track_lines( frame, observe=False, save_log=False, stone_path_mode=True) else: # 使用自动检测模式 center_info, left_info, right_info = auto_detect_dual_track_lines( frame, observe=False, save_log=False) # 如果成功检测到轨道线,使用它进行修正 if center_info is not None: detection_success_count += 1 # 保存有效的中心信息 last_valid_center_info = center_info # 获取当前偏差 current_deviation = center_info["deviation"] last_valid_deviation = current_deviation # 添加到队列进行滤波 deviation_queue.append(current_deviation) if len(deviation_queue) > filter_size: deviation_queue.pop(0) # 计算滤波后的偏差值 (去除最大和最小值后的平均) if len(deviation_queue) >= 3: filtered_deviations = sorted(deviation_queue)[1:-1] if len(deviation_queue) > 2 else deviation_queue filtered_deviation = sum(filtered_deviations) / len(filtered_deviations) else: filtered_deviation = current_deviation # 计算侧向修正速度 lateral_correction = -filtered_deviation * vision_correction_gain # 限制最大侧向速度修正 lateral_correction = max(min(lateral_correction, 0.2), -0.2) if observe and abs(lateral_correction) > 0.05: warning(f"视觉修正: 偏差 {filtered_deviation:.1f}像素,应用侧向速度 {lateral_correction:.3f}m/s", "视觉") # 应用视觉修正,保留当前前进速度和角速度修正 msg.vel_des = [msg.vel_des[0], lateral_correction, msg.vel_des[2]] msg.life_count += 1 ctrl.Send_cmd(msg) if observe and current_time % 0.5 < 0.05: debug(f"视觉检测: 原始偏差 {current_deviation:.1f}像素, 滤波后 {filtered_deviation:.1f}像素", "视觉") debug(f"轨道宽度: {center_info['track_width']:.1f}像素, 石板路模式: {center_info.get('stone_path_mode', False)}", "视觉") debug(f"修正后速度: [{msg.vel_des[0]:.2f}, {msg.vel_des[1]:.2f}, {msg.vel_des[2]:.2f}]", "移动") else: # 如果有之前的有效检测结果,使用上一次的控制值来平滑过渡 if last_valid_center_info is not None and len(deviation_queue) > 0: # 使用上一次的有效偏差,但降低其影响(乘以衰减因子) decay_factor = 0.8 decayed_deviation = last_valid_deviation * decay_factor # 计算侧向修正速度 lateral_correction = -decayed_deviation * vision_correction_gain * 0.7 # 限制最大侧向速度修正 lateral_correction = max(min(lateral_correction, 0.15), -0.15) # 应用衰减后的视觉修正 msg.vel_des = [msg.vel_des[0], lateral_correction, msg.vel_des[2]] msg.life_count += 1 ctrl.Send_cmd(msg) if observe: warning(f"使用上一帧数据,偏差: {decayed_deviation:.1f}像素, 侧向速度: {lateral_correction:.3f}m/s", "恢复") elif observe: warning("未检测到轨道线", "视觉") except Exception as e: error(f"视觉处理异常: {str(e)}", "错误") last_vision_check = current_time time.sleep(0.01) # 小间隔检查位置 # 平滑停止 if observe: info("开始平滑停止", "停止") # 先降低速度再停止,实现平滑停止 slowdown_steps = 5 for i in range(slowdown_steps, 0, -1): slowdown_factor = i / slowdown_steps msg.vel_des = [actual_speed * slowdown_factor, 0, 0] msg.life_count += 1 ctrl.Send_cmd(msg) time.sleep(0.1) # 最后完全停止 if hasattr(ctrl.base_msg, 'stop_smooth'): ctrl.base_msg.stop_smooth() else: ctrl.base_msg.stop() # 获取最终位置和实际移动距离 final_position = ctrl.odo_msg.xyz dx = final_position[0] - start_position[0] dy = final_position[1] - start_position[1] # 计算在初始方向上的投影距离(实际前进距离) actual_distance = dx * direction_vector[0] + dy * direction_vector[1] actual_distance = abs(actual_distance) # 确保距离为正值 # 计算垂直于移动方向的最终偏移量(y方向偏移) final_y_offset = -dx * direction_vector[1] + dy * direction_vector[0] # 计算总移动距离(直线距离) total_distance = math.sqrt(dx*dx + dy*dy) if observe: success(f"移动完成,从里程计计算的移动距离: {actual_distance:.3f}米", "完成") info(f"最终Y偏移: {final_y_offset:.3f}米", "偏移") # 在终点放置标记 if hasattr(ctrl, 'place_marker'): ctrl.place_marker(final_position[0], final_position[1], final_position[2] if len(final_position) > 2 else 0.0, 'red', observe=True) # 显示检测成功率 if detection_total_count > 0: detection_rate = (detection_success_count / detection_total_count) * 100 info(f"轨迹检测成功率: {detection_rate:.1f}% ({detection_success_count}/{detection_total_count})", "统计") # 判断是否成功完成 distance_error = abs(actual_distance - abs_distance) y_error = abs(final_y_offset) go_success = distance_error < 0.1 and y_error < 0.1 # 如果距离误差和y偏移都小于10厘米,则认为成功 if observe: info(f"目标距离: {abs_distance:.3f}米, 实际距离: {actual_distance:.3f}米, 误差: {distance_error:.3f}米", "距离") info(f"Y偏移: {final_y_offset:.3f}米", "偏移") if go_success: success(f"移动成功", "成功") else: warning(f"移动{'部分成功' if distance_error < 0.1 or y_error < 0.1 else '失败'}", "结果") if distance_error >= 0.1: warning(f"距离误差过大: {distance_error:.3f}米", "距离") if y_error >= 0.1: warning(f"Y偏移过大: {y_error:.3f}米", "偏移") return go_success