mi-task/utils/image_raw.py

95 lines
2.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
from qreader import QReader
from threading import Thread
class ImageSubscriber(Node):
def __init__(self):
super().__init__('image_subscriber')
# 定义 QoS 配置(匹配发布者的可靠性策略)
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT, # 或 BEST_EFFORT取决于发布者
history=QoSHistoryPolicy.KEEP_LAST,
depth=10
)
self.subscription = self.create_subscription(
Image,
'/rgb_camera/image_raw',
self.image_callback,
qos_profile=qos_profile
)
self.subscription # 防止未使用变量警告
self.bridge = CvBridge()
self.cv_image = None # Store latest image
def image_callback(self, msg):
try:
# 将ROS图像消息转换为OpenCV格式
self.cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
except Exception as e:
self.get_logger().error('Failed to convert image: %s' % str(e))
class ImageProcessor:
def __init__(self):
rclpy.init()
self.image_subscriber = ImageSubscriber()
self.qreader = QReader()
self.spin_thread = None
self.running = True
def run(self):
self.spin_thread = Thread(target=self._spin)
self.spin_thread.start()
def _spin(self):
try:
while self.running:
rclpy.spin_once(self.image_subscriber, timeout_sec=0.1)
except Exception as e:
print(f"Spin thread error: {e}")
def destroy(self):
self.running = False
if self.spin_thread:
self.spin_thread.join()
self.image_subscriber.destroy_node()
rclpy.shutdown()
def get_current_image(self):
return self.image_subscriber.cv_image
def decode_qrcode(self, img = None):
if img is None:
img = self.get_current_image()
decoded_info = self.qreader.detect_and_decode(image=img)
return decoded_info[0]
""" DEBUG """
def main(args=None):
rclpy.init(args=args)
image_subscriber = ImageSubscriber()
try:
rclpy.spin(image_subscriber)
except KeyboardInterrupt:
pass
# 清理
image_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()