Browse Source

【6月5日发往现场】增加偏移量补偿

master
Lizongdi 2 weeks ago
parent
commit
320dcfe6d7
  1. 134
      kelaifen/Kelaifen_V6.0/trajectory_program.py

134
kelaifen/Kelaifen_V6.0/trajectory_program.py

@ -16,10 +16,14 @@ from trajectory_data import (
Pose_1_2, Pose_2_1, Pose_1_2, Pose_2_1,
) )
import time import time
import copy
from collections import deque
import queue
# 导入串口处理模块 # 导入串口处理模块
import serial_handler import serial_handler
# 采样时间、延时时间和衔接机器人速度 # 采样时间、延时时间和衔接机器人速度
ENABLE_INCREMENTS = 1 # 表示是否支持偏移
SAMP_TIME = 20 SAMP_TIME = 20
SLP_TIME = 0.02 SLP_TIME = 0.02
# 补偿值更新队列(解耦线程通信,避免锁竞争) # 补偿值更新队列(解耦线程通信,避免锁竞争)
@ -31,6 +35,17 @@ increments_x = 0
increments_y = 0 increments_y = 0
increments_z = 0 increments_z = 0
Coordinate_compensation = [0, 0, 0, 0, 0, 0] Coordinate_compensation = [0, 0, 0, 0, 0, 0]
FILTER_N = 10
buf_x = deque([0.0] * FILTER_N, maxlen=FILTER_N)
buf_y = deque([0.0] * FILTER_N, maxlen=FILTER_N)
buf_z = deque([0.0] * FILTER_N, maxlen=FILTER_N)
X_M = 0.0
Y_M = 0.0
Z_M = 0.0
# 预期的帧长度,可根据实际情况修改 # 预期的帧长度,可根据实际情况修改
EXPECTED_LENGTH = 7 EXPECTED_LENGTH = 7
global sample_time global sample_time
@ -51,6 +66,87 @@ serial_shared = SerialSharedData()
# 轨迹插值点数 # 轨迹插值点数
Interpolation_points = 20 Interpolation_points = 20
def reset_filter():
global buf_x
global buf_y
global buf_z
global X_M
global Y_M
global Z_M
buf_x = deque([0.0] * FILTER_N, maxlen=FILTER_N)
buf_y = deque([0.0] * FILTER_N, maxlen=FILTER_N)
buf_z = deque([0.0] * FILTER_N, maxlen=FILTER_N)
X_M = 0.0
Y_M = 0.0
Z_M = 0.0
while True:
try:
compensate_queue.get_nowait()
except:
break
print("Filter Reset")
def update_filter(x, y, z):
buf_x.append(x)
buf_y.append(y)
buf_z.append(z)
x_m = sum(buf_x) / len(buf_x)
y_m = sum(buf_y) / len(buf_y)
z_m = sum(buf_z) / len(buf_z)
MAX_COMP = 200
x_m = max(min(x_m, MAX_COMP), -MAX_COMP)
y_m = max(min(y_m, MAX_COMP), -MAX_COMP)
z_m = max(min(z_m, MAX_COMP), -MAX_COMP)
return x_m, y_m, z_m
def get_compensated_point(point):
global X_M
global Y_M
global Z_M
try:
while True:
increments_x, increments_y, increments_z = \
compensate_queue.get_nowait()
increments_x = max(min(increments_x, 200), -200)
increments_y = max(min(increments_y, 200), -200)
increments_z = max(min(increments_z, 200), -200)
X_M, Y_M, Z_M = update_filter(
increments_x,
increments_y,
increments_z
)
except queue.Empty:
pass
point_M = point.copy()
if 1 == ENABLE_INCREMENTS:
point_M[0] += X_M
point_M[1] += Y_M
point_M[2] += Z_M
return point_M
# 第一段程序封装为函数 # 第一段程序封装为函数
# 第二段程序封装为函数(上升喷涂) # 第二段程序封装为函数(上升喷涂)
# 第三段程序封装为函数 # 第三段程序封装为函数
@ -982,6 +1078,10 @@ def program12(sock):
def program100(sock, ser, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start, delay, delay1): def program100(sock, ser, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start, delay, delay1):
global sample_time, lookahead_time global sample_time, lookahead_time
if 1 == ENABLE_INCREMENTS:
reset_filter()
lookahead_time = 400 # 前瞻时间 (ms) lookahead_time = 400 # 前瞻时间 (ms)
sample_time = (int)(SAMP_TIME*velocity_coefficient/serial_handler.speed_adjustment) sample_time = (int)(SAMP_TIME*velocity_coefficient/serial_handler.speed_adjustment)
@ -1069,13 +1169,15 @@ def program100(sock, ser, turn_on_relay_start, turn_off_relay_start, ex_turn_on_
turn_on_relay() turn_on_relay()
if idx == CLOSE_1_M1S: if idx == CLOSE_1_M1S:
turn_off_relay() turn_off_relay()
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point}) point_M = get_compensated_point(point)
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point_M})
time.sleep(sleep_time) time.sleep(sleep_time)
elif traj is Pose_3_4_M: elif traj is Pose_3_4_M:
for idx, point in enumerate(traj, start=1): for idx, point in enumerate(traj, start=1):
if idx == CLOSE_1_M1E: if idx == CLOSE_1_M1E:
turn_off_relay() turn_off_relay()
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point}) point_M = get_compensated_point(point)
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point_M})
time.sleep(sleep_time) time.sleep(sleep_time)
elif traj is Pose_4_M: elif traj is Pose_4_M:
for idx, point in enumerate(traj, start=1): for idx, point in enumerate(traj, start=1):
@ -1083,18 +1185,21 @@ def program100(sock, ser, turn_on_relay_start, turn_off_relay_start, ex_turn_on_
turn_on_relay() turn_on_relay()
if idx == CLOSE_2_M2S: if idx == CLOSE_2_M2S:
turn_off_relay() turn_off_relay()
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point}) point_M = get_compensated_point(point)
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point_M})
time.sleep(sleep_time) time.sleep(sleep_time)
elif traj is Pose_4_3_M: elif traj is Pose_4_3_M:
for idx, point in enumerate(traj, start=1): for idx, point in enumerate(traj, start=1):
if idx == CLOSE_2_M2E: if idx == CLOSE_2_M2E:
turn_off_relay() turn_off_relay()
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point}) point_M = get_compensated_point(point)
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point_M})
time.sleep(sleep_time) time.sleep(sleep_time)
else: else:
# 其他轨迹(如 Stop 轨迹)无需喷枪控制 # 其他轨迹(如 Stop 轨迹)无需喷枪控制
for point in traj: for point in traj:
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point}) point_M = get_compensated_point(point)
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point_M})
time.sleep(sleep_time) time.sleep(sleep_time)
print(f"停止轨迹,NUM = {NUM}, delay = {delay}") print(f"停止轨迹,NUM = {NUM}, delay = {delay}")
stop_cmd_bytes = read_cmd_from_shared(timeout=0.1) stop_cmd_bytes = read_cmd_from_shared(timeout=0.1)
@ -1112,6 +1217,10 @@ def program100(sock, ser, turn_on_relay_start, turn_off_relay_start, ex_turn_on_
def program101(sock, ser, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start, delay, delay1): def program101(sock, ser, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start, delay, delay1):
global sample_time, lookahead_time global sample_time, lookahead_time
if 1 == ENABLE_INCREMENTS:
reset_filter()
lookahead_time = 400 # 前瞻时间 (ms) lookahead_time = 400 # 前瞻时间 (ms)
sample_time = (int)(SAMP_TIME*velocity_coefficient/serial_handler.speed_adjustment) sample_time = (int)(SAMP_TIME*velocity_coefficient/serial_handler.speed_adjustment)
@ -1201,13 +1310,15 @@ def program101(sock, ser, turn_on_relay_start, turn_off_relay_start, ex_turn_on_
turn_on_relay() turn_on_relay()
if idx == CLOSE_1_M1S: if idx == CLOSE_1_M1S:
turn_off_relay() turn_off_relay()
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point}) point_M = get_compensated_point(point)
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point_M})
time.sleep(sleep_time) time.sleep(sleep_time)
elif traj is Pose_3_4_M: elif traj is Pose_3_4_M:
for idx, point in enumerate(traj, start=1): for idx, point in enumerate(traj, start=1):
if idx == CLOSE_1_M1E: if idx == CLOSE_1_M1E:
turn_off_relay() turn_off_relay()
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point}) point_M = get_compensated_point(point)
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point_M})
time.sleep(sleep_time) time.sleep(sleep_time)
elif traj is Pose_4_M: elif traj is Pose_4_M:
for idx, point in enumerate(traj, start=1): for idx, point in enumerate(traj, start=1):
@ -1215,18 +1326,21 @@ def program101(sock, ser, turn_on_relay_start, turn_off_relay_start, ex_turn_on_
turn_on_relay() turn_on_relay()
if idx == CLOSE_2_M2S: if idx == CLOSE_2_M2S:
turn_off_relay() turn_off_relay()
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point}) point_M = get_compensated_point(point)
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point_M})
time.sleep(sleep_time) time.sleep(sleep_time)
elif traj is Pose_4_3_M: elif traj is Pose_4_3_M:
for idx, point in enumerate(traj, start=1): for idx, point in enumerate(traj, start=1):
if idx == CLOSE_2_M2E: if idx == CLOSE_2_M2E:
turn_off_relay() turn_off_relay()
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point}) point_M = get_compensated_point(point)
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point_M})
time.sleep(sleep_time) time.sleep(sleep_time)
else: else:
# 其他轨迹(如 Stop 轨迹)无需喷枪控制 # 其他轨迹(如 Stop 轨迹)无需喷枪控制
for point in traj: for point in traj:
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point}) point_M = get_compensated_point(point)
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point_M})
time.sleep(sleep_time) time.sleep(sleep_time)
print(f"停止轨迹,NUM = {NUM}, delay = {delay}") print(f"停止轨迹,NUM = {NUM}, delay = {delay}")
stop_cmd_bytes = read_cmd_from_shared(timeout=0.1) stop_cmd_bytes = read_cmd_from_shared(timeout=0.1)

Loading…
Cancel
Save