"""Motion programs for the spraying robot.

Each public ``programN`` function drives one segment of the spray cycle over
an already-open controller socket:

* ``program1/3/5/7``     - joint move to the (compensated) first pose of a path.
* ``program1_1/…/7_1``   - streamed linear blend from the last streamed pose to
                           the first pose of the next path.
* ``program2/4/6/8``     - stream a Cartesian path with a Z ramp while switching
                           the spray-gun relay on/off inside index windows.
* ``program9`` … ``12``  - joint move to the first pose of a path stored in
                           text files.

The functions communicate through module-level globals (``sample_time``,
``sleep_time``, ``lookahead_time``, ``Coordinate_compensation``,
``Pose_after_adjustment`` …); those names are part of the module interface.
"""
import time
from queue import Empty, Queue

from ETController import send_Point, sendCMD, wait_stop
from gpio import turn_off_relay, turn_on_relay
# from program_config import (
#     PRESET_PARAMS, turn_off_relay_end, turn_off_relay_start,
#     turn_on_relay_end, turn_on_relay_start
# )
from serial_init import SerialSharedData
from trajectory_data import (
    Pose_1, Pose_2, Pose_3, Pose_4,
    joint_positions_assembled_1, joint_positions_assembled_2,
    joint_positions_assembled_3, joint_positions_assembled_4
)
# Serial-port handling module.
import serial_handler

# Nominal sample time and the matching delay (SLP_TIME == SAMP_TIME * 0.001,
# so SAMP_TIME is presumably in milliseconds and SLP_TIME in seconds).
SAMP_TIME = 20
SLP_TIME = 0.02

# Queue through which new compensation values arrive (decouples the threads).
# maxsize=1: at most one pending value is buffered.
compensate_queue = Queue(maxsize=1)
# Hand the same queue object to the serial module.
serial_handler.compensate_queue = compensate_queue

# Global coordinate compensation.
increments_x = 0
increments_y = 0
increments_z = 0
Coordinate_compensation = [0, 0, 0, 0, 0, 0]

# Expected frame length; adjust to the actual protocol.
EXPECTED_LENGTH = 7

# The original file had no-op module-level ``global`` statements here, which
# left both names undefined until a program ran.  Give them nominal values so
# reading them before the first program no longer fails.
sample_time = SAMP_TIME
sleep_time = SLP_TIME

move_speed = 100
# Look-ahead time passed to transparent_transmission_init.
lookahead_time = 400
# Last pose streamed by a spray program (start point of the next blend).
Pose_after_adjustment = [0, 0, 0, 0, 0, 0]

# Speed scaling and initial Z offset.
# speed_adjustment = serial_handler.speed_adjustment
# Trajectory_angle = serial_handler.Trajectory_angle
velocity_coefficient = 100
z_positionadjustment = -200  # offset applied to the whole trajectory

# Shared serial data instance.
serial_shared = SerialSharedData()

# Number of points used by the blend programs (programN_1).
Interpolation_points = 20

# --- private constants -----------------------------------------------------
# Rows (1-based) of a spray path whose Z value is replaced by a linear ramp.
_RAMP_FIRST_ROW = 13
_RAMP_LAST_ROW = 143
_RAMP_ROW_COUNT = 131          # == _RAMP_LAST_ROW - _RAMP_FIRST_ROW + 1
_RAMP_BASE_TRAVEL = 204.75     # total Z travel before Trajectory_angle is added
_SPRAY_LOOKAHEAD = 400
_MOVE_BY_JOINT_SPEED = 100
_TRAJECTORY_DIR = '/home/raspberrypi/robot1'
_IK_FAILED_MESSAGE = "逆解计算失败,使用原有关节位置"
_NO_COMPENSATION = [0, 0, 0, 0, 0, 0]


# --- private helpers -------------------------------------------------------
def _clear_servo_buffer(sock):
    """Send ``tt_clear_servo_joint_buf`` to the controller; the reply is ignored."""
    sendCMD(sock, "tt_clear_servo_joint_buf")


def _scaled_sample_time():
    """Return the sample time scaled by the current serial speed adjustment.

    Raises:
        ZeroDivisionError: if ``serial_handler.speed_adjustment`` is 0.
    """
    return int(SAMP_TIME * velocity_coefficient / serial_handler.speed_adjustment)


