commit db35c7a2825338518c196aacda948e67ad9f97c7 Author: havoc420ubuntu <2993167370@qq.com> Date: Sun May 11 15:44:54 2025 +0000 :tada: diff --git a/__pycache__/robot_control_cmd_lcmt.cpython-38.pyc b/__pycache__/robot_control_cmd_lcmt.cpython-38.pyc new file mode 100644 index 0000000..257800f Binary files /dev/null and b/__pycache__/robot_control_cmd_lcmt.cpython-38.pyc differ diff --git a/__pycache__/robot_control_response_lcmt.cpython-38.pyc b/__pycache__/robot_control_response_lcmt.cpython-38.pyc new file mode 100644 index 0000000..2c77023 Binary files /dev/null and b/__pycache__/robot_control_response_lcmt.cpython-38.pyc differ diff --git a/base_move/go_straight/__pycache__/robot_control_cmd_lcmt.cpython-38.pyc b/base_move/go_straight/__pycache__/robot_control_cmd_lcmt.cpython-38.pyc new file mode 100644 index 0000000..7677d1f Binary files /dev/null and b/base_move/go_straight/__pycache__/robot_control_cmd_lcmt.cpython-38.pyc differ diff --git a/base_move/go_straight/__pycache__/robot_control_response_lcmt.cpython-38.pyc b/base_move/go_straight/__pycache__/robot_control_response_lcmt.cpython-38.pyc new file mode 100644 index 0000000..a37df86 Binary files /dev/null and b/base_move/go_straight/__pycache__/robot_control_response_lcmt.cpython-38.pyc differ diff --git a/base_move/go_straight/main.py b/base_move/go_straight/main.py new file mode 100644 index 0000000..980221d --- /dev/null +++ b/base_move/go_straight/main.py @@ -0,0 +1,110 @@ +''' +This demo show the communication interface of MR813 motion control board based on Lcm. +Dependency: +- robot_control_cmd_lcmt.py +- robot_control_response_lcmt.py +''' +import lcm +import sys +import os +import time +from threading import Thread, Lock + +from robot_control_cmd_lcmt import robot_control_cmd_lcmt +from robot_control_response_lcmt import robot_control_response_lcmt + +def main(): + Ctrl = Robot_Ctrl() + Ctrl.run() + msg = robot_control_cmd_lcmt() + try: + print("Recovery stand") + + msg.mode = 12 # Recovery stand + msg.gait_id = 0 + msg.life_count += 1 # Command will take effect when life_count update + Ctrl.Send_cmd(msg) + Ctrl.Wait_finish(12, 0) + + print("Go forward") + + msg.mode = 11 + msg.gait_id = 26 # 26 表示快速 trot 步态 + msg.vel_des = [1.0, 1.0, -1.0] + msg.duration = 2000 + msg.step_height = [0.06, 0.06] + msg.life_count += 1 + Ctrl.Send_cmd(msg) + Ctrl.Wait_finish(11, 26) + + + except KeyboardInterrupt: + pass + Ctrl.quit() + sys.exit() + + +class Robot_Ctrl(object): + def __init__(self): + self.rec_thread = Thread(target=self.rec_responce) + self.send_thread = Thread(target=self.send_publish) + self.lc_r = lcm.LCM("udpm://239.255.76.67:7670?ttl=255") + self.lc_s = lcm.LCM("udpm://239.255.76.67:7671?ttl=255") + self.cmd_msg = robot_control_cmd_lcmt() + self.rec_msg = robot_control_response_lcmt() + self.send_lock = Lock() + self.delay_cnt = 0 + self.mode_ok = 0 + self.gait_ok = 0 + self.runing = 1 + + def run(self): + self.lc_r.subscribe("robot_control_response", self.msg_handler) + self.send_thread.start() + self.rec_thread.start() + + def msg_handler(self, channel, data): + self.rec_msg = robot_control_response_lcmt().decode(data) + if(self.rec_msg.order_process_bar >= 95): + self.mode_ok = self.rec_msg.mode + else: + self.mode_ok = 0 + + def rec_responce(self): + while self.runing: + self.lc_r.handle() + time.sleep( 0.002 ) + + def Wait_finish(self, mode, gait_id): + count = 0 + while self.runing and count < 2000: #10s + if self.mode_ok == mode and self.gait_ok == gait_id: + return True + else: + time.sleep(0.005) + count += 1 + + def send_publish(self): + while self.runing: + self.send_lock.acquire() + if self.delay_cnt > 20: # Heartbeat signal 10HZ, It is used to maintain the heartbeat when life count is not updated + self.lc_s.publish("robot_control_cmd",self.cmd_msg.encode()) + self.delay_cnt = 0 + self.delay_cnt += 1 + self.send_lock.release() + time.sleep( 0.005 ) + + def Send_cmd(self, msg): + self.send_lock.acquire() + self.delay_cnt = 50 + self.cmd_msg = msg + self.send_lock.release() + + def quit(self): + self.runing = 0 + self.rec_thread.join() + self.send_thread.join() + +# Main function +if __name__ == '__main__': + main() diff --git a/base_move/go_straight/robot_control_cmd_lcmt.py b/base_move/go_straight/robot_control_cmd_lcmt.py new file mode 100644 index 0000000..addedac --- /dev/null +++ b/base_move/go_straight/robot_control_cmd_lcmt.py @@ -0,0 +1,149 @@ +# LCM type definitions This file automatically generated by lcm. +try: + import cStringIO.StringIO as BytesIO +except ImportError: + from io import BytesIO +import struct + +class robot_control_cmd_lcmt(object): + __slots__ = ["mode", "gait_id", "contact", "life_count", "vel_des", "rpy_des", "pos_des", "acc_des", "ctrl_point", "foot_pose", "step_height", "value", "duration"] + + __typenames__ = ["int8_t", "int8_t", "int8_t", "int8_t", "float", "float", "float", "float", "float", "float", "float", "int32_t", "int32_t"] + + __dimensions__ = [None, None, None, None, [3], [3], [3], [6], [3], [6], [2], None, None] + + def __init__(self): + self.mode = 0 + self.gait_id = 0 + self.contact = 0 + self.life_count = 0 + self.vel_des = [ 0.0 for dim0 in range(3) ] + self.rpy_des = [ 0.0 for dim0 in range(3) ] + self.pos_des = [ 0.0 for dim0 in range(3) ] + self.acc_des = [ 0.0 for dim0 in range(6) ] + self.ctrl_point = [ 0.0 for dim0 in range(3) ] + self.foot_pose = [ 0.0 for dim0 in range(6) ] + self.step_height = [ 0.0 for dim0 in range(2) ] + self.value = 0 + self.duration = 0 + + def encode(self): + buf = BytesIO() + buf.write(robot_control_cmd_lcmt._get_packed_fingerprint()) + self._encode_one(buf) + return buf.getvalue() + + def _encode_one(self, buf): + buf.write(struct.pack(">bbbb", self.mode, self.gait_id, self.contact, self.life_count)) + buf.write(struct.pack('>3f', *self.vel_des[:3])) + buf.write(struct.pack('>3f', *self.rpy_des[:3])) + buf.write(struct.pack('>3f', *self.pos_des[:3])) + buf.write(struct.pack('>6f', *self.acc_des[:6])) + buf.write(struct.pack('>3f', *self.ctrl_point[:3])) + buf.write(struct.pack('>6f', *self.foot_pose[:6])) + buf.write(struct.pack('>2f', *self.step_height[:2])) + buf.write(struct.pack(">ii", self.value, self.duration)) + + def decode(data): + if hasattr(data, 'read'): + buf = data + else: + buf = BytesIO(data) + if buf.read(8) != robot_control_cmd_lcmt._get_packed_fingerprint(): + raise ValueError("Decode error") + return robot_control_cmd_lcmt._decode_one(buf) + decode = staticmethod(decode) + + def _decode_one(buf): + self = robot_control_cmd_lcmt() + self.mode, self.gait_id, self.contact, self.life_count = struct.unpack(">bbbb", buf.read(4)) + self.vel_des = struct.unpack('>3f', buf.read(12)) + self.rpy_des = struct.unpack('>3f', buf.read(12)) + self.pos_des = struct.unpack('>3f', buf.read(12)) + self.acc_des = struct.unpack('>6f', buf.read(24)) + self.ctrl_point = struct.unpack('>3f', buf.read(12)) + self.foot_pose = struct.unpack('>6f', buf.read(24)) + self.step_height = struct.unpack('>2f', buf.read(8)) + self.value, self.duration = struct.unpack(">ii", buf.read(8)) + return self + _decode_one = staticmethod(_decode_one) + + def _get_hash_recursive(parents): + if robot_control_cmd_lcmt in parents: return 0 + tmphash = (0x476b61e296af96f5) & 0xffffffffffffffff + tmphash = (((tmphash<<1)&0xffffffffffffffff) + (tmphash>>63)) & 0xffffffffffffffff + return tmphash + _get_hash_recursive = staticmethod(_get_hash_recursive) + _packed_fingerprint = None + + def _get_packed_fingerprint(): + if robot_control_cmd_lcmt._packed_fingerprint is None: + robot_control_cmd_lcmt._packed_fingerprint = struct.pack(">Q", robot_control_cmd_lcmt._get_hash_recursive([])) + return robot_control_cmd_lcmt._packed_fingerprint + _get_packed_fingerprint = staticmethod(_get_packed_fingerprint) + + def get_hash(self): + """Get the LCM hash of the struct""" + return struct.unpack(">Q", robot_control_cmd_lcmt._get_packed_fingerprint())[0] + +class robot_control_response_lcmt(object): + __slots__ = ["mode", "gait_id", "contact", "order_process_bar", "switch_status", "ori_error", "footpos_error", "motor_error"] + + __typenames__ = ["int8_t", "int8_t", "int8_t", "int8_t", "int8_t", "int8_t", "int16_t", "int32_t"] + + __dimensions__ = [None, None, None, None, None, None, None, [12]] + + def __init__(self): + self.mode = 0 + self.gait_id = 0 + self.contact = 0 + self.order_process_bar = 0 + self.switch_status = 0 + self.ori_error = 0 + self.footpos_error = 0 + self.motor_error = [ 0 for dim0 in range(12) ] + + def encode(self): + buf = BytesIO() + buf.write(robot_control_response_lcmt._get_packed_fingerprint()) + self._encode_one(buf) + return buf.getvalue() + + def _encode_one(self, buf): + buf.write(struct.pack(">bbbbbbh", self.mode, self.gait_id, self.contact, self.order_process_bar, self.switch_status, self.ori_error, self.footpos_error)) + buf.write(struct.pack('>12i', *self.motor_error[:12])) + + def decode(data): + if hasattr(data, 'read'): + buf = data + else: + buf = BytesIO(data) + if buf.read(8) != robot_control_response_lcmt._get_packed_fingerprint(): + raise ValueError("Decode error") + return robot_control_response_lcmt._decode_one(buf) + decode = staticmethod(decode) + + def _decode_one(buf): + self = robot_control_response_lcmt() + self.mode, self.gait_id, self.contact, self.order_process_bar, self.switch_status, self.ori_error, self.footpos_error = struct.unpack(">bbbbbbh", buf.read(8)) + self.motor_error = struct.unpack('>12i', buf.read(48)) + return self + _decode_one = staticmethod(_decode_one) + + def _get_hash_recursive(parents): + if robot_control_response_lcmt in parents: return 0 + tmphash = (0x485da98216eda8c7) & 0xffffffffffffffff + tmphash = (((tmphash<<1)&0xffffffffffffffff) + (tmphash>>63)) & 0xffffffffffffffff + return tmphash + _get_hash_recursive = staticmethod(_get_hash_recursive) + _packed_fingerprint = None + + def _get_packed_fingerprint(): + if robot_control_response_lcmt._packed_fingerprint is None: + robot_control_response_lcmt._packed_fingerprint = struct.pack(">Q", robot_control_response_lcmt._get_hash_recursive([])) + return robot_control_response_lcmt._packed_fingerprint + _get_packed_fingerprint = staticmethod(_get_packed_fingerprint) + + def get_hash(self): + """Get the LCM hash of the struct""" + return struct.unpack(">Q", robot_control_response_lcmt._get_packed_fingerprint())[0] diff --git a/base_move/go_straight/robot_control_response_lcmt.py b/base_move/go_straight/robot_control_response_lcmt.py new file mode 100644 index 0000000..cc61b8c --- /dev/null +++ b/base_move/go_straight/robot_control_response_lcmt.py @@ -0,0 +1,73 @@ +"""LCM type definitions +This file automatically generated by lcm. +DO NOT MODIFY BY HAND!!!! +""" + +try: + import cStringIO.StringIO as BytesIO +except ImportError: + from io import BytesIO +import struct + +class robot_control_response_lcmt(object): + __slots__ = ["mode", "gait_id", "contact", "order_process_bar", "switch_status", "ori_error", "footpos_error", "motor_error"] + + __typenames__ = ["int8_t", "int8_t", "int8_t", "int8_t", "int8_t", "int8_t", "int16_t", "int32_t"] + + __dimensions__ = [None, None, None, None, None, None, None, [12]] + + def __init__(self): + self.mode = 0 + self.gait_id = 0 + self.contact = 0 + self.order_process_bar = 0 + self.switch_status = 0 + self.ori_error = 0 + self.footpos_error = 0 + self.motor_error = [ 0 for dim0 in range(12) ] + + def encode(self): + buf = BytesIO() + buf.write(robot_control_response_lcmt._get_packed_fingerprint()) + self._encode_one(buf) + return buf.getvalue() + + def _encode_one(self, buf): + buf.write(struct.pack(">bbbbbbh", self.mode, self.gait_id, self.contact, self.order_process_bar, self.switch_status, self.ori_error, self.footpos_error)) + buf.write(struct.pack('>12i', *self.motor_error[:12])) + + def decode(data): + if hasattr(data, 'read'): + buf = data + else: + buf = BytesIO(data) + if buf.read(8) != robot_control_response_lcmt._get_packed_fingerprint(): + raise ValueError("Decode error") + return robot_control_response_lcmt._decode_one(buf) + decode = staticmethod(decode) + + def _decode_one(buf): + self = robot_control_response_lcmt() + self.mode, self.gait_id, self.contact, self.order_process_bar, self.switch_status, self.ori_error, self.footpos_error = struct.unpack(">bbbbbbh", buf.read(8)) + self.motor_error = struct.unpack('>12i', buf.read(48)) + return self + _decode_one = staticmethod(_decode_one) + + def _get_hash_recursive(parents): + if robot_control_response_lcmt in parents: return 0 + tmphash = (0x485da98216eda8c7) & 0xffffffffffffffff + tmphash = (((tmphash<<1)&0xffffffffffffffff) + (tmphash>>63)) & 0xffffffffffffffff + return tmphash + _get_hash_recursive = staticmethod(_get_hash_recursive) + _packed_fingerprint = None + + def _get_packed_fingerprint(): + if robot_control_response_lcmt._packed_fingerprint is None: + robot_control_response_lcmt._packed_fingerprint = struct.pack(">Q", robot_control_response_lcmt._get_hash_recursive([])) + return robot_control_response_lcmt._packed_fingerprint + _get_packed_fingerprint = staticmethod(_get_packed_fingerprint) + + def get_hash(self): + """Get the LCM hash of the struct""" + return struct.unpack(">Q", robot_control_response_lcmt._get_packed_fingerprint())[0] + diff --git a/base_move/move/__pycache__/robot_control_cmd_lcmt.cpython-38.pyc b/base_move/move/__pycache__/robot_control_cmd_lcmt.cpython-38.pyc new file mode 100644 index 0000000..b526274 Binary files /dev/null and b/base_move/move/__pycache__/robot_control_cmd_lcmt.cpython-38.pyc differ diff --git a/base_move/move/__pycache__/robot_control_response_lcmt.cpython-38.pyc b/base_move/move/__pycache__/robot_control_response_lcmt.cpython-38.pyc new file mode 100644 index 0000000..d3b0620 Binary files /dev/null and b/base_move/move/__pycache__/robot_control_response_lcmt.cpython-38.pyc differ diff --git a/base_move/move/main.py b/base_move/move/main.py new file mode 100644 index 0000000..5afe8f1 --- /dev/null +++ b/base_move/move/main.py @@ -0,0 +1,161 @@ +''' +This demo show the communication interface of MR813 motion control board based on Lcm. +Dependency: +- robot_control_cmd_lcmt.py +- robot_control_response_lcmt.py +''' +import lcm +import sys +import os +import time +from threading import Thread, Lock + +# For keyboard input +import tty +import termios +import sys + +from robot_control_cmd_lcmt import robot_control_cmd_lcmt +from robot_control_response_lcmt import robot_control_response_lcmt + + +def getch(): + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + try: + tty.setraw(sys.stdin.fileno()) + ch = sys.stdin.read(1) + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + return ch + + +def send_movement_command(ctrl, msg_template, direction): + msg = robot_control_cmd_lcmt() + msg.mode = 11 # Walk mode + msg.gait_id = 26 # Fast trot + msg.duration = 1000 # ms + msg.step_height = [0.06, 0.06] + msg.life_count = msg_template.life_count + 1 + + if direction == 'forward': + msg.vel_des = [1.0, 0.0, 0.0] + elif direction == 'backward': + msg.vel_des = [-1.0, 0.0, 0.0] + elif direction == 'left': + msg.vel_des = [0.0, 1.0, 0.0] + elif direction == 'right': + msg.vel_des = [0.0, -1.0, 0.0] + + ctrl.Send_cmd(msg) + print(f"Sent command: {direction.capitalize()}") + ctrl.Wait_finish(11, 26) + + +def main(): + Ctrl = Robot_Ctrl() + Ctrl.run() + msg = robot_control_cmd_lcmt() + + try: + print("Recovery stand") + msg.mode = 12 # Recovery stand + msg.gait_id = 0 + msg.life_count += 1 + Ctrl.Send_cmd(msg) + Ctrl.Wait_finish(12, 0) + + print("\nReady to accept commands:") + print("Use W (Forward), S (Backward), A (Left), D (Right), Q (Quit)\n") + + while True: + char = getch().lower() + # char = 'w' + if char == 'w': + send_movement_command(Ctrl, msg, 'forward') + elif char == 's': + send_movement_command(Ctrl, msg, 'backward') + elif char == 'a': + send_movement_command(Ctrl, msg, 'left') + elif char == 'd': + send_movement_command(Ctrl, msg, 'right') + elif char == 'q': + print("\nExiting...") + break + else: + print("Invalid key pressed:", char) + + except KeyboardInterrupt: + pass + + Ctrl.quit() + sys.exit() + + +class Robot_Ctrl(object): + def __init__(self): + self.rec_thread = Thread(target=self.rec_responce) + self.send_thread = Thread(target=self.send_publish) + self.lc_r = lcm.LCM("udpm://239.255.76.67:7670?ttl=255") + self.lc_s = lcm.LCM("udpm://239.255.76.67:7671?ttl=255") + self.cmd_msg = robot_control_cmd_lcmt() + self.rec_msg = robot_control_response_lcmt() + self.send_lock = Lock() + self.delay_cnt = 0 + self.mode_ok = 0 + self.gait_ok = 0 + self.runing = 1 + + def run(self): + self.lc_r.subscribe("robot_control_response", self.msg_handler) + self.send_thread.start() + self.rec_thread.start() + + def msg_handler(self, channel, data): + self.rec_msg = robot_control_response_lcmt().decode(data) + if self.rec_msg.order_process_bar >= 95: + self.mode_ok = self.rec_msg.mode + else: + self.mode_ok = 0 + + def rec_responce(self): + while self.runing: + self.lc_r.handle() + time.sleep(0.002) + + def Wait_finish(self, mode, gait_id): + count = 0 + while self.runing and count < 2000: # 10s + if self.mode_ok == mode and self.gait_ok == gait_id: + return True + else: + time.sleep(0.005) + count += 1 + print("Wait_finish timeout.") + return False + + def send_publish(self): + while self.runing: + self.send_lock.acquire() + if self.delay_cnt > 20: # Heartbeat signal 10HZ + self.lc_s.publish("robot_control_cmd", self.cmd_msg.encode()) + self.delay_cnt = 0 + self.delay_cnt += 1 + self.send_lock.release() + time.sleep(0.005) + + def Send_cmd(self, msg): + self.send_lock.acquire() + self.delay_cnt = 50 + self.cmd_msg = msg + self.send_lock.release() + + def quit(self): + self.runing = 0 + self.rec_thread.join() + self.send_thread.join() + + +# Main function +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/base_move/move/robot_control_cmd_lcmt.py b/base_move/move/robot_control_cmd_lcmt.py new file mode 100644 index 0000000..addedac --- /dev/null +++ b/base_move/move/robot_control_cmd_lcmt.py @@ -0,0 +1,149 @@ +# LCM type definitions This file automatically generated by lcm. +try: + import cStringIO.StringIO as BytesIO +except ImportError: + from io import BytesIO +import struct + +class robot_control_cmd_lcmt(object): + __slots__ = ["mode", "gait_id", "contact", "life_count", "vel_des", "rpy_des", "pos_des", "acc_des", "ctrl_point", "foot_pose", "step_height", "value", "duration"] + + __typenames__ = ["int8_t", "int8_t", "int8_t", "int8_t", "float", "float", "float", "float", "float", "float", "float", "int32_t", "int32_t"] + + __dimensions__ = [None, None, None, None, [3], [3], [3], [6], [3], [6], [2], None, None] + + def __init__(self): + self.mode = 0 + self.gait_id = 0 + self.contact = 0 + self.life_count = 0 + self.vel_des = [ 0.0 for dim0 in range(3) ] + self.rpy_des = [ 0.0 for dim0 in range(3) ] + self.pos_des = [ 0.0 for dim0 in range(3) ] + self.acc_des = [ 0.0 for dim0 in range(6) ] + self.ctrl_point = [ 0.0 for dim0 in range(3) ] + self.foot_pose = [ 0.0 for dim0 in range(6) ] + self.step_height = [ 0.0 for dim0 in range(2) ] + self.value = 0 + self.duration = 0 + + def encode(self): + buf = BytesIO() + buf.write(robot_control_cmd_lcmt._get_packed_fingerprint()) + self._encode_one(buf) + return buf.getvalue() + + def _encode_one(self, buf): + buf.write(struct.pack(">bbbb", self.mode, self.gait_id, self.contact, self.life_count)) + buf.write(struct.pack('>3f', *self.vel_des[:3])) + buf.write(struct.pack('>3f', *self.rpy_des[:3])) + buf.write(struct.pack('>3f', *self.pos_des[:3])) + buf.write(struct.pack('>6f', *self.acc_des[:6])) + buf.write(struct.pack('>3f', *self.ctrl_point[:3])) + buf.write(struct.pack('>6f', *self.foot_pose[:6])) + buf.write(struct.pack('>2f', *self.step_height[:2])) + buf.write(struct.pack(">ii", self.value, self.duration)) + + def decode(data): + if hasattr(data, 'read'): + buf = data + else: + buf = BytesIO(data) + if buf.read(8) != robot_control_cmd_lcmt._get_packed_fingerprint(): + raise ValueError("Decode error") + return robot_control_cmd_lcmt._decode_one(buf) + decode = staticmethod(decode) + + def _decode_one(buf): + self = robot_control_cmd_lcmt() + self.mode, self.gait_id, self.contact, self.life_count = struct.unpack(">bbbb", buf.read(4)) + self.vel_des = struct.unpack('>3f', buf.read(12)) + self.rpy_des = struct.unpack('>3f', buf.read(12)) + self.pos_des = struct.unpack('>3f', buf.read(12)) + self.acc_des = struct.unpack('>6f', buf.read(24)) + self.ctrl_point = struct.unpack('>3f', buf.read(12)) + self.foot_pose = struct.unpack('>6f', buf.read(24)) + self.step_height = struct.unpack('>2f', buf.read(8)) + self.value, self.duration = struct.unpack(">ii", buf.read(8)) + return self + _decode_one = staticmethod(_decode_one) + + def _get_hash_recursive(parents): + if robot_control_cmd_lcmt in parents: return 0 + tmphash = (0x476b61e296af96f5) & 0xffffffffffffffff + tmphash = (((tmphash<<1)&0xffffffffffffffff) + (tmphash>>63)) & 0xffffffffffffffff + return tmphash + _get_hash_recursive = staticmethod(_get_hash_recursive) + _packed_fingerprint = None + + def _get_packed_fingerprint(): + if robot_control_cmd_lcmt._packed_fingerprint is None: + robot_control_cmd_lcmt._packed_fingerprint = struct.pack(">Q", robot_control_cmd_lcmt._get_hash_recursive([])) + return robot_control_cmd_lcmt._packed_fingerprint + _get_packed_fingerprint = staticmethod(_get_packed_fingerprint) + + def get_hash(self): + """Get the LCM hash of the struct""" + return struct.unpack(">Q", robot_control_cmd_lcmt._get_packed_fingerprint())[0] + +class robot_control_response_lcmt(object): + __slots__ = ["mode", "gait_id", "contact", "order_process_bar", "switch_status", "ori_error", "footpos_error", "motor_error"] + + __typenames__ = ["int8_t", "int8_t", "int8_t", "int8_t", "int8_t", "int8_t", "int16_t", "int32_t"] + + __dimensions__ = [None, None, None, None, None, None, None, [12]] + + def __init__(self): + self.mode = 0 + self.gait_id = 0 + self.contact = 0 + self.order_process_bar = 0 + self.switch_status = 0 + self.ori_error = 0 + self.footpos_error = 0 + self.motor_error = [ 0 for dim0 in range(12) ] + + def encode(self): + buf = BytesIO() + buf.write(robot_control_response_lcmt._get_packed_fingerprint()) + self._encode_one(buf) + return buf.getvalue() + + def _encode_one(self, buf): + buf.write(struct.pack(">bbbbbbh", self.mode, self.gait_id, self.contact, self.order_process_bar, self.switch_status, self.ori_error, self.footpos_error)) + buf.write(struct.pack('>12i', *self.motor_error[:12])) + + def decode(data): + if hasattr(data, 'read'): + buf = data + else: + buf = BytesIO(data) + if buf.read(8) != robot_control_response_lcmt._get_packed_fingerprint(): + raise ValueError("Decode error") + return robot_control_response_lcmt._decode_one(buf) + decode = staticmethod(decode) + + def _decode_one(buf): + self = robot_control_response_lcmt() + self.mode, self.gait_id, self.contact, self.order_process_bar, self.switch_status, self.ori_error, self.footpos_error = struct.unpack(">bbbbbbh", buf.read(8)) + self.motor_error = struct.unpack('>12i', buf.read(48)) + return self + _decode_one = staticmethod(_decode_one) + + def _get_hash_recursive(parents): + if robot_control_response_lcmt in parents: return 0 + tmphash = (0x485da98216eda8c7) & 0xffffffffffffffff + tmphash = (((tmphash<<1)&0xffffffffffffffff) + (tmphash>>63)) & 0xffffffffffffffff + return tmphash + _get_hash_recursive = staticmethod(_get_hash_recursive) + _packed_fingerprint = None + + def _get_packed_fingerprint(): + if robot_control_response_lcmt._packed_fingerprint is None: + robot_control_response_lcmt._packed_fingerprint = struct.pack(">Q", robot_control_response_lcmt._get_hash_recursive([])) + return robot_control_response_lcmt._packed_fingerprint + _get_packed_fingerprint = staticmethod(_get_packed_fingerprint) + + def get_hash(self): + """Get the LCM hash of the struct""" + return struct.unpack(">Q", robot_control_response_lcmt._get_packed_fingerprint())[0] diff --git a/base_move/move/robot_control_response_lcmt.py b/base_move/move/robot_control_response_lcmt.py new file mode 100644 index 0000000..cc61b8c --- /dev/null +++ b/base_move/move/robot_control_response_lcmt.py @@ -0,0 +1,73 @@ +"""LCM type definitions +This file automatically generated by lcm. +DO NOT MODIFY BY HAND!!!! +""" + +try: + import cStringIO.StringIO as BytesIO +except ImportError: + from io import BytesIO +import struct + +class robot_control_response_lcmt(object): + __slots__ = ["mode", "gait_id", "contact", "order_process_bar", "switch_status", "ori_error", "footpos_error", "motor_error"] + + __typenames__ = ["int8_t", "int8_t", "int8_t", "int8_t", "int8_t", "int8_t", "int16_t", "int32_t"] + + __dimensions__ = [None, None, None, None, None, None, None, [12]] + + def __init__(self): + self.mode = 0 + self.gait_id = 0 + self.contact = 0 + self.order_process_bar = 0 + self.switch_status = 0 + self.ori_error = 0 + self.footpos_error = 0 + self.motor_error = [ 0 for dim0 in range(12) ] + + def encode(self): + buf = BytesIO() + buf.write(robot_control_response_lcmt._get_packed_fingerprint()) + self._encode_one(buf) + return buf.getvalue() + + def _encode_one(self, buf): + buf.write(struct.pack(">bbbbbbh", self.mode, self.gait_id, self.contact, self.order_process_bar, self.switch_status, self.ori_error, self.footpos_error)) + buf.write(struct.pack('>12i', *self.motor_error[:12])) + + def decode(data): + if hasattr(data, 'read'): + buf = data + else: + buf = BytesIO(data) + if buf.read(8) != robot_control_response_lcmt._get_packed_fingerprint(): + raise ValueError("Decode error") + return robot_control_response_lcmt._decode_one(buf) + decode = staticmethod(decode) + + def _decode_one(buf): + self = robot_control_response_lcmt() + self.mode, self.gait_id, self.contact, self.order_process_bar, self.switch_status, self.ori_error, self.footpos_error = struct.unpack(">bbbbbbh", buf.read(8)) + self.motor_error = struct.unpack('>12i', buf.read(48)) + return self + _decode_one = staticmethod(_decode_one) + + def _get_hash_recursive(parents): + if robot_control_response_lcmt in parents: return 0 + tmphash = (0x485da98216eda8c7) & 0xffffffffffffffff + tmphash = (((tmphash<<1)&0xffffffffffffffff) + (tmphash>>63)) & 0xffffffffffffffff + return tmphash + _get_hash_recursive = staticmethod(_get_hash_recursive) + _packed_fingerprint = None + + def _get_packed_fingerprint(): + if robot_control_response_lcmt._packed_fingerprint is None: + robot_control_response_lcmt._packed_fingerprint = struct.pack(">Q", robot_control_response_lcmt._get_hash_recursive([])) + return robot_control_response_lcmt._packed_fingerprint + _get_packed_fingerprint = staticmethod(_get_packed_fingerprint) + + def get_hash(self): + """Get the LCM hash of the struct""" + return struct.unpack(">Q", robot_control_response_lcmt._get_packed_fingerprint())[0] + diff --git a/main.py b/main.py new file mode 100644 index 0000000..66abbd8 --- /dev/null +++ b/main.py @@ -0,0 +1,113 @@ +''' +This demo show the communication interface of MR813 motion control board based on Lcm. +Dependency: +- robot_control_cmd_lcmt.py +- robot_control_response_lcmt.py +''' +import lcm +import sys +import os +import time +from threading import Thread, Lock + +# For keyboard input +import tty +import termios +import sys + +from robot_control_cmd_lcmt import robot_control_cmd_lcmt +from robot_control_response_lcmt import robot_control_response_lcmt + +from task_1.task_1 import run_task_1 + +def main(): + Ctrl = Robot_Ctrl() + Ctrl.run() + msg = robot_control_cmd_lcmt() + + try: + print("Recovery stand") + msg.mode = 12 # Recovery stand + msg.gait_id = 0 + msg.life_count += 1 + Ctrl.Send_cmd(msg) + Ctrl.Wait_finish(12, 0) + + # run_task_1(Ctrl, msg) + + time.sleep(100) + + except KeyboardInterrupt: + pass + + Ctrl.quit() + sys.exit() + + +class Robot_Ctrl(object): + def __init__(self): + self.rec_thread = Thread(target=self.rec_responce) + self.send_thread = Thread(target=self.send_publish) + self.lc_r = lcm.LCM("udpm://239.255.76.67:7670?ttl=255") + self.lc_s = lcm.LCM("udpm://239.255.76.67:7671?ttl=255") + self.cmd_msg = robot_control_cmd_lcmt() + self.rec_msg = robot_control_response_lcmt() + self.send_lock = Lock() + self.delay_cnt = 0 + self.mode_ok = 0 + self.gait_ok = 0 + self.runing = 1 + + def run(self): + self.lc_r.subscribe("robot_control_response", self.msg_handler) + self.send_thread.start() + self.rec_thread.start() + + def msg_handler(self, channel, data): + self.rec_msg = robot_control_response_lcmt().decode(data) + if self.rec_msg.order_process_bar >= 95: + self.mode_ok = self.rec_msg.mode + else: + self.mode_ok = 0 + + def rec_responce(self): + while self.runing: + self.lc_r.handle() + time.sleep(0.002) + + def Wait_finish(self, mode, gait_id): + count = 0 + while self.runing and count < 2000: # 10s + if self.mode_ok == mode and self.gait_ok == gait_id: + return True + else: + time.sleep(0.005) + count += 1 + print("Wait_finish timeout.") + return False + + def send_publish(self): + while self.runing: + self.send_lock.acquire() + if self.delay_cnt > 20: # Heartbeat signal 10HZ + self.lc_s.publish("robot_control_cmd", self.cmd_msg.encode()) + self.delay_cnt = 0 + self.delay_cnt += 1 + self.send_lock.release() + time.sleep(0.005) + + def Send_cmd(self, msg): + self.send_lock.acquire() + self.delay_cnt = 50 + self.cmd_msg = msg + self.send_lock.release() + + def quit(self): + self.runing = 0 + self.rec_thread.join() + self.send_thread.join() + + +# Main function +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/robot_control_cmd_lcmt.py b/robot_control_cmd_lcmt.py new file mode 100644 index 0000000..addedac --- /dev/null +++ b/robot_control_cmd_lcmt.py @@ -0,0 +1,149 @@ +# LCM type definitions This file automatically generated by lcm. +try: + import cStringIO.StringIO as BytesIO +except ImportError: + from io import BytesIO +import struct + +class robot_control_cmd_lcmt(object): + __slots__ = ["mode", "gait_id", "contact", "life_count", "vel_des", "rpy_des", "pos_des", "acc_des", "ctrl_point", "foot_pose", "step_height", "value", "duration"] + + __typenames__ = ["int8_t", "int8_t", "int8_t", "int8_t", "float", "float", "float", "float", "float", "float", "float", "int32_t", "int32_t"] + + __dimensions__ = [None, None, None, None, [3], [3], [3], [6], [3], [6], [2], None, None] + + def __init__(self): + self.mode = 0 + self.gait_id = 0 + self.contact = 0 + self.life_count = 0 + self.vel_des = [ 0.0 for dim0 in range(3) ] + self.rpy_des = [ 0.0 for dim0 in range(3) ] + self.pos_des = [ 0.0 for dim0 in range(3) ] + self.acc_des = [ 0.0 for dim0 in range(6) ] + self.ctrl_point = [ 0.0 for dim0 in range(3) ] + self.foot_pose = [ 0.0 for dim0 in range(6) ] + self.step_height = [ 0.0 for dim0 in range(2) ] + self.value = 0 + self.duration = 0 + + def encode(self): + buf = BytesIO() + buf.write(robot_control_cmd_lcmt._get_packed_fingerprint()) + self._encode_one(buf) + return buf.getvalue() + + def _encode_one(self, buf): + buf.write(struct.pack(">bbbb", self.mode, self.gait_id, self.contact, self.life_count)) + buf.write(struct.pack('>3f', *self.vel_des[:3])) + buf.write(struct.pack('>3f', *self.rpy_des[:3])) + buf.write(struct.pack('>3f', *self.pos_des[:3])) + buf.write(struct.pack('>6f', *self.acc_des[:6])) + buf.write(struct.pack('>3f', *self.ctrl_point[:3])) + buf.write(struct.pack('>6f', *self.foot_pose[:6])) + buf.write(struct.pack('>2f', *self.step_height[:2])) + buf.write(struct.pack(">ii", self.value, self.duration)) + + def decode(data): + if hasattr(data, 'read'): + buf = data + else: + buf = BytesIO(data) + if buf.read(8) != robot_control_cmd_lcmt._get_packed_fingerprint(): + raise ValueError("Decode error") + return robot_control_cmd_lcmt._decode_one(buf) + decode = staticmethod(decode) + + def _decode_one(buf): + self = robot_control_cmd_lcmt() + self.mode, self.gait_id, self.contact, self.life_count = struct.unpack(">bbbb", buf.read(4)) + self.vel_des = struct.unpack('>3f', buf.read(12)) + self.rpy_des = struct.unpack('>3f', buf.read(12)) + self.pos_des = struct.unpack('>3f', buf.read(12)) + self.acc_des = struct.unpack('>6f', buf.read(24)) + self.ctrl_point = struct.unpack('>3f', buf.read(12)) + self.foot_pose = struct.unpack('>6f', buf.read(24)) + self.step_height = struct.unpack('>2f', buf.read(8)) + self.value, self.duration = struct.unpack(">ii", buf.read(8)) + return self + _decode_one = staticmethod(_decode_one) + + def _get_hash_recursive(parents): + if robot_control_cmd_lcmt in parents: return 0 + tmphash = (0x476b61e296af96f5) & 0xffffffffffffffff + tmphash = (((tmphash<<1)&0xffffffffffffffff) + (tmphash>>63)) & 0xffffffffffffffff + return tmphash + _get_hash_recursive = staticmethod(_get_hash_recursive) + _packed_fingerprint = None + + def _get_packed_fingerprint(): + if robot_control_cmd_lcmt._packed_fingerprint is None: + robot_control_cmd_lcmt._packed_fingerprint = struct.pack(">Q", robot_control_cmd_lcmt._get_hash_recursive([])) + return robot_control_cmd_lcmt._packed_fingerprint + _get_packed_fingerprint = staticmethod(_get_packed_fingerprint) + + def get_hash(self): + """Get the LCM hash of the struct""" + return struct.unpack(">Q", robot_control_cmd_lcmt._get_packed_fingerprint())[0] + +class robot_control_response_lcmt(object): + __slots__ = ["mode", "gait_id", "contact", "order_process_bar", "switch_status", "ori_error", "footpos_error", "motor_error"] + + __typenames__ = ["int8_t", "int8_t", "int8_t", "int8_t", "int8_t", "int8_t", "int16_t", "int32_t"] + + __dimensions__ = [None, None, None, None, None, None, None, [12]] + + def __init__(self): + self.mode = 0 + self.gait_id = 0 + self.contact = 0 + self.order_process_bar = 0 + self.switch_status = 0 + self.ori_error = 0 + self.footpos_error = 0 + self.motor_error = [ 0 for dim0 in range(12) ] + + def encode(self): + buf = BytesIO() + buf.write(robot_control_response_lcmt._get_packed_fingerprint()) + self._encode_one(buf) + return buf.getvalue() + + def _encode_one(self, buf): + buf.write(struct.pack(">bbbbbbh", self.mode, self.gait_id, self.contact, self.order_process_bar, self.switch_status, self.ori_error, self.footpos_error)) + buf.write(struct.pack('>12i', *self.motor_error[:12])) + + def decode(data): + if hasattr(data, 'read'): + buf = data + else: + buf = BytesIO(data) + if buf.read(8) != robot_control_response_lcmt._get_packed_fingerprint(): + raise ValueError("Decode error") + return robot_control_response_lcmt._decode_one(buf) + decode = staticmethod(decode) + + def _decode_one(buf): + self = robot_control_response_lcmt() + self.mode, self.gait_id, self.contact, self.order_process_bar, self.switch_status, self.ori_error, self.footpos_error = struct.unpack(">bbbbbbh", buf.read(8)) + self.motor_error = struct.unpack('>12i', buf.read(48)) + return self + _decode_one = staticmethod(_decode_one) + + def _get_hash_recursive(parents): + if robot_control_response_lcmt in parents: return 0 + tmphash = (0x485da98216eda8c7) & 0xffffffffffffffff + tmphash = (((tmphash<<1)&0xffffffffffffffff) + (tmphash>>63)) & 0xffffffffffffffff + return tmphash + _get_hash_recursive = staticmethod(_get_hash_recursive) + _packed_fingerprint = None + + def _get_packed_fingerprint(): + if robot_control_response_lcmt._packed_fingerprint is None: + robot_control_response_lcmt._packed_fingerprint = struct.pack(">Q", robot_control_response_lcmt._get_hash_recursive([])) + return robot_control_response_lcmt._packed_fingerprint + _get_packed_fingerprint = staticmethod(_get_packed_fingerprint) + + def get_hash(self): + """Get the LCM hash of the struct""" + return struct.unpack(">Q", robot_control_response_lcmt._get_packed_fingerprint())[0] diff --git a/robot_control_response_lcmt.py b/robot_control_response_lcmt.py new file mode 100644 index 0000000..cc61b8c --- /dev/null +++ b/robot_control_response_lcmt.py @@ -0,0 +1,73 @@ +"""LCM type definitions +This file automatically generated by lcm. +DO NOT MODIFY BY HAND!!!! +""" + +try: + import cStringIO.StringIO as BytesIO +except ImportError: + from io import BytesIO +import struct + +class robot_control_response_lcmt(object): + __slots__ = ["mode", "gait_id", "contact", "order_process_bar", "switch_status", "ori_error", "footpos_error", "motor_error"] + + __typenames__ = ["int8_t", "int8_t", "int8_t", "int8_t", "int8_t", "int8_t", "int16_t", "int32_t"] + + __dimensions__ = [None, None, None, None, None, None, None, [12]] + + def __init__(self): + self.mode = 0 + self.gait_id = 0 + self.contact = 0 + self.order_process_bar = 0 + self.switch_status = 0 + self.ori_error = 0 + self.footpos_error = 0 + self.motor_error = [ 0 for dim0 in range(12) ] + + def encode(self): + buf = BytesIO() + buf.write(robot_control_response_lcmt._get_packed_fingerprint()) + self._encode_one(buf) + return buf.getvalue() + + def _encode_one(self, buf): + buf.write(struct.pack(">bbbbbbh", self.mode, self.gait_id, self.contact, self.order_process_bar, self.switch_status, self.ori_error, self.footpos_error)) + buf.write(struct.pack('>12i', *self.motor_error[:12])) + + def decode(data): + if hasattr(data, 'read'): + buf = data + else: + buf = BytesIO(data) + if buf.read(8) != robot_control_response_lcmt._get_packed_fingerprint(): + raise ValueError("Decode error") + return robot_control_response_lcmt._decode_one(buf) + decode = staticmethod(decode) + + def _decode_one(buf): + self = robot_control_response_lcmt() + self.mode, self.gait_id, self.contact, self.order_process_bar, self.switch_status, self.ori_error, self.footpos_error = struct.unpack(">bbbbbbh", buf.read(8)) + self.motor_error = struct.unpack('>12i', buf.read(48)) + return self + _decode_one = staticmethod(_decode_one) + + def _get_hash_recursive(parents): + if robot_control_response_lcmt in parents: return 0 + tmphash = (0x485da98216eda8c7) & 0xffffffffffffffff + tmphash = (((tmphash<<1)&0xffffffffffffffff) + (tmphash>>63)) & 0xffffffffffffffff + return tmphash + _get_hash_recursive = staticmethod(_get_hash_recursive) + _packed_fingerprint = None + + def _get_packed_fingerprint(): + if robot_control_response_lcmt._packed_fingerprint is None: + robot_control_response_lcmt._packed_fingerprint = struct.pack(">Q", robot_control_response_lcmt._get_hash_recursive([])) + return robot_control_response_lcmt._packed_fingerprint + _get_packed_fingerprint = staticmethod(_get_packed_fingerprint) + + def get_hash(self): + """Get the LCM hash of the struct""" + return struct.unpack(">Q", robot_control_response_lcmt._get_packed_fingerprint())[0] + diff --git a/task_1/__pycache__/task_1.cpython-38.pyc b/task_1/__pycache__/task_1.cpython-38.pyc new file mode 100644 index 0000000..94e0eb9 Binary files /dev/null and b/task_1/__pycache__/task_1.cpython-38.pyc differ diff --git a/task_1/task_1.py b/task_1/task_1.py new file mode 100644 index 0000000..f5860b9 --- /dev/null +++ b/task_1/task_1.py @@ -0,0 +1,39 @@ +import time + + +def run_task_1(ctrl, msg): + print('Running task 1...') + + # 右前方 + msg.mode = 11 + msg.gait_id = 26 # 26 表示快速 trot 步态 + msg.vel_des = [0.5, 0.5, -1.0] + msg.duration = 1800 + msg.step_height = [0.06, 0.06] + msg.life_count += 1 + ctrl.Send_cmd(msg) + time.sleep(1.8) + + msg.mode = 11 + msg.gait_id = 26 + msg.vel_des = [1, 0, 0] + msg.duration = 200 + msg.life_count += 1 + ctrl.Send_cmd(msg) + time.sleep(0.2) + + # TAG take photo + # while True: + # pass + + # msg.mode = 11 + # msg.gait_id = 26 + # msg.vel_des = [1, 0, 0] + # msg.duration = 1000 + # msg.life_count += 1 + # ctrl.Send_cmd(msg) + # time.sleep(1.0) + + + + diff --git a/utils/image_raw.py b/utils/image_raw.py new file mode 100644 index 0000000..854aea6 --- /dev/null +++ b/utils/image_raw.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 + +import rclpy +from rclpy.node import Node +from sensor_msgs.msg import Image +from cv_bridge import CvBridge +import cv2 +from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy + +class ImageSubscriber(Node): + def __init__(self): + super().__init__('image_subscriber') + # 定义 QoS 配置(匹配发布者的可靠性策略) + qos_profile = QoSProfile( + reliability=QoSReliabilityPolicy.BEST_EFFORT, # 或 BEST_EFFORT,取决于发布者 + history=QoSHistoryPolicy.KEEP_LAST, + depth=10 + ) + + self.subscription = self.create_subscription( + Image, + '/rgb_camera/image_raw', + self.image_callback, + qos_profile=qos_profile + ) + self.subscription # 防止未使用变量警告 + self.bridge = CvBridge() + self.cv_image = None # Store latest image + + def image_callback(self, msg): + try: + # 将ROS图像消息转换为OpenCV格式 + self.cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8') + + except Exception as e: + self.get_logger().error('Failed to convert image: %s' % str(e)) + +class ImageProcessor: + def __init__(self): + rclpy.init() + self.image_subscriber = ImageSubscriber() + + def run(self): + try: + rclpy.spin(self.image_subscriber) + except KeyboardInterrupt: + pass + + self.image_subscriber.destroy_node() + rclpy.shutdown() + + def get_current_image(self): + return self.image_subscriber.cv_image + +""" DEBUG """ + +def main(args=None): + rclpy.init(args=args) + image_subscriber = ImageSubscriber() + + try: + rclpy.spin(image_subscriber) + except KeyboardInterrupt: + pass + + # 清理 + image_subscriber.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() \ No newline at end of file