2025-05-13 09:09:54 +00:00

812 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

'''
This demo show the communication interface of MR813 motion control board based on Lcm.
Dependency:
- robot_control_cmd_lcmt.py
- robot_control_response_lcmt.py
雷达参数已经能够读取
替换basic_motion_test_comb中的main.py
运行代码之前需要执行:
cd /home/cyberdog_sim
source /opt/ros/galactic/setup.bash
'''
import lcm
import sys
import os
import time
from threading import Thread, Lock
from robot_control_cmd_lcmt import robot_control_cmd_lcmt
from robot_control_response_lcmt import robot_control_response_lcmt
from localization_lcmt import localization_lcmt
import toml
import copy
import math
from file_send_lcmt import file_send_lcmt
import rclpy
import numpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from rclpy.qos import QoSProfile, qos_profile_sensor_data
from pprint import pprint
#自定义步态发送数据的结构
robot_cmd = {
'mode':0, 'gait_id':0, 'contact':0, 'life_count':0,
'vel_des':[0.0, 0.0, 0.0],
'rpy_des':[0.0, 0.0, 0.0],
'pos_des':[0.0, 0.0, 0.0],
'acc_des':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
'ctrl_point':[0.0, 0.0, 0.0],
'foot_pose':[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
'step_height':[0.0, 0.0],
'value':0, 'duration':0
}
#读取到的运动数据
motion_rec = {
'mode':0,
'gait_id':0,
'contact':0,
'order_process_bar':0,
'switch_status':0,
'ori_error':0,
'footpos_error':0,
'motor_error':(0) * 12,
}
PI = 3.1416
# R = 0.5
# T = 10
# 里程计数据
odo_rec = {
'xyz' : (0.0, 0.0, 0.0),
'vxyz' : (0.0, 0.0, 0.0),
'rpy' : (0.0, 0.0, 0.0),
'omegaBody' : (0.0, 0.0, 0.0),
'vBody' : (0.0, 0.0, 0.0),
'timestamp' : 0
}
#雷达数据
laser_rec = {
'data': [0] * 180
}
def main():
Ctrl = Robot_Ctrl()
Ctrl.run()
msg = robot_control_cmd_lcmt()
try:
Ctrl.Recovery_stand(msg)
time.sleep(0.5)
# Ctrl.selaction(1)
# Ctrl.circle(msg)
#上石子路
Ctrl.odo_changeback(0, msg=msg)
time.sleep(1)
Ctrl.walk_forward(msg=msg, left_dist=0.65)
time.sleep(0.2)
Ctrl.odo_changeback(target=1.5707, msg=msg)
msg.mode = 11 # Locomotion
msg.gait_id = 10 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26
msg.vel_des = [0.5,0,0] #forward left/rightmove rotate
msg.duration = 3600 # Zero duration means continuous motion until a new command is used.
# Continuous motion can interrupt non-zero duration interpolation motion
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
msg.rpy_des = [0,0.0,0]
Ctrl.Send_cmd(msg)
Ctrl.Wait_finish(11, 10)
Ctrl.odo_changeback(target=1.5708, msg=msg)
time.sleep( 0.1 )
Ctrl.stone(msg)
Ctrl.Recovery_stand(msg)
time.sleep(3)
Ctrl.odo_changeback(target=1.5708,msg=msg)
time.sleep(3)
Ctrl.walk_forward(msg,left_dist= 0.4)
time.sleep(3)
Ctrl.rightside_align(msg=msg,expect_dist= 1.15)
time.sleep(4)
Ctrl.odo_changeback(target=3.1415, msg=msg)
time.sleep(1)
Ctrl.speedump_and_circle(msg)
Ctrl.jump(msg,1200,1)
Ctrl.odo_changeback(target=3.1415, msg=msg)
Ctrl.side_equal(msg)
# while True:
# Ctrl.Recovery_stand(msg)
# Ctrl.jump(msg,1200,1)
# time.sleep(0.1)
except KeyboardInterrupt:
pass
Ctrl.quit()
sys.exit()
# 激光雷达订阅
class LaserScanSubscriber(Node):
def __init__(self):
super().__init__('laser_scan_subscriber')
# 使用SensorDataQoS作为QoS配置文件
qos_profile = qos_profile_sensor_data # 这就是SensorDataQoS
self.subscription = self.create_subscription(
LaserScan,
'scan', # 替换为你的激光雷达数据发布的话题名
self.listener_callback,
qos_profile)
def listener_callback(self, msg):
global laser_rec
laser_rec['data'] = msg.ranges.tolist()
time.sleep(3)
class Robot_Ctrl(object):
def __init__(self):
# 反馈线程初始化
self.rec_thread = Thread(target=self.rec_responce)
self.send_thread = Thread(target=self.send_publish)
self.odo_thread = Thread(target=self.rec_responce_o)
self.lc_r = lcm.LCM("udpm://239.255.76.67:7670?ttl=255")
self.lc_s = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
self.cmd_msg = robot_control_cmd_lcmt()
self.rec_msg = robot_control_response_lcmt()
self.odo_msg = localization_lcmt()
self.send_lock = Lock()
self.delay_cnt = 0
self.mode_ok = 0
self.gait_ok = 0
self.runing = 1
self.lc_o = lcm.LCM("udpm://239.255.76.67:7667?ttl=255") ###里程计
# self.handle_lock = Lock() ###当前读数据只允许一个handle
self.lcm_cmd = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
self.lcm_usergait = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
self.usergait_msg = file_send_lcmt()
rclpy.init(args=None)
self.laser_scan_subscriber = LaserScanSubscriber()
self.laser_thread = Thread(target=self.laser_spin_func, kwargs={'subscriber': self.laser_scan_subscriber})
def laser_spin_func(self,subscriber):
rclpy.spin(subscriber)
time.sleep(0.02)
def run(self):
self.lc_r.subscribe("robot_control_response", self.msg_handler)
self.lc_o.subscribe("global_to_robot", self.msg_handler_o) ###里程计订阅话题
self.send_thread.start()
self.rec_thread.start()
self.odo_thread.start() ###启动里程计
self.laser_thread.start()
def msg_handler(self, channel, data):
self.rec_msg = robot_control_response_lcmt().decode(data)
if(self.rec_msg.order_process_bar >= 95):
self.mode_ok = self.rec_msg.mode
# motion_rec['mode'] = self.rec_msg.mode
# motion_rec['gait_id'] = self.rec_msg.gait_id
# motion_rec['contact'] = self.rec_msg.contact
# motion_rec['order_process_bar'] = self.rec_msg.order_process_bar
# motion_rec['switch_status'] = self.rec_msg.switch_status
# motion_rec['ori_error'] =self.rec_msg.ori_error
# motion_rec['footpos_error'] = self.rec_msg.footpos_error
# motion_rec['motor_error'] = self.rec_msg.motor_error
#print(motion_rec)
else:
self.mode_ok = 0
def msg_handler_o(self, channel, data): ###里程计解码函数,被按照频率调用
self.odo_msg = localization_lcmt().decode(data)
# odo_rec['xyz'] = self.odo_msg.xyz
# odo_rec['vxyz'] = self.odo_msg.vxyz
# odo_rec['rpy'] = self.odo_msg.rpy
# odo_rec['omegaBody'] = self.odo_msg.omegaBody
# odo_rec['vBody'] = self.odo_msg.vBody
# odo_rec['timestamp'] = self.odo_msg.timestamp
# print(odo_rec['rpy'])
def rec_responce(self):
while self.runing:
# self.handle_lock.acquire()
self.lc_r.handle()
# self.handle_lock.release()
time.sleep( 0.002 )
def rec_responce_o(self): ###里程计handle 注意一时刻只能有一个handle
while self.runing:
# self.handle_lock.acquire()
self.lc_o.handle()
# self.handle_lock.release()
time.sleep(0.2)
def selaction(self,mode):#自定义步态选择参数
try:
self.send_lock.acquire()
if mode == 0:# 石子路+上下坡
steps = toml.load("/home/cyberdog_sim/src/loco_hl_example/basic_motion_test_comb/Gait_Params_walk.toml")
elif mode ==1:# 待定
steps = toml.load("/home/cyberdog_sim/src/loco_hl_example/basic_motion_test_comb/Gait_Params_highwalk.toml")
full_steps = {'step':[robot_cmd]}
k =0
for i in steps['step']:
cmd = copy.deepcopy(robot_cmd)
cmd['duration'] = i['duration']
if i['type'] == 'usergait':
cmd['mode'] = 11 # LOCOMOTION
cmd['gait_id'] = 110 # USERGAIT
cmd['vel_des'] = i['body_vel_des']
cmd['rpy_des'] = i['body_pos_des'][0:3]
cmd['pos_des'] = i['body_pos_des'][3:6]
cmd['foot_pose'][0:2] = i['landing_pos_des'][0:2]
cmd['foot_pose'][2:4] = i['landing_pos_des'][3:5]
cmd['foot_pose'][4:6] = i['landing_pos_des'][6:8]
cmd['ctrl_point'][0:2] = i['landing_pos_des'][9:11]
cmd['step_height'][0] = math.ceil(i['step_height'][0] * 1e3) + math.ceil(i['step_height'][1] * 1e3) * 1e3
cmd['step_height'][1] = math.ceil(i['step_height'][2] * 1e3) + math.ceil(i['step_height'][3] * 1e3) * 1e3
cmd['acc_des'] = i['weight']
cmd['value'] = i['use_mpc_traj']
cmd['contact'] = math.floor(i['landing_gain'] * 1e1)
cmd['ctrl_point'][2] = i['mu']
if k == 0:
full_steps['step'] = [cmd]
else:
full_steps['step'].append(cmd)
k=k+1
if mode == 0:# 石子路+上下坡
f = open("/home/cyberdog_sim/src/loco_hl_example/basic_motion_test_comb/Gait_Params_walk_full.toml", 'w')
elif mode ==1:# 待定
f = open("/home/cyberdog_sim/src/loco_hl_example/basic_motion_test_comb/Gait_Params_highwalk_full.toml", 'w')
f.write("# Gait Params\n")
f.writelines(toml.dumps(full_steps))
f.close()
if mode == 0:# 石子路+上下坡
file_obj_gait_def = open("/home/cyberdog_sim/src/loco_hl_example/basic_motion_test_comb/Gait_Def_walk.toml",'r')
file_obj_gait_params = open("/home/cyberdog_sim/src/loco_hl_example/basic_motion_test_comb/Gait_Params_walk_full.toml",'r')
elif mode ==1:# 待定
file_obj_gait_def = open("/home/cyberdog_sim/src/loco_hl_example/basic_motion_test_comb/Gait_Def_highwalk.toml",'r')
file_obj_gait_params = open("/home/cyberdog_sim/src/loco_hl_example/basic_motion_test_comb/Gait_Params_highwalk_full.toml",'r')
self.usergait_msg.data = file_obj_gait_def.read()
self.lcm_usergait.publish("user_gait_file",self.usergait_msg.encode())
time.sleep(0.5)
self.usergait_msg.data = file_obj_gait_params.read()
self.lcm_usergait.publish("user_gait_file",self.usergait_msg.encode())
time.sleep(0.1)
file_obj_gait_def.close()
file_obj_gait_params.close()
if mode == 0:# 石子路+上下坡
user_gait_list = open("/home/cyberdog_sim/src/loco_hl_example/basic_motion_test_comb/Usergait_List.toml",'r')
if mode == 1:# 石子路+上下坡
user_gait_list = open("/home/cyberdog_sim/src/loco_hl_example/basic_motion_test_comb/Usergait_List1.toml",'r')
steps = toml.load(user_gait_list)
for step in steps['step']:
self.cmd_msg.mode = step['mode']
self.cmd_msg.value = step['value']
self.cmd_msg.contact = step['contact']
self.cmd_msg.gait_id = step['gait_id']
self.cmd_msg.duration = step['duration']
self.cmd_msg.life_count += 1
for i in range(3):
self.cmd_msg.vel_des[i] = step['vel_des'][i]
self.cmd_msg.rpy_des[i] = step['rpy_des'][i]
self.cmd_msg.pos_des[i] = step['pos_des'][i]
self.cmd_msg.acc_des[i] = step['acc_des'][i]
self.cmd_msg.acc_des[i+3] = step['acc_des'][i+3]
self.cmd_msg.foot_pose[i] = step['foot_pose'][i]
self.cmd_msg.ctrl_point[i] = step['ctrl_point'][i]
for i in range(2):
self.cmd_msg.step_height[i] = step['step_height'][i]
self.lcm_cmd.publish("robot_control_cmd",self.cmd_msg.encode())
time.sleep( 0.1 )
if mode==0:#石子路+上下坡
for i in range(550): #10s Heat beat It is used to maintain the heartbeat when life count is not updated
self.lcm_cmd.publish("robot_control_cmd",self.cmd_msg.encode())
# self.err_handler(msg = robot_control_cmd_lcmt())
time.sleep( 0.2 )
if mode==1:#台阶
for i in range(50): #10s Heat beat It is used to maintain the heartbeat when life count is not updated
self.lcm_cmd.publish("robot_control_cmd",self.cmd_msg.encode())
# self.err_handler(msg = robot_control_cmd_lcmt())
time.sleep( 0.2 )
self.send_lock.release()
except KeyboardInterrupt:
self.cmd_msg.mode = 7 #PureDamper before KeyboardInterrupt:
self.cmd_msg.gait_id = 0
self.cmd_msg.duration = 0
self.cmd_msg.life_count += 1
self.lcm_cmd.publish("robot_control_cmd",self.cmd_msg.encode())
self.send_lock.release()
pass
def Wait_finish(self, mode, gait_id,timeout = 2000): #timeout // 0.005 = 多少秒超时
count = 0
while self.runing and count < timeout: #10s
if self.mode_ok == mode and self.gait_ok == gait_id:
return True
else:
time.sleep(0.005)
count += 1
def send_publish(self):
while self.runing:
self.send_lock.acquire()
if self.delay_cnt > 20: # Heartbeat signal 10HZ, It is used to maintain the heartbeat when life count is not updated
self.lc_s.publish("robot_control_cmd",self.cmd_msg.encode())
self.delay_cnt = 0
self.delay_cnt += 1
self.send_lock.release()
time.sleep( 0.005 )
def Send_cmd(self, msg):
self.send_lock.acquire()
self.delay_cnt = 50
self.cmd_msg = msg
self.send_lock.release()
def quit(self):
self.runing = 0
self.rec_thread.join()
self.send_thread.join()
self.odo_thread.join()
self.laser_thread.join()
def jump(self,msg,duration,type):#跳跃
msg.mode = 16 # Locomotion
msg.gait_id = type # 0左跳 1前跳 3右跳 6原地跳
msg.duration = duration # Zero duration means continuous motion until a new command is used.
# Continuous motion can interrupt non-zero duration interpolation motion
msg.life_count += 1
self.Send_cmd(msg)
self.Wait_finish(16, 1)
time.sleep(0.1)
def Recovery_stand(self,msg):
msg.mode = 12 # Recovery stand
msg.gait_id = 0
msg.life_count += 1 # Command will take effect when life_count update
self.Send_cmd(msg)
self.Wait_finish(12, 0)
def Pure_Damp(self,msg,mode):
msg.mode = 7 # PureDamp
msg.gait_id = mode#0为自然倒下1为受控倒下
msg.life_count += 1 # Command will take effect when life_count update
self.Send_cmd(msg)
self.Wait_finish(12, 0)
def circle(self,msg,R=0.6,T=12):#半径,时间
msg.mode = 11 # Locomotion
msg.gait_id = 10 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26
msg.vel_des = [PI*R*2/T,0,2*PI/T] #forward left/rightmove rotate
msg.duration =T*550 # Zero duration means continuous motion until a new command is used.
# Continuous motion can interrupt non-zero duration interpolation motion
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
msg.rpy_des = [0,0.0,0]
self.Send_cmd(msg)
self.Wait_finish(11, 10)
def stone(self,msg):
self.Recovery_stand(msg)
self.selaction(0)
time.sleep(0.1)
# def locomotion(s)
def slope(self,msg):
self.Recovery_stand(msg)
self.selaction(1)
def err_handler(self,msg):
if motion_rec['switch_status']==3:#如果状态为高祖尼保护模式
msg.mode = 12 # Recovery stand
msg.gait_id = 0
msg.life_count += 1 # Command will take effect when life_count update
self.Send_cmd(msg)
self.Wait_finish(12, 0)
#雷达以及里程计
# 探测直线的时候需要的二分查找
def bio_select(self, k, b, start, end, dist_limit, data_with_order_x, data_with_order_y):
left, right = start, end - 1
# 二分查找
while left < right:
mid = (left + right) // 2
x_mid, y_mid = data_with_order_x[mid], data_with_order_y[mid]
# 计算点到直线的距离
distance = abs(k * x_mid - y_mid + b) / (k ** 2 + 1) ** 0.5
if distance <= dist_limit:
# 点在直线附近,继续向右搜索
left = mid + 1
else:
# 点不在直线附近,缩小搜索范围
right = mid
# 返回直线附近点的范围
return left
def detect_lines(self): # data 180个的数组
dist_limit = 0.10
init_points = 5
data_with_order_x = []
data_with_order_y = []
data = laser_rec['data']
# degree = self.odo_msg.rpy[2]
# 过滤无效值
for i in range(180):
if data[i] < 0.12 or data[i] > 12:
continue
# data_with_order.append([math.radians(i-90) - degree,data[i]]) #新坐标系里面的偏转角(弧度)+ 距离
data_with_order_x.append(data[i] * math.cos(math.radians(i - 90)))
data_with_order_y.append(data[i] * math.sin(math.radians(i - 90))) # x y 轴
len_filtered = len(data_with_order_x)
# print(len(data_with_order_x))
# for i in range(0, len(data_with_order_x)):
# print('(', end='')
# print(data_with_order_x[i], data_with_order_y[i], sep=' ,', end='')
# print('),', end='\n')
# 储存直线方程
lines = []
# 拐动位置
corners = [0]
while corners[-1] < len_filtered - init_points - 1:
start = corners[-1]
k1_init, b_init = numpy.polyfit(data_with_order_x[start:start + init_points],
data_with_order_y[start:start + init_points], 1)
# print(k1_init, b_init)
# 二分查找 end 是范围内最后一个点
end = self.bio_select(k1_init, b_init, start, len_filtered, dist_limit, data_with_order_x,
data_with_order_y)
# 允许一次偏差出现此时end + 1 被击毙了,用现有的start - end 再拟合新直线,看 end + 1 还在不在外边,
# 如果还在外边,大概率是救不回了
# 在里面还可以二分一下
k1_fixed, b1_fixed = numpy.polyfit(data_with_order_x[start:end], data_with_order_y[start:end], 1)
dist_bias = (abs(k1_fixed * data_with_order_x[end] - data_with_order_y[end] + b1_fixed) /
(k1_fixed ** 2 + 1) ** 0.5)
if dist_bias < dist_limit:
end_new = self.bio_select(k1_fixed, b1_fixed, end, len_filtered, dist_limit, data_with_order_x,
data_with_order_y)
k1_fixed, b1_fixed = numpy.polyfit(data_with_order_x[start:end_new],
data_with_order_y[start:end_new], 1)
else:
end_new = end
corners.append(end_new)
# print(end)
lines.append([k1_fixed, b1_fixed])
if len(lines) >= 3:
print(corners)
break
# for i in lines:
# # print(f'{i[0]}x - y + {i[1]} = 0')
# print(f'[{i[0]}, {i[1]}],')
return lines
####修改后的里程计转动函数更加精确,一次性可以转到对应位置
def odo_changeback(self, target, msg, limit = 0.04,timesleep_s = 5): ##允许误差0.05弧度大概是2.86度
const_int = 2470 # 转 1.57 弧度 大概要 3875 duration 每弧度大概这个值 持续时间大概6.5秒
dist = target - self.odo_msg.rpy[2]
print('odo_changeback',self.odo_msg.rpy[2],target)
while abs(dist) > limit:
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26
msg.vel_des = [0, 0, 0.5 if dist > 0 else -0.5] #转向
msg.duration = int(const_int * abs(dist))
# Continuous motion can interrupt non-zero duration interpolation motion
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
time.sleep(7 * abs(dist) / 1.57)
# print('1 times finish angle',self.odo_msg.rpy[2])
# return
dist = target - self.odo_msg.rpy[2]
print('odo_changeback',self.odo_msg.rpy[2],target)
return
def side_equal(self, msg, limit = 0.08 , off_set = 0.0, timeout = 600): ## 允许最大误差0.08米即离最中线左右偏向4厘米
###先里程计回正再算距离
const_int = 6666 #对 差的距离 进行倍数放大 控制左右移动时间
###2600
lines = self.detect_lines()
while len(lines) < 3:
time.sleep(0.04)
lines = self.detect_lines()
dist1 = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5),3)
dist2 = round(abs(lines[2][1] / ((lines[2][0] * lines[2][0] + 1)) ** 0.5),3)
# print(f'before move to equal: r :{dist1} l: {dist2}')
dist_diff = dist1 - dist2 - 2*off_set
while abs(dist_diff) > limit:
msg.mode = 11 # Locomotion
msg.gait_id = 27
msg.vel_des = [0, -0.15, 0] if dist_diff > 0 else [0, 0.15, 0]
msg.duration = int(const_int * abs(dist_diff)/2)
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
msg.rpy_des = [0,0.3,0]
self.Send_cmd(msg)
time.sleep(msg.duration/1000+0.5)
time.sleep(1)
lines = self.detect_lines()
while len(lines) < 3:
time.sleep(0.04)
lines = self.detect_lines()
dist1 = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5),3)
dist2 = round(abs(lines[2][1] / ((lines[2][0] * lines[2][0] + 1)) ** 0.5),3)
dist_diff = dist1 - dist2
def side_equal_base_on_right(self, msg, limit = 0.08 , off_set = 0.0, timeout = 600):
const_int = 6666 #对 差的距离 进行倍数放大 控制左右移动时间
###2600
lines = self.detect_lines()
while len(lines) < 2:
time.sleep(0.04)
lines = self.detect_lines()
####修改后的前向运动函数不会出现直线扫描失误问题,但依旧需要在里程计回正之后才能使用
def walk_forward(self, msg, left_dist, limit = 0.04, timeout = 1200,k_bound = 5): ##在对正之后才能调用left_dist指定前面预留多长的距离
const_int = 1667 #对 差的距离 进行倍数放大 控制前进时间
lines = self.detect_lines()
dist = round(abs(lines[1][1] / ((lines[1][0] * lines[1][0] + 1)) ** 0.5),3)
print("dist :"dist)
# for i in lines:
# print(i)
while (len(lines) <= 1 or (lines[1][0] < k_bound and lines[1][0] > -k_bound)):
time.sleep(0.04)
lines = self.detect_lines()
dist = round(abs(lines[1][1] / ((lines[1][0] * lines[1][0] + 1)) ** 0.5),3)
dist_need_walk = dist - left_dist
while abs(dist_need_walk) > limit:
msg.mode = 11 # Locomotion
msg.gait_id = 27
msg.vel_des = [0.6 if dist_need_walk > 0 else -0.6 , 0 , 0]
msg.duration = int(const_int * abs(dist_need_walk))
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
self.Wait_finish(11,27,timeout=timeout)
lines = self.detect_lines()
dist = round(abs(lines[1][1] / ((lines[1][0] * lines[1][0] + 1)) ** 0.5),3)
# while (len(lines) <= 1 or (lines[1][0] <k_bound and lines[1][0]>-k_bound)):
# time.sleep(0.04)
# lines = self.detect_lines()
# dist = round(abs(lines[1][1] / ((lines[1][0] * lines[1][0] + 1)) ** 0.5),3)
# dist_need_walk = dist - left_dist
print('walk_forward dis_after',dist)
return
def turnleft_corner(self,msg,limit = 0.04,timeout = 1200):### 假设已经回正,根据外侧两条直线测算应该到的位置
###经过测试右外侧0.985米上外侧0.768米
right_dis = 0.985
up_dis = 0.65
const_int = 6600
lines = self.detect_lines()
while len(lines) < 2:
time.sleep(0.04)
lines = self.detect_lines()
dist1 = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5),3)
dist2 = round(abs(lines[1][1] / ((lines[1][0] * lines[1][0] + 1)) ** 0.5),3)
print('turnleft_corner before',dist1,dist2)
self.walk_forward(msg,left_dist=up_dis,timeout=2000)
time.sleep(0.2)
right_dis_diff = right_dis - dist1
while abs(right_dis_diff) > limit:
msg.mode = 11 # Locomotion
msg.gait_id = 27
msg.vel_des = [0,0.15 if right_dis_diff > 0 else -0.15,0]
msg.duration = int(const_int * abs(right_dis_diff))
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
msg.rpy_des = [0,0.3,0]
self.Send_cmd(msg)
self.Wait_finish(11,27,timeout)
dist1 = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5),3)
right_dis_diff = right_dis - dist1
print('final right dis' ,right_dis_diff)
return
# time.sleep(2)
####对部分地形,只使用扫描到的右侧直线即对正赛道
def rightside_align(self,msg,expect_dist = 0.65,limit = 0.04,timeout = 800):
const_int = 5000
lines = self.detect_lines()
dist = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5),3)
dist_need_walk = dist - expect_dist
while abs(dist_need_walk) > limit:
msg.mode = 11 # Locomotion
msg.gait_id = 27
msg.vel_des = [0,-0.2 if dist_need_walk > 0 else 0.2, 0]
msg.duration = int(const_int * abs(dist_need_walk))
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
self.Wait_finish(11,27,timeout=timeout)
lines = self.detect_lines()
dist = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5),3)
dist_need_walk = dist - expect_dist
print('right align dis_after',dist)
return
def odo_tinyturn(self, target, msg, limit = 0.04,timesleep_s = 5):
const_int = 1200
dist = target - self.odo_msg.rpy[2]
print('odo_tinyturn',self.odo_msg.rpy[2],target)
while abs(dist) > limit:
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26
msg.vel_des = [0, 0, 0.5 if dist > 0 else -0.5] #转向
msg.duration = int(const_int * abs(dist))
# Continuous motion can interrupt non-zero duration interpolation motion
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
time.sleep(timesleep_s)
# print('1 times finish angle',self.odo_msg.rpy[2])
# return
dist = target - self.odo_msg.rpy[2]
print('odo_tinyturn',self.odo_msg.rpy[2],target)
def speedump_and_circle(self,msg):
const_int1 = 5000 #######在走向圆柱的情况下使用的duration常数
const_int2 = 5000 #######在侧向靠近圆柱进入赛道位置的情况下使用的duration常数
y_sharp_dist = -0.08 #######给最后的对齐使用的参数 绝对值是对正之后拐点和狗y轴的差距 正常要负数
x_sharp_dist = 0.30 #######给最后的对齐使用的参数 是对正之后拐点和狗x轴的差距
#####开环走过大部分减速带
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26
msg.vel_des = [0.2, 0, 0] # 直走
msg.rpy_des = [0, 0, 0]
# Zero duration means continuous motion until a new command is used.
msg.duration = 14000
# Continuous motion can interrupt non-zero duration interpolation motion
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
time.sleep(11)
time.sleep(13)
###找到方向处理剩余的不确定距离,主要是向半径方向靠近圆柱
time.sleep(0.04)
laser_keep = laser_rec['data']
idx_min = laser_keep[70:110].index(min(laser_keep[70:110])) + 70 #最小值点
dist_min = laser_keep[idx_min]
angel_diff = idx_min / 90 * 1.57
print(idx_min,dist_min,angel_diff)
if dist_min > 0.15 :
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26
msg.vel_des = [0.1 * math.sin(angel_diff), 0.1 * math.cos(angel_diff), 0] # 直走
msg.rpy_des = [0, 0, 0]
# Zero duration means continuous motion until a new command is used.
msg.duration = int((dist_min - 0.15) * const_int1)
# Continuous motion can interrupt non-zero duration interpolation motion
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
print('tiny walk')
####转向0度回正
time.sleep(0.5)
self.odo_changeback(1.5708,msg=msg)
self.odo_tinyturn(target=1.5708,msg=msg)
time.sleep(3)
###########################找右边直线和圆环赛道的交界点,以这个点为基准去进行对齐
###########################多次扫描去除错误点
laser_valid = [0] * 180
for i in range(10):
time.sleep(0.04)
laser = laser_rec['data']
for j in range(0,180):
if laser_valid[j] == 0 and laser[j] > 0.15:
laser_valid[j] = laser[j]
##########################根据X轴的值去找到拐角点序号idx
data_with_order_x = []
data_with_order_y = []
for i in range(180):
if laser_valid[i] < 0.12 or laser_valid[i] > 12:
continue
# data_with_order.append([math.radians(i-90) - degree,data[i]]) #新坐标系里面的偏转角(弧度)+ 距离
data_with_order_x.append(laser_valid[i] * math.cos(math.radians(i - 90)))
data_with_order_y.append(laser_valid[i] * math.sin(math.radians(i - 90))) # x y 轴
len_filtered = len(data_with_order_x)
# print('len_filtered',len_filtered)
# print(data_with_order_x)
idx = -1
i = 60
while i < len_filtered - 10:
cnt = 0
for k in range(i+1,i+9):
if data_with_order_x[k] > data_with_order_x[k - 1]:
cnt += 1
else:
break
if cnt> 6:
idx = i
break
i+=1
###################读出拐角点的值
point_sharp_x = data_with_order_x[idx]
point_sharp_y = data_with_order_y[idx]
print('idx pt_sharp_x',idx, point_sharp_x)
print('idx pt_sharp_y',idx, point_sharp_y)
time.sleep(1)
#####根据拐角点矫正方位先在y轴上矫正
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26
msg.vel_des = [0, 0.2 if (point_sharp_y - y_sharp_dist) > 0 else -0.2, 0]
msg.rpy_des = [0, 0, 0]
# Zero duration means continuous motion until a new command is used.
msg.duration = int (const_int2 * abs(y_sharp_dist - point_sharp_y))
# Continuous motion can interrupt non-zero duration interpolation motion
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
time.sleep(5)
####再矫正x轴
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26
msg.vel_des = [-0.2 if (point_sharp_x - x_sharp_dist) < 0 else 0.2, 0, 0]
msg.rpy_des = [0, 0, 0]
# Zero duration means continuous motion until a new command is used.
msg.duration = int (const_int2 * abs(point_sharp_x - x_sharp_dist))
# Continuous motion can interrupt non-zero duration interpolation motion
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
time.sleep(5)
#####无限进行循环
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26
msg.vel_des = [0.165, 0, 0.3] #转向
msg.duration = 12000
# Continuous motion can interrupt non-zero duration interpolation motion
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
time.sleep (20)
# Main function
if __name__ == '__main__':
main()