"""Robot motion programs (program1 ... program12) for the spray trajectories.

Each program drives the robot controller through ``sendCMD`` / ``send_Point``
and switches the spray-gun relay through the ``gpio`` helpers.
"""

import time
from queue import Empty, Queue

from ETController import send_Point, sendCMD, wait_stop
from gpio import turn_off_relay, turn_on_relay
# from program_config import (
# PRESET_PARAMS, turn_off_relay_end, turn_off_relay_start,
# turn_on_relay_end, turn_on_relay_start
# )
from serial_init import SerialSharedData
from trajectory_data import (
    Pose_1, Pose_2, Pose_3, Pose_4,
    joint_positions_assembled_1, joint_positions_assembled_2,
    joint_positions_assembled_3, joint_positions_assembled_4
)
# Serial-port handling module.
import serial_handler
# Base sampling period. The programs multiply it by 0.001 before passing it to
# time.sleep, so it is expressed in milliseconds.
SAMP_TIME = 20
# Per-point delay; numerically SAMP_TIME * 0.001 (presumably seconds - confirm).
SLP_TIME = 0.02
# Hand-off queue for compensation updates (decouples the threads and avoids
# lock contention). It buffers at most one value so only the newest is kept.
compensate_queue = Queue(maxsize=1)
# Hand the queue to the serial module (required for the hand-off to work).
serial_handler.compensate_queue = compensate_queue
# Global coordinate compensation state.
increments_x = 0
increments_y = 0
increments_z = 0
Coordinate_compensation = [0, 0, 0, 0, 0, 0]
# Expected frame length; adjust to the actual protocol if needed.
EXPECTED_LENGTH = 7
# Working timing values. Every program overwrites them before use; they are
# initialised here so the names always exist at module level (the former
# module-level ``global`` statements were no-ops and defined nothing).
sample_time = SAMP_TIME
sleep_time = SLP_TIME
move_speed = 100
# Look-ahead time handed to transparent_transmission_init.
lookahead_time = 400
# Last pose streamed by a spray segment (after adjustment); the blend
# segments (programN_1) start their interpolation from it.
Pose_after_adjustment = [0, 0, 0, 0, 0, 0]
# Speed adjustment and initial z-position adjustment.
# speed_adjustment = serial_handler.speed_adjustment
# Trajectory_angle = serial_handler.Trajectory_angle
velocity_coefficient = 100
z_positionadjustment = -200  # offset applied to the whole trajectory in z
# Shared serial data instance.
serial_shared = SerialSharedData()
# Number of interpolation points used by the blend segments.
Interpolation_points = 20
# Segment overview (each segment is wrapped as one function):
#   segments 1, 3, 5, 7, 9 and 12 - plain segments
#   segments 2 and 4 - ascending spray
#   segments 6 and 8 - descending spray
def program1(sock, turn_on_relay_start, turn_on_relay_end, turn_off_relay_start, turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end, ex_turn_off_relay_start, ex_turn_off_relay_end):
    """Joint-move the robot to the compensated first pose of trajectory 1.

    Clears the servo buffer, switches the relay off, refreshes the timing and
    compensation globals, asks the controller for an inverse-kinematics
    solution of the compensated first row of ``Pose_1`` (seeded with the first
    valid row of ``joint_positions_assembled_1``) and repeats ``moveByJoint``
    until the controller accepts it.

    Args:
        sock: Connection handle forwarded to ``sendCMD`` and ``wait_stop``.
        turn_on_relay_start, turn_on_relay_end, turn_off_relay_start,
        turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end,
        ex_turn_off_relay_start, ex_turn_off_relay_end: Not used by this
            segment; kept so the signature matches the other programs.

    Returns:
        None. Returns early, without moving, when ``Pose_1`` is empty or its
        first row does not hold six values.
    """
    global increments_x, increments_y, increments_z
    global Coordinate_compensation
    global sample_time
    global sleep_time

    sendCMD(sock, "tt_clear_servo_joint_buf")
    turn_off_relay()
    sample_time = int(SAMP_TIME * velocity_coefficient / serial_handler.speed_adjustment)
    sleep_time = sample_time * 0.001
    # Take the newest compensation value if one is waiting. EAFP instead of
    # empty()+get_nowait(): the queue may be drained by another thread between
    # the two calls, which would raise queue.Empty.
    try:
        increments_x, increments_y, increments_z = compensate_queue.get_nowait()
    except Empty:
        pass
    Coordinate_compensation = [increments_x, increments_y, increments_z + z_positionadjustment, 0, 0, 0]

    if not (len(Pose_1) > 0 and len(Pose_1[0]) == 6):
        return
    target_pose = [val + inc for val, inc in zip(Pose_1[0], Coordinate_compensation)]

    joint_target = None
    moved = False
    for line_list in joint_positions_assembled_1:
        # Only rows with exactly six joint values are usable.
        if len(line_list) != 6:
            continue
        if joint_target is None:
            # The first valid row seeds the inverse-kinematics request.
            suc, angle, _ = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": line_list})
            if suc and angle is not None:
                joint_target = angle
            else:
                print("逆解计算失败,使用原有关节位置")
                joint_target = line_list
        if not moved:
            # Retry until the controller accepts the move command.
            while True:
                suc, _, _ = sendCMD(sock, "moveByJoint", {"targetPos": joint_target, "speed": 100})
                if suc:
                    break
                print("moveByJoint命令发送失败,重试...")
                time.sleep(0.1)
            wait_stop(sock)
            moved = True
        # NOTE(review): every valid row still costs one sleep_time pause even
        # though only the first row triggers motion - kept as in the original.
        time.sleep(sleep_time)
    sendCMD(sock, "tt_clear_servo_joint_buf")
    print("第一段程序执行完")
