test: new camera

This commit is contained in:
havocrao 2025-08-18 11:32:16 +08:00
parent 0aa9da9327
commit 932e582e9d
2 changed files with 130 additions and 11 deletions

View File

@ -0,0 +1,68 @@
#!/usr/bin/env python3
"""
测试AI相机集成的脚本
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from utils.image_raw import ImageProcessor
import time
def test_ai_camera_integration():
"""测试AI相机集成功能"""
print("开始测试AI相机集成...")
try:
# 创建图像处理器实例
processor = ImageProcessor()
print("✓ 图像处理器创建成功")
# 启动处理器
processor.run()
print("✓ 图像处理器启动成功")
# 等待一段时间让相机启动
print("等待相机启动...")
time.sleep(3)
# 测试获取图像
image = processor.get_current_image()
if image is not None:
print(f"✓ 成功获取图像,尺寸: {image.shape}")
else:
print("⚠ 未获取到图像")
# 测试异步QR码扫描
print("启动异步QR码扫描...")
processor.start_async_scan(interval=1.0)
# 运行一段时间
print("运行10秒进行测试...")
time.sleep(10)
# 停止扫描
processor.stop_async_scan()
print("✓ 异步扫描已停止")
# 获取QR码结果
qr_result, qr_time = processor.get_last_qr_result()
if qr_result:
print(f"✓ 扫描到QR码: {qr_result}")
else:
print(" 未扫描到QR码")
# 清理资源
processor.destroy()
print("✓ 资源清理完成")
print("测试完成!")
except Exception as e:
print(f"❌ 测试失败: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
test_ai_camera_integration()

View File

@ -5,33 +5,81 @@ from rclpy.node import Node
from sensor_msgs.msg import Image from sensor_msgs.msg import Image
from cv_bridge import CvBridge from cv_bridge import CvBridge
import cv2 import cv2
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
# from qreader import QReader # from qreader import QReader
from threading import Thread, Lock from threading import Thread, Lock
import time import time
import queue import queue
from utils.log_helper import get_logger from utils.log_helper import get_logger
# 导入AI相机服务
from protocol.srv import CameraService
class ImageSubscriber(Node): class ImageSubscriber(Node):
def __init__(self): def __init__(self):
super().__init__('image_subscriber') super().__init__('image_subscriber')
# 定义 QoS 配置(匹配发布者的可靠性策略)
qos_profile = QoSProfile( # 创建AI相机服务客户端
reliability=QoSReliabilityPolicy.BEST_EFFORT, # 或 BEST_EFFORT取决于发布者 self.camera_client = self.create_client(
history=QoSHistoryPolicy.KEEP_LAST, CameraService,
depth=10 '/camera_service'
) )
self.subscription = self.create_subscription( # 等待服务可用
while not self.camera_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('AI相机服务不可用等待中...')
# 订阅AI相机图像话题
self.image_subscription = self.create_subscription(
Image, Image,
'/rgb_camera/image_raw', '/image',
self.image_callback, self.image_callback,
qos_profile=qos_profile 10
) )
self.subscription # 防止未使用变量警告
self.bridge = CvBridge() self.bridge = CvBridge()
self.cv_image = None # Store latest image self.cv_image = None # Store latest image
self.camera_started = False
# 启动AI相机
self.start_camera()
def start_camera(self):
"""启动AI相机"""
req = CameraService.Request()
req.command = 9 # START_IMAGE_PUBLISH
req.width = 640 # 图像宽度
req.height = 480 # 图像高度
req.fps = 30 # 帧率
future = self.camera_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
if future.result().result == 0: # RESULT_SUCCESS
self.get_logger().info('AI相机已启动')
self.camera_started = True
return True
else:
self.get_logger().error(f'启动AI相机失败: {future.result().msg}')
return False
def stop_camera(self):
"""停止AI相机"""
if not self.camera_started:
return True
req = CameraService.Request()
req.command = 10 # STOP_IMAGE_PUBLISH
future = self.camera_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
if future.result().result == 0: # RESULT_SUCCESS
self.get_logger().info('AI相机已停止')
self.camera_started = False
return True
else:
self.get_logger().error(f'停止AI相机失败: {future.result().msg}')
return False
def image_callback(self, msg): def image_callback(self, msg):
try: try:
@ -75,6 +123,8 @@ class ImageProcessor:
self.stop_async_scan() self.stop_async_scan()
if self.spin_thread: if self.spin_thread:
self.spin_thread.join() self.spin_thread.join()
# 停止AI相机
self.image_subscriber.stop_camera()
self.image_subscriber.destroy_node() self.image_subscriber.destroy_node()
def get_current_image(self): def get_current_image(self):
@ -162,6 +212,7 @@ def main(args=None):
pass pass
# 清理 # 清理
image_subscriber.stop_camera()
image_subscriber.destroy_node() image_subscriber.destroy_node()