2025-05-13 09:09:54 +00:00

1103 lines
43 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# '''
# This demo show the communication interface of MR813 motion control board based on Lcm.
# Dependency:
# - robot_control_cmd_lcmt.py
# - robot_control_response_lcmt.py
# 运行代码之前需要执行:
# cd /home/cyberdog_sim
# source /opt/ros/galactic/setup.bash
# cd localpath
# python3 main.py
# 在项目中我们使用了以下版本的NumPy和OpenCV开源库
# NumPy 版本1.24.4
# OpenCV 版本4.10.0.84
# 感谢NumPy和OpenCV社区的贡献使我们能够更高效地完成本次设计。
# '''
'''
这段代码使用雷达和相机,在<origin rpy="0 0 -3.1416" xyz="2.5 8.5 0" />的起始位置
完成石板路+直角转弯4+连续碰撞
正确无失误地返回起点
'''
import lcm
import sys
import os
import time
from threading import Thread, Lock
from robot_control_cmd_lcmt import robot_control_cmd_lcmt
from robot_control_response_lcmt import robot_control_response_lcmt
from localization_lcmt import localization_lcmt
import toml
import copy
import math
from file_send_lcmt import file_send_lcmt
import rclpy
from rclpy.executors import MultiThreadedExecutor
import numpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan,Image
from rclpy.qos import QoSProfile, qos_profile_sensor_data
from pprint import pprint
import cv2
from cv_bridge import CvBridge
# 自定义步态发送数据的结构
robot_cmd = {
'mode': 0, 'gait_id': 0, 'contact': 0, 'life_count': 0,
'vel_des': [0.0, 0.0, 0.0],
'rpy_des': [0.0, 0.0, 0.0],
'pos_des': [0.0, 0.0, 0.0],
'acc_des': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
'ctrl_point': [0.0, 0.0, 0.0],
'foot_pose': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
'step_height': [0.0, 0.0],
'value': 0, 'duration': 0
}
# 读取到的运动数据
# 使用 self.rec_msg.xxx 代替motion_rec 如:
# motion_rec['mode'] 替换为 self.rec_msg.mode
# motion_rec = {
# 'mode':0,
# 'gait_id':0,
# 'contact':0,
# 'order_process_bar':0,
# 'switch_status':0,
# 'ori_error':0,
# 'footpos_error':0,
# 'motor_error':(0) * 12,
# }
# 里程计数据
# 使用 self.odo_msg.xxx 代替odo_rec 如:
# odo_rec['xyz'] 替换为 self.odo_msg.xyz
# odo_rec = {
# 'xyz' : (0.0, 0.0, 0.0),
# 'vxyz' : (0.0, 0.0, 0.0),
# 'rpy' : (0.0, 0.0, 0.0),
# 'omegaBody' : (0.0, 0.0, 0.0),
# 'vBody' : (0.0, 0.0, 0.0),
# 'timestamp' : 0
# }
# 雷达数据
laser_rec = {
'data': [0] * 180
}
RGB_res = 0
def main():
Ctrl = Robot_Ctrl()
msg = robot_control_cmd_lcmt()
# pos = -1
try:
Ctrl.run()
##############################################通过RGB代码#####################################################################
# Ctrl.Recovery_stand(msg)
# msg.mode = 11 # Locomotion
# msg.gait_id = 27
# msg.vel_des = [0.3, 0, 0]
# msg.duration = 3000
# msg.step_height = [0.06, 0.06] # ground clearness of swing leg
# msg.life_count += 1
# Ctrl.Send_cmd(msg)
# time.sleep(10)
# Ctrl.rightside_align(msg=msg,expect_dist=0.9)
# if pos < 0:
# print(RGB_res)
# if RGB_res > 0:
# Ctrl.rightside_align(msg=msg,expect_dist=0.9)
# else:
# Ctrl.rightside_align(msg=msg,expect_dist=0.4)
# pos = 1
# else:
# if RGB_res > 0:
# Ctrl.rightside_align(msg=msg,expect_dist=0.4)
# else:
# Ctrl.rightside_align(msg=msg,expect_dist=0.9)
# pos = -1
# msg.mode = 11 # Locomotion
# msg.gait_id = 27
# msg.vel_des = [0.3, 0, 0]
# msg.duration = 3333
# msg.step_height = [0.06, 0.06] # ground clearness of swing leg
# msg.life_count += 1
# Ctrl.Send_cmd(msg)
# time.sleep(10)
# if pos < 0:
# print(RGB_res)
# if RGB_res > 0:
# Ctrl.rightside_align(msg=msg,expect_dist=0.9)
# else:
# Ctrl.rightside_align(msg=msg,expect_dist=0.4)
# pos = 1
# else:
# if RGB_res > 0:
# Ctrl.rightside_align(msg=msg,expect_dist=0.4)
# else:
# Ctrl.rightside_align(msg=msg,expect_dist=0.9)
# pos = -1
# msg.mode = 11 # Locomotion
# msg.gait_id = 27
# msg.vel_des = [0.3, 0, 0]
# msg.duration = 3333
# msg.step_height = [0.06, 0.06] # ground clearness of swing leg
# msg.life_count += 1
# Ctrl.Send_cmd(msg)
# time.sleep(10)
# if pos < 0:
# print(RGB_res)
# if RGB_res > 0:
# Ctrl.rightside_align(msg=msg,expect_dist=0.9)
# else:
# Ctrl.rightside_align(msg=msg,expect_dist=0.4)
# pos = 1
# else:
# if RGB_res > 0:
# Ctrl.rightside_align(msg=msg,expect_dist=0.4)
# else:
# Ctrl.rightside_align(msg=msg,expect_dist=0.9)
# pos = -1
# msg.mode = 11 # Locomotion
# msg.gait_id = 27
# msg.vel_des = [0.3, 0, 0]
# msg.duration = 3333
# msg.step_height = [0.06, 0.06] # ground clearness of swing leg
# msg.life_count += 1
# Ctrl.Send_cmd(msg)
# time.sleep(10)
# if pos < 0:
# print(RGB_res)
# if RGB_res > 0:
# Ctrl.rightside_align(msg=msg,expect_dist=0.9)
# else:
# Ctrl.rightside_align(msg=msg,expect_dist=0.4)
# pos = 1
# else:
# if RGB_res > 0:
# Ctrl.rightside_align(msg=msg,expect_dist=0.4)
# else:
# Ctrl.rightside_align(msg=msg,expect_dist=0.9)
# pos = -1
# msg.mode = 11 # Locomotion
# msg.gait_id = 27
# msg.vel_des = [0.3, 0, 0]
# msg.duration = 3333
# msg.step_height = [0.06, 0.06] # ground clearness of swing leg
# msg.life_count += 1
# Ctrl.Send_cmd(msg)
# time.sleep(10)
# if pos < 0:
# print(RGB_res)
# if RGB_res > 0:
# Ctrl.rightside_align(msg=msg,expect_dist=0.9)
# else:
# Ctrl.rightside_align(msg=msg,expect_dist=0.4)
# pos = 1
# else:
# if RGB_res > 0:
# Ctrl.rightside_align(msg=msg,expect_dist=0.4)
# else:
# Ctrl.rightside_align(msg=msg,expect_dist=0.9)
# pos = -1
# msg.mode = 11 # Locomotion
# msg.gait_id = 27
# msg.vel_des = [0.3, 0, 0]
# msg.duration = 3333
# msg.step_height = [0.06, 0.06] # ground clearness of swing leg
# msg.life_count += 1
# Ctrl.Send_cmd(msg)
# time.sleep(10)
except KeyboardInterrupt:
pass
Ctrl.quit()
sys.exit()
# pid 控制
class DeltaPid(object):
'''
PID calculate
pwm = pre_pwm + kp*(err-pre_ee) + ki*err + kd*(err-2*pre_err+pre_pre_ee)
'''
def __init__(self, target_val, max_val, min_val, p, i, d):
self.max_val = max_val
self.min_val = min_val
self.k_p = p
self.k_i = i
self.k_d = d
self.target_val = target_val
self._pre_val = target_val
self._pre_pre_val = target_val - 1
def calculate(self, cur_val, pwm_in):
# pwm = pre_pwm + kp*(err-pre_ee) + ki*err + kd*(err-2*pre_err+pre_pre_ee)
pwm_out = 0
p_change = self.k_p * (cur_val - self._pre_val)
i_change = self.k_i * (cur_val - self.target_val)
d_change = self.k_d * (cur_val - 2 * self._pre_val + self._pre_pre_val)
print(f"p:{p_change} i:{i_change} d:{d_change}")
delta_output = p_change + i_change + d_change
print(f"p+i+d output={delta_output}")
pwm_out = delta_output + pwm_in
print(f"calculate pwm={pwm_out}")
self._pre_pre_val = self._pre_val
self._pre_val = cur_val
pwm_out = self.max_val if pwm_out > self.max_val else (self.min_val if pwm_out < self.min_val else pwm_out)
print(f"actual output pwm={pwm_out}")
return pwm_out
# 激光雷达订阅
class LaserScanSubscriber(Node):
def __init__(self):
super().__init__('laser_scan_subscriber')
# 使用SensorDataQoS作为QoS配置文件
qos_profile = qos_profile_sensor_data # 这就是SensorDataQoS
self.subscription = self.create_subscription(
LaserScan,
'scan', # 替换为你的激光雷达数据发布的话题名
self.listener_callback,
qos_profile)
def listener_callback(self, msg):
global laser_rec
laser_rec['data'] = msg.ranges.tolist()
class RGB_Subscriber(Node):
def __init__(self):
super().__init__('RGB_subscriber')
qos_profile = qos_profile_sensor_data
self.subscription = self.create_subscription(
Image,
'/image_rgb',
self.listener_callback,
qos_profile)
self.bridge = CvBridge()
def find_green_location(self, image, red_range = [20, 35], green_range = [50, 80]): #调参部分1
height, width = image.shape[:2]
central_part = [2 * width // 8, 4 * width // 8, 0, 4 * height // 8] #调参部分2
central_image = image[central_part[2]:central_part[3], central_part[0]:central_part[1]]
# cv2.imshow("image",central_image)
# cv2.waitKey(0)
_,g,r = cv2.split(central_image)
mean_green = numpy.mean(g)
mean_red = numpy.mean(r)
if green_range[0] <= mean_green <= green_range[1] and red_range[0] <= mean_red <= red_range[1]: # 如果x坐标的平均值大于图像宽度的一半
return 1
else:
return -1
def mid_before_bridge():
return
def listener_callback(self, msg):
# Convert the ROS Image message to an OpenCV image
cv_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
global RGB_res
RGB_res = self.find_green_location(cv_image)
cv2.imwrite("image.jpg",cv_image)
time.sleep(3)
def get_image(self, msg):
cv_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
cv2.imshow("image",cv_image)
cv2.waitKey()
class Robot_Ctrl(object):
def __init__(self):
# 反馈线程初始化
self.rec_thread = Thread(target=self.rec_responce)
self.send_thread = Thread(target=self.send_publish)
self.odo_thread = Thread(target=self.rec_responce_o)
self.lc_r = lcm.LCM("udpm://239.255.76.67:7670?ttl=255")
self.lc_s = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
self.cmd_msg = robot_control_cmd_lcmt()
self.rec_msg = robot_control_response_lcmt()
self.odo_msg = localization_lcmt()
self.send_lock = Lock()
self.delay_cnt = 0
self.mode_ok = 0
self.gait_ok = 0
self.runing = 1
self.RGB_res = 0
self.lc_o = lcm.LCM("udpm://239.255.76.67:7667?ttl=255") ###里程计
# self.handle_lock = Lock() ###当前读数据只允许一个handle
self.lcm_cmd = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
self.lcm_usergait = lcm.LCM("udpm://239.255.76.67:7671?ttl=255")
self.usergait_msg = file_send_lcmt()
rclpy.init(args=None)
self.laser_scan_subscriber = LaserScanSubscriber()
self.RGB_subscriber = RGB_Subscriber()
self.executor = MultiThreadedExecutor()
self.laser_thread = Thread(target=self.spin_func, args=(self.laser_scan_subscriber,))
self.RGB_thread = Thread(target=self.spin_func, args=(self.RGB_subscriber,))
# 控制pid
self.yaw_pid = DeltaPid(0.0, 1.2, 0.01, 10, 0.7, 0.3) # 控制yaw角pid初始化
def spin_func(self, subscriber):
self.executor.add_node(subscriber)
self.executor.spin()
time.sleep(0.02)
def run(self):
self.lc_r.subscribe("robot_control_response", self.msg_handler)
self.lc_o.subscribe("global_to_robot", self.msg_handler_o) ###里程计订阅话题
self.send_thread.start()
self.rec_thread.start()
self.odo_thread.start() ###启动里程计
self.laser_thread.start()
self.RGB_thread.start()
# self.sub_thread
def msg_handler(self, channel, data):
self.rec_msg = robot_control_response_lcmt().decode(data)
if (self.rec_msg.order_process_bar >= 95):
self.mode_ok = self.rec_msg.mode
# motion_rec['mode'] = self.rec_msg.mode
# motion_rec['gait_id'] = self.rec_msg.gait_id
# motion_rec['contact'] = self.rec_msg.contact
# motion_rec['order_process_bar'] = self.rec_msg.order_process_bar
# motion_rec['switch_status'] = self.rec_msg.switch_status
# motion_rec['ori_error'] =self.rec_msg.ori_error
# motion_rec['footpos_error'] = self.rec_msg.footpos_error
# motion_rec['motor_error'] = self.rec_msg.motor_error
# print(motion_rec)
else:
self.mode_ok = 0
def msg_handler_o(self, channel, data): ###里程计解码函数,被按照频率调用
self.odo_msg = localization_lcmt().decode(data)
# odo_rec['xyz'] = self.odo_msg.xyz
# odo_rec['vxyz'] = self.odo_msg.vxyz
# odo_rec['rpy'] = self.odo_msg.rpy
# odo_rec['omegaBody'] = self.odo_msg.omegaBody
# odo_rec['vBody'] = self.odo_msg.vBody
# odo_rec['timestamp'] = self.odo_msg.timestamp
# print(odo_rec['rpy'])
def rec_responce(self):
while self.runing:
# self.handle_lock.acquire()
self.lc_r.handle()
# self.handle_lock.release()
time.sleep(0.002)
def rec_responce_o(self): ###里程计handle 注意一时刻只能有一个handle
while self.runing:
# self.handle_lock.acquire()
self.lc_o.handle()
# self.handle_lock.release()
time.sleep(0.002)
def selaction(self, mode): # 自定义步态选择参数
try:
self.send_lock.acquire()
if mode == 0: # 石子路
steps = toml.load("/home/git/project2248014-237570/back_to_start/back_to_start_main/Gait_Params_walk.toml")
elif mode == 1: # 上下坡
steps = toml.load("/home/git/project2248014-237570/back_to_start/back_to_start_main/Gait_Params_scopewalk.toml")
full_steps = {'step': [robot_cmd]}
k = 0
for i in steps['step']:
cmd = copy.deepcopy(robot_cmd)
cmd['duration'] = i['duration']
if i['type'] == 'usergait':
cmd['mode'] = 11 # LOCOMOTION
cmd['gait_id'] = 110 # USERGAIT
cmd['vel_des'] = i['body_vel_des']
cmd['rpy_des'] = i['body_pos_des'][0:3]
cmd['pos_des'] = i['body_pos_des'][3:6]
cmd['foot_pose'][0:2] = i['landing_pos_des'][0:2]
cmd['foot_pose'][2:4] = i['landing_pos_des'][3:5]
cmd['foot_pose'][4:6] = i['landing_pos_des'][6:8]
cmd['ctrl_point'][0:2] = i['landing_pos_des'][9:11]
cmd['step_height'][0] = math.ceil(i['step_height'][0] * 1e3) + math.ceil(
i['step_height'][1] * 1e3) * 1e3
cmd['step_height'][1] = math.ceil(i['step_height'][2] * 1e3) + math.ceil(
i['step_height'][3] * 1e3) * 1e3
cmd['acc_des'] = i['weight']
cmd['value'] = i['use_mpc_traj']
cmd['contact'] = math.floor(i['landing_gain'] * 1e1)
cmd['ctrl_point'][2] = i['mu']
if k == 0:
full_steps['step'] = [cmd]
else:
full_steps['step'].append(cmd)
k = k + 1
if mode == 0: # 石子路
f = open("/home/git/project2248014-237570/back_to_start/back_to_start_main/Gait_Params_walk_full.toml", 'w')
elif mode == 1: # 上下坡
f = open("/home/git/project2248014-237570/back_to_start/back_to_start_main/Gait_Params_scopewalk_full.toml", 'w')
f.write("# Gait Params\n")
f.writelines(toml.dumps(full_steps))
f.close()
if mode == 0: # 石子路
file_obj_gait_def = open("/home/git/project2248014-237570/back_to_start/back_to_start_main/Gait_Def_walk.toml", 'r')
file_obj_gait_params = open("/home/git/project2248014-237570/back_to_start/back_to_start_main/Gait_Params_walk_full.toml", 'r')
elif mode == 1: # 上下坡
file_obj_gait_def = open("/home/git/project2248014-237570/back_to_start/back_to_start_main/Gait_Def_scopewalk.toml", 'r')
file_obj_gait_params = open("/home/git/project2248014-237570/back_to_start/back_to_start_main/Gait_Params_scopewalk_full.toml",
'r')
self.usergait_msg.data = file_obj_gait_def.read()
self.lcm_usergait.publish("user_gait_file", self.usergait_msg.encode())
time.sleep(0.5)
self.usergait_msg.data = file_obj_gait_params.read()
self.lcm_usergait.publish("user_gait_file", self.usergait_msg.encode())
time.sleep(0.1)
file_obj_gait_def.close()
file_obj_gait_params.close()
user_gait_list = open("/home/git/project2248014-237570/back_to_start/back_to_start_main/Usergait_List.toml", 'r')
steps = toml.load(user_gait_list)
for step in steps['step']:
self.cmd_msg.mode = step['mode']
self.cmd_msg.value = step['value']
self.cmd_msg.contact = step['contact']
self.cmd_msg.gait_id = step['gait_id']
self.cmd_msg.duration = step['duration']
self.cmd_msg.life_count += 1
for i in range(3):
self.cmd_msg.vel_des[i] = step['vel_des'][i]
self.cmd_msg.rpy_des[i] = step['rpy_des'][i]
self.cmd_msg.pos_des[i] = step['pos_des'][i]
self.cmd_msg.acc_des[i] = step['acc_des'][i]
self.cmd_msg.acc_des[i + 3] = step['acc_des'][i + 3]
self.cmd_msg.foot_pose[i] = step['foot_pose'][i]
self.cmd_msg.ctrl_point[i] = step['ctrl_point'][i]
for i in range(2):
self.cmd_msg.step_height[i] = step['step_height'][i]
self.lcm_cmd.publish("robot_control_cmd", self.cmd_msg.encode())
time.sleep(0.1)
for i in range(200): # 10s Heat beat It is used to maintain the heartbeat when life count is not updated
self.lcm_cmd.publish("robot_control_cmd", self.cmd_msg.encode())
self.err_handler(msg=robot_control_cmd_lcmt())
time.sleep(0.2)
self.send_lock.release()
except KeyboardInterrupt:
self.cmd_msg.mode = 7 # PureDamper before KeyboardInterrupt:
self.cmd_msg.gait_id = 0
self.cmd_msg.duration = 0
self.cmd_msg.life_count += 1
self.lcm_cmd.publish("robot_control_cmd", self.cmd_msg.encode())
self.send_lock.release()
pass
def Wait_finish(self, mode, gait_id, timeout=2000): # timeout // 0.005 = 多少秒超时
count = 0
while self.runing and count < timeout: # 10s
if self.mode_ok == mode and self.gait_ok == gait_id:
return True
else:
time.sleep(0.005)
count += 1
def send_publish(self):
while self.runing:
self.send_lock.acquire()
if self.delay_cnt > 20: # Heartbeat signal 10HZ, It is used to maintain the heartbeat when life count is not updated
self.lc_s.publish("robot_control_cmd", self.cmd_msg.encode())
self.delay_cnt = 0
self.delay_cnt += 1
self.send_lock.release()
time.sleep(0.005)
def Send_cmd(self, msg):
self.send_lock.acquire()
self.delay_cnt = 50
self.cmd_msg = msg
self.send_lock.release()
def Ctrl_yaw(self, msg, angle_set):
while abs(angle_set - self.odo_msg.rpy[2] - angle_set) > 0.05:
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26
msg.vel_des = [0, 0, -0.5] # 转向
msg.duration = 300 # Zero duration means continuous motion until a new command is used.
# Continuous motion can interrupt non-zero duration interpolation motion
msg.life_count += 1 # Command will take effect when life_count update
self.Send_cmd(msg)
self.Wait_finish(11, 26)
self.yaw_pid.calculate(self.odo_msg.rpy[2], )
def quit(self):
self.runing = 0
self.rec_thread.join()
self.send_thread.join()
self.odo_thread.join()
self.RGB_thread.join()
self.laser_thread.join()
def jump(self, msg, duration, type): # 跳跃
msg.mode = 16 # Locomotion
msg.gait_id = type # 0左跳 1前跳 3右跳 6原地跳
msg.duration = duration # Zero duration means continuous motion until a new command is used.
# Continuous motion can interrupt non-zero duration interpolation motion
msg.life_count += 1
self.Send_cmd(msg)
self.Wait_finish(16, 1)
time.sleep(0.1)
def Recovery_stand(self, msg):
msg.mode = 12 # Recovery stand
msg.gait_id = 0
msg.life_count += 1 # Command will take effect when life_count update
self.Send_cmd(msg)
self.Wait_finish(12, 0)
def Pure_Damp(self, msg, mode):
msg.mode = 7 # PureDamp
msg.gait_id = mode # 0为自然倒下1为受控倒下
msg.life_count += 1 # Command will take effect when life_count update
self.Send_cmd(msg)
self.Wait_finish(12, 0)
def stone(self, msg):
self.Recovery_stand(msg)
self.selaction(0)
time.sleep(0.1)
# def locomotion(s)
def slope(self, msg):
self.Recovery_stand(msg)
self.selaction(1)
def err_handler(self, msg):
if self.rec_msg.switch_status == 3: # 如果状态为高祖尼保护模式
msg.mode = 12 # Recovery stand
msg.gait_id = 0
msg.life_count += 1 # Command will take effect when life_count update
self.Send_cmd(msg)
self.Wait_finish(12, 0)
# 雷达以及里程计
# 探测直线的时候需要的二分查找
def bio_select(self, k, b, start, end, dist_limit, data_with_order_x, data_with_order_y):
left, right = start, end - 1
# 二分查找
while left < right:
mid = (left + right) // 2
x_mid, y_mid = data_with_order_x[mid], data_with_order_y[mid]
# 计算点到直线的距离
distance = abs(k * x_mid - y_mid + b) / (k ** 2 + 1) ** 0.5
if distance <= dist_limit:
# 点在直线附近,继续向右搜索
left = mid + 1
else:
# 点不在直线附近,缩小搜索范围
right = mid
# 返回直线附近点的范围
return left
def detect_lines(self, data=laser_rec['data']): # data 180个的数组
dist_limit = 0.10
init_points = 5
data_with_order_x = []
data_with_order_y = []
data = laser_rec['data']
# degree = self.odo_msg.rpy[2]
# 过滤无效值
for i in range(180):
if data[i] < 0.12 or data[i] > 12:
continue
# data_with_order.append([math.radians(i-90) - degree,data[i]]) #新坐标系里面的偏转角(弧度)+ 距离
data_with_order_x.append(data[i] * math.cos(math.radians(i - 90)))
data_with_order_y.append(data[i] * math.sin(math.radians(i - 90))) # x y 轴
len_filtered = len(data_with_order_x)
# print(len(data_with_order_x))
# for i in range(0, len(data_with_order_x)):
# print('(', end='')
# print(data_with_order_x[i], data_with_order_y[i], sep=' ,', end='')
# print('),', end='\n')
# 储存直线方程
lines = []
# 拐动位置
corners = [0]
while corners[-1] < len_filtered - init_points - 1:
start = corners[-1]
k1_init, b_init = numpy.polyfit(data_with_order_x[start:start + init_points],
data_with_order_y[start:start + init_points], 1)
# print(k1_init, b_init)
# 二分查找 end 是范围内最后一个点
end = self.bio_select(k1_init, b_init, start, len_filtered, dist_limit, data_with_order_x,
data_with_order_y)
# 允许一次偏差出现此时end + 1 被击毙了,用现有的start - end 再拟合新直线,看 end + 1 还在不在外边,
# 如果还在外边,大概率是救不回了
# 在里面还可以二分一下
k1_fixed, b1_fixed = numpy.polyfit(data_with_order_x[start:end], data_with_order_y[start:end], 1)
dist_bias = (abs(k1_fixed * data_with_order_x[end] - data_with_order_y[end] + b1_fixed) /
(k1_fixed ** 2 + 1) ** 0.5)
if dist_bias < dist_limit:
end_new = self.bio_select(k1_fixed, b1_fixed, end, len_filtered, dist_limit, data_with_order_x,
data_with_order_y)
k1_fixed, b1_fixed = numpy.polyfit(data_with_order_x[start:end_new],
data_with_order_y[start:end_new], 1)
else:
end_new = end
corners.append(end_new)
# print(end)
lines.append([k1_fixed, b1_fixed])
if len(lines) >= 3:
print(corners)
break
# for i in lines:
# # print(f'{i[0]}x - y + {i[1]} = 0')
# print(f'[{i[0]}, {i[1]}],')
return lines
def circle_dist(self,target,location):
###3.1416 -3.1416 是同一个值
value1 = abs(target - location)
value2 = 6.2832 - value1
direction1 = 1 if target > location else 0 # ##按value1转动1为逆时针 0 为顺时针
if value1 < value2:
return direction1,value1
else:
return 1-direction1,value2
def odo_verticalturn(self, target, msg, limit=0.04, timesleep_s=5): ##允许误差0.04弧度大概是2.29度
const_int = 2470 # 转 1.57 弧度 大概要 3875 duration 每弧度大概这个值 持续时间大概6.5秒
loc = self.odo_msg.rpy[2]
direction, dist = self.circle_dist(target=target, location=loc)
print('odo_verticalturn', self.odo_msg.rpy[2], target)
if abs(dist) > limit:
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26
msg.vel_des = [0, 0, 0.5 if direction > 0 else -0.5] # 转向
msg.duration = int(const_int * abs(dist))
# Continuous motion can interrupt non-zero duration interpolation motion
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
time.sleep(7 * abs(dist) / 1.57)
# print('1 times finish angle',self.odo_msg.rpy[2])
# return
dist = target - self.odo_msg.rpy[2]
print('odo_verticalturn', self.odo_msg.rpy[2], target)
def side_equal(self, msg, limit=0.08, off_set=0.0, timeout=600): ## 允许最大误差0.08米即离最中线左右偏向4厘米
###先里程计回正再算距离
const_int = 6666 # 对 差的距离 进行倍数放大 控制左右移动时间
###2600
lines = self.detect_lines()
while len(lines) < 3:
time.sleep(0.04)
lines = self.detect_lines()
dist1 = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5), 3)
dist2 = round(abs(lines[2][1] / ((lines[2][0] * lines[2][0] + 1)) ** 0.5), 3)
# print(f'before move to equal: r :{dist1} l: {dist2}')
dist_diff = dist1 - dist2 - 2 * off_set
while abs(dist_diff) > limit:
msg.mode = 11 # Locomotion
msg.gait_id = 27
msg.vel_des = [0, -0.15, 0] if dist_diff > 0 else [0, 0.15, 0]
msg.duration = int(const_int * abs(dist_diff) / 2)
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
msg.rpy_des = [0, 0.3, 0]
self.Send_cmd(msg)
time.sleep(msg.duration / 1000 + 0.5)
lines = self.detect_lines()
while len(lines) < 3:
time.sleep(0.04)
lines = self.detect_lines()
dist1 = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5), 3)
dist2 = round(abs(lines[2][1] / ((lines[2][0] * lines[2][0] + 1)) ** 0.5), 3)
dist_diff = dist1 - dist2
def walk_forward(self, msg, left_dist, limit=0.04, timeout_s=6, k_bound=5, exp_speed = 0.6): ##在对正之后才能调用left_dist指定前面预留多长的距离
const_int = 1667 # 对 差的距离 进行倍数放大 控制前进时间
lines = self.detect_lines()
dist = round(abs(lines[1][1] / ((lines[1][0] * lines[1][0] + 1)) ** 0.5), 3)
print(dist)
# for i in lines:
# print(i)
while (len(lines) <= 1 or (lines[1][0] < k_bound and lines[1][0] > -k_bound)):
time.sleep(0.04)
lines = self.detect_lines()
dist = round(abs(lines[1][1] / ((lines[1][0] * lines[1][0] + 1)) ** 0.5), 3)
dist_need_walk = dist - left_dist
while abs(dist_need_walk) > limit:
msg.mode = 11 # Locomotion
msg.gait_id = 27
msg.vel_des = [exp_speed if dist_need_walk > 0 else -exp_speed, 0, 0]
msg.duration = int(const_int * abs(dist_need_walk) * 0.6/exp_speed)
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
# if dist_need_walk > 3:
# msg.duration = 4500
# msg.life_count += 1
# self.Wait_finish(11,27,timeout=1200)
# print('固定运动')
# return self.walk_forward(msg,left_dist,limit,timeout)
msg.life_count += 1
self.Send_cmd(msg)
time.sleep(timeout_s)
lines = self.detect_lines()
dist = round(abs(lines[1][1] / ((lines[1][0] * lines[1][0] + 1)) ** 0.5), 3)
# while (len(lines) <= 1 or (lines[1][0] <k_bound and lines[1][0]>-k_bound)):
# time.sleep(0.04)
# lines = self.detect_lines()
# dist = round(abs(lines[1][1] / ((lines[1][0] * lines[1][0] + 1)) ** 0.5),3)
# dist_need_walk = dist - left_dist
print('walk_forward dis_after', dist)
return
def turnleft_corner(self, msg, target1=0, target2=1.57, dist1_right=0.85, dist2_right=0.35):
'''左转时定位我们需要的位置,在接近拐角的任意位置启动函数,可以顺利运动到下一条赛道的起点'''
'''target1是第一条赛道的正方向,target2是第二条赛道的正方向,dist1_right是第一条赛道右侧保持距离,dist2_right是第二条赛道右侧保持的距离'''
self.odo_changeback(target=target1, msg=msg)
time.sleep(3)
self.rightside_align(msg=msg, expect_dist=dist1_right)
time.sleep(3)
self.walk_forward(msg, left_dist=dist2_right)
time.sleep(4)
self.odo_changeback(target=target2, msg=msg)
def rightside_align(self, msg, expect_dist=0.65, limit=0.04, timeout_s=4):
const_int = 5000
lines = self.detect_lines()
dist = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5), 3)
dist_need_walk = dist - expect_dist
while abs(dist_need_walk) > limit:
msg.mode = 11 # Locomotion
msg.gait_id = 27
msg.vel_des = [0, -0.2 if dist_need_walk > 0 else 0.2, 0]
msg.duration = int(const_int * abs(dist_need_walk))
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
time.sleep(timeout_s)
lines = self.detect_lines()
dist = round(abs(lines[0][1] / ((lines[0][0] * lines[0][0] + 1)) ** 0.5), 3)
dist_need_walk = dist - expect_dist
print('right align dis_after', dist)
return
###cross_speedump 在 第二个转角回正之后调用,回正参数:
# Ctrl.Recovery_stand(msg)
# Ctrl.odo_changeback(target=0,msg=msg)
# Ctrl.walk_forward(msg,left_dist= 0.40,timeout_s=3)
# Ctrl.rightside_align(msg=msg,expect_dist= 1.15,timeout_s=5)
# Ctrl.odo_changeback(target=1.57, msg=msg)
# Ctrl.cross_speedump(msg=msg)
def cross_speedump(self, msg):
const_int1 = 5000 #######在走向圆柱的情况下使用的duration常数
const_int2 = 5000 #######在侧向靠近圆柱进入赛道位置的情况下使用的duration常数调大 -> 机械狗横向后更加靠近圆柱
y_sharp_dist = -0.18 #######给最后的对齐使用的参数 绝对值是对正之后拐点和狗y轴的差距 正常要负数
x_sharp_dist = 0.2 #######给最后的对齐使用的参数
#####开环走过大部分减速带
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26
msg.vel_des = [0.2, 0, 0] # 直走
msg.rpy_des = [0, 0, 0]
# Zero duration means continuous motion until a new command is used.
msg.duration = 14000
# Continuous motion can interrupt non-zero duration interpolation motion
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
time.sleep(11)
laser_mid = laser_rec['data']
lines_mid = self.detect_lines(data=laser_mid)
time.sleep(13)
###找到方向处理剩余的不确定距离,主要是向半径方向靠近圆柱
# self.odo_changeback(1.57,msg)
time.sleep(0.04)
laser_keep = laser_rec['data']
idx_min = laser_keep[70:110].index(min(laser_keep[70:110])) + 70 # 最小值点
dist_min = laser_keep[idx_min]
angel_diff = idx_min / 90 * 1.57
print(idx_min, dist_min, angel_diff)
if dist_min > 0.15:
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26
msg.vel_des = [0.1 * math.sin(angel_diff), 0.1 * math.cos(angel_diff), 0] # 直走
msg.rpy_des = [0, 0, 0]
# Zero duration means continuous motion until a new command is used.
msg.duration = int((dist_min - 0.15) * const_int1)
# Continuous motion can interrupt non-zero duration interpolation motion
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
print('tiny walk')
####转向0度回正
time.sleep(0.5)
self.odo_changeback(0, msg=msg)
time.sleep(2)
###########################找右边直线和圆环赛道的交界点,以这个点为基准去进行对齐
###########################多次扫描去除错误点
laser_valid = [0] * 180
for i in range(10):
time.sleep(0.04)
laser = laser_rec['data']
for j in range(0, 180):
if laser_valid[j] == 0 and laser[j] > 0.15:
laser_valid[j] = laser[j]
##########################根据X轴的值去找到拐角点序号idx
data_with_order_x = []
data_with_order_y = []
validist_store = []
for i in range(180):
if laser_valid[i] < 0.12 or laser_valid[i] > 12:
continue
# data_with_order.append([math.radians(i-90) - degree,data[i]]) #新坐标系里面的偏转角(弧度)+ 距离
data_with_order_x.append(laser_valid[i] * math.cos(math.radians(i - 90)))
data_with_order_y.append(laser_valid[i] * math.sin(math.radians(i - 90))) # x y 轴
validist_store.append(laser_valid[i])
len_filtered = len(data_with_order_x)
# print('len_filtered',len_filtered)
# print(data_with_order_x)
idx = -1
i = 60
while i < len_filtered - 10:
cnt = 0
for k in range(i + 1, i + 9):
if data_with_order_x[k] > data_with_order_x[k - 1]:
cnt += 1
else:
break
if cnt > 6:
idx = i
break
i += 1
###################读出拐角点的值
point_sharp_x = data_with_order_x[idx]
point_sharp_y = data_with_order_y[idx]
print('idx pt_sharp_x', idx, point_sharp_x)
print('idx pt_sharp_y', idx, point_sharp_y)
time.sleep(1)
#####根据拐角点矫正方位先在y轴上矫正
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26
msg.vel_des = [0, 0.1 if (point_sharp_y - y_sharp_dist) > 0 else -0.1, 0]
msg.rpy_des = [0, 0, 0]
# Zero duration means continuous motion until a new command is used.
msg.duration = int(const_int2 * abs(y_sharp_dist - point_sharp_y))
# Continuous motion can interrupt non-zero duration interpolation motion
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
time.sleep(5)
####再矫正x轴
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26
msg.vel_des = [-0.1 if (point_sharp_x - x_sharp_dist) < 0 else 0.1, 0, 0]
msg.rpy_des = [0, 0, 0]
# Zero duration means continuous motion until a new command is used.
msg.duration = int(const_int2 * abs(point_sharp_x - x_sharp_dist))
# Continuous motion can interrupt non-zero duration interpolation motion
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
time.sleep(5)
#####无限进行循环
msg.mode = 11 # Locomotion
msg.gait_id = 26 # TROT_FAST:10 TROT_MEDIUM:3 TROT_SLOW:27 自变频:26
msg.vel_des = [0.165, 0, 0.3] # 转向
msg.duration = 0
# Continuous motion can interrupt non-zero duration interpolation motion
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
self.Send_cmd(msg)
time.sleep(60)
def pass_cloth(Ctrl ,msg ,pos = -1):
Ctrl.Recovery_stand(msg)
msg.mode = 11 # Locomotion
msg.gait_id = 27
msg.vel_des = [0.3, 0, 0]
msg.duration = 3000
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
Ctrl.Send_cmd(msg)
time.sleep(10)
Ctrl.rightside_align(msg=msg,expect_dist=0.9)
if pos < 0:
print(RGB_res)
if RGB_res > 0:
Ctrl.rightside_align(msg=msg,expect_dist=0.9)
else:
Ctrl.rightside_align(msg=msg,expect_dist=0.4)
pos = 1
else:
if RGB_res > 0:
Ctrl.rightside_align(msg=msg,expect_dist=0.4)
else:
Ctrl.rightside_align(msg=msg,expect_dist=0.9)
pos = -1
msg.mode = 11 # Locomotion
msg.gait_id = 27
msg.vel_des = [0.3, 0, 0]
msg.duration = 3333
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
Ctrl.Send_cmd(msg)
time.sleep(10)
if pos < 0:
print(RGB_res)
if RGB_res > 0:
Ctrl.rightside_align(msg=msg,expect_dist=0.9)
else:
Ctrl.rightside_align(msg=msg,expect_dist=0.4)
pos = 1
else:
if RGB_res > 0:
Ctrl.rightside_align(msg=msg,expect_dist=0.4)
else:
Ctrl.rightside_align(msg=msg,expect_dist=0.9)
pos = -1
msg.mode = 11 # Locomotion
msg.gait_id = 27
msg.vel_des = [0.3, 0, 0]
msg.duration = 3333
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
Ctrl.Send_cmd(msg)
time.sleep(10)
if pos < 0:
print(RGB_res)
if RGB_res > 0:
Ctrl.rightside_align(msg=msg,expect_dist=0.9)
else:
Ctrl.rightside_align(msg=msg,expect_dist=0.4)
pos = 1
else:
if RGB_res > 0:
Ctrl.rightside_align(msg=msg,expect_dist=0.4)
else:
Ctrl.rightside_align(msg=msg,expect_dist=0.9)
pos = -1
msg.mode = 11 # Locomotion
msg.gait_id = 27
msg.vel_des = [0.3, 0, 0]
msg.duration = 3333
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
Ctrl.Send_cmd(msg)
time.sleep(10)
if pos < 0:
print(RGB_res)
if RGB_res > 0:
Ctrl.rightside_align(msg=msg,expect_dist=0.9)
else:
Ctrl.rightside_align(msg=msg,expect_dist=0.4)
pos = 1
else:
if RGB_res > 0:
Ctrl.rightside_align(msg=msg,expect_dist=0.4)
else:
Ctrl.rightside_align(msg=msg,expect_dist=0.9)
pos = -1
msg.mode = 11 # Locomotion
msg.gait_id = 27
msg.vel_des = [0.3, 0, 0]
msg.duration = 3333
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
Ctrl.Send_cmd(msg)
time.sleep(10)
if pos < 0:
print(RGB_res)
if RGB_res > 0:
Ctrl.rightside_align(msg=msg,expect_dist=0.9)
else:
Ctrl.rightside_align(msg=msg,expect_dist=0.4)
pos = 1
else:
if RGB_res > 0:
Ctrl.rightside_align(msg=msg,expect_dist=0.4)
else:
Ctrl.rightside_align(msg=msg,expect_dist=0.9)
pos = -1
msg.mode = 11 # Locomotion
msg.gait_id = 27
msg.vel_des = [0.3, 0, 0]
msg.duration = 3333
msg.step_height = [0.06, 0.06] # ground clearness of swing leg
msg.life_count += 1
Ctrl.Send_cmd(msg)
time.sleep(10)
# Main function
if __name__ == '__main__':
main()