def program1_1(sock, turn_on_relay_start, turn_on_relay_end, turn_off_relay_start, turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end, ex_turn_off_relay_start, ex_turn_off_relay_end):
    """Stream a linear blend from the last streamed pose to the start of trajectory 1.

    The blend starts at ``Pose_after_adjustment`` and ends at the compensated
    first row of ``Pose_1``. The first three components (x, y, z) are linearly
    interpolated over ``Interpolation_points`` points; the remaining
    components keep the values of the start pose.

    Args:
        sock: Connection handle forwarded to ``sendCMD`` and ``send_Point``.
        turn_on_relay_start, turn_on_relay_end, turn_off_relay_start,
        turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end,
        ex_turn_off_relay_start, ex_turn_off_relay_end: Not used by this
            segment; kept so the signature matches the other programs.

    Returns:
        None.
    """
    global increments_x, increments_y, increments_z
    global Coordinate_compensation
    global sample_time
    global sleep_time
    global lookahead_time

    print("第1_1段程序开始执行")
    sendCMD(sock, "tt_clear_servo_joint_buf")
    turn_off_relay()
    sample_time = SAMP_TIME
    sleep_time = sample_time * 0.001
    lookahead_time = sample_time * Interpolation_points
    # Take the newest compensation value if one is waiting. EAFP instead of
    # empty()+get_nowait(): the queue may be drained by another thread between
    # the two calls, which would raise queue.Empty.
    try:
        increments_x, increments_y, increments_z = compensate_queue.get_nowait()
    except Empty:
        pass
    Coordinate_compensation = [increments_x, increments_y, increments_z + z_positionadjustment, 0, 0, 0]

    start_pose = Pose_after_adjustment
    # Blend target: first row of the trajectory plus the compensation.
    end_pose_with_comp = [val + inc for val, inc in zip(Pose_1[0], Coordinate_compensation)]

    point_count = Interpolation_points  # includes both end points
    interpolated_poses = []
    for idx in range(point_count):
        # Weight runs from 0 (start pose) to 1 (target pose).
        t = idx / (point_count - 1) if point_count > 1 else 1.0
        current_pose = start_pose.copy()
        for axis in range(3):
            current_pose[axis] = start_pose[axis] * (1 - t) + end_pose_with_comp[axis] * t
        interpolated_poses.append(current_pose)

    for sent, modified_list in enumerate(interpolated_poses):
        if sent == 0:
            # Transparent transmission is initialised once, before the first point.
            sendCMD(sock, "transparent_transmission_init", {
                "lookahead": lookahead_time,
                "t": sample_time,
                "smoothness": 1,
                "response_enable": 0
            })
        send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list})
        time.sleep(sleep_time)
    sendCMD(sock, "tt_clear_servo_joint_buf")
    print("第1_1段程序执行完")
def program2(sock, turn_on_relay_start, turn_on_relay_end, turn_off_relay_start, turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end, ex_turn_off_relay_start, ex_turn_off_relay_end):
    """Stream trajectory ``Pose_1`` (segment 2) and switch the relay by point index.

    Rows 13..143 (1-based) get their third component replaced by a ramp that
    depends on ``serial_handler.Trajectory_angle``; later rows reuse the value
    computed for row 143. Every streamed pose is offset by
    ``Coordinate_compensation`` and stored in ``Pose_after_adjustment``.

    Args:
        sock: Connection handle forwarded to ``sendCMD`` and ``send_Point``.
        turn_on_relay_start, turn_on_relay_end: Inclusive range of streamed
            point indices at which ``turn_on_relay`` is called.
        turn_off_relay_start, turn_off_relay_end: Inclusive range of streamed
            point indices at which ``turn_off_relay`` is called.
        ex_turn_on_relay_start, ex_turn_on_relay_end, ex_turn_off_relay_start,
        ex_turn_off_relay_end: Not used by this segment.

    Returns:
        None.
    """
    global sample_time
    global sleep_time
    global lookahead_time
    global Pose_after_adjustment

    lookahead_time = 400
    sample_time = int(SAMP_TIME * velocity_coefficient / serial_handler.speed_adjustment)
    sleep_time = sample_time * 0.001
    sendCMD(sock, "tt_clear_servo_joint_buf")
    print("第二段程序开始执行")

    # Must exist before the loop: if row 143 is missing or malformed, the
    # "later rows" branch would otherwise read an unbound local.
    row_143_third_value = None
    i = 0  # number of points streamed so far
    for line_idx, line_list in enumerate(Pose_1):
        if len(line_list) != 6:
            continue
        actual_line_num = line_idx + 1  # 1-based row number
        if 13 <= actual_line_num <= 143:
            # Ramp: 360.2 + ((204.75 + Trajectory_angle) / 131) * (row - 12)
            new_third_value = 360.2 + ((204.75 + serial_handler.Trajectory_angle) / 131) * (actual_line_num - 12)
            # Work on a copy so the source trajectory is left untouched.
            line_list = line_list.copy()
            line_list[2] = new_third_value
            if actual_line_num == 143:
                row_143_third_value = new_third_value
        elif actual_line_num > 143 and row_143_third_value is not None:
            # Rows after 143 hold the value reached at row 143.
            line_list = line_list.copy()
            line_list[2] = row_143_third_value
        modified_list = [val + inc for val, inc in zip(line_list, Coordinate_compensation)]
        Pose_after_adjustment = modified_list
        if i == 0:
            # Transparent transmission is initialised once, before the first point.
            sendCMD(sock, "transparent_transmission_init", {
                "lookahead": lookahead_time, "t": sample_time, "smoothness": 1, "response_enable": 0})
        send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list})
        time.sleep(sleep_time)
        if turn_on_relay_start <= i <= turn_on_relay_end:
            turn_on_relay()
            print(f"打开喷枪2:{turn_on_relay_start} ~ {turn_on_relay_end}")
        if turn_off_relay_start <= i <= turn_off_relay_end:
            turn_off_relay()
            print(f"关闭喷枪2:{turn_off_relay_start} ~ {turn_off_relay_end}")
        i += 1
    sendCMD(sock, "tt_clear_servo_joint_buf")
    print("第二段程序执行完")
