230 lines
8.1 KiB
Python
Executable File
230 lines
8.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import rclpy
|
||
from rclpy.node import Node
|
||
from protocol.srv import GestureActionControl
|
||
from protocol.msg import GestureActionResult
|
||
from std_srvs.srv import Trigger
|
||
import time
|
||
import sys
|
||
|
||
class GestureControlNode(Node):
|
||
def __init__(self):
|
||
super().__init__('gesture_control_node')
|
||
|
||
# 当前状态变量
|
||
self.current_state = "waiting_for_loading" # 可能的状态: waiting_for_loading, loading, transporting, waiting_for_unloading, unloading
|
||
|
||
# 创建手势控制服务客户端
|
||
self.gesture_control_cli = self.create_client(GestureActionControl, '/mi_desktop_48_b0_2d_7b_05_1d/gesture_action_control')
|
||
|
||
# 创建手势识别结果订阅
|
||
self.gesture_sub = self.create_subscription(
|
||
GestureActionResult,
|
||
'/mi_desktop_48_b0_2d_7b_05_1d/gesture_action_msg',
|
||
self.gesture_callback,
|
||
10)
|
||
|
||
# 假设存在运输开始和完成的服务
|
||
# self.start_transport_cli = self.create_client(Trigger, '/mi_desktop_48_b0_2d_7b_05_1d/start_transport')
|
||
# self.complete_unloading_cli = self.create_client(Trigger, '/mi_desktop_48_b0_2d_7b_05_1d/complete_unloading')
|
||
|
||
# 定时器用于状态检查
|
||
self.timer = self.create_timer(1.0, self.timer_callback)
|
||
|
||
self.get_logger().info("手势控制节点已启动")
|
||
|
||
# 手势识别超时时间(秒)
|
||
self.gesture_timeout = 60
|
||
|
||
# 最后一次检测到手势的时间
|
||
self.last_gesture_time = 0
|
||
|
||
# 手势识别是否激活的标志
|
||
self.is_gesture_active = False
|
||
|
||
def activate_gesture_recognition(self):
|
||
"""激活手势识别功能"""
|
||
while not self.gesture_control_cli.wait_for_service(timeout_sec=1.0):
|
||
self.get_logger().info('手势控制服务不可用,等待中...')
|
||
|
||
req = GestureActionControl.Request()
|
||
req.command = GestureActionControl.Request.START_ALGO
|
||
req.timeout = self.gesture_timeout
|
||
|
||
future = self.gesture_control_cli.call_async(req)
|
||
future.add_done_callback(self.gesture_control_callback)
|
||
|
||
self.get_logger().info("已激活手势识别功能")
|
||
self.is_gesture_active = True
|
||
|
||
def deactivate_gesture_recognition(self):
|
||
"""关闭手势识别功能"""
|
||
while not self.gesture_control_cli.wait_for_service(timeout_sec=1.0):
|
||
self.get_logger().info('手势控制服务不可用,等待中...')
|
||
|
||
req = GestureActionControl.Request()
|
||
req.command = GestureActionControl.Request.STOP_ALGO
|
||
|
||
future = self.gesture_control_cli.call_async(req)
|
||
future.add_done_callback(self.gesture_control_callback)
|
||
|
||
self.get_logger().info("已关闭手势识别功能")
|
||
self.is_gesture_active = False
|
||
|
||
def gesture_control_callback(self, future):
|
||
"""手势控制服务调用回调"""
|
||
try:
|
||
response = future.result()
|
||
if response.code == GestureActionControl.Response.RESULT_SUCCESS:
|
||
self.get_logger().info("手势控制服务调用成功")
|
||
else:
|
||
self.get_logger().warn("手势控制服务繁忙")
|
||
except Exception as e:
|
||
self.get_logger().error(f"手势控制服务调用失败: {e}")
|
||
|
||
def gesture_callback(self, msg):
|
||
"""手势识别结果回调"""
|
||
if not self.is_gesture_active:
|
||
return
|
||
|
||
self.last_gesture_time = time.time()
|
||
|
||
# 手势映射
|
||
gesture_names = {
|
||
0: "无手势",
|
||
1: "手掌拉近",
|
||
2: "手掌推开",
|
||
3: "手向上抬",
|
||
4: "手向下压",
|
||
5: "手向左推",
|
||
6: "手向右推",
|
||
7: "停止手势",
|
||
8: "大拇指朝上",
|
||
9: "张开手掌或手指",
|
||
10: "闭合手掌或手指",
|
||
11: "大拇指朝下"
|
||
}
|
||
|
||
gesture_name = gesture_names.get(msg.id, "未知手势")
|
||
self.get_logger().info(f"检测到手势: {gesture_name} (ID: {msg.id})")
|
||
|
||
# 根据当前状态和手势执行相应操作
|
||
if self.current_state == "loading" and msg.id == 8: # 大拇指朝上表示完成配货
|
||
self.complete_loading()
|
||
elif self.current_state == "unloading" and msg.id == 8: # 大拇指朝上表示完成卸货
|
||
self.complete_unloading()
|
||
elif msg.id == 7: # 停止手势
|
||
self.get_logger().info("检测到停止手势")
|
||
|
||
def timer_callback(self):
|
||
"""定时器回调,用于状态检查和超时处理"""
|
||
# 检查手势识别是否激活且超时
|
||
if self.is_gesture_active and time.time() - self.last_gesture_time > self.gesture_timeout:
|
||
self.get_logger().info("手势识别超时,重新激活")
|
||
self.activate_gesture_recognition()
|
||
self.last_gesture_time = time.time()
|
||
|
||
# 这里可以添加状态机逻辑,根据实际需求更新current_state
|
||
# 例如,当机器狗到达配货区域时,设置self.current_state = "loading"
|
||
# 当机器狗到达卸货区域时,设置self.current_state = "unloading"
|
||
def update_state(self, new_state):
|
||
"""更新机器狗状态"""
|
||
old_state = self.current_state
|
||
self.current_state = new_state
|
||
self.get_logger().info(f"状态更新: {old_state} -> {new_state}")
|
||
|
||
# 如果进入需要手势交互的状态,确保手势识别已激活
|
||
if new_state in ["loading", "unloading"]:
|
||
self.activate_gesture_recognition()
|
||
|
||
|
||
def start_gesture_recognition(timeout=60):
|
||
"""启动手势识别功能(供外部调用)"""
|
||
rclpy.init()
|
||
node = GestureControlNode()
|
||
|
||
try:
|
||
# 设置超时时间
|
||
node.gesture_timeout = timeout
|
||
|
||
# 激活手势识别
|
||
node.activate_gesture_recognition()
|
||
|
||
# 保持节点运行
|
||
rclpy.spin(node)
|
||
except KeyboardInterrupt:
|
||
pass
|
||
finally:
|
||
node.destroy_node()
|
||
rclpy.shutdown()
|
||
|
||
|
||
def stop_gesture_recognition():
|
||
"""关闭手势识别功能(供外部调用)"""
|
||
rclpy.init()
|
||
node = GestureControlNode()
|
||
|
||
try:
|
||
# 关闭手势识别
|
||
node.deactivate_gesture_recognition()
|
||
|
||
# 短暂延迟确保完成
|
||
time.sleep(2)
|
||
finally:
|
||
node.destroy_node()
|
||
rclpy.shutdown()
|
||
|
||
|
||
def main(args=None):
|
||
# 检查命令行参数
|
||
if 1==1:
|
||
command = "start"
|
||
if command == "start":
|
||
# 启动手势识别
|
||
timeout = 60
|
||
# if len(sys.argv) > 2:
|
||
# try:
|
||
# timeout = int(60)
|
||
# except ValueError:
|
||
# print(f"无效的超时时间: {sys.argv[2]},使用默认值60秒")
|
||
|
||
print(f"启动手势识别,超时时间: {timeout}秒")
|
||
start_gesture_recognition(timeout)
|
||
return
|
||
elif command == "stop":
|
||
# 停止手势识别
|
||
print("停止手势识别")
|
||
stop_gesture_recognition()
|
||
return
|
||
elif command == "test":
|
||
# 测试模式:启动手势识别,运行一段时间后停止
|
||
print("测试模式:启动手势识别,5秒后停止")
|
||
start_gesture_recognition(5)
|
||
time.sleep(5)
|
||
stop_gesture_recognition()
|
||
return
|
||
else:
|
||
print(f"未知命令: {command}")
|
||
print("可用命令: start [timeout], stop, test")
|
||
return
|
||
|
||
# 如果没有参数,运行原始的主函数
|
||
rclpy.init(args=args)
|
||
gesture_control_node = GestureControlNode()
|
||
|
||
try:
|
||
# 激活手势识别
|
||
gesture_control_node.activate_gesture_recognition()
|
||
|
||
# 运行节点
|
||
rclpy.spin(gesture_control_node)
|
||
except KeyboardInterrupt:
|
||
pass
|
||
finally:
|
||
gesture_control_node.destroy_node()
|
||
rclpy.shutdown()
|
||
|
||
if __name__ == '__main__':
|
||
main() |