Refactor main.py and enhance go_straight_with_qrcode function in go_straight.py. Commented out unused code in main.py for clarity and added QR code scanning functionality to go_straight_with_qrcode, improving robot navigation and task execution. Updated run_task_5 to utilize the new function for better integration of QR code scanning during movement.
This commit is contained in:
		
							parent
							
								
									477511f291
								
							
						
					
					
						commit
						53b04a6a74
					
				@ -202,6 +202,222 @@ def go_straight_until_bar(ctrl, msg, distance, speed=0.5, observe=False):
 | 
			
		||||
    """
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
def go_straight_with_qrcode(ctrl, msg, distance, speed=0.5, observe=False):
 | 
			
		||||
    """
 | 
			
		||||
    控制机器人沿直线行走指定距离,同时扫描二维码
 | 
			
		||||
    
 | 
			
		||||
    参数:
 | 
			
		||||
    ctrl: Robot_Ctrl 对象,包含里程计信息
 | 
			
		||||
    msg: robot_control_cmd_lcmt 对象,用于发送命令
 | 
			
		||||
    distance: 要行走的距离(米),正值为前进,负值为后退
 | 
			
		||||
    speed: 行走速度(米/秒),范围0.1~1.0,默认为0.5
 | 
			
		||||
    observe: 是否输出中间状态信息,默认为False
 | 
			
		||||
    
 | 
			
		||||
    返回:
 | 
			
		||||
    tuple: (bool, str) - (是否成功完成行走, 扫描到的QR码内容)
 | 
			
		||||
    """
 | 
			
		||||
    # 返回此任务的中间状态
 | 
			
		||||
    res = {}
 | 
			
		||||
    qr_result = None
 | 
			
		||||
    
 | 
			
		||||
    # 启动异步QR码扫描
 | 
			
		||||
    if hasattr(ctrl, 'image_processor') and ctrl.image_processor is not None:
 | 
			
		||||
        try:
 | 
			
		||||
            ctrl.image_processor.start_async_scan(interval=0.2)
 | 
			
		||||
            if observe:
 | 
			
		||||
                info("已启动异步QR码扫描", "扫描")
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            if observe:
 | 
			
		||||
                error(f"启动QR码扫描失败: {e}", "失败")
 | 
			
		||||
    else:
 | 
			
		||||
        if observe:
 | 
			
		||||
            warning("无法启用QR码扫描,image_processor不存在", "警告")
 | 
			
		||||
    
 | 
			
		||||
    # 参数验证
 | 
			
		||||
    if abs(distance) < 0.01:
 | 
			
		||||
        info("距离太短,无需移动", "信息")
 | 
			
		||||
        # 停止异步扫描
 | 
			
		||||
        if hasattr(ctrl, 'image_processor') and ctrl.image_processor is not None:
 | 
			
		||||
            ctrl.image_processor.stop_async_scan()
 | 
			
		||||
        return True, None
 | 
			
		||||
        
 | 
			
		||||
    # 限制速度范围
 | 
			
		||||
    speed = min(max(abs(speed), 0.1), 1.0)
 | 
			
		||||
    
 | 
			
		||||
    # 确定前进或后退方向
 | 
			
		||||
    forward = distance > 0
 | 
			
		||||
    move_speed = speed if forward else -speed
 | 
			
		||||
    abs_distance = abs(distance)
 | 
			
		||||
    
 | 
			
		||||
    # 获取起始位置
 | 
			
		||||
    start_position = list(ctrl.odo_msg.xyz)
 | 
			
		||||
    start_yaw = ctrl.odo_msg.rpy[2]  # 记录起始朝向,用于保持直线
 | 
			
		||||
    
 | 
			
		||||
    if observe:
 | 
			
		||||
        debug(f"起始位置: {start_position}", "位置")
 | 
			
		||||
        info(f"开始{'前进' if forward else '后退'} {abs_distance:.3f}米,速度: {abs(move_speed):.2f}米/秒", "移动")
 | 
			
		||||
        # 在起点放置标记
 | 
			
		||||
        if hasattr(ctrl, 'place_marker'):
 | 
			
		||||
            ctrl.place_marker(start_position[0], start_position[1], 
 | 
			
		||||
                            start_position[2] if len(start_position) > 2 else 0.0, 
 | 
			
		||||
                            'green', observe=True)
 | 
			
		||||
    
 | 
			
		||||
    # 设置移动命令
 | 
			
		||||
    msg.mode = 11  # Locomotion模式
 | 
			
		||||
    msg.gait_id = 26  # 自变频步态
 | 
			
		||||
    
 | 
			
		||||
    # 根据需要移动的距离动态调整移动速度
 | 
			
		||||
    if abs_distance > 1.0:
 | 
			
		||||
        actual_speed = move_speed  # 距离较远时用设定速度
 | 
			
		||||
    elif abs_distance > 0.5:
 | 
			
		||||
        actual_speed = move_speed * 0.8  # 中等距离略微降速
 | 
			
		||||
    elif abs_distance > 0.2:
 | 
			
		||||
        actual_speed = move_speed * 0.6  # 较近距离降低速度
 | 
			
		||||
    else:
 | 
			
		||||
        actual_speed = move_speed * 0.4  # 非常接近时用更慢速度
 | 
			
		||||
    
 | 
			
		||||
    # 设置移动速度和方向
 | 
			
		||||
    msg.vel_des = [actual_speed, 0, 0]  # [前进速度, 侧向速度, 角速度]
 | 
			
		||||
    msg.duration = 0  # wait next cmd
 | 
			
		||||
    msg.step_height = [0.06, 0.06]  # 抬腿高度
 | 
			
		||||
    msg.life_count += 1
 | 
			
		||||
    
 | 
			
		||||
    # 发送命令
 | 
			
		||||
    ctrl.Send_cmd(msg)
 | 
			
		||||
    
 | 
			
		||||
    # 估算移动时间,但实际上会通过里程计控制
 | 
			
		||||
    estimated_time = abs_distance / abs(actual_speed)
 | 
			
		||||
    timeout = estimated_time + 3  # 增加超时时间为预计移动时间加3秒
 | 
			
		||||
    
 | 
			
		||||
    # 使用里程计进行实时监控移动距离
 | 
			
		||||
    distance_moved = 0
 | 
			
		||||
    start_time = time.time()
 | 
			
		||||
    last_position = start_position
 | 
			
		||||
    last_qr_check_time = start_time
 | 
			
		||||
    qr_check_interval = 0.3  # QR码检查间隔时间(秒)
 | 
			
		||||
    
 | 
			
		||||
    # 动态调整参数
 | 
			
		||||
    angle_correction_threshold = 0.05  # 角度偏差超过多少弧度开始修正
 | 
			
		||||
    slow_down_ratio = 0.85  # 当移动到目标距离的85%时开始减速
 | 
			
		||||
    completion_threshold = 0.95  # 当移动到目标距离的95%时停止
 | 
			
		||||
    position_check_interval = 0.1  # 位置检查间隔(秒)
 | 
			
		||||
    last_check_time = start_time
 | 
			
		||||
    
 | 
			
		||||
    # 监控移动距离
 | 
			
		||||
    while distance_moved < abs_distance * completion_threshold and time.time() - start_time < timeout:
 | 
			
		||||
        current_time = time.time()
 | 
			
		||||
        
 | 
			
		||||
        # 按固定间隔检查位置,减少计算负担
 | 
			
		||||
        if current_time - last_check_time >= position_check_interval:
 | 
			
		||||
            # 获取当前位置和朝向
 | 
			
		||||
            current_position = ctrl.odo_msg.xyz
 | 
			
		||||
            current_yaw = ctrl.odo_msg.rpy[2]
 | 
			
		||||
            
 | 
			
		||||
            # 计算已移动距离
 | 
			
		||||
            dx = current_position[0] - start_position[0]
 | 
			
		||||
            dy = current_position[1] - start_position[1]
 | 
			
		||||
            distance_moved = math.sqrt(dx*dx + dy*dy)
 | 
			
		||||
            
 | 
			
		||||
            # 根据前进或后退确定期望方向
 | 
			
		||||
            expected_direction = start_yaw if forward else (start_yaw + math.pi) % (2 * math.pi)
 | 
			
		||||
            
 | 
			
		||||
            # 使用IMU朝向数据计算角度偏差
 | 
			
		||||
            yaw_error = current_yaw - expected_direction
 | 
			
		||||
            # 角度归一化
 | 
			
		||||
            while yaw_error > math.pi:
 | 
			
		||||
                yaw_error -= 2 * math.pi
 | 
			
		||||
            while yaw_error < -math.pi:
 | 
			
		||||
                yaw_error += 2 * math.pi
 | 
			
		||||
                
 | 
			
		||||
            # 计算完成比例
 | 
			
		||||
            completion_ratio = distance_moved / abs_distance
 | 
			
		||||
            
 | 
			
		||||
            # 根据距离完成情况调整速度
 | 
			
		||||
            if completion_ratio > slow_down_ratio:
 | 
			
		||||
                # 计算减速系数
 | 
			
		||||
                slow_factor = 1.0 - (completion_ratio - slow_down_ratio) / (1.0 - slow_down_ratio)
 | 
			
		||||
                # 确保不会减速太多
 | 
			
		||||
                slow_factor = max(0.2, slow_factor)
 | 
			
		||||
                new_speed = actual_speed * slow_factor
 | 
			
		||||
                
 | 
			
		||||
                if observe and abs(new_speed - msg.vel_des[0]) > 0.05:
 | 
			
		||||
                    info(f"减速: {msg.vel_des[0]:.2f} -> {new_speed:.2f} 米/秒 (完成: {completion_ratio*100:.1f}%)", "移动")
 | 
			
		||||
                
 | 
			
		||||
                msg.vel_des[0] = new_speed
 | 
			
		||||
                msg.life_count += 1
 | 
			
		||||
                ctrl.Send_cmd(msg)
 | 
			
		||||
            
 | 
			
		||||
            if observe and current_time - start_time > 1 and (current_time % 0.5 < position_check_interval):
 | 
			
		||||
                debug(f"已移动: {distance_moved:.3f}米, 目标: {abs_distance:.3f}米 (完成: {completion_ratio*100:.1f}%)", "距离")
 | 
			
		||||
                debug(f"当前速度: [{msg.vel_des[0]:.2f}, {msg.vel_des[1]:.2f}, {msg.vel_des[2]:.2f}]", "移动")
 | 
			
		||||
            
 | 
			
		||||
            # 更新最后检查时间和位置
 | 
			
		||||
            last_check_time = current_time
 | 
			
		||||
            last_position = current_position
 | 
			
		||||
        
 | 
			
		||||
        # 检查QR码扫描结果
 | 
			
		||||
        if current_time - last_qr_check_time >= qr_check_interval:
 | 
			
		||||
            if hasattr(ctrl, 'image_processor') and ctrl.image_processor is not None:
 | 
			
		||||
                qr_data, scan_time = ctrl.image_processor.get_last_qr_result()
 | 
			
		||||
                if qr_data and scan_time > start_time:
 | 
			
		||||
                    qr_result = qr_data
 | 
			
		||||
                    if observe:
 | 
			
		||||
                        success(f"在移动过程中扫描到QR码: {qr_data}", "扫描")
 | 
			
		||||
            last_qr_check_time = current_time
 | 
			
		||||
            
 | 
			
		||||
        time.sleep(0.01)  # 小间隔检查位置
 | 
			
		||||
    
 | 
			
		||||
    # 平滑停止
 | 
			
		||||
    if hasattr(ctrl.base_msg, 'stop_smooth'):
 | 
			
		||||
        ctrl.base_msg.stop_smooth()
 | 
			
		||||
    else:
 | 
			
		||||
        ctrl.base_msg.stop()
 | 
			
		||||
    
 | 
			
		||||
    # 获取最终位置和实际移动距离
 | 
			
		||||
    final_position = ctrl.odo_msg.xyz
 | 
			
		||||
    dx = final_position[0] - start_position[0]
 | 
			
		||||
    dy = final_position[1] - start_position[1]
 | 
			
		||||
    actual_distance = math.sqrt(dx*dx + dy*dy)
 | 
			
		||||
    
 | 
			
		||||
    # 停止异步扫描
 | 
			
		||||
    if hasattr(ctrl, 'image_processor') and ctrl.image_processor is not None:
 | 
			
		||||
        ctrl.image_processor.stop_async_scan()
 | 
			
		||||
        # 移动完成后再检查一次QR码扫描结果
 | 
			
		||||
        last_qr_data, last_scan_time = ctrl.image_processor.get_last_qr_result()
 | 
			
		||||
        if last_qr_data and (qr_result is None or last_scan_time > last_qr_check_time):
 | 
			
		||||
            qr_result = last_qr_data
 | 
			
		||||
            if observe:
 | 
			
		||||
                success(f"移动完成后最终扫描到QR码: {last_qr_data}", "扫描")
 | 
			
		||||
    
 | 
			
		||||
    if observe:
 | 
			
		||||
        success(f"移动完成,从里程计计算的移动距离: {actual_distance:.3f}米", "完成")
 | 
			
		||||
        # 在终点放置标记
 | 
			
		||||
        if hasattr(ctrl, 'place_marker'):
 | 
			
		||||
            ctrl.place_marker(final_position[0], final_position[1], 
 | 
			
		||||
                           final_position[2] if len(final_position) > 2 else 0.0, 
 | 
			
		||||
                           'red', observe=True)
 | 
			
		||||
    
 | 
			
		||||
    # 判断是否成功完成
 | 
			
		||||
    distance_error = abs(actual_distance - abs_distance)
 | 
			
		||||
    go_success = distance_error < 0.1  # 如果误差小于10厘米,则认为成功
 | 
			
		||||
    
 | 
			
		||||
    if observe:
 | 
			
		||||
        info(f"目标距离: {abs_distance:.3f}米, 实际距离: {actual_distance:.3f}米, 误差: {distance_error:.3f}米", "距离")
 | 
			
		||||
        if go_success:
 | 
			
		||||
            success(f"移动成功", "成功")
 | 
			
		||||
        else:
 | 
			
		||||
            warning(f"移动失败,误差过大: {distance_error:.3f}米", "失败")
 | 
			
		||||
        
 | 
			
		||||
        if qr_result:
 | 
			
		||||
            success(f"成功扫描到QR码: {qr_result}", "扫描结果")
 | 
			
		||||
        else:
 | 
			
		||||
            warning("未扫描到任何QR码", "扫描结果")
 | 
			
		||||
    
 | 
			
		||||
    # 将QR码结果添加到res字典中
 | 
			
		||||
    res['qr_result'] = qr_result
 | 
			
		||||
    
 | 
			
		||||
    return go_success, res
 | 
			
		||||
 | 
			
		||||
# 用法示例
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    # 这里是示例代码,实际使用时需要提供合适的ctrl和msg对象
 | 
			
		||||
 | 
			
		||||
@ -391,7 +391,7 @@ def turn_degree_v2(ctrl, msg, degree=90, absolute=False, precision=True):
 | 
			
		||||
            # 每0.5秒打印一次进度
 | 
			
		||||
            elapsed = time.time() - start_time
 | 
			
		||||
            if int(elapsed * 2) % 2 == 0:
 | 
			
		||||
                info(f"旋转进度: {elapsed:.1f}s/{wait_time:.1f}s, 剩余角度: {math.degrees(current_error):.2f}°", "进度")
 | 
			
		||||
                debug(f"旋转进度: {elapsed:.1f}s/{wait_time:.1f}s, 剩余角度: {math.degrees(current_error):.2f}°", "进度")
 | 
			
		||||
        
 | 
			
		||||
        # 切换到站立姿态
 | 
			
		||||
        debug("旋转停止,切换到站立姿态", "姿态")
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										26
									
								
								main.py
									
									
									
									
									
								
							
							
						
						
									
										26
									
								
								main.py
									
									
									
									
									
								
							@ -51,32 +51,30 @@ def main():
 | 
			
		||||
        # time.sleep(100) # TEST,
 | 
			
		||||
        # run_task_1(Ctrl, msg)
 | 
			
		||||
 | 
			
		||||
        arrow_direction = run_task_2(Ctrl, msg, xy_flag=False)
 | 
			
		||||
        # arrow_direction = run_task_2(Ctrl, msg, xy_flag=False)
 | 
			
		||||
 | 
			
		||||
        print(f"arrow_direction: {arrow_direction}")
 | 
			
		||||
        run_task_2_5(Ctrl, msg, direction=arrow_direction)
 | 
			
		||||
        # print(f"arrow_direction: {arrow_direction}")
 | 
			
		||||
        # run_task_2_5(Ctrl, msg, direction=arrow_direction)
 | 
			
		||||
 | 
			
		||||
        # 执行任务3
 | 
			
		||||
        run_task_3(Ctrl, msg)
 | 
			
		||||
        # arrow_direction = 'right' # TEST
 | 
			
		||||
 | 
			
		||||
        # run_task_test(Ctrl, msg)
 | 
			
		||||
 | 
			
		||||
        # time.sleep(100)
 | 
			
		||||
 | 
			
		||||
        if arrow_direction == 'left':
 | 
			
		||||
            run_task_4(Ctrl, msg)
 | 
			
		||||
        else:
 | 
			
		||||
            # run_task_3(Ctrl, msg)
 | 
			
		||||
            pass
 | 
			
		||||
        # if arrow_direction == 'left':
 | 
			
		||||
        #     run_task_4(Ctrl, msg)
 | 
			
		||||
        # else:
 | 
			
		||||
        #     run_task_3(Ctrl, msg)
 | 
			
		||||
 | 
			
		||||
        run_task_5(Ctrl, msg)
 | 
			
		||||
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
        if arrow_direction == 'left':
 | 
			
		||||
            # run_task_3_back(Ctrl, msg)
 | 
			
		||||
            pass
 | 
			
		||||
        else:
 | 
			
		||||
            run_task_4_back(Ctrl, msg)
 | 
			
		||||
 | 
			
		||||
        run_task_5(Ctrl, msg)
 | 
			
		||||
 | 
			
		||||
        run_task_2_5_back(Ctrl, msg, direction=arrow_direction)
 | 
			
		||||
 | 
			
		||||
        run_task_2_back(Ctrl, msg)
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										4981
									
								
								task_4/Gait_Params_stoop_full.toml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										4981
									
								
								task_4/Gait_Params_stoop_full.toml
									
									
									
									
									
										Normal file
									
								
							
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							@ -9,7 +9,7 @@ import math
 | 
			
		||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 | 
			
		||||
 | 
			
		||||
from base_move.turn_degree import turn_degree
 | 
			
		||||
from base_move.go_straight import go_straight
 | 
			
		||||
from base_move.go_straight import go_straight, go_straight_with_qrcode
 | 
			
		||||
from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing
 | 
			
		||||
from utils.gray_sky_analyzer import analyze_gray_sky_ratio
 | 
			
		||||
from base_move.move_base_hori_line import detect_horizontal_track_edge, detect_horizontal_track_edge_v2, detect_horizontal_track_edge_v3, calculate_distance_to_line
 | 
			
		||||
@ -201,39 +201,27 @@ def go_straight_to_horizontal_line_with_qr(ctrl, msg, target_distance=0.5, speed
 | 
			
		||||
    return res['success'], qr_result, res
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def run_task_5(ctrl, msg):
 | 
			
		||||
def run_task_5(ctrl, msg, observe=False):
 | 
			
		||||
    """
 | 
			
		||||
    走向卸货
 | 
			
		||||
    """
 | 
			
		||||
    section('任务5-1:直线移动并扫描二维码', "移动")
 | 
			
		||||
    
 | 
			
		||||
    # 目标与横线的距离为0.5米
 | 
			
		||||
    target_distance = 0.5
 | 
			
		||||
    # 前进速度为0.5米/秒
 | 
			
		||||
    speed = 0.5
 | 
			
		||||
    # 最大移动距离为8米
 | 
			
		||||
    max_distance = 8
 | 
			
		||||
    # 启用观察模式
 | 
			
		||||
    observe = True
 | 
			
		||||
    max_distance = 2
 | 
			
		||||
    
 | 
			
		||||
    # 开始移动并扫描二维码
 | 
			
		||||
    success, qr_result, move_info = go_straight_to_horizontal_line_with_qr(
 | 
			
		||||
        ctrl, msg, 
 | 
			
		||||
        target_distance=target_distance,
 | 
			
		||||
        speed=speed,
 | 
			
		||||
        max_distance=max_distance,
 | 
			
		||||
        observe=observe
 | 
			
		||||
    )
 | 
			
		||||
    go_success, res = go_straight_with_qrcode(ctrl, msg, distance=max_distance, speed=1, observe=observe)
 | 
			
		||||
    
 | 
			
		||||
    # 输出结果
 | 
			
		||||
    if success:
 | 
			
		||||
    if go_success:
 | 
			
		||||
        success("成功到达横线前指定距离", "完成")
 | 
			
		||||
        if qr_result:
 | 
			
		||||
            info(f"扫描到二维码: {qr_result}", "二维码")
 | 
			
		||||
        if res['qr_result']:
 | 
			
		||||
            info(f"扫描到二维码: {res['qr_result']}", "二维码")
 | 
			
		||||
        else:
 | 
			
		||||
            warning("未扫描到二维码", "二维码")
 | 
			
		||||
    else:
 | 
			
		||||
        error("未能成功到达横线前指定距离", "失败")
 | 
			
		||||
    
 | 
			
		||||
    # 返回移动和扫描结果
 | 
			
		||||
    return success, qr_result, move_info
 | 
			
		||||
    return go_success, res['qr_result']
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user