mi-task/utils/image_raw.py

93 lines
2.5 KiB
Python
Raw Normal View History

2025-05-11 15:44:54 +00:00
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
from qreader import QReader
from threading import Thread
2025-05-11 15:44:54 +00:00
class ImageSubscriber(Node):
def __init__(self):
super().__init__('image_subscriber')
# 定义 QoS 配置(匹配发布者的可靠性策略)
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT, # 或 BEST_EFFORT取决于发布者
history=QoSHistoryPolicy.KEEP_LAST,
depth=10
)
self.subscription = self.create_subscription(
Image,
'/rgb_camera/image_raw',
self.image_callback,
qos_profile=qos_profile
)
self.subscription # 防止未使用变量警告
self.bridge = CvBridge()
self.cv_image = None # Store latest image
def image_callback(self, msg):
try:
# 将ROS图像消息转换为OpenCV格式
self.cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
except Exception as e:
self.get_logger().error('Failed to convert image: %s' % str(e))
2025-05-11 15:44:54 +00:00
class ImageProcessor:
def __init__(self):
self.image_subscriber = ImageSubscriber()
self.qreader = QReader()
self.spin_thread = None
self.running = True
2025-05-11 15:44:54 +00:00
def run(self):
self.spin_thread = Thread(target=self._spin)
self.spin_thread.start()
def _spin(self):
2025-05-11 15:44:54 +00:00
try:
while self.running:
rclpy.spin_once(self.image_subscriber, timeout_sec=0.1)
except Exception as e:
print(f"Spin thread error: {e}")
def destroy(self):
self.running = False
if self.spin_thread:
self.spin_thread.join()
self.image_subscriber.destroy_node()
2025-05-11 15:44:54 +00:00
def get_current_image(self):
return self.image_subscriber.cv_image
def decode_qrcode(self, img = None):
if img is None:
img = self.get_current_image()
decoded_info = self.qreader.detect_and_decode(image=img)
return decoded_info[0]
2025-05-11 15:44:54 +00:00
""" DEBUG """
def main(args=None):
rclpy.init(args=args)
image_subscriber = ImageSubscriber()
try:
rclpy.spin(image_subscriber)
except KeyboardInterrupt:
pass
# 清理
image_subscriber.destroy_node()
if __name__ == '__main__':
main()