mi-task/utils/image_raw.py
2025-08-18 11:32:16 +08:00

221 lines
7.0 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
# from qreader import QReader
from threading import Thread, Lock
import time
import queue
from utils.log_helper import get_logger
# 导入AI相机服务
from protocol.srv import CameraService
class ImageSubscriber(Node):
def __init__(self):
super().__init__('image_subscriber')
# 创建AI相机服务客户端
self.camera_client = self.create_client(
CameraService,
'/camera_service'
)
# 等待服务可用
while not self.camera_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('AI相机服务不可用等待中...')
# 订阅AI相机图像话题
self.image_subscription = self.create_subscription(
Image,
'/image',
self.image_callback,
10
)
self.bridge = CvBridge()
self.cv_image = None # Store latest image
self.camera_started = False
# 启动AI相机
self.start_camera()
def start_camera(self):
"""启动AI相机"""
req = CameraService.Request()
req.command = 9 # START_IMAGE_PUBLISH
req.width = 640 # 图像宽度
req.height = 480 # 图像高度
req.fps = 30 # 帧率
future = self.camera_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
if future.result().result == 0: # RESULT_SUCCESS
self.get_logger().info('AI相机已启动')
self.camera_started = True
return True
else:
self.get_logger().error(f'启动AI相机失败: {future.result().msg}')
return False
def stop_camera(self):
"""停止AI相机"""
if not self.camera_started:
return True
req = CameraService.Request()
req.command = 10 # STOP_IMAGE_PUBLISH
future = self.camera_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
if future.result().result == 0: # RESULT_SUCCESS
self.get_logger().info('AI相机已停止')
self.camera_started = False
return True
else:
self.get_logger().error(f'停止AI相机失败: {future.result().msg}')
return False
def image_callback(self, msg):
try:
# 将ROS图像消息转换为OpenCV格式
self.cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
except Exception as e:
self.get_logger().error('Failed to convert image: %s' % str(e))
class ImageProcessor:
def __init__(self):
self.image_subscriber = ImageSubscriber()
# self.qreader = QReader()
self.spin_thread = None
self.running = True
self.log = get_logger("图像处理器")
# 异步 QR 码扫描相关
self.scan_thread = None
self.image_queue = queue.Queue(maxsize=3) # 限制队列大小,只保留最新的图像
self.scan_lock = Lock()
self.last_qr_result = None
self.last_qr_time = 0
self.is_scanning = False
self.enable_async_scan = False
def run(self):
self.spin_thread = Thread(target=self._spin)
self.spin_thread.start()
def _spin(self):
try:
while self.running:
rclpy.spin_once(self.image_subscriber, timeout_sec=0.1)
except Exception as e:
self.log.error(f"Spin线程错误: {e}", "错误")
def destroy(self):
self.running = False
self.stop_async_scan()
if self.spin_thread:
self.spin_thread.join()
# 停止AI相机
self.image_subscriber.stop_camera()
self.image_subscriber.destroy_node()
def get_current_image(self):
return self.image_subscriber.cv_image
def decode_qrcode(self, img = None):
if img is None:
img = self.get_current_image()
# decoded_info = self.qreader.detect_and_decode(image=img)
decoded_info = None
if decoded_info and len(decoded_info) > 0:
return decoded_info[0]
return None
def start_async_scan(self, interval=0.3):
"""
启动异步 QR 码扫描
参数:
interval: 扫描间隔,单位秒
"""
if self.scan_thread is not None and self.scan_thread.is_alive():
self.log.warning("异步扫描已经在运行中", "警告")
return
self.enable_async_scan = True
self.is_scanning = False
self.scan_thread = Thread(target=self._async_scan_worker, args=(interval,))
self.scan_thread.daemon = True # 设为守护线程,主线程结束时自动结束
self.scan_thread.start()
self.log.info("启动异步 QR 码扫描线程", "启动")
def stop_async_scan(self):
"""停止异步 QR 码扫描"""
self.enable_async_scan = False
if self.scan_thread and self.scan_thread.is_alive():
self.scan_thread.join(timeout=1.0)
self.log.info("异步 QR 码扫描线程已停止", "停止")
def _async_scan_worker(self, interval):
"""异步扫描工作线程"""
last_scan_time = 0
while self.enable_async_scan and self.running:
current_time = time.time()
# 按指定间隔扫描
if current_time - last_scan_time >= interval:
img = self.get_current_image()
if img is not None:
try:
self.is_scanning = True
qr_data = self.decode_qrcode(img)
self.is_scanning = False
with self.scan_lock:
if qr_data:
self.last_qr_result = qr_data
self.last_qr_time = current_time
self.log.success(f"异步扫描到 QR 码: {qr_data}", "扫描")
except Exception as e:
self.is_scanning = False
self.log.error(f"异步 QR 码扫描出错: {e}", "错误")
last_scan_time = current_time
# 短暂休眠避免占用过多 CPU
time.sleep(0.05)
def get_last_qr_result(self):
"""获取最后一次成功扫描的 QR 码结果"""
with self.scan_lock:
return self.last_qr_result, self.last_qr_time
""" DEBUG """
def main(args=None):
rclpy.init(args=args)
image_subscriber = ImageSubscriber()
try:
rclpy.spin(image_subscriber)
except KeyboardInterrupt:
pass
# 清理
image_subscriber.stop_camera()
image_subscriber.destroy_node()
if __name__ == '__main__':
main()