#!/usr/bin/env python3 # -*- coding: utf-8 -*- import rclpy from rclpy.node import Node from protocol.srv import GestureActionControl from protocol.msg import GestureActionResult from std_srvs.srv import Trigger import time import threading class GestureControlNode(Node): def __init__(self): super().__init__('gesture_control_node') # 获取设备名称空间 self.device_ns = self.get_namespace() if self.device_ns == "/": # 如果没有指定命名空间,尝试使用默认的 self.device_ns = "/mi_desktop_48_b0_2d_7b_05_1d" self.get_logger().info(f"使用设备命名空间: {self.device_ns}") # 当前状态变量 self.current_state = "waiting_for_loading" # 可能的状态: waiting_for_loading, loading, transporting, waiting_for_unloading, unloading # 创建手势控制服务客户端 gesture_control_service = f"{self.device_ns}/gesture_action_control" self.gesture_control_cli = self.create_client(GestureActionControl, gesture_control_service) # 创建手势识别结果订阅 gesture_action_topic = f"{self.device_ns}/gesture_action_msg" self.gesture_sub = self.create_subscription( GestureActionResult, gesture_action_topic, self.gesture_callback, 10) # 假设存在运输开始和完成的服务waiting for service to become available... # 激活手势识别的尝试次数 self.activation_attempts = 0 self.max_activation_attempts = 10 # 在单独的线程中激活手势识别,避免阻塞主线程 self.activation_thread = threading.Thread(target=self.activate_gesture_recognition) self.activation_thread.start() self.get_logger().info("手势控制节点已启动") def activate_gesture_recognition(self): """激活手势识别功能""" while self.activation_attempts < self.max_activation_attempts: self.activation_attempts += 1 if not self.gesture_control_cli.wait_for_service(timeout_sec=5.0): self.get_logger().warn(f'手势控制服务不可用,尝试 {self.activation_attempts}/{self.max_activation_attempts}') time.sleep(1) continue try: req = GestureActionControl.Request() req.command = GestureActionControl.Request.START_ALGO req.timeout = self.gesture_timeout future = self.gesture_control_cli.call_async(req) # 等待服务响应 rclpy.spin_until_future_complete(self, future, timeout_sec=5.0) if future.done(): response = future.result() if response.code == GestureActionControl.Response.RESULT_SUCCESS: self.get_logger().info("手势识别功能已成功激活") self.last_gesture_time = time.time() return else: self.get_logger().warn(f"手势控制服务返回繁忙状态: {response.code}") else: self.get_logger().warn("手势控制服务调用超时") except Exception as e: self.get_logger().error(f"手势控制服务调用失败: {e}") time.sleep(1) self.get_logger().error(f"经过 {self.max_activation_attempts} 次尝试后,仍无法激活手势识别功能") def gesture_callback(self, msg): """手势识别结果回调""" self.last_gesture_time = time.time() # 手势映射 gesture_names = { 0: "无手势", 1: "手掌拉近", 2: "手掌推开", 3: "手向上抬", 4: "手向下压", 5: "手向左推", 6: "手向右推", 7: "停止手势", 8: "大拇指朝上", 9: "张开手掌或手指", 10: "闭合手掌或手指", 11: "大拇指朝下" } gesture_name = gesture_names.get(msg.id, "未知手势") self.get_logger().info(f"检测到手势: {gesture_name} (ID: {msg.id})") # 根据当前状态和手势执行相应操作 if self.current_state == "loading" and msg.id == 8: # 大拇指朝上表示完成配货 self.complete_loading() elif self.current_state == "unloading" and msg.id == 8: # 大拇指朝上表示完成卸货 self.complete_unloading() elif msg.id == 7: # 停止手势 self.get_logger().info("检测到停止手势") def complete_loading(self): """完成配货操作""" self.get_logger().info("配货完成,开始运输") # 调用开始运输服务 if self.start_transport_cli.wait_for_service(timeout_sec=1.0): req = Trigger.Request() future = self.start_transport_cli.call_async(req) future.add_done_callback(self.transport_start_callback) else: self.get_logger().warn("开始运输服务不可用") # 更新状态 self.current_state = "transporting" def transport_start_callback(self, future): """运输开始服务回调""" try: response = future.result() if response.success: self.get_logger().info("运输已开始") else: self.get_logger().warn("运输启动失败") except Exception as e: self.get_logger().error(f"运输服务调用失败: {e}") def complete_unloading(self): """完成卸货操作""" self.get_logger().info("卸货完成,准备新的配货") # 调用完成卸货服务 if self.complete_unloading_cli.wait_for_service(timeout_sec=1.0): req = Trigger.Request() future = self.complete_unloading_cli.call_async(req) future.add_done_callback(self.unloading_complete_callback) else: self.get_logger().warn("完成卸货服务不可用") # 更新状态 self.current_state = "waiting_for_loading" def unloading_complete_callback(self, future): """卸货完成服务回调""" try: response = future.result() if response.success: self.get_logger().info("卸货已完成确认") else: self.get_logger().warn("卸货完成确认失败") except Exception as e: self.get_logger().error(f"卸货完成服务调用失败: {e}") def timer_callback(self): """定时器回调,用于状态检查和超时处理""" # 检查手势识别是否超时 current_time = time.time() if self.last_gesture_time > 0 and current_time - self.last_gesture_time > self.gesture_timeout: self.get_logger().info("手势识别超时,重新激活") self.activation_attempts = 0 self.activation_thread = threading.Thread(target=self.activate_gesture_recognition) self.activation_thread.start() # 这里可以添加状态机逻辑,根据实际需求更新current_state # 例如,当机器狗到达配货区域时,设置self.current_state = "loading" # 当机器狗到达卸货区域时,设置self.current_state = "unloading" def update_state(self, new_state): """更新机器狗状态""" old_state = self.current_state self.current_state = new_state self.get_logger().info(f"状态更新: {old_state} -> {new_state}") # 如果进入需要手势交互的状态,确保手势识别已激活 if new_state in ["loading", "unloading"]: self.get_logger().info(f"进入{new_state}状态,需要手势交互") if self.activation_attempts >= self.max_activation_attempts: self.get_logger().info("重新尝试激活手势识别") self.activation_attempts = 0 self.activation_thread = threading.Thread(target=self.activate_gesture_recognition) self.activation_thread.start() def main(args=None): rclpy.init(args=args) gesture_control_node = GestureControlNode() try: rclpy.spin(gesture_control_node) except KeyboardInterrupt: pass finally: gesture_control_node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()