mi-task/tmp/interdemo3.py

255 lines
8.8 KiB
Python
Raw Normal View History

2025-08-18 20:34:03 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
# 导入ROS2消息类型
from protocol.msg import HeadTofPayload, MotionStatus, AudioPlayExtend
from protocol.srv import MotionResultCmd
from std_srvs.srv import SetBool
# 假设导入传感器API模块
# 注意实际使用时需要根据真实的API模块路径进行导入
from tof_sensor import TofSensor # 假设的TOF传感器API类
class DeliveryInteraction(Node):
def __init__(self):
super().__init__('delivery_interaction_node')
# 配置参数
self.obstacle_distance = 0.3 # 检测障碍物的距离阈值(米)
self.obstacle_percentage = 0.8 # 检测到障碍物的比例阈值(80%)
self.sensitivity_s = 1.0 # 检测灵敏度(秒)
# 状态变量
self.current_area = 0 # 0:无区域 1:配货区 2:卸货区
self.head_left_detected = False
self.head_right_detected = False
self.motion_id = 0
self.old_motion_id = 0
self.sensor_initialized = False
# 初始化TOF传感器
self.tof_sensor = TofSensor()
self.init_sensor()
# 创建回调组
tof_cb_group = MutuallyExclusiveCallbackGroup()
motion_cb_group = MutuallyExclusiveCallbackGroup()
service_cb_group = MutuallyExclusiveCallbackGroup()
# 订阅TOF数据
self.tof_sub = self.create_subscription(
HeadTofPayload,
'head_tof_payload',
self.tof_callback,
10,
callback_group=tof_cb_group
)
# 订阅运动状态
self.motion_sub = self.create_subscription(
MotionStatus,
'motion_status',
self.motion_callback,
10,
callback_group=motion_cb_group
)
# 创建区域设置服务
self.set_area_srv = self.create_service(
SetBool,
'set_delivery_area',
self.set_area_callback,
callback_group=service_cb_group
)
# 创建运动服务客户端
self.motion_client = self.create_client(
MotionResultCmd,
'motion_result_cmd'
)
# 创建语音发布者
self.speech_pub = self.create_publisher(
AudioPlayExtend,
'speech_play_extend',
10
)
# 创建定时器检查交互条件
self.timer = self.create_timer(
0.5, # 每0.5秒检查一次
self.check_interaction
)
self.get_logger().info("配货交互节点已启动,等待区域设置...")
def init_sensor(self):
"""初始化并打开TOF传感器"""
try:
# 初始化传感器simulator=False表示使用实际硬件
self.tof_sensor.Init(simulator=False)
self.get_logger().info("TOF传感器初始化成功")
# 打开传感器
if self.tof_sensor.Open():
self.sensor_initialized = True
self.get_logger().info("TOF传感器打开成功")
# 启动传感器
if self.tof_sensor.Start():
self.get_logger().info("TOF传感器启动成功")
else:
self.get_logger().error("TOF传感器启动失败")
else:
self.get_logger().error("TOF传感器打开失败")
except Exception as e:
self.get_logger().error(f"TOF传感器初始化过程出错: {str(e)}")
def tof_callback(self, msg):
"""处理TOF传感器数据"""
# 只有传感器初始化成功才处理数据
if not self.sensor_initialized:
return
# 检测左侧传感器
left_obstacle_count = sum(1 for d in msg.left_head.data if d < self.obstacle_distance)
left_percentage = left_obstacle_count / len(msg.left_head.data) if msg.left_head.data else 0
self.head_left_detected = left_percentage >= self.obstacle_percentage
# 检测右侧传感器
right_obstacle_count = sum(1 for d in msg.right_head.data if d < self.obstacle_distance)
right_percentage = right_obstacle_count / len(msg.right_head.data) if msg.right_head.data else 0
self.head_right_detected = right_percentage >= self.obstacle_percentage
def motion_callback(self, msg):
"""更新运动状态"""
if self.motion_id != msg.motion_id:
self.old_motion_id = self.motion_id
self.motion_id = msg.motion_id
def set_area_callback(self, request, response):
"""设置当前区域的服务回调"""
# 检查传感器是否已初始化
if not self.sensor_initialized:
response.success = False
response.message = "TOF传感器未初始化无法设置区域"
self.get_logger().warn(response.message)
return response
# request.data: True=配货区, False=卸货区
self.current_area = 1 if request.data else 2
area_name = "配货库位" if request.data else "卸货库位"
self.get_logger().info(f"已进入{area_name},等待交互指示...")
response.success = True
response.message = f"区域设置为{area_name}"
return response
def check_interaction(self):
"""检查交互条件并触发相应行为"""
# 传感器未初始化则不进行检查
if not self.sensor_initialized:
return
# 只在配货区或卸货区且坐下状态下检测交互
if self.current_area == 0 or self.motion_id != 214:
return
# 检测头部两侧同时触摸
if self.head_left_detected and self.head_right_detected:
if self.current_area == 1: # 配货区
self.get_logger().info("配货完成指示触发")
self.trigger_delivery_complete()
self.current_area = 0 # 重置区域
elif self.current_area == 2: # 卸货区
self.get_logger().info("卸货完成指示触发")
self.trigger_unloading_complete()
self.current_area = 0 # 重置区域
def trigger_delivery_complete(self):
"""配货完成处理"""
# 播放语音提示
self.play_audio(5000) # 假设5000是"配货完成"的语音ID
# 执行动作
self.execute_motion(212) # 相对姿势动作
# 等待动作完成
self.get_logger().info("配货已完成,开始运输...")
def trigger_unloading_complete(self):
"""卸货完成处理"""
# 播放语音提示
self.play_audio(5001) # 假设5001是"卸货完成"的语音ID
# 执行动作
self.execute_motion(150) # 摇尾巴动作
# 等待动作完成
self.get_logger().info("卸货已完成,准备新的配货...")
def play_audio(self, play_id):
"""播放语音"""
msg = AudioPlayExtend()
msg.module_name = "delivery_interaction"
msg.is_online = False
msg.speech.play_id = play_id
self.speech_pub.publish(msg)
def execute_motion(self, motion_id):
"""执行指定动作"""
if not self.motion_client.wait_for_service(timeout_sec=1.0):
self.get_logger().warn("运动服务不可用")
return
request = MotionResultCmd.Request()
request.motion_id = motion_id
request.duration = 1000 # 持续时间(毫秒)
future = self.motion_client.call_async(request)
rclpy.spin_until_future_complete(self, future, timeout_sec=2.0)
if future.done():
response = future.result()
if response.result:
self.get_logger().info(f"动作{motion_id}执行成功")
else:
self.get_logger().error(f"动作{motion_id}执行失败")
else:
self.get_logger().error("运动服务调用超时")
def destroy_node(self):
"""节点销毁时关闭传感器"""
if self.sensor_initialized:
self.tof_sensor.Stop()
self.tof_sensor.Close()
self.get_logger().info("TOF传感器已关闭")
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
# 创建节点
node = DeliveryInteraction()
# 使用多线程执行器
executor = MultiThreadedExecutor()
executor.add_node(node)
try:
executor.spin()
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()