- Update image saving path in ImageSubscriber class - Remove unused imports and code related to task_4 and arrow detection - Add import for run_task_test - Comment out run_task_4 and uncomment run_task_test in main function
71 lines
2.2 KiB
Python
71 lines
2.2 KiB
Python
import rclpy
|
|
from rclpy.node import Node
|
|
from sensor_msgs.msg import Image
|
|
from cv_bridge import CvBridge
|
|
import cv2
|
|
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
|
|
from datetime import datetime
|
|
|
|
class ImageSubscriber(Node):
|
|
def __init__(self):
|
|
super().__init__('image_subscriber')
|
|
|
|
# Define QoS Profile
|
|
qos_profile = QoSProfile(
|
|
reliability=QoSReliabilityPolicy.BEST_EFFORT,
|
|
history=QoSHistoryPolicy.KEEP_LAST,
|
|
depth=10
|
|
)
|
|
|
|
self.subscription = self.create_subscription(
|
|
Image,
|
|
'/rgb_camera/image_raw',
|
|
self.image_callback,
|
|
qos_profile=qos_profile
|
|
)
|
|
self.subscription # Prevent unused variable warning
|
|
self.bridge = CvBridge()
|
|
self.cv_image = None # Store latest image
|
|
self.image_saved = False # Flag to track if image is saved
|
|
|
|
def image_callback(self, msg):
|
|
if self.image_saved:
|
|
return # Skip processing if image already saved
|
|
|
|
try:
|
|
# Convert image message to OpenCV format
|
|
self.cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
|
|
self.save_image()
|
|
self.image_saved = True # Mark image as saved
|
|
|
|
# Shutdown the node after saving the image
|
|
self.destroy_node()
|
|
rclpy.shutdown()
|
|
cv2.destroyAllWindows()
|
|
exit(0)
|
|
|
|
except Exception as e:
|
|
self.get_logger().error('Failed to convert image: %s' % str(e))
|
|
|
|
def save_image(self):
|
|
if self.cv_image is not None:
|
|
# Generate a timestamped filename
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
filename = f"./captured_images/left_path/image_{timestamp}.png"
|
|
|
|
cv2.imwrite(filename, self.cv_image)
|
|
self.get_logger().info(f"Saved image as {filename}")
|
|
else:
|
|
self.get_logger().warn("No image received yet to save.")
|
|
|
|
def main(args=None):
|
|
rclpy.init(args=args)
|
|
image_subscriber = ImageSubscriber()
|
|
|
|
try:
|
|
rclpy.spin(image_subscriber)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
if __name__ == '__main__':
|
|
main() |