diff --git a/examples/basic_motion/main.py b/examples/basic_motion/main.py index 1d9a52b..3e7d78f 100644 --- a/examples/basic_motion/main.py +++ b/examples/basic_motion/main.py @@ -6,7 +6,6 @@ Dependency: ''' import lcm import sys -import os import time from threading import Thread, Lock @@ -17,6 +16,8 @@ def main(): Ctrl = Robot_Ctrl() Ctrl.run() msg = robot_control_cmd_lcmt() + msg.life_count = 0 # Initialize life_count to ensure command updates + try: print("Recovery stand") @@ -26,19 +27,19 @@ def main(): Ctrl.Send_cmd(msg) Ctrl.Wait_finish(12, 0) - # print("Shake hand") - # msg.mode = 62 # Shake hand, based on position interpolation control - # msg.gait_id = 2 - # msg.life_count += 1 - # Ctrl.Send_cmd(msg) - # Ctrl.Wait_finish(62, 2) + print("Shake hand") + msg.mode = 62 # Shake hand, based on position interpolation control + msg.gait_id = 2 + msg.life_count += 1 + Ctrl.Send_cmd(msg) + Ctrl.Wait_finish(62, 2) - # print("Twoleg Stand") - # msg.mode = 64 # Twoleg Stand - # msg.gait_id = 0 - # msg.life_count += 1 - # Ctrl.Send_cmd(msg) - # Ctrl.Wait_finish(64, 0) + print("Twoleg Stand") + msg.mode = 64 # Twoleg Stand + msg.gait_id = 0 + msg.life_count += 1 + Ctrl.Send_cmd(msg) + Ctrl.Wait_finish(64, 0) print("Head up") @@ -83,13 +84,13 @@ def main(): Ctrl.Send_cmd(msg) time.sleep( 5 ) - # print("Pure damper") + print("Pure damper") - # msg.mode = 7 # PureDamper - # msg.gait_id = 0 - # msg.life_count += 1 - # Ctrl.Send_cmd(msg) - # Ctrl.Wait_finish(7, 0) + msg.mode = 7 # PureDamper + msg.gait_id = 0 + msg.life_count += 1 + Ctrl.Send_cmd(msg) + Ctrl.Wait_finish(7, 0) except KeyboardInterrupt: pass @@ -142,6 +143,19 @@ class Robot_Ctrl(object): self.send_lock.acquire() if self.delay_cnt > 20: # Heartbeat signal 10HZ, It is used to maintain the heartbeat when life count is not updated self.lc_s.publish("robot_control_cmd",self.cmd_msg.encode()) + # 解析并格式化 cmd_msg 内容 + parsed_msg = ( + f"Heartbeat sent:\n" + f" mode: {self.cmd_msg.mode}\n" + f" gait_id: {self.cmd_msg.gait_id}\n" + f" life_count: {self.cmd_msg.life_count}\n" + f" rpy_des: {self.cmd_msg.rpy_des}\n" + f" pos_des: {self.cmd_msg.pos_des}\n" + f" vel_des: {self.cmd_msg.vel_des}\n" + f" step_height: {self.cmd_msg.step_height}\n" + f" duration: {self.cmd_msg.duration}" + ) + print(parsed_msg) # 格式化输出 cmd_msg self.delay_cnt = 0 self.delay_cnt += 1 self.send_lock.release() diff --git a/examples/tests/ping-test.py b/examples/tests/ping-test.py new file mode 100644 index 0000000..34c3f3f --- /dev/null +++ b/examples/tests/ping-test.py @@ -0,0 +1,66 @@ +import lcm +import time +import threading +from robot_control_response_lcmt import robot_control_response_lcmt + +class RobotConnectionManager: + def __init__(self): + self.is_connected = False + self.last_heartbeat = 0 + self.connection_timeout = 5.0 # 5秒超时 + + def connect_robot(self): + """连接机器狗""" + try: + # 尝试连接 + self.lc_r = lcm.LCM("udpm://239.255.76.67:7670?ttl=255") + self.lc_s = lcm.LCM("udpm://239.255.76.67:7671?ttl=255") + + # 订阅状态消息 + self.lc_r.subscribe("robot_control_response", self.on_robot_response) + + # 启动心跳检测 + self.heartbeat_thread = threading.Thread(target=self.heartbeat_monitor) + self.heartbeat_thread.daemon = True + self.heartbeat_thread.start() + + print("✅ 机器狗连接成功") + return True + + except Exception as e: + print(f"❌ 机器狗连接失败: {e}") + return False + + def on_robot_response(self, channel, data): + """接收机器狗响应""" + msg = robot_control_response_lcmt().decode(data) + self.is_connected = True + self.last_heartbeat = time.time() + print(f"📡 收到机器狗响应: 模式={msg.mode}, 步态={msg.gait_id}") + + def heartbeat_monitor(self): + """心跳监测""" + while True: + self.lc_r.handle_timeout(1000) + + # 检查连接状态 + if time.time() - self.last_heartbeat > self.connection_timeout: + if self.is_connected: + print("⚠️ 机器狗连接超时") + self.is_connected = False + + def get_connection_status(self): + """获取连接状态""" + return { + 'connected': self.is_connected, + 'last_heartbeat': self.last_heartbeat, + 'time_since_last': time.time() - self.last_heartbeat + } + +# 使用示例 +robot_manager = RobotConnectionManager() +if robot_manager.connect_robot(): + while True: + status = robot_manager.get_connection_status() + print(f"连接状态: {status}") + time.sleep(1) \ No newline at end of file diff --git a/examples/tests/robot_control_response_lcmt.py b/examples/tests/robot_control_response_lcmt.py new file mode 100644 index 0000000..cc61b8c --- /dev/null +++ b/examples/tests/robot_control_response_lcmt.py @@ -0,0 +1,73 @@ +"""LCM type definitions +This file automatically generated by lcm. +DO NOT MODIFY BY HAND!!!! +""" + +try: + import cStringIO.StringIO as BytesIO +except ImportError: + from io import BytesIO +import struct + +class robot_control_response_lcmt(object): + __slots__ = ["mode", "gait_id", "contact", "order_process_bar", "switch_status", "ori_error", "footpos_error", "motor_error"] + + __typenames__ = ["int8_t", "int8_t", "int8_t", "int8_t", "int8_t", "int8_t", "int16_t", "int32_t"] + + __dimensions__ = [None, None, None, None, None, None, None, [12]] + + def __init__(self): + self.mode = 0 + self.gait_id = 0 + self.contact = 0 + self.order_process_bar = 0 + self.switch_status = 0 + self.ori_error = 0 + self.footpos_error = 0 + self.motor_error = [ 0 for dim0 in range(12) ] + + def encode(self): + buf = BytesIO() + buf.write(robot_control_response_lcmt._get_packed_fingerprint()) + self._encode_one(buf) + return buf.getvalue() + + def _encode_one(self, buf): + buf.write(struct.pack(">bbbbbbh", self.mode, self.gait_id, self.contact, self.order_process_bar, self.switch_status, self.ori_error, self.footpos_error)) + buf.write(struct.pack('>12i', *self.motor_error[:12])) + + def decode(data): + if hasattr(data, 'read'): + buf = data + else: + buf = BytesIO(data) + if buf.read(8) != robot_control_response_lcmt._get_packed_fingerprint(): + raise ValueError("Decode error") + return robot_control_response_lcmt._decode_one(buf) + decode = staticmethod(decode) + + def _decode_one(buf): + self = robot_control_response_lcmt() + self.mode, self.gait_id, self.contact, self.order_process_bar, self.switch_status, self.ori_error, self.footpos_error = struct.unpack(">bbbbbbh", buf.read(8)) + self.motor_error = struct.unpack('>12i', buf.read(48)) + return self + _decode_one = staticmethod(_decode_one) + + def _get_hash_recursive(parents): + if robot_control_response_lcmt in parents: return 0 + tmphash = (0x485da98216eda8c7) & 0xffffffffffffffff + tmphash = (((tmphash<<1)&0xffffffffffffffff) + (tmphash>>63)) & 0xffffffffffffffff + return tmphash + _get_hash_recursive = staticmethod(_get_hash_recursive) + _packed_fingerprint = None + + def _get_packed_fingerprint(): + if robot_control_response_lcmt._packed_fingerprint is None: + robot_control_response_lcmt._packed_fingerprint = struct.pack(">Q", robot_control_response_lcmt._get_hash_recursive([])) + return robot_control_response_lcmt._packed_fingerprint + _get_packed_fingerprint = staticmethod(_get_packed_fingerprint) + + def get_hash(self): + """Get the LCM hash of the struct""" + return struct.unpack(">Q", robot_control_response_lcmt._get_packed_fingerprint())[0] + diff --git a/test/plugins/qrcode/qreader-test.py b/test/plugins/qrcode/qreader-test.py index 2a87b41..dee90a4 100644 --- a/test/plugins/qrcode/qreader-test.py +++ b/test/plugins/qrcode/qreader-test.py @@ -5,7 +5,7 @@ import cv2 qreader = QReader() # 读取包含二维码的图像 -image = cv2.cvtColor(cv2.imread("captured_images/qrcode-2/image_20250511_103244.png"), cv2.COLOR_BGR2RGB) +image = cv2.cvtColor(cv2.imread("./qrcode-A1.jpg"), cv2.COLOR_BGR2RGB) # 检测并解码二维码 decoded_text = qreader.detect_and_decode(image=image)