508 lines
20 KiB
Python
Raw Normal View History

2025-05-13 09:09:54 +00:00
'''
This demo show the communication interface of MR813 motion control board based on Lcm.
Dependency:
- robot_control_cmd_lcmt.py
- robot_control_response_lcmt.py
before running:
cd /home/cyberdog_sim
source /opt/ros/galactic/setup.bash
实现闭环感知直角拐弯
Ctrl 类中
detect_lines 是扫描直线的函数
odo_changeback 通过读里程计和转向实现朝向一个特定方向
side_equal用来对齐两侧直线的距离
walk_forward 输入一个期望保留的距离然后通过直线前进到达位置使狗前方能剩下预定距离
参数还调得不算好尤其walk_forward函数里面的一个常参可能需要多跑选更加合适的一个值
'''
import lcm
import sys
import os
import time
from threading import Thread, Lock
from robot_control_cmd_lcmt import robot_control_cmd_lcmt
from robot_control_response_lcmt import robot_control_response_lcmt
from localization_lcmt import localization_lcmt
import toml
import copy
import math
from file_send_lcmt import file_send_lcmt
import rclpy
import numpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from rclpy.qos import QoSProfile, qos_profile_sensor_data
from pprint import pprint
# 自定义步态发送数据的结构
robot_cmd = {
'mode': 0, 'gait_id': 0, 'contact': 0, 'life_count': 0,
'vel_des': [0.0, 0.0, 0.0],
'rpy_des': [0.0, 0.0, 0.0],
'pos_des': [0.0, 0.0, 0.0],
'acc_des': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
'ctrl_point': [0.0, 0.0, 0.0],
'foot_pose': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
'step_height': [0.0, 0.0],
'value': 0, 'duration': 0
}
# 读取到的运动数据
# 使用 self.rec_msg.xxx 代替motion_rec 如:
# motion_rec['mode'] 替换为 self.rec_msg.mode
motion_rec = {
'mode': 0,
'gait_id': 0,
'contact': 0,
'order_process_bar': 0,
'switch_status': 0,
'ori_error': 0,
'footpos_error': 0,
'motor_error': (0) * 12,
}
# 里程计数据
# 使用 self.odo_msg.xxx 代替odo_rec 如:
# odo_rec['xyz'] 替换为 self.odo_msg.xyz
odo_rec = {
'xyz': (0.0, 0.0, 0.0),
'vxyz': (0.0, 0.0, 0.0),
'rpy': (0.0, 0.0, 0.0),
'omegaBody': (0.0, 0.0, 0.0),
'vBody': (0.0, 0.0, 0.0),
'timestamp': 0
}
# 雷达数据 data 是 180个的list 每个度数一个点
laser_rec = {
'data': [0] * 180
}
def main():
Ctrl = Robot_Ctrl()
Ctrl.run()
msg = robot_control_cmd_lcmt()
try:
msg.mode = 12 # Recovery stand
msg.gait_id = 0
msg.life_count += 1 # Command will take effect when life_count update
Ctrl.Send_cmd(msg)
Ctrl.Wait_finish(12, 0)
# print('init',Ctrl.odo_msg.rpy[2])
# Ctrl.odo_changeback(-0.785,msg=msg)
Ctrl.odo_changeback(0, msg=msg)
# time.sleep(2)
# lines = Ctrl.detect_lines()
Ctrl.side_equal(msg=msg)
# dist = round(abs(lines[0][1]) / ((lines[0][0] * lines[0][0] + 1) ** 0.5),2)
# print('initline',dist)
# dist = round(abs(lines[2][1]) / ((lines[2][0] * lines[2][0] + 1) ** 0.5),2)
# print('initline',dist)
# print(Ctrl.odo_msg.rpy[2])
time.sleep(0.2)
Ctrl.walk_forward(msg=msg, left_dist=0.70)
time.sleep(0.2)
Ctrl.odo_changeback(target=1.5708, msg=msg)
time.sleep(0.2)
msg.mode = 11 # Locomotion
msg.gait_id = 27
msg.vel_des = [0.6, 0, 0]
msg.duration = 3000
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
Ctrl.Send_cmd(msg)
Ctrl.Wait_finish(11, 27)
time.sleep(60)
except KeyboardInterrupt:
pass
Ctrl.quit()
sys.exit()
class LaserScanSubscriber(Node):
def __init__(self):
super().__init__('laser_scan_subscriber')
# 使用SensorDataQoS作为QoS配置文件
qos_profile = qos_profile_sensor_data # 这就是SensorDataQoS
self.subscription = self.create_subscription(
LaserScan,
'scan', # 替换为你的激光雷达数据发布的话题名
self.listener_callback,
qos_profile)
def listener_callback(self, msg):
# self.get_logger().info('Received laser scan message:')
# 在这里处理激光雷达数据,例如打印出范围的最大值
global laser_rec
laser_rec['data'] = msg.ranges.tolist()
# print(type(laser_rec['data']))
# print(laser_rec['data'])
# print(odo_rec['rpy'][2])
time.sleep(3)
class Robot_Ctrl(object):
def __init__(self):
self.rec_thread = Thread(target=self.rec_responce)
self.send_thread = Thread(target=self.send_publish)
self.odo_thread = Thread(target=self.rec_responce_o)
self.lc_r = lcm.LCM("udpm://239.255.76.67:7670?ttl=255")
self.lc_s = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
self.cmd_msg = robot_control_cmd_lcmt()
self.rec_msg = robot_control_response_lcmt()
self.odo_msg = localization_lcmt()
self.send_lock = Lock()
self.delay_cnt = 0
self.mode_ok = 0
self.gait_ok = 0
self.runing = 1
self.laser_scan_data = [0] * 180
self.lc_o = lcm.LCM("udpm://239.255.76.67:7667?ttl=255") ###里程计
# self.handle_lock = Lock() ###当前读数据只允许一个handle
self.lcm_cmd = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
self.lcm_usergait = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
self.usergait_msg = file_send_lcmt()
rclpy.init(args=None)
self.laser_scan_subscriber = LaserScanSubscriber()
self.laser_thread = Thread(target=self.laser_spin_func, kwargs={'subscriber': self.laser_scan_subscriber})
def laser_spin_func(self, subscriber):
rclpy.spin(subscriber)
time.sleep(0.2)
def run(self):
self.lc_r.subscribe("robot_control_response", self.msg_handler)
self.lc_o.subscribe("global_to_robot", self.msg_handler_o) ###里程计订阅话题
self.send_thread.start()
self.rec_thread.start()
self.odo_thread.start() ###启动里程计
self.laser_thread.start()
def msg_handler(self, channel, data): # 运动反馈解码函数。
self.rec_msg = robot_control_response_lcmt().decode(data)
if (self.rec_msg.order_process_bar >= 95):
self.mode_ok = self.rec_msg.mode
# motion_rec['mode'] = self.rec_msg.mode
# motion_rec['gait_id'] = self.rec_msg.gait_id
# motion_rec['contact'] = self.rec_msg.contact
# motion_rec['order_process_bar'] = self.rec_msg.order_process_bar
# motion_rec['switch_status'] = self.rec_msg.switch_status
# motion_rec['ori_error'] =self.rec_msg.ori_error
# motion_rec['footpos_error'] = self.rec_msg.footpos_error
# motion_rec['motor_error'] = self.rec_msg.motor_error
# print(motion_rec)
else:
self.mode_ok = 0
def msg_handler_o(self, channel, data): ###里程计解码函数,被按照频率调用
self.odo_msg = localization_lcmt().decode(data)
# odo_rec['xyz'] = self.odo_msg.xyz
# odo_rec['vxyz'] = self.odo_msg.vxyz
# odo_rec['rpy'] = self.odo_msg.rpy
# odo_rec['omegaBody'] = self.odo_msg.omegaBody
# odo_rec['vBody'] = self.odo_msg.vBody
# odo_rec['timestamp'] = self.odo_msg.timestamp
# print(odo_rec)
def rec_responce(self):
while self.runing:
# self.handle_lock.acquire()
self.lc_r.handle()
# self.handle_lock.release()
time.sleep(0.002)
def rec_responce_o(self): ###里程计函数调用msg_hadler_o
while self.runing:
# self.handle_lock.acquire()
self.lc_o.handle()
# self.handle_lock.release()
time.sleep(0.002)
# 探测直线的时候需要的二分查找
def bio_select(self, k, b, start, end, dist_limit, data_with_order_x, data_with_order_y):
left, right = start, end - 1
# 二分查找
while left < right:
mid = (left + right) // 2
x_mid, y_mid = data_with_order_x[mid], data_with_order_y[mid]
# 计算点到直线的距离
distance = abs(k * x_mid - y_mid + b) / (k ** 2 + 1) ** 0.5
if distance <= dist_limit:
# 点在直线附近,继续向右搜索
left = mid + 1
else:
# 点不在直线附近,缩小搜索范围
right = mid
# 返回直线附近点的范围
return left
def detect_lines(self): # data 180个的数组
dist_limit = 0.10
init_points = 5
data_with_order_x = []
data_with_order_y = []
data = laser_rec['data']
# degree = self.odo_msg.rpy[2]
# 过滤无效值
for i in range(180):
if data[i] < 0.12 or data[i] > 12:
continue
# data_with_order.append([math.radians(i-90) - degree,data[i]]) #新坐标系里面的偏转角(弧度)+ 距离
data_with_order_x.append(data[i] * math.cos(math.radians(i - 90)))
data_with_order_y.append(data[i] * math.sin(math.radians(i - 90))) # x y 轴
len_filtered = len(data_with_order_x)
# print(len(data_with_order_x))
# for i in range(0, len(data_with_order_x)):
# print('(', end='')
# print(data_with_order_x[i], data_with_order_y[i], sep=' ,', end='')
# print('),', end='\n')
# 储存直线方程
lines = []
# 拐动位置
corners = [0]
while corners[-1] < len_filtered - init_points - 1:
start = corners[-1]
k1_init, b_init = numpy.polyfit(data_with_order_x[start:start + init_points],
data_with_order_y[start:start + init_points], 1)
# print(k1_init, b_init)
# 二分查找 end 是范围内最后一个点
end = self.bio_select(k1_init, b_init, start, len_filtered, dist_limit, data_with_order_x,
data_with_order_y)
# 允许一次偏差出现此时end + 1 被击毙了,用现有的start - end 再拟合新直线,看 end + 1 还在不在外边,
# 如果还在外边,大概率是救不回了
# 在里面还可以二分一下
k1_fixed, b1_fixed = numpy.polyfit(data_with_order_x[start:end], data_with_order_y[start:end], 1)
dist_bias = (abs(k1_fixed * data_with_order_x[end] - data_with_order_y[end] + b1_fixed) /
(k1_fixed ** 2 + 1) ** 0.5)
if dist_bias < dist_limit:
end_new = self.bio_select(k1_fixed, b1_fixed, end, len_filtered, dist_limit, data_with_order_x,
data_with_order_y)
k1_fixed, b1_fixed = numpy.polyfit(data_with_order_x[start:end_new],
data_with_order_y[start:end_new], 1)
else:
end_new = end
corners.append(end_new)
# print(end)
lines.append([k1_fixed, b1_fixed])
if len(lines) >= 3:
# print(corners)
break
# for i in lines:
# print(f'{i[0]}x - y + {i[2]} = 0')
# print(f'[{i[0]}, {i[1]}],')
return lines
def selaction(self):
try:
self.send_lock.acquire()
steps = toml.load("Gait_Params_moonwalk.toml")
full_steps = {'step': [robot_cmd]}
k = 0
for i in steps['step']:
cmd = copy.deepcopy(robot_cmd)
cmd['duration'] = i['duration']
if i['type'] == 'usergait':
cmd['mode'] = 11 # LOCOMOTION
cmd['gait_id'] = 110 # USERGAIT
cmd['vel_des'] = i['body_vel_des']
cmd['rpy_des'] = i['body_pos_des'][0:3]
cmd['pos_des'] = i['body_pos_des'][3:6]
cmd['foot_pose'][0:2] = i['landing_pos_des'][0:2]
cmd['foot_pose'][2:4] = i['landing_pos_des'][3:5]
cmd['foot_pose'][4:6] = i['landing_pos_des'][6:8]
cmd['ctrl_point'][0:2] = i['landing_pos_des'][9:11]
cmd['step_height'][0] = math.ceil(i['step_height'][0] * 1e3) + math.ceil(
i['step_height'][1] * 1e3) * 1e3
cmd['step_height'][1] = math.ceil(i['step_height'][2] * 1e3) + math.ceil(
i['step_height'][3] * 1e3) * 1e3
cmd['acc_des'] = i['weight']
cmd['value'] = i['use_mpc_traj']
cmd['contact'] = math.floor(i['landing_gain'] * 1e1)
cmd['ctrl_point'][2] = i['mu']
if k == 0:
full_steps['step'] = [cmd]
else:
full_steps['step'].append(cmd)
k = k + 1
f = open("Gait_Params_moonwalk_full.toml", 'w')
f.write("# Gait Params\n")
f.writelines(toml.dumps(full_steps))
f.close()
file_obj_gait_def = open("Gait_Def_moonwalk.toml", 'r')
file_obj_gait_params = open("Gait_Params_moonwalk_full.toml", 'r')
self.usergait_msg.data = file_obj_gait_def.read()
self.lcm_usergait.publish("user_gait_file", self.usergait_msg.encode())
time.sleep(0.5)
self.usergait_msg.data = file_obj_gait_params.read()
self.lcm_usergait.publish("user_gait_file", self.usergait_msg.encode())
time.sleep(0.1)
file_obj_gait_def.close()
file_obj_gait_params.close()
user_gait_list = open("Usergait_List.toml", 'r')
steps = toml.load(user_gait_list)
for step in steps['step']:
self.cmd_msg.mode = step['mode']
self.cmd_msg.value = step['value']
self.cmd_msg.contact = step['contact']
self.cmd_msg.gait_id = step['gait_id']
self.cmd_msg.duration = step['duration']
self.cmd_msg.life_count += 1
for i in range(3):
self.cmd_msg.vel_des[i] = step['vel_des'][i]
self.cmd_msg.rpy_des[i] = step['rpy_des'][i]
self.cmd_msg.pos_des[i] = step['pos_des'][i]
self.cmd_msg.acc_des[i] = step['acc_des'][i]
self.cmd_msg.acc_des[i + 3] = step['acc_des'][i + 3]
self.cmd_msg.foot_pose[i] = step['foot_pose'][i]
self.cmd_msg.ctrl_point[i] = step['ctrl_point'][i]
for i in range(2):
self.cmd_msg.step_height[i] = step['step_height'][i]
self.lcm_cmd.publish("robot_control_cmd", self.cmd_msg.encode())
time.sleep(0.1)
for i in range(75): # 15s Heat beat It is used to maintain the heartbeat when life count is not updated
self.lcm_cmd.publish("robot_control_cmd", self.cmd_msg.encode())
time.sleep(0.2)
self.send_lock.release()
except KeyboardInterrupt:
self.cmd_msg.mode = 7 # PureDamper before KeyboardInterrupt:
self.cmd_msg.gait_id = 0
self.cmd_msg.duration = 0
self.cmd_msg.life_count += 1
self.lcm_cmd.publish("robot_control_cmd", self.cmd_msg.encode())
self.send_lock.release()
pass
def odo_changeback(self, target, msg, limit=0.05): ##允许误差0.05弧度大概是2.86度
const_int = 2400 # 对 差的角度 进行倍数放大 控制旋转时间
dist = target - self.odo_msg.rpy[2]
while abs(dist) > limit:
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26
msg.vel_des = [0, 0, 0.5 if dist > 0 else -0.5] # 转向
msg.duration = int(const_int * abs(dist))
# Continuous motion can interrupt non-zero duration interpolation motion
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
self.Wait_finish(11, 26)
time.sleep(0.2)
# print('1 times finish angle',self.odo_msg.rpy[2])
# return
dist = target - self.odo_msg.rpy[2]
def side_equal(self, msg, limit=0.08): ## 允许最大误差0.08米即离最中线左右偏向4厘米
###先里程计回正再算距离
const_int = 2600 # 对 差的距离 进行倍数放大 控制左右移动时间
lines = self.detect_lines()
dist1 = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5), 3)
dist2 = round(abs(lines[2][1] / ((lines[2][0] * lines[2][0] + 1)) ** 0.5), 3)
dist_diff = dist1 - dist2
while abs(dist_diff) > limit:
msg.mode = 11 # Locomotion
msg.gait_id = 27
msg.vel_des = [0, -0.15 if dist_diff > 0 else 0.15, 0]
msg.duration = int(const_int * abs(dist_diff))
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
msg.rpy_des = [0, 0.3, 0]
self.Send_cmd(msg)
self.Wait_finish(11, 27)
lines = self.detect_lines()
dist1 = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5), 3)
dist2 = round(abs(lines[2][1] / ((lines[2][0] * lines[2][0] + 1)) ** 0.5), 3)
dist_diff = dist1 - dist2
def walk_forward(self, msg, left_dist, limit=0.04): ##在对正之后才能调用left_dist指定前面预留多长的距离
const_int = 1800 # 对 差的距离 进行倍数放大 控制前进时间
lines = self.detect_lines()
dist = round(abs(lines[1][1] / ((lines[1][0] * lines[1][0] + 1)) ** 0.5), 3)
dist_need_walk = dist - left_dist
while abs(dist_need_walk) > limit:
msg.mode = 11 # Locomotion
msg.gait_id = 27
msg.vel_des = [0.6 if dist_need_walk > 0 else -0.6, 0, 0]
msg.duration = int(const_int * abs(dist_need_walk))
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
self.Wait_finish(11, 27)
lines = self.detect_lines()
dist = round(abs(lines[1][1] / ((lines[1][0] * lines[1][0] + 1)) ** 0.5), 3)
dist_need_walk = dist - left_dist
print('forward dis_after', dist)
return
def Wait_finish(self, mode, gait_id):
count = 0
while self.runing and count < 2000: # 10s
if self.mode_ok == mode and self.gait_ok == gait_id:
return True
else:
time.sleep(0.005)
count += 1
def send_publish(self):
while self.runing:
self.send_lock.acquire()
if self.delay_cnt > 20: # Heartbeat signal 10HZ, It is used to maintain the heartbeat when life count is not updated
self.lc_s.publish("robot_control_cmd", self.cmd_msg.encode())
self.delay_cnt = 0
self.delay_cnt += 1
self.send_lock.release()
time.sleep(0.005)
def Send_cmd(self, msg):
self.send_lock.acquire()
self.delay_cnt = 50
self.cmd_msg = msg
self.send_lock.release()
def quit(self):
self.runing = 0
self.rec_thread.join()
self.send_thread.join()
self.odo_thread.join()
self.laser_thread.join()
# Main function
if __name__ == '__main__':
main()