#!/usr/bin/env python3 #ros2 run camera_test camera_server import rclpy from rclpy.node import Node from sensor_msgs.msg import Image from cv_bridge import CvBridge import cv2 import os from pyzbar import pyzbar from threading import Thread, Lock import time import queue from datetime import datetime #from utils.log_helper import get_logger # 导入AI相机服务 from protocol.srv import CameraService #!/usr/bin/env python3 import rclpy from rclpy.node import Node from sensor_msgs.msg import Image from cv_bridge import CvBridge import cv2 import os import time from datetime import datetime from protocol.srv import CameraService class ImageSubscriber(Node): def __init__(self): super().__init__('image_subscriber') # 初始化安全控制标志 self._shutdown_flag = False self._ros_initialized = True # 创建服务客户端 self.camera_client = self.create_client(CameraService, '/camera_service') while not self.camera_client.wait_for_service(timeout_sec=1.0): self.get_logger().info('等待AI相机服务...') # 图像订阅 self.image_sub = self.create_subscription( Image, '/image', self.image_callback, 10 ) # 工具初始化 self.bridge = CvBridge() self.cv_image = None self.camera_started = False # 截图控制参数 self.last_save_time = time.time() self.start_time = time.time() self.save_interval = 0.5 self.total_duration = 1.0 self.save_dir = "saved_images" os.makedirs(self.save_dir, exist_ok=True) # 安全启动相机 self.start_camera() def start_camera(self): """安全启动相机""" if not self._ros_initialized: return False req = CameraService.Request() req.command = 9 req.width, req.height, req.fps = 640, 480, 30 future = self.camera_client.call_async(req) rclpy.spin_until_future_complete(self, future) result = future.result() self.get_logger().info(f'服务返回: [code={result.result}, msg="{result.msg}"]') if result.result == 0: self.get_logger().info('相机启动成功') self.camera_started = True return True else: self.get_logger().error(f'启动失败 (错误码 {result.result})') return False def stop_camera(self): """安全停止相机""" if not self.camera_started or not self._ros_initialized: return True req = CameraService.Request() req.command = 10 try: future = self.camera_client.call_async(req) rclpy.spin_until_future_complete(self, future, timeout_sec=2.0) if future.result().result == 0: self.get_logger().info('相机已停止') self.camera_started = False return True else: self.get_logger().error(f'停止失败: {future.result().msg}') return False except Exception as e: self.get_logger().error(f'停止异常: {str(e)}') return False def save_image(self): """安全保存图像""" if self.cv_image is None: self.get_logger().warning("无有效图像可保存") return False try: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{self.save_dir}/capture_{timestamp}.jpg" cv2.imwrite(filename, self.cv_image) self.get_logger().info(f"截图已保存: {filename}") return True except Exception as e: self.get_logger().error(f"保存失败: {str(e)}") return False def image_callback(self, msg): """安全的图像回调""" if getattr(self, '_shutdown_flag', False): return current_time = time.time() elapsed = current_time - self.start_time if elapsed >= self.total_duration: self.get_logger().info("已完成30秒截图任务") self._shutdown_flag = True return try: self.cv_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8') if current_time - self.last_save_time >= self.save_interval: if self.save_image(): self.last_save_time = current_time except Exception as e: self.get_logger().error(f"图像处理错误: {str(e)}") def safe_spin(self): """安全spin循环""" while rclpy.ok() and not getattr(self, '_shutdown_flag', False): rclpy.spin_once(self, timeout_sec=0.1) def destroy(self): """安全销毁节点""" if not self._ros_initialized: return self.stop_camera() self._shutdown_flag = True if hasattr(self, '_ros_initialized'): self.destroy_node() rclpy.shutdown() def get_latest_image(self): """外部获取最新图像接口""" return self.cv_image.copy() if self.cv_image is not None else None class ImageProcessor: def __init__(self): self.image_subscriber = ImageSubscriber() self.spin_thread = None self.running = True # self.log = get_logger("图像处理器") # 异步 QR 码扫描相关 self.scan_thread = None self.image_queue = queue.Queue(maxsize=3) # 限制队列大小,只保留最新的图像 self.scan_lock = Lock() self.last_qr_result = None self.last_qr_time = 0 self.is_scanning = False self.enable_async_scan = False def run(self): self.spin_thread = Thread(target=self._spin) self.spin_thread.start() def _spin(self): try: while self.running: rclpy.spin_once(self.image_subscriber, timeout_sec=0.1) except Exception as e: # self.log.error(f"Spin线程错误: {e}", "错误") print('threat wrong') def destroy(self): self.running = False self.stop_async_scan() if self.spin_thread: self.spin_thread.join() # 停止AI相机 self.image_subscriber.stop_camera() self.image_subscriber.destroy_node() def get_current_image(self): return self.image_subscriber.cv_image def decode_qrcode(self, img=None): """使用pyzbar解码QR码""" if img is None: img = self.get_current_image() if img is None: return None try: # 使用pyzbar解码所有条形码/QR码 decoded_objects = pyzbar.decode(img) if decoded_objects: # 返回第一个QR码的数据(解码为字符串) qr_data = decoded_objects[0].data.decode('utf-8') return qr_data return None except Exception as e: print(f"QR码解码错误: {str(e)}") return None def decode_all_qrcodes(self, img=None): """解码图像中的所有QR码/条形码""" if img is None: img = self.get_current_image() if img is None: return [] try: decoded_objects = pyzbar.decode(img) results = [] for obj in decoded_objects: result = { 'data': obj.data.decode('utf-8'), 'type': obj.type, 'rect': obj.rect, # 位置信息 (x, y, width, height) 'polygon': obj.polygon # 多边形顶点 } results.append(result) return results except Exception as e: print(f"批量QR码解码错误: {str(e)}") return [] def start_async_scan(self, interval=0.3): """ 启动异步 QR 码扫描 参数: interval: 扫描间隔,单位秒 """ if self.scan_thread is not None and self.scan_thread.is_alive(): # self.log.warning("异步扫描已经在运行中", "警告") print('scan,warn') return self.enable_async_scan = True self.is_scanning = False self.scan_thread = Thread(target=self._async_scan_worker, args=(interval,)) self.scan_thread.daemon = True # 设为守护线程,主线程结束时自动结束 self.scan_thread.start() # self.log.info("启动异步 QR 码扫描线程", "启动") print('start') def stop_async_scan(self): """停止异步 QR 码扫描""" self.enable_async_scan = False if self.scan_thread and self.scan_thread.is_alive(): self.scan_thread.join(timeout=1.0) # self.log.info("异步 QR 码扫描线程已停止", "停止") print('stop') def _async_scan_worker(self, interval): """异步扫描工作线程""" last_scan_time = 0 while self.enable_async_scan and self.running: current_time = time.time() # 按指定间隔扫描 if current_time - last_scan_time >= interval: img = self.get_current_image() if img is not None: try: self.is_scanning = True qr_data = self.decode_qrcode(img) self.is_scanning = False with self.scan_lock: if qr_data: self.last_qr_result = qr_data self.last_qr_time = current_time # self.log.success(f"异步扫描到 QR 码: {qr_data}", "扫描") print(f"异步扫描到 QR 码: {qr_data}") except Exception as e: self.is_scanning = False # self.log.error(f"异步 QR 码扫描出错: {e}", "错误") print(f"异步 QR 码扫描出错: {e}") last_scan_time = current_time # 短暂休眠避免占用过多 CPU time.sleep(0.05) def get_last_qr_result(self): """获取最后一次成功扫描的 QR 码结果""" with self.scan_lock: return self.last_qr_result, self.last_qr_time """ DEBUG """ # def main(args=None): # rclpy.init(args=args) # image_subscriber = ImageSubscriber() # try: # rclpy.spin(image_subscriber) # except KeyboardInterrupt: # pass # # 清理 # image_subscriber.stop_camera() # image_subscriber.destroy_node() # if __name__ == '__main__': # main()