mi-task/test/demo_test/rgb_camera_demo.py

78 lines
2.5 KiB
Python
Raw Normal View History

2025-08-19 14:47:19 +08:00
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import time
from datetime import datetime
class CameraSubscriber(Node):
def __init__(self):
super().__init__('camera_subscriber')
self.bridge = CvBridge()
self.screenshot_taken = False
self.start_time = time.time()
self.wait_time = 10.0 # 最大等待时间
# 订阅带诊断功能
self.sub_rgb = self.create_subscription(
Image,
'/mi_desktop_48_b0_2d_7b_05_1d/image', # 修改为实际发布的话题
self.rgb_callback,
10)
# 定时器检查连接状态
self.timer = self.create_timer(1.0, self.check_connection)
self.get_logger().info("wrong!")
def check_connection(self):
elapsed = time.time() - self.start_time
if elapsed > self.wait_time and not any([hasattr(self, f'{cam}_image')
for cam in ['left', 'right', 'rgb']]):
self.get_logger().error(f"No images received after {self.wait_time} seconds!")
self.diagnose_connection()
def diagnose_connection(self):
self.get_logger().info("Running diagnostics...")
# 检查话题列表
topics = self.get_topic_names_and_types()
cam_topics = [t[0] for t in topics if 'image' in t[0].lower()]
self.get_logger().info(f"Available image topics: {cam_topics}")
# 建议用户检查的命令
self.get_logger().info("Suggested checks:")
def save_image(self, image, name):
if image is not None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{name}_{timestamp}.png"
cv2.imwrite(filename, image)
self.get_logger().info(f"Saved {filename}")
def rgb_callback(self, msg):
self.rgb_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
self.check_screenshot()
def check_screenshot(self):
if time.time() - self.start_time >= 3.0 and not self.screenshot_taken:
self.save_image(self.rgb_image, 'rgb')
self.screenshot_taken = True
def main(args=None):
rclpy.init(args=args)
node = CameraSubscriber()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()