#!/usr/bin/env python3 import rclpy from rclpy.node import Node from protocol.srv import AudioTextPlay from std_srvs.srv import SetBool import time import string class AudioDemo(Node): def __init__(self): super().__init__('audio_demo') # 创建音频播放服务客户端 self.audio_play_client = self.create_client( AudioTextPlay, '/mi_desktop_48_b0_2d_7b_05_1d/speech_text_play' ) # 创建语音开关服务客户端 self.audio_switch_client = self.create_client( SetBool, '/mi_desktop_48_b0_2d_7b_05_1d/audio_action_set' ) # 等待服务可用 while not self.audio_play_client.wait_for_service(timeout_sec=1.0): self.get_logger().info('语音播放服务不可用,等待中...') while not self.audio_switch_client.wait_for_service(timeout_sec=1.0): self.get_logger().info('语音开关服务不可用,等待中...') def enable_audio(self): """启用语音功能""" req = SetBool.Request() req.data = True # 开启语音功能 future = self.audio_switch_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result().success: self.get_logger().info('语音功能已启用') return True else: self.get_logger().error(f'启用语音失败: {future.result().message}') return False def play_audio(self, text): """播放指定文本的语音""" req = AudioTextPlay.Request() req.module_name = "demo_script" # 播放者名称 req.is_online = True # 使用在线TTS req.text = text # 要播放的文本 # 注意:由于我们使用在线播放,speech字段可以留空 # 根据服务定义,speech字段是AudioPlay类型,但在线播放不需要它 future = self.audio_play_client.call_async(req) rclpy.spin_until_future_complete(self, future) response = future.result() if response.status == 0: self.get_logger().info('语音播放完成') else: self.get_logger().error('语音播放失败') def speak(text,args=None): """语音播放函数(供外部调用)""" rclpy.init() demo = AudioDemo() try: # 步骤1: 启用语音功能 if demo.enable_audio(): # 步骤2: 播放语音 demo.play_audio(text) # 短暂延迟确保完成 time.sleep(2) finally: demo.destroy_node() rclpy.shutdown() # def main(text,args=None): # rclpy.init(args=args) # demo = AudioDemo() # # 步骤1: 启用语音功能 # if demo.enable_audio(): # # 步骤2: 播放语音 # demo.play_audio(text) # # 短暂延迟确保完成 # time.sleep(2) # demo.destroy_node() # rclpy.shutdown() # if __name__ == '__main__': # main('nihao')