''' This demo show the communication interface of MR813 motion control board based on Lcm. Dependency: - robot_control_cmd_lcmt.py - robot_control_response_lcmt.py before running: cd /home/cyberdog_sim source /opt/ros/galactic/setup.bash 实现闭环感知直角拐弯: Ctrl 类中: detect_lines 是扫描直线的函数 odo_changeback 通过读里程计和转向实现朝向一个特定方向 side_equal用来对齐两侧直线的距离 walk_forward 输入一个期望保留的距离,然后通过直线前进到达位置,使狗前方能剩下预定距离 参数还调得不算好,尤其walk_forward函数里面的一个常参,可能需要多跑,选更加合适的一个值。 ''' import lcm import sys import os import time from threading import Thread, Lock from robot_control_cmd_lcmt import robot_control_cmd_lcmt from robot_control_response_lcmt import robot_control_response_lcmt from localization_lcmt import localization_lcmt import toml import copy import math from file_send_lcmt import file_send_lcmt import rclpy import numpy from rclpy.node import Node from sensor_msgs.msg import LaserScan from rclpy.qos import QoSProfile, qos_profile_sensor_data from pprint import pprint # 自定义步态发送数据的结构 robot_cmd = { 'mode': 0, 'gait_id': 0, 'contact': 0, 'life_count': 0, 'vel_des': [0.0, 0.0, 0.0], 'rpy_des': [0.0, 0.0, 0.0], 'pos_des': [0.0, 0.0, 0.0], 'acc_des': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], 'ctrl_point': [0.0, 0.0, 0.0], 'foot_pose': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], 'step_height': [0.0, 0.0], 'value': 0, 'duration': 0 } # 读取到的运动数据 # 使用 self.rec_msg.xxx 代替motion_rec 如: # motion_rec['mode'] 替换为 self.rec_msg.mode motion_rec = { 'mode': 0, 'gait_id': 0, 'contact': 0, 'order_process_bar': 0, 'switch_status': 0, 'ori_error': 0, 'footpos_error': 0, 'motor_error': (0) * 12, } # 里程计数据 # 使用 self.odo_msg.xxx 代替odo_rec 如: # odo_rec['xyz'] 替换为 self.odo_msg.xyz odo_rec = { 'xyz': (0.0, 0.0, 0.0), 'vxyz': (0.0, 0.0, 0.0), 'rpy': (0.0, 0.0, 0.0), 'omegaBody': (0.0, 0.0, 0.0), 'vBody': (0.0, 0.0, 0.0), 'timestamp': 0 } # 雷达数据 data 是 180个的list 每个度数一个点 laser_rec = { 'data': [0] * 180 } def main(): Ctrl = Robot_Ctrl() Ctrl.run() msg = robot_control_cmd_lcmt() try: msg.mode = 12 # Recovery stand msg.gait_id = 0 msg.life_count += 1 # Command will take effect when life_count update Ctrl.Send_cmd(msg) Ctrl.Wait_finish(12, 0) # print('init',Ctrl.odo_msg.rpy[2]) # Ctrl.odo_changeback(-0.785,msg=msg) Ctrl.odo_changeback(0, msg=msg) # time.sleep(2) # lines = Ctrl.detect_lines() Ctrl.side_equal(msg=msg) # dist = round(abs(lines[0][1]) / ((lines[0][0] * lines[0][0] + 1) ** 0.5),2) # print('initline',dist) # dist = round(abs(lines[2][1]) / ((lines[2][0] * lines[2][0] + 1) ** 0.5),2) # print('initline',dist) # print(Ctrl.odo_msg.rpy[2]) time.sleep(0.2) Ctrl.walk_forward(msg=msg, left_dist=0.70) time.sleep(0.2) Ctrl.odo_changeback(target=1.5708, msg=msg) time.sleep(0.2) msg.mode = 11 # Locomotion msg.gait_id = 27 msg.vel_des = [0.6, 0, 0] msg.duration = 3000 msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 Ctrl.Send_cmd(msg) Ctrl.Wait_finish(11, 27) time.sleep(60) except KeyboardInterrupt: pass Ctrl.quit() sys.exit() class LaserScanSubscriber(Node): def __init__(self): super().__init__('laser_scan_subscriber') # 使用SensorDataQoS作为QoS配置文件 qos_profile = qos_profile_sensor_data # 这就是SensorDataQoS self.subscription = self.create_subscription( LaserScan, 'scan', # 替换为你的激光雷达数据发布的话题名 self.listener_callback, qos_profile) def listener_callback(self, msg): # self.get_logger().info('Received laser scan message:') # 在这里处理激光雷达数据,例如打印出范围的最大值 global laser_rec laser_rec['data'] = msg.ranges.tolist() # print(type(laser_rec['data'])) # print(laser_rec['data']) # print(odo_rec['rpy'][2]) time.sleep(3) class Robot_Ctrl(object): def __init__(self): self.rec_thread = Thread(target=self.rec_responce) self.send_thread = Thread(target=self.send_publish) self.odo_thread = Thread(target=self.rec_responce_o) self.lc_r = lcm.LCM("udpm://239.255.76.67:7670?ttl=255") self.lc_s = lcm.LCM("udpm://239.255.76.67:7671?ttl=255") self.cmd_msg = robot_control_cmd_lcmt() self.rec_msg = robot_control_response_lcmt() self.odo_msg = localization_lcmt() self.send_lock = Lock() self.delay_cnt = 0 self.mode_ok = 0 self.gait_ok = 0 self.runing = 1 self.laser_scan_data = [0] * 180 self.lc_o = lcm.LCM("udpm://239.255.76.67:7667?ttl=255") ###里程计 # self.handle_lock = Lock() ###当前读数据只允许一个handle self.lcm_cmd = lcm.LCM("udpm://239.255.76.67:7671?ttl=255") self.lcm_usergait = lcm.LCM("udpm://239.255.76.67:7671?ttl=255") self.usergait_msg = file_send_lcmt() rclpy.init(args=None) self.laser_scan_subscriber = LaserScanSubscriber() self.laser_thread = Thread(target=self.laser_spin_func, kwargs={'subscriber': self.laser_scan_subscriber}) def laser_spin_func(self, subscriber): rclpy.spin(subscriber) time.sleep(0.2) def run(self): self.lc_r.subscribe("robot_control_response", self.msg_handler) self.lc_o.subscribe("global_to_robot", self.msg_handler_o) ###里程计订阅话题 self.send_thread.start() self.rec_thread.start() self.odo_thread.start() ###启动里程计 self.laser_thread.start() def msg_handler(self, channel, data): # 运动反馈解码函数。 self.rec_msg = robot_control_response_lcmt().decode(data) if (self.rec_msg.order_process_bar >= 95): self.mode_ok = self.rec_msg.mode # motion_rec['mode'] = self.rec_msg.mode # motion_rec['gait_id'] = self.rec_msg.gait_id # motion_rec['contact'] = self.rec_msg.contact # motion_rec['order_process_bar'] = self.rec_msg.order_process_bar # motion_rec['switch_status'] = self.rec_msg.switch_status # motion_rec['ori_error'] =self.rec_msg.ori_error # motion_rec['footpos_error'] = self.rec_msg.footpos_error # motion_rec['motor_error'] = self.rec_msg.motor_error # print(motion_rec) else: self.mode_ok = 0 def msg_handler_o(self, channel, data): ###里程计解码函数,被按照频率调用 self.odo_msg = localization_lcmt().decode(data) # odo_rec['xyz'] = self.odo_msg.xyz # odo_rec['vxyz'] = self.odo_msg.vxyz # odo_rec['rpy'] = self.odo_msg.rpy # odo_rec['omegaBody'] = self.odo_msg.omegaBody # odo_rec['vBody'] = self.odo_msg.vBody # odo_rec['timestamp'] = self.odo_msg.timestamp # print(odo_rec) def rec_responce(self): while self.runing: # self.handle_lock.acquire() self.lc_r.handle() # self.handle_lock.release() time.sleep(0.002) def rec_responce_o(self): ###里程计函数,调用msg_hadler_o while self.runing: # self.handle_lock.acquire() self.lc_o.handle() # self.handle_lock.release() time.sleep(0.002) # 探测直线的时候需要的二分查找 def bio_select(self, k, b, start, end, dist_limit, data_with_order_x, data_with_order_y): left, right = start, end - 1 # 二分查找 while left < right: mid = (left + right) // 2 x_mid, y_mid = data_with_order_x[mid], data_with_order_y[mid] # 计算点到直线的距离 distance = abs(k * x_mid - y_mid + b) / (k ** 2 + 1) ** 0.5 if distance <= dist_limit: # 点在直线附近,继续向右搜索 left = mid + 1 else: # 点不在直线附近,缩小搜索范围 right = mid # 返回直线附近点的范围 return left def detect_lines(self): # data 180个的数组 dist_limit = 0.10 init_points = 5 data_with_order_x = [] data_with_order_y = [] data = laser_rec['data'] # degree = self.odo_msg.rpy[2] # 过滤无效值 for i in range(180): if data[i] < 0.12 or data[i] > 12: continue # data_with_order.append([math.radians(i-90) - degree,data[i]]) #新坐标系里面的偏转角(弧度)+ 距离 data_with_order_x.append(data[i] * math.cos(math.radians(i - 90))) data_with_order_y.append(data[i] * math.sin(math.radians(i - 90))) # x y 轴 len_filtered = len(data_with_order_x) # print(len(data_with_order_x)) # for i in range(0, len(data_with_order_x)): # print('(', end='') # print(data_with_order_x[i], data_with_order_y[i], sep=' ,', end='') # print('),', end='\n') # 储存直线方程 lines = [] # 拐动位置 corners = [0] while corners[-1] < len_filtered - init_points - 1: start = corners[-1] k1_init, b_init = numpy.polyfit(data_with_order_x[start:start + init_points], data_with_order_y[start:start + init_points], 1) # print(k1_init, b_init) # 二分查找 end 是范围内最后一个点 end = self.bio_select(k1_init, b_init, start, len_filtered, dist_limit, data_with_order_x, data_with_order_y) # 允许一次偏差出现,此时end + 1 被击毙了,用现有的start - end 再拟合新直线,看 end + 1 还在不在外边, # 如果还在外边,大概率是救不回了 # 在里面还可以二分一下 k1_fixed, b1_fixed = numpy.polyfit(data_with_order_x[start:end], data_with_order_y[start:end], 1) dist_bias = (abs(k1_fixed * data_with_order_x[end] - data_with_order_y[end] + b1_fixed) / (k1_fixed ** 2 + 1) ** 0.5) if dist_bias < dist_limit: end_new = self.bio_select(k1_fixed, b1_fixed, end, len_filtered, dist_limit, data_with_order_x, data_with_order_y) k1_fixed, b1_fixed = numpy.polyfit(data_with_order_x[start:end_new], data_with_order_y[start:end_new], 1) else: end_new = end corners.append(end_new) # print(end) lines.append([k1_fixed, b1_fixed]) if len(lines) >= 3: # print(corners) break # for i in lines: # print(f'{i[0]}x - y + {i[2]} = 0') # print(f'[{i[0]}, {i[1]}],') return lines def selaction(self): try: self.send_lock.acquire() steps = toml.load("Gait_Params_moonwalk.toml") full_steps = {'step': [robot_cmd]} k = 0 for i in steps['step']: cmd = copy.deepcopy(robot_cmd) cmd['duration'] = i['duration'] if i['type'] == 'usergait': cmd['mode'] = 11 # LOCOMOTION cmd['gait_id'] = 110 # USERGAIT cmd['vel_des'] = i['body_vel_des'] cmd['rpy_des'] = i['body_pos_des'][0:3] cmd['pos_des'] = i['body_pos_des'][3:6] cmd['foot_pose'][0:2] = i['landing_pos_des'][0:2] cmd['foot_pose'][2:4] = i['landing_pos_des'][3:5] cmd['foot_pose'][4:6] = i['landing_pos_des'][6:8] cmd['ctrl_point'][0:2] = i['landing_pos_des'][9:11] cmd['step_height'][0] = math.ceil(i['step_height'][0] * 1e3) + math.ceil( i['step_height'][1] * 1e3) * 1e3 cmd['step_height'][1] = math.ceil(i['step_height'][2] * 1e3) + math.ceil( i['step_height'][3] * 1e3) * 1e3 cmd['acc_des'] = i['weight'] cmd['value'] = i['use_mpc_traj'] cmd['contact'] = math.floor(i['landing_gain'] * 1e1) cmd['ctrl_point'][2] = i['mu'] if k == 0: full_steps['step'] = [cmd] else: full_steps['step'].append(cmd) k = k + 1 f = open("Gait_Params_moonwalk_full.toml", 'w') f.write("# Gait Params\n") f.writelines(toml.dumps(full_steps)) f.close() file_obj_gait_def = open("Gait_Def_moonwalk.toml", 'r') file_obj_gait_params = open("Gait_Params_moonwalk_full.toml", 'r') self.usergait_msg.data = file_obj_gait_def.read() self.lcm_usergait.publish("user_gait_file", self.usergait_msg.encode()) time.sleep(0.5) self.usergait_msg.data = file_obj_gait_params.read() self.lcm_usergait.publish("user_gait_file", self.usergait_msg.encode()) time.sleep(0.1) file_obj_gait_def.close() file_obj_gait_params.close() user_gait_list = open("Usergait_List.toml", 'r') steps = toml.load(user_gait_list) for step in steps['step']: self.cmd_msg.mode = step['mode'] self.cmd_msg.value = step['value'] self.cmd_msg.contact = step['contact'] self.cmd_msg.gait_id = step['gait_id'] self.cmd_msg.duration = step['duration'] self.cmd_msg.life_count += 1 for i in range(3): self.cmd_msg.vel_des[i] = step['vel_des'][i] self.cmd_msg.rpy_des[i] = step['rpy_des'][i] self.cmd_msg.pos_des[i] = step['pos_des'][i] self.cmd_msg.acc_des[i] = step['acc_des'][i] self.cmd_msg.acc_des[i + 3] = step['acc_des'][i + 3] self.cmd_msg.foot_pose[i] = step['foot_pose'][i] self.cmd_msg.ctrl_point[i] = step['ctrl_point'][i] for i in range(2): self.cmd_msg.step_height[i] = step['step_height'][i] self.lcm_cmd.publish("robot_control_cmd", self.cmd_msg.encode()) time.sleep(0.1) for i in range(75): # 15s Heat beat It is used to maintain the heartbeat when life count is not updated self.lcm_cmd.publish("robot_control_cmd", self.cmd_msg.encode()) time.sleep(0.2) self.send_lock.release() except KeyboardInterrupt: self.cmd_msg.mode = 7 # PureDamper before KeyboardInterrupt: self.cmd_msg.gait_id = 0 self.cmd_msg.duration = 0 self.cmd_msg.life_count += 1 self.lcm_cmd.publish("robot_control_cmd", self.cmd_msg.encode()) self.send_lock.release() pass def odo_changeback(self, target, msg, limit=0.05): ##允许误差0.05弧度,大概是2.86度 const_int = 2400 # 对 差的角度 进行倍数放大 控制旋转时间 dist = target - self.odo_msg.rpy[2] while abs(dist) > limit: msg.mode = 11 # Locomotion msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26 msg.vel_des = [0, 0, 0.5 if dist > 0 else -0.5] # 转向 msg.duration = int(const_int * abs(dist)) # Continuous motion can interrupt non-zero duration interpolation motion msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 self.Send_cmd(msg) self.Wait_finish(11, 26) time.sleep(0.2) # print('1 times finish angle',self.odo_msg.rpy[2]) # return dist = target - self.odo_msg.rpy[2] def side_equal(self, msg, limit=0.08): ## 允许最大误差0.08米,即离最中线左右偏向4厘米 ###先里程计回正再算距离 const_int = 2600 # 对 差的距离 进行倍数放大 控制左右移动时间 lines = self.detect_lines() dist1 = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5), 3) dist2 = round(abs(lines[2][1] / ((lines[2][0] * lines[2][0] + 1)) ** 0.5), 3) dist_diff = dist1 - dist2 while abs(dist_diff) > limit: msg.mode = 11 # Locomotion msg.gait_id = 27 msg.vel_des = [0, -0.15 if dist_diff > 0 else 0.15, 0] msg.duration = int(const_int * abs(dist_diff)) msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 msg.rpy_des = [0, 0.3, 0] self.Send_cmd(msg) self.Wait_finish(11, 27) lines = self.detect_lines() dist1 = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5), 3) dist2 = round(abs(lines[2][1] / ((lines[2][0] * lines[2][0] + 1)) ** 0.5), 3) dist_diff = dist1 - dist2 def walk_forward(self, msg, left_dist, limit=0.04): ##在对正之后才能调用,left_dist指定前面预留多长的距离 const_int = 1800 # 对 差的距离 进行倍数放大 控制前进时间 lines = self.detect_lines() dist = round(abs(lines[1][1] / ((lines[1][0] * lines[1][0] + 1)) ** 0.5), 3) dist_need_walk = dist - left_dist while abs(dist_need_walk) > limit: msg.mode = 11 # Locomotion msg.gait_id = 27 msg.vel_des = [0.6 if dist_need_walk > 0 else -0.6, 0, 0] msg.duration = int(const_int * abs(dist_need_walk)) msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 self.Send_cmd(msg) self.Wait_finish(11, 27) lines = self.detect_lines() dist = round(abs(lines[1][1] / ((lines[1][0] * lines[1][0] + 1)) ** 0.5), 3) dist_need_walk = dist - left_dist print('forward dis_after', dist) return def Wait_finish(self, mode, gait_id): count = 0 while self.runing and count < 2000: # 10s if self.mode_ok == mode and self.gait_ok == gait_id: return True else: time.sleep(0.005) count += 1 def send_publish(self): while self.runing: self.send_lock.acquire() if self.delay_cnt > 20: # Heartbeat signal 10HZ, It is used to maintain the heartbeat when life count is not updated self.lc_s.publish("robot_control_cmd", self.cmd_msg.encode()) self.delay_cnt = 0 self.delay_cnt += 1 self.send_lock.release() time.sleep(0.005) def Send_cmd(self, msg): self.send_lock.acquire() self.delay_cnt = 50 self.cmd_msg = msg self.send_lock.release() def quit(self): self.runing = 0 self.rec_thread.join() self.send_thread.join() self.odo_thread.join() self.laser_thread.join() # Main function if __name__ == '__main__': main()