def program3(sock, turn_on_relay_start, turn_on_relay_end, turn_off_relay_start, turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end, ex_turn_off_relay_start, ex_turn_off_relay_end):
    """Joint-move the robot to the compensated first pose of trajectory 2.

    Clears the servo buffer, switches the relay off, refreshes the timing and
    compensation globals, asks the controller for an inverse-kinematics
    solution of the compensated first row of ``Pose_2`` (seeded with the first
    valid row of ``joint_positions_assembled_2``) and repeats ``moveByJoint``
    until the controller accepts it.

    Args:
        sock: Connection handle forwarded to ``sendCMD`` and ``wait_stop``.
        turn_on_relay_start, turn_on_relay_end, turn_off_relay_start,
        turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end,
        ex_turn_off_relay_start, ex_turn_off_relay_end: Not used by this
            segment; kept so the signature matches the other programs.

    Returns:
        None. Returns early, without moving, when ``Pose_2`` is empty or its
        first row does not hold six values.
    """
    global increments_x, increments_y, increments_z
    global Coordinate_compensation
    global sample_time
    global sleep_time

    print("第三段程序开始执行")
    sendCMD(sock, "tt_clear_servo_joint_buf")
    turn_off_relay()
    sample_time = int(SAMP_TIME * velocity_coefficient / serial_handler.speed_adjustment)
    sleep_time = sample_time * 0.001
    # Take the newest compensation value if one is waiting. EAFP instead of
    # empty()+get_nowait(): the queue may be drained by another thread between
    # the two calls, which would raise queue.Empty.
    try:
        increments_x, increments_y, increments_z = compensate_queue.get_nowait()
    except Empty:
        pass
    Coordinate_compensation = [increments_x, increments_y, increments_z + z_positionadjustment, 0, 0, 0]

    if not (len(Pose_2) > 0 and len(Pose_2[0]) == 6):
        return
    target_pose = [val + inc for val, inc in zip(Pose_2[0], Coordinate_compensation)]

    joint_target = None
    moved = False
    for line_list in joint_positions_assembled_2:
        # Only rows with exactly six joint values are usable.
        if len(line_list) != 6:
            continue
        if joint_target is None:
            # The first valid row seeds the inverse-kinematics request.
            suc, angle, _ = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": line_list})
            if suc and angle is not None:
                joint_target = angle
            else:
                print("逆解计算失败,使用原有关节位置")
                joint_target = line_list
        if not moved:
            # Retry until the controller accepts the move command.
            while True:
                suc, _, _ = sendCMD(sock, "moveByJoint", {"targetPos": joint_target, "speed": 100})
                if suc:
                    break
                print("moveByJoint命令发送失败,重试...")
                time.sleep(0.1)
            wait_stop(sock)
            moved = True
        # NOTE(review): every valid row still costs one sleep_time pause even
        # though only the first row triggers motion - kept as in the original.
        time.sleep(sleep_time)
    sendCMD(sock, "tt_clear_servo_joint_buf")
    print("第三段程序执行完")
def program3_1(sock, turn_on_relay_start, turn_on_relay_end, turn_off_relay_start, turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end, ex_turn_off_relay_start, ex_turn_off_relay_end):
    """Stream a linear blend from the last streamed pose to the start of trajectory 2.

    The blend starts at ``Pose_after_adjustment`` and ends at the compensated
    first row of ``Pose_2``. The first three components (x, y, z) are linearly
    interpolated over ``Interpolation_points`` points; the remaining
    components keep the values of the start pose.

    Args:
        sock: Connection handle forwarded to ``sendCMD`` and ``send_Point``.
        turn_on_relay_start, turn_on_relay_end, turn_off_relay_start,
        turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end,
        ex_turn_off_relay_start, ex_turn_off_relay_end: Not used by this
            segment; kept so the signature matches the other programs.

    Returns:
        None.
    """
    global increments_x, increments_y, increments_z
    global Coordinate_compensation
    global sample_time
    global sleep_time
    global lookahead_time

    print("第3_1段程序开始执行")
    sendCMD(sock, "tt_clear_servo_joint_buf")
    turn_off_relay()
    sample_time = SAMP_TIME
    sleep_time = sample_time * 0.001
    lookahead_time = sample_time * Interpolation_points
    # Take the newest compensation value if one is waiting. EAFP instead of
    # empty()+get_nowait(): the queue may be drained by another thread between
    # the two calls, which would raise queue.Empty.
    try:
        increments_x, increments_y, increments_z = compensate_queue.get_nowait()
    except Empty:
        pass
    Coordinate_compensation = [increments_x, increments_y, increments_z + z_positionadjustment, 0, 0, 0]

    start_pose = Pose_after_adjustment
    # Blend target: first row of the trajectory plus the compensation.
    end_pose_with_comp = [val + inc for val, inc in zip(Pose_2[0], Coordinate_compensation)]

    point_count = Interpolation_points  # includes both end points
    interpolated_poses = []
    for idx in range(point_count):
        # Weight runs from 0 (start pose) to 1 (target pose).
        t = idx / (point_count - 1) if point_count > 1 else 1.0
        current_pose = start_pose.copy()
        for axis in range(3):
            current_pose[axis] = start_pose[axis] * (1 - t) + end_pose_with_comp[axis] * t
        interpolated_poses.append(current_pose)

    for sent, modified_list in enumerate(interpolated_poses):
        if sent == 0:
            # Transparent transmission is initialised once, before the first point.
            sendCMD(sock, "transparent_transmission_init", {
                "lookahead": lookahead_time,
                "t": sample_time,
                "smoothness": 1,
                "response_enable": 0
            })
        send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list})
        time.sleep(sleep_time)
    sendCMD(sock, "tt_clear_servo_joint_buf")
    print("第3_1段程序执行完")
