mi-task/tmp/interdemo4.py

215 lines
8.0 KiB
Python
Raw Normal View History

2025-08-19 22:11:14 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import rclpy
from rclpy.node import Node
from protocol.msg import HeadTofPayload, RearTofPayload, SingleTofPayload
from std_msgs.msg import String
from std_srvs.srv import Trigger
import time
import numpy as np
import csv
import os
from datetime import datetime
class TofInteractive(Node):
def __init__(self):
super().__init__('tof_interactive_node')
self.get_logger().info("TOF交互节点启动...")
# 当前状态:'loading' 配货中, 'unloading' 卸货中, 'transporting' 运输中
self.current_state = 'loading' # 初始状态设为配货状态
# 触摸检测参数
self.obstacle_distance = 0.3 # 检测障碍物的距离阈值 (米)
self.obstacle_percentage = 0.8 # 检测到障碍物的点比例阈值
self.sensitivity_s = 1.0 # 检测灵敏度 (秒)
self.last_detection_time = time.time() - self.sensitivity_s
# 存储TOF数据
self.head_tof_data = None
self.rear_tof_data = None
# 订阅头部和尾部TOF数据
self.head_sub = self.create_subscription(
HeadTofPayload,
'head_tof_payload',
self.head_callback,
10)
self.rear_sub = self.create_subscription(
RearTofPayload,
'rear_tof_payload',
self.rear_callback,
10)
# 发布状态消息
self.status_pub = self.create_publisher(String, 'interactive_status', 10)
# 创建完成配货/卸货的服务
self.complete_loading_srv = self.create_service(
Trigger,
'complete_loading',
self.complete_loading_callback)
self.complete_unloading_srv = self.create_service(
Trigger,
'complete_unloading',
self.complete_unloading_callback)
# 定时器用于状态检测
self.timer = self.create_timer(0.1, self.detection_loop)
self.get_logger().info(f"初始状态: {self.current_state}")
self.publish_status()
def head_callback(self, msg):
"""头部TOF数据回调"""
self.get_logger().info("head")
self.head_tof_data = msg
# self.get_logger().info(f"头部TOF数据接收: L={len(msg.left_head.data)}, R={len(msg.right_head.data)}")
def rear_callback(self, msg):
"""尾部TOF数据回调"""
self.get_logger().info("rear")
self.rear_tof_data = msg
# self.get_logger().info(f"尾部TOF数据接收: L={len(msg.left_rear.data)}, R={len(msg.right_rear.data)}")
def detect_obstacle(self, single_tof):
"""检测单个TOF传感器是否有障碍物"""
self.get_logger().info("obstacle")
if not single_tof.data_available or len(single_tof.data) == 0:
return False
obstacle_point = 0
for ranging in single_tof.data:
if 0.15 <= ranging <= 0.66 and ranging < self.obstacle_distance:
obstacle_point += 1
obstacle_percentage = obstacle_point / len(single_tof.data)
return obstacle_percentage >= self.obstacle_percentage
def detection_loop(self):
"""检测循环 - 检查触摸事件"""
self.get_logger().info("loop")
current_time = time.time()
if current_time - self.last_detection_time < self.sensitivity_s:
return
self.get_logger().info("lllll")
if self.head_tof_data is None:
return
self.get_logger().info("ppppp")
# 检测左右头部TOF
left_detected = self.detect_obstacle(self.head_tof_data.left_head)
right_detected = self.detect_obstacle(self.head_tof_data.right_head)
# 检测到触摸事件
if left_detected or right_detected:
self.last_detection_time = current_time
self.get_logger().info("检测到触摸下巴事件!")
# 记录TOF数据
self.record_tof_data()
# 根据当前状态处理
if self.current_state == 'loading':
self.get_logger().info("配货完成!")
self.current_state = 'transporting'
self.play_sound("配货完成,开始运输")
elif self.current_state == 'unloading':
self.get_logger().info("卸货完成!")
self.current_state = 'loading' # 回到配货状态
self.play_sound("卸货完成,准备下一次配货")
self.publish_status()
def record_tof_data(self):
"""记录当前的TOF数据"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"tof_data_{timestamp}.csv"
with open(filename, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(['sensor_position', 'index', 'distance', 'intensity'])
# 记录头部TOF数据
if self.head_tof_data:
self.write_single_tof(writer, 'left_head', self.head_tof_data.left_head)
self.write_single_tof(writer, 'right_head', self.head_tof_data.right_head)
# 记录尾部TOF数据
if self.rear_tof_data:
self.write_single_tof(writer, 'left_rear', self.rear_tof_data.left_rear)
self.write_single_tof(writer, 'right_rear', self.rear_tof_data.right_rear)
self.get_logger().info(f"TOF数据已保存到文件: {filename}")
def write_single_tof(self, writer, position_name, single_tof):
"""将单个TOF传感器的数据写入CSV"""
if not single_tof.data_available:
return
# 如果intensity数据不可用创建一个相同长度的零列表
if len(single_tof.intensity) != len(single_tof.data):
intensity_list = [0.0] * len(single_tof.data)
else:
intensity_list = single_tof.intensity
for i, (distance, intensity) in enumerate(zip(single_tof.data, intensity_list)):
writer.writerow([position_name, i, distance, intensity])
def play_sound(self, text):
"""播放提示音 (模拟)"""
self.get_logger().info(f"播放声音: {text}")
# 实际实现中应调用机器狗的音频播放接口
# 例如: self.audio_pub.publish(text)
def publish_status(self):
"""发布当前状态"""
msg = String()
msg.data = self.current_state
self.status_pub.publish(msg)
self.get_logger().info(f"状态更新: {self.current_state}")
def complete_loading_callback(self, request, response):
"""完成配货服务回调"""
if self.current_state == 'loading':
self.get_logger().info("通过服务调用完成配货")
self.current_state = 'transporting'
self.publish_status()
response.success = True
response.message = "配货完成,开始运输"
else:
response.success = False
response.message = f"当前状态 {self.current_state} 不允许配货"
return response
def complete_unloading_callback(self, request, response):
"""完成卸货服务回调"""
if self.current_state == 'unloading':
self.get_logger().info("通过服务调用完成卸货")
self.current_state = 'loading'
self.publish_status()
response.success = True
response.message = "卸货完成,准备下一次配货"
else:
response.success = False
response.message = f"当前状态 {self.current_state} 不允许卸货"
return response
def main(args=None):
rclpy.init(args=args)
node = TofInteractive()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()