feat: 更新任务3和任务4,整合上下坡和通过栅栏的功能,优化视觉检测和移动控制逻辑

This commit is contained in:
havocrao 2025-08-20 01:33:22 +08:00
parent 1ce8266c62
commit b23526ff9d
3 changed files with 541 additions and 683 deletions

View File

@ -27,309 +27,62 @@ logger = get_logger("任务3")
observe = True
robot_cmd = {
'mode':0, 'gait_id':0, 'contact':0, 'life_count':0,
'vel_des':[0.0, 0.0, 0.0],
'rpy_des':[0.0, 0.0, 0.0],
'pos_des':[0.0, 0.0, 0.0],
'acc_des':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
'ctrl_point':[0.0, 0.0, 0.0],
'foot_pose':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
'step_height':[0.0, 0.0],
'value':0, 'duration':0
}
YELLOW_RATIO_THRESHOLD = 0.15 # TODO 黄色区域比例阈值
def pass_up_down(ctrl, msg):
usergait_msg = file_send_lcmt()
# lcm_usergait = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
try:
steps = toml.load("./task_3/Gait_Params_up.toml")
full_steps = {'step':[robot_cmd]}
k =0
for i in steps['step']:
cmd = copy.deepcopy(robot_cmd)
cmd['duration'] = i['duration']
if i['type'] == 'usergait':
cmd['mode'] = 11 # LOCOMOTION
cmd['gait_id'] = 110 # USERGAIT
cmd['vel_des'] = i['body_vel_des']
cmd['rpy_des'] = i['body_pos_des'][0:3]
cmd['pos_des'] = i['body_pos_des'][3:6]
cmd['foot_pose'][0:2] = i['landing_pos_des'][0:2]
cmd['foot_pose'][2:4] = i['landing_pos_des'][3:5]
cmd['foot_pose'][4:6] = i['landing_pos_des'][6:8]
cmd['ctrl_point'][0:2] = i['landing_pos_des'][9:11]
cmd['step_height'][0] = math.ceil(i['step_height'][0] * 1e3) + math.ceil(i['step_height'][1] * 1e3) * 1e3
cmd['step_height'][1] = math.ceil(i['step_height'][2] * 1e3) + math.ceil(i['step_height'][3] * 1e3) * 1e3
cmd['acc_des'] = i['weight']
cmd['value'] = i['use_mpc_traj']
cmd['contact'] = math.floor(i['landing_gain'] * 1e1)
cmd['ctrl_point'][2] = i['mu']
if k == 0:
full_steps['step'] = [cmd]
else:
full_steps['step'].append(cmd)
k=k+1
f = open("./task_3/Gait_Params_up_full.toml", 'w')
f.write("# Gait Params\n")
f.writelines(toml.dumps(full_steps))
f.close()
# pre
file_obj_gait_def = open("./task_3/Gait_Def_up.toml",'r')
file_obj_gait_params = open("./task_3/Gait_Params_up_full.toml",'r')
usergait_msg.data = file_obj_gait_def.read()
ctrl.lc_s.publish("user_gait_file", usergait_msg.encode())
time.sleep(0.5)
usergait_msg.data = file_obj_gait_params.read()
ctrl.lc_s.publish("user_gait_file", usergait_msg.encode())
time.sleep(0.1)
file_obj_gait_def.close()
file_obj_gait_params.close()
msg.mode = 62
msg.value = 0
msg.contact = 15
msg.gait_id = 110
msg.duration = 1000
msg.life_count += 1
# 参数设置
stable_count = 0 # 用于计数z轴稳定的次数
stable_threshold = 8 # 连续15次检测z轴不再增加则认为已经停止
z_speed_threshold = 0.01 # z轴速度阈值小于这个值认为已经停止爬升
climb_speed_threshold = 0.05 # 检测到开始爬坡的速度阈值
max_iterations = 230 # 最大循环次数,作为安全保障
min_iterations = 170 # 最小循环次数,作为安全保障
# 姿态判断参数
pitch_threshold = 0.05 # 俯仰角阈值(弧度)
angular_rate_threshold = 0.03 # 角速度阈值(弧度/秒)
# 阶段控制
climbing_detected = False # 是否检测到正在爬坡
# 高度变化记录
height_window = []
pitch_window = []
window_size = 8
# 记录起始姿态和高度
start_height = ctrl.odo_msg.xyz[2]
info(f"开始监测爬坡过程,初始高度: {start_height:.3f}", "监测")
for i in range(max_iterations):
# 发送控制命令维持心跳
ctrl.Send_cmd(msg)
# 获取当前状态数据
vz = ctrl.odo_msg.vxyz[2] # Z轴速度
current_height = ctrl.odo_msg.xyz[2] # 当前高度
current_pitch = ctrl.odo_msg.rpy[1] # 当前俯仰角
pitch_rate = ctrl.odo_msg.omegaBody[1] # 俯仰角速度
vbody_z = ctrl.odo_msg.vBody[2] # 机体坐标系Z速度
# 更新滑动窗口数据
height_window.append(current_height)
pitch_window.append(current_pitch)
if len(height_window) > window_size:
height_window.pop(0)
pitch_window.pop(0)
# 每10次迭代打印一次当前信息
if i % 10 == 0:
info(f"Step:{i} 当前Z轴速度={vz:.3f}, 当前高度={current_height:.3f}, 俯仰角={current_pitch:.3f}, 角速度={pitch_rate:.3f}", "监测")
# 检测是否开始爬坡阶段
if not climbing_detected and vz > climb_speed_threshold:
climbing_detected = True
info(f"检测到开始爬坡Z轴速度: {vz:.3f}, 当前高度: {current_height:.3f}, 俯仰角: {current_pitch:.3f}", "监测")
# 多条件判断是否完成爬坡
if i > min_iterations and climbing_detected and len(height_window) == window_size:
# 计算高度和俯仰角的稳定性
height_std = np.std(height_window) # 高度标准差
pitch_std = np.std(pitch_window) # 俯仰角标准差
# 多条件综合判断
position_stable = abs(vz) < z_speed_threshold # 垂直速度稳定
attitude_stable = abs(current_pitch) < pitch_threshold # 俯仰角接近水平
angular_rate_stable = abs(pitch_rate) < angular_rate_threshold # 角速度稳定
height_stable = height_std < 0.01 # 高度变化小
pitch_stable = pitch_std < 0.01 # 俯仰角变化小
vbody_stable = abs(vbody_z) < 0.01 # 机体Z方向速度稳定
# 综合判断条件
if (position_stable and attitude_stable and angular_rate_stable) or \
(position_stable and height_stable and pitch_stable) or \
(vbody_stable and attitude_stable and height_stable):
stable_count += 1
if stable_count >= stable_threshold:
info(f"检测到已完成爬坡:\n - Z轴速度: {vz:.3f}\n - 俯仰角: {current_pitch:.3f}\n - 角速度: {pitch_rate:.3f}\n - 当前高度: {current_height:.3f}\n - 上升了: {current_height - start_height:.3f}", "监测")
break
else:
# 重置稳定计数
stable_count = 0
time.sleep(0.2)
except KeyboardInterrupt:
msg.mode = 7 #PureDamper before KeyboardInterrupt:
msg.gait_id = 0
msg.duration = 0
msg.life_count += 1
ctrl.Send_cmd(msg)
pass
section('任务3-2x = 2', "开始")
def run_task_3(ctrl, msg, time_sleep=5000):
section('任务3上下坡', "启动")
info('开始执行任务3...', "启动")
turn_degree_v2(ctrl, msg, 90, absolute=True)
center_on_dual_tracks(ctrl, msg, max_deviation=10.0, observe=False, detect_height=0.3)
go_straight(ctrl, msg, distance=0.5, speed=0.5, observe=True)
time.sleep(1)
section('任务3-3down', "完成")
try:
steps = toml.load("./task_3/Gait_Params_down.toml")
full_steps = {'step':[robot_cmd]}
k = 0
for i in steps['step']:
cmd = copy.deepcopy(robot_cmd)
cmd['duration'] = i['duration']
if i['type'] == 'usergait':
cmd['mode'] = 11 # LOCOMOTION
cmd['gait_id'] = 110 # USERGAIT
cmd['vel_des'] = i['body_vel_des']
cmd['rpy_des'] = i['body_pos_des'][0:3]
cmd['pos_des'] = i['body_pos_des'][3:6]
cmd['foot_pose'][0:2] = i['landing_pos_des'][0:2]
cmd['foot_pose'][2:4] = i['landing_pos_des'][3:5]
cmd['foot_pose'][4:6] = i['landing_pos_des'][6:8]
cmd['ctrl_point'][0:2] = i['landing_pos_des'][9:11]
cmd['step_height'][0] = math.ceil(i['step_height'][0] * 1e3) + math.ceil(i['step_height'][1] * 1e3) * 1e3
cmd['step_height'][1] = math.ceil(i['step_height'][2] * 1e3) + math.ceil(i['step_height'][3] * 1e3) * 1e3
cmd['acc_des'] = i['weight']
cmd['value'] = i['use_mpc_traj']
cmd['contact'] = math.floor(i['landing_gain'] * 1e1)
cmd['ctrl_point'][2] = i['mu']
if k == 0:
full_steps['step'] = [cmd]
else:
full_steps['step'].append(cmd)
k=k+1
f = open("./task_3/Gait_Params_down_full.toml", 'w')
f.write("# Gait Params\n")
f.writelines(toml.dumps(full_steps))
f.close()
section('任务3-1up and down', "开始")
go_straight_with_enhanced_calibration(ctrl, msg, distance = 5, speed=0.5, observe=False, mode=11, gait_id=3, step_height=[0.21, 0.21])
# pass_up_down(ctrl, msg)
# pre
file_obj_gait_def = open("./task_3/Gait_Def_up.toml",'r')
file_obj_gait_params = open("./task_3/Gait_Params_down_full.toml",'r')
usergait_msg.data = file_obj_gait_def.read()
ctrl.lc_s.publish("user_gait_file", usergait_msg.encode())
time.sleep(0.5)
usergait_msg.data = file_obj_gait_params.read()
ctrl.lc_s.publish("user_gait_file", usergait_msg.encode())
time.sleep(0.1)
file_obj_gait_def.close()
file_obj_gait_params.close()
turn_degree_v2(ctrl, msg, 90, absolute=True)
center_on_dual_tracks(ctrl, msg, max_deviation=10.0, observe=False)
msg.mode = 62
msg.value = 0
msg.contact = 15
msg.gait_id = 110
msg.duration = 1000
msg.life_count += 1
section('任务3-2yellow stop', "开始")
go_until_yellow_area(ctrl, msg, yellow_ratio_threshold=YELLOW_RATIO_THRESHOLD, speed=0.3)
# 参数设置
stable_count = 0 # 用于计数z轴稳定的次数
stable_threshold = 10 # 连续10次检测z轴速度接近零则认为已经到达平地
z_speed_threshold = 0.005 # z轴速度阈值小于这个值认为已经停止下降
descent_speed_threshold = -0.05 # 检测到开始下坡的速度阈值(负值表示下降)
max_iterations = 110 # 最大循环次数,作为安全保障
min_iterations = 80 # 最小循环次数,确保有足够的时间开始动作
start_height = ctrl.odo_msg.xyz[2] # 记录起始高度
# 原地站立3秒
section("原地站立3秒", "站立")
msg.mode = 12
msg.gait_id = 0
msg.duration = 0
msg.step_height = [0.06, 0.06]
msg.vel_des = [0, 0, 0]
msg.life_count += 1
ctrl.Send_cmd(msg)
# 姿态判断参数
pitch_threshold = 0.05 # 俯仰角阈值(弧度)
angular_rate_threshold = 0.03 # 角速度阈值(弧度/秒)
info("开始原地站立3秒", "站立")
time.sleep(time_sleep / 1000)
info("完成原地站立", "站立")
# 阶段控制
descending_detected = False # 是否检测到正在下坡
flat_ground_detected = False # 是否检测到已到达平地
# 高度变化记录
height_window = []
pitch_window = []
window_size = 8
def run_task_3_back(ctrl, msg, time_sleep=5000):
section('任务3-1up', "开始")
section('任务3-2yellow stop', "开始")
go_until_yellow_area(ctrl, msg, yellow_ratio_threshold=YELLOW_RATIO_THRESHOLD, speed=0.3)
info(f"开始监测下坡过程,初始高度: {start_height}", "监测")
# 原地站立3秒
section("原地站立3秒", "站立")
msg.mode = 12
msg.gait_id = 0
msg.duration = 0
msg.step_height = [0.06, 0.06]
msg.vel_des = [0, 0, 0]
msg.life_count += 1
ctrl.Send_cmd(msg)
for i in range(max_iterations):
# 发送控制命令维持心跳
ctrl.Send_cmd(msg)
info("开始原地站立3秒", "站立")
time.sleep(time_sleep / 1000)
info("完成原地站立", "站立")
# 获取当前状态数据
vz = ctrl.odo_msg.vxyz[2] # Z轴速度
current_height = ctrl.odo_msg.xyz[2] # 当前高度
current_pitch = ctrl.odo_msg.rpy[1] # 当前俯仰角
pitch_rate = ctrl.odo_msg.omegaBody[1] # 俯仰角速度
vbody_z = ctrl.odo_msg.vBody[2] # 机体坐标系Z速度
section('任务3-3up and down', "开始")
go_straight_with_enhanced_calibration(ctrl, msg, distance = 5, speed=0.5, observe=False, mode=11, gait_id=3, step_height=[0.21, 0.21])
# 更新滑动窗口数据
height_window.append(current_height)
pitch_window.append(current_pitch)
if len(height_window) > window_size:
height_window.pop(0)
pitch_window.pop(0)
# 每10次迭代打印一次当前信息
if observe and i % 10 == 0:
info(f"Step:{i} 当前Z轴速度={vz:.3f}, 当前高度={current_height:.3f}, 俯仰角={current_pitch:.3f}, 角速度={pitch_rate:.3f}", "监测")
# 检测是否开始下坡阶段
if not descending_detected and vz < descent_speed_threshold:
descending_detected = True
info(f"检测到开始下坡Z轴速度: {vz:.3f}, 当前高度: {current_height:.3f}, 俯仰角: {current_pitch:.3f}", "监测")
# 多条件判断是否到达平地
if i > min_iterations and descending_detected and len(height_window) == window_size:
# 计算高度和俯仰角的稳定性
height_std = np.std(height_window) # 高度标准差
pitch_std = np.std(pitch_window) # 俯仰角标准差
# 多条件综合判断
position_stable = abs(vz) < z_speed_threshold # 垂直速度稳定
attitude_stable = abs(current_pitch) < pitch_threshold # 俯仰角接近水平
angular_rate_stable = abs(pitch_rate) < angular_rate_threshold # 角速度稳定
height_stable = height_std < 0.01 # 高度变化小
pitch_stable = pitch_std < 0.01 # 俯仰角变化小
vbody_stable = abs(vbody_z) < 0.01 # 机体Z方向速度稳定
# 综合判断条件
if (position_stable and attitude_stable and angular_rate_stable) or \
(position_stable and height_stable and pitch_stable) or \
(vbody_stable and attitude_stable and height_stable):
stable_count += 1
if stable_count >= stable_threshold:
info(f"检测到已到达平地:\n - Z轴速度: {vz:.3f}\n - 俯仰角: {current_pitch:.3f}\n - 角速度: {pitch_rate:.3f}\n - 高度: {current_height:.3f}\n - 下降了: {start_height - current_height:.3f}", "监测")
flat_ground_detected = True
break
else:
# 重置稳定计数
stable_count = 0
time.sleep(0.2)
if not flat_ground_detected:
info(f"达到最大循环次数,未能明确检测到到达平地。当前高度: {ctrl.odo_msg.xyz[2]:.3f}", "警告")
except KeyboardInterrupt:
msg.mode = 7 #PureDamper before KeyboardInterrupt:
msg.gait_id = 0
msg.duration = 0
msg.life_count += 1
ctrl.Send_cmd(msg)
pass
# TODO 调优达到一个合适的位置,准备 task 2-5 的返回
# 或许用到 move-to-line 函数
def go_until_yellow_area(ctrl, msg, yellow_ratio_threshold=0.15, speed=0.3, max_time=30, observe=True):
@ -387,7 +140,7 @@ def go_until_yellow_area(ctrl, msg, yellow_ratio_threshold=0.15, speed=0.3, max_
# 定期发送移动命令保持移动状态
if current_time - last_check_time >= check_interval:
# 获取当前图像并保存到临时文件
current_image = ctrl.image_processor.get_current_image()
current_image = ctrl.image_processor.get_current_image('ai') # INFO 默认采用 ai 相机
# 创建临时文件保存图像
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as temp_file:
@ -490,35 +243,426 @@ def go_until_yellow_area(ctrl, msg, yellow_ratio_threshold=0.15, speed=0.3, max_
return False
def run_task_3(ctrl, msg, time_sleep=5000):
section('任务3上下坡', "启动")
info('开始执行任务3...', "启动")
def go_straight_with_enhanced_calibration(ctrl, msg, distance, speed=0.5, observe=False,
mode=11, gait_id=3, step_height=[0.21, 0.21]):
"""
控制机器人在石板路上沿直线行走使用视觉校准和姿态传感器融合来保持直线
turn_degree_v2(ctrl, msg, 90, absolute=True)
参数:
ctrl: Robot_Ctrl 对象
msg: 控制消息对象
distance: 要行走的距离()
speed: 行走速度(/)
observe: 是否输出调试信息
mode: 运动模式
gait_id: 步态ID
step_height: 摆动腿高度
section('任务3-1up', "开始")
pass_up_down(ctrl, msg)
返回:
bool: 是否成功完成
"""
section("开始石板路增强直线移动", "石板路移动")
turn_degree_v2(ctrl, msg, 90, absolute=True)
center_on_dual_tracks(ctrl, msg, max_deviation=10.0, observe=False)
# 参数验证
if abs(distance) < 0.01:
info("距离太短,无需移动", "信息")
return True
section('任务3-2yellow stop', "开始")
go_until_yellow_area(ctrl, msg, yellow_ratio_threshold=0.15, speed=0.3)
# 检查相机是否可用
if not hasattr(ctrl, 'image_processor') or not hasattr(ctrl.image_processor, 'get_current_image'):
warning("机器人控制器没有提供图像处理器,将使用姿态传感器辅助校准", "警告")
# 原地站立3秒
section("原地站立3秒", "站立")
msg.mode = 12
msg.gait_id = 0
msg.duration = 0
msg.step_height = [0.06, 0.06]
msg.vel_des = [0, 0, 0]
# 限制速度范围
speed = min(max(abs(speed), 0.1), 1.0)
# 确定前进或后退方向
forward = distance > 0
move_speed = speed if forward else -speed
abs_distance = abs(distance)
# 获取起始位置和姿态
start_position = list(ctrl.odo_msg.xyz)
start_yaw = ctrl.odo_msg.rpy[2] # 记录起始朝向
if observe:
debug(f"起始位置: {start_position}", "位置")
info(f"开始石板路增强直线移动,距离: {abs_distance:.3f}米,速度: {abs(move_speed):.2f}米/秒", "移动")
# 设置移动命令
msg.mode = mode
msg.gait_id = gait_id
msg.step_height = step_height
msg.duration = 0 # wait next cmd
# 根据需要移动的距离动态调整移动速度
if abs_distance > 1.0:
actual_speed = move_speed # 距离较远时用设定速度
else:
actual_speed = move_speed * 0.8 # 较近距离略微降速
# 设置移动速度和方向
msg.vel_des = [actual_speed, 0, 0] # [前进速度, 侧向速度, 角速度]
msg.life_count += 1
# 发送命令
ctrl.Send_cmd(msg)
info("开始原地站立3秒", "站立")
time.sleep(time_sleep / 1000)
info("完成原地站立", "站立")
# 估算移动时间,但实际上会通过里程计控制
estimated_time = abs_distance / abs(actual_speed)
timeout = estimated_time + 5 # 增加超时时间
# 使用里程计进行实时监控移动距离
distance_moved = 0
start_time = time.time()
last_check_time = start_time
position_check_interval = 0.1 # 位置检查间隔(秒)
# 计算移动方向单位向量(世界坐标系下)
direction_vector = [math.cos(start_yaw), math.sin(start_yaw)]
if not forward:
direction_vector = [-direction_vector[0], -direction_vector[1]]
# 视觉跟踪相关变量
vision_check_interval = 0.2 # 视觉检查间隔(秒)
last_vision_check = start_time
vision_correction_gain = 0.006 # 视觉修正增益系数
# 用于滤波的队列
deviation_queue = []
filter_size = 5
last_valid_deviation = 0
# PID控制参数 - 用于角度修正
kp_angle = 0.6 # 比例系数
ki_angle = 0.02 # 积分系数
kd_angle = 0.1 # 微分系数
# PID控制变量
angle_error_integral = 0
last_angle_error = 0
# 偏移量累计 - 用于检测持续偏移
y_offset_accumulator = 0
# 动态调整参数
slow_down_ratio = 0.85 # 当移动到目标距离的85%时开始减速
completion_threshold = 0.95 # 当移动到目标距离的95%时停止
# 监控移动过程
while distance_moved < abs_distance * completion_threshold and time.time() - start_time < timeout:
current_time = time.time()
# 按固定间隔检查位置
if current_time - last_check_time >= position_check_interval:
# 获取当前位置和朝向
current_position = ctrl.odo_msg.xyz
current_yaw = ctrl.odo_msg.rpy[2]
# 计算在移动方向上的位移
dx = current_position[0] - start_position[0]
dy = current_position[1] - start_position[1]
# 计算在初始方向上的投影距离(实际前进距离)
distance_moved = dx * direction_vector[0] + dy * direction_vector[1]
distance_moved = abs(distance_moved) # 确保距离为正值
# 计算垂直于移动方向的偏移量y方向偏移
y_offset = -dx * direction_vector[1] + dy * direction_vector[0]
# 累积y方向偏移量检测持续偏移趋势
y_offset_accumulator = y_offset_accumulator * 0.8 + y_offset * 0.2
# 根据前进或后退确定期望方向
expected_direction = start_yaw if forward else (start_yaw + math.pi) % (2 * math.pi)
# 使用IMU朝向数据计算角度偏差
yaw_error = current_yaw - expected_direction
# 角度归一化
while yaw_error > math.pi:
yaw_error -= 2 * math.pi
while yaw_error < -math.pi:
yaw_error += 2 * math.pi
# 使用PID控制计算角速度修正
# 比例项
p_control = kp_angle * yaw_error
# 积分项 (带衰减)
angle_error_integral = angle_error_integral * 0.9 + yaw_error
angle_error_integral = max(-1.0, min(1.0, angle_error_integral)) # 限制积分范围
i_control = ki_angle * angle_error_integral
# 微分项
d_control = kd_angle * (yaw_error - last_angle_error)
last_angle_error = yaw_error
# 计算总的角速度控制量
angular_correction = -(p_control + i_control + d_control)
# 限制最大角速度修正
angular_correction = max(min(angular_correction, 0.3), -0.3)
# 根据持续的y偏移趋势增加侧向校正
lateral_correction = 0
if abs(y_offset_accumulator) > 0.05: # 如果累积偏移超过5厘米
lateral_correction = -y_offset_accumulator * 0.8 # 反向校正
lateral_correction = max(min(lateral_correction, 0.15), -0.15) # 限制最大侧向速度
if observe and abs(lateral_correction) > 0.05:
warning(f"累积Y偏移校正: {y_offset_accumulator:.3f}米,应用侧向速度 {lateral_correction:.3f}m/s", "偏移")
# 计算完成比例
completion_ratio = distance_moved / abs_distance
# 根据距离完成情况调整速度
if completion_ratio > slow_down_ratio:
# 计算减速系数
slow_factor = 1.0 - (completion_ratio - slow_down_ratio) / (1.0 - slow_down_ratio)
# 确保不会减速太多
slow_factor = max(0.3, slow_factor)
new_speed = actual_speed * slow_factor
if observe and abs(new_speed - msg.vel_des[0]) > 0.05:
info(f"减速: {msg.vel_des[0]:.2f} -> {new_speed:.2f} 米/秒 (完成: {completion_ratio*100:.1f}%)", "移动")
actual_speed = new_speed
# 应用修正 - 同时应用角速度和侧向速度修正
msg.vel_des = [actual_speed, lateral_correction, angular_correction]
msg.life_count += 1
ctrl.Send_cmd(msg)
if observe and current_time % 1.0 < position_check_interval:
debug(f"已移动: {distance_moved:.3f}米, 目标: {abs_distance:.3f}米 (完成: {completion_ratio*100:.1f}%)", "距离")
debug(f"Y偏移: {y_offset:.3f}米, 累积偏移: {y_offset_accumulator:.3f}", "偏移")
debug(f"朝向偏差: {math.degrees(yaw_error):.1f}度, 角速度修正: {angular_correction:.3f}rad/s", "角度")
debug(f"PID: P={p_control:.3f}, I={i_control:.3f}, D={d_control:.3f}", "控制")
# 更新检查时间
last_check_time = current_time
# 定期进行视觉检查和修正 # TODO 这个函数如果用于 上下坡的话,感觉或许就不需要了,如果能保证 task 2-5 的效果。
if hasattr(ctrl, 'image_processor') and current_time - last_vision_check >= vision_check_interval:
try:
# 获取当前相机帧
frame = ctrl.image_processor.get_current_image()
if frame is not None:
# 检测轨道线 - 使用特定的石板路模式
center_info, _, _ = detect_dual_track_lines(
frame, observe=False, save_log=False)
# 如果成功检测到轨道线,使用它进行修正
if center_info is not None:
# 获取当前偏差
current_deviation = center_info["deviation"]
last_valid_deviation = current_deviation
# 添加到队列进行滤波
deviation_queue.append(current_deviation)
if len(deviation_queue) > filter_size:
deviation_queue.pop(0)
# 计算滤波后的偏差值 (去除最大和最小值后的平均)
if len(deviation_queue) >= 3:
filtered_deviations = sorted(deviation_queue)[1:-1] if len(deviation_queue) > 2 else deviation_queue
filtered_deviation = sum(filtered_deviations) / len(filtered_deviations)
else:
filtered_deviation = current_deviation
# 计算视觉侧向修正速度
vision_lateral_correction = -filtered_deviation * vision_correction_gain
# 限制最大侧向速度修正
vision_lateral_correction = max(min(vision_lateral_correction, 0.2), -0.2)
# 与当前的侧向校正进行融合 (加权平均)
if msg.vel_des[1] != 0:
# 如果已经有侧向校正,与视觉校正进行融合
fused_lateral = msg.vel_des[1] * 0.3 + vision_lateral_correction * 0.7
else:
# 如果没有侧向校正,直接使用视觉校正
fused_lateral = vision_lateral_correction
if observe and abs(vision_lateral_correction) > 0.05:
warning(f"视觉修正: 偏差 {filtered_deviation:.1f}像素,应用侧向速度 {vision_lateral_correction:.3f}m/s", "视觉")
# 应用视觉修正,保留当前前进速度和角速度
msg.vel_des = [msg.vel_des[0], fused_lateral, msg.vel_des[2]]
msg.life_count += 1
ctrl.Send_cmd(msg)
if observe and current_time % 1.0 < vision_check_interval:
debug(f"视觉检测: 原始偏差 {current_deviation:.1f}像素, 滤波后 {filtered_deviation:.1f}像素", "视觉")
debug(f"融合侧向速度: {fused_lateral:.3f}m/s", "视觉")
except Exception as e:
if observe:
error(f"视觉检测异常: {e}", "错误")
# 更新视觉检查时间
last_vision_check = current_time
# 短暂延时
time.sleep(0.01)
# 平滑停止
slowdown_steps = 5
for i in range(slowdown_steps, 0, -1):
slowdown_factor = i / slowdown_steps
msg.vel_des = [actual_speed * slowdown_factor, 0, 0]
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep(0.1)
# 最后完全停止
if hasattr(ctrl.base_msg, 'stop_smooth'):
ctrl.base_msg.stop_smooth()
else:
ctrl.base_msg.stop()
# 获取最终位置和实际移动距离
final_position = ctrl.odo_msg.xyz
dx = final_position[0] - start_position[0]
dy = final_position[1] - start_position[1]
actual_distance = math.sqrt(dx*dx + dy*dy)
# 计算最终y方向偏移
final_y_offset = -dx * direction_vector[1] + dy * direction_vector[0]
if observe:
success(f"石板路增强直线移动完成,实际移动距离: {actual_distance:.3f}", "完成")
info(f"最终Y方向偏移: {final_y_offset:.3f}", "偏移")
# 判断是否成功完成
distance_error = abs(actual_distance - abs_distance)
y_offset_error = abs(final_y_offset)
go_success = distance_error < 0.1 and y_offset_error < 0.1 # 距离误差和y偏移都小于10厘米视为成功
if observe:
if go_success:
success(f"移动成功", "成功")
else:
warning(f"移动不够精确,距离误差: {distance_error:.3f}米, Y偏移: {y_offset_error:.3f}", "警告")
return go_success
def run_task_3_back(ctrl, msg):
return
# INFO 保持文件相对路径,直接从 task-4中调用
robot_cmd = {
'mode':0, 'gait_id':0, 'contact':0, 'life_count':0,
'vel_des':[0.0, 0.0, 0.0],
'rpy_des':[0.0, 0.0, 0.0],
'pos_des':[0.0, 0.0, 0.0],
'acc_des':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
'ctrl_point':[0.0, 0.0, 0.0],
'foot_pose':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
'step_height':[0.0, 0.0],
'value':0, 'duration':0
}
def pass_stone(ctrl, msg, distance=5.0, observe=True):
"""
通过里程计设定distance执行上坡gait
:param ctrl: 控制器对象
:param msg: 控制消息
:param distance: 期望移动距离
:param observe: 是否打印过程信息
"""
usergait_msg = file_send_lcmt()
try:
# 1. 生成gait参数文件
steps = toml.load("./task_3/Gait_Params_up.toml")
full_steps = {'step':[robot_cmd]}
k = 0
for i in steps['step']:
cmd = copy.deepcopy(robot_cmd)
cmd['duration'] = i['duration']
if i['type'] == 'usergait':
cmd['mode'] = 11 # LOCOMOTION
cmd['gait_id'] = 110 # USERGAIT
cmd['vel_des'] = i['body_vel_des']
cmd['rpy_des'] = i['body_pos_des'][0:3]
cmd['pos_des'] = i['body_pos_des'][3:6]
cmd['foot_pose'][0:2] = i['landing_pos_des'][0:2]
cmd['foot_pose'][2:4] = i['landing_pos_des'][3:5]
cmd['foot_pose'][4:6] = i['landing_pos_des'][6:8]
cmd['ctrl_point'][0:2] = i['landing_pos_des'][9:11]
cmd['step_height'][0] = math.ceil(i['step_height'][0] * 1e3) + math.ceil(i['step_height'][1] * 1e3) * 1e3
cmd['step_height'][1] = math.ceil(i['step_height'][2] * 1e3) + math.ceil(i['step_height'][3] * 1e3) * 1e3
cmd['acc_des'] = i['weight']
cmd['value'] = i['use_mpc_traj']
cmd['contact'] = math.floor(i['landing_gain'] * 1e1)
cmd['ctrl_point'][2] = i['mu']
if k == 0:
full_steps['step'] = [cmd]
else:
full_steps['step'].append(cmd)
k += 1
with open("./task_3/Gait_Params_up_full.toml", 'w') as f:
f.write("# Gait Params\n")
f.writelines(toml.dumps(full_steps))
# 2. 发送gait定义和参数
with open("./task_3/Gait_Def_up.toml",'r') as file_obj_gait_def, \
open("./task_3/Gait_Params_up_full.toml",'r') as file_obj_gait_params:
usergait_msg.data = file_obj_gait_def.read()
ctrl.lc_s.publish("user_gait_file", usergait_msg.encode())
time.sleep(0.5)
usergait_msg.data = file_obj_gait_params.read()
ctrl.lc_s.publish("user_gait_file", usergait_msg.encode())
time.sleep(0.1)
# 3. 记录起始位置
start_xyz = ctrl.odo_msg.xyz[:2] # 只取x, y
if observe:
info(f"pass_up_down: 期望移动距离 {distance:.3f} 米,起始位置: {start_xyz}", "监测")
# 4. 设置gait执行参数
msg.mode = 62
msg.value = 0
msg.contact = 15
msg.gait_id = 110
msg.duration = 1000
msg.life_count += 1
# 5. 控制循环按里程计判断是否到达去除max-iteration直接while判断
arrived = False
step = 0
while True:
ctrl.Send_cmd(msg)
cur_xyz = ctrl.odo_msg.xyz[:2]
dx = cur_xyz[0] - start_xyz[0]
dy = cur_xyz[1] - start_xyz[1]
moved = (dx**2 + dy**2) ** 0.5
if observe and step % 10 == 0:
info(f"Step:{step} 已移动距离={moved:.3f}m, 目标={distance:.3f}m, 当前xy={cur_xyz}", "监测")
if moved >= distance:
arrived = True
if observe:
success(f"pass_up_down: 已到达设定距离 {distance:.3f}m (实际: {moved:.3f}m)", "完成")
break
step += 1
time.sleep(0.1)
# 6. 完全停止
if hasattr(ctrl.base_msg, 'stop_smooth'):
ctrl.base_msg.stop_smooth()
else:
ctrl.base_msg.stop()
ctrl.Send_cmd(msg)
time.sleep(0.1)
# 7. 反馈最终位置
final_xyz = ctrl.odo_msg.xyz[:2]
dx = final_xyz[0] - start_xyz[0]
dy = final_xyz[1] - start_xyz[1]
actual_distance = (dx**2 + dy**2) ** 0.5
if observe:
info(f"pass_up_down: 最终位置 {final_xyz}, 实际移动距离 {actual_distance:.3f}m", "监测")
return arrived
except KeyboardInterrupt:
msg.mode = 7 #PureDamper before KeyboardInterrupt:
msg.gait_id = 0
msg.duration = 0
msg.life_count += 1
ctrl.Send_cmd(msg)
pass

View File

@ -35,7 +35,8 @@ robot_cmd = {
def pass_bar(ctrl, msg):
"""
俯身通过一个栅栏
俯身通过一个栅栏;
会默认前进一定的距离
"""
step_num = 8

View File

@ -17,25 +17,22 @@ from utils.detect_dual_track_lines import detect_dual_track_lines
from base_move.move_base_hori_line import calculate_distance_to_line
from task_4.pass_bar import pass_bar
from base_move.center_on_dual_tracks import center_on_dual_tracks
from task_3.task_3 import pass_stone
# 创建本模块特定的日志记录器
logger = get_logger("任务4")
observe = True
STONE_DISTANCE = 5.0 # TODO 距离参数需要微调
RED_RATIO_THRESHOLD = 0.35 # TODO 红色区域比例阈值需要微调
def run_task_4(ctrl, msg):
section('任务4-1直线移动', "移动")
# 设置机器人运动模式为快步跑
msg.mode = 11 # 运动模式
msg.gait_id = 3 # 步态ID快步跑
msg.vel_des = [0.35, 0, 0] # 期望速度
msg.pos_des = [ 0, 0, 0]
msg.duration = 0 # 零时长表示持续运动,直到接收到新命令
msg.step_height = [0.21, 0.21] # 持续运动时摆动腿的离地高度
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep(5) # 持续5秒钟
pass_stone(ctrl, msg, distance=STONE_DISTANCE)
section('任务4-2移动直到灰色天空比例小于阈值', "天空检测")
go_straight_until_sky_ratio_below(ctrl, msg, sky_ratio_threshold=0.2)
section('任务4-2移动直到红色区域比例大于阈值', "红色检测")
go_straight_until_red_bar(ctrl, msg, red_ratio_threshold=RED_RATIO_THRESHOLD, speed=0.2)
section('任务4-3通过栏杆', "移动")
pass_bar(ctrl, msg)
@ -53,14 +50,14 @@ def run_task_4_back(ctrl, msg):
go_straight(ctrl, msg, distance=2, speed=1, observe=True)
center_on_dual_tracks(ctrl, msg, max_deviation=10.0, observe=False)
# 向右移动0.5秒
section('任务4-回程:向右移动', "移动")
go_lateral(ctrl, msg, distance=-0.1, speed=0.15, observe=True) # DEBUG
# TODO 向右移动0.5秒 (或许不需要了)
# section('任务4-回程:向右移动', "移动")
# go_lateral(ctrl, msg, distance=-0.1, speed=0.15, observe=True) # DEBUG
turn_degree_v2(ctrl, msg, degree=-90, absolute=True)
section('任务4-1移动直到灰色天空比例低于阈值', "天空检测")
go_straight_until_sky_ratio_below(ctrl, msg, sky_ratio_threshold=0.35, speed=0.2)
go_straight_until_red_bar(ctrl, msg, red_ratio_threshold=RED_RATIO_THRESHOLD, speed=0.2)
section('任务4-2通过栏杆', "移动")
turn_degree_v2(ctrl, msg, degree=-90, absolute=True)
@ -69,11 +66,11 @@ def run_task_4_back(ctrl, msg):
turn_degree_v2(ctrl, msg, degree=-90, absolute=True)
section('任务4-3stone', "移动")
go_straight(ctrl, msg, distance=1, speed=2)
turn_degree_v2(ctrl, msg, degree=-92, absolute=True)
turn_degree_v2(ctrl, msg, degree=-90, absolute=True)
# Use enhanced calibration for better Y-axis correction on stone path
go_straight(ctrl, msg, distance=4.5, speed=0.35, mode=11, gait_id=3, step_height=[0.21, 0.21], observe=True)
# go_straight(ctrl, msg, distance=4.5, speed=0.35, mode=11, gait_id=3, step_height=[0.21, 0.21], observe=True)
pass_stone(ctrl, msg, distance=STONE_DISTANCE)
# go_straight_with_enhanced_calibration(ctrl, msg, distance=4.5, speed=0.35,
# mode=11, gait_id=3, step_height=[0.21, 0.21], observe=True)
@ -85,26 +82,47 @@ def run_task_4_back(ctrl, msg):
current_distance = calculate_distance_to_line(edge_info, camera_height, observe=True)
go_straight(ctrl, msg, distance=current_distance, speed=0.20, mode=11, gait_id=3, step_height=[0.21, 0.21])
def go_straight_until_sky_ratio_below(ctrl, msg,
sky_ratio_threshold=0.2,
step_distance=0.5,
max_distance=5,
speed=0.3
):
def go_straight_until_red_bar(ctrl, msg,
red_ratio_threshold=0.2,
step_distance=0.5,
max_distance=5,
speed=0.3
):
"""
控制机器人沿直线行走直到灰色天空比例低于指定阈值
控制机器人沿直线行走直到红色区域比例高于指定阈值
参数:
ctrl: Robot_Ctrl对象
msg: 控制命令消息对象
sky_ratio_threshold: 灰色天空比例阈值当检测到的比例低于此值时停止
red_ratio_threshold: 红色区域比例阈值当检测到的比例高于此值时停止
step_distance: 每次移动的步长()
max_distance: 最大移动距离()防止无限前进
speed: 移动速度(/)
返回:
bool: 是否成功找到天空比例低于阈值的位置
bool: 是否成功找到红色区域比例高于阈值的位置
"""
def analyze_red_area_ratio(image):
"""
分析图像中红色区域的占比
输入: BGR图像
返回: 红色区域占比 (0~1)
"""
# 转换到HSV空间
hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
# 红色有两个区间
lower_red1 = np.array([0, 70, 50])
upper_red1 = np.array([10, 255, 255])
lower_red2 = np.array([160, 70, 50])
upper_red2 = np.array([180, 255, 255])
mask1 = cv2.inRange(hsv, lower_red1, upper_red1)
mask2 = cv2.inRange(hsv, lower_red2, upper_red2)
mask = cv2.bitwise_or(mask1, mask2)
red_pixels = np.count_nonzero(mask)
total_pixels = mask.shape[0] * mask.shape[1]
ratio = red_pixels / total_pixels if total_pixels > 0 else 0
return ratio
total_distance = 0
success_flag = False
@ -115,24 +133,20 @@ def go_straight_until_sky_ratio_below(ctrl, msg,
while total_distance < max_distance:
# 获取当前图像
current_image = ctrl.image_processor.get_current_image()
current_image = ctrl.image_processor.get_current_image('ai')
if current_image is None:
warning("无法获取图像,等待...", "图像")
time.sleep(0.5)
continue
# 保存当前图像用于分析
temp_image_path = "/tmp/current_sky_image.jpg"
cv2.imwrite(temp_image_path, current_image)
# 分析灰色天空比例
# 分析红色区域比例
try:
sky_ratio = analyze_gray_sky_ratio(temp_image_path)
info(f"当前灰色天空比例: {sky_ratio:.2%}", "分析")
red_ratio = analyze_red_area_ratio(current_image)
info(f"当前红色区域比例: {red_ratio:.2%}", "分析")
# 如果天空比例高于阈值,停止移动
if sky_ratio < sky_ratio_threshold:
success(f"检测到灰色天空比例({sky_ratio:.2%})低于阈值({sky_ratio_threshold:.2%}),停止移动", "完成")
# 如果红色区域比例高于阈值,停止移动
if red_ratio > red_ratio_threshold:
success(f"检测到红色区域比例({red_ratio:.2%})高于阈值({red_ratio_threshold:.2%}),停止移动", "完成")
success_flag = True
break
@ -165,307 +179,6 @@ def go_straight_until_sky_ratio_below(ctrl, msg,
ctrl.base_msg.stop()
if not success_flag and total_distance >= max_distance:
warning(f"已达到最大移动距离 {max_distance} 米,但未找到天空比例小于 {sky_ratio_threshold:.2%} 的位置", "超时")
warning(f"已达到最大移动距离 {max_distance} 米,但未找到红色区域比例高于 {red_ratio_threshold:.2%} 的位置", "超时")
return success_flag
def go_straight_with_enhanced_calibration(ctrl, msg, distance, speed=0.5, observe=False,
mode=11, gait_id=3, step_height=[0.21, 0.21]):
"""
控制机器人在石板路上沿直线行走使用视觉校准和姿态传感器融合来保持直线
参数:
ctrl: Robot_Ctrl 对象
msg: 控制消息对象
distance: 要行走的距离()
speed: 行走速度(/)
observe: 是否输出调试信息
mode: 运动模式
gait_id: 步态ID
step_height: 摆动腿高度
返回:
bool: 是否成功完成
"""
section("开始石板路增强直线移动", "石板路移动")
# 参数验证
if abs(distance) < 0.01:
info("距离太短,无需移动", "信息")
return True
# 检查相机是否可用
if not hasattr(ctrl, 'image_processor') or not hasattr(ctrl.image_processor, 'get_current_image'):
warning("机器人控制器没有提供图像处理器,将使用姿态传感器辅助校准", "警告")
# 限制速度范围
speed = min(max(abs(speed), 0.1), 1.0)
# 确定前进或后退方向
forward = distance > 0
move_speed = speed if forward else -speed
abs_distance = abs(distance)
# 获取起始位置和姿态
start_position = list(ctrl.odo_msg.xyz)
start_yaw = ctrl.odo_msg.rpy[2] # 记录起始朝向
if observe:
debug(f"起始位置: {start_position}", "位置")
info(f"开始石板路增强直线移动,距离: {abs_distance:.3f}米,速度: {abs(move_speed):.2f}米/秒", "移动")
# 设置移动命令
msg.mode = mode
msg.gait_id = gait_id
msg.step_height = step_height
msg.duration = 0 # wait next cmd
# 根据需要移动的距离动态调整移动速度
if abs_distance > 1.0:
actual_speed = move_speed # 距离较远时用设定速度
else:
actual_speed = move_speed * 0.8 # 较近距离略微降速
# 设置移动速度和方向
msg.vel_des = [actual_speed, 0, 0] # [前进速度, 侧向速度, 角速度]
msg.life_count += 1
# 发送命令
ctrl.Send_cmd(msg)
# 估算移动时间,但实际上会通过里程计控制
estimated_time = abs_distance / abs(actual_speed)
timeout = estimated_time + 5 # 增加超时时间
# 使用里程计进行实时监控移动距离
distance_moved = 0
start_time = time.time()
last_check_time = start_time
position_check_interval = 0.1 # 位置检查间隔(秒)
# 计算移动方向单位向量(世界坐标系下)
direction_vector = [math.cos(start_yaw), math.sin(start_yaw)]
if not forward:
direction_vector = [-direction_vector[0], -direction_vector[1]]
# 视觉跟踪相关变量
vision_check_interval = 0.2 # 视觉检查间隔(秒)
last_vision_check = start_time
vision_correction_gain = 0.006 # 视觉修正增益系数
# 用于滤波的队列
deviation_queue = []
filter_size = 5
last_valid_deviation = 0
# PID控制参数 - 用于角度修正
kp_angle = 0.6 # 比例系数
ki_angle = 0.02 # 积分系数
kd_angle = 0.1 # 微分系数
# PID控制变量
angle_error_integral = 0
last_angle_error = 0
# 偏移量累计 - 用于检测持续偏移
y_offset_accumulator = 0
# 动态调整参数
slow_down_ratio = 0.85 # 当移动到目标距离的85%时开始减速
completion_threshold = 0.95 # 当移动到目标距离的95%时停止
# 监控移动过程
while distance_moved < abs_distance * completion_threshold and time.time() - start_time < timeout:
current_time = time.time()
# 按固定间隔检查位置
if current_time - last_check_time >= position_check_interval:
# 获取当前位置和朝向
current_position = ctrl.odo_msg.xyz
current_yaw = ctrl.odo_msg.rpy[2]
# 计算在移动方向上的位移
dx = current_position[0] - start_position[0]
dy = current_position[1] - start_position[1]
# 计算在初始方向上的投影距离(实际前进距离)
distance_moved = dx * direction_vector[0] + dy * direction_vector[1]
distance_moved = abs(distance_moved) # 确保距离为正值
# 计算垂直于移动方向的偏移量y方向偏移
y_offset = -dx * direction_vector[1] + dy * direction_vector[0]
# 累积y方向偏移量检测持续偏移趋势
y_offset_accumulator = y_offset_accumulator * 0.8 + y_offset * 0.2
# 根据前进或后退确定期望方向
expected_direction = start_yaw if forward else (start_yaw + math.pi) % (2 * math.pi)
# 使用IMU朝向数据计算角度偏差
yaw_error = current_yaw - expected_direction
# 角度归一化
while yaw_error > math.pi:
yaw_error -= 2 * math.pi
while yaw_error < -math.pi:
yaw_error += 2 * math.pi
# 使用PID控制计算角速度修正
# 比例项
p_control = kp_angle * yaw_error
# 积分项 (带衰减)
angle_error_integral = angle_error_integral * 0.9 + yaw_error
angle_error_integral = max(-1.0, min(1.0, angle_error_integral)) # 限制积分范围
i_control = ki_angle * angle_error_integral
# 微分项
d_control = kd_angle * (yaw_error - last_angle_error)
last_angle_error = yaw_error
# 计算总的角速度控制量
angular_correction = -(p_control + i_control + d_control)
# 限制最大角速度修正
angular_correction = max(min(angular_correction, 0.3), -0.3)
# 根据持续的y偏移趋势增加侧向校正
lateral_correction = 0
if abs(y_offset_accumulator) > 0.05: # 如果累积偏移超过5厘米
lateral_correction = -y_offset_accumulator * 0.8 # 反向校正
lateral_correction = max(min(lateral_correction, 0.15), -0.15) # 限制最大侧向速度
if observe and abs(lateral_correction) > 0.05:
warning(f"累积Y偏移校正: {y_offset_accumulator:.3f}米,应用侧向速度 {lateral_correction:.3f}m/s", "偏移")
# 计算完成比例
completion_ratio = distance_moved / abs_distance
# 根据距离完成情况调整速度
if completion_ratio > slow_down_ratio:
# 计算减速系数
slow_factor = 1.0 - (completion_ratio - slow_down_ratio) / (1.0 - slow_down_ratio)
# 确保不会减速太多
slow_factor = max(0.3, slow_factor)
new_speed = actual_speed * slow_factor
if observe and abs(new_speed - msg.vel_des[0]) > 0.05:
info(f"减速: {msg.vel_des[0]:.2f} -> {new_speed:.2f} 米/秒 (完成: {completion_ratio*100:.1f}%)", "移动")
actual_speed = new_speed
# 应用修正 - 同时应用角速度和侧向速度修正
msg.vel_des = [actual_speed, lateral_correction, angular_correction]
msg.life_count += 1
ctrl.Send_cmd(msg)
if observe and current_time % 1.0 < position_check_interval:
debug(f"已移动: {distance_moved:.3f}米, 目标: {abs_distance:.3f}米 (完成: {completion_ratio*100:.1f}%)", "距离")
debug(f"Y偏移: {y_offset:.3f}米, 累积偏移: {y_offset_accumulator:.3f}", "偏移")
debug(f"朝向偏差: {math.degrees(yaw_error):.1f}度, 角速度修正: {angular_correction:.3f}rad/s", "角度")
debug(f"PID: P={p_control:.3f}, I={i_control:.3f}, D={d_control:.3f}", "控制")
# 更新检查时间
last_check_time = current_time
# 定期进行视觉检查和修正 # TODO 这个函数如果用于 上下坡的话,感觉或许就不需要了,如果能保证 task 2-5 的效果。
if hasattr(ctrl, 'image_processor') and current_time - last_vision_check >= vision_check_interval:
try:
# 获取当前相机帧
frame = ctrl.image_processor.get_current_image()
if frame is not None:
# 检测轨道线 - 使用特定的石板路模式
center_info, _, _ = detect_dual_track_lines(
frame, observe=False, save_log=False)
# 如果成功检测到轨道线,使用它进行修正
if center_info is not None:
# 获取当前偏差
current_deviation = center_info["deviation"]
last_valid_deviation = current_deviation
# 添加到队列进行滤波
deviation_queue.append(current_deviation)
if len(deviation_queue) > filter_size:
deviation_queue.pop(0)
# 计算滤波后的偏差值 (去除最大和最小值后的平均)
if len(deviation_queue) >= 3:
filtered_deviations = sorted(deviation_queue)[1:-1] if len(deviation_queue) > 2 else deviation_queue
filtered_deviation = sum(filtered_deviations) / len(filtered_deviations)
else:
filtered_deviation = current_deviation
# 计算视觉侧向修正速度
vision_lateral_correction = -filtered_deviation * vision_correction_gain
# 限制最大侧向速度修正
vision_lateral_correction = max(min(vision_lateral_correction, 0.2), -0.2)
# 与当前的侧向校正进行融合 (加权平均)
if msg.vel_des[1] != 0:
# 如果已经有侧向校正,与视觉校正进行融合
fused_lateral = msg.vel_des[1] * 0.3 + vision_lateral_correction * 0.7
else:
# 如果没有侧向校正,直接使用视觉校正
fused_lateral = vision_lateral_correction
if observe and abs(vision_lateral_correction) > 0.05:
warning(f"视觉修正: 偏差 {filtered_deviation:.1f}像素,应用侧向速度 {vision_lateral_correction:.3f}m/s", "视觉")
# 应用视觉修正,保留当前前进速度和角速度
msg.vel_des = [msg.vel_des[0], fused_lateral, msg.vel_des[2]]
msg.life_count += 1
ctrl.Send_cmd(msg)
if observe and current_time % 1.0 < vision_check_interval:
debug(f"视觉检测: 原始偏差 {current_deviation:.1f}像素, 滤波后 {filtered_deviation:.1f}像素", "视觉")
debug(f"融合侧向速度: {fused_lateral:.3f}m/s", "视觉")
except Exception as e:
if observe:
error(f"视觉检测异常: {e}", "错误")
# 更新视觉检查时间
last_vision_check = current_time
# 短暂延时
time.sleep(0.01)
# 平滑停止
slowdown_steps = 5
for i in range(slowdown_steps, 0, -1):
slowdown_factor = i / slowdown_steps
msg.vel_des = [actual_speed * slowdown_factor, 0, 0]
msg.life_count += 1
ctrl.Send_cmd(msg)
time.sleep(0.1)
# 最后完全停止
if hasattr(ctrl.base_msg, 'stop_smooth'):
ctrl.base_msg.stop_smooth()
else:
ctrl.base_msg.stop()
# 获取最终位置和实际移动距离
final_position = ctrl.odo_msg.xyz
dx = final_position[0] - start_position[0]
dy = final_position[1] - start_position[1]
actual_distance = math.sqrt(dx*dx + dy*dy)
# 计算最终y方向偏移
final_y_offset = -dx * direction_vector[1] + dy * direction_vector[0]
if observe:
success(f"石板路增强直线移动完成,实际移动距离: {actual_distance:.3f}", "完成")
info(f"最终Y方向偏移: {final_y_offset:.3f}", "偏移")
# 判断是否成功完成
distance_error = abs(actual_distance - abs_distance)
y_offset_error = abs(final_y_offset)
go_success = distance_error < 0.1 and y_offset_error < 0.1 # 距离误差和y偏移都小于10厘米视为成功
if observe:
if go_success:
success(f"移动成功", "成功")
else:
warning(f"移动不够精确,距离误差: {distance_error:.3f}米, Y偏移: {y_offset_error:.3f}", "警告")
return go_success