def program4(sock, turn_on_relay_start, turn_on_relay_end, turn_off_relay_start, turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end, ex_turn_off_relay_start, ex_turn_off_relay_end):
    """Stream trajectory ``Pose_2`` (segment 4) and switch the relay by point index.

    Rows 13..143 (1-based) get their third component replaced by a ramp that
    depends on ``serial_handler.Trajectory_angle``; later rows reuse the value
    computed for row 143. Every streamed pose is offset by
    ``Coordinate_compensation`` and stored in ``Pose_after_adjustment``.

    Args:
        sock: Connection handle forwarded to ``sendCMD`` and ``send_Point``.
        turn_on_relay_start, turn_on_relay_end, turn_off_relay_start,
        turn_off_relay_end: Not used by this segment.
        ex_turn_on_relay_start, ex_turn_on_relay_end: Inclusive range of
            streamed point indices at which ``turn_on_relay`` is called.
        ex_turn_off_relay_start, ex_turn_off_relay_end: Inclusive range of
            streamed point indices at which ``turn_off_relay`` is called.

    Returns:
        None.
    """
    global sample_time
    global sleep_time
    global lookahead_time
    global Pose_after_adjustment

    print("第四段程序开始执行")
    lookahead_time = 400
    sample_time = int(SAMP_TIME * velocity_coefficient / serial_handler.speed_adjustment)
    sleep_time = sample_time * 0.001
    sendCMD(sock, "tt_clear_servo_joint_buf")

    # Must exist before the loop: if row 143 is missing or malformed, the
    # "later rows" branch would otherwise read an unbound local.
    row_143_third_value = None
    i = 0  # number of points streamed so far
    for line_idx, line_list in enumerate(Pose_2):
        if len(line_list) != 6:
            continue
        actual_line_num = line_idx + 1  # 1-based row number
        if 13 <= actual_line_num <= 143:
            # Ramp: 372.19 + ((204.75 + Trajectory_angle) / 131) * (row - 12)
            new_third_value = 372.19 + ((204.75 + serial_handler.Trajectory_angle) / 131) * (actual_line_num - 12)
            # Work on a copy so the source trajectory is left untouched.
            line_list = line_list.copy()
            line_list[2] = new_third_value
            if actual_line_num == 143:
                row_143_third_value = new_third_value
        elif actual_line_num > 143 and row_143_third_value is not None:
            # Rows after 143 hold the value reached at row 143.
            line_list = line_list.copy()
            line_list[2] = row_143_third_value
        modified_list = [val + inc for val, inc in zip(line_list, Coordinate_compensation)]
        Pose_after_adjustment = modified_list
        if i == 0:
            # Transparent transmission is initialised once, before the first point.
            sendCMD(sock, "transparent_transmission_init", {
                "lookahead": lookahead_time, "t": sample_time, "smoothness": 1, "response_enable": 0})
        send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list})
        time.sleep(sleep_time)
        if ex_turn_on_relay_start <= i <= ex_turn_on_relay_end:
            turn_on_relay()
            print(f"打开喷枪4:{ex_turn_on_relay_start} ~ {ex_turn_on_relay_end}")
        if ex_turn_off_relay_start <= i <= ex_turn_off_relay_end:
            turn_off_relay()
            print(f"关闭喷枪4:{ex_turn_off_relay_start} ~ {ex_turn_off_relay_end}")
        i += 1
    sendCMD(sock, "tt_clear_servo_joint_buf")
    print("第四段程序执行完")
def program5(sock, turn_on_relay_start, turn_on_relay_end, turn_off_relay_start, turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end, ex_turn_off_relay_start, ex_turn_off_relay_end):
    """Joint-move the robot to the compensated first pose of trajectory 3.

    Clears the servo buffer, switches the relay off, refreshes the timing and
    compensation globals, asks the controller for an inverse-kinematics
    solution of the compensated first row of ``Pose_3`` (seeded with the first
    valid row of ``joint_positions_assembled_3``) and repeats ``moveByJoint``
    until the controller accepts it.

    Args:
        sock: Connection handle forwarded to ``sendCMD`` and ``wait_stop``.
        turn_on_relay_start, turn_on_relay_end, turn_off_relay_start,
        turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end,
        ex_turn_off_relay_start, ex_turn_off_relay_end: Not used by this
            segment; kept so the signature matches the other programs.

    Returns:
        None. Returns early, without moving, when ``Pose_3`` is empty or its
        first row does not hold six values.
    """
    global increments_x, increments_y, increments_z
    global Coordinate_compensation
    global sample_time
    global sleep_time

    print("第五段程序开始执行")
    sendCMD(sock, "tt_clear_servo_joint_buf")
    turn_off_relay()
    sample_time = int(SAMP_TIME * velocity_coefficient / serial_handler.speed_adjustment)
    sleep_time = sample_time * 0.001
    # Take the newest compensation value if one is waiting. EAFP instead of
    # empty()+get_nowait(): the queue may be drained by another thread between
    # the two calls, which would raise queue.Empty.
    try:
        increments_x, increments_y, increments_z = compensate_queue.get_nowait()
    except Empty:
        pass
    Coordinate_compensation = [increments_x, increments_y, increments_z + z_positionadjustment, 0, 0, 0]

    if not (len(Pose_3) > 0 and len(Pose_3[0]) == 6):
        return
    target_pose = [val + inc for val, inc in zip(Pose_3[0], Coordinate_compensation)]

    joint_target = None
    moved = False
    for line_list in joint_positions_assembled_3:
        # Only rows with exactly six joint values are usable.
        if len(line_list) != 6:
            continue
        if joint_target is None:
            # The first valid row seeds the inverse-kinematics request.
            suc, angle, _ = sendCMD(sock, "inverseKinematic",
                                    {"targetPose": target_pose, "referencePos": line_list})
            if suc and angle is not None:
                joint_target = angle
            else:
                print("逆解计算失败,使用原有关节位置")
                joint_target = line_list
        if not moved:
            # Retry until the controller accepts the move command.
            while True:
                suc, _, _ = sendCMD(sock, "moveByJoint", {"targetPos": joint_target, "speed": 100})
                if suc:
                    break
                print("moveByJoint命令发送失败,重试...")
                time.sleep(0.1)
            wait_stop(sock)
            moved = True
        # NOTE(review): every valid row still costs one sleep_time pause even
        # though only the first row triggers motion - kept as in the original.
        time.sleep(sleep_time)
    sendCMD(sock, "tt_clear_servo_joint_buf")
    print("第五段程序执行完")
