#!/usr/bin/env python3 # -*- coding: utf-8 -*- import rclpy from rclpy.node import Node from protocol.srv import GestureActionControl from protocol.msg import GestureActionResult from std_srvs.srv import Trigger import time class GestureControlNode(Node): def __init__(self): super().__init__('gesture_control_node') # 当前状态变量 self.current_state = "loading" # 可能的状态: waiting_for_loading, loading, transporting, waiting_for_unloading, unloading # 创建手势控制服务客户端 self.gesture_control_cli = self.create_client(GestureActionControl, '/mi_desktop_48_b0_2d_7b_05_1d/gesture_action_control') # 创建手势识别结果订阅 self.gesture_sub = self.create_subscription( GestureActionResult, '/mi_desktop_48_b0_2d_7b_05_1d/gesture_action_msg', self.gesture_callback, 10) # 假设存在运输开始和完成的服务 self.start_transport_cli = self.create_client(Trigger, '/mi_desktop_48_b0_2d_7b_05_1d/start_transport') self.complete_unloading_cli = self.create_client(Trigger, '/mi_desktop_48_b0_2d_7b_05_1d/complete_unloading') # 定时器用于状态检查 self.timer = self.create_timer(1.0, self.timer_callback) self.get_logger().info("手势控制节点已启动") # 手势识别超时时间(秒) self.gesture_timeout = 60 # 最后一次检测到手势的时间 self.last_gesture_time = 0 # 激活手势识别 self.activate_gesture_recognition() def activate_gesture_recognition(self): """激活手势识别功能""" while not self.gesture_control_cli.wait_for_service(timeout_sec=1.0): self.get_logger().info('手势控制服务不可用,等待中...') req = GestureActionControl.Request() req.command = GestureActionControl.Request.START_ALGO req.timeout = self.gesture_timeout future = self.gesture_control_cli.call_async(req) future.add_done_callback(self.gesture_control_callback) self.get_logger().info("已激活手势识别功能") def gesture_control_callback(self, future): """手势控制服务调用回调""" try: response = future.result() if response.code == GestureActionControl.Response.RESULT_SUCCESS: self.get_logger().info("手势控制服务调用成功") else: self.get_logger().warn("手势控制服务繁忙") except Exception as e: self.get_logger().error(f"手势控制服务调用失败: {e}") def gesture_callback(self, msg): """手势识别结果回调""" self.last_gesture_time = time.time() # 手势映射 gesture_names = { 0: "无手势", 1: "手掌拉近", 2: "手掌推开", 3: "手向上抬", 4: "手向下压", 5: "手向左推", 6: "手向右推", 7: "停止手势", 8: "大拇指朝上", 9: "张开手掌或手指", 10: "闭合手掌或手指", 11: "大拇指朝下" } gesture_name = gesture_names.get(msg.id, "未知手势") if msg.id!=0 :self.get_logger().info(f"检测到手势: {gesture_name} (ID: {msg.id})") # 根据当前状态和手势执行相应操作 if self.current_state == "loading" and msg.id == 8: # 大拇指朝上表示完成配货 self.complete_loading() elif self.current_state == "unloading" and msg.id == 8: # 大拇指朝上表示完成卸货 self.complete_unloading() elif msg.id == 7: # 停止手势 self.get_logger().info("检测到停止手势") def complete_loading(self): """完成配货操作""" self.get_logger().info("配货完成,开始运输") # 调用开始运输服务 if self.start_transport_cli.wait_for_service(timeout_sec=1.0): req = Trigger.Request() future = self.start_transport_cli.call_async(req) future.add_done_callback(self.transport_start_callback) else: self.get_logger().warn("开始运输服务不可用") # 更新状态 self.current_state = "transporting" def transport_start_callback(self, future): """运输开始服务回调""" try: response = future.result() if response.success: self.get_logger().info("运输已开始") else: self.get_logger().warn("运输启动失败") except Exception as e: self.get_logger().error(f"运输服务调用失败: {e}") def complete_unloading(self): """完成卸货操作""" self.get_logger().info("卸货完成,准备新的配货") # 调用完成卸货服务 if self.complete_unloading_cli.wait_for_service(timeout_sec=1.0): req = Trigger.Request() future = self.complete_unloading_cli.call_async(req) future.add_done_callback(self.unloading_complete_callback) else: self.get_logger().warn("完成卸货服务不可用") # 更新状态 self.current_state = "waiting_for_loading" def unloading_complete_callback(self, future): """卸货完成服务回调""" try: response = future.result() if response.success: self.get_logger().info("卸货已完成确认") else: self.get_logger().warn("卸货完成确认失败") except Exception as e: self.get_logger().error(f"卸货完成服务调用失败: {e}") def timer_callback(self): """定时器回调,用于状态检查和超时处理""" # 检查手势识别是否超时 if time.time() - self.last_gesture_time > self.gesture_timeout: self.get_logger().info("手势识别超时,重新激活") self.activate_gesture_recognition() self.last_gesture_time = time.time() # 这里可以添加状态机逻辑,根据实际需求更新current_state # 例如,当机器狗到达配货区域时,设置self.current_state = "loading" # 当机器狗到达卸货区域时,设置self.current_state = "unloading" def update_state(self, new_state): """更新机器狗状态""" old_state = self.current_state self.current_state = new_state self.get_logger().info(f"状态更新: {old_state} -> {new_state}") # 如果进入需要手势交互的状态,确保手势识别已激活 if new_state in ["loading", "unloading"]: self.activate_gesture_recognition() def main(args=None): rclpy.init(args=args) gesture_control_node = GestureControlNode() try: rclpy.spin(gesture_control_node) except KeyboardInterrupt: pass finally: gesture_control_node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()