mi-task/examples/tests/ping-test.py
2025-07-05 22:41:38 +08:00

66 lines
2.2 KiB
Python

import lcm
import time
import threading
from robot_control_response_lcmt import robot_control_response_lcmt
class RobotConnectionManager:
def __init__(self):
self.is_connected = False
self.last_heartbeat = 0
self.connection_timeout = 5.0 # 5秒超时
def connect_robot(self):
"""连接机器狗"""
try:
# 尝试连接
self.lc_r = lcm.LCM("udpm://239.255.76.67:7670?ttl=255")
self.lc_s = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
# 订阅状态消息
self.lc_r.subscribe("robot_control_response", self.on_robot_response)
# 启动心跳检测
self.heartbeat_thread = threading.Thread(target=self.heartbeat_monitor)
self.heartbeat_thread.daemon = True
self.heartbeat_thread.start()
print("✅ 机器狗连接成功")
return True
except Exception as e:
print(f"❌ 机器狗连接失败: {e}")
return False
def on_robot_response(self, channel, data):
"""接收机器狗响应"""
msg = robot_control_response_lcmt().decode(data)
self.is_connected = True
self.last_heartbeat = time.time()
print(f"📡 收到机器狗响应: 模式={msg.mode}, 步态={msg.gait_id}")
def heartbeat_monitor(self):
"""心跳监测"""
while True:
self.lc_r.handle_timeout(1000)
# 检查连接状态
if time.time() - self.last_heartbeat > self.connection_timeout:
if self.is_connected:
print("⚠️ 机器狗连接超时")
self.is_connected = False
def get_connection_status(self):
"""获取连接状态"""
return {
'connected': self.is_connected,
'last_heartbeat': self.last_heartbeat,
'time_since_last': time.time() - self.last_heartbeat
}
# 使用示例
robot_manager = RobotConnectionManager()
if robot_manager.connect_robot():
while True:
status = robot_manager.get_connection_status()
print(f"连接状态: {status}")
time.sleep(1)