def program5_1(sock, turn_on_relay_start, turn_on_relay_end, turn_off_relay_start, turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end, ex_turn_off_relay_start, ex_turn_off_relay_end):
    """Stream a linear blend from the last streamed pose to the start of trajectory 3.

    The blend starts at ``Pose_after_adjustment`` and ends at the compensated
    first row of ``Pose_3``. The first three components (x, y, z) are linearly
    interpolated over ``Interpolation_points`` points; the remaining
    components keep the values of the start pose.

    Args:
        sock: Connection handle forwarded to ``sendCMD`` and ``send_Point``.
        turn_on_relay_start, turn_on_relay_end, turn_off_relay_start,
        turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end,
        ex_turn_off_relay_start, ex_turn_off_relay_end: Not used by this
            segment; kept so the signature matches the other programs.

    Returns:
        None.
    """
    global increments_x, increments_y, increments_z
    global Coordinate_compensation
    global sample_time
    global sleep_time
    global lookahead_time

    print("第5_1段程序开始执行")
    sendCMD(sock, "tt_clear_servo_joint_buf")
    turn_off_relay()
    sample_time = SAMP_TIME
    sleep_time = sample_time * 0.001
    lookahead_time = sample_time * Interpolation_points
    # Take the newest compensation value if one is waiting. EAFP instead of
    # empty()+get_nowait(): the queue may be drained by another thread between
    # the two calls, which would raise queue.Empty.
    try:
        increments_x, increments_y, increments_z = compensate_queue.get_nowait()
    except Empty:
        pass
    Coordinate_compensation = [increments_x, increments_y, increments_z + z_positionadjustment, 0, 0, 0]

    start_pose = Pose_after_adjustment
    # Blend target: first row of the trajectory plus the compensation.
    end_pose_with_comp = [val + inc for val, inc in zip(Pose_3[0], Coordinate_compensation)]

    point_count = Interpolation_points  # includes both end points
    interpolated_poses = []
    for idx in range(point_count):
        # Weight runs from 0 (start pose) to 1 (target pose).
        t = idx / (point_count - 1) if point_count > 1 else 1.0
        current_pose = start_pose.copy()
        for axis in range(3):
            current_pose[axis] = start_pose[axis] * (1 - t) + end_pose_with_comp[axis] * t
        interpolated_poses.append(current_pose)

    for sent, modified_list in enumerate(interpolated_poses):
        if sent == 0:
            # Transparent transmission is initialised once, before the first point.
            sendCMD(sock, "transparent_transmission_init", {
                "lookahead": lookahead_time,
                "t": sample_time,
                "smoothness": 1,
                "response_enable": 0
            })
        send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list})
        time.sleep(sleep_time)
    sendCMD(sock, "tt_clear_servo_joint_buf")
    print("第5_1段程序执行完")
def program6(sock, turn_on_relay_start, turn_on_relay_end, turn_off_relay_start, turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end, ex_turn_off_relay_start, ex_turn_off_relay_end):
    """Stream trajectory ``Pose_3`` (segment 6) and switch the relay by point index.

    Rows 13..143 (1-based) get their third component replaced by a descending
    ramp that depends on ``serial_handler.Trajectory_angle``; later rows reuse
    the value computed for row 143. Every streamed pose is offset by
    ``Coordinate_compensation`` and stored in ``Pose_after_adjustment``.

    Args:
        sock: Connection handle forwarded to ``sendCMD`` and ``send_Point``.
        turn_on_relay_start, turn_on_relay_end: Inclusive range of streamed
            point indices at which ``turn_on_relay`` is called.
        turn_off_relay_start, turn_off_relay_end: Inclusive range of streamed
            point indices at which ``turn_off_relay`` is called.
        ex_turn_on_relay_start, ex_turn_on_relay_end, ex_turn_off_relay_start,
        ex_turn_off_relay_end: Not used by this segment.

    Returns:
        None.
    """
    global sample_time
    global sleep_time
    global lookahead_time
    global Pose_after_adjustment

    sendCMD(sock, "tt_clear_servo_joint_buf")
    lookahead_time = 400
    sample_time = int(SAMP_TIME * velocity_coefficient / serial_handler.speed_adjustment)
    sleep_time = sample_time * 0.001

    # Must exist before the loop: if row 143 is missing or malformed, the
    # "later rows" branch would otherwise read an unbound local.
    row_143_third_value = None
    i = 0  # number of points streamed so far
    for line_idx, line_list in enumerate(Pose_3):
        if len(line_list) != 6:
            continue
        actual_line_num = line_idx + 1  # 1-based row number
        if 13 <= actual_line_num <= 143:
            # Descending ramp (note the subtractions):
            # 576.94 - ((204.75 - Trajectory_angle) / 131) * (row - 12)
            new_third_value = 576.94 - ((204.75 - serial_handler.Trajectory_angle) / 131) * (actual_line_num - 12)
            # Work on a copy so the source trajectory is left untouched.
            line_list = line_list.copy()
            line_list[2] = new_third_value
            if actual_line_num == 143:
                row_143_third_value = new_third_value
        elif actual_line_num > 143 and row_143_third_value is not None:
            # Rows after 143 hold the value reached at row 143.
            line_list = line_list.copy()
            line_list[2] = row_143_third_value
        modified_list = [val + inc for val, inc in zip(line_list, Coordinate_compensation)]
        Pose_after_adjustment = modified_list
        if i == 0:
            # Transparent transmission is initialised once, before the first point.
            sendCMD(sock, "transparent_transmission_init", {
                "lookahead": lookahead_time, "t": sample_time, "smoothness": 1, "response_enable": 0})
        send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list})
        time.sleep(sleep_time)
        if turn_on_relay_start <= i <= turn_on_relay_end:
            turn_on_relay()
            print(f"打开喷枪6:{turn_on_relay_start} ~ {turn_on_relay_end}")
        if turn_off_relay_start <= i <= turn_off_relay_end:
            turn_off_relay()
            print(f"关闭喷枪6:{turn_off_relay_start} ~ {turn_off_relay_end}")
        i += 1
    sendCMD(sock, "tt_clear_servo_joint_buf")
    print("第六段程序执行完")
