feat(utils): add QR code decoding functionality
- Integrate QReader for QR code detection and decoding - Implement decode_qrcode method in ImageProcessor class - Uncomment run_task_1 function call in main.py
This commit is contained in:
parent
4d924d889a
commit
c9686a84e1
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
|||||||
|
*/__pycachee__/
|
2
main.py
2
main.py
@ -32,7 +32,7 @@ def main():
|
|||||||
Ctrl.Send_cmd(msg)
|
Ctrl.Send_cmd(msg)
|
||||||
Ctrl.Wait_finish(12, 0)
|
Ctrl.Wait_finish(12, 0)
|
||||||
|
|
||||||
# run_task_1(Ctrl, msg)
|
run_task_1(Ctrl, msg)
|
||||||
|
|
||||||
time.sleep(100)
|
time.sleep(100)
|
||||||
|
|
||||||
|
Binary file not shown.
BIN
test/plugins/qrcode/binary_qrcode-A1.jpg
Normal file
BIN
test/plugins/qrcode/binary_qrcode-A1.jpg
Normal file
Binary file not shown.
After Width: | Height: | Size: 485 KiB |
15
test/plugins/qrcode/opencv-test.py
Normal file
15
test/plugins/qrcode/opencv-test.py
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
import cv2
|
||||||
|
|
||||||
|
# Read the image containing a QR code
|
||||||
|
img = cv2.imread("image.png")
|
||||||
|
|
||||||
|
# Create a QR code detector
|
||||||
|
qr_decoder = cv2.QRCodeDetector()
|
||||||
|
|
||||||
|
# Detect and decode the QR code
|
||||||
|
data, bbox, _ = qr_decoder.detectAndDecode(img)
|
||||||
|
|
||||||
|
if data:
|
||||||
|
print("Decoded data:", data)
|
||||||
|
else:
|
||||||
|
print("QR code not detected.")
|
29
test/plugins/qrcode/pyzbar-test.py
Normal file
29
test/plugins/qrcode/pyzbar-test.py
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
import cv2
|
||||||
|
from pyzbar import pyzbar
|
||||||
|
|
||||||
|
def read_qrcode(image_path):
|
||||||
|
# 读取图像
|
||||||
|
img = cv2.imread(image_path)
|
||||||
|
|
||||||
|
if img is None:
|
||||||
|
print("无法加载图像,请检查文件路径是否正确。")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 图像预处理:转换为灰度图并进行二值化处理
|
||||||
|
gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
|
||||||
|
_, binary_img = cv2.threshold(gray_img, 128, 255, cv2.THRESH_BINARY)
|
||||||
|
|
||||||
|
# 识别二维码
|
||||||
|
barcodes = pyzbar.decode(binary_img)
|
||||||
|
|
||||||
|
# 输出结果
|
||||||
|
if not barcodes:
|
||||||
|
print("未检测到二维码。")
|
||||||
|
else:
|
||||||
|
for barcode in barcodes:
|
||||||
|
barcode_data = barcode.data.decode("utf-8")
|
||||||
|
barcode_type = barcode.type
|
||||||
|
print(f"发现 {barcode_type} 二维码: {barcode_data}")
|
||||||
|
|
||||||
|
# 使用示例
|
||||||
|
read_qrcode("image.png")
|
BIN
test/plugins/qrcode/qrcode-A1.jpg
Normal file
BIN
test/plugins/qrcode/qrcode-A1.jpg
Normal file
Binary file not shown.
After Width: | Height: | Size: 13 MiB |
BIN
test/plugins/qrcode/qrcode.png
Normal file
BIN
test/plugins/qrcode/qrcode.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 1.4 KiB |
13
test/plugins/qrcode/qreader-test.py
Normal file
13
test/plugins/qrcode/qreader-test.py
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
from qreader import QReader
|
||||||
|
import cv2
|
||||||
|
|
||||||
|
# 创建QReader实例
|
||||||
|
qreader = QReader()
|
||||||
|
|
||||||
|
# 读取包含二维码的图像
|
||||||
|
image = cv2.cvtColor(cv2.imread("captured_images/qrcode-2/image_20250511_103244.png"), cv2.COLOR_BGR2RGB)
|
||||||
|
|
||||||
|
# 检测并解码二维码
|
||||||
|
decoded_text = qreader.detect_and_decode(image=image)
|
||||||
|
|
||||||
|
print(decoded_text[0])
|
102
test/plugins/qrcode/test-1.py
Normal file
102
test/plugins/qrcode/test-1.py
Normal file
@ -0,0 +1,102 @@
|
|||||||
|
"""
|
||||||
|
author: puhaiyang
|
||||||
|
blog: https://blog.csdn.net/puhaiyang
|
||||||
|
github: https://github.com/puhaiyang
|
||||||
|
"""
|
||||||
|
import math
|
||||||
|
import cv2
|
||||||
|
from pyzbar import pyzbar
|
||||||
|
import imutils
|
||||||
|
|
||||||
|
|
||||||
|
def azimuthangle(x1, y1, x2, y2):
|
||||||
|
""" 已知两点坐标计算角度 -
|
||||||
|
:param x1: 原点横坐标值
|
||||||
|
:param y1: 原点纵坐标值
|
||||||
|
:param x2: 目标点横坐标值
|
||||||
|
:param y2: 目标纵坐标值
|
||||||
|
"""
|
||||||
|
dx = x2 - x1
|
||||||
|
dy = y2 - y1
|
||||||
|
# 求斜率
|
||||||
|
k = dy / dx
|
||||||
|
# 结果是弧度值
|
||||||
|
angle = math.atan(k)
|
||||||
|
# 弧度值转为角度
|
||||||
|
return angle * 180 / math.pi
|
||||||
|
|
||||||
|
|
||||||
|
def get_angle(qr_item):
|
||||||
|
"""
|
||||||
|
获取出进行矫正所需要的角度
|
||||||
|
"""
|
||||||
|
# 将坐标从下到上,从左到右进行排序
|
||||||
|
locs = {qr_item.polygon[0], qr_item.polygon[1], qr_item.polygon[2], qr_item.polygon[3]}
|
||||||
|
locs = sorted(locs, key=lambda x: x.y * 100000 + x.x * 1000)
|
||||||
|
return azimuthangle(locs[2].x, locs[2].y, locs[3].x, locs[3].y)
|
||||||
|
|
||||||
|
|
||||||
|
def to_up_angle(qr_item):
|
||||||
|
"""
|
||||||
|
获取出使二维码朝上的角度
|
||||||
|
"""
|
||||||
|
if qr_item.orientation == 'UP':
|
||||||
|
angle_ext = 0
|
||||||
|
elif qr_item.orientation == 'RIGHT':
|
||||||
|
angle_ext = 270
|
||||||
|
elif qr_item.orientation == 'DOWN':
|
||||||
|
angle_ext = 180
|
||||||
|
else:
|
||||||
|
angle_ext = 90
|
||||||
|
return angle_ext
|
||||||
|
|
||||||
|
|
||||||
|
def resize_img(ori_img):
|
||||||
|
"""
|
||||||
|
图片压缩
|
||||||
|
"""
|
||||||
|
height = ori_img.shape[0]
|
||||||
|
width = ori_img.shape[1]
|
||||||
|
# 执行压缩,按照500的宽度为标准
|
||||||
|
if width > 500:
|
||||||
|
scale_percent = int(500 / width * 100)
|
||||||
|
s_width = int(width * scale_percent / 100)
|
||||||
|
s_height = int(height * scale_percent / 100)
|
||||||
|
# 新的宽度和高度
|
||||||
|
dim = (s_width, s_height)
|
||||||
|
return cv2.resize(ori_img, dim, interpolation=cv2.INTER_AREA)
|
||||||
|
else:
|
||||||
|
return ori_img
|
||||||
|
|
||||||
|
|
||||||
|
def adjust_rotae_angle(img):
|
||||||
|
angle = 0
|
||||||
|
# 对图片进行压缩
|
||||||
|
img = resize_img(img)
|
||||||
|
# symbol为64代表二维码
|
||||||
|
qr_result = pyzbar.decode(img, symbols=[64])
|
||||||
|
if len(qr_result) == 1:
|
||||||
|
# 识别到了一个二维码,将二维码朝上旋转
|
||||||
|
first_adjust_angle = to_up_angle(qr_result[0])
|
||||||
|
# 进行旋转
|
||||||
|
img_rotae_to_up = imutils.rotate_bound(img, first_adjust_angle)
|
||||||
|
# 再次识别
|
||||||
|
qr_result2 = pyzbar.decode(img_rotae_to_up, symbols=[64])
|
||||||
|
if len(qr_result2) == 1:
|
||||||
|
last_adjust_angle = -get_angle(qr_result2[0])
|
||||||
|
angle = first_adjust_angle + last_adjust_angle
|
||||||
|
print("first angle:%d last angle:%d angle:%d" % (first_adjust_angle, last_adjust_angle, angle))
|
||||||
|
else:
|
||||||
|
print('last 未识别到二维码')
|
||||||
|
else:
|
||||||
|
print('first 未识别到二维码')
|
||||||
|
return angle
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
# 加载图片
|
||||||
|
img = cv2.imread('123.jpg')
|
||||||
|
adjust_angle = adjust_rotae_angle(img.copy())
|
||||||
|
if adjust_angle != 0:
|
||||||
|
img_rotae = imutils.rotate_bound(img, adjust_angle)
|
||||||
|
cv2.imwrite('img_rotae.jpg', img_rotae)
|
13
test/plugins/qrcode/threshold.py
Normal file
13
test/plugins/qrcode/threshold.py
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
import cv2
|
||||||
|
|
||||||
|
# 读取图像(灰度)
|
||||||
|
image = cv2.imread('qrcode-A1.jpg', 0)
|
||||||
|
|
||||||
|
# 设置阈值
|
||||||
|
threshold_value = 100
|
||||||
|
|
||||||
|
# 进行二值化处理
|
||||||
|
_, binary_image = cv2.threshold(image, threshold_value, 255, cv2.THRESH_BINARY)
|
||||||
|
|
||||||
|
# 保存结果
|
||||||
|
cv2.imwrite('binary_qrcode-A1.jpg', binary_image)
|
10
test/plugins/qrcode/zxing.py
Normal file
10
test/plugins/qrcode/zxing.py
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
from pyzxing import BarCodeReader
|
||||||
|
reader = BarCodeReader()
|
||||||
|
results = reader.decode('binary_qrcode-A1.jpg')
|
||||||
|
print(results[0])
|
||||||
|
# # 支持输入文件模式以检测多个文件
|
||||||
|
# results = reader.decode('/PATH/TO/FILES/*.png')
|
||||||
|
# print(results)
|
||||||
|
# 支持传入图片的向量
|
||||||
|
# 需要额外安装opencv,pip install opencv-python
|
||||||
|
# results = reader.decode_array(img)
|
60
test/ros2/rgb-camera/image-raw-show.py
Normal file
60
test/ros2/rgb-camera/image-raw-show.py
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from sensor_msgs.msg import Image
|
||||||
|
from cv_bridge import CvBridge
|
||||||
|
import cv2
|
||||||
|
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
|
||||||
|
|
||||||
|
class ImageSubscriber(Node):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__('image_subscriber')
|
||||||
|
# 定义 QoS 配置(匹配发布者的可靠性策略)
|
||||||
|
qos_profile = QoSProfile(
|
||||||
|
reliability=QoSReliabilityPolicy.BEST_EFFORT, # 或 BEST_EFFORT,取决于发布者
|
||||||
|
history=QoSHistoryPolicy.KEEP_LAST,
|
||||||
|
depth=10
|
||||||
|
)
|
||||||
|
|
||||||
|
self.subscription = self.create_subscription(
|
||||||
|
Image,
|
||||||
|
'/rgb_camera/image_raw',
|
||||||
|
self.image_callback,
|
||||||
|
qos_profile=qos_profile
|
||||||
|
)
|
||||||
|
self.subscription # 防止未使用变量警告
|
||||||
|
self.bridge = CvBridge()
|
||||||
|
|
||||||
|
def image_callback(self, msg):
|
||||||
|
try:
|
||||||
|
# 将ROS图像消息转换为OpenCV格式
|
||||||
|
cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
|
||||||
|
|
||||||
|
# 在这里进行你的图像分析
|
||||||
|
# 例如显示图像
|
||||||
|
cv2.imshow("Camera Feed", cv_image)
|
||||||
|
cv2.waitKey(1)
|
||||||
|
|
||||||
|
# 可以在这里添加你的图像处理代码
|
||||||
|
# analyze_image(cv_image)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().error('Failed to convert image: %s' % str(e))
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
image_subscriber = ImageSubscriber()
|
||||||
|
|
||||||
|
try:
|
||||||
|
rclpy.spin(image_subscriber)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# 清理
|
||||||
|
image_subscriber.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
cv2.destroyAllWindows()
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
71
test/ros2/rgb-camera/img-raw-get.py
Normal file
71
test/ros2/rgb-camera/img-raw-get.py
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from sensor_msgs.msg import Image
|
||||||
|
from cv_bridge import CvBridge
|
||||||
|
import cv2
|
||||||
|
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
class ImageSubscriber(Node):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__('image_subscriber')
|
||||||
|
|
||||||
|
# Define QoS Profile
|
||||||
|
qos_profile = QoSProfile(
|
||||||
|
reliability=QoSReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=QoSHistoryPolicy.KEEP_LAST,
|
||||||
|
depth=10
|
||||||
|
)
|
||||||
|
|
||||||
|
self.subscription = self.create_subscription(
|
||||||
|
Image,
|
||||||
|
'/rgb_camera/image_raw',
|
||||||
|
self.image_callback,
|
||||||
|
qos_profile=qos_profile
|
||||||
|
)
|
||||||
|
self.subscription # Prevent unused variable warning
|
||||||
|
self.bridge = CvBridge()
|
||||||
|
self.cv_image = None # Store latest image
|
||||||
|
self.image_saved = False # Flag to track if image is saved
|
||||||
|
|
||||||
|
def image_callback(self, msg):
|
||||||
|
if self.image_saved:
|
||||||
|
return # Skip processing if image already saved
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Convert image message to OpenCV format
|
||||||
|
self.cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
|
||||||
|
self.save_image()
|
||||||
|
self.image_saved = True # Mark image as saved
|
||||||
|
|
||||||
|
# Shutdown the node after saving the image
|
||||||
|
self.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
cv2.destroyAllWindows()
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().error('Failed to convert image: %s' % str(e))
|
||||||
|
|
||||||
|
def save_image(self):
|
||||||
|
if self.cv_image is not None:
|
||||||
|
# Generate a timestamped filename
|
||||||
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||||
|
filename = f"captured_images/arrow-right/image_{timestamp}.png"
|
||||||
|
|
||||||
|
cv2.imwrite(filename, self.cv_image)
|
||||||
|
self.get_logger().info(f"Saved image as {filename}")
|
||||||
|
else:
|
||||||
|
self.get_logger().warn("No image received yet to save.")
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
image_subscriber = ImageSubscriber()
|
||||||
|
|
||||||
|
try:
|
||||||
|
rclpy.spin(image_subscriber)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
BIN
utils/__pycache__/image_raw.cpython-38.pyc
Normal file
BIN
utils/__pycache__/image_raw.cpython-38.pyc
Normal file
Binary file not shown.
BIN
utils/__pycache__/robot_control_cmd_lcmt.cpython-38.pyc
Normal file
BIN
utils/__pycache__/robot_control_cmd_lcmt.cpython-38.pyc
Normal file
Binary file not shown.
BIN
utils/__pycache__/robot_control_response_lcmt.cpython-38.pyc
Normal file
BIN
utils/__pycache__/robot_control_response_lcmt.cpython-38.pyc
Normal file
Binary file not shown.
@ -6,6 +6,8 @@ from sensor_msgs.msg import Image
|
|||||||
from cv_bridge import CvBridge
|
from cv_bridge import CvBridge
|
||||||
import cv2
|
import cv2
|
||||||
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
|
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
|
||||||
|
from qreader import QReader
|
||||||
|
|
||||||
|
|
||||||
class ImageSubscriber(Node):
|
class ImageSubscriber(Node):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@ -35,10 +37,12 @@ class ImageSubscriber(Node):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.get_logger().error('Failed to convert image: %s' % str(e))
|
self.get_logger().error('Failed to convert image: %s' % str(e))
|
||||||
|
|
||||||
|
|
||||||
class ImageProcessor:
|
class ImageProcessor:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
rclpy.init()
|
rclpy.init()
|
||||||
self.image_subscriber = ImageSubscriber()
|
self.image_subscriber = ImageSubscriber()
|
||||||
|
self.qreader = QReader()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
try:
|
try:
|
||||||
@ -46,13 +50,20 @@ class ImageProcessor:
|
|||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def get_current_image(self):
|
|
||||||
return self.image_subscriber.cv_image
|
|
||||||
|
|
||||||
def destroy(self):
|
def destroy(self):
|
||||||
self.image_subscriber.destroy_node()
|
self.image_subscriber.destroy_node()
|
||||||
rclpy.shutdown()
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
def get_current_image(self):
|
||||||
|
return self.image_subscriber.cv_image
|
||||||
|
|
||||||
|
def decode_qrcode(self, img = None):
|
||||||
|
if img is None:
|
||||||
|
img = self.get_current_image()
|
||||||
|
decoded_info = self.qreader.detect_and_decode(image=img)
|
||||||
|
return decoded_info[0]
|
||||||
|
|
||||||
|
|
||||||
""" DEBUG """
|
""" DEBUG """
|
||||||
|
|
||||||
def main(args=None):
|
def main(args=None):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user