66 lines
2.2 KiB
Python
66 lines
2.2 KiB
Python
import lcm
|
|
import time
|
|
import threading
|
|
from robot_control_response_lcmt import robot_control_response_lcmt
|
|
|
|
class RobotConnectionManager:
|
|
def __init__(self):
|
|
self.is_connected = False
|
|
self.last_heartbeat = 0
|
|
self.connection_timeout = 5.0 # 5秒超时
|
|
|
|
def connect_robot(self):
|
|
"""连接机器狗"""
|
|
try:
|
|
# 尝试连接
|
|
self.lc_r = lcm.LCM("udpm://239.255.76.67:7670?ttl=255")
|
|
self.lc_s = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
|
|
|
|
# 订阅状态消息
|
|
self.lc_r.subscribe("robot_control_response", self.on_robot_response)
|
|
|
|
# 启动心跳检测
|
|
self.heartbeat_thread = threading.Thread(target=self.heartbeat_monitor)
|
|
self.heartbeat_thread.daemon = True
|
|
self.heartbeat_thread.start()
|
|
|
|
print("✅ 机器狗连接成功")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"❌ 机器狗连接失败: {e}")
|
|
return False
|
|
|
|
def on_robot_response(self, channel, data):
|
|
"""接收机器狗响应"""
|
|
msg = robot_control_response_lcmt().decode(data)
|
|
self.is_connected = True
|
|
self.last_heartbeat = time.time()
|
|
print(f"📡 收到机器狗响应: 模式={msg.mode}, 步态={msg.gait_id}")
|
|
|
|
def heartbeat_monitor(self):
|
|
"""心跳监测"""
|
|
while True:
|
|
self.lc_r.handle_timeout(1000)
|
|
|
|
# 检查连接状态
|
|
if time.time() - self.last_heartbeat > self.connection_timeout:
|
|
if self.is_connected:
|
|
print("⚠️ 机器狗连接超时")
|
|
self.is_connected = False
|
|
|
|
def get_connection_status(self):
|
|
"""获取连接状态"""
|
|
return {
|
|
'connected': self.is_connected,
|
|
'last_heartbeat': self.last_heartbeat,
|
|
'time_since_last': time.time() - self.last_heartbeat
|
|
}
|
|
|
|
# 使用示例
|
|
robot_manager = RobotConnectionManager()
|
|
if robot_manager.connect_robot():
|
|
while True:
|
|
status = robot_manager.get_connection_status()
|
|
print(f"连接状态: {status}")
|
|
time.sleep(1) |