mi-task/tmp/new.py
2025-08-21 11:25:38 +08:00

109 lines
4.3 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import rclpy
from rclpy.node import Node
from protocol.msg import HeadTofPayload, SingleTofPayload
import time
from enum import Enum
class OperationState(Enum):
READY_FOR_LOADING = 1 # 准备配货状态
READY_FOR_UNLOADING = 2 # 准备卸货状态
class TouchInteractionNode(Node):
def __init__(self):
super().__init__('touch_interaction_node')
# 订阅头部TOF数据
self.tof_subscription = self.create_subscription(
HeadTofPayload,
'/mi_desktop_48_b0_2d_7b_05_1d/head_tof_payload',
self.tof_callback,
10)
# 初始化状态(假设从外部获取状态,这里默认设为配货状态)
self.current_state = OperationState.READY_FOR_LOADING
# 触摸检测参数
self.touch_threshold = 0.15 # 触摸距离阈值(米)
self.touch_duration = 1.0 # 触摸持续时间(秒)
self.last_touch_time = 0
self.touch_start_time = 0
self.is_touching = False
# 创建定时器用于状态显示
self.timer = self.create_timer(0.5, self.status_timer_callback)
self.get_logger().info("触摸交互节点已启动,当前状态: {}".format(
"准备配货" if self.current_state == OperationState.READY_FOR_LOADING else "准备卸货"))
def tof_callback(self, msg):
"""处理TOF传感器数据"""
current_time = time.time()
# 检查所有TOF传感器数据
min_distance = float('inf')
for tof_data in msg.sensors:
if tof_data.distance > 0 and tof_data.distance < min_distance:
min_distance = tof_data.distance
# 检测触摸事件
if min_distance < self.touch_threshold:
if not self.is_touching:
# 开始触摸
self.is_touching = True
self.touch_start_time = current_time
else:
# 持续触摸
touch_duration = current_time - self.touch_start_time
if touch_duration >= self.touch_duration and current_time - self.last_touch_time > 2.0:
# 触摸持续时间足够,且距离上次触发有一定间隔
self.handle_touch_event()
self.last_touch_time = current_time
else:
self.is_touching = False
def handle_touch_event(self):
"""处理触摸事件"""
if self.current_state == OperationState.READY_FOR_LOADING:
self.get_logger().info("配货完成确认!")
# 这里可以添加配货完成后的操作,如发布消息、改变状态等
self.current_state = OperationState.READY_FOR_UNLOADING
elif self.current_state == OperationState.READY_FOR_UNLOADING:
self.get_logger().info("卸货完成确认!")
# 这里可以添加卸货完成后的操作
self.current_state = OperationState.READY_FOR_LOADING
# 提供反馈(可以是声音、灯光等,这里用日志代替)
self.get_logger().info("状态已更新: {}".format(
"准备卸货" if self.current_state == OperationState.READY_FOR_UNLOADING else "准备配货"))
def status_timer_callback(self):
"""定时显示状态信息"""
status_msg = "当前状态: {} | 等待触摸交互".format(
"准备配货" if self.current_state == OperationState.READY_FOR_LOADING else "准备卸货")
self.get_logger().info(status_msg, throttle_duration_sec=2.0)
def set_operation_state(self, state):
"""设置当前操作状态(从外部调用)"""
if state in [OperationState.READY_FOR_LOADING, OperationState.READY_FOR_UNLOADING]:
self.current_state = state
self.get_logger().info("操作状态已设置为: {}".format(
"准备配货" if self.current_state == OperationState.READY_FOR_LOADING else "准备卸货"))
def main(args=None):
rclpy.init(args=args)
touch_node = TouchInteractionNode()
try:
rclpy.spin(touch_node)
except KeyboardInterrupt:
touch_node.get_logger().info("节点被手动终止")
finally:
touch_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()