From b23526ff9d07c52e6f39067a3ddd76cf538cc1bf Mon Sep 17 00:00:00 2001 From: havocrao Date: Wed, 20 Aug 2025 01:33:22 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=9B=B4=E6=96=B0=E4=BB=BB=E5=8A=A13?= =?UTF-8?q?=E5=92=8C=E4=BB=BB=E5=8A=A14=EF=BC=8C=E6=95=B4=E5=90=88?= =?UTF-8?q?=E4=B8=8A=E4=B8=8B=E5=9D=A1=E5=92=8C=E9=80=9A=E8=BF=87=E6=A0=85?= =?UTF-8?q?=E6=A0=8F=E7=9A=84=E5=8A=9F=E8=83=BD=EF=BC=8C=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E8=A7=86=E8=A7=89=E6=A3=80=E6=B5=8B=E5=92=8C=E7=A7=BB=E5=8A=A8?= =?UTF-8?q?=E6=8E=A7=E5=88=B6=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- task_3/task_3.py | 794 ++++++++++++++++++++++++++------------------- task_4/pass_bar.py | 3 +- task_4/task_4.py | 427 ++++-------------------- 3 files changed, 541 insertions(+), 683 deletions(-) diff --git a/task_3/task_3.py b/task_3/task_3.py index 5a22de3..b60f844 100755 --- a/task_3/task_3.py +++ b/task_3/task_3.py @@ -27,310 +27,63 @@ logger = get_logger("任务3") observe = True -robot_cmd = { - 'mode':0, 'gait_id':0, 'contact':0, 'life_count':0, - 'vel_des':[0.0, 0.0, 0.0], - 'rpy_des':[0.0, 0.0, 0.0], - 'pos_des':[0.0, 0.0, 0.0], - 'acc_des':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - 'ctrl_point':[0.0, 0.0, 0.0], - 'foot_pose':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - 'step_height':[0.0, 0.0], - 'value':0, 'duration':0 -} +YELLOW_RATIO_THRESHOLD = 0.15 # TODO 黄色区域比例阈值 - -def pass_up_down(ctrl, msg): - usergait_msg = file_send_lcmt() - # lcm_usergait = lcm.LCM("udpm://239.255.76.67:7671?ttl=255") - - try: - steps = toml.load("./task_3/Gait_Params_up.toml") - full_steps = {'step':[robot_cmd]} - k =0 - for i in steps['step']: - cmd = copy.deepcopy(robot_cmd) - cmd['duration'] = i['duration'] - if i['type'] == 'usergait': - cmd['mode'] = 11 # LOCOMOTION - cmd['gait_id'] = 110 # USERGAIT - cmd['vel_des'] = i['body_vel_des'] - cmd['rpy_des'] = i['body_pos_des'][0:3] - cmd['pos_des'] = i['body_pos_des'][3:6] - cmd['foot_pose'][0:2] = i['landing_pos_des'][0:2] - cmd['foot_pose'][2:4] = i['landing_pos_des'][3:5] - cmd['foot_pose'][4:6] = i['landing_pos_des'][6:8] - cmd['ctrl_point'][0:2] = i['landing_pos_des'][9:11] - cmd['step_height'][0] = math.ceil(i['step_height'][0] * 1e3) + math.ceil(i['step_height'][1] * 1e3) * 1e3 - cmd['step_height'][1] = math.ceil(i['step_height'][2] * 1e3) + math.ceil(i['step_height'][3] * 1e3) * 1e3 - cmd['acc_des'] = i['weight'] - cmd['value'] = i['use_mpc_traj'] - cmd['contact'] = math.floor(i['landing_gain'] * 1e1) - cmd['ctrl_point'][2] = i['mu'] - if k == 0: - full_steps['step'] = [cmd] - else: - full_steps['step'].append(cmd) - k=k+1 - f = open("./task_3/Gait_Params_up_full.toml", 'w') - f.write("# Gait Params\n") - f.writelines(toml.dumps(full_steps)) - f.close() - - # pre - file_obj_gait_def = open("./task_3/Gait_Def_up.toml",'r') - file_obj_gait_params = open("./task_3/Gait_Params_up_full.toml",'r') - usergait_msg.data = file_obj_gait_def.read() - ctrl.lc_s.publish("user_gait_file", usergait_msg.encode()) - time.sleep(0.5) - usergait_msg.data = file_obj_gait_params.read() - ctrl.lc_s.publish("user_gait_file", usergait_msg.encode()) - time.sleep(0.1) - file_obj_gait_def.close() - file_obj_gait_params.close() - - msg.mode = 62 - msg.value = 0 - msg.contact = 15 - msg.gait_id = 110 - msg.duration = 1000 - msg.life_count += 1 - - # 参数设置 - stable_count = 0 # 用于计数z轴稳定的次数 - stable_threshold = 8 # 连续15次检测z轴不再增加则认为已经停止 - z_speed_threshold = 0.01 # z轴速度阈值,小于这个值认为已经停止爬升 - climb_speed_threshold = 0.05 # 检测到开始爬坡的速度阈值 - max_iterations = 230 # 最大循环次数,作为安全保障 - min_iterations = 170 # 最小循环次数,作为安全保障 - - # 姿态判断参数 - pitch_threshold = 0.05 # 俯仰角阈值(弧度) - angular_rate_threshold = 0.03 # 角速度阈值(弧度/秒) - - # 阶段控制 - climbing_detected = False # 是否检测到正在爬坡 - - # 高度变化记录 - height_window = [] - pitch_window = [] - window_size = 8 - - # 记录起始姿态和高度 - start_height = ctrl.odo_msg.xyz[2] - info(f"开始监测爬坡过程,初始高度: {start_height:.3f}", "监测") - - for i in range(max_iterations): - # 发送控制命令维持心跳 - ctrl.Send_cmd(msg) - - # 获取当前状态数据 - vz = ctrl.odo_msg.vxyz[2] # Z轴速度 - current_height = ctrl.odo_msg.xyz[2] # 当前高度 - current_pitch = ctrl.odo_msg.rpy[1] # 当前俯仰角 - pitch_rate = ctrl.odo_msg.omegaBody[1] # 俯仰角速度 - vbody_z = ctrl.odo_msg.vBody[2] # 机体坐标系Z速度 - - # 更新滑动窗口数据 - height_window.append(current_height) - pitch_window.append(current_pitch) - if len(height_window) > window_size: - height_window.pop(0) - pitch_window.pop(0) - - # 每10次迭代打印一次当前信息 - if i % 10 == 0: - info(f"Step:{i} 当前Z轴速度={vz:.3f}, 当前高度={current_height:.3f}, 俯仰角={current_pitch:.3f}, 角速度={pitch_rate:.3f}", "监测") - - # 检测是否开始爬坡阶段 - if not climbing_detected and vz > climb_speed_threshold: - climbing_detected = True - info(f"检测到开始爬坡,Z轴速度: {vz:.3f}, 当前高度: {current_height:.3f}, 俯仰角: {current_pitch:.3f}", "监测") - - # 多条件判断是否完成爬坡 - if i > min_iterations and climbing_detected and len(height_window) == window_size: - # 计算高度和俯仰角的稳定性 - height_std = np.std(height_window) # 高度标准差 - pitch_std = np.std(pitch_window) # 俯仰角标准差 - - # 多条件综合判断 - position_stable = abs(vz) < z_speed_threshold # 垂直速度稳定 - attitude_stable = abs(current_pitch) < pitch_threshold # 俯仰角接近水平 - angular_rate_stable = abs(pitch_rate) < angular_rate_threshold # 角速度稳定 - height_stable = height_std < 0.01 # 高度变化小 - pitch_stable = pitch_std < 0.01 # 俯仰角变化小 - vbody_stable = abs(vbody_z) < 0.01 # 机体Z方向速度稳定 - - # 综合判断条件 - if (position_stable and attitude_stable and angular_rate_stable) or \ - (position_stable and height_stable and pitch_stable) or \ - (vbody_stable and attitude_stable and height_stable): - stable_count += 1 - if stable_count >= stable_threshold: - info(f"检测到已完成爬坡:\n - Z轴速度: {vz:.3f}\n - 俯仰角: {current_pitch:.3f}\n - 角速度: {pitch_rate:.3f}\n - 当前高度: {current_height:.3f}\n - 上升了: {current_height - start_height:.3f}米", "监测") - break - else: - # 重置稳定计数 - stable_count = 0 - - time.sleep(0.2) - except KeyboardInterrupt: - msg.mode = 7 #PureDamper before KeyboardInterrupt: - msg.gait_id = 0 - msg.duration = 0 - msg.life_count += 1 - ctrl.Send_cmd(msg) - pass - - section('任务3-2:x = 2', "开始") +def run_task_3(ctrl, msg, time_sleep=5000): + section('任务3:上下坡', "启动") + info('开始执行任务3...', "启动") turn_degree_v2(ctrl, msg, 90, absolute=True) - center_on_dual_tracks(ctrl, msg, max_deviation=10.0, observe=False, detect_height=0.3) - go_straight(ctrl, msg, distance=0.5, speed=0.5, observe=True) - time.sleep(1) - section('任务3-3:down', "完成") - try: - steps = toml.load("./task_3/Gait_Params_down.toml") - full_steps = {'step':[robot_cmd]} - k = 0 - for i in steps['step']: - cmd = copy.deepcopy(robot_cmd) - cmd['duration'] = i['duration'] - if i['type'] == 'usergait': - cmd['mode'] = 11 # LOCOMOTION - cmd['gait_id'] = 110 # USERGAIT - cmd['vel_des'] = i['body_vel_des'] - cmd['rpy_des'] = i['body_pos_des'][0:3] - cmd['pos_des'] = i['body_pos_des'][3:6] - cmd['foot_pose'][0:2] = i['landing_pos_des'][0:2] - cmd['foot_pose'][2:4] = i['landing_pos_des'][3:5] - cmd['foot_pose'][4:6] = i['landing_pos_des'][6:8] - cmd['ctrl_point'][0:2] = i['landing_pos_des'][9:11] - cmd['step_height'][0] = math.ceil(i['step_height'][0] * 1e3) + math.ceil(i['step_height'][1] * 1e3) * 1e3 - cmd['step_height'][1] = math.ceil(i['step_height'][2] * 1e3) + math.ceil(i['step_height'][3] * 1e3) * 1e3 - cmd['acc_des'] = i['weight'] - cmd['value'] = i['use_mpc_traj'] - cmd['contact'] = math.floor(i['landing_gain'] * 1e1) - cmd['ctrl_point'][2] = i['mu'] - if k == 0: - full_steps['step'] = [cmd] - else: - full_steps['step'].append(cmd) - k=k+1 - f = open("./task_3/Gait_Params_down_full.toml", 'w') - f.write("# Gait Params\n") - f.writelines(toml.dumps(full_steps)) - f.close() + section('任务3-1:up and down', "开始") + go_straight_with_enhanced_calibration(ctrl, msg, distance = 5, speed=0.5, observe=False, mode=11, gait_id=3, step_height=[0.21, 0.21]) + # pass_up_down(ctrl, msg) - # pre - file_obj_gait_def = open("./task_3/Gait_Def_up.toml",'r') - file_obj_gait_params = open("./task_3/Gait_Params_down_full.toml",'r') - usergait_msg.data = file_obj_gait_def.read() - ctrl.lc_s.publish("user_gait_file", usergait_msg.encode()) - time.sleep(0.5) - usergait_msg.data = file_obj_gait_params.read() - ctrl.lc_s.publish("user_gait_file", usergait_msg.encode()) - time.sleep(0.1) - file_obj_gait_def.close() - file_obj_gait_params.close() + turn_degree_v2(ctrl, msg, 90, absolute=True) + center_on_dual_tracks(ctrl, msg, max_deviation=10.0, observe=False) - msg.mode = 62 - msg.value = 0 - msg.contact = 15 - msg.gait_id = 110 - msg.duration = 1000 - msg.life_count += 1 - - # 参数设置 - stable_count = 0 # 用于计数z轴稳定的次数 - stable_threshold = 10 # 连续10次检测z轴速度接近零则认为已经到达平地 - z_speed_threshold = 0.005 # z轴速度阈值,小于这个值认为已经停止下降 - descent_speed_threshold = -0.05 # 检测到开始下坡的速度阈值(负值表示下降) - max_iterations = 110 # 最大循环次数,作为安全保障 - min_iterations = 80 # 最小循环次数,确保有足够的时间开始动作 - start_height = ctrl.odo_msg.xyz[2] # 记录起始高度 - - # 姿态判断参数 - pitch_threshold = 0.05 # 俯仰角阈值(弧度) - angular_rate_threshold = 0.03 # 角速度阈值(弧度/秒) - - # 阶段控制 - descending_detected = False # 是否检测到正在下坡 - flat_ground_detected = False # 是否检测到已到达平地 - - # 高度变化记录 - height_window = [] - pitch_window = [] - window_size = 8 - - info(f"开始监测下坡过程,初始高度: {start_height}", "监测") - - for i in range(max_iterations): - # 发送控制命令维持心跳 - ctrl.Send_cmd(msg) - - # 获取当前状态数据 - vz = ctrl.odo_msg.vxyz[2] # Z轴速度 - current_height = ctrl.odo_msg.xyz[2] # 当前高度 - current_pitch = ctrl.odo_msg.rpy[1] # 当前俯仰角 - pitch_rate = ctrl.odo_msg.omegaBody[1] # 俯仰角速度 - vbody_z = ctrl.odo_msg.vBody[2] # 机体坐标系Z速度 - - # 更新滑动窗口数据 - height_window.append(current_height) - pitch_window.append(current_pitch) - if len(height_window) > window_size: - height_window.pop(0) - pitch_window.pop(0) - - # 每10次迭代打印一次当前信息 - if observe and i % 10 == 0: - info(f"Step:{i} 当前Z轴速度={vz:.3f}, 当前高度={current_height:.3f}, 俯仰角={current_pitch:.3f}, 角速度={pitch_rate:.3f}", "监测") - - # 检测是否开始下坡阶段 - if not descending_detected and vz < descent_speed_threshold: - descending_detected = True - info(f"检测到开始下坡,Z轴速度: {vz:.3f}, 当前高度: {current_height:.3f}, 俯仰角: {current_pitch:.3f}", "监测") - - # 多条件判断是否到达平地 - if i > min_iterations and descending_detected and len(height_window) == window_size: - # 计算高度和俯仰角的稳定性 - height_std = np.std(height_window) # 高度标准差 - pitch_std = np.std(pitch_window) # 俯仰角标准差 - - # 多条件综合判断 - position_stable = abs(vz) < z_speed_threshold # 垂直速度稳定 - attitude_stable = abs(current_pitch) < pitch_threshold # 俯仰角接近水平 - angular_rate_stable = abs(pitch_rate) < angular_rate_threshold # 角速度稳定 - height_stable = height_std < 0.01 # 高度变化小 - pitch_stable = pitch_std < 0.01 # 俯仰角变化小 - vbody_stable = abs(vbody_z) < 0.01 # 机体Z方向速度稳定 - - # 综合判断条件 - if (position_stable and attitude_stable and angular_rate_stable) or \ - (position_stable and height_stable and pitch_stable) or \ - (vbody_stable and attitude_stable and height_stable): - stable_count += 1 - if stable_count >= stable_threshold: - info(f"检测到已到达平地:\n - Z轴速度: {vz:.3f}\n - 俯仰角: {current_pitch:.3f}\n - 角速度: {pitch_rate:.3f}\n - 高度: {current_height:.3f}\n - 下降了: {start_height - current_height:.3f}米", "监测") - flat_ground_detected = True - break - else: - # 重置稳定计数 - stable_count = 0 - - time.sleep(0.2) - - if not flat_ground_detected: - info(f"达到最大循环次数,未能明确检测到到达平地。当前高度: {ctrl.odo_msg.xyz[2]:.3f}", "警告") - except KeyboardInterrupt: - msg.mode = 7 #PureDamper before KeyboardInterrupt: - msg.gait_id = 0 - msg.duration = 0 - msg.life_count += 1 - ctrl.Send_cmd(msg) - pass + section('任务3-2:yellow stop', "开始") + go_until_yellow_area(ctrl, msg, yellow_ratio_threshold=YELLOW_RATIO_THRESHOLD, speed=0.3) + + # 原地站立3秒 + section("原地站立3秒", "站立") + msg.mode = 12 + msg.gait_id = 0 + msg.duration = 0 + msg.step_height = [0.06, 0.06] + msg.vel_des = [0, 0, 0] + msg.life_count += 1 + ctrl.Send_cmd(msg) + + info("开始原地站立3秒", "站立") + time.sleep(time_sleep / 1000) + info("完成原地站立", "站立") + +def run_task_3_back(ctrl, msg, time_sleep=5000): + section('任务3-1:up', "开始") + section('任务3-2:yellow stop', "开始") + go_until_yellow_area(ctrl, msg, yellow_ratio_threshold=YELLOW_RATIO_THRESHOLD, speed=0.3) + + # 原地站立3秒 + section("原地站立3秒", "站立") + msg.mode = 12 + msg.gait_id = 0 + msg.duration = 0 + msg.step_height = [0.06, 0.06] + msg.vel_des = [0, 0, 0] + msg.life_count += 1 + ctrl.Send_cmd(msg) + + info("开始原地站立3秒", "站立") + time.sleep(time_sleep / 1000) + info("完成原地站立", "站立") + + section('任务3-3:up and down', "开始") + go_straight_with_enhanced_calibration(ctrl, msg, distance = 5, speed=0.5, observe=False, mode=11, gait_id=3, step_height=[0.21, 0.21]) + + # TODO 调优达到一个合适的位置,准备 task 2-5 的返回 + # 或许用到 move-to-line 函数 + def go_until_yellow_area(ctrl, msg, yellow_ratio_threshold=0.15, speed=0.3, max_time=30, observe=True): """ @@ -387,7 +140,7 @@ def go_until_yellow_area(ctrl, msg, yellow_ratio_threshold=0.15, speed=0.3, max_ # 定期发送移动命令保持移动状态 if current_time - last_check_time >= check_interval: # 获取当前图像并保存到临时文件 - current_image = ctrl.image_processor.get_current_image() + current_image = ctrl.image_processor.get_current_image('ai') # INFO 默认采用 ai 相机 # 创建临时文件保存图像 with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as temp_file: @@ -490,35 +243,426 @@ def go_until_yellow_area(ctrl, msg, yellow_ratio_threshold=0.15, speed=0.3, max_ return False -def run_task_3(ctrl, msg, time_sleep=5000): - section('任务3:上下坡', "启动") - info('开始执行任务3...', "启动") - - turn_degree_v2(ctrl, msg, 90, absolute=True) - - section('任务3-1:up', "开始") - pass_up_down(ctrl, msg) - - turn_degree_v2(ctrl, msg, 90, absolute=True) - center_on_dual_tracks(ctrl, msg, max_deviation=10.0, observe=False) - - section('任务3-2:yellow stop', "开始") - go_until_yellow_area(ctrl, msg, yellow_ratio_threshold=0.15, speed=0.3) +def go_straight_with_enhanced_calibration(ctrl, msg, distance, speed=0.5, observe=False, + mode=11, gait_id=3, step_height=[0.21, 0.21]): + """ + 控制机器人在石板路上沿直线行走,使用视觉校准和姿态传感器融合来保持直线 - # 原地站立3秒 - section("原地站立3秒", "站立") - msg.mode = 12 - msg.gait_id = 0 - msg.duration = 0 - msg.step_height = [0.06, 0.06] - msg.vel_des = [0, 0, 0] + 参数: + ctrl: Robot_Ctrl 对象 + msg: 控制消息对象 + distance: 要行走的距离(米) + speed: 行走速度(米/秒) + observe: 是否输出调试信息 + mode: 运动模式 + gait_id: 步态ID + step_height: 摆动腿高度 + + 返回: + bool: 是否成功完成 + """ + section("开始石板路增强直线移动", "石板路移动") + + # 参数验证 + if abs(distance) < 0.01: + info("距离太短,无需移动", "信息") + return True + + # 检查相机是否可用 + if not hasattr(ctrl, 'image_processor') or not hasattr(ctrl.image_processor, 'get_current_image'): + warning("机器人控制器没有提供图像处理器,将使用姿态传感器辅助校准", "警告") + + # 限制速度范围 + speed = min(max(abs(speed), 0.1), 1.0) + + # 确定前进或后退方向 + forward = distance > 0 + move_speed = speed if forward else -speed + abs_distance = abs(distance) + + # 获取起始位置和姿态 + start_position = list(ctrl.odo_msg.xyz) + start_yaw = ctrl.odo_msg.rpy[2] # 记录起始朝向 + + if observe: + debug(f"起始位置: {start_position}", "位置") + info(f"开始石板路增强直线移动,距离: {abs_distance:.3f}米,速度: {abs(move_speed):.2f}米/秒", "移动") + + # 设置移动命令 + msg.mode = mode + msg.gait_id = gait_id + msg.step_height = step_height + msg.duration = 0 # wait next cmd + + # 根据需要移动的距离动态调整移动速度 + if abs_distance > 1.0: + actual_speed = move_speed # 距离较远时用设定速度 + else: + actual_speed = move_speed * 0.8 # 较近距离略微降速 + + # 设置移动速度和方向 + msg.vel_des = [actual_speed, 0, 0] # [前进速度, 侧向速度, 角速度] msg.life_count += 1 + + # 发送命令 ctrl.Send_cmd(msg) - info("开始原地站立3秒", "站立") - time.sleep(time_sleep / 1000) - info("完成原地站立", "站立") + # 估算移动时间,但实际上会通过里程计控制 + estimated_time = abs_distance / abs(actual_speed) + timeout = estimated_time + 5 # 增加超时时间 + # 使用里程计进行实时监控移动距离 + distance_moved = 0 + start_time = time.time() + last_check_time = start_time + position_check_interval = 0.1 # 位置检查间隔(秒) + + # 计算移动方向单位向量(世界坐标系下) + direction_vector = [math.cos(start_yaw), math.sin(start_yaw)] + if not forward: + direction_vector = [-direction_vector[0], -direction_vector[1]] + + # 视觉跟踪相关变量 + vision_check_interval = 0.2 # 视觉检查间隔(秒) + last_vision_check = start_time + vision_correction_gain = 0.006 # 视觉修正增益系数 + + # 用于滤波的队列 + deviation_queue = [] + filter_size = 5 + last_valid_deviation = 0 + + # PID控制参数 - 用于角度修正 + kp_angle = 0.6 # 比例系数 + ki_angle = 0.02 # 积分系数 + kd_angle = 0.1 # 微分系数 + + # PID控制变量 + angle_error_integral = 0 + last_angle_error = 0 + + # 偏移量累计 - 用于检测持续偏移 + y_offset_accumulator = 0 + + # 动态调整参数 + slow_down_ratio = 0.85 # 当移动到目标距离的85%时开始减速 + completion_threshold = 0.95 # 当移动到目标距离的95%时停止 + + # 监控移动过程 + while distance_moved < abs_distance * completion_threshold and time.time() - start_time < timeout: + current_time = time.time() + + # 按固定间隔检查位置 + if current_time - last_check_time >= position_check_interval: + # 获取当前位置和朝向 + current_position = ctrl.odo_msg.xyz + current_yaw = ctrl.odo_msg.rpy[2] + + # 计算在移动方向上的位移 + dx = current_position[0] - start_position[0] + dy = current_position[1] - start_position[1] + + # 计算在初始方向上的投影距离(实际前进距离) + distance_moved = dx * direction_vector[0] + dy * direction_vector[1] + distance_moved = abs(distance_moved) # 确保距离为正值 + + # 计算垂直于移动方向的偏移量(y方向偏移) + y_offset = -dx * direction_vector[1] + dy * direction_vector[0] + + # 累积y方向偏移量,检测持续偏移趋势 + y_offset_accumulator = y_offset_accumulator * 0.8 + y_offset * 0.2 + + # 根据前进或后退确定期望方向 + expected_direction = start_yaw if forward else (start_yaw + math.pi) % (2 * math.pi) + + # 使用IMU朝向数据计算角度偏差 + yaw_error = current_yaw - expected_direction + # 角度归一化 + while yaw_error > math.pi: + yaw_error -= 2 * math.pi + while yaw_error < -math.pi: + yaw_error += 2 * math.pi + + # 使用PID控制计算角速度修正 + # 比例项 + p_control = kp_angle * yaw_error + + # 积分项 (带衰减) + angle_error_integral = angle_error_integral * 0.9 + yaw_error + angle_error_integral = max(-1.0, min(1.0, angle_error_integral)) # 限制积分范围 + i_control = ki_angle * angle_error_integral + + # 微分项 + d_control = kd_angle * (yaw_error - last_angle_error) + last_angle_error = yaw_error + + # 计算总的角速度控制量 + angular_correction = -(p_control + i_control + d_control) + # 限制最大角速度修正 + angular_correction = max(min(angular_correction, 0.3), -0.3) + + # 根据持续的y偏移趋势增加侧向校正 + lateral_correction = 0 + if abs(y_offset_accumulator) > 0.05: # 如果累积偏移超过5厘米 + lateral_correction = -y_offset_accumulator * 0.8 # 反向校正 + lateral_correction = max(min(lateral_correction, 0.15), -0.15) # 限制最大侧向速度 + + if observe and abs(lateral_correction) > 0.05: + warning(f"累积Y偏移校正: {y_offset_accumulator:.3f}米,应用侧向速度 {lateral_correction:.3f}m/s", "偏移") + + # 计算完成比例 + completion_ratio = distance_moved / abs_distance + + # 根据距离完成情况调整速度 + if completion_ratio > slow_down_ratio: + # 计算减速系数 + slow_factor = 1.0 - (completion_ratio - slow_down_ratio) / (1.0 - slow_down_ratio) + # 确保不会减速太多 + slow_factor = max(0.3, slow_factor) + new_speed = actual_speed * slow_factor + + if observe and abs(new_speed - msg.vel_des[0]) > 0.05: + info(f"减速: {msg.vel_des[0]:.2f} -> {new_speed:.2f} 米/秒 (完成: {completion_ratio*100:.1f}%)", "移动") + + actual_speed = new_speed + + # 应用修正 - 同时应用角速度和侧向速度修正 + msg.vel_des = [actual_speed, lateral_correction, angular_correction] + msg.life_count += 1 + ctrl.Send_cmd(msg) + + if observe and current_time % 1.0 < position_check_interval: + debug(f"已移动: {distance_moved:.3f}米, 目标: {abs_distance:.3f}米 (完成: {completion_ratio*100:.1f}%)", "距离") + debug(f"Y偏移: {y_offset:.3f}米, 累积偏移: {y_offset_accumulator:.3f}米", "偏移") + debug(f"朝向偏差: {math.degrees(yaw_error):.1f}度, 角速度修正: {angular_correction:.3f}rad/s", "角度") + debug(f"PID: P={p_control:.3f}, I={i_control:.3f}, D={d_control:.3f}", "控制") + + # 更新检查时间 + last_check_time = current_time + + # 定期进行视觉检查和修正 # TODO 这个函数如果用于 上下坡的话,感觉或许就不需要了,如果能保证 task 2-5 的效果。 + if hasattr(ctrl, 'image_processor') and current_time - last_vision_check >= vision_check_interval: + try: + # 获取当前相机帧 + frame = ctrl.image_processor.get_current_image() + if frame is not None: + # 检测轨道线 - 使用特定的石板路模式 + center_info, _, _ = detect_dual_track_lines( + frame, observe=False, save_log=False) + + # 如果成功检测到轨道线,使用它进行修正 + if center_info is not None: + # 获取当前偏差 + current_deviation = center_info["deviation"] + last_valid_deviation = current_deviation + + # 添加到队列进行滤波 + deviation_queue.append(current_deviation) + if len(deviation_queue) > filter_size: + deviation_queue.pop(0) + + # 计算滤波后的偏差值 (去除最大和最小值后的平均) + if len(deviation_queue) >= 3: + filtered_deviations = sorted(deviation_queue)[1:-1] if len(deviation_queue) > 2 else deviation_queue + filtered_deviation = sum(filtered_deviations) / len(filtered_deviations) + else: + filtered_deviation = current_deviation + + # 计算视觉侧向修正速度 + vision_lateral_correction = -filtered_deviation * vision_correction_gain + # 限制最大侧向速度修正 + vision_lateral_correction = max(min(vision_lateral_correction, 0.2), -0.2) + + # 与当前的侧向校正进行融合 (加权平均) + if msg.vel_des[1] != 0: + # 如果已经有侧向校正,与视觉校正进行融合 + fused_lateral = msg.vel_des[1] * 0.3 + vision_lateral_correction * 0.7 + else: + # 如果没有侧向校正,直接使用视觉校正 + fused_lateral = vision_lateral_correction + + if observe and abs(vision_lateral_correction) > 0.05: + warning(f"视觉修正: 偏差 {filtered_deviation:.1f}像素,应用侧向速度 {vision_lateral_correction:.3f}m/s", "视觉") + + # 应用视觉修正,保留当前前进速度和角速度 + msg.vel_des = [msg.vel_des[0], fused_lateral, msg.vel_des[2]] + msg.life_count += 1 + ctrl.Send_cmd(msg) + + if observe and current_time % 1.0 < vision_check_interval: + debug(f"视觉检测: 原始偏差 {current_deviation:.1f}像素, 滤波后 {filtered_deviation:.1f}像素", "视觉") + debug(f"融合侧向速度: {fused_lateral:.3f}m/s", "视觉") + except Exception as e: + if observe: + error(f"视觉检测异常: {e}", "错误") + + # 更新视觉检查时间 + last_vision_check = current_time + + # 短暂延时 + time.sleep(0.01) + + # 平滑停止 + slowdown_steps = 5 + for i in range(slowdown_steps, 0, -1): + slowdown_factor = i / slowdown_steps + msg.vel_des = [actual_speed * slowdown_factor, 0, 0] + msg.life_count += 1 + ctrl.Send_cmd(msg) + time.sleep(0.1) + + # 最后完全停止 + if hasattr(ctrl.base_msg, 'stop_smooth'): + ctrl.base_msg.stop_smooth() + else: + ctrl.base_msg.stop() + + # 获取最终位置和实际移动距离 + final_position = ctrl.odo_msg.xyz + dx = final_position[0] - start_position[0] + dy = final_position[1] - start_position[1] + actual_distance = math.sqrt(dx*dx + dy*dy) + + # 计算最终y方向偏移 + final_y_offset = -dx * direction_vector[1] + dy * direction_vector[0] + + if observe: + success(f"石板路增强直线移动完成,实际移动距离: {actual_distance:.3f}米", "完成") + info(f"最终Y方向偏移: {final_y_offset:.3f}米", "偏移") + + # 判断是否成功完成 + distance_error = abs(actual_distance - abs_distance) + y_offset_error = abs(final_y_offset) + + go_success = distance_error < 0.1 and y_offset_error < 0.1 # 距离误差和y偏移都小于10厘米视为成功 + + if observe: + if go_success: + success(f"移动成功", "成功") + else: + warning(f"移动不够精确,距离误差: {distance_error:.3f}米, Y偏移: {y_offset_error:.3f}米", "警告") + + return go_success -def run_task_3_back(ctrl, msg): - return \ No newline at end of file + +# INFO 保持文件相对路径,直接从 task-4中调用 +robot_cmd = { + 'mode':0, 'gait_id':0, 'contact':0, 'life_count':0, + 'vel_des':[0.0, 0.0, 0.0], + 'rpy_des':[0.0, 0.0, 0.0], + 'pos_des':[0.0, 0.0, 0.0], + 'acc_des':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + 'ctrl_point':[0.0, 0.0, 0.0], + 'foot_pose':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + 'step_height':[0.0, 0.0], + 'value':0, 'duration':0 +} + +def pass_stone(ctrl, msg, distance=5.0, observe=True): + """ + 通过里程计设定distance,执行上坡gait。 + :param ctrl: 控制器对象 + :param msg: 控制消息 + :param distance: 期望移动距离(米) + :param observe: 是否打印过程信息 + """ + usergait_msg = file_send_lcmt() + try: + # 1. 生成gait参数文件 + steps = toml.load("./task_3/Gait_Params_up.toml") + full_steps = {'step':[robot_cmd]} + k = 0 + for i in steps['step']: + cmd = copy.deepcopy(robot_cmd) + cmd['duration'] = i['duration'] + if i['type'] == 'usergait': + cmd['mode'] = 11 # LOCOMOTION + cmd['gait_id'] = 110 # USERGAIT + cmd['vel_des'] = i['body_vel_des'] + cmd['rpy_des'] = i['body_pos_des'][0:3] + cmd['pos_des'] = i['body_pos_des'][3:6] + cmd['foot_pose'][0:2] = i['landing_pos_des'][0:2] + cmd['foot_pose'][2:4] = i['landing_pos_des'][3:5] + cmd['foot_pose'][4:6] = i['landing_pos_des'][6:8] + cmd['ctrl_point'][0:2] = i['landing_pos_des'][9:11] + cmd['step_height'][0] = math.ceil(i['step_height'][0] * 1e3) + math.ceil(i['step_height'][1] * 1e3) * 1e3 + cmd['step_height'][1] = math.ceil(i['step_height'][2] * 1e3) + math.ceil(i['step_height'][3] * 1e3) * 1e3 + cmd['acc_des'] = i['weight'] + cmd['value'] = i['use_mpc_traj'] + cmd['contact'] = math.floor(i['landing_gain'] * 1e1) + cmd['ctrl_point'][2] = i['mu'] + if k == 0: + full_steps['step'] = [cmd] + else: + full_steps['step'].append(cmd) + k += 1 + with open("./task_3/Gait_Params_up_full.toml", 'w') as f: + f.write("# Gait Params\n") + f.writelines(toml.dumps(full_steps)) + + # 2. 发送gait定义和参数 + with open("./task_3/Gait_Def_up.toml",'r') as file_obj_gait_def, \ + open("./task_3/Gait_Params_up_full.toml",'r') as file_obj_gait_params: + usergait_msg.data = file_obj_gait_def.read() + ctrl.lc_s.publish("user_gait_file", usergait_msg.encode()) + time.sleep(0.5) + usergait_msg.data = file_obj_gait_params.read() + ctrl.lc_s.publish("user_gait_file", usergait_msg.encode()) + time.sleep(0.1) + + # 3. 记录起始位置 + start_xyz = ctrl.odo_msg.xyz[:2] # 只取x, y + if observe: + info(f"pass_up_down: 期望移动距离 {distance:.3f} 米,起始位置: {start_xyz}", "监测") + + # 4. 设置gait执行参数 + msg.mode = 62 + msg.value = 0 + msg.contact = 15 + msg.gait_id = 110 + msg.duration = 1000 + msg.life_count += 1 + + # 5. 控制循环,按里程计判断是否到达(去除max-iteration,直接while判断) + arrived = False + step = 0 + while True: + ctrl.Send_cmd(msg) + cur_xyz = ctrl.odo_msg.xyz[:2] + dx = cur_xyz[0] - start_xyz[0] + dy = cur_xyz[1] - start_xyz[1] + moved = (dx**2 + dy**2) ** 0.5 + if observe and step % 10 == 0: + info(f"Step:{step} 已移动距离={moved:.3f}m, 目标={distance:.3f}m, 当前xy={cur_xyz}", "监测") + if moved >= distance: + arrived = True + if observe: + success(f"pass_up_down: 已到达设定距离 {distance:.3f}m (实际: {moved:.3f}m)", "完成") + break + step += 1 + time.sleep(0.1) + + # 6. 完全停止 + if hasattr(ctrl.base_msg, 'stop_smooth'): + ctrl.base_msg.stop_smooth() + else: + ctrl.base_msg.stop() + ctrl.Send_cmd(msg) + time.sleep(0.1) + + # 7. 反馈最终位置 + final_xyz = ctrl.odo_msg.xyz[:2] + dx = final_xyz[0] - start_xyz[0] + dy = final_xyz[1] - start_xyz[1] + actual_distance = (dx**2 + dy**2) ** 0.5 + if observe: + info(f"pass_up_down: 最终位置 {final_xyz}, 实际移动距离 {actual_distance:.3f}m", "监测") + return arrived + + except KeyboardInterrupt: + msg.mode = 7 #PureDamper before KeyboardInterrupt: + msg.gait_id = 0 + msg.duration = 0 + msg.life_count += 1 + ctrl.Send_cmd(msg) + pass \ No newline at end of file diff --git a/task_4/pass_bar.py b/task_4/pass_bar.py index a3ded74..8ae4ca5 100755 --- a/task_4/pass_bar.py +++ b/task_4/pass_bar.py @@ -35,7 +35,8 @@ robot_cmd = { def pass_bar(ctrl, msg): """ - 俯身通过一个栅栏 + 俯身通过一个栅栏; + 会默认前进一定的距离。 """ step_num = 8 diff --git a/task_4/task_4.py b/task_4/task_4.py index 7b1299b..4870646 100755 --- a/task_4/task_4.py +++ b/task_4/task_4.py @@ -17,25 +17,22 @@ from utils.detect_dual_track_lines import detect_dual_track_lines from base_move.move_base_hori_line import calculate_distance_to_line from task_4.pass_bar import pass_bar from base_move.center_on_dual_tracks import center_on_dual_tracks +from task_3.task_3 import pass_stone # 创建本模块特定的日志记录器 logger = get_logger("任务4") +observe = True + +STONE_DISTANCE = 5.0 # TODO 距离参数需要微调 +RED_RATIO_THRESHOLD = 0.35 # TODO 红色区域比例阈值需要微调 + def run_task_4(ctrl, msg): section('任务4-1:直线移动', "移动") - # 设置机器人运动模式为快步跑 - msg.mode = 11 # 运动模式 - msg.gait_id = 3 # 步态ID(快步跑) - msg.vel_des = [0.35, 0, 0] # 期望速度 - msg.pos_des = [ 0, 0, 0] - msg.duration = 0 # 零时长表示持续运动,直到接收到新命令 - msg.step_height = [0.21, 0.21] # 持续运动时摆动腿的离地高度 - msg.life_count += 1 - ctrl.Send_cmd(msg) - time.sleep(5) # 持续5秒钟 + pass_stone(ctrl, msg, distance=STONE_DISTANCE) - section('任务4-2:移动直到灰色天空比例小于阈值', "天空检测") - go_straight_until_sky_ratio_below(ctrl, msg, sky_ratio_threshold=0.2) + section('任务4-2:移动直到红色区域比例大于阈值', "红色检测") + go_straight_until_red_bar(ctrl, msg, red_ratio_threshold=RED_RATIO_THRESHOLD, speed=0.2) section('任务4-3:通过栏杆', "移动") pass_bar(ctrl, msg) @@ -53,14 +50,14 @@ def run_task_4_back(ctrl, msg): go_straight(ctrl, msg, distance=2, speed=1, observe=True) center_on_dual_tracks(ctrl, msg, max_deviation=10.0, observe=False) - # 向右移动0.5秒 - section('任务4-回程:向右移动', "移动") - go_lateral(ctrl, msg, distance=-0.1, speed=0.15, observe=True) # DEBUG + # TODO 向右移动0.5秒 (或许不需要了) + # section('任务4-回程:向右移动', "移动") + # go_lateral(ctrl, msg, distance=-0.1, speed=0.15, observe=True) # DEBUG turn_degree_v2(ctrl, msg, degree=-90, absolute=True) section('任务4-1:移动直到灰色天空比例低于阈值', "天空检测") - go_straight_until_sky_ratio_below(ctrl, msg, sky_ratio_threshold=0.35, speed=0.2) + go_straight_until_red_bar(ctrl, msg, red_ratio_threshold=RED_RATIO_THRESHOLD, speed=0.2) section('任务4-2:通过栏杆', "移动") turn_degree_v2(ctrl, msg, degree=-90, absolute=True) @@ -69,11 +66,11 @@ def run_task_4_back(ctrl, msg): turn_degree_v2(ctrl, msg, degree=-90, absolute=True) section('任务4-3:stone', "移动") go_straight(ctrl, msg, distance=1, speed=2) - - turn_degree_v2(ctrl, msg, degree=-92, absolute=True) + turn_degree_v2(ctrl, msg, degree=-90, absolute=True) # Use enhanced calibration for better Y-axis correction on stone path - go_straight(ctrl, msg, distance=4.5, speed=0.35, mode=11, gait_id=3, step_height=[0.21, 0.21], observe=True) + # go_straight(ctrl, msg, distance=4.5, speed=0.35, mode=11, gait_id=3, step_height=[0.21, 0.21], observe=True) + pass_stone(ctrl, msg, distance=STONE_DISTANCE) # go_straight_with_enhanced_calibration(ctrl, msg, distance=4.5, speed=0.35, # mode=11, gait_id=3, step_height=[0.21, 0.21], observe=True) @@ -85,387 +82,103 @@ def run_task_4_back(ctrl, msg): current_distance = calculate_distance_to_line(edge_info, camera_height, observe=True) go_straight(ctrl, msg, distance=current_distance, speed=0.20, mode=11, gait_id=3, step_height=[0.21, 0.21]) -def go_straight_until_sky_ratio_below(ctrl, msg, - sky_ratio_threshold=0.2, - step_distance=0.5, - max_distance=5, - speed=0.3 - ): +def go_straight_until_red_bar(ctrl, msg, + red_ratio_threshold=0.2, + step_distance=0.5, + max_distance=5, + speed=0.3 + ): """ - 控制机器人沿直线行走,直到灰色天空比例低于指定阈值 - + 控制机器人沿直线行走,直到红色区域比例高于指定阈值 + 参数: ctrl: Robot_Ctrl对象 msg: 控制命令消息对象 - sky_ratio_threshold: 灰色天空比例阈值,当检测到的比例低于此值时停止 + red_ratio_threshold: 红色区域比例阈值,当检测到的比例高于此值时停止 step_distance: 每次移动的步长(米) max_distance: 最大移动距离(米),防止无限前进 speed: 移动速度(米/秒) - + 返回: - bool: 是否成功找到天空比例低于阈值的位置 + bool: 是否成功找到红色区域比例高于阈值的位置 """ + def analyze_red_area_ratio(image): + """ + 分析图像中红色区域的占比 + 输入: BGR图像 + 返回: 红色区域占比 (0~1) + """ + # 转换到HSV空间 + hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) + # 红色有两个区间 + lower_red1 = np.array([0, 70, 50]) + upper_red1 = np.array([10, 255, 255]) + lower_red2 = np.array([160, 70, 50]) + upper_red2 = np.array([180, 255, 255]) + mask1 = cv2.inRange(hsv, lower_red1, upper_red1) + mask2 = cv2.inRange(hsv, lower_red2, upper_red2) + mask = cv2.bitwise_or(mask1, mask2) + red_pixels = np.count_nonzero(mask) + total_pixels = mask.shape[0] * mask.shape[1] + ratio = red_pixels / total_pixels if total_pixels > 0 else 0 + return ratio + total_distance = 0 success_flag = False - + # 设置移动命令 msg.mode = 11 # Locomotion模式 msg.gait_id = 26 # 自变频步态 msg.step_height = [0.06, 0.06] # 抬腿高度 - + while total_distance < max_distance: # 获取当前图像 - current_image = ctrl.image_processor.get_current_image() + current_image = ctrl.image_processor.get_current_image('ai') if current_image is None: warning("无法获取图像,等待...", "图像") time.sleep(0.5) continue - - # 保存当前图像用于分析 - temp_image_path = "/tmp/current_sky_image.jpg" - cv2.imwrite(temp_image_path, current_image) - - # 分析灰色天空比例 + + # 分析红色区域比例 try: - sky_ratio = analyze_gray_sky_ratio(temp_image_path) - info(f"当前灰色天空比例: {sky_ratio:.2%}", "分析") - - # 如果天空比例高于阈值,停止移动 - if sky_ratio < sky_ratio_threshold: - success(f"检测到灰色天空比例({sky_ratio:.2%})低于阈值({sky_ratio_threshold:.2%}),停止移动", "完成") + red_ratio = analyze_red_area_ratio(current_image) + info(f"当前红色区域比例: {red_ratio:.2%}", "分析") + + # 如果红色区域比例高于阈值,停止移动 + if red_ratio > red_ratio_threshold: + success(f"检测到红色区域比例({red_ratio:.2%})高于阈值({red_ratio_threshold:.2%}),停止移动", "完成") success_flag = True break - + except Exception as e: error(f"分析图像时出错: {e}", "错误") - + # 继续前进一段距离 info(f"继续前进 {step_distance} 米...", "移动") - + # 设置移动速度和方向 msg.vel_des = [speed, 0, 0] # [前进速度, 侧向速度, 角速度] msg.duration = 0 # wait next cmd msg.life_count += 1 - + # 发送命令 ctrl.Send_cmd(msg) - + # 估算前进时间 move_time = step_distance / speed time.sleep(move_time) - + # 累计移动距离 total_distance += step_distance info(f"已移动总距离: {total_distance:.2f} 米", "距离") - - # 平滑停止 - if hasattr(ctrl.base_msg, 'stop_smooth'): - ctrl.base_msg.stop_smooth() - else: - ctrl.base_msg.stop() - - if not success_flag and total_distance >= max_distance: - warning(f"已达到最大移动距离 {max_distance} 米,但未找到天空比例小于 {sky_ratio_threshold:.2%} 的位置", "超时") - - return success_flag -def go_straight_with_enhanced_calibration(ctrl, msg, distance, speed=0.5, observe=False, - mode=11, gait_id=3, step_height=[0.21, 0.21]): - """ - 控制机器人在石板路上沿直线行走,使用视觉校准和姿态传感器融合来保持直线 - - 参数: - ctrl: Robot_Ctrl 对象 - msg: 控制消息对象 - distance: 要行走的距离(米) - speed: 行走速度(米/秒) - observe: 是否输出调试信息 - mode: 运动模式 - gait_id: 步态ID - step_height: 摆动腿高度 - - 返回: - bool: 是否成功完成 - """ - section("开始石板路增强直线移动", "石板路移动") - - # 参数验证 - if abs(distance) < 0.01: - info("距离太短,无需移动", "信息") - return True - - # 检查相机是否可用 - if not hasattr(ctrl, 'image_processor') or not hasattr(ctrl.image_processor, 'get_current_image'): - warning("机器人控制器没有提供图像处理器,将使用姿态传感器辅助校准", "警告") - - # 限制速度范围 - speed = min(max(abs(speed), 0.1), 1.0) - - # 确定前进或后退方向 - forward = distance > 0 - move_speed = speed if forward else -speed - abs_distance = abs(distance) - - # 获取起始位置和姿态 - start_position = list(ctrl.odo_msg.xyz) - start_yaw = ctrl.odo_msg.rpy[2] # 记录起始朝向 - - if observe: - debug(f"起始位置: {start_position}", "位置") - info(f"开始石板路增强直线移动,距离: {abs_distance:.3f}米,速度: {abs(move_speed):.2f}米/秒", "移动") - - # 设置移动命令 - msg.mode = mode - msg.gait_id = gait_id - msg.step_height = step_height - msg.duration = 0 # wait next cmd - - # 根据需要移动的距离动态调整移动速度 - if abs_distance > 1.0: - actual_speed = move_speed # 距离较远时用设定速度 - else: - actual_speed = move_speed * 0.8 # 较近距离略微降速 - - # 设置移动速度和方向 - msg.vel_des = [actual_speed, 0, 0] # [前进速度, 侧向速度, 角速度] - msg.life_count += 1 - - # 发送命令 - ctrl.Send_cmd(msg) - - # 估算移动时间,但实际上会通过里程计控制 - estimated_time = abs_distance / abs(actual_speed) - timeout = estimated_time + 5 # 增加超时时间 - - # 使用里程计进行实时监控移动距离 - distance_moved = 0 - start_time = time.time() - last_check_time = start_time - position_check_interval = 0.1 # 位置检查间隔(秒) - - # 计算移动方向单位向量(世界坐标系下) - direction_vector = [math.cos(start_yaw), math.sin(start_yaw)] - if not forward: - direction_vector = [-direction_vector[0], -direction_vector[1]] - - # 视觉跟踪相关变量 - vision_check_interval = 0.2 # 视觉检查间隔(秒) - last_vision_check = start_time - vision_correction_gain = 0.006 # 视觉修正增益系数 - - # 用于滤波的队列 - deviation_queue = [] - filter_size = 5 - last_valid_deviation = 0 - - # PID控制参数 - 用于角度修正 - kp_angle = 0.6 # 比例系数 - ki_angle = 0.02 # 积分系数 - kd_angle = 0.1 # 微分系数 - - # PID控制变量 - angle_error_integral = 0 - last_angle_error = 0 - - # 偏移量累计 - 用于检测持续偏移 - y_offset_accumulator = 0 - - # 动态调整参数 - slow_down_ratio = 0.85 # 当移动到目标距离的85%时开始减速 - completion_threshold = 0.95 # 当移动到目标距离的95%时停止 - - # 监控移动过程 - while distance_moved < abs_distance * completion_threshold and time.time() - start_time < timeout: - current_time = time.time() - - # 按固定间隔检查位置 - if current_time - last_check_time >= position_check_interval: - # 获取当前位置和朝向 - current_position = ctrl.odo_msg.xyz - current_yaw = ctrl.odo_msg.rpy[2] - - # 计算在移动方向上的位移 - dx = current_position[0] - start_position[0] - dy = current_position[1] - start_position[1] - - # 计算在初始方向上的投影距离(实际前进距离) - distance_moved = dx * direction_vector[0] + dy * direction_vector[1] - distance_moved = abs(distance_moved) # 确保距离为正值 - - # 计算垂直于移动方向的偏移量(y方向偏移) - y_offset = -dx * direction_vector[1] + dy * direction_vector[0] - - # 累积y方向偏移量,检测持续偏移趋势 - y_offset_accumulator = y_offset_accumulator * 0.8 + y_offset * 0.2 - - # 根据前进或后退确定期望方向 - expected_direction = start_yaw if forward else (start_yaw + math.pi) % (2 * math.pi) - - # 使用IMU朝向数据计算角度偏差 - yaw_error = current_yaw - expected_direction - # 角度归一化 - while yaw_error > math.pi: - yaw_error -= 2 * math.pi - while yaw_error < -math.pi: - yaw_error += 2 * math.pi - - # 使用PID控制计算角速度修正 - # 比例项 - p_control = kp_angle * yaw_error - - # 积分项 (带衰减) - angle_error_integral = angle_error_integral * 0.9 + yaw_error - angle_error_integral = max(-1.0, min(1.0, angle_error_integral)) # 限制积分范围 - i_control = ki_angle * angle_error_integral - - # 微分项 - d_control = kd_angle * (yaw_error - last_angle_error) - last_angle_error = yaw_error - - # 计算总的角速度控制量 - angular_correction = -(p_control + i_control + d_control) - # 限制最大角速度修正 - angular_correction = max(min(angular_correction, 0.3), -0.3) - - # 根据持续的y偏移趋势增加侧向校正 - lateral_correction = 0 - if abs(y_offset_accumulator) > 0.05: # 如果累积偏移超过5厘米 - lateral_correction = -y_offset_accumulator * 0.8 # 反向校正 - lateral_correction = max(min(lateral_correction, 0.15), -0.15) # 限制最大侧向速度 - - if observe and abs(lateral_correction) > 0.05: - warning(f"累积Y偏移校正: {y_offset_accumulator:.3f}米,应用侧向速度 {lateral_correction:.3f}m/s", "偏移") - - # 计算完成比例 - completion_ratio = distance_moved / abs_distance - - # 根据距离完成情况调整速度 - if completion_ratio > slow_down_ratio: - # 计算减速系数 - slow_factor = 1.0 - (completion_ratio - slow_down_ratio) / (1.0 - slow_down_ratio) - # 确保不会减速太多 - slow_factor = max(0.3, slow_factor) - new_speed = actual_speed * slow_factor - - if observe and abs(new_speed - msg.vel_des[0]) > 0.05: - info(f"减速: {msg.vel_des[0]:.2f} -> {new_speed:.2f} 米/秒 (完成: {completion_ratio*100:.1f}%)", "移动") - - actual_speed = new_speed - - # 应用修正 - 同时应用角速度和侧向速度修正 - msg.vel_des = [actual_speed, lateral_correction, angular_correction] - msg.life_count += 1 - ctrl.Send_cmd(msg) - - if observe and current_time % 1.0 < position_check_interval: - debug(f"已移动: {distance_moved:.3f}米, 目标: {abs_distance:.3f}米 (完成: {completion_ratio*100:.1f}%)", "距离") - debug(f"Y偏移: {y_offset:.3f}米, 累积偏移: {y_offset_accumulator:.3f}米", "偏移") - debug(f"朝向偏差: {math.degrees(yaw_error):.1f}度, 角速度修正: {angular_correction:.3f}rad/s", "角度") - debug(f"PID: P={p_control:.3f}, I={i_control:.3f}, D={d_control:.3f}", "控制") - - # 更新检查时间 - last_check_time = current_time - - # 定期进行视觉检查和修正 # TODO 这个函数如果用于 上下坡的话,感觉或许就不需要了,如果能保证 task 2-5 的效果。 - if hasattr(ctrl, 'image_processor') and current_time - last_vision_check >= vision_check_interval: - try: - # 获取当前相机帧 - frame = ctrl.image_processor.get_current_image() - if frame is not None: - # 检测轨道线 - 使用特定的石板路模式 - center_info, _, _ = detect_dual_track_lines( - frame, observe=False, save_log=False) - - # 如果成功检测到轨道线,使用它进行修正 - if center_info is not None: - # 获取当前偏差 - current_deviation = center_info["deviation"] - last_valid_deviation = current_deviation - - # 添加到队列进行滤波 - deviation_queue.append(current_deviation) - if len(deviation_queue) > filter_size: - deviation_queue.pop(0) - - # 计算滤波后的偏差值 (去除最大和最小值后的平均) - if len(deviation_queue) >= 3: - filtered_deviations = sorted(deviation_queue)[1:-1] if len(deviation_queue) > 2 else deviation_queue - filtered_deviation = sum(filtered_deviations) / len(filtered_deviations) - else: - filtered_deviation = current_deviation - - # 计算视觉侧向修正速度 - vision_lateral_correction = -filtered_deviation * vision_correction_gain - # 限制最大侧向速度修正 - vision_lateral_correction = max(min(vision_lateral_correction, 0.2), -0.2) - - # 与当前的侧向校正进行融合 (加权平均) - if msg.vel_des[1] != 0: - # 如果已经有侧向校正,与视觉校正进行融合 - fused_lateral = msg.vel_des[1] * 0.3 + vision_lateral_correction * 0.7 - else: - # 如果没有侧向校正,直接使用视觉校正 - fused_lateral = vision_lateral_correction - - if observe and abs(vision_lateral_correction) > 0.05: - warning(f"视觉修正: 偏差 {filtered_deviation:.1f}像素,应用侧向速度 {vision_lateral_correction:.3f}m/s", "视觉") - - # 应用视觉修正,保留当前前进速度和角速度 - msg.vel_des = [msg.vel_des[0], fused_lateral, msg.vel_des[2]] - msg.life_count += 1 - ctrl.Send_cmd(msg) - - if observe and current_time % 1.0 < vision_check_interval: - debug(f"视觉检测: 原始偏差 {current_deviation:.1f}像素, 滤波后 {filtered_deviation:.1f}像素", "视觉") - debug(f"融合侧向速度: {fused_lateral:.3f}m/s", "视觉") - except Exception as e: - if observe: - error(f"视觉检测异常: {e}", "错误") - - # 更新视觉检查时间 - last_vision_check = current_time - - # 短暂延时 - time.sleep(0.01) - # 平滑停止 - slowdown_steps = 5 - for i in range(slowdown_steps, 0, -1): - slowdown_factor = i / slowdown_steps - msg.vel_des = [actual_speed * slowdown_factor, 0, 0] - msg.life_count += 1 - ctrl.Send_cmd(msg) - time.sleep(0.1) - - # 最后完全停止 if hasattr(ctrl.base_msg, 'stop_smooth'): ctrl.base_msg.stop_smooth() else: ctrl.base_msg.stop() - - # 获取最终位置和实际移动距离 - final_position = ctrl.odo_msg.xyz - dx = final_position[0] - start_position[0] - dy = final_position[1] - start_position[1] - actual_distance = math.sqrt(dx*dx + dy*dy) - - # 计算最终y方向偏移 - final_y_offset = -dx * direction_vector[1] + dy * direction_vector[0] - - if observe: - success(f"石板路增强直线移动完成,实际移动距离: {actual_distance:.3f}米", "完成") - info(f"最终Y方向偏移: {final_y_offset:.3f}米", "偏移") - - # 判断是否成功完成 - distance_error = abs(actual_distance - abs_distance) - y_offset_error = abs(final_y_offset) - - go_success = distance_error < 0.1 and y_offset_error < 0.1 # 距离误差和y偏移都小于10厘米视为成功 - - if observe: - if go_success: - success(f"移动成功", "成功") - else: - warning(f"移动不够精确,距离误差: {distance_error:.3f}米, Y偏移: {y_offset_error:.3f}米", "警告") - - return go_success + + if not success_flag and total_distance >= max_distance: + warning(f"已达到最大移动距离 {max_distance} 米,但未找到红色区域比例高于 {red_ratio_threshold:.2%} 的位置", "超时") + + return success_flag