#!/usr/bin/env python3 # -*- coding: utf-8 -*- import rclpy from rclpy.node import Node from protocol.msg import HeadTofPayload, SingleTofPayload import time from enum import Enum class OperationState(Enum): READY_FOR_LOADING = 1 # 准备配货状态 READY_FOR_UNLOADING = 2 # 准备卸货状态 class TouchInteractionNode(Node): def __init__(self): super().__init__('touch_interaction_node') # 订阅头部TOF数据 self.tof_subscription = self.create_subscription( HeadTofPayload, '/mi_desktop_48_b0_2d_7b_05_1d/head_tof_payload', self.tof_callback, 10) # 初始化状态(假设从外部获取状态,这里默认设为配货状态) self.current_state = OperationState.READY_FOR_LOADING # 触摸检测参数 self.touch_threshold = 0.15 # 触摸距离阈值(米) self.touch_duration = 1.0 # 触摸持续时间(秒) self.last_touch_time = 0 self.touch_start_time = 0 self.is_touching = False # 创建定时器用于状态显示 self.timer = self.create_timer(0.5, self.status_timer_callback) self.get_logger().info("触摸交互节点已启动,当前状态: {}".format( "准备配货" if self.current_state == OperationState.READY_FOR_LOADING else "准备卸货")) def tof_callback(self, msg): """处理TOF传感器数据""" current_time = time.time() # 检查所有TOF传感器数据 min_distance = float('inf') for tof_data in msg.sensors: if tof_data.distance > 0 and tof_data.distance < min_distance: min_distance = tof_data.distance # 检测触摸事件 if min_distance < self.touch_threshold: if not self.is_touching: # 开始触摸 self.is_touching = True self.touch_start_time = current_time else: # 持续触摸 touch_duration = current_time - self.touch_start_time if touch_duration >= self.touch_duration and current_time - self.last_touch_time > 2.0: # 触摸持续时间足够,且距离上次触发有一定间隔 self.handle_touch_event() self.last_touch_time = current_time else: self.is_touching = False def handle_touch_event(self): """处理触摸事件""" if self.current_state == OperationState.READY_FOR_LOADING: self.get_logger().info("配货完成确认!") # 这里可以添加配货完成后的操作,如发布消息、改变状态等 self.current_state = OperationState.READY_FOR_UNLOADING elif self.current_state == OperationState.READY_FOR_UNLOADING: self.get_logger().info("卸货完成确认!") # 这里可以添加卸货完成后的操作 self.current_state = OperationState.READY_FOR_LOADING # 提供反馈(可以是声音、灯光等,这里用日志代替) self.get_logger().info("状态已更新: {}".format( "准备卸货" if self.current_state == OperationState.READY_FOR_UNLOADING else "准备配货")) def status_timer_callback(self): """定时显示状态信息""" status_msg = "当前状态: {} | 等待触摸交互".format( "准备配货" if self.current_state == OperationState.READY_FOR_LOADING else "准备卸货") self.get_logger().info(status_msg, throttle_duration_sec=2.0) def set_operation_state(self, state): """设置当前操作状态(从外部调用)""" if state in [OperationState.READY_FOR_LOADING, OperationState.READY_FOR_UNLOADING]: self.current_state = state self.get_logger().info("操作状态已设置为: {}".format( "准备配货" if self.current_state == OperationState.READY_FOR_LOADING else "准备卸货")) def main(args=None): rclpy.init(args=args) touch_node = TouchInteractionNode() try: rclpy.spin(touch_node) except KeyboardInterrupt: touch_node.get_logger().info("节点被手动终止") finally: touch_node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()