def program7(sock, turn_on_relay_start, turn_on_relay_end, turn_off_relay_start, turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end, ex_turn_off_relay_start, ex_turn_off_relay_end):
    """Joint-move the robot to the compensated first pose of trajectory 4.

    Clears the servo buffer, switches the relay off, refreshes the timing and
    compensation globals, asks the controller for an inverse-kinematics
    solution of the compensated first row of ``Pose_4`` (seeded with the first
    valid row of ``joint_positions_assembled_4``) and repeats ``moveByJoint``
    until the controller accepts it.

    Args:
        sock: Connection handle forwarded to ``sendCMD`` and ``wait_stop``.
        turn_on_relay_start, turn_on_relay_end, turn_off_relay_start,
        turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end,
        ex_turn_off_relay_start, ex_turn_off_relay_end: Not used by this
            segment; kept so the signature matches the other programs.

    Returns:
        None. Returns early, without moving, when ``Pose_4`` is empty or its
        first row does not hold six values.
    """
    global increments_x, increments_y, increments_z
    global Coordinate_compensation
    global sample_time
    global sleep_time

    sendCMD(sock, "tt_clear_servo_joint_buf")
    turn_off_relay()
    sample_time = int(SAMP_TIME * velocity_coefficient / serial_handler.speed_adjustment)
    sleep_time = sample_time * 0.001
    # Take the newest compensation value if one is waiting. EAFP instead of
    # empty()+get_nowait(): the queue may be drained by another thread between
    # the two calls, which would raise queue.Empty.
    try:
        increments_x, increments_y, increments_z = compensate_queue.get_nowait()
    except Empty:
        pass
    Coordinate_compensation = [increments_x, increments_y, increments_z + z_positionadjustment, 0, 0, 0]

    if not (len(Pose_4) > 0 and len(Pose_4[0]) == 6):
        return
    target_pose = [val + inc for val, inc in zip(Pose_4[0], Coordinate_compensation)]

    joint_target = None
    moved = False
    for line_list in joint_positions_assembled_4:
        # Only rows with exactly six joint values are usable.
        if len(line_list) != 6:
            continue
        if joint_target is None:
            # The first valid row seeds the inverse-kinematics request.
            suc, angle, _ = sendCMD(sock, "inverseKinematic",
                                    {"targetPose": target_pose, "referencePos": line_list})
            if suc and angle is not None:
                joint_target = angle
            else:
                print("逆解计算失败,使用原有关节位置")
                joint_target = line_list
        if not moved:
            # Retry until the controller accepts the move command.
            while True:
                suc, _, _ = sendCMD(sock, "moveByJoint", {"targetPos": joint_target, "speed": 100})
                if suc:
                    break
                print("moveByJoint命令发送失败,重试...")
                time.sleep(0.1)
            wait_stop(sock)
            moved = True
        # NOTE(review): every valid row still costs one sleep_time pause even
        # though only the first row triggers motion - kept as in the original.
        time.sleep(sleep_time)
    sendCMD(sock, "tt_clear_servo_joint_buf")
    print("第七段程序执行完")
def program7_1(sock, turn_on_relay_start, turn_on_relay_end, turn_off_relay_start, turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end, ex_turn_off_relay_start, ex_turn_off_relay_end):
    """Stream a linear blend from the last streamed pose to the start of trajectory 4.

    The blend starts at ``Pose_after_adjustment`` and ends at the compensated
    first row of ``Pose_4``. The first three components (x, y, z) are linearly
    interpolated over ``Interpolation_points`` points; the remaining
    components keep the values of the start pose.

    Args:
        sock: Connection handle forwarded to ``sendCMD`` and ``send_Point``.
        turn_on_relay_start, turn_on_relay_end, turn_off_relay_start,
        turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end,
        ex_turn_off_relay_start, ex_turn_off_relay_end: Not used by this
            segment; kept so the signature matches the other programs.

    Returns:
        None.
    """
    global increments_x, increments_y, increments_z
    global Coordinate_compensation
    global sample_time
    global sleep_time
    global lookahead_time

    print("第7_1段程序开始执行")
    sendCMD(sock, "tt_clear_servo_joint_buf")
    turn_off_relay()
    sample_time = SAMP_TIME
    sleep_time = sample_time * 0.001
    lookahead_time = sample_time * Interpolation_points
    # Take the newest compensation value if one is waiting. EAFP instead of
    # empty()+get_nowait(): the queue may be drained by another thread between
    # the two calls, which would raise queue.Empty.
    try:
        increments_x, increments_y, increments_z = compensate_queue.get_nowait()
    except Empty:
        pass
    Coordinate_compensation = [increments_x, increments_y, increments_z + z_positionadjustment, 0, 0, 0]

    start_pose = Pose_after_adjustment
    # Blend target: first row of the trajectory plus the compensation. A new
    # list is built, so the source row is never modified.
    end_pose_with_comp = [val + inc for val, inc in zip(Pose_4[0], Coordinate_compensation)]

    point_count = Interpolation_points  # includes both end points
    interpolated_poses = []
    for idx in range(point_count):
        # Weight runs from 0 (start pose) to 1 (target pose).
        t = idx / (point_count - 1) if point_count > 1 else 1.0
        current_pose = start_pose.copy()
        for axis in range(3):
            current_pose[axis] = start_pose[axis] * (1 - t) + end_pose_with_comp[axis] * t
        interpolated_poses.append(current_pose)

    for sent, modified_list in enumerate(interpolated_poses):
        if sent == 0:
            # Transparent transmission is initialised once, before the first point.
            sendCMD(sock, "transparent_transmission_init", {
                "lookahead": lookahead_time,
                "t": sample_time,
                "smoothness": 1,
                "response_enable": 0
            })
        send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list})
        time.sleep(sleep_time)
    sendCMD(sock, "tt_clear_servo_joint_buf")
    print("第7_1段程序执行完")
