- 修改 move_to_hori_line 函数中的超时设置和停止命令发送方式 - 在主程序中初始化和销毁 ROS 2 上下文 - 增加校准相关变量和逻辑,提高定位精度 - 优化 ImageProcessor 和 MarkerRunner 类的实现
93 lines
2.5 KiB
Python
93 lines
2.5 KiB
Python
#!/usr/bin/env python3
|
||
|
||
import rclpy
|
||
from rclpy.node import Node
|
||
from sensor_msgs.msg import Image
|
||
from cv_bridge import CvBridge
|
||
import cv2
|
||
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
|
||
from qreader import QReader
|
||
from threading import Thread
|
||
|
||
|
||
class ImageSubscriber(Node):
|
||
def __init__(self):
|
||
super().__init__('image_subscriber')
|
||
# 定义 QoS 配置(匹配发布者的可靠性策略)
|
||
qos_profile = QoSProfile(
|
||
reliability=QoSReliabilityPolicy.BEST_EFFORT, # 或 BEST_EFFORT,取决于发布者
|
||
history=QoSHistoryPolicy.KEEP_LAST,
|
||
depth=10
|
||
)
|
||
|
||
self.subscription = self.create_subscription(
|
||
Image,
|
||
'/rgb_camera/image_raw',
|
||
self.image_callback,
|
||
qos_profile=qos_profile
|
||
)
|
||
self.subscription # 防止未使用变量警告
|
||
self.bridge = CvBridge()
|
||
self.cv_image = None # Store latest image
|
||
|
||
def image_callback(self, msg):
|
||
try:
|
||
# 将ROS图像消息转换为OpenCV格式
|
||
self.cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
|
||
|
||
except Exception as e:
|
||
self.get_logger().error('Failed to convert image: %s' % str(e))
|
||
|
||
|
||
class ImageProcessor:
|
||
def __init__(self):
|
||
self.image_subscriber = ImageSubscriber()
|
||
self.qreader = QReader()
|
||
self.spin_thread = None
|
||
self.running = True
|
||
|
||
def run(self):
|
||
self.spin_thread = Thread(target=self._spin)
|
||
self.spin_thread.start()
|
||
|
||
def _spin(self):
|
||
try:
|
||
while self.running:
|
||
rclpy.spin_once(self.image_subscriber, timeout_sec=0.1)
|
||
except Exception as e:
|
||
print(f"Spin thread error: {e}")
|
||
|
||
def destroy(self):
|
||
self.running = False
|
||
if self.spin_thread:
|
||
self.spin_thread.join()
|
||
self.image_subscriber.destroy_node()
|
||
|
||
def get_current_image(self):
|
||
return self.image_subscriber.cv_image
|
||
|
||
def decode_qrcode(self, img = None):
|
||
if img is None:
|
||
img = self.get_current_image()
|
||
decoded_info = self.qreader.detect_and_decode(image=img)
|
||
return decoded_info[0]
|
||
|
||
|
||
""" DEBUG """
|
||
|
||
def main(args=None):
|
||
rclpy.init(args=args)
|
||
image_subscriber = ImageSubscriber()
|
||
|
||
try:
|
||
rclpy.spin(image_subscriber)
|
||
except KeyboardInterrupt:
|
||
pass
|
||
|
||
# 清理
|
||
image_subscriber.destroy_node()
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|