You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1249 lines
53 KiB

import sys
import copy
from queue import Queue
from ETController import send_Point, sendCMD, wait_stop
from gpio import turn_off_relay, turn_on_relay
from serial_handler import read_cmd_from_shared
# from program_config import (
# PRESET_PARAMS, turn_off_relay_start,
# turn_on_relay_start
# )
from serial_init import SerialSharedData
from trajectory_data import (
Pose_1, Pose_2, Pose_3, Pose_4, Pose_3_4, Pose_4_3,
joint_positions_assembled_1, joint_positions_assembled_2,
joint_positions_assembled_3, joint_positions_assembled_4,
Pose_1_2, Pose_2_1,
)
import time
# 导入串口处理模块
import serial_handler
# 采样时间、延时时间和衔接机器人速度
SAMP_TIME = 20
SLP_TIME = 0.02
# 补偿值更新队列(解耦线程通信,避免锁竞争)
compensate_queue = Queue(maxsize=1) # 最多缓存1个最新值,确保数据时效性
# 给串口模块赋值队列(新增,关键!)
serial_handler.compensate_queue = compensate_queue
# 全局坐标补偿变量
increments_x = 0
increments_y = 0
increments_z = 0
Coordinate_compensation = [0, 0, 0, 0, 0, 0]
# 预期的帧长度,可根据实际情况修改
EXPECTED_LENGTH = 7
global sample_time
global sleep_time
move_speed = 100
# 前瞻时间
lookahead_time = 400
# 调整之后的位姿
Pose_after_adjustment = [0, 0, 0, 0, 0, 0]
# 运动速度调整、z方向初始位置调整
# speed_adjustment = serial_handler.speed_adjustment
# Trajectory_angle = serial_handler.Trajectory_angle
velocity_coefficient = 100
z_positionadjustment = -200 # 轨迹整体调整
# 初始化全局共享数据实例
serial_shared = SerialSharedData()
# 轨迹插值点数
Interpolation_points = 20
# 第一段程序封装为函数
# 第二段程序封装为函数(上升喷涂)
# 第三段程序封装为函数
# 第四段程序封装为函数(上升喷涂)
# 第五段程序封装为函数
# 第六段程序封装为函数(下降喷涂)
# 第七段程序封装为函数
# 第八段程序封装为函数(下降喷涂)
# 第九段程序封装为函数
# 第十二段程序封装为函数
def program1(sock, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start):
i = 0
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
turn_off_relay()
global increments_x, increments_y, increments_z
global Coordinate_compensation
global sample_time
global sleep_time
sample_time = (int)(SAMP_TIME*velocity_coefficient/serial_handler.speed_adjustment)
sleep_time = sample_time*0.001
# 从队列获取最新补偿值(非阻塞,无新值则使用当前值)
if not compensate_queue.empty():
increments_x, increments_y, increments_z = compensate_queue.get_nowait()
increments = [increments_x, increments_y, increments_z+z_positionadjustment, 0, 0, 0]
Coordinate_compensation = increments
reference_pos = None
new_joint_pos = None
if len(Pose_1) > 0 and len(Pose_1[0]) == 6:
Path_First_Position = Pose_1[0] # 获取第一行数据
Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, Coordinate_compensation)]
target_pose = Modified_First_Position
else:
return
for line_list in joint_positions_assembled_1:
# 确保每行是6个数值(与原文件的逗号分割后逻辑一致)
if len(line_list) != 6:
continue
if reference_pos is None:
reference_pos = line_list
suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos})
if suc and angle is not None:
new_joint_pos = angle
else:
print("逆解计算失败,使用原有关节位置")
new_joint_pos = line_list
modified_list = new_joint_pos if new_joint_pos is not None else line_list
if (i == 0):
while True:
suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100})
if suc:
break
print("moveByJoint命令发送失败,重试...")
time.sleep(0.1)
wait_stop(sock)
time.sleep(sleep_time)
i = i + 1
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
print("第一段程序执行完")
def program1_1(sock, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start):
print("第1_1段程序开始执行")
i = 0
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
turn_off_relay()
global increments_x, increments_y, increments_z
global Coordinate_compensation
global sample_time
global sleep_time
global lookahead_time
sample_time = SAMP_TIME
sleep_time = sample_time*0.001
lookahead_time = sample_time * Interpolation_points
# 从队列获取最新补偿值(非阻塞,无新值则使用当前值)
if not compensate_queue.empty():
increments_x, increments_y, increments_z = compensate_queue.get_nowait()
increments = [increments_x, increments_y, increments_z+z_positionadjustment, 0, 0, 0]
Coordinate_compensation = increments
# 获取插值起点和插值终点
start_pose = Pose_after_adjustment
end_pose = Pose_1[0].copy()
# 计算带补偿的终点位姿(最终插值要到达的目标)
end_pose_with_comp = [end_pose[i] + Coordinate_compensation[i] for i in range(6)]
interpolate_points = Interpolation_points # 插值点数(包含起止点)
interpolated_poses = []
for idx in range(interpolate_points):
# 计算插值权重(0→1,对应从起点第三列到终点第三列)
t = idx / (interpolate_points - 1) if interpolate_points > 1 else 1.0
# 组装当前插值位姿:仅更新第三列,其余列沿用起点值
current_pose = start_pose.copy()
# 第三列:从起点值 线性插值到 终点补偿值的第三列
current_pose[2] = start_pose[2] * (1 - t) + end_pose_with_comp[2] * t
# 前两列:同步从起点值 线性调整到 终点补偿值的前两列(与第三列同权重)
current_pose[0] = start_pose[0] * (1 - t) + end_pose_with_comp[0] * t
current_pose[1] = start_pose[1] * (1 - t) + end_pose_with_comp[1] * t
# 添加到插值结果列表
interpolated_poses.append(current_pose)
# print("\n=== 插值后的位姿列表(逐行)===")
# for i, pose in enumerate(interpolated_poses):
# print(f"第 {i+1} 个插值点: {pose}")
# 遍历插值结果,逐个发送位姿(保留你原有发送逻辑)
for modified_list in interpolated_poses:
# 初始化透传
if i == 0:
suc, result, id = sendCMD(sock, "transparent_transmission_init", {
"lookahead": lookahead_time,
"t": sample_time,
"smoothness": 1,
"response_enable": 0
})
# 发送当前插值得到的位姿
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list})
time.sleep(sleep_time)
i = i + 1
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
print("第1_1段程序执行完")
def program2(sock, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start):
global increments_x, increments_y, increments_z
global Coordinate_compensation
global sample_time
global sleep_time
global lookahead_time
global Pose_after_adjustment
lookahead_time = 400
i = 0
sample_time = (int)(SAMP_TIME*velocity_coefficient/serial_handler.speed_adjustment)
sleep_time = sample_time*0.001
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
print("第二段程序开始执行")
# 遍历Pose_1,同时获取行号(enumerate的index从0开始)
for line_idx, line_list in enumerate(Pose_1):
if len(line_list) != 6:
continue
actual_line_num = line_idx + 1 # 关键:将枚举的索引转为从1开始的行号
# 核心逻辑:判断行号是否在13到143之间,修改第三列(索引为2,因为列表从0开始)
if 13 <= actual_line_num <= 143:
# 计算新的第三列值:360.2 + ((204.75 + Trajectory_angle)/143) * 行号
new_third_value = 360.2 + ((204.75 + serial_handler.Trajectory_angle) / 131) * (actual_line_num-12)
# 复制原列表并修改第三列(避免修改原列表的引用)
line_list = line_list.copy()
line_list[2] = new_third_value # 第三列对应索引2
if actual_line_num == 143:
row_143_third_value = new_third_value
elif actual_line_num > 143 and row_143_third_value is not None:
# 行号大于143时,用143行的第三列值替换
line_list = line_list.copy()
line_list[2] = row_143_third_value
increments = Coordinate_compensation
modified_list = [val + inc for val, inc in zip(line_list, increments)]
Pose_after_adjustment = modified_list
# print(Pose_after_adjustment)
if (i == 0):
suc, result, id = sendCMD(sock, "transparent_transmission_init", {
"lookahead": lookahead_time, "t": sample_time, "smoothness": 1, "response_enable": 0})
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list})
time.sleep(sleep_time)
if(i >= turn_on_relay_start):
turn_on_relay()
print(f"打开喷枪2:{turn_on_relay_start}")
if(i >= turn_off_relay_start):
turn_off_relay()
print(f"关闭喷枪2:{turn_off_relay_start}")
i = i + 1
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
print("第二段程序执行完")
def program3(sock, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start):
print("第三段程序开始执行")
i = 0
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
turn_off_relay()
global increments_x, increments_y, increments_z
global Coordinate_compensation
global sample_time
global sleep_time
sample_time = (int)(SAMP_TIME*velocity_coefficient/serial_handler.speed_adjustment)
sleep_time = sample_time*0.001
# 从队列获取最新补偿值(非阻塞,无新值则使用当前值)
if not compensate_queue.empty():
increments_x, increments_y, increments_z = compensate_queue.get_nowait()
increments = [increments_x, increments_y, increments_z+z_positionadjustment, 0, 0, 0]
Coordinate_compensation = increments
reference_pos = None
new_joint_pos = None
if len(Pose_2) > 0 and len(Pose_2[0]) == 6:
Path_First_Position = Pose_2[0] # 获取第一行数据
Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, Coordinate_compensation)]
target_pose = Modified_First_Position
else:
return
for line_list in joint_positions_assembled_2:
if len(line_list) != 6:
continue
if reference_pos is None:
reference_pos = line_list
suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos})
if suc and angle is not None:
new_joint_pos = angle
else:
print("逆解计算失败,使用原有关节位置")
new_joint_pos = line_list
modified_list = new_joint_pos if new_joint_pos is not None else line_list
if (i == 0):
while True:
suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100})
if suc:
break
print("moveByJoint命令发送失败,重试...")
time.sleep(0.1)
wait_stop(sock)
time.sleep(sleep_time)
i = i + 1
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
print("第三段程序执行完")
def program3_1(sock, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start):
print("第3_1段程序开始执行")
i = 0
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
turn_off_relay()
global increments_x, increments_y, increments_z
global Coordinate_compensation
global sample_time
global sleep_time
global lookahead_time
sample_time = SAMP_TIME
sleep_time = sample_time*0.001
lookahead_time = sample_time * Interpolation_points
# 从队列获取最新补偿值(非阻塞,无新值则使用当前值)
if not compensate_queue.empty():
increments_x, increments_y, increments_z = compensate_queue.get_nowait()
increments = [increments_x, increments_y, increments_z+z_positionadjustment, 0, 0, 0]
Coordinate_compensation = increments
# 获取插值起点和插值终点
start_pose = Pose_after_adjustment
end_pose = Pose_2[0].copy()
# 计算带补偿的终点位姿(最终插值要到达的目标)
end_pose_with_comp = [end_pose[i] + Coordinate_compensation[i] for i in range(6)]
interpolate_points = Interpolation_points
interpolated_poses = []
for idx in range(interpolate_points):
# 计算插值权重(0→1,对应从起点第三列到终点第三列)
t = idx / (interpolate_points - 1) if interpolate_points > 1 else 1.0
# 组装当前插值位姿:仅更新第三列,其余列沿用起点值
current_pose = start_pose.copy()
# 第三列:从起点值 线性插值到 终点补偿值的第三列
current_pose[2] = start_pose[2] * (1 - t) + end_pose_with_comp[2] * t
# 前两列:同步从起点值 线性调整到 终点补偿值的前两列(与第三列同权重)
current_pose[0] = start_pose[0] * (1 - t) + end_pose_with_comp[0] * t
current_pose[1] = start_pose[1] * (1 - t) + end_pose_with_comp[1] * t
# 添加到插值结果列表
interpolated_poses.append(current_pose)
# print("\n=== 插值后的位姿列表(逐行)===")
# for i, pose in enumerate(interpolated_poses):
# print(f"第 {i+1} 个插值点: {pose}")
# 遍历插值结果,逐个发送位姿(保留你原有发送逻辑)
for modified_list in interpolated_poses:
# 初始化透传
if i == 0:
suc, result, id = sendCMD(sock, "transparent_transmission_init", {
"lookahead": lookahead_time,
"t": sample_time,
"smoothness": 1,
"response_enable": 0
})
# 发送当前插值得到的位姿
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list})
time.sleep(sleep_time)
i = i + 1
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
print("第3_1段程序执行完")
def program4(sock, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start):
print("第四段程序开始执行")
global increments_x, increments_y, increments_z
global Coordinate_compensation
global sample_time
global sleep_time
global lookahead_time
global Pose_after_adjustment
lookahead_time = 400
i = 0
sample_time = (int)(SAMP_TIME*velocity_coefficient/serial_handler.speed_adjustment)
sleep_time = sample_time*0.001
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
for line_idx, line_list in enumerate(Pose_2):
if len(line_list) != 6:
continue
actual_line_num = line_idx + 1
# 核心逻辑:判断行号是否在13-143之间,修改第三列(索引2,列表从0开始)
if 13 <= actual_line_num <= 143:
# 计算新的第三列值
new_third_value = 372.19 + ((204.75 + serial_handler.Trajectory_angle) / 131) * (actual_line_num-12)
# 复制原列表并修改(避免修改原列表的引用)
line_list = line_list.copy()
line_list[2] = new_third_value # 第三列对应索引2
if actual_line_num == 143:
row_143_third_value = new_third_value
elif actual_line_num > 143 and row_143_third_value is not None:
# 行号大于143时,用143行的第三列值替换
line_list = line_list.copy()
line_list[2] = row_143_third_value
increments = Coordinate_compensation
modified_list = [val + inc for val, inc in zip(line_list, increments)]
Pose_after_adjustment = modified_list
# print(Pose_after_adjustment)
if (i == 0):
suc, result, id = sendCMD(sock, "transparent_transmission_init", {
"lookahead": lookahead_time, "t": sample_time, "smoothness": 1, "response_enable": 0})
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list})
time.sleep(sleep_time)
if(i >= ex_turn_on_relay_start):
turn_on_relay()
print(f"打开喷枪4:{ex_turn_on_relay_start}")
if(i >= ex_turn_off_relay_start):
turn_off_relay()
print(f"关闭喷枪4:{ex_turn_off_relay_start}")
i = i + 1
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
print("第四段程序执行完")
def program5(sock, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start):
print("第五段程序开始执行")
i = 0
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
turn_off_relay()
global increments_x, increments_y, increments_z
global Coordinate_compensation
global sample_time
global sleep_time
sample_time = (int)(SAMP_TIME*velocity_coefficient/serial_handler.speed_adjustment)
sleep_time = sample_time*0.001
if not compensate_queue.empty():
increments_x, increments_y, increments_z = compensate_queue.get_nowait()
increments = [increments_x, increments_y, increments_z+z_positionadjustment, 0, 0, 0]
Coordinate_compensation = increments
reference_pos = None
new_joint_pos = None
if len(Pose_3) > 0 and len(Pose_3[0]) == 6:
Path_First_Position = Pose_3[0] # 获取第一行数据
Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, Coordinate_compensation)]
target_pose = Modified_First_Position
else:
return
for line_list in joint_positions_assembled_3:
if len(line_list) != 6:
continue
if reference_pos is None:
reference_pos = line_list
suc, angle, id = sendCMD(sock, "inverseKinematic",
{"targetPose": target_pose, "referencePos": reference_pos})
if suc and angle is not None:
new_joint_pos = angle
else:
print("逆解计算失败,使用原有关节位置")
new_joint_pos = line_list
modified_list = new_joint_pos if new_joint_pos is not None else line_list
if (i == 0):
while True:
suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100})
if suc:
break
print("moveByJoint命令发送失败,重试...")
time.sleep(0.1)
wait_stop(sock)
time.sleep(sleep_time)
i = i + 1
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
print("第五段程序执行完")
def program5_1(sock, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start):
print("第5_1段程序开始执行")
i = 0
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
turn_off_relay()
global increments_x, increments_y, increments_z
global Coordinate_compensation
global sample_time
global sleep_time
global lookahead_time
sample_time = SAMP_TIME
sleep_time = sample_time*0.001
lookahead_time = sample_time * Interpolation_points
if not compensate_queue.empty():
increments_x, increments_y, increments_z = compensate_queue.get_nowait()
increments = [increments_x, increments_y, increments_z+z_positionadjustment, 0, 0, 0]
Coordinate_compensation = increments
# 获取插值起点和插值终点
start_pose = Pose_after_adjustment
end_pose = Pose_3[0].copy()
# 计算带补偿的终点位姿(最终插值要到达的目标)
end_pose_with_comp = [end_pose[i] + Coordinate_compensation[i] for i in range(6)]
interpolate_points = Interpolation_points
interpolated_poses = []
for idx in range(interpolate_points):
# 计算插值权重(0→1,对应从起点第三列到终点第三列)
t = idx / (interpolate_points - 1) if interpolate_points > 1 else 1.0
# 组装当前插值位姿:仅更新第三列,其余列沿用起点值
current_pose = start_pose.copy()
# 第三列:从起点值 线性插值到 终点补偿值的第三列
current_pose[2] = start_pose[2] * (1 - t) + end_pose_with_comp[2] * t
# 前两列:同步从起点值 线性调整到 终点补偿值的前两列(与第三列同权重)
current_pose[0] = start_pose[0] * (1 - t) + end_pose_with_comp[0] * t
current_pose[1] = start_pose[1] * (1 - t) + end_pose_with_comp[1] * t
# 添加到插值结果列表
interpolated_poses.append(current_pose)
# print("\n=== 插值后的位姿列表(逐行)===")
# for i, pose in enumerate(interpolated_poses):
# print(f"第 {i+1} 个插值点: {pose}")
# 遍历插值结果,逐个发送位姿(保留你原有发送逻辑)
for modified_list in interpolated_poses:
# 初始化透传
if i == 0:
suc, result, id = sendCMD(sock, "transparent_transmission_init", {
"lookahead": lookahead_time,
"t": sample_time,
"smoothness": 1,
"response_enable": 0
})
# 发送当前插值得到的位姿
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list})
time.sleep(sleep_time)
i = i + 1
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
print("第5_1段程序执行完")
def program6(sock, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start):
global increments_x, increments_y, increments_z
global Coordinate_compensation
global sample_time
global sleep_time
global lookahead_time
global Pose_after_adjustment
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
lookahead_time = 400
i = 0
sample_time = (int)(SAMP_TIME*velocity_coefficient/serial_handler.speed_adjustment)
sleep_time = sample_time*0.001
for line_idx, line_list in enumerate(Pose_3):
if len(line_list) != 6:
continue
actual_line_num = line_idx + 1
# 核心逻辑:判断行号是否在13-143之间,修改第三列(索引2,列表从0开始)
if 13 <= actual_line_num <= 143:
# 计算新的第三列值(注意是减法公式)
new_third_value = 576.94 - ((204.75 - serial_handler.Trajectory_angle) / 131) * (actual_line_num-12)
# 复制原列表并修改(避免修改原列表的引用)
line_list = line_list.copy()
line_list[2] = new_third_value # 第三列对应索引2
if actual_line_num == 143:
row_143_third_value = new_third_value
elif actual_line_num > 143 and row_143_third_value is not None:
# 行号大于143时,用143行的第三列值替换
line_list = line_list.copy()
line_list[2] = row_143_third_value
increments = Coordinate_compensation
modified_list = [val + inc for val, inc in zip(line_list, increments)]
Pose_after_adjustment = modified_list
# print(Pose_after_adjustment)
if (i == 0):
suc, result, id = sendCMD(sock, "transparent_transmission_init", {
"lookahead": lookahead_time, "t": sample_time, "smoothness": 1, "response_enable": 0})
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list})
time.sleep(sleep_time)
if(i >= turn_on_relay_start):
turn_on_relay()
print(f"打开喷枪6:{turn_on_relay_start}")
if(i >= turn_off_relay_start):
turn_off_relay()
print(f"关闭喷枪6:{turn_off_relay_start}")
i = i + 1
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
print("第六段程序执行完")
def program7(sock, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start):
i = 0
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
turn_off_relay()
global increments_x, increments_y, increments_z
global Coordinate_compensation
global sample_time
global sleep_time
sample_time = (int)(SAMP_TIME*velocity_coefficient/serial_handler.speed_adjustment)
sleep_time = sample_time*0.001
if not compensate_queue.empty():
increments_x, increments_y, increments_z = compensate_queue.get_nowait()
increments = [increments_x, increments_y, increments_z+z_positionadjustment, 0, 0, 0]
Coordinate_compensation = increments
reference_pos = None
new_joint_pos = None
if len(Pose_4) > 0 and len(Pose_4[0]) == 6:
Path_First_Position = Pose_4[0] # 获取第一行数据
Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, Coordinate_compensation)]
target_pose = Modified_First_Position
else:
return
for line_list in joint_positions_assembled_4:
if len(line_list) != 6:
continue
if reference_pos is None:
reference_pos = line_list
suc, angle, id = sendCMD(sock, "inverseKinematic",
{"targetPose": target_pose, "referencePos": reference_pos})
if suc and angle is not None:
new_joint_pos = angle
else:
print("逆解计算失败,使用原有关节位置")
new_joint_pos = line_list
modified_list = new_joint_pos if new_joint_pos is not None else line_list
if (i == 0):
while True:
suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100})
if suc:
break
print("moveByJoint命令发送失败,重试...")
time.sleep(0.1)
wait_stop(sock)
time.sleep(sleep_time)
i = i + 1
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
print("第七段程序执行完")
def program7_1(sock, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start):
print("第7_1段程序开始执行")
i = 0
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
turn_off_relay()
global increments_x, increments_y, increments_z
global Coordinate_compensation
global sample_time
global sleep_time
global lookahead_time
sample_time = SAMP_TIME
sleep_time = sample_time*0.001
lookahead_time = sample_time * Interpolation_points
if not compensate_queue.empty():
increments_x, increments_y, increments_z = compensate_queue.get_nowait()
increments = [increments_x, increments_y, increments_z+z_positionadjustment, 0, 0, 0]
Coordinate_compensation = increments
# 获取插值起点和插值终点
start_pose = Pose_after_adjustment
end_pose = Pose_4[0]
# 计算带补偿的终点位姿(最终插值要到达的目标)
end_pose_with_comp = [end_pose[i] + Coordinate_compensation[i] for i in range(6)]
interpolate_points = Interpolation_points
interpolated_poses = []
# 仅对第三列进行线性插值,其余列保持起点值不变
for idx in range(interpolate_points):
# 计算插值权重(0→1,对应从起点第三列到终点第三列)
t = idx / (interpolate_points - 1) if interpolate_points > 1 else 1.0
# 组装当前插值位姿:仅更新第三列,其余列沿用起点值
current_pose = start_pose.copy()
# 第三列:从起点值 线性插值到 终点补偿值的第三列
current_pose[2] = start_pose[2] * (1 - t) + end_pose_with_comp[2] * t
# 前两列:同步从起点值 线性调整到 终点补偿值的前两列(与第三列同权重)
current_pose[0] = start_pose[0] * (1 - t) + end_pose_with_comp[0] * t
current_pose[1] = start_pose[1] * (1 - t) + end_pose_with_comp[1] * t
# 添加到插值结果列表
interpolated_poses.append(current_pose)
# print("\n=== 插值后的位姿列表(逐行)===")
# for i, pose in enumerate(interpolated_poses):
# print(f"第 {i+1} 个插值点: {pose}")
# 遍历插值结果,逐个发送位姿(保留你原有发送逻辑)
for modified_list in interpolated_poses:
# 初始化透传
if i == 0:
suc, result, id = sendCMD(sock, "transparent_transmission_init", {
"lookahead": lookahead_time,
"t": sample_time,
"smoothness": 1,
"response_enable": 0
})
# 发送当前插值得到的位姿
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list})
time.sleep(sleep_time)
i = i + 1
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
print("第7_1段程序执行完")
def program8(sock, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start):
global increments_x, increments_y, increments_z
global Coordinate_compensation
global sample_time
global sleep_time
global lookahead_time
global Pose_after_adjustment
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
lookahead_time = 400
i = 0
sample_time = (int)(SAMP_TIME*velocity_coefficient/serial_handler.speed_adjustment)
sleep_time = sample_time*0.001
# 遍历Pose_4,用enumerate获取行索引(从0开始)
for line_idx, line_list in enumerate(Pose_4):
if len(line_list) != 6:
continue
actual_line_num = line_idx + 1
# 核心逻辑:判断行号是否在13-143之间,修改第三列(索引2,列表从0开始)
if 13 <= actual_line_num <= 143:
# 计算新的第三列值(注意是减法公式)
new_third_value = 564.95 - ((204.75 - serial_handler.Trajectory_angle) / 131) * (actual_line_num-12)
# 复制原列表并修改(避免修改原列表的引用)
line_list = line_list.copy()
line_list[2] = new_third_value # 第三列对应索引2
if actual_line_num == 143:
row_143_third_value = new_third_value
elif actual_line_num > 143 and row_143_third_value is not None:
# 行号大于143时,用143行的第三列值替换
line_list = line_list.copy()
line_list[2] = row_143_third_value
increments = Coordinate_compensation
modified_list = [val + inc for val, inc in zip(line_list, increments)]
Pose_after_adjustment = modified_list
# print(Pose_after_adjustment)
if (i == 0):
suc, result, id = sendCMD(sock, "transparent_transmission_init", {
"lookahead": lookahead_time, "t": sample_time, "smoothness": 1, "response_enable": 0})
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list})
time.sleep(sleep_time)
if(i >= ex_turn_on_relay_start):
turn_on_relay()
print(f"打开喷枪8:{ex_turn_on_relay_start}")
if(i >= ex_turn_off_relay_start):
turn_off_relay()
print(f"关闭喷枪8:{ex_turn_off_relay_start}")
i = i + 1
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
print("第八段程序执行完")
def program9(sock):
start_time = time.time()
i = 0
increments = [0, 0, 0, 0, 0, 0]
reference_pos = None # 初始化参考位置
new_joint_pos = None # 初始化逆解得到的新关节位置
pose_file = open('/home/raspberrypi/robot1/Pose_5.txt', "r")
first_pose_line = pose_file.readline().strip()
if first_pose_line:
Path_First_Position = list(map(float, first_pose_line.split(',')))
Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)]
target_pose = Modified_First_Position
else:
#print("Pose_5.txt为空,无法获取初始位姿")
pose_file.close()
return
pose_file.close()
file_name = '/home/raspberrypi/robot1/joint_positions_assembled_5.txt'
fo = open(file_name, "r")
while 1:
# 读取一行内容
line = fo.readline()
if not line:
break
line_list = line.strip()
line_list = list(map(float, line_list.split(',')))
if reference_pos is None:
reference_pos = line_list
suc, angle, id = sendCMD(sock, "inverseKinematic",
{"targetPose": target_pose, "referencePos": reference_pos})
if suc and angle is not None:
new_joint_pos = angle
else:
#print("逆解计算失败,使用原有关节位置")
new_joint_pos = line_list # 失败时使用原位置
modified_list = new_joint_pos if new_joint_pos is not None else line_list
if (i == 0):
while True:
suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100})
if suc:
break
print("moveByJoint命令发送失败,重试...")
time.sleep(0.1) # 重试间隔,可根据需求调整
wait_stop(sock)
time.sleep(0.01)
i = i + 1
fo.close()
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
end_time = time.time()
print("第九段程序执行完")
def program10(sock):
start_time = time.time()
i = 0
increments = [0, 0, 0, 0, 0, 0]
reference_pos = None
new_joint_pos = None
pose_file = open('/home/raspberrypi/robot1/Pose_6.txt', "r")
first_pose_line = pose_file.readline().strip()
if first_pose_line:
Path_First_Position = list(map(float, first_pose_line.split(',')))
Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)]
target_pose = Modified_First_Position
else:
#print("Pose_6.txt为空,无法获取初始位姿")
pose_file.close()
return
pose_file.close()
file_name = '/home/raspberrypi/robot1/joint_positions_assembled_6.txt'
fo = open(file_name, "r")
while 1:
line = fo.readline()
if not line:
break
line_list = line.strip()
line_list = list(map(float, line_list.split(',')))
if reference_pos is None:
reference_pos = line_list
suc, angle, id = sendCMD(sock, "inverseKinematic",
{"targetPose": target_pose, "referencePos": reference_pos})
if suc and angle is not None:
new_joint_pos = angle
else:
#print("逆解计算失败,使用原有关节位置")
new_joint_pos = line_list # 失败时使用原位置
modified_list = new_joint_pos if new_joint_pos is not None else line_list
if (i == 0):
while True:
suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100})
if suc:
break
print("moveByJoint命令发送失败,重试...")
time.sleep(0.1) # 重试间隔,可根据需求调整
wait_stop(sock)
time.sleep(0.01)
i = i + 1
fo.close()
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
end_time = time.time()
print("第十段程序执行完")
def program11(sock):
start_time = time.time()
i = 0
increments = [0, 0, 0, 0, 0, 0]
reference_pos = None
new_joint_pos = None
pose_file = open('/home/raspberrypi/robot1/Pose_7.txt', "r")
first_pose_line = pose_file.readline().strip()
if first_pose_line:
Path_First_Position = list(map(float, first_pose_line.split(',')))
Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)]
target_pose = Modified_First_Position
else:
#print("Pose_7.txt为空,无法获取初始位姿")
pose_file.close()
return
pose_file.close()
file_name = '/home/raspberrypi/robot1/joint_positions_assembled_7.txt'
fo = open(file_name, "r")
while 1:
line = fo.readline()
if not line:
break
line_list = line.strip()
line_list = list(map(float, line_list.split(',')))
if reference_pos is None:
reference_pos = line_list
suc, angle, id = sendCMD(sock, "inverseKinematic",
{"targetPose": target_pose, "referencePos": reference_pos})
if suc and angle is not None:
new_joint_pos = angle
else:
#print("逆解计算失败,使用原有关节位置")
new_joint_pos = line_list # 失败时使用原位置
modified_list = new_joint_pos if new_joint_pos is not None else line_list
if (i == 0):
while True:
suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100})
if suc:
break
print("moveByJoint命令发送失败,重试...")
time.sleep(0.1) # 重试间隔,可根据需求调整
wait_stop(sock) # 等待机器人停止
time.sleep(0.01)
i = i + 1
fo.close()
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
end_time = time.time()
print("第十一段程序执行完")
def program12(sock):
start_time = time.time()
i = 0
increments = [0, 0, 0, 0, 0, 0]
reference_pos = None
new_joint_pos = None
pose_file = open('/home/raspberrypi/robot1/Pose_8.txt', "r")
first_pose_line = pose_file.readline().strip()
if first_pose_line:
Path_First_Position = list(map(float, first_pose_line.split(',')))
Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)]
target_pose = Modified_First_Position
else:
#print("Pose_1.txt为空,无法获取初始位姿")
pose_file.close()
return
pose_file.close()
file_name = '/home/raspberrypi/robot1/joint_positions_assembled_8.txt'
fo = open(file_name, "r")
while 1:
line = fo.readline()
if not line:
break
line_list = line.strip()
line_list = list(map(float, line_list.split(',')))
if reference_pos is None:
reference_pos = line_list
suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos})
if suc and angle is not None:
new_joint_pos = angle
else:
#print("逆解计算失败,使用原有关节位置")
new_joint_pos = line_list
modified_list = new_joint_pos if new_joint_pos is not None else line_list
if (i == 0):
while True:
suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100})
if suc:
break
print("moveByJoint命令发送失败,重试...")
time.sleep(0.1) # 重试间隔,可根据需求调整
wait_stop(sock)
time.sleep(0.02)
i = i + 1
fo.close()
suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf")
end_time = time.time()
print("第十二段程序执行完")
def program100(sock, ser, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start, delay, delay1):
global sample_time, lookahead_time
lookahead_time = 400 # 前瞻时间 (ms)
sample_time = (int)(SAMP_TIME*velocity_coefficient/serial_handler.speed_adjustment)
sleep_time = sample_time*0.001
#0.点位平移操作
# 1.假设 Off_Set_Value 已由外部传入(示例值)
Actual_value_mm=133*sleep_time*80
Comp_value_mm = 204.75 - Actual_value_mm
Off_Set_Value = serial_handler.Trajectory_angle + Comp_value_mm# 示例值,实际由外部传入
# 2.深拷贝原始轨迹(避免修改原数据)
Pose_3_M = copy.deepcopy(Pose_3)
Pose_4_M = copy.deepcopy(Pose_4)
# 3.修改 Pose_3_M 和 Pose_4_M 的第12到144行(MATLAB索引12~144 => Python索引11~143)
# 注意:共 133 行(144-12+1=133),但范围是 12:144 共133个点
for i in range(11, 144): # i = 11 ~ 143 对应 MATLAB 第12~144行
factor = (i - 11) / 133.0 # MATLAB中 (i-12)/133,i从12开始,对应Python i=11时因子0
Pose_3_M[i][2] = Pose_3[i][2] + Off_Set_Value * factor
Pose_4_M[i][2] = Pose_4[i][2] + Off_Set_Value * factor
# 修改第145行到末尾(MATLAB索引145:end => Python索引144:)
for i in range(144, len(Pose_3_M)):
Pose_3_M[i][2] = Pose_3[i][2] + Off_Set_Value
Pose_4_M[i][2] = Pose_4[i][2] + Off_Set_Value
# 4.处理 Pose_3_4 和 Pose_4_3
num = len(Pose_3_4) # 假设两者长度相同
Pose_3_4_M = copy.deepcopy(Pose_3_4)
Pose_4_3_M = copy.deepcopy(Pose_4_3)
for i in range(num):
# MATLAB 中 i 从 1 开始,公式: -Off_Set_Value/num*i + Off_Set_Value
# Python 中 i 从 0 开始,所以因子为 (i+1)
factor = (i + 1) / num
Pose_3_4_M[i][2] = Pose_3_4[i][2] - Off_Set_Value * factor + Off_Set_Value
Pose_4_3_M[i][2] = Pose_4_3[i][2] - Off_Set_Value * factor + Off_Set_Value
Pose_3_M = [[p[0], p[1], p[2] - 200] + p[3:] for p in Pose_3_M]
Pose_4_M = [[p[0], p[1], p[2] - 200] + p[3:] for p in Pose_4_M]
Pose_3_4_M = [[p[0], p[1], p[2] - 200] + p[3:] for p in Pose_3_4_M]
Pose_4_3_M = [[p[0], p[1], p[2] - 200] + p[3:] for p in Pose_4_3_M]
#0.延时操作,点位延时
#1.添加点位延时
NUM = int(delay * 1000 / sample_time)
NUM1 = int(delay1 * 1000 / sample_time)
# 2.第一道喷涂延时点位
last_point_3_4 = Pose_3_4_M[-1].copy() # 先复制最后一个点位
Pose_3_4_Stop = [last_point_3_4.copy() for _ in range(NUM)] # 每个点位独立副本
# 3.第2道喷涂延时点位
last_point_4_3 = Pose_4_3_M[-1].copy()
Pose_4_3_Stop = [last_point_4_3.copy() for _ in range(NUM1)]
trajectories = [Pose_3_M, Pose_3_4_M,Pose_3_4_Stop, Pose_4_M, Pose_4_3_M,Pose_4_3_Stop]
# ----- 一次性清理和初始化透传服务 -----
sendCMD(sock, "tt_clear_servo_joint_buf")
time.sleep(0.05)
#0.喷枪开关控制,开喷枪时间最小为0,最大为155;Pose_4_3_M及Pose_3_4_M中各有20组数据
#每道喷涂指令有155个点,
# 第1道开枪时间在第10个点位(Pose_3_M的开枪点位 lookahead_time/sample_time+10),关枪时间在第130点位示例(lookahead_time/sample_time+135)
# 第2道开枪时间在第15个点位(Pose_4_M的开枪点位 lookahead_time/sample_time+15),,关枪时间在第150点位示例(lookahead_time/sample_time+135,,,,如果数据>155,那么关枪就在Pose_4_3_M中)
#队列,延时时间计算
off_set_Time=int(lookahead_time / sample_time)
# 1.喷枪1开关时间计算
OPEN_1=turn_on_relay_start
CLOSE_1=turn_off_relay_start
OPEN_1_M = off_set_Time+OPEN_1
CLOSE_1_M1S=off_set_Time+CLOSE_1
CLOSE_1_M1E = off_set_Time + CLOSE_1-155
#2.喷枪2开关时间计算
OPEN_2=ex_turn_on_relay_start
CLOSE_2 = ex_turn_off_relay_start
OPEN_2_M = off_set_Time+OPEN_2
CLOSE_2_M2S = off_set_Time+CLOSE_2
CLOSE_2_M2E = off_set_Time + CLOSE_2- 155
sendCMD(sock, "transparent_transmission_init", {"lookahead": lookahead_time,"t": sample_time,"smoothness": 1,"response_enable": 0 })
cycles = 100 # 循环5次
for _ in range(cycles):
for traj in trajectories:
print("开始执行轨迹...")
# 针对不同轨迹进行计数
if traj is Pose_3_M:
for idx, point in enumerate(traj, start=1):
if idx == OPEN_1_M:
turn_on_relay()
if idx == CLOSE_1_M1S:
turn_off_relay()
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point})
time.sleep(sleep_time)
elif traj is Pose_3_4_M:
for idx, point in enumerate(traj, start=1):
if idx == CLOSE_1_M1E:
turn_off_relay()
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point})
time.sleep(sleep_time)
elif traj is Pose_4_M:
for idx, point in enumerate(traj, start=1):
if idx == OPEN_2_M:
turn_on_relay()
if idx == CLOSE_2_M2S:
turn_off_relay()
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point})
time.sleep(sleep_time)
elif traj is Pose_4_3_M:
for idx, point in enumerate(traj, start=1):
if idx == CLOSE_2_M2E:
turn_off_relay()
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point})
time.sleep(sleep_time)
else:
# 其他轨迹(如 Stop 轨迹)无需喷枪控制
for point in traj:
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point})
time.sleep(sleep_time)
print(f"停止轨迹,NUM = {NUM}, delay = {delay}")
stop_cmd_bytes = read_cmd_from_shared(timeout=0.1)
if stop_cmd_bytes is not None and stop_cmd_bytes == b'\xAA\x30':
print("收到0xAA 0x30,停止循环")
for i in range(3):
ser.write(b'\x80\x80')
ser.flush()
time.sleep(0.01)
time.sleep(0.5)
return False
print("轨迹执行完成")
return False
def program101(sock, ser, turn_on_relay_start, turn_off_relay_start, ex_turn_on_relay_start, ex_turn_off_relay_start, delay, delay1):
global sample_time, lookahead_time
lookahead_time = 400 # 前瞻时间 (ms)
sample_time = (int)(SAMP_TIME*velocity_coefficient/serial_handler.speed_adjustment)
sleep_time = sample_time*0.001
#0.点位平移操作
# 1.假设 Off_Set_Value 已由外部传入(示例值)
Actual_value_mm=133*sleep_time*80
Comp_value_mm = 204.75 - Actual_value_mm
Off_Set_Value = serial_handler.Trajectory_angle + Comp_value_mm # 示例值,实际由外部传入
# 这里的喷涂方向与program100相反
Off_Set_Value = -Off_Set_Value
# 2.深拷贝原始轨迹(避免修改原数据)
Pose_3_M = copy.deepcopy(Pose_1)
Pose_4_M = copy.deepcopy(Pose_2)
# 3.修改 Pose_3_M 和 Pose_4_M 的第12到144行(MATLAB索引12~144 => Python索引11~143)
# 注意:共 133 行(144-12+1=133),但范围是 12:144 共133个点
for i in range(11, 144): # i = 11 ~ 143 对应 MATLAB 第12~144行
factor = (i - 11) / 133.0 # MATLAB中 (i-12)/133,i从12开始,对应Python i=11时因子0
Pose_3_M[i][2] = Pose_1[i][2] + Off_Set_Value * factor
Pose_4_M[i][2] = Pose_2[i][2] + Off_Set_Value * factor
# 修改第145行到末尾(MATLAB索引145:end => Python索引144:)
for i in range(144, len(Pose_3_M)):
Pose_3_M[i][2] = Pose_1[i][2] + Off_Set_Value
Pose_4_M[i][2] = Pose_2[i][2] + Off_Set_Value
# 4.处理 Pose_1_2 和 Pose_2_1
num = len(Pose_1_2) # 假设两者长度相同
Pose_3_4_M = copy.deepcopy(Pose_1_2)
Pose_4_3_M = copy.deepcopy(Pose_2_1)
for i in range(num):
# MATLAB 中 i 从 1 开始,公式: -Off_Set_Value/num*i + Off_Set_Value
# Python 中 i 从 0 开始,所以因子为 (i+1)
factor = (i + 1) / num
Pose_3_4_M[i][2] = Pose_1_2[i][2] - Off_Set_Value * factor + Off_Set_Value
Pose_4_3_M[i][2] = Pose_2_1[i][2] - Off_Set_Value * factor + Off_Set_Value
Pose_3_M = [[p[0], p[1], p[2] - 200] + p[3:] for p in Pose_3_M]
Pose_4_M = [[p[0], p[1], p[2] - 200] + p[3:] for p in Pose_4_M]
Pose_3_4_M = [[p[0], p[1], p[2] - 200] + p[3:] for p in Pose_3_4_M]
Pose_4_3_M = [[p[0], p[1], p[2] - 200] + p[3:] for p in Pose_4_3_M]
#0.延时操作,点位延时
#1.添加点位延时
NUM = int(delay * 1000 / sample_time)
NUM1 = int(delay1 * 1000 / sample_time)
# 2.第一道喷涂延时点位
last_point_3_4 = Pose_3_4_M[-1].copy() # 先复制最后一个点位
Pose_3_4_Stop = [last_point_3_4.copy() for _ in range(NUM)] # 每个点位独立副本
# 3.第2道喷涂延时点位
last_point_4_3 = Pose_4_3_M[-1].copy()
Pose_4_3_Stop = [last_point_4_3.copy() for _ in range(NUM1)]
trajectories = [Pose_3_M, Pose_3_4_M,Pose_3_4_Stop, Pose_4_M, Pose_4_3_M,Pose_4_3_Stop]
# ----- 一次性清理和初始化透传服务 -----
sendCMD(sock, "tt_clear_servo_joint_buf")
time.sleep(0.05)
#0.喷枪开关控制,开喷枪时间最小为0,最大为155;Pose_4_3_M及Pose_3_4_M中各有20组数据
#每道喷涂指令有155个点,
# 第1道开枪时间在第10个点位(Pose_3_M的开枪点位 lookahead_time/sample_time+10),关枪时间在第130点位示例(lookahead_time/sample_time+135)
# 第2道开枪时间在第15个点位(Pose_4_M的开枪点位 lookahead_time/sample_time+15),,关枪时间在第150点位示例(lookahead_time/sample_time+135,,,,如果数据>155,那么关枪就在Pose_4_3_M中)
#队列,延时时间计算
off_set_Time=int(lookahead_time / sample_time)
# 1.喷枪1开关时间计算
OPEN_1=turn_on_relay_start
CLOSE_1=turn_off_relay_start
OPEN_1_M = off_set_Time+OPEN_1
CLOSE_1_M1S=off_set_Time+CLOSE_1
CLOSE_1_M1E = off_set_Time + CLOSE_1-155
#2.喷枪2开关时间计算
OPEN_2=ex_turn_on_relay_start
CLOSE_2 = ex_turn_off_relay_start
OPEN_2_M = off_set_Time+OPEN_2
CLOSE_2_M2S = off_set_Time+CLOSE_2
CLOSE_2_M2E = off_set_Time + CLOSE_2- 155
sendCMD(sock, "transparent_transmission_init", {"lookahead": lookahead_time,"t": sample_time,"smoothness": 1,"response_enable": 0 })
cycles = 100 # 循环5次s
for _ in range(cycles):
for traj in trajectories:
print("开始执行轨迹...")
# 针对不同轨迹进行计数
if traj is Pose_3_M:
for idx, point in enumerate(traj, start=1):
if idx == OPEN_1_M:
turn_on_relay()
if idx == CLOSE_1_M1S:
turn_off_relay()
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point})
time.sleep(sleep_time)
elif traj is Pose_3_4_M:
for idx, point in enumerate(traj, start=1):
if idx == CLOSE_1_M1E:
turn_off_relay()
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point})
time.sleep(sleep_time)
elif traj is Pose_4_M:
for idx, point in enumerate(traj, start=1):
if idx == OPEN_2_M:
turn_on_relay()
if idx == CLOSE_2_M2S:
turn_off_relay()
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point})
time.sleep(sleep_time)
elif traj is Pose_4_3_M:
for idx, point in enumerate(traj, start=1):
if idx == CLOSE_2_M2E:
turn_off_relay()
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point})
time.sleep(sleep_time)
else:
# 其他轨迹(如 Stop 轨迹)无需喷枪控制
for point in traj:
send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": point})
time.sleep(sleep_time)
print(f"停止轨迹,NUM = {NUM}, delay = {delay}")
stop_cmd_bytes = read_cmd_from_shared(timeout=0.1)
if stop_cmd_bytes is not None and stop_cmd_bytes == b'\xAA\x30':
print("收到0xAA 0x30,停止循环")
for i in range(3):
ser.write(b'\x80\x80')
ser.flush()
time.sleep(0.01)
time.sleep(0.5)
return False
print("轨迹执行完成")
return False