mi-task/utils/image_raw.py
2025-08-20 23:24:08 +08:00

469 lines
16 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# //real相机只支持左目的depth
#ros2 launch realsense2_camera on_dog.py
#ros2 lifecycle set /camera/camera configure
#ros2 lifecycle set /camera/camera activate
#/camera/infra1/image_rect_raw左目图像
#/camera/infra2/image_rect_raw右目图像
#/camera/depth/image_rect_raw深度图
#/camera/imuIMU数据
# //启动stereo_camera
# ros2 run camera_test stereo_camera或者launch
# ros2 lifecycle set /stereo_camera configure
# ros2 lifecycle set /stereo_camera activate
# 启动AI相机
# ros2 run camera_test camera_server
#/image_left
#/image_right
#/image_rgb
# //AI相机ai相机的图像可以找到real相机对应的深度值
#/image
#########################################################################################
#/camera/camera_align节点启动指令
#ros2 launch realsense2_camera realsense_align_node.launch.py
#ros2 lifecycle set /camera/camera_align configure
#ros2 lifecycle set /camera/camera_align activate
#视觉建图
#1.启动slam
#source /opt/ros2/cyberdog/setup.bash
#ros2 launch laser_slam mapping_launch.py
#ros2 lifecycle set /map_builder 1
#ros2 lifecycle set /map_builder 3
#2.启动建图(另一个终端)
#source /opt/ros2/cyberdog/setup.bash
#ros2 service call /start_mapping std_srvs/srv/SetBool data:\ true
#保存地图和pose信息
#ros2 service call /stop_mapping visualization/srv/Stop "{finish: true, map_name: 'map'}"
#雷达修改
#sudo systemctl disable nvgetty.service
#重定位程序
#ros2 launch laser_slam localization_node.py
#ros2 lifecycle set /localization_node 1
#ros2 lifecycle set /localization_node 3
#另一个终端
#ros2 service call /start_location std_srvs/srv/SetBool "{data: true}"
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import os
from threading import Thread, Lock
import time
import queue
from datetime import datetime
from utils.log_helper import get_logger
# 导入AI相机服务
from protocol.srv import CameraService
# qrcode
from pyzbar import pyzbar
class ImageSubscriber(Node):
def __init__(self):
super().__init__('ai_camera_demo')
# 初始化安全控制标志
self._shutdown_flag = False
self._ros_initialized = True
# 创建服务客户端
self.camera_client = self.create_client(CameraService, '/camera_service')
while not self.camera_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('等待AI相机服务...')
# 图像订阅
self.image_sub = self.create_subscription(
Image, '/image_rgb', self.image_callback_rgb, 10
)
self.image_sub_left = self.create_subscription(
Image, '/image_left', self.image_callback_left, 10
)
self.image_sub_right = self.create_subscription(
Image, '/image_right', self.image_callback_right, 10
)
self.image_sub_ai= self.create_subscription(
Image, '/image', self.image_callback_ai, 10
)
# 工具初始化
self.bridge = CvBridge()
self.cv_image_rgb = None
self.cv_image_left = None
self.cv_image_right = None
self.cv_image_ai = None
self.camera_started = False
# 截图控制参数
self.last_save_time = {
'rgb': time.time(),
'left': time.time(),
'right': time.time(),
'ai': time.time()
}
self.start_time = time.time()
self.save_interval = 0.5
self.total_duration = 1.0
self.save_dir = "saved_images"
os.makedirs(self.save_dir, exist_ok=True)
# 安全启动相机
self.start_camera()
def start_camera(self):
"""安全启动相机"""
if not self._ros_initialized:
return False
req = CameraService.Request()
req.command = 9
req.width, req.height, req.fps = 640, 480, 30
future = self.camera_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
result = future.result()
self.get_logger().info(f'服务返回: [code={result.result}, msg="{result.msg}"]')
if result.result == 0:
self.get_logger().info('相机启动成功')
self.camera_started = True
return True
else:
self.get_logger().error(f'启动失败 (错误码 {result.result})')
return False
def stop_camera(self):
"""安全停止相机"""
if not self.camera_started or not self._ros_initialized:
return True
req = CameraService.Request()
req.command = 10
try:
future = self.camera_client.call_async(req)
rclpy.spin_until_future_complete(self, future, timeout_sec=2.0)
if future.result().result == 0:
self.get_logger().info('相机已停止')
self.camera_started = False
return True
else:
self.get_logger().error(f'停止失败: {future.result().msg}')
return False
except Exception as e:
self.get_logger().error(f'停止异常: {str(e)}')
return False
def save_image(self, image, prefix):
"""改进的保存方法"""
if image is None:
return False
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
filename = f"{self.save_dir}/{prefix}_{timestamp}.jpg"
cv2.imwrite(filename, image)
# self.get_logger().info(f"已保存 {prefix} 图像: {filename}")
return True
except Exception as e:
self.get_logger().error(f"保存{prefix}图像失败: {str(e)}")
return False
def image_callback_rgb(self, msg):
# """安全的图像回调"""
# if getattr(self, '_shutdown_flag', False):
# self.get_logger().warning('image_callback, shutdown')
# print('image_callback, shutdown')
# return
# current_time = time.time()
try:
self.cv_image_rgb = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
if time.time() - self.last_save_time['rgb'] >= self.save_interval:
# self.save_image(self.cv_image_rgb, 'rgb')
self.last_save_time['rgb'] = time.time()
except Exception as e:
self.get_logger().error(f"RGB图像处理错误: {str(e)}")
def image_callback_left(self, msg):
try:
self.cv_image_left = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
if time.time() - self.last_save_time['left'] >= self.save_interval:
# self.save_image(self.cv_image_left, 'left')
self.last_save_time['left'] = time.time()
except Exception as e:
self.get_logger().error(f"左图像处理错误: {str(e)}")
def image_callback_right(self, msg):
try:
self.cv_image_right = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
if time.time() - self.last_save_time['right'] >= self.save_interval:
# self.save_image(self.cv_image_right, 'right')
self.last_save_time['right'] = time.time()
except Exception as e:
self.get_logger().error(f"右图像处理错误: {str(e)}")
def image_callback_ai(self, msg):
try:
self.cv_image_ai = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
if time.time() - self.last_save_time['ai'] >= self.save_interval:
# self.save_image(self.cv_image_ai, 'ai')
self.last_save_time['ai'] = time.time()
except Exception as e:
self.get_logger().error(f"ai图像处理错误: {str(e)}")
def safe_spin(self):
"""安全spin循环"""
while rclpy.ok() and not getattr(self, '_shutdown_flag', False):
rclpy.spin_once(self, timeout_sec=0.1)
def destroy(self):
"""安全销毁节点"""
try:
# 1. 停止相机服务
if getattr(self, 'camera_started', False):
self.stop_camera()
# 2. 设置关闭标志
self._shutdown_flag = True
# 3. 销毁节点如果ROS上下文有效
if hasattr(self, '_ros_initialized') and self._ros_initialized:
if rclpy.ok():
self.destroy_node()
# 4. 清理其他资源
if hasattr(self, 'bridge'):
del self.bridge
if hasattr(self, 'cv_image_rgb'):
del self.cv_image_rgb
if hasattr(self, 'cv_image_left'):
del self.cv_image_left
if hasattr(self, 'cv_image_right'):
del self.cv_image_right
if hasattr(self, 'cv_image_ai'):
del self.cv_image_ai
except Exception as e:
self.get_logger().error(f'销毁节点时出错: {str(e)}')
finally:
# 5. 确保标记为未初始化
self._ros_initialized = False
def get_latest_image(self,text):
"""外部获取最新图像接口"""
return self.cv_image.copy() if self.cv_image is not None else None
class ImageProcessor:
def __init__(self):
self.image_subscriber = ImageSubscriber()
self.spin_thread = None
self.running = True
self.log = get_logger("图像处理器")
self.log = get_logger("图像处理器")
# 异步 QR 码扫描相关
self.scan_thread = None
self.image_queue = queue.Queue(maxsize=3) # 限制队列大小,只保留最新的图像
self.scan_lock = Lock()
self.last_qr_result = None
self.last_qr_time = 0
self.is_scanning = False
self.enable_async_scan = False
def run(self):
self.spin_thread = Thread(target=self._spin)
self.spin_thread.start()
def _spin(self):
try:
while self.running:
rclpy.spin_once(self.image_subscriber, timeout_sec=0.1)
except Exception as e:
# self.log.error(f"Spin线程错误: {e}", "错误")
print('threat wrong')
# def destroy(self):
# self.running = False
# self.stop_async_scan()
# if self.spin_thread:
# self.spin_thread.join()
# # 停止AI相机
# self.image_subscriber.stop_camera()
# self.image_subscriber.destroy_node()
def destroy(self):
self._shutdown_flag = True
if self.spin_thread:
self.spin_thread.join(timeout=1.0)
self.image_subscriber.stop_camera()
self.image_subscriber.destroy_node()
rclpy.shutdown()
self._ros_initialized = False
def get_current_image(self, text = 'rgb'):
# INFO 默认返回 rgb 图像
if text=='ai':
return self.image_subscriber.cv_image_ai
elif text=='left':
return self.image_subscriber.cv_image_left
elif text=='right':
return self.image_subscriber.cv_image_right
else: # text=='rgb':
return self.image_subscriber.cv_image_rgb
def decode_all_qrcodes(self, img=None):
"""使用pyzbar解码QR码"""
if img is None:
img = self.get_current_image('ai')
if img is None:
return None
try:
# 使用pyzbar解码所有条形码/QR码
decoded_objects = pyzbar.decode(img)
if decoded_objects:
# 返回第一个QR码的数据解码为字符串
qr_data = decoded_objects[0].data.decode('utf-8')
return qr_data
return None
except Exception as e:
print(f"QR码解码错误: {str(e)}")
return None
# def decode_all_qrcodes(self, img=None):
# """解码图像中的所有QR码/条形码"""
# if img is None:
# img = self.get_current_image()
# if img is None:
# return []
# try:
# decoded_objects = pyzbar.decode(img)
# results = []
# for obj in decoded_objects:
# result = {
# 'data': obj.data.decode('utf-8'),
# 'type': obj.type,
# 'rect': obj.rect, # 位置信息 (x, y, width, height)
# 'polygon': obj.polygon # 多边形顶点
# }
# results.append(result)
# return results
# except Exception as e:
# print(f"批量QR码解码错误: {str(e)}")
# return []
def start_async_scan(self, interval=0.3):
"""
启动异步 QR 码扫描
参数:
interval: 扫描间隔,单位秒
"""
if self.scan_thread is not None and self.scan_thread.is_alive():
self.log.warning("异步扫描已经在运行中", "警告")
print('scan,warn')
return
self.enable_async_scan = True
self.is_scanning = False
self.scan_thread = Thread(target=self._async_scan_worker, args=(interval,))
self.scan_thread.daemon = True # 设为守护线程,主线程结束时自动结束
self.scan_thread.start()
self.log.info("启动异步 QR 码扫描线程", "启动")
print('start')
def stop_async_scan(self):
"""停止异步 QR 码扫描"""
self.enable_async_scan = False
if self.scan_thread and self.scan_thread.is_alive():
self.scan_thread.join(timeout=1.0)
self.log.info("异步 QR 码扫描线程已停止", "停止")
print('stop')
def _async_scan_worker(self, interval):
"""异步扫描工作线程"""
last_scan_time = 0
while self.enable_async_scan and self.running:
current_time = time.time()
# 按指定间隔扫描
if current_time - last_scan_time >= interval:
img = self.get_current_image('ai')
if img is not None:
try:
self.is_scanning = True
qr_data = self.decode_all_qrcodes(img)
print(qr_data)
self.is_scanning = False
with self.scan_lock:
if qr_data:
self.last_qr_result = qr_data
self.last_qr_time = current_time
self.log.success(f"异步扫描到 QR 码: {qr_data}", "扫描")
print(f"异步扫描到 QR 码: {qr_data}")
except Exception as e:
self.is_scanning = False
self.log.error(f"异步 QR 码扫描出错: {e}", "错误")
print(f"异步 QR 码扫描出错: {e}")
else:
print('no img')
last_scan_time = current_time
# 短暂休眠避免占用过多 CPU
time.sleep(0.05)
def get_last_qr_result(self):
"""获取最后一次成功扫描的 QR 码结果"""
with self.scan_lock:
return self.last_qr_result, self.last_qr_time
""" DEBUG """
# def main(args=None):
# rclpy.init(args=args)
# image_subscriber = ImageSubscriber()
# try:
# rclpy.spin(image_subscriber)
# except KeyboardInterrupt:
# pass
# # 清理
# image_subscriber.stop_camera()
# image_subscriber.destroy_node()
# if __name__ == '__main__':
# main()