#!/usr/bin/env python3 import rclpy from rclpy.node import Node from sensor_msgs.msg import Image from cv_bridge import CvBridge import cv2 from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy from qreader import QReader from threading import Thread, Lock import time import queue class ImageSubscriber(Node): def __init__(self): super().__init__('image_subscriber') # 定义 QoS 配置(匹配发布者的可靠性策略) qos_profile = QoSProfile( reliability=QoSReliabilityPolicy.BEST_EFFORT, # 或 BEST_EFFORT,取决于发布者 history=QoSHistoryPolicy.KEEP_LAST, depth=10 ) self.subscription = self.create_subscription( Image, '/rgb_camera/image_raw', self.image_callback, qos_profile=qos_profile ) self.subscription # 防止未使用变量警告 self.bridge = CvBridge() self.cv_image = None # Store latest image def image_callback(self, msg): try: # 将ROS图像消息转换为OpenCV格式 self.cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8') except Exception as e: self.get_logger().error('Failed to convert image: %s' % str(e)) class ImageProcessor: def __init__(self): self.image_subscriber = ImageSubscriber() self.qreader = QReader() self.spin_thread = None self.running = True # 异步 QR 码扫描相关 self.scan_thread = None self.image_queue = queue.Queue(maxsize=3) # 限制队列大小,只保留最新的图像 self.scan_lock = Lock() self.last_qr_result = None self.last_qr_time = 0 self.is_scanning = False self.enable_async_scan = False def run(self): self.spin_thread = Thread(target=self._spin) self.spin_thread.start() def _spin(self): try: while self.running: rclpy.spin_once(self.image_subscriber, timeout_sec=0.1) except Exception as e: print(f"Spin thread error: {e}") def destroy(self): self.running = False self.stop_async_scan() if self.spin_thread: self.spin_thread.join() self.image_subscriber.destroy_node() def get_current_image(self): return self.image_subscriber.cv_image def decode_qrcode(self, img = None): if img is None: img = self.get_current_image() decoded_info = self.qreader.detect_and_decode(image=img) if decoded_info and len(decoded_info) > 0: return decoded_info[0] return None def start_async_scan(self, interval=0.3): """ 启动异步 QR 码扫描 参数: interval: 扫描间隔,单位秒 """ if self.scan_thread is not None and self.scan_thread.is_alive(): print("异步扫描已经在运行中") return self.enable_async_scan = True self.is_scanning = False self.scan_thread = Thread(target=self._async_scan_worker, args=(interval,)) self.scan_thread.daemon = True # 设为守护线程,主线程结束时自动结束 self.scan_thread.start() print("启动异步 QR 码扫描线程") def stop_async_scan(self): """停止异步 QR 码扫描""" self.enable_async_scan = False if self.scan_thread and self.scan_thread.is_alive(): self.scan_thread.join(timeout=1.0) print("异步 QR 码扫描线程已停止") def _async_scan_worker(self, interval): """异步扫描工作线程""" last_scan_time = 0 while self.enable_async_scan and self.running: current_time = time.time() # 按指定间隔扫描 if current_time - last_scan_time >= interval: img = self.get_current_image() if img is not None: try: self.is_scanning = True qr_data = self.decode_qrcode(img) self.is_scanning = False with self.scan_lock: if qr_data: self.last_qr_result = qr_data self.last_qr_time = current_time print(f"异步扫描到 QR 码: {qr_data}") except Exception as e: self.is_scanning = False print(f"异步 QR 码扫描出错: {e}") last_scan_time = current_time # 短暂休眠避免占用过多 CPU time.sleep(0.05) def get_last_qr_result(self): """获取最后一次成功扫描的 QR 码结果""" with self.scan_lock: return self.last_qr_result, self.last_qr_time """ DEBUG """ def main(args=None): rclpy.init(args=args) image_subscriber = ImageSubscriber() try: rclpy.spin(image_subscriber) except KeyboardInterrupt: pass # 清理 image_subscriber.destroy_node() if __name__ == '__main__': main()