#!/usr/bin/env python3 # -*- coding: utf-8 -*- import rclpy from rclpy.node import Node from rclpy.executors import MultiThreadedExecutor from rclpy.callback_groups import MutuallyExclusiveCallbackGroup # 导入ROS2消息类型 from protocol.msg import HeadTofPayload, MotionStatus, AudioPlayExtend from protocol.srv import MotionResultCmd from std_srvs.srv import SetBool # 假设导入传感器API模块 # 注意:实际使用时需要根据真实的API模块路径进行导入 from tof_sensor import TofSensor # 假设的TOF传感器API类 class DeliveryInteraction(Node): def __init__(self): super().__init__('delivery_interaction_node') # 配置参数 self.obstacle_distance = 0.3 # 检测障碍物的距离阈值(米) self.obstacle_percentage = 0.8 # 检测到障碍物的比例阈值(80%) self.sensitivity_s = 1.0 # 检测灵敏度(秒) # 状态变量 self.current_area = 0 # 0:无区域 1:配货区 2:卸货区 self.head_left_detected = False self.head_right_detected = False self.motion_id = 0 self.old_motion_id = 0 self.sensor_initialized = False # 初始化TOF传感器 self.tof_sensor = TofSensor() self.init_sensor() # 创建回调组 tof_cb_group = MutuallyExclusiveCallbackGroup() motion_cb_group = MutuallyExclusiveCallbackGroup() service_cb_group = MutuallyExclusiveCallbackGroup() # 订阅TOF数据 self.tof_sub = self.create_subscription( HeadTofPayload, 'head_tof_payload', self.tof_callback, 10, callback_group=tof_cb_group ) # 订阅运动状态 self.motion_sub = self.create_subscription( MotionStatus, 'motion_status', self.motion_callback, 10, callback_group=motion_cb_group ) # 创建区域设置服务 self.set_area_srv = self.create_service( SetBool, 'set_delivery_area', self.set_area_callback, callback_group=service_cb_group ) # 创建运动服务客户端 self.motion_client = self.create_client( MotionResultCmd, 'motion_result_cmd' ) # 创建语音发布者 self.speech_pub = self.create_publisher( AudioPlayExtend, 'speech_play_extend', 10 ) # 创建定时器检查交互条件 self.timer = self.create_timer( 0.5, # 每0.5秒检查一次 self.check_interaction ) self.get_logger().info("配货交互节点已启动,等待区域设置...") def init_sensor(self): """初始化并打开TOF传感器""" try: # 初始化传感器,simulator=False表示使用实际硬件 self.tof_sensor.Init(simulator=False) self.get_logger().info("TOF传感器初始化成功") # 打开传感器 if self.tof_sensor.Open(): self.sensor_initialized = True self.get_logger().info("TOF传感器打开成功") # 启动传感器 if self.tof_sensor.Start(): self.get_logger().info("TOF传感器启动成功") else: self.get_logger().error("TOF传感器启动失败") else: self.get_logger().error("TOF传感器打开失败") except Exception as e: self.get_logger().error(f"TOF传感器初始化过程出错: {str(e)}") def tof_callback(self, msg): """处理TOF传感器数据""" # 只有传感器初始化成功才处理数据 if not self.sensor_initialized: return # 检测左侧传感器 left_obstacle_count = sum(1 for d in msg.left_head.data if d < self.obstacle_distance) left_percentage = left_obstacle_count / len(msg.left_head.data) if msg.left_head.data else 0 self.head_left_detected = left_percentage >= self.obstacle_percentage # 检测右侧传感器 right_obstacle_count = sum(1 for d in msg.right_head.data if d < self.obstacle_distance) right_percentage = right_obstacle_count / len(msg.right_head.data) if msg.right_head.data else 0 self.head_right_detected = right_percentage >= self.obstacle_percentage def motion_callback(self, msg): """更新运动状态""" if self.motion_id != msg.motion_id: self.old_motion_id = self.motion_id self.motion_id = msg.motion_id def set_area_callback(self, request, response): """设置当前区域的服务回调""" # 检查传感器是否已初始化 if not self.sensor_initialized: response.success = False response.message = "TOF传感器未初始化,无法设置区域" self.get_logger().warn(response.message) return response # request.data: True=配货区, False=卸货区 self.current_area = 1 if request.data else 2 area_name = "配货库位" if request.data else "卸货库位" self.get_logger().info(f"已进入{area_name},等待交互指示...") response.success = True response.message = f"区域设置为{area_name}" return response def check_interaction(self): """检查交互条件并触发相应行为""" # 传感器未初始化则不进行检查 if not self.sensor_initialized: return # 只在配货区或卸货区且坐下状态下检测交互 if self.current_area == 0 or self.motion_id != 214: return # 检测头部两侧同时触摸 if self.head_left_detected and self.head_right_detected: if self.current_area == 1: # 配货区 self.get_logger().info("配货完成指示触发") self.trigger_delivery_complete() self.current_area = 0 # 重置区域 elif self.current_area == 2: # 卸货区 self.get_logger().info("卸货完成指示触发") self.trigger_unloading_complete() self.current_area = 0 # 重置区域 def trigger_delivery_complete(self): """配货完成处理""" # 播放语音提示 self.play_audio(5000) # 假设5000是"配货完成"的语音ID # 执行动作 self.execute_motion(212) # 相对姿势动作 # 等待动作完成 self.get_logger().info("配货已完成,开始运输...") def trigger_unloading_complete(self): """卸货完成处理""" # 播放语音提示 self.play_audio(5001) # 假设5001是"卸货完成"的语音ID # 执行动作 self.execute_motion(150) # 摇尾巴动作 # 等待动作完成 self.get_logger().info("卸货已完成,准备新的配货...") def play_audio(self, play_id): """播放语音""" msg = AudioPlayExtend() msg.module_name = "delivery_interaction" msg.is_online = False msg.speech.play_id = play_id self.speech_pub.publish(msg) def execute_motion(self, motion_id): """执行指定动作""" if not self.motion_client.wait_for_service(timeout_sec=1.0): self.get_logger().warn("运动服务不可用") return request = MotionResultCmd.Request() request.motion_id = motion_id request.duration = 1000 # 持续时间(毫秒) future = self.motion_client.call_async(request) rclpy.spin_until_future_complete(self, future, timeout_sec=2.0) if future.done(): response = future.result() if response.result: self.get_logger().info(f"动作{motion_id}执行成功") else: self.get_logger().error(f"动作{motion_id}执行失败") else: self.get_logger().error("运动服务调用超时") def destroy_node(self): """节点销毁时关闭传感器""" if self.sensor_initialized: self.tof_sensor.Stop() self.tof_sensor.Close() self.get_logger().info("TOF传感器已关闭") super().destroy_node() def main(args=None): rclpy.init(args=args) # 创建节点 node = DeliveryInteraction() # 使用多线程执行器 executor = MultiThreadedExecutor() executor.add_node(node) try: executor.spin() except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()