mi-task/test/demo_test/qr_detect_demo.py
2025-08-18 20:34:03 +08:00

266 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import numpy as np
from protocol.srv import CameraService
import cv2
import os
import sys
import time
from datetime import datetime
from pyzbar import pyzbar
# 添加utils路径以便导入image_raw模块
sys.path.append('/home/mi-task')
from utils.image_raw import ImageProcessor
# 配置显示环境(关键修改)
os.environ['OPENCV_VIDEOIO_PRIORITY_GTK'] = '1'
os.environ['DISPLAY'] = ':0' # 使用主显示器
class AICameraDemo(Node):
def __init__(self):
super().__init__('ai_camera_demo')
# 创建服务客户端
self.camera_client = self.create_client(CameraService, '/camera_service')
while not self.camera_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('等待AI相机服务...')
# 图像订阅
self.image_subscription = self.create_subscription(
Image, '/image', self.image_callback, 10
)
# 截图控制参数
self.last_capture_time = time.time()
self.start_time = time.time()
self.capture_interval = 3.0 # 截图间隔(秒)
self.total_duration = 30.0 # 总运行时间(秒)
self.save_dir = "captures" # 普通截图保存目录
self.qr_save_dir = "qr_captures" # QR码截图保存目录
os.makedirs(self.save_dir, exist_ok=True)
os.makedirs(self.qr_save_dir, exist_ok=True)
# QR码扫描相关
self.last_qr_check_time = time.time()
self.qr_check_interval = 1.0 # QR码检测间隔
self.found_qr_codes = set() # 记录已发现的QR码避免重复输出
# 初始化OpenCV窗口
self.window_name = 'AI Camera Feed'
try:
cv2.namedWindow(self.window_name, cv2.WINDOW_NORMAL)
self.gui_enabled = True
except:
self.get_logger().error("无法创建OpenCV窗口将仅保存截图")
self.gui_enabled = False
# 初始化图像处理器用于QR码扫描
self.current_image = None
def start_camera(self):
"""启动相机服务"""
req = CameraService.Request()
req.command = 9 # START_IMAGE_PUBLISH
req.width, req.height, req.fps = 640, 480, 30
future = self.camera_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
result = future.result()
if result.result == 0:
self.get_logger().info('AI相机启动成功')
return True
else:
self.get_logger().error(f'AI相机启动失败 (错误码: {result.result})')
return False
def stop_camera(self):
"""停止相机服务"""
req = CameraService.Request()
req.command = 10 # STOP_IMAGE_PUBLISH
future = self.camera_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
result = future.result()
if result.result == 0:
self.get_logger().info('AI相机已停止')
return True
else:
self.get_logger().error(f'停止AI相机失败: {result.msg}')
return False
def detect_qr_codes(self, img):
"""使用pyzbar检测QR码"""
try:
# 检测所有条形码/QR码
decoded_objects = pyzbar.decode(img)
results = []
for obj in decoded_objects:
qr_data = obj.data.decode('utf-8')
qr_type = obj.type
# 在图像上绘制检测框
points = obj.polygon
if len(points) > 4:
hull = cv2.convexHull(np.array([point for point in points], dtype=np.float32))
cv2.polylines(img, [np.int32(hull)], True, (0, 255, 0), 3)
else:
cv2.polylines(img, [np.int32(points)], True, (0, 255, 0), 3)
# 添加文本标签
x = obj.rect.left
y = obj.rect.top - 10
cv2.putText(img, f'{qr_type}: {qr_data[:20]}...', (x, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
results.append({
'data': qr_data,
'type': qr_type,
'rect': obj.rect
})
return results, img
except Exception as e:
self.get_logger().error(f"QR码检测错误: {str(e)}")
return [], img
def image_callback(self, msg):
"""处理图像并定时截图"""
current_time = time.time()
elapsed = current_time - self.start_time
# 超过总时长则停止
if elapsed >= self.total_duration:
self.get_logger().info("已完成30秒截图任务")
raise KeyboardInterrupt
try:
# 转换图像格式
if msg.encoding in ['rgb8', 'bgr8']:
img = np.frombuffer(msg.data, dtype=np.uint8).reshape(
msg.height, msg.width, 3
)
if msg.encoding == 'rgb8':
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
# 保存当前图像用于QR码检测
self.current_image = img.copy()
# QR码检测定时进行避免影响性能
if current_time - self.last_qr_check_time >= self.qr_check_interval:
qr_results, img_with_overlay = self.detect_qr_codes(img.copy())
# 处理检测到的QR码
for qr in qr_results:
qr_data = qr['data']
if qr_data not in self.found_qr_codes:
self.found_qr_codes.add(qr_data)
self.get_logger().info(f"🔍 检测到 {qr['type']}: {qr_data}")
# 保存包含QR码的特殊截图到专用文件夹
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
qr_filename = f"{self.qr_save_dir}/qr_detected_{timestamp}.jpg"
cv2.imwrite(qr_filename, img_with_overlay)
self.get_logger().info(f"📷 QR码检测截图已保存: {qr_filename}")
self.last_qr_check_time = current_time
# 如果检测到QR码显示带标注的图像
if qr_results and self.gui_enabled:
cv2.imshow(self.window_name, img_with_overlay)
elif self.gui_enabled:
cv2.imshow(self.window_name, img)
else:
# 正常显示
if self.gui_enabled:
cv2.imshow(self.window_name, img)
# 定时截图逻辑
if current_time - self.last_capture_time >= self.capture_interval:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{self.save_dir}/capture_{timestamp}.jpg"
cv2.imwrite(filename, img)
self.get_logger().info(f"📸 定时截图已保存: {filename}")
self.last_capture_time = current_time
# OpenCV窗口事件处理
if self.gui_enabled:
key = cv2.waitKey(1) & 0xFF
if key == ord('q'): # 按'q'退出
raise KeyboardInterrupt
elif key == ord('s'): # 按's'手动截图
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{self.save_dir}/manual_{timestamp}.jpg"
cv2.imwrite(filename, img)
self.get_logger().info(f"📱 手动截图已保存: {filename}")
except Exception as e:
self.get_logger().error(f"图像处理错误: {str(e)}")
def print_summary(self):
"""打印运行总结"""
self.get_logger().info("=" * 50)
self.get_logger().info("📊 运行总结:")
self.get_logger().info(f"🕐 总运行时间: {self.total_duration}")
self.get_logger().info(f"📷 截图间隔: {self.capture_interval}")
self.get_logger().info(f"🔍 发现QR码数量: {len(self.found_qr_codes)}")
if self.found_qr_codes:
self.get_logger().info("📋 发现的QR码:")
for i, qr_data in enumerate(self.found_qr_codes, 1):
self.get_logger().info(f" {i}. {qr_data}")
# 统计保存的文件
if os.path.exists(self.save_dir):
capture_files = os.listdir(self.save_dir)
self.get_logger().info(f"💾 普通截图文件数: {len(capture_files)}")
if os.path.exists(self.qr_save_dir):
qr_files = os.listdir(self.qr_save_dir)
self.get_logger().info(f"🔍 QR码截图文件数: {len(qr_files)}")
self.get_logger().info("=" * 50)
def main(args=None):
rclpy.init(args=args)
demo = AICameraDemo()
try:
if demo.start_camera():
demo.get_logger().info("🚀 开始AI相机演示...")
demo.get_logger().info("📋 功能说明:")
demo.get_logger().info(" - 每3秒自动截图")
demo.get_logger().info(" - 实时检测QR码/条形码")
demo.get_logger().info(" - 按 's' 手动截图")
demo.get_logger().info(" - 按 'q' 退出程序")
demo.get_logger().info(" - 程序将在30秒后自动结束")
demo.get_logger().info("-" * 40)
rclpy.spin(demo)
else:
demo.get_logger().error("❌ 无法启动AI相机")
except KeyboardInterrupt:
demo.get_logger().info("🛑 程序被用户终止")
except Exception as e:
demo.get_logger().error(f"❌ 运行错误: {str(e)}")
finally:
# 打印总结
demo.print_summary()
# 清理资源
demo.stop_camera()
if demo.gui_enabled:
cv2.destroyAllWindows()
demo.destroy_node()
rclpy.shutdown()
demo.get_logger().info("🏁 程序已安全退出")
if __name__ == '__main__':
main()