def _refresh_compensation():
    """Pull the newest XYZ compensation (if any) and rebuild the 6-vector.

    Uses ``get_nowait`` with ``queue.Empty`` handling instead of the original
    ``empty()`` pre-check, so a value consumed elsewhere between the check and
    the read cannot raise.  When the queue is empty the previous increments
    are kept.
    """
    global increments_x, increments_y, increments_z
    global Coordinate_compensation
    try:
        increments_x, increments_y, increments_z = compensate_queue.get_nowait()
    except Empty:
        pass
    Coordinate_compensation = [
        increments_x,
        increments_y,
        increments_z + z_positionadjustment,
        0, 0, 0,
    ]


def _apply_compensation(pose, compensation):
    """Return ``pose`` with ``compensation`` added element-wise (zip-truncated)."""
    return [val + inc for val, inc in zip(pose, compensation)]


def _solve_start_joints(sock, target_pose, reference_joints, failure_message=None):
    """Ask the controller for joints reaching ``target_pose``.

    Falls back to ``reference_joints`` when the ``inverseKinematic`` command
    fails or returns ``None``; ``failure_message`` (if given) is printed then.
    """
    suc, angle, _ = sendCMD(
        sock,
        "inverseKinematic",
        {"targetPose": target_pose, "referencePos": reference_joints},
    )
    if suc and angle is not None:
        return angle
    if failure_message is not None:
        print(failure_message)
    return reference_joints


def _move_by_joint_until_accepted(sock, joints):
    """Send ``moveByJoint`` until the controller accepts it, then ``wait_stop``.

    NOTE(review): retries forever with a 0.1 s pause, exactly like the
    original; there is no upper bound on attempts.
    """
    while True:
        suc, _, _ = sendCMD(
            sock,
            "moveByJoint",
            {"targetPos": joints, "speed": _MOVE_BY_JOINT_SPEED},
        )
        if suc:
            break
        print("moveByJoint命令发送失败,重试...")
        time.sleep(0.1)
    # presumably blocks until the robot has finished moving - confirm in ETController
    wait_stop(sock)


def _init_transparent_transmission(sock):
    """Send ``transparent_transmission_init`` with the current global timing."""
    sendCMD(sock, "transparent_transmission_init", {
        "lookahead": lookahead_time,
        "t": sample_time,
        "smoothness": 1,
        "response_enable": 0,
    })


def _move_to_path_start(sock, poses, joint_rows, start_message, done_message):
    """Joint-move the robot to the compensated first pose of ``poses``.

    Shared body of ``program1/3/5/7``.  The first 6-element row of
    ``joint_rows`` is the inverse-kinematics reference (and the fallback
    target).  After the move, the loop still sleeps ``sleep_time`` once per
    valid joint row, as the original did.

    Returns silently, without the final buffer clear or ``done_message``, when
    ``poses`` is empty or its first row does not have 6 values.
    """
    global sample_time, sleep_time
    if start_message is not None:
        print(start_message)
    _clear_servo_buffer(sock)
    turn_off_relay()
    sample_time = _scaled_sample_time()
    sleep_time = sample_time * 0.001
    _refresh_compensation()

    if len(poses) == 0 or len(poses[0]) != 6:
        return
    target_pose = _apply_compensation(poses[0], Coordinate_compensation)

    moved = False
    for joint_row in joint_rows:
        # Every usable row must hold exactly 6 joint values.
        if len(joint_row) != 6:
            continue
        if not moved:
            start_joints = _solve_start_joints(
                sock, target_pose, joint_row, _IK_FAILED_MESSAGE)
            _move_by_joint_until_accepted(sock, start_joints)
            moved = True
        time.sleep(sleep_time)

    _clear_servo_buffer(sock)
    print(done_message)


