mi-task/test/demo_test/ai_camera_demo.py
2025-08-18 11:06:42 +08:00

125 lines
3.9 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import numpy as np
from protocol.srv import CameraService
import time
from PIL import Image as PILImage # 用于保存图像
import cv2
class AICameraDemo(Node):
def __init__(self):
super().__init__('ai_camera_demo')
# 创建AI相机服务客户端
self.camera_client = self.create_client(
CameraService,
'/camera_service'
)
# 等待服务可用
while not self.camera_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('AI相机服务不可用等待中...')
# 订阅AI相机图像话题
self.image_subscription = self.create_subscription(
Image,
'/image',
self.image_callback,
10
)
# 存储最新图像
self.latest_image = None
self.start_time = time.time() # 记录启动时间
self.saved = False # 标记是否已保存
# 存储最新图像
self.latest_image = None
self.start_time = time.time()
self.window_name = 'AI Camera Feed' # OpenCV窗口名称
cv2.namedWindow(self.window_name, cv2.WINDOW_NORMAL) # 创建显示窗口
def start_camera(self):
"""启动AI相机"""
req = CameraService.Request()
req.command = 9 # START_IMAGE_PUBLISH
req.width = 640 # 图像宽度
req.height = 480 # 图像高度
req.fps = 30 # 帧率
future = self.camera_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
if future.result().result == 0: # RESULT_SUCCESS
self.get_logger().info('AI相机已启动')
return True
else:
self.get_logger().error(f'启动AI相机失败: {future.result().msg}')
return False
def stop_camera(self):
"""停止AI相机"""
req = CameraService.Request()
req.command = 10 # STOP_IMAGE_PUBLISH
future = self.camera_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
if future.result().result == 0: # RESULT_SUCCESS
self.get_logger().info('AI相机已停止')
return True
else:
self.get_logger().error(f'停止AI相机失败: {future.result().msg}')
return False
def image_callback(self, msg):
"""处理bgr8和rgb8格式的图像"""
current_time = time.time()
elapsed = current_time - self.start_time
if not self.saved and elapsed >= 3.0:
self.latest_image = msg
self.get_logger().info('收到AI相机图像准备保存...')
# 支持rgb8和bgr8格式
if msg.encoding in ['rgb8', 'bgr8']:
# 转换为numpy数组
image_data = np.frombuffer(msg.data, dtype=np.uint8).reshape(
msg.height, msg.width, 3
)
# 如果是bgr格式转换为rgb
if msg.encoding == 'bgr8':
image_data = image_data[:, :, [2, 1, 0]] # BGR -> RGB
else:
self.get_logger().warn(f'不支持的图像格式: {msg.encoding}')
cv2.imshow(self.window_name, image_data)
cv2.waitKey(1) # 必须调用,否则窗口不会更新
def main(args=None):
rclpy.init(args=args)
demo = AICameraDemo()
try:
# 启动AI相机
if demo.start_camera():
# 运行节点,持续接收图像
rclpy.spin(demo)
except KeyboardInterrupt:
demo.get_logger().info('用户中断')
finally:
# 停止AI相机
demo.stop_camera()
demo.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()