2025-08-18 11:06:42 +08:00

134 lines
3.8 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

'''
This demo show the communication interface of MR813 motion control board based on Lcm.
Dependency:
- robot_control_cmd_lcmt.py
- robot_control_response_lcmt.py
'''
import lcm
import sys
import os
import time
from threading import Thread, Lock
from robot_control_cmd_lcmt import robot_control_cmd_lcmt
from robot_control_response_lcmt import robot_control_response_lcmt
def main():
Ctrl = Robot_Ctrl()
Ctrl.run()
msg = robot_control_cmd_lcmt()
try:
print("Recovery stand")
msg.mode = 12 # Recovery stand
msg.gait_id = 0
msg.life_count += 1 # Command will take effect when life_count update
Ctrl.Send_cmd(msg)
Ctrl.Wait_finish(12, 0)
# def circle_90(ctrl, msg, direction="right"):
# msg.mode = 11
# msg.gait_id = 26
# msg.duration = 1000
# msg.vel_des = [0.0, 0.0, -1.9 if direction == "right" else 1]
# msg.life_count += 1
# ctrl.Send_cmd(msg)
# time.sleep(1)
# circle_90(Ctrl, msg)
# return True
# TAG 设置机器人头部朝下
# msg.mode = 21 # 位置插值控制
# msg.gait_id = 0
# # msg.rpy_des = [0, -0.3, 0] # 头部朝下
# msg.rpy_des = [0, 0, 0]
# msg.pos_des = [0, 0, 0.2]
# msg.duration = 400 # 期望执行时间0.3秒
# msg.life_count += 1
# Ctrl.Send_cmd(msg)
# time.sleep(1)
print("Go forward")
msg.mode = 11
msg.gait_id = 3
msg.vel_des = [1, 0, 0]
# msg.rpy_des = [0, -0.3, 0]
msg.duration = 20000
msg.step_height = [0.03, 0.03]
msg.life_count += 1
Ctrl.Send_cmd(msg)
Ctrl.Wait_finish(11, msg.gait_id)
except KeyboardInterrupt:
pass
Ctrl.quit()
sys.exit()
class Robot_Ctrl(object):
def __init__(self):
self.rec_thread = Thread(target=self.rec_responce)
self.send_thread = Thread(target=self.send_publish)
self.lc_r = lcm.LCM("udpm://239.255.76.67:7670?ttl=255")
self.lc_s = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
self.cmd_msg = robot_control_cmd_lcmt()
self.rec_msg = robot_control_response_lcmt()
self.send_lock = Lock()
self.delay_cnt = 0
self.mode_ok = 0
self.gait_ok = 0
self.runing = 1
def run(self):
self.lc_r.subscribe("robot_control_response", self.msg_handler)
self.send_thread.start()
self.rec_thread.start()
def msg_handler(self, channel, data):
self.rec_msg = robot_control_response_lcmt().decode(data)
if(self.rec_msg.order_process_bar >= 95):
self.mode_ok = self.rec_msg.mode
else:
self.mode_ok = 0
def rec_responce(self):
while self.runing:
self.lc_r.handle()
time.sleep( 0.002 )
def Wait_finish(self, mode, gait_id):
count = 0
while self.runing and count < 2000: #10s
if self.mode_ok == mode and self.gait_ok == gait_id:
return True
else:
time.sleep(0.005)
count += 1
def send_publish(self):
while self.runing:
self.send_lock.acquire()
if self.delay_cnt > 20: # Heartbeat signal 10HZ, It is used to maintain the heartbeat when life count is not updated
self.lc_s.publish("robot_control_cmd",self.cmd_msg.encode())
self.delay_cnt = 0
self.delay_cnt += 1
self.send_lock.release()
time.sleep( 0.005 )
def Send_cmd(self, msg):
self.send_lock.acquire()
self.delay_cnt = 50
self.cmd_msg = msg
self.send_lock.release()
def quit(self):
self.runing = 0
self.rec_thread.join()
self.send_thread.join()
# Main function
if __name__ == '__main__':
main()