mi-task/tmp/img-demo.py

110 lines
4.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
from datetime import datetime
import os
class SimpleImageSubscriber(Node):
def __init__(self):
super().__init__('simple_image_subscriber')
# Define QoS Profile
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10
)
self.subscription = self.create_subscription(
Image,
'/rgb_camera/image_raw',
self.image_callback,
qos_profile=qos_profile
)
self.subscription # Prevent unused variable warning
self.image_received = False # Flag to track if image is received
self.image_count = 0 # Count received images
def image_callback(self, msg):
if self.image_received:
return # Skip processing if already processed
try:
# Get image metadata
height = msg.height
width = msg.width
encoding = msg.encoding
step = msg.step
data_size = len(msg.data)
# Log image information
self.get_logger().info(f"收到图像消息!")
self.get_logger().info(f"图像尺寸: {width} x {height}")
self.get_logger().info(f"编码格式: {encoding}")
self.get_logger().info(f"步长: {step}")
self.get_logger().info(f"数据大小: {data_size} 字节")
# Save image data to a simple text file for verification
self.save_image_info(height, width, encoding, step, data_size)
self.image_received = True
self.image_count += 1
# Log success and continue listening for more images
self.get_logger().info(f"成功处理图像 #{self.image_count}")
# Optionally shutdown after receiving a few images
if self.image_count >= 3:
self.get_logger().info("已收到3张图像测试完成!")
self.destroy_node()
rclpy.shutdown()
exit(0)
except Exception as e:
self.get_logger().error(f'处理图像时出错: {str(e)}')
def save_image_info(self, height, width, encoding, step, data_size):
# Create output directory if it doesn't exist
output_dir = "./captured_images/dual_path"
os.makedirs(output_dir, exist_ok=True)
# Generate a timestamped filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{output_dir}/image_info_{timestamp}.txt"
try:
with open(filename, 'w', encoding='utf-8') as f:
f.write(f"图像信息记录 - {timestamp}\n")
f.write(f"图像尺寸: {width} x {height}\n")
f.write(f"编码格式: {encoding}\n")
f.write(f"步长: {step}\n")
f.write(f"数据大小: {data_size} 字节\n")
f.write(f"ROS节点: {self.get_name()}\n")
f.write(f"话题: /rgb_camera/image_raw\n")
f.write("图像数据已成功接收ROS模块工作正常!\n")
self.get_logger().info(f"图像信息已保存到: {filename}")
except Exception as e:
self.get_logger().error(f"保存图像信息时出错: {str(e)}")
def main(args=None):
rclpy.init(args=args)
simple_subscriber = SimpleImageSubscriber()
print("启动简化的ROS图像订阅测试...")
print("正在监听话题: /rgb_camera/image_raw")
print("按 Ctrl+C 停止测试")
try:
rclpy.spin(simple_subscriber)
except KeyboardInterrupt:
print("\n测试被用户中断")
finally:
print("测试完成,清理资源...")
simple_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()