def _interpolate_positions(start_pose, end_pose, count):
    """Return ``count`` poses blending X, Y, Z linearly from start to end.

    Elements other than indices 0-2 are copied unchanged from ``start_pose``.
    With ``count == 1`` the single pose is the end position.
    """
    blended = []
    for idx in range(count):
        t = idx / (count - 1) if count > 1 else 1.0
        pose = start_pose.copy()
        for axis in (0, 1, 2):
            pose[axis] = start_pose[axis] * (1 - t) + end_pose[axis] * t
        blended.append(pose)
    return blended


def _blend_to_path_start(sock, poses, start_message, done_message):
    """Stream a linear blend from ``Pose_after_adjustment`` to ``poses[0]``.

    Shared body of ``program1_1/3_1/5_1/7_1``.  Uses the nominal ``SAMP_TIME``
    (not the speed-scaled one) and sets ``lookahead_time`` to
    ``sample_time * Interpolation_points``.

    Raises:
        IndexError: if ``poses`` is empty or its first row has fewer than
            6 values (unchanged from the original).
    """
    global sample_time, sleep_time, lookahead_time
    print(start_message)
    _clear_servo_buffer(sock)
    turn_off_relay()
    sample_time = SAMP_TIME
    sleep_time = sample_time * 0.001
    lookahead_time = sample_time * Interpolation_points
    _refresh_compensation()

    start_pose = Pose_after_adjustment
    first_pose = poses[0]
    # Compensated end pose: the target the blend finally reaches.
    end_pose = [first_pose[k] + Coordinate_compensation[k] for k in range(6)]
    blend = _interpolate_positions(start_pose, end_pose, Interpolation_points)

    for index, pose in enumerate(blend):
        if index == 0:
            # Transparent transmission is initialised once, before the first point.
            _init_transparent_transmission(sock)
        send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": pose})
        time.sleep(sleep_time)

    _clear_servo_buffer(sock)
    print(done_message)


def _ramped_z(z_base, ascending, row_number):
    """Return the ramp Z value for 1-based ``row_number`` inside the ramp rows.

    ``serial_handler.Trajectory_angle`` is read on every call, as in the
    original, so a change arriving mid-path takes effect immediately.
    """
    step = (_RAMP_BASE_TRAVEL + serial_handler.Trajectory_angle) / _RAMP_ROW_COUNT
    offset = step * (row_number - (_RAMP_FIRST_ROW - 1))
    return z_base + offset if ascending else z_base - offset


def _stream_spray_path(sock, poses, z_base, ascending,
                       turn_on_relay_start, turn_on_relay_end,
                       turn_off_relay_start, turn_off_relay_end,
                       start_message, done_message):
    """Stream ``poses`` to the controller with a Z ramp and relay switching.

    Shared body of ``program2/4/6/8``.

    * Rows 13-143 (1-based, counting skipped rows) get their Z replaced by a
      linear ramp starting from ``z_base``; later rows hold the row-143 value.
    * Rows that do not have exactly 6 values are skipped and do not advance
      the send index used for the relay windows.
    * The relay is switched (and a message printed) on every sent point whose
      index lies inside the corresponding inclusive window.
    * ``Pose_after_adjustment`` is updated to each streamed pose.
    """
    global sample_time, sleep_time, lookahead_time
    global Pose_after_adjustment
    if start_message is not None:
        print(start_message)
    _clear_servo_buffer(sock)
    lookahead_time = _SPRAY_LOOKAHEAD
    sample_time = _scaled_sample_time()
    sleep_time = sample_time * 0.001

    # Fix: initialise explicitly.  The original left this unbound, so a
    # malformed (skipped) row 143 made every later row raise UnboundLocalError.
    # With None, later rows simply keep their own Z.
    held_z = None
    sent = 0
    for row_number, row in enumerate(poses, start=1):
        if len(row) != 6:
            continue

        if _RAMP_FIRST_ROW <= row_number <= _RAMP_LAST_ROW:
            z_value = _ramped_z(z_base, ascending, row_number)
            row = row.copy()  # never modify the shared trajectory data
            row[2] = z_value
            if row_number == _RAMP_LAST_ROW:
                held_z = z_value
        elif row_number > _RAMP_LAST_ROW and held_z is not None:
            row = row.copy()
            row[2] = held_z

        target = _apply_compensation(row, Coordinate_compensation)
        Pose_after_adjustment = target

        if sent == 0:
            _init_transparent_transmission(sock)
        send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": target})
        time.sleep(sleep_time)

        if turn_on_relay_start <= sent <= turn_on_relay_end:
            turn_on_relay()
            print("打开喷枪")
        if turn_off_relay_start <= sent <= turn_off_relay_end:
            turn_off_relay()
            print("关闭喷枪")
        sent += 1

    _clear_servo_buffer(sock)
    print(done_message)


