mi-task/utils/shoushi_demo.py
2025-08-21 11:25:38 +08:00

284 lines
10 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import rclpy
from rclpy.node import Node
from protocol.srv import GestureActionControl
from protocol.msg import GestureActionResult
from std_srvs.srv import Trigger
import time
import sys
class GestureControlNode(Node):
def __init__(self):
super().__init__('gesture_control_node')
# 当前状态变量
self.current_state = "waiting_for_loading" # 可能的状态: waiting_for_loading, loading, transporting, waiting_for_unloading, unloading
# 创建手势控制服务客户端
self.gesture_control_cli = self.create_client(GestureActionControl, '/mi_desktop_48_b0_2d_7b_05_1d/gesture_action_control')
# 创建手势识别结果订阅
self.gesture_sub = self.create_subscription(
GestureActionResult,
'/mi_desktop_48_b0_2d_7b_05_1d/gesture_action_msg',
self.gesture_callback,
10)
# 假设存在运输开始和完成的服务
self.start_transport_cli = self.create_client(Trigger, '/mi_desktop_48_b0_2d_7b_05_1d/start_transport')
self.complete_unloading_cli = self.create_client(Trigger, '/mi_desktop_48_b0_2d_7b_05_1d/complete_unloading')
# 定时器用于状态检查
self.timer = self.create_timer(1.0, self.timer_callback)
self.get_logger().info("手势控制节点已启动")
# 手势识别超时时间(秒)
self.gesture_timeout = 60
# 最后一次检测到手势的时间
self.last_gesture_time = 0
# 手势识别是否激活的标志
self.is_gesture_active = False
def activate_gesture_recognition(self):
"""激活手势识别功能"""
while not self.gesture_control_cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('手势控制服务不可用,等待中...')
req = GestureActionControl.Request()
req.command = GestureActionControl.Request.START_ALGO
req.timeout = self.gesture_timeout
future = self.gesture_control_cli.call_async(req)
future.add_done_callback(self.gesture_control_callback)
self.get_logger().info("已激活手势识别功能")
self.is_gesture_active = True
def deactivate_gesture_recognition(self):
"""关闭手势识别功能"""
while not self.gesture_control_cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('手势控制服务不可用,等待中...')
req = GestureActionControl.Request()
req.command = GestureActionControl.Request.STOP_ALGO
future = self.gesture_control_cli.call_async(req)
future.add_done_callback(self.gesture_control_callback)
self.get_logger().info("已关闭手势识别功能")
self.is_gesture_active = False
def gesture_control_callback(self, future):
"""手势控制服务调用回调"""
try:
response = future.result()
if response.code == GestureActionControl.Response.RESULT_SUCCESS:
self.get_logger().info("手势控制服务调用成功")
else:
self.get_logger().warn("手势控制服务繁忙")
except Exception as e:
self.get_logger().error(f"手势控制服务调用失败: {e}")
def gesture_callback(self, msg):
"""手势识别结果回调"""
if not self.is_gesture_active:
return
self.last_gesture_time = time.time()
# 手势映射
gesture_names = {
0: "无手势",
1: "手掌拉近",
2: "手掌推开",
3: "手向上抬",
4: "手向下压",
5: "手向左推",
6: "手向右推",
7: "停止手势",
8: "大拇指朝上",
9: "张开手掌或手指",
10: "闭合手掌或手指",
11: "大拇指朝下"
}
gesture_name = gesture_names.get(msg.id, "未知手势")
self.get_logger().info(f"检测到手势: {gesture_name} (ID: {msg.id})")
# 根据当前状态和手势执行相应操作
if self.current_state == "loading" and msg.id == 8: # 大拇指朝上表示完成配货
self.complete_loading()
elif self.current_state == "unloading" and msg.id == 8: # 大拇指朝上表示完成卸货
self.complete_unloading()
elif msg.id == 7: # 停止手势
self.get_logger().info("检测到停止手势")
def complete_loading(self):
"""完成配货操作"""
self.get_logger().info("配货完成,开始运输")
# 调用开始运输服务
if self.start_transport_cli.wait_for_service(timeout_sec=1.0):
req = Trigger.Request()
future = self.start_transport_cli.call_async(req)
future.add_done_callback(self.transport_start_callback)
else:
self.get_logger().warn("开始运输服务不可用")
# 更新状态
self.current_state = "transporting"
def transport_start_callback(self, future):
"""运输开始服务回调"""
try:
response = future.result()
if response.success:
self.get_logger().info("运输已开始")
else:
self.get_logger().warn("运输启动失败")
except Exception as e:
self.get_logger().error(f"运输服务调用失败: {e}")
def complete_unloading(self):
"""完成卸货操作"""
self.get_logger().info("卸货完成,准备新的配货")
# 调用完成卸货服务
if self.complete_unloading_cli.wait_for_service(timeout_sec=1.0):
req = Trigger.Request()
future = self.complete_unloading_cli.call_async(req)
future.add_done_callback(self.unloading_complete_callback)
else:
self.get_logger().warn("完成卸货服务不可用")
# 更新状态
self.current_state = "waiting_for_loading"
def unloading_complete_callback(self, future):
"""卸货完成服务回调"""
try:
response = future.result()
if response.success:
self.get_logger().info("卸货已完成确认")
else:
self.get_logger().warn("卸货完成确认失败")
except Exception as e:
self.get_logger().error(f"卸货完成服务调用失败: {e}")
def timer_callback(self):
"""定时器回调,用于状态检查和超时处理"""
# 检查手势识别是否激活且超时
if self.is_gesture_active and time.time() - self.last_gesture_time > self.gesture_timeout:
self.get_logger().info("手势识别超时,重新激活")
self.activate_gesture_recognition()
self.last_gesture_time = time.time()
# 这里可以添加状态机逻辑根据实际需求更新current_state
# 例如当机器狗到达配货区域时设置self.current_state = "loading"
# 当机器狗到达卸货区域时设置self.current_state = "unloading"
def update_state(self, new_state):
"""更新机器狗状态"""
old_state = self.current_state
self.current_state = new_state
self.get_logger().info(f"状态更新: {old_state} -> {new_state}")
# 如果进入需要手势交互的状态,确保手势识别已激活
if new_state in ["loading", "unloading"]:
self.activate_gesture_recognition()
def start_gesture_recognition(timeout=60):
"""启动手势识别功能(供外部调用)"""
rclpy.init()
node = GestureControlNode()
try:
# 设置超时时间
node.gesture_timeout = timeout
# 激活手势识别
node.activate_gesture_recognition()
# 保持节点运行
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
def stop_gesture_recognition():
"""关闭手势识别功能(供外部调用)"""
rclpy.init()
node = GestureControlNode()
try:
# 关闭手势识别
node.deactivate_gesture_recognition()
# 短暂延迟确保完成
time.sleep(2)
finally:
node.destroy_node()
rclpy.shutdown()
def main(args=None):
# 检查命令行参数
if len(sys.argv) > 1:
command = sys.argv[1]
if command == "start":
# 启动手势识别
timeout = 60
if len(sys.argv) > 2:
try:
timeout = int(sys.argv[2])
except ValueError:
print(f"无效的超时时间: {sys.argv[2]}使用默认值60秒")
print(f"启动手势识别,超时时间: {timeout}")
start_gesture_recognition(timeout)
return
elif command == "stop":
# 停止手势识别
print("停止手势识别")
stop_gesture_recognition()
return
elif command == "test":
# 测试模式:启动手势识别,运行一段时间后停止
print("测试模式启动手势识别5秒后停止")
start_gesture_recognition(5)
time.sleep(5)
stop_gesture_recognition()
return
else:
print(f"未知命令: {command}")
print("可用命令: start [timeout], stop, test")
return
# 如果没有参数,运行原始的主函数
rclpy.init(args=args)
gesture_control_node = GestureControlNode()
try:
# 激活手势识别
gesture_control_node.activate_gesture_recognition()
# 运行节点
rclpy.spin(gesture_control_node)
except KeyboardInterrupt:
pass
finally:
gesture_control_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()