Add qrcode and ai camera demo

hav 2025-08-18 20:34:03 +08:00
parent cd0ac2c447
commit d2eb82056f
11 changed files with 1106 additions and 161 deletions


@@ -42,6 +42,8 @@ def main():
# Ctrl.base_msg.stop()
# # GO STRAIGHT TEST
# go_straight(Ctrl,msg,distance=1)
# turn_degree_v2(Ctrl,msg,degree=90)
align_to_horizontal_line(Ctrl, msg, observe=False, max_attempts=3, detect_func_version=1)

main.py

@@ -10,14 +10,16 @@ import os
import time
from threading import Thread, Lock
import sys
import string
import rclpy
from utils.robot_control_cmd_lcmt import robot_control_cmd_lcmt
from utils.robot_control_response_lcmt import robot_control_response_lcmt
from utils.localization_lcmt import localization_lcmt
from utils.image_raw import ImageProcessor
from utils.base_msg import BaseMsg
# from utils.image_raw import ImageProcessor
from utils.speech_demo import speak
# from utils.marker_client import MarkerRunner
from task_1.task_1 import run_task_1
@@ -29,13 +31,11 @@ from task_2_5.task_2_5 import run_task_2_5, run_task_2_5_back
from task_4.task_4 import run_task_4, run_task_4_back
from task_test.task_left_line import run_task_test
from task_5.task_5 import run_task_5
from base_move.turn_degree import turn_degree, turn_degree_v2
from base_move.center_on_dual_tracks import center_on_dual_tracks
from base_move.go_to_xy import go_to_x_v2, go_to_y_v2
from base_move.go_straight import go_straight
from task_4.pass_bar import pass_bar
from utils.log_helper import info
pass_marker = True
@@ -50,13 +50,13 @@ def main():
try:
info("Recovery stand", "info")
Ctrl.base_msg.stand_up()
Ctrl.base_msg.stop() # BUG: junk command (for eat)
pass_bar(Ctrl, msg)
Ctrl.base_msg.stand_up()
print('yuyin')
speak('nihao')
# Ctrl.base_msg.stop() # BUG: junk command (for eat)
# time.sleep(100) # TEST,
# run_task_1(Ctrl, msg, time_sleep=TIME_SLEEP)
# arrow_direction = run_task_2(Ctrl, msg, xy_flag=False)
@@ -107,7 +107,7 @@ class Robot_Ctrl(object):
self.cmd_msg = robot_control_cmd_lcmt()
self.rec_msg = robot_control_response_lcmt()
self.odo_msg = localization_lcmt()
# self.image_processor = ImageProcessor()
self.image_processor = ImageProcessor()
# DEBUG
# self.marker_runner = MarkerRunner(pass_flag=pass_marker)
self.send_lock = Lock()
@@ -128,7 +128,7 @@ class Robot_Ctrl(object):
self.send_thread.start()
self.rec_thread.start()
self.odo_thread.start()
# self.image_processor.run()
self.image_processor.run()
# self.marker_runner.run()
def msg_handler(self, channel, data):
@@ -214,7 +214,7 @@ class Robot_Ctrl(object):
self.runing = 0
self.rec_thread.join()
self.send_thread.join()
# self.image_processor.destroy()
self.image_processor.destroy()
# Destroy the MarkerRunner
if hasattr(self, 'marker_runner') and self.marker_runner is not None:
try:


@@ -1,123 +1,117 @@
#!/usr/bin/env python3
#ros2 run camera_test camera_server
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import numpy as np
from protocol.srv import CameraService
import time
from PIL import Image as PILImage # used for saving images
import cv2
import os
import time
from datetime import datetime
# Configure the display environment (key change)
os.environ['OPENCV_VIDEOIO_PRIORITY_GTK'] = '1'
os.environ['DISPLAY'] = ':0' # use the primary display
class AICameraDemo(Node):
def __init__(self):
super().__init__('ai_camera_demo')
# Create the AI camera service client
self.camera_client = self.create_client(
CameraService,
'/camera_service'
)
# Wait for the service to become available
# Create the service client
self.camera_client = self.create_client(CameraService, '/camera_service')
while not self.camera_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('AI camera service unavailable, waiting...')
self.get_logger().info('Waiting for the AI camera service...')
# Subscribe to the AI camera image topic
# Image subscription
self.image_subscription = self.create_subscription(
Image,
'/image',
self.image_callback,
10
Image, '/image', self.image_callback, 10
)
# Store the latest image
self.latest_image = None
self.start_time = time.time() # record the start time
self.saved = False # whether an image has already been saved
# Store the latest image
self.latest_image = None
# Capture control parameters
self.last_capture_time = time.time()
self.start_time = time.time()
self.window_name = 'AI Camera Feed' # OpenCV window name
cv2.namedWindow(self.window_name, cv2.WINDOW_NORMAL) # create the display window
self.capture_interval = 3.0 # capture interval (seconds)
self.total_duration = 30.0 # total run time (seconds)
self.save_dir = "captures" # capture output directory
os.makedirs(self.save_dir, exist_ok=True)
# Initialize the OpenCV window
self.window_name = 'AI Camera Feed'
try:
cv2.namedWindow(self.window_name, cv2.WINDOW_NORMAL)
self.gui_enabled = True
except Exception:
self.get_logger().error("Could not create an OpenCV window; only saving captures")
self.gui_enabled = False
def start_camera(self):
"""Start the AI camera"""
"""Start the camera service"""
req = CameraService.Request()
req.command = 9 # START_IMAGE_PUBLISH
req.width = 640 # image width
req.height = 480 # image height
req.fps = 30 # frame rate
req.width, req.height, req.fps = 640, 480, 30
future = self.camera_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
if future.result().result == 0: # RESULT_SUCCESS
self.get_logger().info('AI camera started')
return True
else:
self.get_logger().error(f'Failed to start the AI camera: {future.result().msg}')
return False
return future.result().result == 0
def stop_camera(self):
"""Stop the AI camera"""
"""Stop the camera service"""
req = CameraService.Request()
req.command = 10 # STOP_IMAGE_PUBLISH
future = self.camera_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
if future.result().result == 0: # RESULT_SUCCESS
self.get_logger().info('AI camera stopped')
return True
else:
self.get_logger().error(f'Failed to stop the AI camera: {future.result().msg}')
return False
return future.result().result == 0
def image_callback(self, msg):
"""Handle images in bgr8 and rgb8 format"""
"""Process images and take periodic captures"""
current_time = time.time()
elapsed = current_time - self.start_time
if not self.saved and elapsed >= 3.0:
self.latest_image = msg
self.get_logger().info('Received an AI camera image, preparing to save...')
# supports rgb8 and bgr8
# Stop once the total duration has elapsed
if elapsed >= self.total_duration:
self.get_logger().info("Finished the 30-second capture task")
raise KeyboardInterrupt
try:
# Convert the image format
if msg.encoding in ['rgb8', 'bgr8']:
# convert to a numpy array
image_data = np.frombuffer(msg.data, dtype=np.uint8).reshape(
img = np.frombuffer(msg.data, dtype=np.uint8).reshape(
msg.height, msg.width, 3
)
# convert BGR to RGB if needed
if msg.encoding == 'bgr8':
image_data = image_data[:, :, [2, 1, 0]] # BGR -> RGB
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
# Periodic capture logic
if current_time - self.last_capture_time >= self.capture_interval:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{self.save_dir}/capture_{timestamp}.jpg"
cv2.imwrite(filename, cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
self.get_logger().info(f"Capture saved: {filename}")
self.last_capture_time = current_time
else:
self.get_logger().warn(f'Unsupported image encoding: {msg.encoding}')
cv2.imshow(self.window_name, image_data)
cv2.waitKey(1) # must be called, otherwise the window never refreshes
# Live display
if self.gui_enabled:
cv2.imshow(self.window_name, cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
cv2.waitKey(1)
except Exception as e:
self.get_logger().error(f"Image processing error: {str(e)}")
def main(args=None):
rclpy.init(args=args)
demo = AICameraDemo()
try:
# Start the AI camera
if demo.start_camera():
# Spin the node and keep receiving images
demo.get_logger().info("Starting capture task (one capture every 3 s for 30 s)")
rclpy.spin(demo)
except KeyboardInterrupt:
demo.get_logger().info('Interrupted by user')
demo.get_logger().info("Stopped manually")
finally:
# Stop the AI camera
demo.stop_camera()
cv2.destroyAllWindows()
demo.destroy_node()
rclpy.shutdown()


@@ -0,0 +1,266 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import numpy as np
from protocol.srv import CameraService
import cv2
import os
import sys
import time
from datetime import datetime
from pyzbar import pyzbar
# Add the utils path so the image_raw module can be imported
sys.path.append('/home/mi-task')
from utils.image_raw import ImageProcessor
# Configure the display environment (key change)
os.environ['OPENCV_VIDEOIO_PRIORITY_GTK'] = '1'
os.environ['DISPLAY'] = ':0' # use the primary display
class AICameraDemo(Node):
def __init__(self):
super().__init__('ai_camera_demo')
# Create the service client
self.camera_client = self.create_client(CameraService, '/camera_service')
while not self.camera_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Waiting for the AI camera service...')
# Image subscription
self.image_subscription = self.create_subscription(
Image, '/image', self.image_callback, 10
)
# Capture control parameters
self.last_capture_time = time.time()
self.start_time = time.time()
self.capture_interval = 3.0 # capture interval (seconds)
self.total_duration = 30.0 # total run time (seconds)
self.save_dir = "captures" # directory for regular captures
self.qr_save_dir = "qr_captures" # directory for QR code captures
os.makedirs(self.save_dir, exist_ok=True)
os.makedirs(self.qr_save_dir, exist_ok=True)
# QR code scanning
self.last_qr_check_time = time.time()
self.qr_check_interval = 1.0 # QR detection interval
self.found_qr_codes = set() # QR codes seen so far, to avoid duplicate output
# Initialize the OpenCV window
self.window_name = 'AI Camera Feed'
try:
cv2.namedWindow(self.window_name, cv2.WINDOW_NORMAL)
self.gui_enabled = True
except Exception:
self.get_logger().error("Could not create an OpenCV window; only saving captures")
self.gui_enabled = False
# Initialize the image holder used for QR code scanning
self.current_image = None
def start_camera(self):
"""Start the camera service"""
req = CameraService.Request()
req.command = 9 # START_IMAGE_PUBLISH
req.width, req.height, req.fps = 640, 480, 30
future = self.camera_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
result = future.result()
if result.result == 0:
self.get_logger().info('AI camera started successfully')
return True
else:
self.get_logger().error(f'Failed to start the AI camera (error code: {result.result})')
return False
def stop_camera(self):
"""Stop the camera service"""
req = CameraService.Request()
req.command = 10 # STOP_IMAGE_PUBLISH
future = self.camera_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
result = future.result()
if result.result == 0:
self.get_logger().info('AI camera stopped')
return True
else:
self.get_logger().error(f'Failed to stop the AI camera: {result.msg}')
return False
def detect_qr_codes(self, img):
"""Detect QR codes with pyzbar"""
try:
# Detect all barcodes / QR codes
decoded_objects = pyzbar.decode(img)
results = []
for obj in decoded_objects:
qr_data = obj.data.decode('utf-8')
qr_type = obj.type
# Draw the detection outline on the image
points = obj.polygon
if len(points) > 4:
hull = cv2.convexHull(np.array([point for point in points], dtype=np.float32))
cv2.polylines(img, [np.int32(hull)], True, (0, 255, 0), 3)
else:
cv2.polylines(img, [np.int32(points)], True, (0, 255, 0), 3)
# Add a text label
x = obj.rect.left
y = obj.rect.top - 10
cv2.putText(img, f'{qr_type}: {qr_data[:20]}...', (x, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
results.append({
'data': qr_data,
'type': qr_type,
'rect': obj.rect
})
return results, img
except Exception as e:
self.get_logger().error(f"QR code detection error: {str(e)}")
return [], img
def image_callback(self, msg):
"""Process images and take periodic captures"""
current_time = time.time()
elapsed = current_time - self.start_time
# Stop once the total duration has elapsed
if elapsed >= self.total_duration:
self.get_logger().info("Finished the 30-second capture task")
raise KeyboardInterrupt
try:
# Convert the image format
if msg.encoding in ['rgb8', 'bgr8']:
img = np.frombuffer(msg.data, dtype=np.uint8).reshape(
msg.height, msg.width, 3
)
if msg.encoding == 'rgb8':
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
# Keep the current image for QR code detection
self.current_image = img.copy()
# Run QR detection on a timer to limit the performance cost
if current_time - self.last_qr_check_time >= self.qr_check_interval:
qr_results, img_with_overlay = self.detect_qr_codes(img.copy())
# Handle any detected QR codes
for qr in qr_results:
qr_data = qr['data']
if qr_data not in self.found_qr_codes:
self.found_qr_codes.add(qr_data)
self.get_logger().info(f"🔍 Detected {qr['type']}: {qr_data}")
# Save an annotated capture of the QR code to its own folder
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
qr_filename = f"{self.qr_save_dir}/qr_detected_{timestamp}.jpg"
cv2.imwrite(qr_filename, img_with_overlay)
self.get_logger().info(f"📷 QR detection capture saved: {qr_filename}")
self.last_qr_check_time = current_time
# If QR codes were detected, show the annotated image
if qr_results and self.gui_enabled:
cv2.imshow(self.window_name, img_with_overlay)
elif self.gui_enabled:
cv2.imshow(self.window_name, img)
else:
# Regular display
if self.gui_enabled:
cv2.imshow(self.window_name, img)
# Periodic capture logic
if current_time - self.last_capture_time >= self.capture_interval:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{self.save_dir}/capture_{timestamp}.jpg"
cv2.imwrite(filename, img)
self.get_logger().info(f"📸 Periodic capture saved: {filename}")
self.last_capture_time = current_time
# OpenCV window event handling
if self.gui_enabled:
key = cv2.waitKey(1) & 0xFF
if key == ord('q'): # press 'q' to quit
raise KeyboardInterrupt
elif key == ord('s'): # press 's' for a manual capture
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{self.save_dir}/manual_{timestamp}.jpg"
cv2.imwrite(filename, img)
self.get_logger().info(f"📱 Manual capture saved: {filename}")
except Exception as e:
self.get_logger().error(f"Image processing error: {str(e)}")
def print_summary(self):
"""Print a run summary"""
self.get_logger().info("=" * 50)
self.get_logger().info("📊 Run summary:")
self.get_logger().info(f"🕐 Total run time: {self.total_duration}s")
self.get_logger().info(f"📷 Capture interval: {self.capture_interval}s")
self.get_logger().info(f"🔍 QR codes found: {len(self.found_qr_codes)}")
if self.found_qr_codes:
self.get_logger().info("📋 QR codes found:")
for i, qr_data in enumerate(self.found_qr_codes, 1):
self.get_logger().info(f" {i}. {qr_data}")
# Count the saved files
if os.path.exists(self.save_dir):
capture_files = os.listdir(self.save_dir)
self.get_logger().info(f"💾 Regular capture files: {len(capture_files)}")
if os.path.exists(self.qr_save_dir):
qr_files = os.listdir(self.qr_save_dir)
self.get_logger().info(f"🔍 QR capture files: {len(qr_files)}")
self.get_logger().info("=" * 50)
def main(args=None):
rclpy.init(args=args)
demo = AICameraDemo()
try:
if demo.start_camera():
demo.get_logger().info("🚀 Starting the AI camera demo...")
demo.get_logger().info("📋 Features:")
demo.get_logger().info(" - automatic capture every 3 seconds")
demo.get_logger().info(" - real-time QR code / barcode detection")
demo.get_logger().info(" - press 's' for a manual capture")
demo.get_logger().info(" - press 'q' to quit")
demo.get_logger().info(" - the program stops automatically after 30 seconds")
demo.get_logger().info("-" * 40)
rclpy.spin(demo)
else:
demo.get_logger().error("❌ Could not start the AI camera")
except KeyboardInterrupt:
demo.get_logger().info("🛑 Terminated by user")
except Exception as e:
demo.get_logger().error(f"❌ Runtime error: {str(e)}")
finally:
# Print the summary
demo.print_summary()
# Clean up resources
demo.stop_camera()
if demo.gui_enabled:
cv2.destroyAllWindows()
demo.destroy_node()
rclpy.shutdown()
demo.get_logger().info("🏁 Program exited cleanly")
if __name__ == '__main__':
main()

test/demo_test/react_demo.py (executable file)

@@ -0,0 +1,199 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from protocol.msg import HeadTofPayload, SingleTofPayload
from std_msgs.msg import Header
import time
from enum import Enum
import numpy as np
# TOF sensor position enum (matches the protocol)
class TofPosition(Enum):
LEFT_HEAD = 0
RIGHT_HEAD = 1
LEFT_REAR = 2
RIGHT_REAR = 3
HEAD = 4
REAR = 5
# Robot state enum
class RobotState(Enum):
PREPARING_LOADING = 1 # preparing to load
TRANSPORTING = 2 # in transit
PREPARING_UNLOADING = 3 # preparing to unload
class AdvancedTofInteractive(Node):
def __init__(self):
super().__init__('advanced_tof_interactive')
self.get_logger().info("Advanced TOF interaction node started")
# Current robot state (starts in the preparing-to-load state)
self.current_state = RobotState.PREPARING_LOADING
# TOF detection parameters
self.touch_threshold = 0.3 # touch detection threshold (metres)
self.touch_percentage = 0.6 # required fraction of touched points
self.cooldown_time = 2.0 # cooldown between touch events (seconds)
self.last_detection_time = 0.0
self.min_valid_distance = 0.15 # minimum valid distance (150 mm)
self.max_valid_distance = 0.66 # maximum valid distance (660 mm)
# Touch pattern detection parameters
self.touch_history = {"left": False, "right": False}
self.touch_duration_threshold = 0.5 # sustained touch threshold (seconds)
self.touch_start_time = {"left": 0.0, "right": 0.0}
# TOF data subscriber
self.head_tof_sub = self.create_subscription(
HeadTofPayload,
'head_tof_payload',
self.head_tof_callback,
10
)
# State prompt publisher
self.state_pub = self.create_publisher(
SingleTofPayload, # TOF message reused as the carrier
'interactive_state',
10
)
# State prompt messages
self.state_messages = {
RobotState.PREPARING_LOADING: "Preparing to load: touch both sides of the head at the same time to confirm loading",
RobotState.PREPARING_UNLOADING: "Preparing to unload: touch both sides of the head at the same time to confirm unloading",
RobotState.TRANSPORTING: "In transit: please do not touch"
}
def process_tof_data(self, tof_data):
"""
Process the 8x8 data matrix of a single TOF sensor.
Returns:
valid_matrix: mask of valid data points (True/False)
mean_distance: mean distance
touch_detected: whether a touch was detected
"""
# Convert to a numpy array
data_array = np.array(tof_data.data).reshape(8, 8)
# Build the valid-data mask
valid_mask = (data_array > self.min_valid_distance) & (data_array < self.max_valid_distance)
# Mean distance over the valid points
if np.any(valid_mask):
mean_distance = np.mean(data_array[valid_mask])
else:
mean_distance = self.max_valid_distance
# Touch detection
touch_mask = (data_array < self.touch_threshold) & valid_mask
touch_points = np.sum(touch_mask)
touch_detected = (touch_points / valid_mask.size) >= self.touch_percentage
return valid_mask, mean_distance, touch_detected
def head_tof_callback(self, msg):
"""Callback for head TOF data"""
current_time = time.time()
# Respect the cooldown window
if current_time - self.last_detection_time < self.cooldown_time:
return
# Process the left TOF data
left_valid, left_mean, left_touch = self.process_tof_data(msg.left_head)
# Process the right TOF data
right_valid, right_mean, right_touch = self.process_tof_data(msg.right_head)
# Update the touch history
self.update_touch_history('left', left_touch, current_time)
self.update_touch_history('right', right_touch, current_time)
# Detect a simultaneous touch event
if self.touch_history['left'] and self.touch_history['right']:
self.get_logger().info(f"Simultaneous touch detected: left={left_mean:.3f}m, right={right_mean:.3f}m")
self.last_detection_time = current_time
self.handle_interaction()
def update_touch_history(self, side, current_touch, current_time):
"""Update the touch history state"""
if current_touch:
if not self.touch_history[side]:
# Touch started
self.touch_start_time[side] = current_time
self.touch_history[side] = True
else:
# Check whether the touch lasted long enough
if self.touch_history[side]:
duration = current_time - self.touch_start_time[side]
if duration < self.touch_duration_threshold:
self.get_logger().debug(f"{side} touch too short: {duration:.2f}s")
self.touch_history[side] = False
def handle_interaction(self):
"""Handle an interaction event"""
# Build the state message
state_msg = SingleTofPayload()
state_msg.header.stamp = self.get_clock().now().to_msg()
state_msg.data_available = True
if self.current_state == RobotState.PREPARING_LOADING:
self.get_logger().info("✅ Loading complete! Starting transport...")
state_msg.tof_position = TofPosition.HEAD.value
state_msg.data = [1.0] # indicates loading complete
self.current_state = RobotState.TRANSPORTING
elif self.current_state == RobotState.PREPARING_UNLOADING:
self.get_logger().info("✅ Unloading complete! Ready for the next load...")
state_msg.tof_position = TofPosition.HEAD.value
state_msg.data = [2.0] # indicates unloading complete
self.current_state = RobotState.PREPARING_LOADING
else:
self.get_logger().info("⚠️ Touch commands are not accepted in the current state")
state_msg.data_available = False
# Publish the state message
self.state_pub.publish(state_msg)
self.play_audio_feedback()
def play_audio_feedback(self):
"""Play audio feedback"""
# A real implementation should call the robot dog's audio interface,
# e.g. self.audio_publisher.publish(audio_msg)
self.get_logger().info("Playing the confirmation sound")
def publish_state_info(self):
"""Periodically publish state information"""
state_msg = SingleTofPayload()
state_msg.header.stamp = self.get_clock().now().to_msg()
state_msg.tof_position = TofPosition.HEAD.value
state_msg.data_available = True
if self.current_state in self.state_messages:
state_str = self.state_messages[self.current_state]
self.get_logger().info(state_str)
# Encode the state into the data field
state_msg.data = [float(self.current_state.value)]
else:
state_msg.data_available = False
self.state_pub.publish(state_msg)
def main(args=None):
rclpy.init(args=args)
node = AdvancedTofInteractive()
try:
# Periodically publish state information
timer = node.create_timer(3.0, node.publish_state_info)
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
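
A minimal standalone sketch (not part of the commit) of the touch-detection arithmetic in process_tof_data above, using a synthetic 8x8 distance frame in metres. The constant names here are my own; the threshold values mirror the node's defaults, and note that the ratio is taken over all 64 cells rather than only the valid ones.

import numpy as np

MIN_VALID, MAX_VALID = 0.15, 0.66   # valid distance window (metres)
TOUCH_THRESHOLD = 0.3               # a cell counts as touched below this distance
TOUCH_PERCENTAGE = 0.6              # fraction of the 64 cells required for a touch event

def touch_detected(frame_8x8):
    data = np.asarray(frame_8x8, dtype=float).reshape(8, 8)
    valid = (data > MIN_VALID) & (data < MAX_VALID)
    touch = (data < TOUCH_THRESHOLD) & valid
    return touch.sum() / valid.size >= TOUCH_PERCENTAGE

# Example: a palm covering most of the sensor at about 20 cm
frame = np.full((8, 8), 0.2)
frame[0, :] = 0.9                   # one row falls outside the valid window
print(touch_detected(frame))        # True: 56/64 = 0.875 >= 0.6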


@@ -61,6 +61,18 @@ class AudioDemo(Node):
else:
self.get_logger().error('Speech playback failed')
def speak(x):
demo = AudioDemo()
# Step 1: enable the speech function
if demo.enable_audio():
# Step 2: play the speech
demo.play_audio(x)
# Short delay to make sure playback finishes
time.sleep(2)
demo.destroy_node()
def main(args=None):
rclpy.init(args=args)
demo = AudioDemo()

tmp/interdemo3.py

@@ -0,0 +1,254 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
# Import the ROS2 message types
from protocol.msg import HeadTofPayload, MotionStatus, AudioPlayExtend
from protocol.srv import MotionResultCmd
from std_srvs.srv import SetBool
# Assumed sensor API module
# Note: adjust the import to the real API module path when actually used
from tof_sensor import TofSensor # assumed TOF sensor API class
class DeliveryInteraction(Node):
def __init__(self):
super().__init__('delivery_interaction_node')
# Configuration parameters
self.obstacle_distance = 0.3 # obstacle detection distance threshold (metres)
self.obstacle_percentage = 0.8 # obstacle detection ratio threshold (80%)
self.sensitivity_s = 1.0 # detection sensitivity (seconds)
# State variables
self.current_area = 0 # 0: no area, 1: loading area, 2: unloading area
self.head_left_detected = False
self.head_right_detected = False
self.motion_id = 0
self.old_motion_id = 0
self.sensor_initialized = False
# Initialize the TOF sensor
self.tof_sensor = TofSensor()
self.init_sensor()
# Create callback groups
tof_cb_group = MutuallyExclusiveCallbackGroup()
motion_cb_group = MutuallyExclusiveCallbackGroup()
service_cb_group = MutuallyExclusiveCallbackGroup()
# Subscribe to TOF data
self.tof_sub = self.create_subscription(
HeadTofPayload,
'head_tof_payload',
self.tof_callback,
10,
callback_group=tof_cb_group
)
# Subscribe to the motion status
self.motion_sub = self.create_subscription(
MotionStatus,
'motion_status',
self.motion_callback,
10,
callback_group=motion_cb_group
)
# Create the area-setting service
self.set_area_srv = self.create_service(
SetBool,
'set_delivery_area',
self.set_area_callback,
callback_group=service_cb_group
)
# Create the motion service client
self.motion_client = self.create_client(
MotionResultCmd,
'motion_result_cmd'
)
# Create the speech publisher
self.speech_pub = self.create_publisher(
AudioPlayExtend,
'speech_play_extend',
10
)
# Timer that checks the interaction conditions
self.timer = self.create_timer(
0.5, # check every 0.5 seconds
self.check_interaction
)
self.get_logger().info("Delivery interaction node started, waiting for the area to be set...")
def init_sensor(self):
"""Initialize and open the TOF sensor"""
try:
# Initialize the sensor; simulator=False means real hardware
self.tof_sensor.Init(simulator=False)
self.get_logger().info("TOF sensor initialized")
# Open the sensor
if self.tof_sensor.Open():
self.sensor_initialized = True
self.get_logger().info("TOF sensor opened")
# Start the sensor
if self.tof_sensor.Start():
self.get_logger().info("TOF sensor started")
else:
self.get_logger().error("Failed to start the TOF sensor")
else:
self.get_logger().error("Failed to open the TOF sensor")
except Exception as e:
self.get_logger().error(f"Error during TOF sensor initialization: {str(e)}")
def tof_callback(self, msg):
"""Handle TOF sensor data"""
# Only process data once the sensor is initialized
if not self.sensor_initialized:
return
# Check the left sensor
left_obstacle_count = sum(1 for d in msg.left_head.data if d < self.obstacle_distance)
left_percentage = left_obstacle_count / len(msg.left_head.data) if msg.left_head.data else 0
self.head_left_detected = left_percentage >= self.obstacle_percentage
# Check the right sensor
right_obstacle_count = sum(1 for d in msg.right_head.data if d < self.obstacle_distance)
right_percentage = right_obstacle_count / len(msg.right_head.data) if msg.right_head.data else 0
self.head_right_detected = right_percentage >= self.obstacle_percentage
def motion_callback(self, msg):
"""Update the motion status"""
if self.motion_id != msg.motion_id:
self.old_motion_id = self.motion_id
self.motion_id = msg.motion_id
def set_area_callback(self, request, response):
"""Service callback that sets the current area"""
# Make sure the sensor has been initialized
if not self.sensor_initialized:
response.success = False
response.message = "TOF sensor not initialized; cannot set the area"
self.get_logger().warn(response.message)
return response
# request.data: True = loading area, False = unloading area
self.current_area = 1 if request.data else 2
area_name = "loading bay" if request.data else "unloading bay"
self.get_logger().info(f"Entered the {area_name}, waiting for the interaction cue...")
response.success = True
response.message = f"Area set to the {area_name}"
return response
def check_interaction(self):
"""Check the interaction conditions and trigger the matching behaviour"""
# Skip the check if the sensor is not initialized
if not self.sensor_initialized:
return
# Only look for interaction in the loading/unloading area while sitting
if self.current_area == 0 or self.motion_id != 214:
return
# Detect a simultaneous touch on both sides of the head
if self.head_left_detected and self.head_right_detected:
if self.current_area == 1: # loading area
self.get_logger().info("Loading-complete cue triggered")
self.trigger_delivery_complete()
self.current_area = 0 # reset the area
elif self.current_area == 2: # unloading area
self.get_logger().info("Unloading-complete cue triggered")
self.trigger_unloading_complete()
self.current_area = 0 # reset the area
def trigger_delivery_complete(self):
"""Handle loading completion"""
# Play the voice prompt
self.play_audio(5000) # assume 5000 is the "loading complete" audio ID
# Perform the motion
self.execute_motion(212) # relative-pose motion
# Wait for the motion to finish
self.get_logger().info("Loading complete, starting transport...")
def trigger_unloading_complete(self):
"""Handle unloading completion"""
# Play the voice prompt
self.play_audio(5001) # assume 5001 is the "unloading complete" audio ID
# Perform the motion
self.execute_motion(150) # tail-wag motion
# Wait for the motion to finish
self.get_logger().info("Unloading complete, ready for the next load...")
def play_audio(self, play_id):
"""Play a voice prompt"""
msg = AudioPlayExtend()
msg.module_name = "delivery_interaction"
msg.is_online = False
msg.speech.play_id = play_id
self.speech_pub.publish(msg)
def execute_motion(self, motion_id):
"""Execute the given motion"""
if not self.motion_client.wait_for_service(timeout_sec=1.0):
self.get_logger().warn("Motion service unavailable")
return
request = MotionResultCmd.Request()
request.motion_id = motion_id
request.duration = 1000 # duration (milliseconds)
future = self.motion_client.call_async(request)
rclpy.spin_until_future_complete(self, future, timeout_sec=2.0)
if future.done():
response = future.result()
if response.result:
self.get_logger().info(f"Motion {motion_id} executed successfully")
else:
self.get_logger().error(f"Motion {motion_id} failed")
else:
self.get_logger().error("Motion service call timed out")
def destroy_node(self):
"""Shut down the sensor when the node is destroyed"""
if self.sensor_initialized:
self.tof_sensor.Stop()
self.tof_sensor.Close()
self.get_logger().info("TOF sensor closed")
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
# Create the node
node = DeliveryInteraction()
# Use a multi-threaded executor
executor = MultiThreadedExecutor()
executor.add_node(node)
try:
executor.spin()
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
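
A usage note (not part of the commit): the node above only reacts to head touches after its set_delivery_area service has been called. Below is a minimal rclpy client sketch for that service, assuming the DeliveryInteraction node is running; the client node name is my own choice, while the service name and True/False semantics mirror set_area_callback (True = loading area, False = unloading area).

import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool

def set_area(loading: bool):
    rclpy.init()
    node = Node('set_area_client')
    client = node.create_client(SetBool, 'set_delivery_area')
    try:
        if not client.wait_for_service(timeout_sec=2.0):
            node.get_logger().error('set_delivery_area service not available')
            return
        req = SetBool.Request()
        req.data = loading                # True = loading area, False = unloading area
        future = client.call_async(req)
        rclpy.spin_until_future_complete(node, future, timeout_sec=2.0)
        if future.done():
            node.get_logger().info(future.result().message)
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    set_area(True)  # enter the loading area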


@@ -98,19 +98,19 @@ class BaseMsg:
print("\n=== Control command details ===")
print(f"Mode (mode): {self.msg.mode}")
print(f"Gait ID (gait_id): {self.msg.gait_id}")
print(f"Contact state (contact): {self.msg.contact}")
print(f"Life count (life_count): {self.msg.life_count}")
print(f"Target velocity (vel_des): {self.msg.vel_des}")
print(f"Target attitude (rpy_des): {self.msg.rpy_des}")
print(f"Target position (pos_des): {self.msg.pos_des}")
print(f"Target acceleration (acc_des): {self.msg.acc_des}")
print(f"Control point (ctrl_point): {self.msg.ctrl_point}")
print(f"Foot pose (foot_pose): {self.msg.foot_pose}")
print(f"Step height (step_height): {self.msg.step_height}")
print(f"Parameter value (value): {self.msg.value}")
# print("\n=== Control command details ===")
# print(f"Mode (mode): {self.msg.mode}")
# print(f"Gait ID (gait_id): {self.msg.gait_id}")
# print(f"Contact state (contact): {self.msg.contact}")
# print(f"Life count (life_count): {self.msg.life_count}")
# print(f"Target velocity (vel_des): {self.msg.vel_des}")
# print(f"Target attitude (rpy_des): {self.msg.rpy_des}")
# print(f"Target position (pos_des): {self.msg.pos_des}")
# print(f"Target acceleration (acc_des): {self.msg.acc_des}")
# print(f"Control point (ctrl_point): {self.msg.ctrl_point}")
# print(f"Foot pose (foot_pose): {self.msg.foot_pose}")
# print(f"Step height (step_height): {self.msg.step_height}")
# print(f"Parameter value (value): {self.msg.value}")
self.ctrl.Send_cmd(self.msg)


@@ -3,7 +3,7 @@ import numpy as np
import os
import datetime
from sklearn import linear_model
from utils.log_helper import get_logger, debug, info, warning, error, success
#from utils.log_helper import get_logger, debug, info, warning, error, success
def detect_horizontal_track_edge(image, observe=False, delay=1000, save_log=True):
"""


@@ -1,102 +1,176 @@
#!/usr/bin/env python3
#ros2 run camera_test camera_server
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
# from qreader import QReader
import os
from pyzbar import pyzbar
from threading import Thread, Lock
import time
import queue
from utils.log_helper import get_logger
from datetime import datetime
#from utils.log_helper import get_logger
# Import the AI camera service
from protocol.srv import CameraService
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import os
import time
from datetime import datetime
from protocol.srv import CameraService
class ImageSubscriber(Node):
def __init__(self):
super().__init__('image_subscriber')
# Create the AI camera service client
self.camera_client = self.create_client(
CameraService,
'/camera_service'
)
# Initialize the safe-shutdown flags
self._shutdown_flag = False
self._ros_initialized = True
# Wait for the service to become available
# Create the service client
self.camera_client = self.create_client(CameraService, '/camera_service')
while not self.camera_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('AI camera service unavailable, waiting...')
self.get_logger().info('Waiting for the AI camera service...')
# Subscribe to the AI camera image topic
self.image_subscription = self.create_subscription(
Image,
'/image',
self.image_callback,
10
# Image subscription
self.image_sub = self.create_subscription(
Image, '/image', self.image_callback, 10
)
# Tool initialization
self.bridge = CvBridge()
self.cv_image = None # Store latest image
self.cv_image = None
self.camera_started = False
# Start the AI camera
# Capture control parameters
self.last_save_time = time.time()
self.start_time = time.time()
self.save_interval = 0.5
self.total_duration = 1.0
self.save_dir = "saved_images"
os.makedirs(self.save_dir, exist_ok=True)
# Start the camera safely
self.start_camera()
def start_camera(self):
"""Start the AI camera"""
"""Start the camera safely"""
if not self._ros_initialized:
return False
req = CameraService.Request()
req.command = 9 # START_IMAGE_PUBLISH
req.width = 640 # image width
req.height = 480 # image height
req.fps = 30 # frame rate
req.command = 9
req.width, req.height, req.fps = 640, 480, 30
future = self.camera_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
if future.result().result == 0: # RESULT_SUCCESS
self.get_logger().info('AI camera started')
result = future.result()
self.get_logger().info(f'Service returned: [code={result.result}, msg="{result.msg}"]')
if result.result == 0:
self.get_logger().info('Camera started successfully')
self.camera_started = True
return True
else:
self.get_logger().error(f'Failed to start the AI camera: {future.result().msg}')
self.get_logger().error(f'Start failed (error code {result.result})')
return False
def stop_camera(self):
"""Stop the AI camera"""
if not self.camera_started:
"""Stop the camera safely"""
if not self.camera_started or not self._ros_initialized:
return True
req = CameraService.Request()
req.command = 10 # STOP_IMAGE_PUBLISH
req.command = 10
future = self.camera_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
if future.result().result == 0: # RESULT_SUCCESS
self.get_logger().info('AI camera stopped')
self.camera_started = False
try:
future = self.camera_client.call_async(req)
rclpy.spin_until_future_complete(self, future, timeout_sec=2.0)
if future.result().result == 0:
self.get_logger().info('Camera stopped')
self.camera_started = False
return True
else:
self.get_logger().error(f'Stop failed: {future.result().msg}')
return False
except Exception as e:
self.get_logger().error(f'Exception while stopping: {str(e)}')
return False
def save_image(self):
"""Save the image safely"""
if self.cv_image is None:
self.get_logger().warning("No valid image to save")
return False
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{self.save_dir}/capture_{timestamp}.jpg"
cv2.imwrite(filename, self.cv_image)
self.get_logger().info(f"Capture saved: {filename}")
return True
else:
self.get_logger().error(f'Failed to stop the AI camera: {future.result().msg}')
except Exception as e:
self.get_logger().error(f"Save failed: {str(e)}")
return False
def image_callback(self, msg):
try:
# Convert the ROS image message to OpenCV format
self.cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
"""Image callback with a shutdown guard"""
if getattr(self, '_shutdown_flag', False):
return
current_time = time.time()
elapsed = current_time - self.start_time
if elapsed >= self.total_duration:
self.get_logger().info("Capture task complete")
self._shutdown_flag = True
return
try:
self.cv_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
if current_time - self.last_save_time >= self.save_interval:
if self.save_image():
self.last_save_time = current_time
except Exception as e:
self.get_logger().error('Failed to convert image: %s' % str(e))
self.get_logger().error(f"Image processing error: {str(e)}")
def safe_spin(self):
"""Safe spin loop"""
while rclpy.ok() and not getattr(self, '_shutdown_flag', False):
rclpy.spin_once(self, timeout_sec=0.1)
def destroy(self):
"""Destroy the node safely"""
if not self._ros_initialized:
return
self.stop_camera()
self._shutdown_flag = True
if hasattr(self, '_ros_initialized'):
self.destroy_node()
rclpy.shutdown()
def get_latest_image(self):
"""External accessor for the latest image"""
return self.cv_image.copy() if self.cv_image is not None else None
class ImageProcessor:
def __init__(self):
self.image_subscriber = ImageSubscriber()
# self.qreader = QReader()
self.spin_thread = None
self.running = True
self.log = get_logger("image processor")
# self.log = get_logger("image processor")
# Asynchronous QR code scanning
self.scan_thread = None
@@ -116,7 +190,8 @@ class ImageProcessor:
while self.running:
rclpy.spin_once(self.image_subscriber, timeout_sec=0.1)
except Exception as e:
self.log.error(f"Spin thread error: {e}", "error")
# self.log.error(f"Spin thread error: {e}", "error")
print(f"Spin thread error: {e}")
def destroy(self):
self.running = False
@@ -130,14 +205,53 @@ class ImageProcessor:
def get_current_image(self):
return self.image_subscriber.cv_image
def decode_qrcode(self, img = None):
def decode_qrcode(self, img=None):
"""Decode a QR code with pyzbar"""
if img is None:
img = self.get_current_image()
# decoded_info = self.qreader.detect_and_decode(image=img)
decoded_info = None
if decoded_info and len(decoded_info) > 0:
return decoded_info[0]
return None
if img is None:
return None
try:
# Decode all barcodes / QR codes with pyzbar
decoded_objects = pyzbar.decode(img)
if decoded_objects:
# Return the data of the first QR code, decoded to a string
qr_data = decoded_objects[0].data.decode('utf-8')
return qr_data
return None
except Exception as e:
print(f"QR code decoding error: {str(e)}")
return None
def decode_all_qrcodes(self, img=None):
"""Decode every QR code / barcode in the image"""
if img is None:
img = self.get_current_image()
if img is None:
return []
try:
decoded_objects = pyzbar.decode(img)
results = []
for obj in decoded_objects:
result = {
'data': obj.data.decode('utf-8'),
'type': obj.type,
'rect': obj.rect, # location (x, y, width, height)
'polygon': obj.polygon # polygon vertices
}
results.append(result)
return results
except Exception as e:
print(f"Batch QR code decoding error: {str(e)}")
return []
def start_async_scan(self, interval=0.3):
"""
@@ -147,7 +261,8 @@ class ImageProcessor:
interval: scan interval in seconds
"""
if self.scan_thread is not None and self.scan_thread.is_alive():
self.log.warning("Async scan is already running", "warning")
# self.log.warning("Async scan is already running", "warning")
print('Async scan is already running')
return
self.enable_async_scan = True
@@ -155,14 +270,16 @@ class ImageProcessor:
self.scan_thread = Thread(target=self._async_scan_worker, args=(interval,))
self.scan_thread.daemon = True # daemon thread: exits automatically with the main thread
self.scan_thread.start()
self.log.info("Started the async QR code scan thread", "start")
# self.log.info("Started the async QR code scan thread", "start")
print('Started the async QR code scan thread')
def stop_async_scan(self):
"""Stop the asynchronous QR code scan"""
self.enable_async_scan = False
if self.scan_thread and self.scan_thread.is_alive():
self.scan_thread.join(timeout=1.0)
self.log.info("Async QR code scan thread stopped", "stop")
# self.log.info("Async QR code scan thread stopped", "stop")
print('Async QR code scan thread stopped')
def _async_scan_worker(self, interval):
"""Worker thread for the asynchronous scan"""
@@ -184,10 +301,12 @@ class ImageProcessor:
if qr_data:
self.last_qr_result = qr_data
self.last_qr_time = current_time
self.log.success(f"Async scan found a QR code: {qr_data}", "scan")
# self.log.success(f"Async scan found a QR code: {qr_data}", "scan")
print(f"Async scan found a QR code: {qr_data}")
except Exception as e:
self.is_scanning = False
self.log.error(f"Async QR code scan error: {e}", "error")
# self.log.error(f"Async QR code scan error: {e}", "error")
print(f"Async QR code scan error: {e}")
last_scan_time = current_time
@@ -202,19 +321,19 @@
""" DEBUG """
def main(args=None):
rclpy.init(args=args)
image_subscriber = ImageSubscriber()
# def main(args=None):
# rclpy.init(args=args)
# image_subscriber = ImageSubscriber()
try:
rclpy.spin(image_subscriber)
except KeyboardInterrupt:
pass
# try:
# rclpy.spin(image_subscriber)
# except KeyboardInterrupt:
# pass
# Cleanup
image_subscriber.stop_camera()
image_subscriber.destroy_node()
# # Cleanup
# image_subscriber.stop_camera()
# image_subscriber.destroy_node()
if __name__ == '__main__':
main()
# if __name__ == '__main__':
# main()
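
A usage sketch (not part of the commit) of the ImageProcessor wrapper defined above, matching how main.py's Robot_Ctrl drives it: run() to start the spin thread, decode_qrcode() to poll, destroy() to shut down. The 10-second polling loop and the explicit rclpy.init() call here are my own assumptions.

import time
import rclpy
from utils.image_raw import ImageProcessor

rclpy.init()
processor = ImageProcessor()
processor.run()                          # start the background spin thread
try:
    qr, deadline = None, time.time() + 10.0
    while qr is None and time.time() < deadline:
        qr = processor.decode_qrcode()   # first decoded QR string, or None
        time.sleep(0.2)
    print('QR result:', qr)
finally:
    processor.destroy()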

utils/speech_demo.py (executable file)

@@ -0,0 +1,99 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from protocol.srv import AudioTextPlay
from std_srvs.srv import SetBool
import time
import string
class AudioDemo(Node):
def __init__(self):
super().__init__('audio_demo')
# Create the audio playback service client
self.audio_play_client = self.create_client(
AudioTextPlay,
'/mi_desktop_48_b0_2d_7b_05_1d/speech_text_play'
)
# Create the audio switch service client
self.audio_switch_client = self.create_client(
SetBool,
'/mi_desktop_48_b0_2d_7b_05_1d/audio_action_set'
)
# Wait for the services to become available
while not self.audio_play_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Speech playback service unavailable, waiting...')
while not self.audio_switch_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Audio switch service unavailable, waiting...')
def enable_audio(self):
"""Enable the speech function"""
req = SetBool.Request()
req.data = True # turn the speech function on
future = self.audio_switch_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
if future.result().success:
self.get_logger().info('Speech function enabled')
return True
else:
self.get_logger().error(f'Failed to enable speech: {future.result().message}')
return False
def play_audio(self, text):
"""Play speech for the given text"""
req = AudioTextPlay.Request()
req.module_name = "demo_script" # name of the caller
req.is_online = True # use online TTS
req.text = text # text to play
# Note: since online playback is used, the speech field can stay empty
# (per the service definition it is of type AudioPlay, but online playback does not need it)
future = self.audio_play_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
response = future.result()
if response.status == 0:
self.get_logger().info('Speech playback finished')
else:
self.get_logger().error('Speech playback failed')
def speak(text, args=None):
"""Speech playback helper (for external callers)"""
rclpy.init()
demo = AudioDemo()
try:
# Step 1: enable the speech function
if demo.enable_audio():
# Step 2: play the speech
demo.play_audio(text)
# Short delay to make sure playback finishes
time.sleep(2)
finally:
demo.destroy_node()
rclpy.shutdown()
# def main(text, args=None):
# rclpy.init(args=args)
# demo = AudioDemo()
# # Step 1: enable the speech function
# if demo.enable_audio():
# # Step 2: play the speech
# demo.play_audio(text)
# # Short delay to make sure playback finishes
# time.sleep(2)
# demo.destroy_node()
# rclpy.shutdown()
# if __name__ == '__main__':
# main('nihao')
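
A usage sketch (not part of the commit): speak() initialises and shuts down rclpy itself, so a caller such as main.py only needs a single function call, as it does with speak('nihao'). The example text below is arbitrary.

from utils.speech_demo import speak

if __name__ == '__main__':
    speak('hello')   # enables the audio switch service and plays the text via online TTS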