def program8(sock, turn_on_relay_start, turn_on_relay_end, turn_off_relay_start, turn_off_relay_end, ex_turn_on_relay_start, ex_turn_on_relay_end, ex_turn_off_relay_start, ex_turn_off_relay_end):
    """Stream trajectory ``Pose_4`` (segment 8) and switch the relay by point index.

    Rows 13..143 (1-based) get their third component replaced by a descending
    ramp that depends on ``serial_handler.Trajectory_angle``; later rows reuse
    the value computed for row 143. Every streamed pose is offset by
    ``Coordinate_compensation`` and stored in ``Pose_after_adjustment``.

    Args:
        sock: Connection handle forwarded to ``sendCMD`` and ``send_Point``.
        turn_on_relay_start, turn_on_relay_end, turn_off_relay_start,
        turn_off_relay_end: Not used by this segment.
        ex_turn_on_relay_start, ex_turn_on_relay_end: Inclusive range of
            streamed point indices at which ``turn_on_relay`` is called.
        ex_turn_off_relay_start, ex_turn_off_relay_end: Inclusive range of
            streamed point indices at which ``turn_off_relay`` is called.

    Returns:
        None.
    """
    global sample_time
    global sleep_time
    global lookahead_time
    global Pose_after_adjustment

    sendCMD(sock, "tt_clear_servo_joint_buf")
    lookahead_time = 400
    sample_time = int(SAMP_TIME * velocity_coefficient / serial_handler.speed_adjustment)
    sleep_time = sample_time * 0.001

    # Must exist before the loop: if row 143 is missing or malformed, the
    # "later rows" branch would otherwise read an unbound local.
    row_143_third_value = None
    i = 0  # number of points streamed so far
    for line_idx, line_list in enumerate(Pose_4):
        if len(line_list) != 6:
            continue
        actual_line_num = line_idx + 1  # 1-based row number
        if 13 <= actual_line_num <= 143:
            # Descending ramp (note the subtractions):
            # 564.95 - ((204.75 - Trajectory_angle) / 131) * (row - 12)
            new_third_value = 564.95 - ((204.75 - serial_handler.Trajectory_angle) / 131) * (actual_line_num - 12)
            # Work on a copy so the source trajectory is left untouched.
            line_list = line_list.copy()
            line_list[2] = new_third_value
            if actual_line_num == 143:
                row_143_third_value = new_third_value
        elif actual_line_num > 143 and row_143_third_value is not None:
            # Rows after 143 hold the value reached at row 143.
            line_list = line_list.copy()
            line_list[2] = row_143_third_value
        modified_list = [val + inc for val, inc in zip(line_list, Coordinate_compensation)]
        Pose_after_adjustment = modified_list
        if i == 0:
            # Transparent transmission is initialised once, before the first point.
            sendCMD(sock, "transparent_transmission_init", {
                "lookahead": lookahead_time, "t": sample_time, "smoothness": 1, "response_enable": 0})
        send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list})
        time.sleep(sleep_time)
        if ex_turn_on_relay_start <= i <= ex_turn_on_relay_end:
            turn_on_relay()
            print(f"打开喷枪8:{ex_turn_on_relay_start} ~ {ex_turn_on_relay_end}")
        if ex_turn_off_relay_start <= i <= ex_turn_off_relay_end:
            turn_off_relay()
            print(f"关闭喷枪8:{ex_turn_off_relay_start} ~ {ex_turn_off_relay_end}")
        i += 1
    sendCMD(sock, "tt_clear_servo_joint_buf")
    print("第八段程序执行完")
def program9(sock):
    """Joint-move the robot to the first pose stored in ``Pose_5.txt``.

    Reads the first comma-separated line of the pose file as the target pose,
    asks the controller for an inverse-kinematics solution seeded with the
    first row of ``joint_positions_assembled_5.txt`` and repeats
    ``moveByJoint`` until the controller accepts it.

    Args:
        sock: Connection handle forwarded to ``sendCMD`` and ``wait_stop``.

    Returns:
        None. Returns early, without moving, when the first line of the pose
        file is empty.

    Raises:
        OSError: If either data file cannot be opened.
        ValueError: If a non-blank line holds a field that is not a number.
    """
    increments = [0, 0, 0, 0, 0, 0]  # no compensation is applied in this segment
    # Context managers close the files even when parsing or sendCMD raises.
    with open('/home/raspberrypi/robot1/Pose_5.txt', "r") as pose_file:
        first_pose_line = pose_file.readline().strip()
    if not first_pose_line:
        return
    target_pose = [val + inc for val, inc in zip(map(float, first_pose_line.split(',')), increments)]

    joint_target = None
    moved = False
    with open('/home/raspberrypi/robot1/joint_positions_assembled_5.txt', "r") as fo:
        for line in fo:
            line = line.strip()
            if not line:
                # Blank lines (e.g. a trailing newline) would crash float('').
                continue
            line_list = list(map(float, line.split(',')))
            if joint_target is None:
                # The first row seeds the inverse-kinematics request.
                suc, angle, _ = sendCMD(sock, "inverseKinematic",
                                        {"targetPose": target_pose, "referencePos": line_list})
                # On failure fall back to the joint values read from the file.
                joint_target = angle if suc and angle is not None else line_list
            if not moved:
                # Retry until the controller accepts the move command.
                while True:
                    suc, _, _ = sendCMD(sock, "moveByJoint", {"targetPos": joint_target, "speed": 100})
                    if suc:
                        break
                    print("moveByJoint命令发送失败,重试...")
                    time.sleep(0.1)  # retry interval
                wait_stop(sock)
                moved = True
            time.sleep(0.01)
    sendCMD(sock, "tt_clear_servo_joint_buf")
    print("第九段程序执行完")
