mi-task/tmp/interdemo3.py
2025-08-18 20:34:03 +08:00

255 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
# 导入ROS2消息类型
from protocol.msg import HeadTofPayload, MotionStatus, AudioPlayExtend
from protocol.srv import MotionResultCmd
from std_srvs.srv import SetBool
# 假设导入传感器API模块
# 注意实际使用时需要根据真实的API模块路径进行导入
from tof_sensor import TofSensor # 假设的TOF传感器API类
class DeliveryInteraction(Node):
def __init__(self):
super().__init__('delivery_interaction_node')
# 配置参数
self.obstacle_distance = 0.3 # 检测障碍物的距离阈值(米)
self.obstacle_percentage = 0.8 # 检测到障碍物的比例阈值(80%)
self.sensitivity_s = 1.0 # 检测灵敏度(秒)
# 状态变量
self.current_area = 0 # 0:无区域 1:配货区 2:卸货区
self.head_left_detected = False
self.head_right_detected = False
self.motion_id = 0
self.old_motion_id = 0
self.sensor_initialized = False
# 初始化TOF传感器
self.tof_sensor = TofSensor()
self.init_sensor()
# 创建回调组
tof_cb_group = MutuallyExclusiveCallbackGroup()
motion_cb_group = MutuallyExclusiveCallbackGroup()
service_cb_group = MutuallyExclusiveCallbackGroup()
# 订阅TOF数据
self.tof_sub = self.create_subscription(
HeadTofPayload,
'head_tof_payload',
self.tof_callback,
10,
callback_group=tof_cb_group
)
# 订阅运动状态
self.motion_sub = self.create_subscription(
MotionStatus,
'motion_status',
self.motion_callback,
10,
callback_group=motion_cb_group
)
# 创建区域设置服务
self.set_area_srv = self.create_service(
SetBool,
'set_delivery_area',
self.set_area_callback,
callback_group=service_cb_group
)
# 创建运动服务客户端
self.motion_client = self.create_client(
MotionResultCmd,
'motion_result_cmd'
)
# 创建语音发布者
self.speech_pub = self.create_publisher(
AudioPlayExtend,
'speech_play_extend',
10
)
# 创建定时器检查交互条件
self.timer = self.create_timer(
0.5, # 每0.5秒检查一次
self.check_interaction
)
self.get_logger().info("配货交互节点已启动,等待区域设置...")
def init_sensor(self):
"""初始化并打开TOF传感器"""
try:
# 初始化传感器simulator=False表示使用实际硬件
self.tof_sensor.Init(simulator=False)
self.get_logger().info("TOF传感器初始化成功")
# 打开传感器
if self.tof_sensor.Open():
self.sensor_initialized = True
self.get_logger().info("TOF传感器打开成功")
# 启动传感器
if self.tof_sensor.Start():
self.get_logger().info("TOF传感器启动成功")
else:
self.get_logger().error("TOF传感器启动失败")
else:
self.get_logger().error("TOF传感器打开失败")
except Exception as e:
self.get_logger().error(f"TOF传感器初始化过程出错: {str(e)}")
def tof_callback(self, msg):
"""处理TOF传感器数据"""
# 只有传感器初始化成功才处理数据
if not self.sensor_initialized:
return
# 检测左侧传感器
left_obstacle_count = sum(1 for d in msg.left_head.data if d < self.obstacle_distance)
left_percentage = left_obstacle_count / len(msg.left_head.data) if msg.left_head.data else 0
self.head_left_detected = left_percentage >= self.obstacle_percentage
# 检测右侧传感器
right_obstacle_count = sum(1 for d in msg.right_head.data if d < self.obstacle_distance)
right_percentage = right_obstacle_count / len(msg.right_head.data) if msg.right_head.data else 0
self.head_right_detected = right_percentage >= self.obstacle_percentage
def motion_callback(self, msg):
"""更新运动状态"""
if self.motion_id != msg.motion_id:
self.old_motion_id = self.motion_id
self.motion_id = msg.motion_id
def set_area_callback(self, request, response):
"""设置当前区域的服务回调"""
# 检查传感器是否已初始化
if not self.sensor_initialized:
response.success = False
response.message = "TOF传感器未初始化无法设置区域"
self.get_logger().warn(response.message)
return response
# request.data: True=配货区, False=卸货区
self.current_area = 1 if request.data else 2
area_name = "配货库位" if request.data else "卸货库位"
self.get_logger().info(f"已进入{area_name},等待交互指示...")
response.success = True
response.message = f"区域设置为{area_name}"
return response
def check_interaction(self):
"""检查交互条件并触发相应行为"""
# 传感器未初始化则不进行检查
if not self.sensor_initialized:
return
# 只在配货区或卸货区且坐下状态下检测交互
if self.current_area == 0 or self.motion_id != 214:
return
# 检测头部两侧同时触摸
if self.head_left_detected and self.head_right_detected:
if self.current_area == 1: # 配货区
self.get_logger().info("配货完成指示触发")
self.trigger_delivery_complete()
self.current_area = 0 # 重置区域
elif self.current_area == 2: # 卸货区
self.get_logger().info("卸货完成指示触发")
self.trigger_unloading_complete()
self.current_area = 0 # 重置区域
def trigger_delivery_complete(self):
"""配货完成处理"""
# 播放语音提示
self.play_audio(5000) # 假设5000是"配货完成"的语音ID
# 执行动作
self.execute_motion(212) # 相对姿势动作
# 等待动作完成
self.get_logger().info("配货已完成,开始运输...")
def trigger_unloading_complete(self):
"""卸货完成处理"""
# 播放语音提示
self.play_audio(5001) # 假设5001是"卸货完成"的语音ID
# 执行动作
self.execute_motion(150) # 摇尾巴动作
# 等待动作完成
self.get_logger().info("卸货已完成,准备新的配货...")
def play_audio(self, play_id):
"""播放语音"""
msg = AudioPlayExtend()
msg.module_name = "delivery_interaction"
msg.is_online = False
msg.speech.play_id = play_id
self.speech_pub.publish(msg)
def execute_motion(self, motion_id):
"""执行指定动作"""
if not self.motion_client.wait_for_service(timeout_sec=1.0):
self.get_logger().warn("运动服务不可用")
return
request = MotionResultCmd.Request()
request.motion_id = motion_id
request.duration = 1000 # 持续时间(毫秒)
future = self.motion_client.call_async(request)
rclpy.spin_until_future_complete(self, future, timeout_sec=2.0)
if future.done():
response = future.result()
if response.result:
self.get_logger().info(f"动作{motion_id}执行成功")
else:
self.get_logger().error(f"动作{motion_id}执行失败")
else:
self.get_logger().error("运动服务调用超时")
def destroy_node(self):
"""节点销毁时关闭传感器"""
if self.sensor_initialized:
self.tof_sensor.Stop()
self.tof_sensor.Close()
self.get_logger().info("TOF传感器已关闭")
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
# 创建节点
node = DeliveryInteraction()
# 使用多线程执行器
executor = MultiThreadedExecutor()
executor.add_node(node)
try:
executor.spin()
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()