mi-task/test/ros2/rgb-camera/img-raw-get.py

71 lines
2.2 KiB
Python
Raw Normal View History

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
from datetime import datetime
class ImageSubscriber(Node):
def __init__(self):
super().__init__('image_subscriber')
# Define QoS Profile
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10
)
self.subscription = self.create_subscription(
Image,
'/rgb_camera/image_raw',
self.image_callback,
qos_profile=qos_profile
)
self.subscription # Prevent unused variable warning
self.bridge = CvBridge()
self.cv_image = None # Store latest image
self.image_saved = False # Flag to track if image is saved
def image_callback(self, msg):
if self.image_saved:
return # Skip processing if image already saved
try:
# Convert image message to OpenCV format
self.cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
self.save_image()
self.image_saved = True # Mark image as saved
# Shutdown the node after saving the image
self.destroy_node()
rclpy.shutdown()
cv2.destroyAllWindows()
exit(0)
except Exception as e:
self.get_logger().error('Failed to convert image: %s' % str(e))
def save_image(self):
if self.cv_image is not None:
# Generate a timestamped filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"captured_images/arrow-right/image_{timestamp}.png"
cv2.imwrite(filename, self.cv_image)
self.get_logger().info(f"Saved image as {filename}")
else:
self.get_logger().warn("No image received yet to save.")
def main(args=None):
rclpy.init(args=args)
image_subscriber = ImageSubscriber()
try:
rclpy.spin(image_subscriber)
except KeyboardInterrupt:
pass
if __name__ == '__main__':
main()