def _move_to_file_start(sock, pose_path, joints_path, dwell, done_message):
    """Joint-move to the first pose stored in ``pose_path``.

    Shared body of ``program9`` … ``program12``.  The first line of
    ``pose_path`` is the target pose (comma-separated floats, at most 6 used);
    the first non-blank line of ``joints_path`` is the inverse-kinematics
    reference and fallback.  After the move, the function sleeps ``dwell``
    seconds per remaining data line, as the original did.

    Files are opened with ``with`` so they are closed even when parsing or a
    controller call raises.  Blank lines in ``joints_path`` are skipped; the
    original raised ``ValueError`` from ``float('')`` on them.

    Returns silently when the first line of ``pose_path`` is empty.

    Raises:
        OSError: if either file cannot be opened.
        ValueError: if a non-blank line contains a non-numeric field.
    """
    with open(pose_path, "r") as pose_file:
        first_pose_line = pose_file.readline().strip()
    if not first_pose_line:
        return
    first_pose = [float(field) for field in first_pose_line.split(',')]
    target_pose = _apply_compensation(first_pose, _NO_COMPENSATION)

    moved = False
    with open(joints_path, "r") as joints_file:
        for line in joints_file:
            text = line.strip()
            if not text:
                continue
            joint_row = [float(field) for field in text.split(',')]
            if not moved:
                start_joints = _solve_start_joints(sock, target_pose, joint_row)
                _move_by_joint_until_accepted(sock, start_joints)
                moved = True
            time.sleep(dwell)

    _clear_servo_buffer(sock)
    print(done_message)


# --- public programs -------------------------------------------------------
def program1(sock, turn_on_relay_start, turn_on_relay_end,
             turn_off_relay_start, turn_off_relay_end):
    """Segment 1: joint-move to the compensated start of ``Pose_1``.

    The relay-window arguments are accepted for a uniform signature and unused.
    """
    _move_to_path_start(sock, Pose_1, joint_positions_assembled_1,
                        None, "第一段程序执行完")


def program1_1(sock, turn_on_relay_start, turn_on_relay_end,
               turn_off_relay_start, turn_off_relay_end):
    """Segment 1_1: streamed blend from the last streamed pose to ``Pose_1[0]``.

    The relay-window arguments are unused.
    """
    _blend_to_path_start(sock, Pose_1, "第1_1段程序开始执行", "第1_1段程序执行完")


def program2(sock, turn_on_relay_start, turn_on_relay_end,
             turn_off_relay_start, turn_off_relay_end):
    """Segment 2 (ascending spray): stream ``Pose_1`` with Z ramping up from 360.2."""
    _stream_spray_path(sock, Pose_1, 360.2, True,
                       turn_on_relay_start, turn_on_relay_end,
                       turn_off_relay_start, turn_off_relay_end,
                       "第二段程序开始执行", "第二段程序执行完")


def program3(sock, turn_on_relay_start, turn_on_relay_end,
             turn_off_relay_start, turn_off_relay_end):
    """Segment 3: joint-move to the compensated start of ``Pose_2``.

    The relay-window arguments are unused.
    """
    _move_to_path_start(sock, Pose_2, joint_positions_assembled_2,
                        "第三段程序开始执行", "第三段程序执行完")


def program3_1(sock, turn_on_relay_start, turn_on_relay_end,
               turn_off_relay_start, turn_off_relay_end):
    """Segment 3_1: streamed blend from the last streamed pose to ``Pose_2[0]``.

    The relay-window arguments are unused.
    """
    _blend_to_path_start(sock, Pose_2, "第3_1段程序开始执行", "第3_1段程序执行完")


