增加 MarkerRunner 支持,优化标记放置功能

- 在 Robot_Ctrl 类中初始化 MarkerRunner,并在运行时调用其方法
- 在移动到横向线的函数中添加标记放置功能,分别在起点和终点放置绿色和红色标记
- 增强错误处理,确保 MarkerRunner 正确销毁
This commit is contained in:
Havoc 2025-05-15 11:19:24 +08:00
parent 30d02cba9d
commit f77cada464
3 changed files with 87 additions and 1 deletions

View File

@ -302,6 +302,9 @@ def move_to_hori_line(ctrl, msg, target_distance=0.1, observe=False):
if observe: if observe:
print(f"起始位置: {start_position}") print(f"起始位置: {start_position}")
print(f"开始移动: {'前进' if forward else '后退'} {abs(distance_to_move):.3f}") print(f"开始移动: {'前进' if forward else '后退'} {abs(distance_to_move):.3f}")
# 在起点放置绿色标记
if hasattr(ctrl, 'place_marker'):
ctrl.place_marker(start_position[0], start_position[1], start_position[2] if len(start_position) > 2 else 0.0, 'green', observe=True)
# 估算移动时间,但实际上会通过里程计控制 # 估算移动时间,但实际上会通过里程计控制
move_time = abs(distance_to_move) / move_speed move_time = abs(distance_to_move) / move_speed
@ -340,6 +343,10 @@ def move_to_hori_line(ctrl, msg, target_distance=0.1, observe=False):
if observe: if observe:
print(f"移动完成,通过里程计计算的移动距离: {distance_moved:.3f}") print(f"移动完成,通过里程计计算的移动距离: {distance_moved:.3f}")
# 在终点放置红色标记
end_position = list(ctrl.odo_msg.xyz)
if hasattr(ctrl, 'place_marker'):
ctrl.place_marker(end_position[0], end_position[1], end_position[2] if len(end_position) > 2 else 0.0, 'red', observe=True)
# 如果没有提供图像处理器或图像验证失败,则使用里程计数据判断 # 如果没有提供图像处理器或图像验证失败,则使用里程计数据判断
return abs(distance_moved - abs(distance_to_move)) < 0.1 # 如果误差小于10厘米则认为成功 return abs(distance_moved - abs(distance_to_move)) < 0.1 # 如果误差小于10厘米则认为成功

26
main.py
View File

@ -15,6 +15,7 @@ from utils.robot_control_cmd_lcmt import robot_control_cmd_lcmt
from utils.robot_control_response_lcmt import robot_control_response_lcmt from utils.robot_control_response_lcmt import robot_control_response_lcmt
from utils.localization_lcmt import localization_lcmt from utils.localization_lcmt import localization_lcmt
from utils.image_raw import ImageProcessor from utils.image_raw import ImageProcessor
from utils.marker_client import MarkerRunner
from task_1.task_1 import run_task_1 from task_1.task_1 import run_task_1
from task_5.task_5 import run_task_5 from task_5.task_5 import run_task_5
@ -63,11 +64,14 @@ class Robot_Ctrl(object):
self.rec_msg = robot_control_response_lcmt() self.rec_msg = robot_control_response_lcmt()
self.odo_msg = localization_lcmt() self.odo_msg = localization_lcmt()
self.image_processor = ImageProcessor() self.image_processor = ImageProcessor()
# DEBUG
self.marker_runner = MarkerRunner()
self.send_lock = Lock() self.send_lock = Lock()
self.delay_cnt = 0 self.delay_cnt = 0
self.mode_ok = 0 self.mode_ok = 0
self.gait_ok = 0 self.gait_ok = 0
self.runing = 1 self.runing = 1
# 初始化 MarkerRunner
def run(self): def run(self):
self.lc_r.subscribe("robot_control_response", self.msg_handler) self.lc_r.subscribe("robot_control_response", self.msg_handler)
@ -76,6 +80,7 @@ class Robot_Ctrl(object):
self.rec_thread.start() self.rec_thread.start()
self.odo_thread.start() self.odo_thread.start()
self.image_processor.run() self.image_processor.run()
self.marker_runner.run()
def msg_handler(self, channel, data): def msg_handler(self, channel, data):
self.rec_msg = robot_control_response_lcmt().decode(data) self.rec_msg = robot_control_response_lcmt().decode(data)
@ -126,12 +131,31 @@ class Robot_Ctrl(object):
self.cmd_msg = msg self.cmd_msg = msg
self.send_lock.release() self.send_lock.release()
def place_marker(self, x, y, z, color, observe=False):
"""调用 MarkerRunner 放置标志物"""
if self.marker_runner is None or self.marker_runner.marker_client is None:
print("MarkerRunner 未初始化,无法放置标志物")
return None
try:
response = self.marker_runner.send_request(x, y, z, color, observe=observe)
print(f"放置标志物结果: {response.success}, 消息: {response.message}")
return response
except Exception as e:
print(f"放置标志物时发生异常: {e}")
return None
def quit(self): def quit(self):
self.runing = 0 self.runing = 0
self.rec_thread.join() self.rec_thread.join()
self.send_thread.join() self.send_thread.join()
self.image_processor.destroy() self.image_processor.destroy()
# 销毁 MarkerRunner
if hasattr(self, 'marker_runner') and self.marker_runner is not None:
try:
self.marker_runner.destroy()
except Exception as e:
print(f"MarkerRunner 销毁失败: {e}")
# Main function # Main function
if __name__ == '__main__': if __name__ == '__main__':
main() main()

55
utils/marker_client.py Executable file
View File

@ -0,0 +1,55 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from cyberdog_marker.srv import PlaceMarker
class MarkerClient(Node):
def __init__(self):
super().__init__('marker_client')
self.client = self.create_client(PlaceMarker, 'place_marker')
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('服务不可用,等待...')
self.req = PlaceMarker.Request()
def send_request(self, x, y, z, color):
self.req.x = x
self.req.y = y
self.req.z = z
self.req.color = color
self.future = self.client.call_async(self.req)
rclpy.spin_until_future_complete(self, self.future)
return self.future.result()
class MarkerRunner:
def __init__(self):
self.marker_client = None
def run(self):
rclpy.init()
self.marker_client = MarkerClient()
def destroy(self):
self.marker_client.destroy_node()
rclpy.shutdown()
def send_request(self, x, y, z, color, observe=False):
response = self.marker_client.send_request(x, y, z, color)
if observe:
self.marker_client.get_logger().info(
f'结果: {response.success}, 消息: {response.message}')
return response
def main(args=None):
rclpy.init(args=args)
client = MarkerClient()
response = client.send_request(1.0, 1.0, 0.0, 'blue')
client.get_logger().info(
f'结果: {response.success}, 消息: {response.message}')
client.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()