mi-task/tmp/interdemo4.py
2025-08-19 22:11:14 +08:00

215 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import rclpy
from rclpy.node import Node
from protocol.msg import HeadTofPayload, RearTofPayload, SingleTofPayload
from std_msgs.msg import String
from std_srvs.srv import Trigger
import time
import numpy as np
import csv
import os
from datetime import datetime
class TofInteractive(Node):
def __init__(self):
super().__init__('tof_interactive_node')
self.get_logger().info("TOF交互节点启动...")
# 当前状态:'loading' 配货中, 'unloading' 卸货中, 'transporting' 运输中
self.current_state = 'loading' # 初始状态设为配货状态
# 触摸检测参数
self.obstacle_distance = 0.3 # 检测障碍物的距离阈值 (米)
self.obstacle_percentage = 0.8 # 检测到障碍物的点比例阈值
self.sensitivity_s = 1.0 # 检测灵敏度 (秒)
self.last_detection_time = time.time() - self.sensitivity_s
# 存储TOF数据
self.head_tof_data = None
self.rear_tof_data = None
# 订阅头部和尾部TOF数据
self.head_sub = self.create_subscription(
HeadTofPayload,
'head_tof_payload',
self.head_callback,
10)
self.rear_sub = self.create_subscription(
RearTofPayload,
'rear_tof_payload',
self.rear_callback,
10)
# 发布状态消息
self.status_pub = self.create_publisher(String, 'interactive_status', 10)
# 创建完成配货/卸货的服务
self.complete_loading_srv = self.create_service(
Trigger,
'complete_loading',
self.complete_loading_callback)
self.complete_unloading_srv = self.create_service(
Trigger,
'complete_unloading',
self.complete_unloading_callback)
# 定时器用于状态检测
self.timer = self.create_timer(0.1, self.detection_loop)
self.get_logger().info(f"初始状态: {self.current_state}")
self.publish_status()
def head_callback(self, msg):
"""头部TOF数据回调"""
self.get_logger().info("head")
self.head_tof_data = msg
# self.get_logger().info(f"头部TOF数据接收: L={len(msg.left_head.data)}, R={len(msg.right_head.data)}")
def rear_callback(self, msg):
"""尾部TOF数据回调"""
self.get_logger().info("rear")
self.rear_tof_data = msg
# self.get_logger().info(f"尾部TOF数据接收: L={len(msg.left_rear.data)}, R={len(msg.right_rear.data)}")
def detect_obstacle(self, single_tof):
"""检测单个TOF传感器是否有障碍物"""
self.get_logger().info("obstacle")
if not single_tof.data_available or len(single_tof.data) == 0:
return False
obstacle_point = 0
for ranging in single_tof.data:
if 0.15 <= ranging <= 0.66 and ranging < self.obstacle_distance:
obstacle_point += 1
obstacle_percentage = obstacle_point / len(single_tof.data)
return obstacle_percentage >= self.obstacle_percentage
def detection_loop(self):
"""检测循环 - 检查触摸事件"""
self.get_logger().info("loop")
current_time = time.time()
if current_time - self.last_detection_time < self.sensitivity_s:
return
self.get_logger().info("lllll")
if self.head_tof_data is None:
return
self.get_logger().info("ppppp")
# 检测左右头部TOF
left_detected = self.detect_obstacle(self.head_tof_data.left_head)
right_detected = self.detect_obstacle(self.head_tof_data.right_head)
# 检测到触摸事件
if left_detected or right_detected:
self.last_detection_time = current_time
self.get_logger().info("检测到触摸下巴事件!")
# 记录TOF数据
self.record_tof_data()
# 根据当前状态处理
if self.current_state == 'loading':
self.get_logger().info("配货完成!")
self.current_state = 'transporting'
self.play_sound("配货完成,开始运输")
elif self.current_state == 'unloading':
self.get_logger().info("卸货完成!")
self.current_state = 'loading' # 回到配货状态
self.play_sound("卸货完成,准备下一次配货")
self.publish_status()
def record_tof_data(self):
"""记录当前的TOF数据"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"tof_data_{timestamp}.csv"
with open(filename, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(['sensor_position', 'index', 'distance', 'intensity'])
# 记录头部TOF数据
if self.head_tof_data:
self.write_single_tof(writer, 'left_head', self.head_tof_data.left_head)
self.write_single_tof(writer, 'right_head', self.head_tof_data.right_head)
# 记录尾部TOF数据
if self.rear_tof_data:
self.write_single_tof(writer, 'left_rear', self.rear_tof_data.left_rear)
self.write_single_tof(writer, 'right_rear', self.rear_tof_data.right_rear)
self.get_logger().info(f"TOF数据已保存到文件: {filename}")
def write_single_tof(self, writer, position_name, single_tof):
"""将单个TOF传感器的数据写入CSV"""
if not single_tof.data_available:
return
# 如果intensity数据不可用创建一个相同长度的零列表
if len(single_tof.intensity) != len(single_tof.data):
intensity_list = [0.0] * len(single_tof.data)
else:
intensity_list = single_tof.intensity
for i, (distance, intensity) in enumerate(zip(single_tof.data, intensity_list)):
writer.writerow([position_name, i, distance, intensity])
def play_sound(self, text):
"""播放提示音 (模拟)"""
self.get_logger().info(f"播放声音: {text}")
# 实际实现中应调用机器狗的音频播放接口
# 例如: self.audio_pub.publish(text)
def publish_status(self):
"""发布当前状态"""
msg = String()
msg.data = self.current_state
self.status_pub.publish(msg)
self.get_logger().info(f"状态更新: {self.current_state}")
def complete_loading_callback(self, request, response):
"""完成配货服务回调"""
if self.current_state == 'loading':
self.get_logger().info("通过服务调用完成配货")
self.current_state = 'transporting'
self.publish_status()
response.success = True
response.message = "配货完成,开始运输"
else:
response.success = False
response.message = f"当前状态 {self.current_state} 不允许配货"
return response
def complete_unloading_callback(self, request, response):
"""完成卸货服务回调"""
if self.current_state == 'unloading':
self.get_logger().info("通过服务调用完成卸货")
self.current_state = 'loading'
self.publish_status()
response.success = True
response.message = "卸货完成,准备下一次配货"
else:
response.success = False
response.message = f"当前状态 {self.current_state} 不允许卸货"
return response
def main(args=None):
rclpy.init(args=args)
node = TofInteractive()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()