#!/usr/bin/env python3 # -*- coding: utf-8 -*- import rclpy from rclpy.node import Node from std_msgs.msg import String, Bool from protocol.srv import AudioTextPlay, AudioVolumeGet, AudioVolumeSet,AudioExecute from protocol.msg import AudioPlay, AudioStatus from std_srvs.srv import SetBool import time import threading class VoiceInteractionSystem(Node): def __init__(self): super().__init__('voice_interaction_system') # 机器狗状态:'preparing_pickup'(准备配货)或 'preparing_delivery'(准备卸货) self.current_state = 'preparing_pickup' # 初始状态设为准备配货 # 创建服务客户端 self.audio_play_client = self.create_client(AudioTextPlay, '/custom_namespace/speech_text_play') self.volume_get_client = self.create_client(AudioVolumeGet, '/custom_namespace/audio_volume_get') self.volume_set_client = self.create_client(AudioVolumeSet, '/custom_namespace/audio_volume_set') # 新增服务客户端:麦克风开关和语音控制开关 self.mic_control_client = self.create_client(AudioExecute, '/custom_namespace/set_audio_state') self.voice_control_client = self.create_client(SetBool, '/custom_namespace/audio_action_set') # 等待服务可用 while not self.audio_play_client.wait_for_service(timeout_sec=1.0): self.get_logger().info('语音播放服务不可用,等待中...') while not self.volume_get_client.wait_for_service(timeout_sec=1.0): self.get_logger().info('音量获取服务不可用,等待中...') while not self.volume_set_client.wait_for_service(timeout_sec=1.0): self.get_logger().info('音量设置服务不可用,等待中...') while not self.mic_control_client.wait_for_service(timeout_sec=1.0): self.get_logger().info('麦克风控制服务不可用,等待中...') while not self.voice_control_client.wait_for_service(timeout_sec=1.0): self.get_logger().info('语音控制服务不可用,等待中...') # 创建订阅器,监听语音指令 self.voice_command_sub = self.create_subscription( String, '/custom_namespace/asr_text', # 语音识别结果的主题 self.voice_command_callback, 10 ) # 创建状态发布器 self.state_pub = self.create_publisher(String, 'task_state', 10) # 设置初始音量(确保语音清晰可听) self.set_volume(20) # 打开麦克风和语音控制 self.enable_microphone(True) self.enable_voice_control(True) self.get_logger().info("语音交互系统已启动,当前状态:准备配货") # 播放欢迎提示 self.play_voice_prompt("准备就绪,请开始配货任务") def enable_microphone(self, enable): """打开/关闭麦克风""" req = AudioExecute.Request() req.client = "task_system" req.status = AudioStatus() req.status.state = AudioStatus.AUDIO_STATUS_NORMAL if enable else AudioStatus.AUDIO_STATUS_OFFMIC future = self.mic_control_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() and future.result().result: self.get_logger().info("麦克风已打开" if enable else "麦克风已关闭") else: self.get_logger().warn("麦克风控制失败") def enable_voice_control(self, enable): """启用/禁用语音控制""" req = SetBool.Request() req.data = enable future = self.voice_control_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() and future.result().success: self.get_logger().info("语音控制已启用" if enable else "语音控制已禁用") else: self.get_logger().warn("语音控制设置失败") def set_volume(self, volume_level): """设置语音音量""" req = AudioVolumeSet.Request() req.volume = volume_level future = self.volume_set_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() and future.result().success: self.get_logger().info(f"音量已设置为: {volume_level}") else: self.get_logger().warn("音量设置失败") def play_voice_prompt(self, text, online=True): """播放语音提示""" req = AudioTextPlay.Request() req.module_name = "task_system" req.is_online = online req.text = text if online else "" # 如果是离线语音,设置对应的语音ID if not online: req.speech = AudioPlay() # 这里使用示例ID,实际应根据协议定义设置 req.speech.play_id = 101 if "配货" in text else 102 future = self.audio_play_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() and future.result().status == 0: self.get_logger().info(f"语音播放成功: '{text}'") else: self.get_logger().warn("语音播放失败") def voice_command_callback(self, msg): self.get_logger().info(f"收到语音指令: ") """处理语音指令""" command = msg.data.lower() self.get_logger().info(f"收到语音指令: {command}") if "完成" in command or "结束" in command: if "配货" in command and self.current_state == 'preparing_pickup': self.handle_pickup_complete() elif "卸货" in command and self.current_state == 'preparing_delivery': self.handle_delivery_complete() else: self.play_voice_prompt("当前状态不支持此操作") elif "状态" in command: self.report_current_state() else: self.play_voice_prompt("未识别的指令,请重试") def handle_pickup_complete(self): """处理配货完成""" self.play_voice_prompt("配货完成,开始运输") self.get_logger().info("配货完成,开始运输") # 更新状态 self.current_state = 'in_transit' self.publish_state("in_transit") # 模拟运输过程 threading.Thread(target=self.simulate_transport).start() def handle_delivery_complete(self): """处理卸货完成""" self.play_voice_prompt("卸货完成,准备返回") self.get_logger().info("卸货完成,准备返回") # 更新状态 self.current_state = 'returning' self.publish_state("returning") # 模拟返回过程 threading.Thread(target=self.simulate_return).start() def simulate_transport(self): """模拟运输过程""" time.sleep(5) # 模拟运输时间 # 到达卸货点 self.play_voice_prompt("已到达卸货点,请开始卸货") self.get_logger().info("已到达卸货点") # 更新状态 self.current_state = 'preparing_delivery' self.publish_state("preparing_delivery") def simulate_return(self): """模拟返回过程""" time.sleep(5) # 模拟返回时间 # 返回配货点 self.play_voice_prompt("已返回配货点,准备新的配货任务") self.get_logger().info("已返回配货点") # 更新状态 self.current_state = 'preparing_pickup' self.publish_state("preparing_pickup") def report_current_state(self): """报告当前状态""" states = { 'preparing_pickup': "准备配货", 'in_transit': "运输中", 'preparing_delivery': "准备卸货", 'returning': "返回中" } state_name = states.get(self.current_state, "未知状态") self.play_voice_prompt(f"当前状态: {state_name}") def publish_state(self, state): """发布当前状态""" msg = String() msg.data = state self.state_pub.publish(msg) self.get_logger().info(f"状态更新: {state}") def main(args=None): rclpy.init(args=args) node = VoiceInteractionSystem() try: rclpy.spin(node) except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()