Update 821

.vscode/settings.json (vendored): 0 changes, Normal file → Executable file
main.py: 44 changes
@@ -24,7 +24,7 @@ from utils.base_msg import BaseMsg
from utils.speech_demo import speak
# from utils.marker_client import MarkerRunner

from task_1.task_1 import run_task_1, run_task_1_back
from task_1.task_1 import run_task_1  # , run_task_1_back
from task_2.task_2 import run_task_2, run_task_2_back, run_task_2_demo
from task_2_5.task_2_5 import run_task_2_5, run_task_2_5_back
from task_3.task_3 import run_task_3, run_task_3_back
@@ -33,7 +33,7 @@ from task_5.task_5 import run_task_5
from base_move.turn_degree import turn_degree_v2
from base_move.go_to_xy import go_to_x_v2, go_to_y_v2
from utils.log_helper import info

from utils.speech_demo import speak
pass_marker = True
TIME_SLEEP = 3000  # TODO change to 5000 for the competition

@@ -50,7 +50,8 @@ class TaskType(Enum):
    MOVE_TO_LINE = auto()  # TODO straight-drive approach-to-line test
    CENTER_ON_DUAL_TRACKS = auto()  # TODO dual-track centering test

TASK = ''
TASK = TaskType.TASK

def main():
    rclpy.init()  # new: initialize the ROS 2 context once in the main program
    Ctrl = Robot_Ctrl()
@@ -91,7 +92,7 @@ def main():
        pass_bar(Ctrl, msg)
    elif TASK == TaskType.YELLOW_LIGHT:  # TODO image
        from task_3.task_3 import go_until_yellow_area
        turn_degree_v2(Ctrl, msg, degree=-90, absolute=True)
        # turn_degree_v2(Ctrl, msg, degree=-90, absolute=True)
        go_until_yellow_area(Ctrl, msg)
    elif TASK == TaskType.RED_BAR:
        from task_4.task_4 import go_straight_until_red_bar
@@ -101,7 +102,7 @@ def main():
        go_straight_with_enhanced_calibration(Ctrl, msg, distance = 5, speed=0.5, observe=False, mode=11, gait_id=3, step_height=[0.21, 0.21])
    elif TASK == TaskType.STONE_ROAD:
        from task_3.task_3 import pass_stone
        pass_stone(Ctrl, msg, distance = 4, observe=False)
        pass_stone(Ctrl, msg, distance = 4.5, observe=False)
    elif TASK == TaskType.MOVE_TO_LINE:
        from base_move.move_base_hori_line import move_to_hori_line
        move_to_hori_line(Ctrl, msg, target_distance = 1.1, observe=False)
@@ -109,25 +110,28 @@ def main():
        from base_move.center_on_dual_tracks import center_on_dual_tracks
        center_on_dual_tracks(Ctrl, msg, max_deviation=10.0, observe=False, detect_height=0.3)
    else:
        pass
        return

    # if TASK != TaskType.TASK:
    #     # if it is not the full-run task type, return directly
    #     return
    if TASK != TaskType.TASK:
        # if it is not the full-run task type, return directly
        return

    # TAG task - 1
    #run_task_1(Ctrl, msg, t
    # ime_sleep=TIME_SLEEP)
    #TAG task - 1
    # run_task_1(Ctrl, msg, time_sleep=TIME_SLEEP)

    # TAG task - 2
    # arrow_direction='left'
    # arrow_direction = run_task_2_demo(Ctrl, msg, xy_flag=False) # TEST
    # arrow_direction = run_task_2(Ctrl, msg, xy_flag=False)
    # arrow_direction='left'
    # print('🏹 arrow_direction: ', arrow_direction)

    arrow_direction='left'

    # if(arrow_direction=='left'): speak("左侧路线")
    # else: speak("右侧路线")

    #TAG task - 2.5
    # run_task_2_5(Ctrl, msg, direction=arrow_direction)

@@ -141,19 +145,19 @@ def main():

    # TAG task - 5
    # turn_degree_v2(Ctrl, msg, degree=90, absolute=True)
    # success, qr=run_task_5(Ctrl, msg, direction=arrow_direction, observe=True, time_sleep=TIME_SLEEP)  # Zone B task
    success, qr=run_task_5(Ctrl, msg, direction=arrow_direction, observe=True, time_sleep=TIME_SLEEP)  # Zone B task
    # print(success)
    # print(qr)

    # TAG task - 3 / 4 - part II
    if arrow_direction == 'left':
        run_task_4_back(Ctrl, msg)
    else:
        run_task_3_back(Ctrl, msg)
    # if arrow_direction == 'left':
    #     run_task_3_back(Ctrl, msg)
    # else:
    #     run_task_4_back(Ctrl, msg)

    #TAG task - 2.5 - back
    run_task_2_5_back(Ctrl, msg, direction=arrow_direction)
    # #TAG task - 2.5 - back
    # run_task_2_5_back(Ctrl, msg, direction=arrow_direction)

    # TAG task - 2 - back
10 image files deleted (56–77 KiB each).
task_1/task_1.py
@@ -16,7 +16,7 @@ from base_move.turn_degree import turn_degree, turn_degree_twice, turn_degree_v2
from base_move.center_on_dual_tracks import center_on_dual_tracks
from base_move.go_to_xy import go_to_xy
from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing

from utils.speech_demo import speak
# create this module's dedicated logger
logger = get_logger("任务1")

@@ -60,6 +60,10 @@ def run_task_1(ctrl, msg, time_sleep=5000):

    section('任务1-3:转弯', "旋转")
    direction = False if res.get('qr_result') == 'A-1' else True  # TODO double-check that this direction is correct

    if direction == False :speak("A 区库位 1")
    else:speak("A 区库位 2")

    turn_success, res = arc_turn_around_hori_line(
        ctrl=ctrl,
        msg=msg,
@@ -112,91 +116,4 @@ def run_task_1(ctrl, msg, time_sleep=5000):
    # add
    go_straight(ctrl, msg, distance=0.3, observe=observe)

    section('任务1-7:90度转弯', "旋转")
    radius = res['radius'] * 2 + 0.1
    info(f"任务1-7: 转弯半径: {radius}", "信息")
    turn_success, res = arc_turn_around_hori_line(
        ctrl=ctrl,
        msg=msg,
        radius=radius,
        angle_deg=85 if direction else -85,
        #
        pass_align=True,
        observe=observe,
        no_end_reset=True,
    )

    section('任务1-8:直线移动', "移动")
    move_to_hori_line(ctrl, msg, target_distance=0.3, observe=observe)

    section('任务1-9:90度旋转', "旋转")
    turn_degree_v2(ctrl, msg, degree=0, absolute=True, precision=True)

    section('任务1-10: y校准,准备 task-2', "移动")
    # TODO

    success("任务1完成", "完成")


def run_task_1_back(ctrl, msg, time_sleep=5000):
    section('任务1-11: 返回', "移动")
    go_straight(ctrl, msg, distance=0.2, observe=observe)
    turn_degree_v2(ctrl, msg, -90, absolute=True)  # not sure how well the odometry performs here;

    section('任务1-11: 直线移动', "移动")
    move_to_hori_line(ctrl, msg, target_distance=0.2, observe=observe)

    section('任务1-12: 180度旋转', "旋转")
    turn_success, res = arc_turn_around_hori_line(
        ctrl=ctrl,
        msg=msg,
        angle_deg=170 if direction else -170,
        target_distance=0.6,
        min_radius=0.3,
        max_radius=0.4,
        pass_align=True,
        observe=observe,
        no_end_reset=True,
    )

    turn_degree_v2(ctrl, msg, degree=90, absolute=True)

    section('任务1-13: 直线移动', "移动")
    move_distance = 0.5
    go_straight(ctrl, msg, distance=move_distance, observe=observe)

    section('任务1-14: 模拟装货', "停止")
    info('机器人躺下,模拟装货过程', "信息")
    start_time = time.time()
    ctrl.base_msg.lie_down(wait_time=time_sleep)
    elapsed = time.time() - start_time
    timing("装货过程", elapsed)

    section('任务1-15: 站起来', "移动")
    ctrl.base_msg.stand_up()

    section('任务1-16: 返回', "移动")
    go_straight(ctrl, msg, distance=-(move_distance + res['radius']), observe=observe)

    turn_degree_v2(ctrl, msg, degree=179, absolute=True)

    section('任务1-17: 90度转弯', "旋转")
    turn_success, res = arc_turn_around_hori_line(
        ctrl=ctrl,
        msg=msg,
        angle_deg=-85 if direction else 85,
        radius=res['radius'] * 2,
        pass_align=True,
        observe=observe,
    )

    section('任务1-18: 直线移动', "移动")
    move_to_hori_line(ctrl, msg, target_distance=0.4, observe=observe)

    section('任务1-19: 90度旋转', "旋转")
    turn_degree_v2(ctrl, msg, degree=0, absolute=True)

    go_straight(ctrl, msg, distance=-1.3, observe=observe)
    # go_to_xy(ctrl, msg, target_x=-0.2, target_y=0, observe=observe)  # TEST

    success("任务1-back完成", "完成")
    # section
task_2/Gait_Def_crawl.toml: 0 changes, Normal file → Executable file
task_2/Gait_Params_crawl.toml: 0 changes, Normal file → Executable file
task_2/README_crawl_gait.md: 0 changes, Normal file → Executable file
task_2/crawl_gait.py: 0 changes, Normal file → Executable file
task_2/file_send_lcmt.py: 0 changes, Normal file → Executable file
task_2/robot_control_cmd_lcmt.py: 0 changes, Normal file → Executable file
task_2/test_crawl_integration.py: 0 changes, Normal file → Executable file
task_3/task_3.py
@@ -22,13 +22,12 @@ from base_move.center_on_dual_tracks import center_on_dual_tracks
from file_send_lcmt import file_send_lcmt
from utils.yellow_area_analyzer import analyze_yellow_area_ratio
from utils.detect_dual_track_lines import detect_dual_track_lines

from utils.speech_demo import speak
# create this module's dedicated logger
logger = get_logger("任务3")

observe = True

YELLOW_RATIO_THRESHOLD = 0.15  # TODO yellow-area ratio threshold
YELLOW_RATIO_THRESHOLD = 0.03  # TODO yellow-area ratio threshold

def run_task_3(ctrl, msg, time_sleep=5000):
    section('任务3:上下坡', "启动")
@@ -55,8 +54,19 @@ def run_task_3(ctrl, msg, time_sleep=5000):
    msg.life_count += 1
    ctrl.Send_cmd(msg)

    speak("检测到黄灯")
    info("开始原地站立3秒", "站立")
    speak('5')
    time.sleep(time_sleep / 1000)
    speak('4')
    time.sleep(time_sleep / 1000)
    speak('3')
    time.sleep(time_sleep / 1000)
    speak('2')
    time.sleep(time_sleep / 1000)
    speak('1')
    time.sleep(time_sleep / 1000)

    info("完成原地站立", "站立")


@@ -76,7 +86,9 @@ def run_task_3_back(ctrl, msg, time_sleep=5000):
    ctrl.Send_cmd(msg)

    info("开始原地站立3秒", "站立")
    time.sleep(time_sleep / 1000)
    speak()
    time.sleep(time_sleep / 5000)

    info("完成原地站立", "站立")

    section('任务3-3:up and down', "开始")
@@ -222,6 +234,8 @@ def go_until_yellow_area(ctrl, msg, yellow_ratio_threshold=0.15, speed=0.3, max_
    dy = final_position[1] - start_position[1]
    final_distance = math.sqrt(dx*dx + dy*dy)

    if observe:
        if yellow_decreasing:
            success(f"成功检测到黄色区域峰值并停止,峰值占比: {max_yellow_ratio:.2%}, 总移动距离: {final_distance:.2f}米", "完成")
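The yellow-light hold above repeats the same speak/sleep pair five times. A more compact loop form is sketched below against the same speak helper from utils.speech_demo; the helper name countdown_hold is invented for illustration and is not part of this update.

import time

from utils.speech_demo import speak  # same helper imported in the diff above


def countdown_hold(step_ms=3000, start=5):
    # speak the numbers start..1, pausing step_ms milliseconds after each announcement
    for n in range(start, 0, -1):
        speak(str(n))
        time.sleep(step_ms / 1000)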
task_4/task_4.py
@@ -24,8 +24,8 @@ logger = get_logger("任务4")

observe = True

STONE_DISTANCE = 4.0  # TODO distance parameter needs fine-tuning
RED_RATIO_THRESHOLD = 0.35  # TODO red-area ratio threshold needs fine-tuning
STONE_DISTANCE = 4.5  # TODO distance parameter needs fine-tuning
RED_RATIO_THRESHOLD = 0.18  # TODO red-area ratio threshold needs fine-tuning

def run_task_4(ctrl, msg):
    info('开始执行任务4...', "启动")
@@ -59,7 +59,7 @@ def run_task_4_back(ctrl, msg):

    turn_degree_v2(ctrl, msg, degree=-90, absolute=True)

    section('任务4-1:移动直到灰色天空比例低于阈值', "天空检测")
    section('任务4-1:移动直到红色天空比例低于阈值', "天空检测")
    go_straight_until_red_bar(ctrl, msg, red_ratio_threshold=RED_RATIO_THRESHOLD, speed=0.2)

    section('任务4-2:通过栏杆', "移动")
task_5/task_5.py
@@ -17,6 +17,7 @@ from base_move.move_base_hori_line import (
    calculate_distance_to_line, move_to_hori_line, arc_turn_around_hori_line
)
from base_move.center_on_dual_tracks import center_on_dual_tracks
from utils.speech_demo import speak
# from base_move.follow_dual_tracks import follow_dual_tracks

SLEEP_TIME = 3000
@@ -229,6 +230,9 @@ def run_task_5(ctrl, msg, direction='left', observe=False, time_sleep=5000):
    else:
        error("未能成功到达横线前指定距离", "失败")

    if res['qr_result'] == 'B-2':speak("B 区库位 2")
    else:speak("B 区库位 1")

    section('任务5-2:移动到卸货点', "移动")
    center_on_dual_tracks(ctrl, msg, max_deviation=10.0, observe=False)
    if direction == 'right' and res['qr_result'] == 'B-2' or direction == 'left' and res['qr_result'] == 'B-1':
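The bay announcement above maps a QR decode result onto a fixed speech string. The lookup-table sketch below expresses the same mapping; announce_bay and BAY_ANNOUNCEMENTS are invented names for illustration only, not part of this update.

from utils.speech_demo import speak  # same helper used by task_1 and task_5 above

# Hypothetical mapping from the QR decode result to the spoken bay name.
BAY_ANNOUNCEMENTS = {
    'B-1': "B 区库位 1",
    'B-2': "B 区库位 2",
}

def announce_bay(qr_result):
    # fall back to bay 1, matching the else branch in the diff above
    speak(BAY_ANNOUNCEMENTS.get(qr_result, "B 区库位 1"))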
test/demo_test/auredemo.py: 0 changes, Normal file → Executable file
test/demo_test/qr_detect_demo.py: 0 changes, Normal file → Executable file
test/demo_test/rgb_camera_demo.py: 0 changes, Normal file → Executable file
6 image files deleted (4.5–98 KiB each).
test_ai_camera_integration.py: 0 changes, Normal file → Executable file
tmp/auredemo2.py: 0 changes, Normal file → Executable file
tmp/interdemo3.py: 0 changes, Normal file → Executable file
tmp/interdemo4.py: 0 changes, Normal file → Executable file
tmp/new.py: 0 changes, Normal file → Executable file
tmp/shoushi.py: 0 changes, Normal file → Executable file
tmp/turndemo.py: 0 changes, Normal file → Executable file
turndemo.py: 0 changes, Normal file → Executable file
utils/fisheye.py: Normal file → Executable file
@@ -199,7 +199,7 @@ class ImageSubscriber(Node):
        try:
            self.cv_image_rgb = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
            if time.time() - self.last_save_time['rgb'] >= self.save_interval:
                # self.save_image(self.cv_image_rgb, 'rgb')
                self.save_image(self.cv_image_rgb, 'rgb')
                self.last_save_time['rgb'] = time.time()
        except Exception as e:
            self.get_logger().error(f"RGB图像处理错误: {str(e)}")
@@ -226,7 +226,7 @@ class ImageSubscriber(Node):
        try:
            self.cv_image_ai = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
            if time.time() - self.last_save_time['ai'] >= self.save_interval:
                # self.save_image(self.cv_image_ai, 'ai')
                self.save_image(self.cv_image_ai, 'ai')
                self.last_save_time['ai'] = time.time()
        except Exception as e:
            self.get_logger().error(f"ai图像处理错误: {str(e)}")
utils/linetrack.py: new file (267 lines), Executable file
@@ -0,0 +1,267 @@
import rclpy
from rclpy.node import Node
from cv_bridge import CvBridge
from sensor_msgs.msg import Image
from std_msgs.msg import Float32
import cv2
import numpy as np
from message_filters import ApproximateTimeSynchronizer, Subscriber
from rclpy.qos import QoSProfile, ReliabilityPolicy


class BirdseyeNode(Node):
    def __init__(self):
        super().__init__('birdseye_node')

        self.bridge = CvBridge()

        # left/right fisheye subscriptions
        self.left_sub = Subscriber(self, Image, '/image_left')
        self.right_sub = Subscriber(self, Image, '/image_right')
        self.ts = ApproximateTimeSynchronizer(
            [self.left_sub, self.right_sub],
            queue_size=10,
            slop=0.5
        )
        self.ts.registerCallback(self.callback)
        qos = QoSProfile(depth=10, reliability=ReliabilityPolicy.BEST_EFFORT)

        self.proc_pub_left = self.create_publisher(Image, '/image_proc_left', 10)
        self.proc_pub_right = self.create_publisher(Image, '/image_proc_right', 10)
        self.trans_pub_left = self.create_publisher(Image, '/image_tran_left', 10)
        self.trans_pub_right = self.create_publisher(Image, '/image_tran_right', 10)
        self.line_error_pub = self.create_publisher(Float32, '/line_error', qos)
        self.last_slope = 0.0
        # front RGB subscription
        self.rgb_sub = self.create_subscription(Image, '/image_rgb', self.rgb_callback, 10)
        self.proc_pub_rgb = self.create_publisher(Image, '/image_proc_rgb', 10)
        self.line_distance_pub = self.create_publisher(Float32, '/line_distance', qos)

        # camera parameters
        fx = 215.0699086206705
        fy = 215.0699086206705
        cx = 250.6595680010422
        cy = 197.9387845612447
        self.K = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]])
        self.D = np.array([-2.803862613372903e-04, -7.223158724979862e-06,
                           1.437534138630982e-08, 0.0])
        self.dim = (500, 400)
        new_K = cv2.fisheye.estimateNewCameraMatrixForUndistortRectify(
            self.K, self.D, self.dim, np.eye(3), balance=0.24
        )
        new_K[0, 2], new_K[1, 2] = self.dim[0] / 2, self.dim[1] / 2
        self.map1_left, self.map2_left = cv2.fisheye.initUndistortRectifyMap(
            self.K, self.D, np.eye(3), new_K, self.dim, cv2.CV_16SC2
        )
        self.map1_right, self.map2_right = cv2.fisheye.initUndistortRectifyMap(
            self.K, self.D, np.eye(3), new_K, self.dim, cv2.CV_16SC2
        )

        self.get_logger().info("鱼眼去畸变节点启动")

        self.filtered_slope_left = None
        self.filtered_slope_right = None
        self.alpha = 1.0
        self.jump_threshold = 0.05
        self.line_error = 0.0  # keep line_error for use in the distance adjustment below

    def adaptive_ema(self, new_val, prev_filtered):
        if prev_filtered is None:
            return new_val
        if abs(new_val - prev_filtered) > self.jump_threshold:
            return new_val
        return self.alpha * new_val + (1 - self.alpha) * prev_filtered

    def detect_slope(self, img, side='left', roi_h=180, roi_w=200, y_start=None):
        """Yellow-line detection + slope of the bottom boundary"""
        h, w = img.shape[:2]

        # vertical ROI range
        if y_start is None:
            y1 = h - roi_h
        else:
            y1 = y_start
        y2 = y1 + roi_h

        # horizontal ROI range
        x_center = w // 2
        x1, x2 = x_center - roi_w // 2, x_center + roi_w // 2

        roi = img[y1:y2, x1:x2]
        x_offset, y_offset = x1, y1

        hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)

        # yellow HSV range
        if side == 'rgb':
            lower_yellow = np.array([20, 80, 60])
            upper_yellow = np.array([45, 255, 255])
        else:
            lower_yellow = np.array([25, 100, 60])
            upper_yellow = np.array([45, 255, 255])

        mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
        kernel = np.ones((5, 5), np.uint8)
        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
        mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)

        _, contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if not contours:
            return None, img.copy(), None

        largest = max(contours, key=cv2.contourArea)
        if cv2.contourArea(largest) < 30:
            return None, img.copy(), None

        contour_shifted = largest + np.array([[x_offset, y_offset]])
        all_pts = contour_shifted.reshape(-1, 2)

        hsv_full = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
        black_mask = cv2.inRange(hsv_full, np.array([0, 0, 0]), np.array([180, 60, 160]))

        bottom_edge_pts = []
        x_min, x_max = all_pts[:, 0].min(), all_pts[:, 0].max()

        for x in range(x_min, x_max + 1):
            col_pts = all_pts[all_pts[:, 0] == x]
            if len(col_pts) > 0:
                y_bottom = col_pts[:, 1].max()
                check_y = min(y_bottom + 1, h - 1)
                if black_mask[check_y, x] > 0:
                    bottom_edge_pts.append([x, y_bottom])

        bottom_edge_pts = np.array(bottom_edge_pts)
        if len(bottom_edge_pts) < 2:
            return None, img.copy(), None

        m, b = np.polyfit(bottom_edge_pts[:, 0], bottom_edge_pts[:, 1], 1)
        m_corrected = -m

        img_with_line = img.copy()
        x1_line, x2_line = bottom_edge_pts[:, 0].min(), bottom_edge_pts[:, 0].max()
        y1_line, y2_line = m * x1_line + b, m * x2_line + b

        cv2.line(img_with_line, (int(x1_line), int(y1_line)),
                 (int(x2_line), int(y2_line)), (0, 0, 255), 2)
        cv2.drawContours(img_with_line, [contour_shifted], -1, (255, 0, 0), 2)

        for px, py in bottom_edge_pts:
            cv2.circle(img_with_line, (int(px), int(py)), 3, (255, 0, 255), -1)

        return m_corrected, img_with_line, bottom_edge_pts

    def rgb_callback(self, msg):
        """Front RGB camera processing"""
        try:
            img = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
            h, w = img.shape[:2]

            y_start = h - 180
            roi_h = 180
            roi_w = w

            slope, proc_img, _ = self.detect_slope(
                img, side='rgb', roi_h=roi_h, roi_w=roi_w, y_start=y_start
            )

            if slope == 0.0 or slope is None:
                self.get_logger().info(f"[RGB] 使用上一帧斜率: {self.last_slope:.4f}")
                slope = self.last_slope
            self.last_slope = slope
            self.proc_pub_rgb.publish(self.bridge.cv2_to_imgmsg(proc_img, 'bgr8'))
            self.rgb_slope = slope

            self.get_logger().info(f"[RGB] 计算斜率: {slope:.4f}")

        except Exception as e:
            self.get_logger().error(f"[RGB] 处理失败: {e}")
            self.rgb_slope = 0.0

    def callback(self, left_msg, right_msg):
        """Main callback for the left/right fisheye images"""
        try:
            left_img = self.bridge.imgmsg_to_cv2(left_msg, 'bgr8')
            right_img = self.bridge.imgmsg_to_cv2(right_msg, 'bgr8')
        except Exception as e:
            self.get_logger().error(f"[步骤1] 转换图像失败: {e}")
            return

        try:
            left_undistort = cv2.remap(
                left_img, self.map1_left, self.map2_left,
                cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT
            )
            right_undistort = cv2.remap(
                right_img, self.map1_right, self.map2_right,
                cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT
            )
            right_undistort = cv2.resize(right_undistort, (left_undistort.shape[1], left_undistort.shape[0]))

            self.trans_pub_left.publish(self.bridge.cv2_to_imgmsg(left_undistort, 'bgr8'))
            self.trans_pub_right.publish(self.bridge.cv2_to_imgmsg(right_undistort, 'bgr8'))

        except Exception as e:
            self.get_logger().error(f"[步骤2] 去畸变或发布失败: {e}")
            return

        try:
            h, w = left_undistort.shape[:2]
            roi_w, roi_h = 80, 240

            slope_left, left_proc_img, left_pts = self.detect_slope(left_undistort, side='left', roi_h=roi_h, roi_w=roi_w)
            slope_right, right_proc_img, right_pts = self.detect_slope(right_undistort, side='right', roi_h=roi_h, roi_w=roi_w)

            distance_left = np.mean(left_pts[:, 1]) if left_pts is not None else 0.0
            distance_right = np.mean(right_pts[:, 1]) if right_pts is not None else 0.0
            line_distance = distance_left - distance_right

            # if line_error and line_distance have the same sign, scale line_distance by 1.5
            if (self.line_error > 0 and line_distance > 0) or (self.line_error < 0 and line_distance < 0):
                line_distance *= 1.5

            if left_pts is not None:
                for px, py in left_pts:
                    cv2.line(left_proc_img, (px, py), (px, int(h)), (0, 0, 255), 1)
            if right_pts is not None:
                for px, py in right_pts:
                    cv2.line(right_proc_img, (px, py), (px, int(h)), (0, 0, 255), 1)

            self.line_distance_pub.publish(Float32(data=float(line_distance)))
            self.get_logger().info(f"[FishEye] 左右底部距离: {distance_left:.2f}, {distance_right:.2f}, line_distance: {line_distance:.2f}")

            self.proc_pub_left.publish(self.bridge.cv2_to_imgmsg(left_proc_img, 'bgr8'))
            self.proc_pub_right.publish(self.bridge.cv2_to_imgmsg(right_proc_img, 'bgr8'))

        except Exception as e:
            self.get_logger().error(f"[步骤3] 鱼眼处理失败: {e}")
            return

        try:
            rgb_slope = getattr(self, 'rgb_slope', 0.0)
            if rgb_slope > 0:
                self.line_error = 6.0
            elif rgb_slope < 0:
                self.line_error = -10.0
            else:
                self.line_error = 0.0

            self.get_logger().info(f"RGB斜率(line_error): {self.line_error:.4f}")
            self.line_error_pub.publish(Float32(data=float(self.line_error)))

        except Exception as e:
            self.get_logger().error(f"[步骤4] 发布line_error失败: {e}")
            return


def main(args=None):
    rclpy.init(args=args)
    node = BirdseyeNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
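linetrack.py only publishes; this update does not include the consumer side. Below is a minimal sketch of a node that subscribes to the two scalar topics BirdseyeNode publishes, /line_error and /line_distance, using a best-effort QoS to match the publisher. The node name LineTrackListener and its callbacks are illustrative assumptions, not part of the repository.

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy
from std_msgs.msg import Float32


class LineTrackListener(Node):
    """Illustrative consumer of the topics published by BirdseyeNode."""

    def __init__(self):
        super().__init__('line_track_listener')
        qos = QoSProfile(depth=10, reliability=ReliabilityPolicy.BEST_EFFORT)
        self.create_subscription(Float32, '/line_error', self.on_error, qos)
        self.create_subscription(Float32, '/line_distance', self.on_distance, qos)
        self.error = 0.0
        self.distance = 0.0

    def on_error(self, msg):
        # heading correction hint: BirdseyeNode publishes +6.0, -10.0 or 0.0
        self.error = msg.data

    def on_distance(self, msg):
        # left/right bottom-edge offset in pixels (possibly scaled by 1.5)
        self.distance = msg.data


def main():
    rclpy.init()
    node = LineTrackListener()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()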
2 image files deleted (92 KiB and 98 KiB).
utils/shoushi_demo.py: new file (284 lines), Executable file
@@ -0,0 +1,284 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rclpy
from rclpy.node import Node
from protocol.srv import GestureActionControl
from protocol.msg import GestureActionResult
from std_srvs.srv import Trigger
import time
import sys

class GestureControlNode(Node):
    def __init__(self):
        super().__init__('gesture_control_node')

        # current state variable
        self.current_state = "waiting_for_loading"  # possible states: waiting_for_loading, loading, transporting, waiting_for_unloading, unloading

        # create the gesture-control service client
        self.gesture_control_cli = self.create_client(GestureActionControl, '/mi_desktop_48_b0_2d_7b_05_1d/gesture_action_control')

        # subscribe to gesture recognition results
        self.gesture_sub = self.create_subscription(
            GestureActionResult,
            '/mi_desktop_48_b0_2d_7b_05_1d/gesture_action_msg',
            self.gesture_callback,
            10)

        # assume services exist for starting transport and completing unloading
        self.start_transport_cli = self.create_client(Trigger, '/mi_desktop_48_b0_2d_7b_05_1d/start_transport')
        self.complete_unloading_cli = self.create_client(Trigger, '/mi_desktop_48_b0_2d_7b_05_1d/complete_unloading')

        # timer for state checks
        self.timer = self.create_timer(1.0, self.timer_callback)

        self.get_logger().info("手势控制节点已启动")

        # gesture recognition timeout (seconds)
        self.gesture_timeout = 60

        # time of the last detected gesture
        self.last_gesture_time = 0

        # whether gesture recognition is active
        self.is_gesture_active = False

    def activate_gesture_recognition(self):
        """Activate gesture recognition"""
        while not self.gesture_control_cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('手势控制服务不可用,等待中...')

        req = GestureActionControl.Request()
        req.command = GestureActionControl.Request.START_ALGO
        req.timeout = self.gesture_timeout

        future = self.gesture_control_cli.call_async(req)
        future.add_done_callback(self.gesture_control_callback)

        self.get_logger().info("已激活手势识别功能")
        self.is_gesture_active = True

    def deactivate_gesture_recognition(self):
        """Deactivate gesture recognition"""
        while not self.gesture_control_cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('手势控制服务不可用,等待中...')

        req = GestureActionControl.Request()
        req.command = GestureActionControl.Request.STOP_ALGO

        future = self.gesture_control_cli.call_async(req)
        future.add_done_callback(self.gesture_control_callback)

        self.get_logger().info("已关闭手势识别功能")
        self.is_gesture_active = False

    def gesture_control_callback(self, future):
        """Callback for the gesture-control service call"""
        try:
            response = future.result()
            if response.code == GestureActionControl.Response.RESULT_SUCCESS:
                self.get_logger().info("手势控制服务调用成功")
            else:
                self.get_logger().warn("手势控制服务繁忙")
        except Exception as e:
            self.get_logger().error(f"手势控制服务调用失败: {e}")

    def gesture_callback(self, msg):
        """Callback for gesture recognition results"""
        if not self.is_gesture_active:
            return

        self.last_gesture_time = time.time()

        # gesture name mapping
        gesture_names = {
            0: "无手势",
            1: "手掌拉近",
            2: "手掌推开",
            3: "手向上抬",
            4: "手向下压",
            5: "手向左推",
            6: "手向右推",
            7: "停止手势",
            8: "大拇指朝上",
            9: "张开手掌或手指",
            10: "闭合手掌或手指",
            11: "大拇指朝下"
        }

        gesture_name = gesture_names.get(msg.id, "未知手势")
        self.get_logger().info(f"检测到手势: {gesture_name} (ID: {msg.id})")

        # act based on the current state and the detected gesture
        if self.current_state == "loading" and msg.id == 8:  # thumbs-up means loading is complete
            self.complete_loading()
        elif self.current_state == "unloading" and msg.id == 8:  # thumbs-up means unloading is complete
            self.complete_unloading()
        elif msg.id == 7:  # stop gesture
            self.get_logger().info("检测到停止手势")

    def complete_loading(self):
        """Complete the loading operation"""
        self.get_logger().info("配货完成,开始运输")

        # call the start-transport service
        if self.start_transport_cli.wait_for_service(timeout_sec=1.0):
            req = Trigger.Request()
            future = self.start_transport_cli.call_async(req)
            future.add_done_callback(self.transport_start_callback)
        else:
            self.get_logger().warn("开始运输服务不可用")

        # update the state
        self.current_state = "transporting"

    def transport_start_callback(self, future):
        """Callback for the start-transport service"""
        try:
            response = future.result()
            if response.success:
                self.get_logger().info("运输已开始")
            else:
                self.get_logger().warn("运输启动失败")
        except Exception as e:
            self.get_logger().error(f"运输服务调用失败: {e}")

    def complete_unloading(self):
        """Complete the unloading operation"""
        self.get_logger().info("卸货完成,准备新的配货")

        # call the complete-unloading service
        if self.complete_unloading_cli.wait_for_service(timeout_sec=1.0):
            req = Trigger.Request()
            future = self.complete_unloading_cli.call_async(req)
            future.add_done_callback(self.unloading_complete_callback)
        else:
            self.get_logger().warn("完成卸货服务不可用")

        # update the state
        self.current_state = "waiting_for_loading"

    def unloading_complete_callback(self, future):
        """Callback for the unloading-complete service"""
        try:
            response = future.result()
            if response.success:
                self.get_logger().info("卸货已完成确认")
            else:
                self.get_logger().warn("卸货完成确认失败")
        except Exception as e:
            self.get_logger().error(f"卸货完成服务调用失败: {e}")

    def timer_callback(self):
        """Timer callback for state checks and timeout handling"""
        # re-activate gesture recognition if it is active but has timed out
        if self.is_gesture_active and time.time() - self.last_gesture_time > self.gesture_timeout:
            self.get_logger().info("手势识别超时,重新激活")
            self.activate_gesture_recognition()
            self.last_gesture_time = time.time()

        # state-machine logic can be added here to update current_state as needed,
        # e.g. set self.current_state = "loading" when the robot dog reaches the loading area
        # and self.current_state = "unloading" when it reaches the unloading area

    def update_state(self, new_state):
        """Update the robot dog's state"""
        old_state = self.current_state
        self.current_state = new_state
        self.get_logger().info(f"状态更新: {old_state} -> {new_state}")

        # if entering a state that requires gesture interaction, make sure recognition is active
        if new_state in ["loading", "unloading"]:
            self.activate_gesture_recognition()


def start_gesture_recognition(timeout=60):
    """Start gesture recognition (for external callers)"""
    rclpy.init()
    node = GestureControlNode()

    try:
        # set the timeout
        node.gesture_timeout = timeout

        # activate gesture recognition
        node.activate_gesture_recognition()

        # keep the node running
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


def stop_gesture_recognition():
    """Stop gesture recognition (for external callers)"""
    rclpy.init()
    node = GestureControlNode()

    try:
        # deactivate gesture recognition
        node.deactivate_gesture_recognition()

        # short delay to make sure it finishes
        time.sleep(2)
    finally:
        node.destroy_node()
        rclpy.shutdown()


def main(args=None):
    # check command-line arguments
    if len(sys.argv) > 1:
        command = sys.argv[1]

        if command == "start":
            # start gesture recognition
            timeout = 60
            if len(sys.argv) > 2:
                try:
                    timeout = int(sys.argv[2])
                except ValueError:
                    print(f"无效的超时时间: {sys.argv[2]},使用默认值60秒")

            print(f"启动手势识别,超时时间: {timeout}秒")
            start_gesture_recognition(timeout)
            return
        elif command == "stop":
            # stop gesture recognition
            print("停止手势识别")
            stop_gesture_recognition()
            return
        elif command == "test":
            # test mode: start gesture recognition, run briefly, then stop
            print("测试模式:启动手势识别,5秒后停止")
            start_gesture_recognition(5)
            time.sleep(5)
            stop_gesture_recognition()
            return
        else:
            print(f"未知命令: {command}")
            print("可用命令: start [timeout], stop, test")
            return

    # with no arguments, run the original main flow
    rclpy.init(args=args)
    gesture_control_node = GestureControlNode()

    try:
        # activate gesture recognition
        gesture_control_node.activate_gesture_recognition()

        # run the node
        rclpy.spin(gesture_control_node)
    except KeyboardInterrupt:
        pass
    finally:
        gesture_control_node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()
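For reference, a hedged usage sketch for this module: the command-line forms correspond to the start/stop/test branches of main() above, and the programmatic call assumes the file is importable as utils.shoushi_demo. Note that start_gesture_recognition blocks in rclpy.spin() until interrupted.

# Command-line usage (matches main() above):
#   python3 utils/shoushi_demo.py start 120   # activate gesture recognition with a 120 s timeout
#   python3 utils/shoushi_demo.py stop        # deactivate gesture recognition
#   python3 utils/shoushi_demo.py test        # short self-test run

# Programmatic usage (illustrative sketch, not part of this update):
from utils.shoushi_demo import start_gesture_recognition

if __name__ == '__main__':
    start_gesture_recognition(timeout=120)  # blocks in rclpy.spin() until Ctrl-C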