def program4(sock, turn_on_relay_start, turn_on_relay_end,
             turn_off_relay_start, turn_off_relay_end):
    """Segment 4 (ascending spray): stream ``Pose_2`` with Z ramping up from 372.19."""
    _stream_spray_path(sock, Pose_2, 372.19, True,
                       turn_on_relay_start, turn_on_relay_end,
                       turn_off_relay_start, turn_off_relay_end,
                       "第四段程序开始执行", "第四段程序执行完")


def program5(sock, turn_on_relay_start, turn_on_relay_end,
             turn_off_relay_start, turn_off_relay_end):
    """Segment 5: joint-move to the compensated start of ``Pose_3``.

    The relay-window arguments are unused.
    """
    _move_to_path_start(sock, Pose_3, joint_positions_assembled_3,
                        "第五段程序开始执行", "第五段程序执行完")


def program5_1(sock, turn_on_relay_start, turn_on_relay_end,
               turn_off_relay_start, turn_off_relay_end):
    """Segment 5_1: streamed blend from the last streamed pose to ``Pose_3[0]``.

    The relay-window arguments are unused.
    """
    _blend_to_path_start(sock, Pose_3, "第5_1段程序开始执行", "第5_1段程序执行完")


def program6(sock, turn_on_relay_start, turn_on_relay_end,
             turn_off_relay_start, turn_off_relay_end):
    """Segment 6 (descending spray): stream ``Pose_3`` with Z ramping down from 576.94."""
    _stream_spray_path(sock, Pose_3, 576.94, False,
                       turn_on_relay_start, turn_on_relay_end,
                       turn_off_relay_start, turn_off_relay_end,
                       None, "第六段程序执行完")


def program7(sock, turn_on_relay_start, turn_on_relay_end,
             turn_off_relay_start, turn_off_relay_end):
    """Segment 7: joint-move to the compensated start of ``Pose_4``.

    The relay-window arguments are unused.
    """
    _move_to_path_start(sock, Pose_4, joint_positions_assembled_4,
                        None, "第七段程序执行完")


def program7_1(sock, turn_on_relay_start, turn_on_relay_end,
               turn_off_relay_start, turn_off_relay_end):
    """Segment 7_1: streamed blend from the last streamed pose to ``Pose_4[0]``.

    The relay-window arguments are unused.
    """
    _blend_to_path_start(sock, Pose_4, "第7_1段程序开始执行", "第7_1段程序执行完")


def program8(sock, turn_on_relay_start, turn_on_relay_end,
             turn_off_relay_start, turn_off_relay_end):
    """Segment 8 (descending spray): stream ``Pose_4`` with Z ramping down from 564.95."""
    _stream_spray_path(sock, Pose_4, 564.95, False,
                       turn_on_relay_start, turn_on_relay_end,
                       turn_off_relay_start, turn_off_relay_end,
                       None, "第八段程序执行完")


def program9(sock):
    """Segment 9: joint-move to the first pose of ``Pose_5.txt``."""
    _move_to_file_start(
        sock,
        f'{_TRAJECTORY_DIR}/Pose_5.txt',
        f'{_TRAJECTORY_DIR}/joint_positions_assembled_5.txt',
        0.01,
        "第九段程序执行完",
    )


def program10(sock):
    """Segment 10: joint-move to the first pose of ``Pose_6.txt``."""
    _move_to_file_start(
        sock,
        f'{_TRAJECTORY_DIR}/Pose_6.txt',
        f'{_TRAJECTORY_DIR}/joint_positions_assembled_6.txt',
        0.01,
        "第十段程序执行完",
    )


def program11(sock):
    """Segment 11: joint-move to the first pose of ``Pose_7.txt``."""
    _move_to_file_start(
        sock,
        f'{_TRAJECTORY_DIR}/Pose_7.txt',
        f'{_TRAJECTORY_DIR}/joint_positions_assembled_7.txt',
        0.01,
        "第十一段程序执行完",
    )


def program12(sock):
    """Segment 12: joint-move to the first pose of ``Pose_8.txt``."""
    _move_to_file_start(
        sock,
        f'{_TRAJECTORY_DIR}/Pose_8.txt',
        f'{_TRAJECTORY_DIR}/joint_positions_assembled_8.txt',
        0.02,
        "第十二段程序执行完",
    )