Merge branch 'main' of ssh://120.27.199.238:222/Havoc420mac/mi-task into main

This commit is contained in:
havoc420ubuntu 2025-05-15 15:22:27 +00:00
commit a8574fc1db
3 changed files with 188 additions and 10 deletions

View File

@ -202,7 +202,7 @@ def calculate_distance_to_line(edge_info, camera_height, camera_tilt_angle_deg=0
return final_distance
def move_to_hori_line(ctrl, msg, target_distance=0.5, observe=False):
def move_to_hori_line(ctrl, msg, target_distance=0.5, observe=False, scan_qrcode=False, qr_check_interval=0.5):
"""
控制机器人校准并移动到横向线前的指定距离
@ -211,25 +211,56 @@ def move_to_hori_line(ctrl, msg, target_distance=0.5, observe=False):
msg: robot_control_cmd_lcmt 对象用于发送命令
target_distance: 目标位置与横向线的距离()默认为0.5
observe: 是否输出中间状态信息和可视化结果默认为False
scan_qrcode: 是否在移动过程中扫描QR码默认为False
qr_check_interval: QR码检查间隔时间()默认为0.5
返回:
bool: 是否成功到达目标位置
bool or tuple: 如果scan_qrcode为False返回bool表示是否成功到达目标位置
如果scan_qrcode为True返回(bool, str)元组表示(是否成功到达目标位置, QR码扫描结果)
"""
# 启动异步QR码扫描如果需要
qr_result = None
if scan_qrcode:
# 确保image_processor存在
if not hasattr(ctrl, 'image_processor') or ctrl.image_processor is None:
print("警告: 无法启用QR码扫描image_processor不存在")
scan_qrcode = False
else:
# 启动异步扫描
try:
ctrl.image_processor.start_async_scan(interval=0.2)
print("已启动异步QR码扫描")
except Exception as e:
print(f"启动QR码扫描失败: {e}")
scan_qrcode = False
# 首先校准到水平
print("校准到横向线水平")
aligned = align_to_horizontal_line(ctrl, msg, observe=observe)
if not aligned:
print("无法校准到横向线水平,停止移动")
if scan_qrcode:
ctrl.image_processor.stop_async_scan()
return False, None
return False
# 校准后检查是否已经扫描到QR码
if scan_qrcode:
qr_data, scan_time = ctrl.image_processor.get_last_qr_result()
if qr_data:
qr_result = qr_data
print(f"🔍 校准过程中已扫描到QR码: {qr_data}")
# 检测横向线
# image = cv2.imread("current_image.jpg") # TEST
image = ctrl.image_processor.get_current_image()
edge_point, edge_info = detect_horizontal_track_edge(image, observe=observe)
if edge_point is None or edge_info is None:
print("无法检测到横向线,停止移动")
if scan_qrcode:
ctrl.image_processor.stop_async_scan()
return False, qr_result
return False
# 获取相机高度
@ -240,6 +271,9 @@ def move_to_hori_line(ctrl, msg, target_distance=0.5, observe=False):
if current_distance is None:
print("无法计算到横向线的距离,停止移动")
if scan_qrcode:
ctrl.image_processor.stop_async_scan()
return False, qr_result
return False
if observe:
@ -250,6 +284,9 @@ def move_to_hori_line(ctrl, msg, target_distance=0.5, observe=False):
if abs(distance_to_move) < 0.05: # 如果已经很接近目标距离
print("已经达到目标距离,无需移动")
if scan_qrcode:
ctrl.image_processor.stop_async_scan()
return True, qr_result
return True
# 设置移动命令
@ -289,8 +326,9 @@ def move_to_hori_line(ctrl, msg, target_distance=0.5, observe=False):
distance_moved = 0
start_time = time.time()
timeout = move_time + 1 # 超时时间设置为预计移动时间加1秒
last_qr_check_time = 0
# 监控移动距离,但不执行减速改用stop_smooth
# 监控移动距离,并在移动过程中检查QR码如果启用
while distance_moved < abs(distance_to_move) * 0.95 and time.time() - start_time < timeout:
# 计算已移动距离
current_position = ctrl.odo_msg.xyz
@ -298,6 +336,16 @@ def move_to_hori_line(ctrl, msg, target_distance=0.5, observe=False):
dy = current_position[1] - start_position[1]
distance_moved = math.sqrt(dx*dx + dy*dy)
# 检查QR码扫描结果如果启用
if scan_qrcode:
current_time = time.time()
if current_time - last_qr_check_time >= qr_check_interval:
qr_data, scan_time = ctrl.image_processor.get_last_qr_result()
if qr_data and scan_time > start_time: # 确保是在移动开始后的扫描结果
qr_result = qr_data
print(f"🔍 在移动过程中扫描到QR码: {qr_data}")
last_qr_check_time = current_time
if observe and time.time() % 0.5 < 0.02: # 每0.5秒左右打印一次
print(f"已移动: {distance_moved:.3f}米, 目标: {abs(distance_to_move):.3f}")
@ -319,8 +367,24 @@ def move_to_hori_line(ctrl, msg, target_distance=0.5, observe=False):
if hasattr(ctrl, 'place_marker'):
ctrl.place_marker(end_position[0], end_position[1], end_position[2] if len(end_position) > 2 else 0.0, 'red', observe=True)
# 如果没有提供图像处理器或图像验证失败,则使用里程计数据判断
return abs(distance_moved - abs(distance_to_move)) < 0.1 # 如果误差小于10厘米则认为成功
# 移动完成后再检查一次QR码扫描结果
if scan_qrcode:
qr_data, scan_time = ctrl.image_processor.get_last_qr_result()
if qr_data and (qr_result is None or scan_time > last_qr_check_time):
qr_result = qr_data
print(f"🔍 移动完成后最终扫描到QR码: {qr_data}")
# 停止异步扫描
ctrl.image_processor.stop_async_scan()
# 判断移动是否成功
move_success = abs(distance_moved - abs(distance_to_move)) < 0.1 # 如果误差小于10厘米则认为成功
# 根据scan_qrcode参数返回不同格式的结果
if scan_qrcode:
return move_success, qr_result
else:
return move_success
def arc_turn_around_hori_line(ctrl, msg, angle_deg=90, left=True, target_distance=0.2, observe=False):
"""

View File

@ -1,10 +1,13 @@
import time
import sys
import os
import math
# 添加父目录到路径以便能够导入utils
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from base_move.move_base_hori_line import move_to_hori_line, arc_turn_around_hori_line
from base_move.move_base_hori_line import move_to_hori_line, arc_turn_around_hori_line, align_to_horizontal_line
from utils.detect_track import detect_horizontal_track_edge
from base_move.move_base_hori_line import calculate_distance_to_line
observe = True
@ -16,7 +19,20 @@ def run_task_1(ctrl, msg):
arc_turn_around_hori_line(ctrl, msg, angle_deg=85, left=False, observe=observe)
print('😺 task 1 - 2')
move_to_hori_line(ctrl, msg, target_distance=1, observe=observe)
# 使用内置的QR码扫描功能执行移动
move_success, qr_result = move_to_hori_line(
ctrl=ctrl,
msg=msg,
target_distance=1,
observe=observe,
scan_qrcode=True, # 启用QR码扫描
qr_check_interval=0.3 # 每0.3秒检查一次QR码结果
)
if qr_result:
print(f"🎯 成功扫描到QR码: {qr_result}")
else:
print("⚠️ 未能扫描到任何QR码")
print('😺 task 1 - 3')
arc_turn_around_hori_line(ctrl, msg, angle_deg=180, target_distance=0.4, left=True, observe=observe)
@ -27,4 +43,26 @@ def run_task_1(ctrl, msg):
move_to_hori_line(ctrl, msg, observe=observe)
# 保留move_with_qr_scan函数作为备份如果需要特殊处理可以使用
def move_with_qr_scan(ctrl, msg, target_distance=0.5, observe=False):
"""
结合移动到指定位置和 QR 码扫描功能的函数使用异步扫描
参数:
ctrl: Robot_Ctrl 对象
msg: 机器人控制消息对象
target_distance: 目标距离
observe: 是否打印调试信息
"""
# 直接使用内置的扫描功能
return move_to_hori_line(
ctrl=ctrl,
msg=msg,
target_distance=target_distance,
observe=observe,
scan_qrcode=True,
qr_check_interval=0.3
)

View File

@ -7,7 +7,9 @@ from cv_bridge import CvBridge
import cv2
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
from qreader import QReader
from threading import Thread
from threading import Thread, Lock
import time
import queue
class ImageSubscriber(Node):
@ -45,6 +47,15 @@ class ImageProcessor:
self.qreader = QReader()
self.spin_thread = None
self.running = True
# 异步 QR 码扫描相关
self.scan_thread = None
self.image_queue = queue.Queue(maxsize=3) # 限制队列大小,只保留最新的图像
self.scan_lock = Lock()
self.last_qr_result = None
self.last_qr_time = 0
self.is_scanning = False
self.enable_async_scan = False
def run(self):
self.spin_thread = Thread(target=self._spin)
@ -59,6 +70,7 @@ class ImageProcessor:
def destroy(self):
self.running = False
self.stop_async_scan()
if self.spin_thread:
self.spin_thread.join()
self.image_subscriber.destroy_node()
@ -69,8 +81,72 @@ class ImageProcessor:
def decode_qrcode(self, img = None):
if img is None:
img = self.get_current_image()
if img is None:
return None
decoded_info = self.qreader.detect_and_decode(image=img)
return decoded_info[0]
if decoded_info and len(decoded_info) > 0:
return decoded_info[0]
return None
def start_async_scan(self, interval=0.3):
"""
启动异步 QR 码扫描
参数:
interval: 扫描间隔单位秒
"""
if self.scan_thread is not None and self.scan_thread.is_alive():
print("异步扫描已经在运行中")
return
self.enable_async_scan = True
self.is_scanning = False
self.scan_thread = Thread(target=self._async_scan_worker, args=(interval,))
self.scan_thread.daemon = True # 设为守护线程,主线程结束时自动结束
self.scan_thread.start()
print("启动异步 QR 码扫描线程")
def stop_async_scan(self):
"""停止异步 QR 码扫描"""
self.enable_async_scan = False
if self.scan_thread and self.scan_thread.is_alive():
self.scan_thread.join(timeout=1.0)
print("异步 QR 码扫描线程已停止")
def _async_scan_worker(self, interval):
"""异步扫描工作线程"""
last_scan_time = 0
while self.enable_async_scan and self.running:
current_time = time.time()
# 按指定间隔扫描
if current_time - last_scan_time >= interval:
img = self.get_current_image()
if img is not None:
try:
self.is_scanning = True
qr_data = self.decode_qrcode(img)
self.is_scanning = False
with self.scan_lock:
if qr_data:
self.last_qr_result = qr_data
self.last_qr_time = current_time
print(f"异步扫描到 QR 码: {qr_data}")
except Exception as e:
self.is_scanning = False
print(f"异步 QR 码扫描出错: {e}")
last_scan_time = current_time
# 短暂休眠避免占用过多 CPU
time.sleep(0.05)
def get_last_qr_result(self):
"""获取最后一次成功扫描的 QR 码结果"""
with self.scan_lock:
return self.last_qr_result, self.last_qr_time
""" DEBUG """