mi-task/basic_motion/camera.py

158 lines
5.6 KiB
Python
Raw Normal View History

2025-08-21 15:29:20 +08:00
#!/usr/bin/env python3bu
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
from rclpy.qos import qos_profile_sensor_data, QoSProfile, ReliabilityPolicy
import cv2
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
class MiroCameraROS2(Node):
def __init__(self):
"""初始化ROS 2节点和相机订阅者"""
super().__init__('miro_camera_node')
# 创建CV桥接器用于ROS图像消息和OpenCV图像之间的转换
self.bridge = CvBridge()
# 存储相机图像的变量
self.image = None
# 声明参数
self.declare_parameter('namespace', '')
self.declare_parameter('camera_topic', '/rgb_camera/image_raw')
# 获取参数值
self.namespace = self.get_parameter('namespace').get_parameter_value().string_value
self.camera_topic = self.get_parameter('camera_topic').get_parameter_value().string_value
# 构建完整话题名称
self.full_topic = f"{self.namespace}{self.camera_topic}"
# 创建与相机发布者匹配的QoS配置BEST_EFFORT
qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT, # 与相机驱动保持一致
history=qos_profile_sensor_data.history, # 保留sensor_data的其他配置
depth=qos_profile_sensor_data.depth
)
# 订阅相机图像话题使用BEST_EFFORT QoS配置
self.image_sub = self.create_subscription(
Image,
self.full_topic,
self.image_callback,
qos
)
self.get_logger().info(f"正在监听话题: {self.full_topic}")
self.get_logger().info(f"使用QoS配置: Reliability={qos.reliability.name}, Depth={qos.depth}")
def image_callback(self, data):
"""处理相机图像消息"""
try:
# 将ROS图像消息转换为OpenCV格式BGR
cv_image = self.bridge.imgmsg_to_cv2(data, 'bgr8')
# 转换为RGB格式
self.image = cv2.cvtColor(cv_image, cv2.COLOR_BGR2RGB)
except Exception as e:
self.get_logger().error(f'处理相机图像时出错: {e}')
def get_latest_image(self, timeout_sec=5.0):
"""获取最新的相机图像,带超时机制"""
start_time = self.get_clock().now().nanoseconds * 1e-9
# 等待图像数据
while rclpy.ok():
if self.image is not None:
return self.image
# 检查是否超时
current_time = self.get_clock().now().nanoseconds * 1e-9
if current_time - start_time > timeout_sec:
self.get_logger().warn(f'等待图像超时 ({timeout_sec}秒)')
return None
rclpy.spin_once(self, timeout_sec=0.1)
def save_image(self, image, filename=None):
"""保存图像到文件"""
if image is None:
self.get_logger().error('错误:没有可用的图像')
return
if filename is None:
# 默认文件名:当前时间戳
filename = f'miro_camera_{datetime.now().strftime("%Y%m%d_%H%M%S")}.png'
try:
# 转换回BGR格式保存
cv_image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
cv2.imwrite(filename, cv_image)
self.get_logger().info(f'图像已保存为: {filename}')
except Exception as e:
self.get_logger().error(f'保存图像时出错: {e}')
def show_image(self, image, title='Miro Camera Image'):
"""显示图像使用matplotlib"""
if image is None:
self.get_logger().error('错误:没有可用的图像')
return
plt.figure(figsize=(10, 8))
plt.imshow(image)
plt.title(title)
plt.axis('off')
plt.show()
# 使用示例
def main(args=None):
rclpy.init(args=args)
try:
# 创建相机接口实例
camera = MiroCameraROS2()
# 循环获取并显示图像
for i in range(5): # 获取5次图像
if not rclpy.ok():
break
# 获取最新图像
image = camera.get_latest_image()
# 显示并保存图像
if image is not None:
camera.show_image(image, f'Camera Image (Frame {i+1})')
camera.save_image(image, f'camera_image_{i+1}.png')
except KeyboardInterrupt:
camera.get_logger().info('程序已被用户中断')
finally:
# 清理资源
camera.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
#python3 camera.py --ros-args -p auto_detect:=true
# /apply_force ['cyberdog_msg/msg/ApplyForce']
# /clicked_point ['geometry_msgs/msg/PointStamped']
# /clock ['rosgraph_msgs/msg/Clock']
# /goal_pose ['geometry_msgs/msg/PoseStamped']
# /imu ['sensor_msgs/msg/Imu']
# /initialpose ['geometry_msgs/msg/PoseWithCovarianceStamped']
# /joint_states ['sensor_msgs/msg/JointState']
# /parameter_events ['rcl_interfaces/msg/ParameterEvent']
# /performance_metrics ['gazebo_msgs/msg/PerformanceMetrics']
# /rgb_camera/camera_info ['sensor_msgs/msg/CameraInfo']
# /rgb_camera/image_raw ['sensor_msgs/msg/Image']
# /robot_description ['std_msgs/msg/String']
# /rosout ['rcl_interfaces/msg/Log']
# /tf ['tf2_msgs/msg/TFMessage']
# /tf_static ['tf2_msgs/msg/TFMessage']
# /yaml_parameter ['cyberdog_msg/msg/YamlParam']