def program10(sock):
    """Joint-move the robot to the first pose stored in ``Pose_6.txt``.

    Reads the first comma-separated line of the pose file as the target pose,
    asks the controller for an inverse-kinematics solution seeded with the
    first row of ``joint_positions_assembled_6.txt`` and repeats
    ``moveByJoint`` until the controller accepts it.

    Args:
        sock: Connection handle forwarded to ``sendCMD`` and ``wait_stop``.

    Returns:
        None. Returns early, without moving, when the first line of the pose
        file is empty.

    Raises:
        OSError: If either data file cannot be opened.
        ValueError: If a non-blank line holds a field that is not a number.
    """
    increments = [0, 0, 0, 0, 0, 0]  # no compensation is applied in this segment
    # Context managers close the files even when parsing or sendCMD raises.
    with open('/home/raspberrypi/robot1/Pose_6.txt', "r") as pose_file:
        first_pose_line = pose_file.readline().strip()
    if not first_pose_line:
        return
    target_pose = [val + inc for val, inc in zip(map(float, first_pose_line.split(',')), increments)]

    joint_target = None
    moved = False
    with open('/home/raspberrypi/robot1/joint_positions_assembled_6.txt', "r") as fo:
        for line in fo:
            line = line.strip()
            if not line:
                # Blank lines (e.g. a trailing newline) would crash float('').
                continue
            line_list = list(map(float, line.split(',')))
            if joint_target is None:
                # The first row seeds the inverse-kinematics request.
                suc, angle, _ = sendCMD(sock, "inverseKinematic",
                                        {"targetPose": target_pose, "referencePos": line_list})
                # On failure fall back to the joint values read from the file.
                joint_target = angle if suc and angle is not None else line_list
            if not moved:
                # Retry until the controller accepts the move command.
                while True:
                    suc, _, _ = sendCMD(sock, "moveByJoint", {"targetPos": joint_target, "speed": 100})
                    if suc:
                        break
                    print("moveByJoint命令发送失败,重试...")
                    time.sleep(0.1)  # retry interval
                wait_stop(sock)
                moved = True
            time.sleep(0.01)
    sendCMD(sock, "tt_clear_servo_joint_buf")
    print("第十段程序执行完")
def program11(sock):
    """Joint-move the robot to the first pose stored in ``Pose_7.txt``.

    Reads the first comma-separated line of the pose file as the target pose,
    asks the controller for an inverse-kinematics solution seeded with the
    first row of ``joint_positions_assembled_7.txt`` and repeats
    ``moveByJoint`` until the controller accepts it.

    Args:
        sock: Connection handle forwarded to ``sendCMD`` and ``wait_stop``.

    Returns:
        None. Returns early, without moving, when the first line of the pose
        file is empty.

    Raises:
        OSError: If either data file cannot be opened.
        ValueError: If a non-blank line holds a field that is not a number.
    """
    increments = [0, 0, 0, 0, 0, 0]  # no compensation is applied in this segment
    # Context managers close the files even when parsing or sendCMD raises.
    with open('/home/raspberrypi/robot1/Pose_7.txt', "r") as pose_file:
        first_pose_line = pose_file.readline().strip()
    if not first_pose_line:
        return
    target_pose = [val + inc for val, inc in zip(map(float, first_pose_line.split(',')), increments)]

    joint_target = None
    moved = False
    with open('/home/raspberrypi/robot1/joint_positions_assembled_7.txt', "r") as fo:
        for line in fo:
            line = line.strip()
            if not line:
                # Blank lines (e.g. a trailing newline) would crash float('').
                continue
            line_list = list(map(float, line.split(',')))
            if joint_target is None:
                # The first row seeds the inverse-kinematics request.
                suc, angle, _ = sendCMD(sock, "inverseKinematic",
                                        {"targetPose": target_pose, "referencePos": line_list})
                # On failure fall back to the joint values read from the file.
                joint_target = angle if suc and angle is not None else line_list
            if not moved:
                # Retry until the controller accepts the move command.
                while True:
                    suc, _, _ = sendCMD(sock, "moveByJoint", {"targetPos": joint_target, "speed": 100})
                    if suc:
                        break
                    print("moveByJoint命令发送失败,重试...")
                    time.sleep(0.1)  # retry interval
                wait_stop(sock)  # wait for the robot to stop
                moved = True
            time.sleep(0.01)
    sendCMD(sock, "tt_clear_servo_joint_buf")
    print("第十一段程序执行完")
def program12(sock):
    """Joint-move the robot to the first pose stored in ``Pose_8.txt``.

    Reads the first comma-separated line of the pose file as the target pose,
    asks the controller for an inverse-kinematics solution seeded with the
    first row of ``joint_positions_assembled_8.txt`` and repeats
    ``moveByJoint`` until the controller accepts it.

    Args:
        sock: Connection handle forwarded to ``sendCMD`` and ``wait_stop``.

    Returns:
        None. Returns early, without moving, when the first line of the pose
        file is empty.

    Raises:
        OSError: If either data file cannot be opened.
        ValueError: If a non-blank line holds a field that is not a number.
    """
    increments = [0, 0, 0, 0, 0, 0]  # no compensation is applied in this segment
    # Context managers close the files even when parsing or sendCMD raises.
    with open('/home/raspberrypi/robot1/Pose_8.txt', "r") as pose_file:
        first_pose_line = pose_file.readline().strip()
    if not first_pose_line:
        return
    target_pose = [val + inc for val, inc in zip(map(float, first_pose_line.split(',')), increments)]

    joint_target = None
    moved = False
    with open('/home/raspberrypi/robot1/joint_positions_assembled_8.txt', "r") as fo:
        for line in fo:
            line = line.strip()
            if not line:
                # Blank lines (e.g. a trailing newline) would crash float('').
                continue
            line_list = list(map(float, line.split(',')))
            if joint_target is None:
                # The first row seeds the inverse-kinematics request.
                suc, angle, _ = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": line_list})
                # On failure fall back to the joint values read from the file.
                joint_target = angle if suc and angle is not None else line_list
            if not moved:
                # Retry until the controller accepts the move command.
                while True:
                    suc, _, _ = sendCMD(sock, "moveByJoint", {"targetPos": joint_target, "speed": 100})
                    if suc:
                        break
                    print("moveByJoint命令发送失败,重试...")
                    time.sleep(0.1)  # retry interval
                wait_stop(sock)
                moved = True
            time.sleep(0.02)
    sendCMD(sock, "tt_clear_servo_joint_buf")
    print("第十二段程序执行完")