2025-05-13 09:44:34 +08:00
|
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
|
|
|
|
|
import os
|
|
|
|
|
import sys
|
|
|
|
|
import time
|
|
|
|
|
import cv2
|
|
|
|
|
|
|
|
|
|
# 添加父目录到路径,以便能够导入utils
|
|
|
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
|
|
|
|
|
|
from utils.image_raw import ImageProcessor
|
|
|
|
|
from utils.decode_arrow import detect_arrow_direction, visualize_arrow_detection
|
2025-05-17 12:34:02 +08:00
|
|
|
|
from utils.log_helper import LogHelper, get_logger, section, info, debug, warning, error, success, timing
|
|
|
|
|
|
|
|
|
|
# 创建本模块特定的日志记录器
|
|
|
|
|
logger = get_logger("箭头检测")
|
2025-05-13 09:44:34 +08:00
|
|
|
|
|
|
|
|
|
class ArrowDetector:
|
|
|
|
|
def __init__(self, image_processor=None):
|
|
|
|
|
"""
|
|
|
|
|
初始化箭头检测器
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
image_processor: 可选的ImageProcessor实例,如果不提供则会创建一个新的
|
|
|
|
|
"""
|
|
|
|
|
# 如果提供了图像处理器,则使用它,否则创建新的
|
|
|
|
|
if image_processor is not None:
|
|
|
|
|
self.image_processor = image_processor
|
|
|
|
|
self.should_destroy = False # 不应该在destroy方法中销毁外部传入的实例
|
|
|
|
|
else:
|
|
|
|
|
self.image_processor = ImageProcessor()
|
|
|
|
|
self.image_processor.run()
|
|
|
|
|
self.should_destroy = True # 应该在destroy方法中销毁自己创建的实例
|
|
|
|
|
|
2025-05-17 12:34:02 +08:00
|
|
|
|
info("箭头检测器已初始化", "启动")
|
2025-05-13 09:44:34 +08:00
|
|
|
|
|
|
|
|
|
def get_arrow_direction(self):
|
|
|
|
|
"""
|
|
|
|
|
获取当前图像中箭头的方向
|
|
|
|
|
|
|
|
|
|
返回:
|
|
|
|
|
direction: 字符串,"left"表示左箭头,"right"表示右箭头,"unknown"表示无法确定
|
|
|
|
|
"""
|
|
|
|
|
# 获取当前图像
|
|
|
|
|
image = self.image_processor.get_current_image()
|
|
|
|
|
|
|
|
|
|
if image is None:
|
2025-05-17 12:34:02 +08:00
|
|
|
|
warning("无法获取图像", "警告")
|
2025-05-13 09:44:34 +08:00
|
|
|
|
return "unknown"
|
|
|
|
|
|
2025-05-17 12:34:02 +08:00
|
|
|
|
debug("正在检测箭头方向...", "检测")
|
|
|
|
|
start_time = time.time()
|
|
|
|
|
|
2025-05-13 09:44:34 +08:00
|
|
|
|
# 检测箭头方向
|
|
|
|
|
direction = detect_arrow_direction(image)
|
2025-05-17 12:34:02 +08:00
|
|
|
|
|
|
|
|
|
elapsed = time.time() - start_time
|
|
|
|
|
timing("箭头检测", elapsed)
|
|
|
|
|
|
|
|
|
|
if direction == "unknown":
|
|
|
|
|
warning("未能识别箭头方向", "检测")
|
|
|
|
|
else:
|
|
|
|
|
success(f"检测到{direction}箭头", "检测")
|
|
|
|
|
|
2025-05-13 09:44:34 +08:00
|
|
|
|
return direction
|
|
|
|
|
|
|
|
|
|
def visualize_current_detection(self, save_path=None):
|
|
|
|
|
"""
|
|
|
|
|
可视化当前图像的箭头检测过程
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
save_path: 保存结果图像的路径(可选)
|
|
|
|
|
"""
|
|
|
|
|
# 获取当前图像
|
|
|
|
|
image = self.image_processor.get_current_image()
|
|
|
|
|
|
|
|
|
|
if image is None:
|
2025-05-17 12:34:02 +08:00
|
|
|
|
warning("无法获取图像", "警告")
|
2025-05-13 09:44:34 +08:00
|
|
|
|
return
|
|
|
|
|
|
2025-05-17 12:34:02 +08:00
|
|
|
|
info(f"可视化箭头检测{' 并保存到 ' + save_path if save_path else ''}", "信息")
|
|
|
|
|
|
2025-05-13 09:44:34 +08:00
|
|
|
|
# 可视化箭头检测
|
|
|
|
|
visualize_arrow_detection(image, save_path)
|
2025-05-17 12:34:02 +08:00
|
|
|
|
|
|
|
|
|
if save_path:
|
|
|
|
|
success(f"检测结果已保存到 {save_path}", "完成")
|
2025-05-13 09:44:34 +08:00
|
|
|
|
|
|
|
|
|
def destroy(self):
|
|
|
|
|
"""
|
|
|
|
|
清理资源
|
|
|
|
|
"""
|
|
|
|
|
if self.should_destroy:
|
2025-05-17 12:34:02 +08:00
|
|
|
|
info("正在销毁图像处理器...", "信息")
|
2025-05-13 09:44:34 +08:00
|
|
|
|
self.image_processor.destroy()
|
2025-05-17 12:34:02 +08:00
|
|
|
|
success("箭头检测器已销毁", "完成")
|
2025-05-13 09:44:34 +08:00
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
"""
|
|
|
|
|
演示箭头检测器的用法
|
|
|
|
|
"""
|
|
|
|
|
try:
|
2025-05-17 12:34:02 +08:00
|
|
|
|
section("箭头检测演示", "启动")
|
|
|
|
|
|
2025-05-13 09:44:34 +08:00
|
|
|
|
# 初始化箭头检测器
|
|
|
|
|
detector = ArrowDetector()
|
|
|
|
|
|
|
|
|
|
# 等待一段时间,确保图像已经接收
|
2025-05-17 12:34:02 +08:00
|
|
|
|
info("等待接收图像...", "信息")
|
2025-05-13 09:44:34 +08:00
|
|
|
|
time.sleep(3)
|
|
|
|
|
|
|
|
|
|
# 持续检测箭头方向
|
2025-05-17 12:34:02 +08:00
|
|
|
|
section("开始箭头检测", "检测")
|
2025-05-13 09:44:34 +08:00
|
|
|
|
for i in range(10): # 检测10次
|
|
|
|
|
direction = detector.get_arrow_direction()
|
2025-05-17 12:34:02 +08:00
|
|
|
|
info(f"检测到的箭头方向 ({i+1}/10): {direction}", "检测")
|
2025-05-13 09:44:34 +08:00
|
|
|
|
|
|
|
|
|
# 可选: 保存第一次检测的可视化结果
|
|
|
|
|
if i == 0:
|
|
|
|
|
detector.visualize_current_detection("arrow_detection_result.jpg")
|
|
|
|
|
|
|
|
|
|
time.sleep(1) # 每秒检测一次
|
|
|
|
|
|
|
|
|
|
except KeyboardInterrupt:
|
2025-05-17 12:34:02 +08:00
|
|
|
|
warning("\n程序被用户中断", "停止")
|
2025-05-13 09:44:34 +08:00
|
|
|
|
except Exception as e:
|
2025-05-17 12:34:02 +08:00
|
|
|
|
error(f"发生错误: {e}", "错误")
|
2025-05-13 09:44:34 +08:00
|
|
|
|
finally:
|
|
|
|
|
# 清理资源
|
2025-05-17 12:34:02 +08:00
|
|
|
|
section("清理资源", "停止")
|
2025-05-13 09:44:34 +08:00
|
|
|
|
if 'detector' in locals():
|
|
|
|
|
detector.destroy()
|
2025-05-17 12:34:02 +08:00
|
|
|
|
success("程序已退出", "完成")
|
2025-05-13 09:44:34 +08:00
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
main()
|