mi-task/tmp/interdemo3.py
2025-08-21 18:19:04 +08:00

202 lines
7.4 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from protocol.msg import HeadTofPayload, SingleTofPayload
from std_msgs.msg import Header
import time
from enum import Enum
import numpy as np
from utils.speech_demo import speak
# TOF传感器位置枚举 (与协议一致)
class TofPosition(Enum):
LEFT_HEAD = 0
RIGHT_HEAD = 1
LEFT_REAR = 2
RIGHT_REAR = 3
HEAD = 4
REAR = 5
# 机器状态枚举
class RobotState(Enum):
PREPARING_LOADING = 1 # 准备配货状态
TRANSPORTING = 2 # 运输中状态
PREPARING_UNLOADING = 3 # 准备卸货状态
class AdvancedTofInteractive(Node):
def __init__(self):
super().__init__('advanced_tof_interactive')
self.get_logger().info("高级TOF交互节点启动")
# 机器狗当前状态 (初始为准备配货状态)
self.current_state = RobotState.PREPARING_LOADING
# TOF检测参数
self.touch_threshold = 0.15 # 触摸检测阈值(米)
self.touch_percentage = 0.2 # 有效触摸点比例
self.cooldown_time = 2.0 # 触摸事件冷却时间(秒)
self.last_detection_time = 0.0
self.min_valid_distance = 0 # 最小有效距离(150mm)
self.max_valid_distance = 1 # 最大有效距离(660mm)
# 触摸模式检测参数
self.touch_history = {"left": False, "right": False}
self.touch_duration_threshold = 1 # 持续触摸时间阈值(秒)
self.touch_start_time = {"left": 0.0, "right": 0.0}
# 创建TOF数据订阅器
self.head_tof_sub = self.create_subscription(
HeadTofPayload,
'/custom_namespace/head_tof_payload',
self.head_tof_callback,
10
)
# 创建状态提示发布器
self.state_pub = self.create_publisher(
SingleTofPayload, # 使用TOF消息作为载体
'/custom_namespace/interactive_state',
10
)
# 状态提示消息
self.state_messages = {
RobotState.PREPARING_LOADING: "准备配货:请同时触摸头部两侧完成配货",
RobotState.PREPARING_UNLOADING: "准备卸货:请同时触摸头部两侧完成卸货",
RobotState.TRANSPORTING: "运输中:请勿触摸"
}
def process_tof_data(self, tof_data):
"""
处理单个TOF传感器的8x8数据矩阵
返回:
valid_matrix: 有效数据点矩阵 (True/False)
mean_distance: 平均距离
touch_detected: 是否检测到触摸
"""
# 转换为numpy数组
data_array = np.array(tof_data.data).reshape(8, 8)
# 创建有效数据掩码
valid_mask = (data_array > self.min_valid_distance) & (data_array < self.max_valid_distance)
# 计算有效点的平均距离
if np.any(valid_mask):
mean_distance = np.mean(data_array[valid_mask])
else:
mean_distance = self.max_valid_distance
# 检测触摸
touch_mask = (data_array < self.touch_threshold) & valid_mask
touch_points = np.sum(touch_mask)
touch_detected = (touch_points / valid_mask.size) >= self.touch_percentage
return valid_mask, mean_distance, touch_detected
def head_tof_callback(self, msg):
"""处理头部TOF数据回调"""
current_time = time.time()
# 检查冷却时间
if current_time - self.last_detection_time < self.cooldown_time:
return
# 处理左侧TOF数据
left_valid, left_mean, left_touch = self.process_tof_data(msg.left_head)
# 处理右侧TOF数据
right_valid, right_mean, right_touch = self.process_tof_data(msg.right_head)
# 更新触摸历史
self.update_touch_history('left', left_touch, current_time)
self.update_touch_history('right', right_touch, current_time)
# 检测同时触摸事件
if self.touch_history['left'] and self.touch_history['right']:
self.get_logger().info(f"检测到同时触摸: 左={left_mean:.3f}m, 右={right_mean:.3f}m")
self.last_detection_time = current_time
self.handle_interaction()
def update_touch_history(self, side, current_touch, current_time):
"""更新触摸历史状态"""
if current_touch:
if not self.touch_history[side]:
# 触摸开始
self.touch_start_time[side] = current_time
self.touch_history[side] = True
else:
# 检测触摸持续时间是否达到阈值
if self.touch_history[side]:
duration = current_time - self.touch_start_time[side]
if duration < self.touch_duration_threshold:
self.get_logger().debug(f"{side}触摸时间不足: {duration:.2f}s")
self.touch_history[side] = False
def handle_interaction(self):
speak("触摸")
self.get_logger().info("handleinteraction")
"""处理交互事件"""
# 创建状态消息
state_msg = SingleTofPayload()
state_msg.header.stamp = self.get_clock().now().to_msg()
state_msg.data_available = True
if self.current_state == RobotState.PREPARING_LOADING:
self.get_logger().info("✅ 配货完成!开始运输...")
state_msg.tof_position = TofPosition.HEAD.value
state_msg.data = [1.0] # 表示配货完成
self.current_state = RobotState.TRANSPORTING
elif self.current_state == RobotState.PREPARING_UNLOADING:
self.get_logger().info("✅ 卸货完成!准备新的配货...")
state_msg.tof_position = TofPosition.HEAD.value
state_msg.data = [2.0] # 表示卸货完成
self.current_state = RobotState.PREPARING_LOADING
else:
self.get_logger().info("⚠️ 当前状态不接收触摸指令")
state_msg.data_available = False
# 发布状态消息
self.state_pub.publish(state_msg)
self.play_audio_feedback()
def play_audio_feedback(self):
self.get_logger().info("feedback")
"""播放音频反馈"""
# 实际实现需调用机器狗音频接口
# 示例self.audio_publisher.publish(audio_msg)
self.get_logger().info("播放操作确认音效")
def publish_state_info(self):
"""定期发布状态信息"""
state_msg = SingleTofPayload()
state_msg.header.stamp = self.get_clock().now().to_msg()
state_msg.tof_position = TofPosition.HEAD.value
state_msg.data_available = True
if self.current_state in self.state_messages:
state_str = self.state_messages[self.current_state]
self.get_logger().info(state_str)
# 将状态消息编码到data字段
state_msg.data = [float(self.current_state.value)]
else:
state_msg.data_available = False
self.state_pub.publish(state_msg)
def main(args=None):
rclpy.init(args=args)
node = AdvancedTofInteractive()
try:
# 定期发布状态信息
timer = node.create_timer(3.0, node.publish_state_info)
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()