mi-task/tmp/interdemo4.py

271 lines
10 KiB
Python
Raw Normal View History

2025-08-19 22:11:14 +08:00
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from protocol.msg import HeadTofPayload, RearTofPayload, SingleTofPayload
2025-08-21 18:19:04 +08:00
from std_msgs.msg import Header
2025-08-19 22:11:14 +08:00
import time
2025-08-21 18:19:04 +08:00
from enum import Enum
2025-08-19 22:11:14 +08:00
import numpy as np
2025-08-21 18:19:04 +08:00
# TOF传感器位置枚举 (与协议一致)
class TofPosition(Enum):
LEFT_HEAD = 0
RIGHT_HEAD = 1
LEFT_REAR = 2
RIGHT_REAR = 3
HEAD = 4
REAR = 5
# 机器状态枚举
class RobotState(Enum):
PREPARING_LOADING = 1 # 准备配货状态
TRANSPORTING = 2 # 运输中状态
PREPARING_UNLOADING = 3 # 准备卸货状态
class AdvancedTofInteractive(Node):
2025-08-19 22:11:14 +08:00
def __init__(self):
2025-08-21 18:19:04 +08:00
super().__init__('advanced_tof_interactive')
self.get_logger().info("高级TOF交互节点启动")
2025-08-19 22:11:14 +08:00
2025-08-21 18:19:04 +08:00
# 机器狗当前状态 (初始为准备配货状态)
self.current_state = RobotState.PREPARING_LOADING
2025-08-19 22:11:14 +08:00
2025-08-21 18:19:04 +08:00
# TOF检测参数
self.last_head_detection_time = 0.0
self.last_rear_detection_time = 0.0
2025-08-19 22:11:14 +08:00
2025-08-21 18:19:04 +08:00
self.touch_threshold = 0.15 # 触摸检测阈值(米)
self.touch_percentage = 0.3 # 有效触摸点比例
self.cooldown_time = 2.0 # 触摸事件冷却时间(秒)
self.last_detection_time = 0.0
self.min_valid_distance = 0 # 最小有效距离(150mm)
self.max_valid_distance = 0.6 # 最大有效距离(660mm)
# 触摸模式检测参数
self.head_touch_history = {"left": False, "right": False}
self.rear_touch_history = {"left": False, "right": False}
self.touch_duration_threshold = 0.5 # 持续触摸时间阈值(秒)
self.head_touch_start_time = {"left": 0.0, "right": 0.0}
self.rear_touch_start_time = {"left": 0.0, "right": 0.0}
2025-08-19 22:11:14 +08:00
2025-08-21 18:19:04 +08:00
# 创建TOF数据订阅器
self.head_tof_sub = self.create_subscription(
2025-08-19 22:11:14 +08:00
HeadTofPayload,
2025-08-21 18:19:04 +08:00
'/custom_namespace/head_tof_payload', # 使用正确的topic名称
self.head_tof_callback,
10
)
2025-08-19 22:11:14 +08:00
2025-08-21 18:19:04 +08:00
self.rear_tof_sub = self.create_subscription(
2025-08-19 22:11:14 +08:00
RearTofPayload,
2025-08-21 18:19:04 +08:00
'/custom_namespace/rear_tof_payload', # 使用正确的topic名称
self.rear_tof_callback,
10
)
2025-08-19 22:11:14 +08:00
2025-08-21 18:19:04 +08:00
# 创建状态提示发布器
self.state_pub = self.create_publisher(
SingleTofPayload, # 使用TOF消息作为载体
'/custom_namespace/interactive_state',
10
)
2025-08-19 22:11:14 +08:00
2025-08-21 18:19:04 +08:00
# 状态提示消息
self.state_messages = {
RobotState.PREPARING_LOADING: "准备配货:请同时触摸头部两侧完成配货",
RobotState.PREPARING_UNLOADING: "准备卸货:请同时触摸头部两侧完成卸货",
RobotState.TRANSPORTING: "运输中:请勿触摸"
}
def process_tof_data(self, tof_data):
"""
处理单个TOF传感器的8x8数据矩阵
返回:
valid_matrix: 有效数据点矩阵 (True/False)
mean_distance: 平均距离
touch_detected: 是否检测到触摸
"""
# 转换为numpy数组
data_array = np.array(tof_data.data).reshape(8, 8)
# 创建有效数据掩码
valid_mask = (data_array > self.min_valid_distance) & (data_array < self.max_valid_distance)
# 计算有效点的平均距离
if np.any(valid_mask):
mean_distance = np.mean(data_array[valid_mask])
else:
mean_distance = self.max_valid_distance
# 检测触摸
touch_mask = (data_array < self.touch_threshold) & valid_mask
touch_points = np.sum(touch_mask)
touch_detected = (touch_points / valid_mask.size) >= self.touch_percentage
return valid_mask, mean_distance, touch_detected
def head_tof_callback(self, msg):
"""处理头部TOF数据回调"""
2025-08-19 22:11:14 +08:00
current_time = time.time()
2025-08-21 18:19:04 +08:00
# 检查冷却时间
if current_time - self.last_head_detection_time < self.cooldown_time:
2025-08-19 22:11:14 +08:00
return
2025-08-21 18:19:04 +08:00
# 处理左侧TOF数据
left_valid, left_mean, left_touch = self.process_tof_data(msg.left_head)
# 处理右侧TOF数据
right_valid, right_mean, right_touch = self.process_tof_data(msg.right_head)
# 更新触摸历史
self.update_touch_history('head', 'left', left_touch, current_time)
self.update_touch_history('head', 'right', right_touch, current_time)
# 检测同时触摸事件
if self.head_touch_history['left'] and self.head_touch_history['right']:
self.get_logger().info(f"检测到头部同时触摸: 左={left_mean:.3f}m, 右={right_mean:.3f}m")
self.last_head_detection_time = current_time
self.handle_head_interaction()
def rear_tof_callback(self, msg):
"""处理尾部TOF数据回调"""
current_time = time.time()
# 检查冷却时间
if current_time - self.last_rear_detection_time < self.cooldown_time:
2025-08-19 22:11:14 +08:00
return
2025-08-21 18:19:04 +08:00
# 处理左侧TOF数据
left_valid, left_mean, left_touch = self.process_tof_data(msg.left_rear)
# 处理右侧TOF数据
right_valid, right_mean, right_touch = self.process_tof_data(msg.right_rear)
# 更新触摸历史
self.update_touch_history('rear', 'left', left_touch, current_time)
self.update_touch_history('rear', 'right', right_touch, current_time)
# 检测同时触摸事件
if self.rear_touch_history['left'] and self.rear_touch_history['right']:
self.get_logger().info(f"检测到尾部同时触摸: 左={left_mean:.3f}m, 右={right_mean:.3f}m")
self.last_rear_detection_time = current_time
self.handle_rear_interaction()
def update_touch_history(self, sensor_type, side, current_touch, current_time):
"""更新触摸历史状态"""
if sensor_type == 'head':
history = self.head_touch_history
start_time = self.head_touch_start_time
else: # rear
history = self.rear_touch_history
start_time = self.rear_touch_start_time
2025-08-19 22:11:14 +08:00
2025-08-21 18:19:04 +08:00
if current_touch:
if not history[side]:
# 触摸开始
start_time[side] = current_time
history[side] = True
else:
# 检测触摸持续时间是否达到阈值
if history[side]:
duration = current_time - start_time[side]
if duration < self.touch_duration_threshold:
self.get_logger().debug(f"{sensor_type} {side}触摸时间不足: {duration:.2f}s")
history[side] = False
def handle_head_interaction(self):
2025-08-19 22:11:14 +08:00
2025-08-21 18:19:04 +08:00
"""处理头部交互事件"""
# 创建状态消息
state_msg = SingleTofPayload()
state_msg.header.stamp = self.get_clock().now().to_msg()
state_msg.data_available = True
2025-08-19 22:11:14 +08:00
2025-08-21 18:19:04 +08:00
if self.current_state == RobotState.PREPARING_LOADING:
self.get_logger().info("✅ 配货完成!开始运输...")
state_msg.tof_position = TofPosition.HEAD.value
state_msg.data = [1.0] # 表示配货完成
self.current_state = RobotState.PREPARING_UNLOADING
2025-08-19 22:11:14 +08:00
2025-08-21 18:19:04 +08:00
elif self.current_state == RobotState.PREPARING_UNLOADING:
self.get_logger().info("✅ 卸货完成!准备新的配货...")
state_msg.tof_position = TofPosition.HEAD.value
state_msg.data = [2.0] # 表示卸货完成
self.current_state = RobotState.PREPARING_LOADING
2025-08-19 22:11:14 +08:00
2025-08-21 18:19:04 +08:00
else:
self.get_logger().info("⚠️ 当前状态不接收头部触摸指令")
state_msg.data_available = False
2025-08-19 22:11:14 +08:00
2025-08-21 18:19:04 +08:00
# 发布状态消息
self.state_pub.publish(state_msg)
self.play_audio_feedback("head")
2025-08-19 22:11:14 +08:00
2025-08-21 18:19:04 +08:00
def handle_rear_interaction(self):
"""处理尾部交互事件"""
# 创建状态消息
state_msg = SingleTofPayload()
state_msg.header.stamp = self.get_clock().now().to_msg()
state_msg.data_available = True
if self.current_state == RobotState.PREPARING_LOADING:
self.get_logger().info("✅ 到达目的地!准备卸货...")
state_msg.tof_position = TofPosition.REAR.value
state_msg.data = [3.0] # 表示到达目的地
self.current_state = RobotState.PREPARING_UNLOADING
2025-08-19 22:11:14 +08:00
else:
2025-08-21 18:19:04 +08:00
self.get_logger().info("⚠️ 当前状态不接收尾部触摸指令")
state_msg.data_available = False
# 发布状态消息
self.state_pub.publish(state_msg)
self.play_audio_feedback("rear")
def play_audio_feedback(self, sensor_type):
"""播放音频反馈"""
# 实际实现需调用机器狗音频接口
if sensor_type == "head":
self.get_logger().info("播放头部操作确认音效")
2025-08-19 22:11:14 +08:00
else:
2025-08-21 18:19:04 +08:00
self.get_logger().info("播放尾部操作确认音效")
2025-08-19 22:11:14 +08:00
2025-08-21 18:19:04 +08:00
def publish_state_info(self):
"""定期发布状态信息"""
state_msg = SingleTofPayload()
state_msg.header.stamp = self.get_clock().now().to_msg()
state_msg.data_available = True
if self.current_state in self.state_messages:
state_str = self.state_messages[self.current_state]
self.get_logger().info(f"当前状态: {state_str}")
# 将状态消息编码到data字段
state_msg.data = [float(self.current_state.value)]
if self.current_state == RobotState.PREPARING_LOADING or \
self.current_state == RobotState.PREPARING_UNLOADING:
state_msg.tof_position = TofPosition.HEAD.value
else:
state_msg.tof_position = TofPosition.REAR.value
else:
state_msg.data_available = False
self.state_pub.publish(state_msg)
2025-08-19 22:11:14 +08:00
def main(args=None):
rclpy.init(args=args)
2025-08-21 18:19:04 +08:00
node = AdvancedTofInteractive()
2025-08-19 22:11:14 +08:00
try:
2025-08-21 18:19:04 +08:00
# 定期发布状态信息
timer = node.create_timer(3.0, node.publish_state_info)
2025-08-19 22:11:14 +08:00
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()