# ''' # This demo show the communication interface of MR813 motion control board based on Lcm. # Dependency: # - robot_control_cmd_lcmt.py # - robot_control_response_lcmt.py # 运行代码之前需要执行: # cd /home/cyberdog_sim # source /opt/ros/galactic/setup.bash # cd localpath # python3 main.py # 在项目中,我们使用了以下版本的NumPy和OpenCV开源库: # NumPy 版本:1.24.4 # OpenCV 版本:4.10.0.84 # 感谢NumPy和OpenCV社区的贡献,使我们能够更高效地完成本次设计。 # ''' ''' 这段代码使用雷达和相机,在的起始位置 完成石板路+直角转弯4+连续碰撞 正确无失误地返回起点 ''' import lcm import sys import os import time from threading import Thread, Lock from robot_control_cmd_lcmt import robot_control_cmd_lcmt from robot_control_response_lcmt import robot_control_response_lcmt from localization_lcmt import localization_lcmt import toml import copy import math from file_send_lcmt import file_send_lcmt import rclpy from rclpy.executors import MultiThreadedExecutor import numpy from rclpy.node import Node from sensor_msgs.msg import LaserScan,Image from rclpy.qos import QoSProfile, qos_profile_sensor_data from pprint import pprint import cv2 from cv_bridge import CvBridge # 自定义步态发送数据的结构 robot_cmd = { 'mode': 0, 'gait_id': 0, 'contact': 0, 'life_count': 0, 'vel_des': [0.0, 0.0, 0.0], 'rpy_des': [0.0, 0.0, 0.0], 'pos_des': [0.0, 0.0, 0.0], 'acc_des': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], 'ctrl_point': [0.0, 0.0, 0.0], 'foot_pose': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], 'step_height': [0.0, 0.0], 'value': 0, 'duration': 0 } # 读取到的运动数据 # 使用 self.rec_msg.xxx 代替motion_rec 如: # motion_rec['mode'] 替换为 self.rec_msg.mode # motion_rec = { # 'mode':0, # 'gait_id':0, # 'contact':0, # 'order_process_bar':0, # 'switch_status':0, # 'ori_error':0, # 'footpos_error':0, # 'motor_error':(0) * 12, # } # 里程计数据 # 使用 self.odo_msg.xxx 代替odo_rec 如: # odo_rec['xyz'] 替换为 self.odo_msg.xyz # odo_rec = { # 'xyz' : (0.0, 0.0, 0.0), # 'vxyz' : (0.0, 0.0, 0.0), # 'rpy' : (0.0, 0.0, 0.0), # 'omegaBody' : (0.0, 0.0, 0.0), # 'vBody' : (0.0, 0.0, 0.0), # 'timestamp' : 0 # } # 雷达数据 laser_rec = { 'data': [0] * 180 } RGB_res = 0 def main(): Ctrl = Robot_Ctrl() msg = robot_control_cmd_lcmt() # pos = -1 try: Ctrl.run() ##############################################通过RGB代码##################################################################### # Ctrl.Recovery_stand(msg) # msg.mode = 11 # Locomotion # msg.gait_id = 27 # msg.vel_des = [0.3, 0, 0] # msg.duration = 3000 # msg.step_height = [0.06, 0.06] # ground clearness of swing leg # msg.life_count += 1 # Ctrl.Send_cmd(msg) # time.sleep(10) # Ctrl.rightside_align(msg=msg,expect_dist=0.9) # if pos < 0: # print(RGB_res) # if RGB_res > 0: # Ctrl.rightside_align(msg=msg,expect_dist=0.9) # else: # Ctrl.rightside_align(msg=msg,expect_dist=0.4) # pos = 1 # else: # if RGB_res > 0: # Ctrl.rightside_align(msg=msg,expect_dist=0.4) # else: # Ctrl.rightside_align(msg=msg,expect_dist=0.9) # pos = -1 # msg.mode = 11 # Locomotion # msg.gait_id = 27 # msg.vel_des = [0.3, 0, 0] # msg.duration = 3333 # msg.step_height = [0.06, 0.06] # ground clearness of swing leg # msg.life_count += 1 # Ctrl.Send_cmd(msg) # time.sleep(10) # if pos < 0: # print(RGB_res) # if RGB_res > 0: # Ctrl.rightside_align(msg=msg,expect_dist=0.9) # else: # Ctrl.rightside_align(msg=msg,expect_dist=0.4) # pos = 1 # else: # if RGB_res > 0: # Ctrl.rightside_align(msg=msg,expect_dist=0.4) # else: # Ctrl.rightside_align(msg=msg,expect_dist=0.9) # pos = -1 # msg.mode = 11 # Locomotion # msg.gait_id = 27 # msg.vel_des = [0.3, 0, 0] # msg.duration = 3333 # msg.step_height = [0.06, 0.06] # ground clearness of swing leg # msg.life_count += 1 # Ctrl.Send_cmd(msg) # time.sleep(10) # if pos < 0: # print(RGB_res) # if RGB_res > 0: # Ctrl.rightside_align(msg=msg,expect_dist=0.9) # else: # Ctrl.rightside_align(msg=msg,expect_dist=0.4) # pos = 1 # else: # if RGB_res > 0: # Ctrl.rightside_align(msg=msg,expect_dist=0.4) # else: # Ctrl.rightside_align(msg=msg,expect_dist=0.9) # pos = -1 # msg.mode = 11 # Locomotion # msg.gait_id = 27 # msg.vel_des = [0.3, 0, 0] # msg.duration = 3333 # msg.step_height = [0.06, 0.06] # ground clearness of swing leg # msg.life_count += 1 # Ctrl.Send_cmd(msg) # time.sleep(10) # if pos < 0: # print(RGB_res) # if RGB_res > 0: # Ctrl.rightside_align(msg=msg,expect_dist=0.9) # else: # Ctrl.rightside_align(msg=msg,expect_dist=0.4) # pos = 1 # else: # if RGB_res > 0: # Ctrl.rightside_align(msg=msg,expect_dist=0.4) # else: # Ctrl.rightside_align(msg=msg,expect_dist=0.9) # pos = -1 # msg.mode = 11 # Locomotion # msg.gait_id = 27 # msg.vel_des = [0.3, 0, 0] # msg.duration = 3333 # msg.step_height = [0.06, 0.06] # ground clearness of swing leg # msg.life_count += 1 # Ctrl.Send_cmd(msg) # time.sleep(10) # if pos < 0: # print(RGB_res) # if RGB_res > 0: # Ctrl.rightside_align(msg=msg,expect_dist=0.9) # else: # Ctrl.rightside_align(msg=msg,expect_dist=0.4) # pos = 1 # else: # if RGB_res > 0: # Ctrl.rightside_align(msg=msg,expect_dist=0.4) # else: # Ctrl.rightside_align(msg=msg,expect_dist=0.9) # pos = -1 # msg.mode = 11 # Locomotion # msg.gait_id = 27 # msg.vel_des = [0.3, 0, 0] # msg.duration = 3333 # msg.step_height = [0.06, 0.06] # ground clearness of swing leg # msg.life_count += 1 # Ctrl.Send_cmd(msg) # time.sleep(10) except KeyboardInterrupt: pass Ctrl.quit() sys.exit() # pid 控制 class DeltaPid(object): ''' PID calculate pwm = pre_pwm + kp*(err-pre_ee) + ki*err + kd*(err-2*pre_err+pre_pre_ee) ''' def __init__(self, target_val, max_val, min_val, p, i, d): self.max_val = max_val self.min_val = min_val self.k_p = p self.k_i = i self.k_d = d self.target_val = target_val self._pre_val = target_val self._pre_pre_val = target_val - 1 def calculate(self, cur_val, pwm_in): # pwm = pre_pwm + kp*(err-pre_ee) + ki*err + kd*(err-2*pre_err+pre_pre_ee) pwm_out = 0 p_change = self.k_p * (cur_val - self._pre_val) i_change = self.k_i * (cur_val - self.target_val) d_change = self.k_d * (cur_val - 2 * self._pre_val + self._pre_pre_val) print(f"p:{p_change} i:{i_change} d:{d_change}") delta_output = p_change + i_change + d_change print(f"p+i+d output={delta_output}") pwm_out = delta_output + pwm_in print(f"calculate pwm={pwm_out}") self._pre_pre_val = self._pre_val self._pre_val = cur_val pwm_out = self.max_val if pwm_out > self.max_val else (self.min_val if pwm_out < self.min_val else pwm_out) print(f"actual output pwm={pwm_out}") return pwm_out # 激光雷达订阅 class LaserScanSubscriber(Node): def __init__(self): super().__init__('laser_scan_subscriber') # 使用SensorDataQoS作为QoS配置文件 qos_profile = qos_profile_sensor_data # 这就是SensorDataQoS self.subscription = self.create_subscription( LaserScan, 'scan', # 替换为你的激光雷达数据发布的话题名 self.listener_callback, qos_profile) def listener_callback(self, msg): global laser_rec laser_rec['data'] = msg.ranges.tolist() class RGB_Subscriber(Node): def __init__(self): super().__init__('RGB_subscriber') qos_profile = qos_profile_sensor_data self.subscription = self.create_subscription( Image, '/image_rgb', self.listener_callback, qos_profile) self.bridge = CvBridge() def find_green_location(self, image, red_range = [20, 35], green_range = [50, 80]): #调参部分1 height, width = image.shape[:2] central_part = [2 * width // 8, 4 * width // 8, 0, 4 * height // 8] #调参部分2 central_image = image[central_part[2]:central_part[3], central_part[0]:central_part[1]] # cv2.imshow("image",central_image) # cv2.waitKey(0) _,g,r = cv2.split(central_image) mean_green = numpy.mean(g) mean_red = numpy.mean(r) if green_range[0] <= mean_green <= green_range[1] and red_range[0] <= mean_red <= red_range[1]: # 如果x坐标的平均值大于图像宽度的一半 return 1 else: return -1 def mid_before_bridge(): return def listener_callback(self, msg): # Convert the ROS Image message to an OpenCV image cv_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8') global RGB_res RGB_res = self.find_green_location(cv_image) cv2.imwrite("image.jpg",cv_image) time.sleep(3) def get_image(self, msg): cv_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8') cv2.imshow("image",cv_image) cv2.waitKey() class Robot_Ctrl(object): def __init__(self): # 反馈线程初始化 self.rec_thread = Thread(target=self.rec_responce) self.send_thread = Thread(target=self.send_publish) self.odo_thread = Thread(target=self.rec_responce_o) self.lc_r = lcm.LCM("udpm://239.255.76.67:7670?ttl=255") self.lc_s = lcm.LCM("udpm://239.255.76.67:7671?ttl=255") self.cmd_msg = robot_control_cmd_lcmt() self.rec_msg = robot_control_response_lcmt() self.odo_msg = localization_lcmt() self.send_lock = Lock() self.delay_cnt = 0 self.mode_ok = 0 self.gait_ok = 0 self.runing = 1 self.RGB_res = 0 self.lc_o = lcm.LCM("udpm://239.255.76.67:7667?ttl=255") ###里程计 # self.handle_lock = Lock() ###当前读数据只允许一个handle self.lcm_cmd = lcm.LCM("udpm://239.255.76.67:7671?ttl=255") self.lcm_usergait = lcm.LCM("udpm://239.255.76.67:7671?ttl=255") self.usergait_msg = file_send_lcmt() rclpy.init(args=None) self.laser_scan_subscriber = LaserScanSubscriber() self.RGB_subscriber = RGB_Subscriber() self.executor = MultiThreadedExecutor() self.laser_thread = Thread(target=self.spin_func, args=(self.laser_scan_subscriber,)) self.RGB_thread = Thread(target=self.spin_func, args=(self.RGB_subscriber,)) # 控制pid self.yaw_pid = DeltaPid(0.0, 1.2, 0.01, 10, 0.7, 0.3) # 控制yaw角pid初始化 def spin_func(self, subscriber): self.executor.add_node(subscriber) self.executor.spin() time.sleep(0.02) def run(self): self.lc_r.subscribe("robot_control_response", self.msg_handler) self.lc_o.subscribe("global_to_robot", self.msg_handler_o) ###里程计订阅话题 self.send_thread.start() self.rec_thread.start() self.odo_thread.start() ###启动里程计 self.laser_thread.start() self.RGB_thread.start() # self.sub_thread def msg_handler(self, channel, data): self.rec_msg = robot_control_response_lcmt().decode(data) if (self.rec_msg.order_process_bar >= 95): self.mode_ok = self.rec_msg.mode # motion_rec['mode'] = self.rec_msg.mode # motion_rec['gait_id'] = self.rec_msg.gait_id # motion_rec['contact'] = self.rec_msg.contact # motion_rec['order_process_bar'] = self.rec_msg.order_process_bar # motion_rec['switch_status'] = self.rec_msg.switch_status # motion_rec['ori_error'] =self.rec_msg.ori_error # motion_rec['footpos_error'] = self.rec_msg.footpos_error # motion_rec['motor_error'] = self.rec_msg.motor_error # print(motion_rec) else: self.mode_ok = 0 def msg_handler_o(self, channel, data): ###里程计解码函数,被按照频率调用 self.odo_msg = localization_lcmt().decode(data) # odo_rec['xyz'] = self.odo_msg.xyz # odo_rec['vxyz'] = self.odo_msg.vxyz # odo_rec['rpy'] = self.odo_msg.rpy # odo_rec['omegaBody'] = self.odo_msg.omegaBody # odo_rec['vBody'] = self.odo_msg.vBody # odo_rec['timestamp'] = self.odo_msg.timestamp # print(odo_rec['rpy']) def rec_responce(self): while self.runing: # self.handle_lock.acquire() self.lc_r.handle() # self.handle_lock.release() time.sleep(0.002) def rec_responce_o(self): ###里程计handle 注意一时刻只能有一个handle while self.runing: # self.handle_lock.acquire() self.lc_o.handle() # self.handle_lock.release() time.sleep(0.002) def selaction(self, mode): # 自定义步态选择参数 try: self.send_lock.acquire() if mode == 0: # 石子路 steps = toml.load("/home/git/project2248014-237570/back_to_start/back_to_start_main/Gait_Params_walk.toml") elif mode == 1: # 上下坡 steps = toml.load("/home/git/project2248014-237570/back_to_start/back_to_start_main/Gait_Params_scopewalk.toml") full_steps = {'step': [robot_cmd]} k = 0 for i in steps['step']: cmd = copy.deepcopy(robot_cmd) cmd['duration'] = i['duration'] if i['type'] == 'usergait': cmd['mode'] = 11 # LOCOMOTION cmd['gait_id'] = 110 # USERGAIT cmd['vel_des'] = i['body_vel_des'] cmd['rpy_des'] = i['body_pos_des'][0:3] cmd['pos_des'] = i['body_pos_des'][3:6] cmd['foot_pose'][0:2] = i['landing_pos_des'][0:2] cmd['foot_pose'][2:4] = i['landing_pos_des'][3:5] cmd['foot_pose'][4:6] = i['landing_pos_des'][6:8] cmd['ctrl_point'][0:2] = i['landing_pos_des'][9:11] cmd['step_height'][0] = math.ceil(i['step_height'][0] * 1e3) + math.ceil( i['step_height'][1] * 1e3) * 1e3 cmd['step_height'][1] = math.ceil(i['step_height'][2] * 1e3) + math.ceil( i['step_height'][3] * 1e3) * 1e3 cmd['acc_des'] = i['weight'] cmd['value'] = i['use_mpc_traj'] cmd['contact'] = math.floor(i['landing_gain'] * 1e1) cmd['ctrl_point'][2] = i['mu'] if k == 0: full_steps['step'] = [cmd] else: full_steps['step'].append(cmd) k = k + 1 if mode == 0: # 石子路 f = open("/home/git/project2248014-237570/back_to_start/back_to_start_main/Gait_Params_walk_full.toml", 'w') elif mode == 1: # 上下坡 f = open("/home/git/project2248014-237570/back_to_start/back_to_start_main/Gait_Params_scopewalk_full.toml", 'w') f.write("# Gait Params\n") f.writelines(toml.dumps(full_steps)) f.close() if mode == 0: # 石子路 file_obj_gait_def = open("/home/git/project2248014-237570/back_to_start/back_to_start_main/Gait_Def_walk.toml", 'r') file_obj_gait_params = open("/home/git/project2248014-237570/back_to_start/back_to_start_main/Gait_Params_walk_full.toml", 'r') elif mode == 1: # 上下坡 file_obj_gait_def = open("/home/git/project2248014-237570/back_to_start/back_to_start_main/Gait_Def_scopewalk.toml", 'r') file_obj_gait_params = open("/home/git/project2248014-237570/back_to_start/back_to_start_main/Gait_Params_scopewalk_full.toml", 'r') self.usergait_msg.data = file_obj_gait_def.read() self.lcm_usergait.publish("user_gait_file", self.usergait_msg.encode()) time.sleep(0.5) self.usergait_msg.data = file_obj_gait_params.read() self.lcm_usergait.publish("user_gait_file", self.usergait_msg.encode()) time.sleep(0.1) file_obj_gait_def.close() file_obj_gait_params.close() user_gait_list = open("/home/git/project2248014-237570/back_to_start/back_to_start_main/Usergait_List.toml", 'r') steps = toml.load(user_gait_list) for step in steps['step']: self.cmd_msg.mode = step['mode'] self.cmd_msg.value = step['value'] self.cmd_msg.contact = step['contact'] self.cmd_msg.gait_id = step['gait_id'] self.cmd_msg.duration = step['duration'] self.cmd_msg.life_count += 1 for i in range(3): self.cmd_msg.vel_des[i] = step['vel_des'][i] self.cmd_msg.rpy_des[i] = step['rpy_des'][i] self.cmd_msg.pos_des[i] = step['pos_des'][i] self.cmd_msg.acc_des[i] = step['acc_des'][i] self.cmd_msg.acc_des[i + 3] = step['acc_des'][i + 3] self.cmd_msg.foot_pose[i] = step['foot_pose'][i] self.cmd_msg.ctrl_point[i] = step['ctrl_point'][i] for i in range(2): self.cmd_msg.step_height[i] = step['step_height'][i] self.lcm_cmd.publish("robot_control_cmd", self.cmd_msg.encode()) time.sleep(0.1) for i in range(200): # 10s Heat beat It is used to maintain the heartbeat when life count is not updated self.lcm_cmd.publish("robot_control_cmd", self.cmd_msg.encode()) self.err_handler(msg=robot_control_cmd_lcmt()) time.sleep(0.2) self.send_lock.release() except KeyboardInterrupt: self.cmd_msg.mode = 7 # PureDamper before KeyboardInterrupt: self.cmd_msg.gait_id = 0 self.cmd_msg.duration = 0 self.cmd_msg.life_count += 1 self.lcm_cmd.publish("robot_control_cmd", self.cmd_msg.encode()) self.send_lock.release() pass def Wait_finish(self, mode, gait_id, timeout=2000): # timeout // 0.005 = 多少秒超时 count = 0 while self.runing and count < timeout: # 10s if self.mode_ok == mode and self.gait_ok == gait_id: return True else: time.sleep(0.005) count += 1 def send_publish(self): while self.runing: self.send_lock.acquire() if self.delay_cnt > 20: # Heartbeat signal 10HZ, It is used to maintain the heartbeat when life count is not updated self.lc_s.publish("robot_control_cmd", self.cmd_msg.encode()) self.delay_cnt = 0 self.delay_cnt += 1 self.send_lock.release() time.sleep(0.005) def Send_cmd(self, msg): self.send_lock.acquire() self.delay_cnt = 50 self.cmd_msg = msg self.send_lock.release() def Ctrl_yaw(self, msg, angle_set): while abs(angle_set - self.odo_msg.rpy[2] - angle_set) > 0.05: msg.mode = 11 # Locomotion msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26 msg.vel_des = [0, 0, -0.5] # 转向 msg.duration = 300 # Zero duration means continuous motion until a new command is used. # Continuous motion can interrupt non-zero duration interpolation motion msg.life_count += 1 # Command will take effect when life_count update self.Send_cmd(msg) self.Wait_finish(11, 26) self.yaw_pid.calculate(self.odo_msg.rpy[2], ) def quit(self): self.runing = 0 self.rec_thread.join() self.send_thread.join() self.odo_thread.join() self.RGB_thread.join() self.laser_thread.join() def jump(self, msg, duration, type): # 跳跃 msg.mode = 16 # Locomotion msg.gait_id = type # 0左跳 1前跳 3右跳 6原地跳 msg.duration = duration # Zero duration means continuous motion until a new command is used. # Continuous motion can interrupt non-zero duration interpolation motion msg.life_count += 1 self.Send_cmd(msg) self.Wait_finish(16, 1) time.sleep(0.1) def Recovery_stand(self, msg): msg.mode = 12 # Recovery stand msg.gait_id = 0 msg.life_count += 1 # Command will take effect when life_count update self.Send_cmd(msg) self.Wait_finish(12, 0) def Pure_Damp(self, msg, mode): msg.mode = 7 # PureDamp msg.gait_id = mode # 0为自然倒下,1为受控倒下 msg.life_count += 1 # Command will take effect when life_count update self.Send_cmd(msg) self.Wait_finish(12, 0) def stone(self, msg): self.Recovery_stand(msg) self.selaction(0) time.sleep(0.1) # def locomotion(s) def slope(self, msg): self.Recovery_stand(msg) self.selaction(1) def err_handler(self, msg): if self.rec_msg.switch_status == 3: # 如果状态为高祖尼保护模式 msg.mode = 12 # Recovery stand msg.gait_id = 0 msg.life_count += 1 # Command will take effect when life_count update self.Send_cmd(msg) self.Wait_finish(12, 0) # 雷达以及里程计 # 探测直线的时候需要的二分查找 def bio_select(self, k, b, start, end, dist_limit, data_with_order_x, data_with_order_y): left, right = start, end - 1 # 二分查找 while left < right: mid = (left + right) // 2 x_mid, y_mid = data_with_order_x[mid], data_with_order_y[mid] # 计算点到直线的距离 distance = abs(k * x_mid - y_mid + b) / (k ** 2 + 1) ** 0.5 if distance <= dist_limit: # 点在直线附近,继续向右搜索 left = mid + 1 else: # 点不在直线附近,缩小搜索范围 right = mid # 返回直线附近点的范围 return left def detect_lines(self, data=laser_rec['data']): # data 180个的数组 dist_limit = 0.10 init_points = 5 data_with_order_x = [] data_with_order_y = [] data = laser_rec['data'] # degree = self.odo_msg.rpy[2] # 过滤无效值 for i in range(180): if data[i] < 0.12 or data[i] > 12: continue # data_with_order.append([math.radians(i-90) - degree,data[i]]) #新坐标系里面的偏转角(弧度)+ 距离 data_with_order_x.append(data[i] * math.cos(math.radians(i - 90))) data_with_order_y.append(data[i] * math.sin(math.radians(i - 90))) # x y 轴 len_filtered = len(data_with_order_x) # print(len(data_with_order_x)) # for i in range(0, len(data_with_order_x)): # print('(', end='') # print(data_with_order_x[i], data_with_order_y[i], sep=' ,', end='') # print('),', end='\n') # 储存直线方程 lines = [] # 拐动位置 corners = [0] while corners[-1] < len_filtered - init_points - 1: start = corners[-1] k1_init, b_init = numpy.polyfit(data_with_order_x[start:start + init_points], data_with_order_y[start:start + init_points], 1) # print(k1_init, b_init) # 二分查找 end 是范围内最后一个点 end = self.bio_select(k1_init, b_init, start, len_filtered, dist_limit, data_with_order_x, data_with_order_y) # 允许一次偏差出现,此时end + 1 被击毙了,用现有的start - end 再拟合新直线,看 end + 1 还在不在外边, # 如果还在外边,大概率是救不回了 # 在里面还可以二分一下 k1_fixed, b1_fixed = numpy.polyfit(data_with_order_x[start:end], data_with_order_y[start:end], 1) dist_bias = (abs(k1_fixed * data_with_order_x[end] - data_with_order_y[end] + b1_fixed) / (k1_fixed ** 2 + 1) ** 0.5) if dist_bias < dist_limit: end_new = self.bio_select(k1_fixed, b1_fixed, end, len_filtered, dist_limit, data_with_order_x, data_with_order_y) k1_fixed, b1_fixed = numpy.polyfit(data_with_order_x[start:end_new], data_with_order_y[start:end_new], 1) else: end_new = end corners.append(end_new) # print(end) lines.append([k1_fixed, b1_fixed]) if len(lines) >= 3: print(corners) break # for i in lines: # # print(f'{i[0]}x - y + {i[1]} = 0') # print(f'[{i[0]}, {i[1]}],') return lines def circle_dist(self,target,location): ###3.1416 -3.1416 是同一个值 value1 = abs(target - location) value2 = 6.2832 - value1 direction1 = 1 if target > location else 0 # ##按value1转动,1为逆时针 0 为顺时针 if value1 < value2: return direction1,value1 else: return 1-direction1,value2 def odo_verticalturn(self, target, msg, limit=0.04, timesleep_s=5): ##允许误差0.04弧度,大概是2.29度 const_int = 2470 # 转 1.57 弧度 大概要 3875 duration 每弧度大概这个值 持续时间大概6.5秒 loc = self.odo_msg.rpy[2] direction, dist = self.circle_dist(target=target, location=loc) print('odo_verticalturn', self.odo_msg.rpy[2], target) if abs(dist) > limit: msg.mode = 11 # Locomotion msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26 msg.vel_des = [0, 0, 0.5 if direction > 0 else -0.5] # 转向 msg.duration = int(const_int * abs(dist)) # Continuous motion can interrupt non-zero duration interpolation motion msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 self.Send_cmd(msg) time.sleep(7 * abs(dist) / 1.57) # print('1 times finish angle',self.odo_msg.rpy[2]) # return dist = target - self.odo_msg.rpy[2] print('odo_verticalturn', self.odo_msg.rpy[2], target) def side_equal(self, msg, limit=0.08, off_set=0.0, timeout=600): ## 允许最大误差0.08米,即离最中线左右偏向4厘米 ###先里程计回正再算距离 const_int = 6666 # 对 差的距离 进行倍数放大 控制左右移动时间 ###2600 lines = self.detect_lines() while len(lines) < 3: time.sleep(0.04) lines = self.detect_lines() dist1 = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5), 3) dist2 = round(abs(lines[2][1] / ((lines[2][0] * lines[2][0] + 1)) ** 0.5), 3) # print(f'before move to equal: r :{dist1} l: {dist2}') dist_diff = dist1 - dist2 - 2 * off_set while abs(dist_diff) > limit: msg.mode = 11 # Locomotion msg.gait_id = 27 msg.vel_des = [0, -0.15, 0] if dist_diff > 0 else [0, 0.15, 0] msg.duration = int(const_int * abs(dist_diff) / 2) msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 msg.rpy_des = [0, 0.3, 0] self.Send_cmd(msg) time.sleep(msg.duration / 1000 + 0.5) lines = self.detect_lines() while len(lines) < 3: time.sleep(0.04) lines = self.detect_lines() dist1 = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5), 3) dist2 = round(abs(lines[2][1] / ((lines[2][0] * lines[2][0] + 1)) ** 0.5), 3) dist_diff = dist1 - dist2 def walk_forward(self, msg, left_dist, limit=0.04, timeout_s=6, k_bound=5, exp_speed = 0.6): ##在对正之后才能调用,left_dist指定前面预留多长的距离 const_int = 1667 # 对 差的距离 进行倍数放大 控制前进时间 lines = self.detect_lines() dist = round(abs(lines[1][1] / ((lines[1][0] * lines[1][0] + 1)) ** 0.5), 3) print(dist) # for i in lines: # print(i) while (len(lines) <= 1 or (lines[1][0] < k_bound and lines[1][0] > -k_bound)): time.sleep(0.04) lines = self.detect_lines() dist = round(abs(lines[1][1] / ((lines[1][0] * lines[1][0] + 1)) ** 0.5), 3) dist_need_walk = dist - left_dist while abs(dist_need_walk) > limit: msg.mode = 11 # Locomotion msg.gait_id = 27 msg.vel_des = [exp_speed if dist_need_walk > 0 else -exp_speed, 0, 0] msg.duration = int(const_int * abs(dist_need_walk) * 0.6/exp_speed) msg.step_height = [0.06, 0.06] # ground clearness of swing leg # if dist_need_walk > 3: # msg.duration = 4500 # msg.life_count += 1 # self.Wait_finish(11,27,timeout=1200) # print('固定运动') # return self.walk_forward(msg,left_dist,limit,timeout) msg.life_count += 1 self.Send_cmd(msg) time.sleep(timeout_s) lines = self.detect_lines() dist = round(abs(lines[1][1] / ((lines[1][0] * lines[1][0] + 1)) ** 0.5), 3) # while (len(lines) <= 1 or (lines[1][0] -k_bound)): # time.sleep(0.04) # lines = self.detect_lines() # dist = round(abs(lines[1][1] / ((lines[1][0] * lines[1][0] + 1)) ** 0.5),3) # dist_need_walk = dist - left_dist print('walk_forward dis_after', dist) return def turnleft_corner(self, msg, target1=0, target2=1.57, dist1_right=0.85, dist2_right=0.35): '''左转时定位我们需要的位置,在接近拐角的任意位置启动函数,可以顺利运动到下一条赛道的起点''' '''target1是第一条赛道的正方向,target2是第二条赛道的正方向,dist1_right是第一条赛道右侧保持距离,dist2_right是第二条赛道右侧保持的距离''' self.odo_changeback(target=target1, msg=msg) time.sleep(3) self.rightside_align(msg=msg, expect_dist=dist1_right) time.sleep(3) self.walk_forward(msg, left_dist=dist2_right) time.sleep(4) self.odo_changeback(target=target2, msg=msg) def rightside_align(self, msg, expect_dist=0.65, limit=0.04, timeout_s=4): const_int = 5000 lines = self.detect_lines() dist = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5), 3) dist_need_walk = dist - expect_dist while abs(dist_need_walk) > limit: msg.mode = 11 # Locomotion msg.gait_id = 27 msg.vel_des = [0, -0.2 if dist_need_walk > 0 else 0.2, 0] msg.duration = int(const_int * abs(dist_need_walk)) msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 self.Send_cmd(msg) time.sleep(timeout_s) lines = self.detect_lines() dist = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5), 3) dist_need_walk = dist - expect_dist print('right align dis_after', dist) return ###cross_speedump 在 第二个转角回正之后调用,回正参数: # Ctrl.Recovery_stand(msg) # Ctrl.odo_changeback(target=0,msg=msg) # Ctrl.walk_forward(msg,left_dist= 0.40,timeout_s=3) # Ctrl.rightside_align(msg=msg,expect_dist= 1.15,timeout_s=5) # Ctrl.odo_changeback(target=1.57, msg=msg) # Ctrl.cross_speedump(msg=msg) def cross_speedump(self, msg): const_int1 = 5000 #######在走向圆柱的情况下,使用的duration常数 const_int2 = 5000 #######在侧向靠近圆柱,进入赛道位置的情况下,使用的duration常数,调大 -> 机械狗横向后更加靠近圆柱 y_sharp_dist = -0.18 #######给最后的对齐使用的参数 绝对值是对正之后拐点和狗y轴的差距 正常要负数 x_sharp_dist = 0.2 #######给最后的对齐使用的参数 #####开环走过大部分减速带 msg.mode = 11 # Locomotion msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26 msg.vel_des = [0.2, 0, 0] # 直走 msg.rpy_des = [0, 0, 0] # Zero duration means continuous motion until a new command is used. msg.duration = 14000 # Continuous motion can interrupt non-zero duration interpolation motion msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 self.Send_cmd(msg) time.sleep(11) laser_mid = laser_rec['data'] lines_mid = self.detect_lines(data=laser_mid) time.sleep(13) ###找到方向处理剩余的不确定距离,主要是向半径方向靠近圆柱 # self.odo_changeback(1.57,msg) time.sleep(0.04) laser_keep = laser_rec['data'] idx_min = laser_keep[70:110].index(min(laser_keep[70:110])) + 70 # 最小值点 dist_min = laser_keep[idx_min] angel_diff = idx_min / 90 * 1.57 print(idx_min, dist_min, angel_diff) if dist_min > 0.15: msg.mode = 11 # Locomotion msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26 msg.vel_des = [0.1 * math.sin(angel_diff), 0.1 * math.cos(angel_diff), 0] # 直走 msg.rpy_des = [0, 0, 0] # Zero duration means continuous motion until a new command is used. msg.duration = int((dist_min - 0.15) * const_int1) # Continuous motion can interrupt non-zero duration interpolation motion msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 self.Send_cmd(msg) print('tiny walk') ####转向0度,回正 time.sleep(0.5) self.odo_changeback(0, msg=msg) time.sleep(2) ###########################找右边直线和圆环赛道的交界点,以这个点为基准去进行对齐 ###########################多次扫描去除错误点 laser_valid = [0] * 180 for i in range(10): time.sleep(0.04) laser = laser_rec['data'] for j in range(0, 180): if laser_valid[j] == 0 and laser[j] > 0.15: laser_valid[j] = laser[j] ##########################根据X轴的值去找到拐角点,序号idx data_with_order_x = [] data_with_order_y = [] validist_store = [] for i in range(180): if laser_valid[i] < 0.12 or laser_valid[i] > 12: continue # data_with_order.append([math.radians(i-90) - degree,data[i]]) #新坐标系里面的偏转角(弧度)+ 距离 data_with_order_x.append(laser_valid[i] * math.cos(math.radians(i - 90))) data_with_order_y.append(laser_valid[i] * math.sin(math.radians(i - 90))) # x y 轴 validist_store.append(laser_valid[i]) len_filtered = len(data_with_order_x) # print('len_filtered',len_filtered) # print(data_with_order_x) idx = -1 i = 60 while i < len_filtered - 10: cnt = 0 for k in range(i + 1, i + 9): if data_with_order_x[k] > data_with_order_x[k - 1]: cnt += 1 else: break if cnt > 6: idx = i break i += 1 ###################读出拐角点的值 point_sharp_x = data_with_order_x[idx] point_sharp_y = data_with_order_y[idx] print('idx pt_sharp_x', idx, point_sharp_x) print('idx pt_sharp_y', idx, point_sharp_y) time.sleep(1) #####根据拐角点矫正方位,先在y轴上矫正 msg.mode = 11 # Locomotion msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26 msg.vel_des = [0, 0.1 if (point_sharp_y - y_sharp_dist) > 0 else -0.1, 0] msg.rpy_des = [0, 0, 0] # Zero duration means continuous motion until a new command is used. msg.duration = int(const_int2 * abs(y_sharp_dist - point_sharp_y)) # Continuous motion can interrupt non-zero duration interpolation motion msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 self.Send_cmd(msg) time.sleep(5) ####再矫正x轴 msg.mode = 11 # Locomotion msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26 msg.vel_des = [-0.1 if (point_sharp_x - x_sharp_dist) < 0 else 0.1, 0, 0] msg.rpy_des = [0, 0, 0] # Zero duration means continuous motion until a new command is used. msg.duration = int(const_int2 * abs(point_sharp_x - x_sharp_dist)) # Continuous motion can interrupt non-zero duration interpolation motion msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 self.Send_cmd(msg) time.sleep(5) #####无限进行循环 msg.mode = 11 # Locomotion msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26 msg.vel_des = [0.165, 0, 0.3] # 转向 msg.duration = 0 # Continuous motion can interrupt non-zero duration interpolation motion msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 self.Send_cmd(msg) time.sleep(60) def pass_cloth(Ctrl ,msg ,pos = -1): Ctrl.Recovery_stand(msg) msg.mode = 11 # Locomotion msg.gait_id = 27 msg.vel_des = [0.3, 0, 0] msg.duration = 3000 msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 Ctrl.Send_cmd(msg) time.sleep(10) Ctrl.rightside_align(msg=msg,expect_dist=0.9) if pos < 0: print(RGB_res) if RGB_res > 0: Ctrl.rightside_align(msg=msg,expect_dist=0.9) else: Ctrl.rightside_align(msg=msg,expect_dist=0.4) pos = 1 else: if RGB_res > 0: Ctrl.rightside_align(msg=msg,expect_dist=0.4) else: Ctrl.rightside_align(msg=msg,expect_dist=0.9) pos = -1 msg.mode = 11 # Locomotion msg.gait_id = 27 msg.vel_des = [0.3, 0, 0] msg.duration = 3333 msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 Ctrl.Send_cmd(msg) time.sleep(10) if pos < 0: print(RGB_res) if RGB_res > 0: Ctrl.rightside_align(msg=msg,expect_dist=0.9) else: Ctrl.rightside_align(msg=msg,expect_dist=0.4) pos = 1 else: if RGB_res > 0: Ctrl.rightside_align(msg=msg,expect_dist=0.4) else: Ctrl.rightside_align(msg=msg,expect_dist=0.9) pos = -1 msg.mode = 11 # Locomotion msg.gait_id = 27 msg.vel_des = [0.3, 0, 0] msg.duration = 3333 msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 Ctrl.Send_cmd(msg) time.sleep(10) if pos < 0: print(RGB_res) if RGB_res > 0: Ctrl.rightside_align(msg=msg,expect_dist=0.9) else: Ctrl.rightside_align(msg=msg,expect_dist=0.4) pos = 1 else: if RGB_res > 0: Ctrl.rightside_align(msg=msg,expect_dist=0.4) else: Ctrl.rightside_align(msg=msg,expect_dist=0.9) pos = -1 msg.mode = 11 # Locomotion msg.gait_id = 27 msg.vel_des = [0.3, 0, 0] msg.duration = 3333 msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 Ctrl.Send_cmd(msg) time.sleep(10) if pos < 0: print(RGB_res) if RGB_res > 0: Ctrl.rightside_align(msg=msg,expect_dist=0.9) else: Ctrl.rightside_align(msg=msg,expect_dist=0.4) pos = 1 else: if RGB_res > 0: Ctrl.rightside_align(msg=msg,expect_dist=0.4) else: Ctrl.rightside_align(msg=msg,expect_dist=0.9) pos = -1 msg.mode = 11 # Locomotion msg.gait_id = 27 msg.vel_des = [0.3, 0, 0] msg.duration = 3333 msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 Ctrl.Send_cmd(msg) time.sleep(10) if pos < 0: print(RGB_res) if RGB_res > 0: Ctrl.rightside_align(msg=msg,expect_dist=0.9) else: Ctrl.rightside_align(msg=msg,expect_dist=0.4) pos = 1 else: if RGB_res > 0: Ctrl.rightside_align(msg=msg,expect_dist=0.4) else: Ctrl.rightside_align(msg=msg,expect_dist=0.9) pos = -1 msg.mode = 11 # Locomotion msg.gait_id = 27 msg.vel_des = [0.3, 0, 0] msg.duration = 3333 msg.step_height = [0.06, 0.06] # ground clearness of swing leg msg.life_count += 1 Ctrl.Send_cmd(msg) time.sleep(10) # Main function if __name__ == '__main__': main()