mi-task/single_test/test_vision_functions.py

355 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
视觉功能测试脚本
测试QR码扫描、箭头检测、横线检测等视觉相关功能
"""
import time
import sys
import os
import cv2
import numpy as np
# 添加父目录到路径以便能够导入utils
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing
from utils.decode_arrow import detect_arrow_direction, visualize_arrow_detection
from utils.gray_sky_analyzer import analyze_gray_sky_ratio
from utils.yellow_area_analyzer import analyze_yellow_area_ratio
from base_move.move_base_hori_line import (
detect_horizontal_track_edge, detect_horizontal_track_edge_v2, detect_horizontal_track_edge_v3,
calculate_distance_to_line
)
from base_move.go_straight import go_straight_with_qrcode
# 创建日志记录器
logger = get_logger("视觉功能测试")
def test_qr_code_scanning(ctrl, msg, observe=True):
"""测试QR码扫描功能"""
section('测试QR码扫描功能', "视觉测试")
info('开始测试QR码扫描功能...', "测试")
try:
# 启动异步QR码扫描
ctrl.image_processor.start_async_scan(interval=0.3)
info("已启动异步QR码扫描", "扫描")
# 持续扫描10秒
scan_duration = 10
start_time = time.time()
qr_results = []
info(f"开始扫描,持续{scan_duration}秒...", "扫描")
while time.time() - start_time < scan_duration:
# 获取当前扫描结果
qr_data, scan_time = ctrl.image_processor.get_last_qr_result()
if qr_data and scan_time > start_time:
if qr_data not in qr_results:
qr_results.append(qr_data)
success(f"扫描到QR码: {qr_data}", "扫描")
time.sleep(0.5)
# 停止扫描
ctrl.image_processor.stop_async_scan()
# 总结结果
if qr_results:
success(f"QR码扫描测试完成共扫描到{len(qr_results)}个QR码", "成功")
for i, qr_data in enumerate(qr_results, 1):
info(f"QR码{i}: {qr_data}", "结果")
else:
warning("QR码扫描测试完成但未扫描到任何QR码", "警告")
except Exception as e:
error(f"QR码扫描测试失败: {str(e)}", "失败")
finally:
# 确保停止扫描
try:
ctrl.image_processor.stop_async_scan()
except:
pass
def test_arrow_detection(ctrl, msg, observe=True):
"""测试箭头检测功能"""
section('测试:箭头检测功能', "视觉测试")
info('开始测试箭头检测功能...', "测试")
try:
# 连续检测箭头方向
detection_count = 20
left_count = 0
right_count = 0
unknown_count = 0
info(f"开始检测箭头方向,共检测{detection_count}次...", "检测")
for i in range(detection_count):
# 获取当前图像
img = ctrl.image_processor.get_current_image()
if img is not None:
# 检测箭头方向
direction = detect_arrow_direction(img)
if direction == "left":
left_count += 1
elif direction == "right":
right_count += 1
else:
unknown_count += 1
if observe:
info(f"检测{i+1}: {direction}", "检测")
# 保存检测结果的可视化图像每5次保存一次
if i % 5 == 0:
save_dir = "logs/image"
os.makedirs(save_dir, exist_ok=True)
timestamp = time.strftime("%Y%m%d_%H%M%S")
save_path = f"{save_dir}/arrow_detection_test_{timestamp}_{i}.jpg"
try:
visualize_arrow_detection(img, save_path=save_path)
info(f"箭头检测可视化结果已保存至: {save_path}", "保存")
except Exception as e:
warning(f"保存可视化结果失败: {str(e)}", "警告")
time.sleep(0.2)
# 总结结果
info(f"箭头检测统计结果:", "统计")
info(f" 左箭头: {left_count}", "统计")
info(f" 右箭头: {right_count}", "统计")
info(f" 未知/无箭头: {unknown_count}", "统计")
# 判断主要方向
if left_count > right_count:
final_direction = "left"
elif right_count > left_count:
final_direction = "right"
else:
final_direction = "unknown"
success(f"箭头检测测试完成,主要方向: {final_direction}", "成功")
except Exception as e:
error(f"箭头检测测试失败: {str(e)}", "失败")
def test_horizontal_line_detection(ctrl, msg, observe=True):
"""测试横线检测功能"""
section('测试:横线检测功能', "视觉测试")
info('开始测试横线检测功能...', "测试")
try:
# 相机高度
camera_height = 0.355 # 单位: 米
# 测试不同版本的横线检测函数
detection_functions = [
("v1", detect_horizontal_track_edge),
("v2", detect_horizontal_track_edge_v2),
("v3", detect_horizontal_track_edge_v3),
]
for version, detect_func in detection_functions:
info(f"测试横线检测函数 {version}...", "测试")
# 连续检测5次
detection_results = []
for i in range(5):
# 获取当前图像
image = ctrl.image_processor.get_current_image()
if image is not None:
# 检测横线
edge_point, edge_info = detect_func(image, observe=False, save_log=True)
if edge_point is not None and edge_info is not None:
# 计算到横线的距离
distance = calculate_distance_to_line(edge_info, camera_height, observe=False)
detection_results.append(distance)
info(f" 检测{i+1}: 横线距离 {distance:.3f}", "检测")
else:
info(f" 检测{i+1}: 未检测到横线", "检测")
time.sleep(0.5)
# 统计结果
if detection_results:
avg_distance = sum(detection_results) / len(detection_results)
min_distance = min(detection_results)
max_distance = max(detection_results)
info(f" {version} 统计结果: 平均距离 {avg_distance:.3f}米, 最小 {min_distance:.3f}米, 最大 {max_distance:.3f}", "统计")
success(f"横线检测函数 {version} 测试成功", "成功")
else:
warning(f"横线检测函数 {version} 未检测到任何横线", "警告")
success("横线检测功能测试完成", "成功")
except Exception as e:
error(f"横线检测测试失败: {str(e)}", "失败")
def test_sky_analysis(ctrl, msg, observe=True):
"""测试天空分析功能"""
section('测试:天空分析功能', "视觉测试")
info('开始测试天空分析功能...', "测试")
try:
# 连续分析天空比例
analysis_count = 10
sky_ratios = []
info(f"开始分析天空比例,共分析{analysis_count}次...", "分析")
for i in range(analysis_count):
# 获取当前图像
current_image = ctrl.image_processor.get_current_image()
if current_image is not None:
# 保存临时图像
temp_image_path = f"/tmp/sky_analysis_{i}.jpg"
cv2.imwrite(temp_image_path, current_image)
# 分析灰色天空比例
try:
sky_ratio = analyze_gray_sky_ratio(temp_image_path)
sky_ratios.append(sky_ratio)
info(f"分析{i+1}: 天空比例 {sky_ratio:.2%}", "分析")
except Exception as e:
warning(f"分析{i+1}失败: {str(e)}", "警告")
# 清理临时文件
try:
os.remove(temp_image_path)
except:
pass
time.sleep(0.5)
# 统计结果
if sky_ratios:
avg_ratio = sum(sky_ratios) / len(sky_ratios)
min_ratio = min(sky_ratios)
max_ratio = max(sky_ratios)
info(f"天空分析统计结果:", "统计")
info(f" 平均天空比例: {avg_ratio:.2%}", "统计")
info(f" 最小天空比例: {min_ratio:.2%}", "统计")
info(f" 最大天空比例: {max_ratio:.2%}", "统计")
success("天空分析功能测试完成", "成功")
else:
warning("天空分析功能测试完成,但未获得有效分析结果", "警告")
except Exception as e:
error(f"天空分析测试失败: {str(e)}", "失败")
def test_yellow_area_analysis(ctrl, msg, observe=True):
"""测试黄色区域分析功能"""
section('测试:黄色区域分析功能', "视觉测试")
info('开始测试黄色区域分析功能...', "测试")
try:
# 连续分析黄色区域比例
analysis_count = 10
yellow_ratios = []
info(f"开始分析黄色区域比例,共分析{analysis_count}次...", "分析")
for i in range(analysis_count):
# 获取当前图像
current_image = ctrl.image_processor.get_current_image()
if current_image is not None:
try:
# 分析黄色区域比例
yellow_ratio = analyze_yellow_area_ratio(current_image)
yellow_ratios.append(yellow_ratio)
info(f"分析{i+1}: 黄色区域比例 {yellow_ratio:.2%}", "分析")
except Exception as e:
warning(f"分析{i+1}失败: {str(e)}", "警告")
time.sleep(0.5)
# 统计结果
if yellow_ratios:
avg_ratio = sum(yellow_ratios) / len(yellow_ratios)
min_ratio = min(yellow_ratios)
max_ratio = max(yellow_ratios)
info(f"黄色区域分析统计结果:", "统计")
info(f" 平均黄色区域比例: {avg_ratio:.2%}", "统计")
info(f" 最小黄色区域比例: {min_ratio:.2%}", "统计")
info(f" 最大黄色区域比例: {max_ratio:.2%}", "统计")
success("黄色区域分析功能测试完成", "成功")
else:
warning("黄色区域分析功能测试完成,但未获得有效分析结果", "警告")
except Exception as e:
error(f"黄色区域分析测试失败: {str(e)}", "失败")
def test_movement_with_qr_scanning(ctrl, msg, observe=True):
"""测试移动过程中的QR码扫描"""
section('测试移动过程中的QR码扫描', "综合测试")
info('开始测试移动过程中的QR码扫描...', "测试")
try:
# 记录初始位置
initial_pos = ctrl.odo_msg.xyz
info(f"初始位置: x={initial_pos[0]:.3f}, y={initial_pos[1]:.3f}", "位置")
# 使用带QR码扫描的直线移动
qr_result = go_straight_with_qrcode(
ctrl, msg,
distance=2.0,
speed=0.3,
qr_check_interval=0.3,
observe=observe
)
# 记录最终位置
final_pos = ctrl.odo_msg.xyz
distance_moved = ((final_pos[0] - initial_pos[0])**2 + (final_pos[1] - initial_pos[1])**2)**0.5
info(f"最终位置: x={final_pos[0]:.3f}, y={final_pos[1]:.3f}", "位置")
info(f"移动距离: {distance_moved:.3f}", "距离")
# 报告QR码扫描结果
if qr_result:
success(f"移动过程中成功扫描到QR码: {qr_result}", "成功")
else:
warning("移动过程中未扫描到QR码", "警告")
success("移动过程中的QR码扫描测试完成", "成功")
except Exception as e:
error(f"移动过程中的QR码扫描测试失败: {str(e)}", "失败")
def run_vision_function_tests(ctrl, msg):
"""运行所有视觉功能测试"""
section('视觉功能测试套件', "开始")
tests = [
("QR码扫描功能", test_qr_code_scanning),
("箭头检测功能", test_arrow_detection),
("横线检测功能", test_horizontal_line_detection),
("天空分析功能", test_sky_analysis),
("黄色区域分析功能", test_yellow_area_analysis),
("移动过程中的QR码扫描", test_movement_with_qr_scanning),
]
for test_name, test_func in tests:
try:
info(f"开始执行测试: {test_name}", "测试")
test_func(ctrl, msg)
success(f"测试 {test_name} 成功完成", "成功")
except Exception as e:
error(f"测试 {test_name} 失败: {str(e)}", "失败")
# 每个测试之间暂停
time.sleep(2)
success("所有视觉功能测试完成", "完成")
if __name__ == "__main__":
# 这里可以添加独立运行的代码
print("视觉功能测试脚本")
print("使用方法:")
print("from single_test.test_vision_functions import run_vision_function_tests")
print("run_vision_function_tests(ctrl, msg)")