import socket import json import time import numpy as np import math import RPi.GPIO as GPIO import time import serial import struct # GPIO 代码 # 定义继电器引脚 EN_Relay = 17 #预期的帧长度,可根据实际情况修改 EXPECTED_LENGTH = 10 #采样时间、延时时间和衔接机器人速度 sample_time = 100 #50 sleep_time = 0.020#0.008 #0.005 move_speed = 75 #试运行速度 sample_time_test = 100 #3 sleep_time_test = 0.05 #0.003 move_speed_test = 20 #喷枪开启和关闭位置 turn_on_relay_start = 20 turn_on_relay_end = 25 turn_off_relay_start = 290 turn_off_relay_end = 295 # 定义转换矩阵R_T R_T = np.array([ [-0.7206, -0.6908, 0], [0.6884, -0.7231, 0], [0.0830, 0, 1] ]) def init_gpio(): """初始化GPIO设置""" GPIO.setmode(GPIO.BCM) GPIO.setup(EN_Relay, GPIO.OUT) # 初始化为高电平(关闭状态) GPIO.output(EN_Relay, GPIO.LOW) time.sleep(1) # 等待稳定 def turn_on_relay(): """打开继电器(设置为LOW)""" try: GPIO.output(EN_Relay, GPIO.HIGH) print('Relay turned on (LOW)') except Exception as e: print(f"Error turning on relay: {e}") def turn_off_relay(): """关闭继电器(设置为HIGH)""" try: GPIO.output(EN_Relay, GPIO.LOW) print('Relay turned off (HIGH)') except Exception as e: print(f"Error turning off relay: {e}") # 初始化串口 def serial_init(): """初始化串口""" try: ser = serial.Serial( port="/dev/ttyS0", baudrate=115200, timeout=0.1 # 读取超时时间 ) print("串口初始化成功") return ser except Exception as e: print(f"串口初始化失败: {e}") return None #串口接收 def serial_receive_data(ser, buffer): """接收数据并处理,增加严格的串口有效性检查""" # 先检查串口对象是否有效 if not ser: print("错误:串口对象未初始化") return buffer # 检查是否是有效的Serial实例 if not isinstance(ser, serial.Serial): print("错误:不是有效的Serial对象") return buffer # 检查串口是否处于打开状态 if not ser.is_open: print("错误:串口未打开") return buffer # 尝试读取串口数据,增加异常捕获 try: # 检查是否有等待读取的数据 if ser.in_waiting > 0: data = ser.read(ser.in_waiting) buffer += data # 处理缓冲区中的完整帧(根据需要调整) while len(buffer) >= EXPECTED_LENGTH: frame = buffer[:EXPECTED_LENGTH] buffer = buffer[EXPECTED_LENGTH:] print("data:", ' '.join(f'{b:02X}' for b in frame)) except AttributeError: print("错误:串口对象属性错误,可能已被关闭") except Exception as e: print(f"串口操作错误: {str(e)}") return buffer # 解析以0xFF为帧头的数据包 # 添加超时参数,默认1秒 def parse_frame(ser, timeout=1.0): """ 从串口接收并解析以0xFF为帧头的数据包: 帧结构: 0xFF(帧头) + x高8位 + x低8位 + y高8位 + y低8位 + z高8位 + z低8位 返回: (成功标志, x坐标, y坐标, z坐标),坐标为有符号整数 新增超时机制,超时后返回失败但允许程序继续执行 """ frame_buffer = b'' frame_header = 0xFF frame_length = 7 # 帧头1字节 + x(2字节) + y(2字节) + z(2字节) = 7字节 start_time = time.time() # 等待足够的数据或超时 while len(frame_buffer) < frame_length and (time.time() - start_time) < timeout: frame_buffer = serial_receive_data(ser, frame_buffer) time.sleep(0.01) # 超时检查 if (time.time() - start_time) >= timeout: print("解析0xFF帧头数据超时,将使用上次的补偿值继续执行") return (False, 0, 0, 0) # 查找帧头 header_index = frame_buffer.find(bytes([frame_header])) if header_index == -1: print("未找到帧头0xFF,解析失败,将使用上次的补偿值继续执行") return (False, 0, 0, 0) # 确保有足够的后续字节 if header_index + frame_length > len(frame_buffer): print("帧不完整,解析失败,将使用上次的补偿值继续执行") return (False, 0, 0, 0) # 提取完整帧 frame = frame_buffer[header_index:header_index+frame_length] # 解析x轴坐标(高8位 + 低8位)并转换为有符号整数 x_high = frame[1] x_low = frame[2] x_coord = (x_high << 8) | x_low # 处理16位有符号整数 if x_coord & 0x8000: # 最高位为1,表示负数 x_coord -= 0x10000 # 转换为补码 # 解析y轴坐标(高8位 + 低8位)并转换为有符号整数 y_high = frame[3] y_low = frame[4] y_coord = (y_high << 8) | y_low if y_coord & 0x8000: y_coord -= 0x10000 # 解析z轴坐标(高8位 + 低8位)并转换为有符号整数 z_high = frame[5] z_low = frame[6] z_coord = (z_high << 8) | z_low if z_coord & 0x8000: z_coord -= 0x10000 print(f"解析成功 - 帧头: 0x{frame_header:02X}, " f"x: 0x{x_high:02X}{x_low:02X}={x_coord}, " f"y: 0x{y_high:02X}{y_low:02X}={y_coord}, " f"z: 0x{z_high:02X}{z_low:02X}={z_coord}") return (True, x_coord, y_coord, z_coord) # 矩阵转换函数:将P_L1_O2通过R_T转换为P_L1_O1 def transform_data(x_coord, y_coord, z_coord): # 将接收到的3个坐标组成列矩阵P_L1_O2 P_L1_O2 = np.array([[x_coord], [y_coord], [z_coord]], dtype=np.float64) # 执行矩阵乘法:P_L1_O1 = R_T * P_L1_O2 P_L1_O1 = np.dot(R_T, P_L1_O2) # 提取转换后的三个数据 increments_x = P_L1_O1[0][0] increments_y = P_L1_O1[1][0] increments_z = P_L1_O1[2][0] print(f"矩阵转换完成 - increments_x: {increments_x:.4f}, increments_y: {increments_y:.4f}, increments_z: {increments_z:.4f}") return increments_x, increments_y, increments_z # 连接机器人控制器 def connectETController(ip, port=8055): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: sock.connect((ip, port)) return (True, sock) except Exception as e: sock.close() return (False,) # 发送命令并接收响应 def sendCMD(sock, cmd, params=None, id=1): if not params: params = [] else: params = json.dumps(params) sendStr = '{"method":"%s","params":%s,"jsonrpc":"2.0","id":%d}' % (cmd, params, id) + "\n" try: sock.sendall(bytes(sendStr, "utf-8")) ret = sock.recv(1024) jdata = json.loads(str(ret, "utf-8")) if "result" in jdata.keys(): return (True, json.loads(jdata["result"]), jdata["id"]) elif "error" in jdata.keys(): return (False, jdata["error"], jdata["id"]) else: return (False, None, None) except Exception as e: return (False, None, None) def send_Point(sock, cmd, params=None, id=1): if (not params): params = [] else: params = json.dumps(params) sendStr = "{{\"method\":\"{0}\",\"params\":{1},\"jsonrpc\":\"2.0\",\"id\":{2}}}".format(cmd, params, id) + "\n" sock.sendall(bytes(sendStr, "utf-8")) def wait_stop(sock): # 修正:添加sock参数 while True: time.sleep(0.1) ret1, result1, id1 = sendCMD(sock, "getRobotState") # getRobotstate if (ret1): if result1 == 0 or result1 == 4: break else: print("getRobotState failed") break #需要做8端程序, # 第1段,起点处的关节位置(txt);第2段,上升喷涂(txt); # 第3段,起点初的关节位置(txt);第4段,上升喷涂(txt); # 第5段,起点初的关节位置(txt);第6段,上升喷涂(txt); # 第7段,起点初的关节位置(txt);第8段,上升喷涂(txt); # 第一段程序封装为函数 def program1(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 turn_off_relay() print("关闭喷枪")#关闭喷枪 ###大臂车补偿量 # 定义大臂车的位姿补偿量,前3个值为空间位置补偿,后3个为姿态补偿(此处姿态不补偿,均为0) increments = [increments_x, increments_y, increments_z, 0, 0, 0] # 初始化参考关节位置变量,用于存储第一组关节位置 reference_pos = None # 初始化新关节位置变量,用于存储逆解计算得到的关节位置 new_joint_pos = None # 打开Pose_1.txt文件,以只读模式获取第一组末端位姿并计算目标位姿 pose_file = open('/home/raspberrypi/robot1/Pose_1.txt', "r") # 读取文件第一行内容,并去除首尾的空白字符(包括换行符) first_pose_line = pose_file.readline().strip() # 读取第一组位姿 # 判断是否成功读取到第一行位姿数据 if first_pose_line: # 将读取到的字符串按逗号分割,并转换为浮点数列表,得到原始路径的第一个位置 Path_First_Position = list(map(float, first_pose_line.split(','))) # 将原始位置与补偿量逐元素相加,得到修正后的第一个位置 Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] # 将修正后的位置赋值给target_pose,作为后续逆解计算的目标位姿 target_pose = Modified_First_Position else: print("Pose_1.txt为空,无法获取初始位姿") pose_file.close() return pose_file.close() file_name = '/home/raspberrypi/robot1/joint_positions_assembled_1.txt' # 打开关节位置文件,以只读模式读取内容 fo = open(file_name, "r") # 循环读取文件中的每一行内容 while 1: # 读取一行内容 line = fo.readline() # 如果读取到空行(文件结束),则跳出循环 if not line: break # 去除行首尾的空白字符 line_list = line.strip() # 将字符串按逗号分割,并转换为浮点数列表,得到当前行的关节位置数据 line_list = list(map(float, line_list.split(','))) # 判断是否是第一组关节位置(reference_pos尚未赋值) if reference_pos is None: # 将第一组关节位置赋值给reference_pos,作为参考位置 reference_pos = line_list # 调用逆解函数计算新的关节位置:发送"inverseKinematic"命令,参数为目标位姿和参考位置 suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) # 判断逆解是否成功且返回了有效的关节角度 if suc and angle is not None: # 将逆解得到的关节位置保存到new_joint_pos new_joint_pos = angle # 保存逆解得到的新关节位置 else: # 如果逆解失败,打印提示信息 print("逆解计算失败,使用原有关节位置") # 使用原始关节位置作为新关节位置 new_joint_pos = line_list # 失败时使用原位置 # 确定当前行的修改后关节位置:优先使用逆解得到的新位置,若不存在则使用原始位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list # 打印当前行的修改后关节位置 #print(modified_list) # 判断是否是第一行关节位置(i=0表示首次处理) if (i == 0): # 发送关节运动命令,将机器人移动到起始点,速度设置为100 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) # 打印运动命令的执行结果 #print(result) # 等待机器人停止运动 wait_stop(sock) # 初始化透传服务:设置前瞻距离400、时间5、平滑度1、禁用响应 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 400, "t": sample_time, "smoothness": 1, "response_enable": 0}) #print(result) # 发送当前关节点信息到透传缓存,用于后续的伺服控制 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) #print(result) # 暂停0.01秒,控制数据发送频率 time.sleep(sleep_time) # 循环计数器加1,记录已处理的行数 i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第一段程序执行完") # 第二段程序封装为函数 def program2(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 # 清空透传缓存 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) print("第二段程序开始执行") file_name = '/home/raspberrypi/robot1/Pose_1.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = [increments_x, increments_y, increments_z, 0, 0, 0] modified_list = [val + inc for val, inc in zip(line_list, increments)] #print(modified_list) if (i == 0): # 关节运动到起始点 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) #print(result) wait_stop(sock) #print(1) # 初始化透传服务 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 100, "t": sample_time, "smoothness": 1, "response_enable": 0}) #print(result) # 添加透传伺服目标关节点信息到缓存中 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) #print(result) time.sleep(sleep_time) if(i >= turn_on_relay_start and i <= turn_on_relay_end): turn_on_relay() print("打开喷枪")#开喷枪 if(i >= turn_off_relay_start and i <= turn_off_relay_end): turn_off_relay() print("关闭喷枪")#关喷枪 i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第二段程序执行完") # 第三段程序封装为函数,hjb改 def program3(sock): # 记录开始时间 start_time = time.time() i = 0 turn_off_relay() print("关闭喷枪") increments = [increments_x, increments_y, increments_z, 0, 0, 0] reference_pos = None new_joint_pos = None pose_file = open('/home/raspberrypi/robot1/Pose_2.txt', "r") first_pose_line = pose_file.readline().strip() if first_pose_line: Path_First_Position = list(map(float, first_pose_line.split(','))) Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] target_pose = Modified_First_Position else: print("Pose_2.txt为空,无法获取初始位姿") pose_file.close() return pose_file.close() file_name = '/home/raspberrypi/robot1/joint_positions_assembled_2.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) if reference_pos is None: reference_pos = line_list suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) if suc and angle is not None: new_joint_pos = angle else: print("逆解计算失败,使用原有关节位置") new_joint_pos = line_list modified_list = new_joint_pos if new_joint_pos is not None else line_list #print(modified_list) if (i == 0): suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": move_speed}) #print(result) wait_stop(sock) suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 100, "t": sample_time, "smoothness": 1, "response_enable": 0}) #print(result) send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) #print(result) time.sleep(sleep_time) i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第三段程序执行完") # 第四段程序封装为函数 def program4(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 # 清空透传缓存 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) print("第四段程序开始执行") file_name = '/home/raspberrypi/robot1/Pose_2.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = [increments_x, increments_y, increments_z, 0, 0, 0] modified_list = [val + inc for val, inc in zip(line_list, increments)] #print(modified_list) if (i == 0): # 关节运动到起始点 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) #print(result) wait_stop(sock) # 等待机器人停止 #print(1) # 初始化透传服务 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 100, "t": sample_time, "smoothness": 1, "response_enable": 0}) #print(result) # 添加透传伺服目标关节点信息到缓存中 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) #print(result) time.sleep(sleep_time) if(i >= turn_on_relay_start and i <= turn_on_relay_end): turn_on_relay() print("打开喷枪")#开喷枪 if(i >= turn_off_relay_start and i <= turn_off_relay_end): turn_off_relay() print("关闭喷枪")#关喷枪 i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第四段程序执行完") # 第五段程序封装为函数,hjb改 def program5(sock): start_time = time.time() # 返回当前时间的时间戳(秒) # 初始化循环计数器i,用于记录处理的关节位置行数 i = 0 turn_off_relay() print("关闭喷枪")#关闭喷枪 ###大臂车补偿量 # 定义大臂车的位姿补偿量,前3个值为空间位置补偿,后3个为姿态补偿(此处姿态不补偿,均为0) increments = [increments_x, increments_y, increments_z, 0, 0, 0] # 大臂车的位姿误差只补偿空间位置不补偿姿态,所有后面3个都是0; ###大臂车补偿量 # 初始化参考关节位置变量,用于存储第一组关节位置 reference_pos = None # 初始化参考位置 # 初始化新关节位置变量,用于存储逆解计算得到的关节位置 new_joint_pos = None # 初始化逆解得到的新关节位置 # 打开Pose_1.txt文件,以只读模式获取第一组末端位姿并计算目标位姿 pose_file = open('/home/raspberrypi/robot1/Pose_3.txt', "r") # 读取文件第一行内容,并去除首尾的空白字符(包括换行符) first_pose_line = pose_file.readline().strip() # 读取第一组位姿 # 判断是否成功读取到第一行位姿数据 if first_pose_line: # 将读取到的字符串按逗号分割,并转换为浮点数列表,得到原始路径的第一个位置 Path_First_Position = list(map(float, first_pose_line.split(','))) # 将原始位置与补偿量逐元素相加,得到修正后的第一个位置 Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] # 将修正后的位置赋值给target_pose,作为后续逆解计算的目标位姿 target_pose = Modified_First_Position # 将修改后的位姿列表 Modified_First_Position 赋值给 target_pose,作为目标位姿使用 else: # 如果文件为空,打印错误信息 print("Pose_3.txt为空,无法获取初始位姿") # 关闭文件 pose_file.close() # 退出函数 return # 关闭Pose_1.txt文件 pose_file.close() # 定义关节位置文件的文件名 file_name = '/home/raspberrypi/robot1/joint_positions_assembled_3.txt' # 打开关节位置文件,以只读模式读取内容 fo = open(file_name, "r") # 循环读取文件中的每一行内容 while 1: # 读取一行内容 line = fo.readline() # 如果读取到空行(文件结束),则跳出循环 if not line: break # 去除行首尾的空白字符 line_list = line.strip() # 将字符串按逗号分割,并转换为浮点数列表,得到当前行的关节位置数据 line_list = list(map(float, line_list.split(','))) # 判断是否是第一组关节位置(reference_pos尚未赋值) if reference_pos is None: # 将第一组关节位置赋值给reference_pos,作为参考位置 reference_pos = line_list # A组关节位置赋值给参考位置 # 调用逆解函数计算新的关节位置:发送"inverseKinematic"命令,参数为目标位姿和参考位置 suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) # 判断逆解是否成功且返回了有效的关节角度 if suc and angle is not None: # 将逆解得到的关节位置保存到new_joint_pos new_joint_pos = angle # 保存逆解得到的新关节位置 else: # 如果逆解失败,打印提示信息 print("逆解计算失败,使用原有关节位置") # 使用原始关节位置作为新关节位置 new_joint_pos = line_list # 失败时使用原位置 # 确定当前行的修改后关节位置:优先使用逆解得到的新位置,若不存在则使用原始位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list # 打印当前行的修改后关节位置 #print(modified_list) # 判断是否是第一行关节位置(i=0表示首次处理) if (i == 0): # 发送关节运动命令,将机器人移动到起始点,速度设置为100 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) # 打印运动命令的执行结果 # print(result) # 等待机器人停止运动 wait_stop(sock) # 等待机器人停止 # 打印提示信息,标识已完成起始点运动 #print(1) # 初始化透传服务:设置前瞻距离400、时间5、平滑度1、禁用响应 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 400, "t": sample_time, "smoothness": 1, "response_enable": 0}) # 打印透传初始化的结果 #print(result) # 发送当前关节点信息到透传缓存,用于后续的伺服控制 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) # 打印上一条命令的执行结果(注:此处result可能还是透传初始化的结果,需注意是否需要更新) #print(result) # 暂停0.01秒,控制数据发送频率 time.sleep(sleep_time) # 循环计数器加1,记录已处理的行数 i = i + 1 # 关闭关节位置文件 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") # 打印提示信息,标识第三段程序执行完毕 print("第五段程序执行完") # 第六段程序封装为函数 def program6(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 # 清空透传缓存 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) print("第六段程序开始执行") file_name = '/home/raspberrypi/robot1/Pose_3.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = [increments_x, increments_y, increments_z, 0, 0, 0] modified_list = [val + inc for val, inc in zip(line_list, increments)] #print(modified_list) if (i == 0): # 关节运动到起始点 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) #print(result) wait_stop(sock) # 等待机器人停止 #print(1) # 初始化透传服务 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 100, "t": sample_time, "smoothness": 1, "response_enable": 0}) #print(result) # 添加透传伺服目标关节点信息到缓存中 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) #print(result) time.sleep(sleep_time) if(i >= turn_on_relay_start and i <= turn_on_relay_end): turn_on_relay() print("打开喷枪")#开喷枪 if(i >= turn_off_relay_start and i <= turn_off_relay_end): turn_off_relay() print("关闭喷枪")#关喷枪 i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第六段程序执行完") # 第七段程序封装为函数,hjb改 def program7(sock): start_time = time.time() # 返回当前时间的时间戳(秒) # 初始化循环计数器i,用于记录处理的关节位置行数 i = 0 turn_off_relay() print("关闭喷枪")#关闭喷枪 ###大臂车补偿量 # 定义大臂车的位姿补偿量,前3个值为空间位置补偿,后3个为姿态补偿(此处姿态不补偿,均为0) increments = [increments_x, increments_y, increments_z, 0, 0, 0] # 大臂车的位姿误差只补偿空间位置不补偿姿态,所有后面3个都是0; ###大臂车补偿量 # 初始化参考关节位置变量,用于存储第一组关节位置 reference_pos = None # 初始化参考位置 # 初始化新关节位置变量,用于存储逆解计算得到的关节位置 new_joint_pos = None # 初始化逆解得到的新关节位置 # 打开Pose_1.txt文件,以只读模式获取第一组末端位姿并计算目标位姿 pose_file = open('/home/raspberrypi/robot1/Pose_4.txt', "r") # 读取文件第一行内容,并去除首尾的空白字符(包括换行符) first_pose_line = pose_file.readline().strip() # 读取第一组位姿 # 判断是否成功读取到第一行位姿数据 if first_pose_line: # 将读取到的字符串按逗号分割,并转换为浮点数列表,得到原始路径的第一个位置 Path_First_Position = list(map(float, first_pose_line.split(','))) # 将原始位置与补偿量逐元素相加,得到修正后的第一个位置 Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] # 将修正后的位置赋值给target_pose,作为后续逆解计算的目标位姿 target_pose = Modified_First_Position # 将修改后的位姿列表 Modified_First_Position 赋值给 target_pose,作为目标位姿使用 else: # 如果文件为空,打印错误信息 print("Pose_4.txt为空,无法获取初始位姿") # 关闭文件 pose_file.close() # 退出函数 return # 关闭Pose_1.txt文件 pose_file.close() # 定义关节位置文件的文件名 file_name = '/home/raspberrypi/robot1/joint_positions_assembled_4.txt' # 打开关节位置文件,以只读模式读取内容 fo = open(file_name, "r") # 循环读取文件中的每一行内容 while 1: # 读取一行内容 line = fo.readline() # 如果读取到空行(文件结束),则跳出循环 if not line: break # 去除行首尾的空白字符 line_list = line.strip() # 将字符串按逗号分割,并转换为浮点数列表,得到当前行的关节位置数据 line_list = list(map(float, line_list.split(','))) # 判断是否是第一组关节位置(reference_pos尚未赋值) if reference_pos is None: # 将第一组关节位置赋值给reference_pos,作为参考位置 reference_pos = line_list # A组关节位置赋值给参考位置 # 调用逆解函数计算新的关节位置:发送"inverseKinematic"命令,参数为目标位姿和参考位置 suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) # 判断逆解是否成功且返回了有效的关节角度 if suc and angle is not None: # 将逆解得到的关节位置保存到new_joint_pos new_joint_pos = angle # 保存逆解得到的新关节位置 else: # 如果逆解失败,打印提示信息 print("逆解计算失败,使用原有关节位置") # 使用原始关节位置作为新关节位置 new_joint_pos = line_list # 失败时使用原位置 # 确定当前行的修改后关节位置:优先使用逆解得到的新位置,若不存在则使用原始位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list # 打印当前行的修改后关节位置 #print(modified_list) # 判断是否是第一行关节位置(i=0表示首次处理) if (i == 0): # 发送关节运动命令,将机器人移动到起始点,速度设置为100 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) # 打印运动命令的执行结果 #print(result) # 等待机器人停止运动 wait_stop(sock) # 等待机器人停止 # 打印提示信息,标识已完成起始点运动 #print(1) # 初始化透传服务:设置前瞻距离400、时间5、平滑度1、禁用响应 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 400, "t": sample_time, "smoothness": 1, "response_enable": 0}) # 打印透传初始化的结果 #print(result) # 发送当前关节点信息到透传缓存,用于后续的伺服控制 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) # 打印上一条命令的执行结果(注:此处result可能还是透传初始化的结果,需注意是否需要更新) #print(result) # 暂停0.01秒,控制数据发送频率 time.sleep(sleep_time) # 循环计数器加1,记录已处理的行数 i = i + 1 # 关闭关节位置文件 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") # 打印提示信息,标识第三段程序执行完毕 print("第七段程序执行完") # 第八段程序封装为函数 def program8(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 # 清空透传缓存 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) print("第八段程序开始执行") file_name = '/home/raspberrypi/robot1/Pose_4.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = [increments_x, increments_y, increments_z, 0, 0, 0] modified_list = [val + inc for val, inc in zip(line_list, increments)] #print(modified_list) if (i == 0): # 关节运动到起始点 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) #print(result) wait_stop(sock) # 等待机器人停止 #print(1) # 初始化透传服务 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 100, "t": sample_time, "smoothness": 1, "response_enable": 0}) #print(result) # 添加透传伺服目标关节点信息到缓存中 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) #print(result) time.sleep(sleep_time) if(i >= turn_on_relay_start and i <= turn_on_relay_end): turn_on_relay() print("打开喷枪")#开喷枪 if(i >= turn_off_relay_start and i <= turn_off_relay_end): turn_off_relay() print("关闭喷枪")#关喷枪 i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第八段程序执行完") # 第九段程序封装为函数,hjb改 def program9(sock): start_time = time.time() # 返回当前时间的时间戳(秒) # 初始化循环计数器i,用于记录处理的关节位置行数 i = 0 turn_off_relay() ###大臂车补偿量 # 定义大臂车的位姿补偿量,前3个值为空间位置补偿,后3个为姿态补偿(此处姿态不补偿,均为0) increments = [0, 0, 0, 0, 0, 0] # 大臂车的位姿误差只补偿空间位置不补偿姿态,所有后面3个都是0; ###大臂车补偿量 # 初始化参考关节位置变量,用于存储第一组关节位置 reference_pos = None # 初始化参考位置 # 初始化新关节位置变量,用于存储逆解计算得到的关节位置 new_joint_pos = None # 初始化逆解得到的新关节位置 # 打开Pose_1.txt文件,以只读模式获取第一组末端位姿并计算目标位姿 pose_file = open('/home/raspberrypi/robot1/Pose_5.txt', "r") # 读取文件第一行内容,并去除首尾的空白字符(包括换行符) first_pose_line = pose_file.readline().strip() # 读取第一组位姿 # 判断是否成功读取到第一行位姿数据 if first_pose_line: # 将读取到的字符串按逗号分割,并转换为浮点数列表,得到原始路径的第一个位置 Path_First_Position = list(map(float, first_pose_line.split(','))) # 将原始位置与补偿量逐元素相加,得到修正后的第一个位置 Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] # 将修正后的位置赋值给target_pose,作为后续逆解计算的目标位姿 target_pose = Modified_First_Position # 将修改后的位姿列表 Modified_First_Position 赋值给 target_pose,作为目标位姿使用 else: # 如果文件为空,打印错误信息 print("Pose_5.txt为空,无法获取初始位姿") # 关闭文件 pose_file.close() # 退出函数 return # 关闭Pose_1.txt文件 pose_file.close() # 定义关节位置文件的文件名 file_name = '/home/raspberrypi/robot1/joint_positions_assembled_5.txt' # 打开关节位置文件,以只读模式读取内容 fo = open(file_name, "r") # 循环读取文件中的每一行内容 while 1: # 读取一行内容 line = fo.readline() # 如果读取到空行(文件结束),则跳出循环 if not line: break # 去除行首尾的空白字符 line_list = line.strip() # 将字符串按逗号分割,并转换为浮点数列表,得到当前行的关节位置数据 line_list = list(map(float, line_list.split(','))) # 判断是否是第一组关节位置(reference_pos尚未赋值) if reference_pos is None: # 将第一组关节位置赋值给reference_pos,作为参考位置 reference_pos = line_list # A组关节位置赋值给参考位置 # 调用逆解函数计算新的关节位置:发送"inverseKinematic"命令,参数为目标位姿和参考位置 suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) # 判断逆解是否成功且返回了有效的关节角度 if suc and angle is not None: # 将逆解得到的关节位置保存到new_joint_pos new_joint_pos = angle # 保存逆解得到的新关节位置 else: # 如果逆解失败,打印提示信息 print("逆解计算失败,使用原有关节位置") # 使用原始关节位置作为新关节位置 new_joint_pos = line_list # 失败时使用原位置 # 确定当前行的修改后关节位置:优先使用逆解得到的新位置,若不存在则使用原始位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list # 打印当前行的修改后关节位置 #print(modified_list) # 判断是否是第一行关节位置(i=0表示首次处理) if (i == 0): # 发送关节运动命令,将机器人移动到起始点,速度设置为100 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) # 打印运动命令的执行结果 #print(result) # 等待机器人停止运动 wait_stop(sock) # 等待机器人停止 # 打印提示信息,标识已完成起始点运动 #print(1) # 初始化透传服务:设置前瞻距离400、时间5、平滑度1、禁用响应 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 400, "t": 5, "smoothness": 1, "response_enable": 0}) # 打印透传初始化的结果 #print(result) # 发送当前关节点信息到透传缓存,用于后续的伺服控制 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) # 打印上一条命令的执行结果(注:此处result可能还是透传初始化的结果,需注意是否需要更新) #print(result) # 暂停0.01秒,控制数据发送频率 time.sleep(0.01) # 循环计数器加1,记录已处理的行数 i = i + 1 # 关闭关节位置文件 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") # 打印提示信息,标识第三段程序执行完毕 print("第九段程序执行完") def program10(sock): start_time = time.time() # 返回当前时间的时间戳(秒) # 初始化循环计数器i,用于记录处理的关节位置行数 i = 0 ###大臂车补偿量 # 定义大臂车的位姿补偿量,前3个值为空间位置补偿,后3个为姿态补偿(此处姿态不补偿,均为0) increments = [0, 0, 0, 0, 0, 0] # 大臂车的位姿误差只补偿空间位置不补偿姿态,所有后面3个都是0; ###大臂车补偿量 # 初始化参考关节位置变量,用于存储第一组关节位置 reference_pos = None # 初始化参考位置 # 初始化新关节位置变量,用于存储逆解计算得到的关节位置 new_joint_pos = None # 初始化逆解得到的新关节位置 # 打开Pose_1.txt文件,以只读模式获取第一组末端位姿并计算目标位姿 pose_file = open('/home/raspberrypi/robot1/Pose_6.txt', "r") # 读取文件第一行内容,并去除首尾的空白字符(包括换行符) first_pose_line = pose_file.readline().strip() # 读取第一组位姿 # 判断是否成功读取到第一行位姿数据 if first_pose_line: # 将读取到的字符串按逗号分割,并转换为浮点数列表,得到原始路径的第一个位置 Path_First_Position = list(map(float, first_pose_line.split(','))) # 将原始位置与补偿量逐元素相加,得到修正后的第一个位置 Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] # 将修正后的位置赋值给target_pose,作为后续逆解计算的目标位姿 target_pose = Modified_First_Position # 将修改后的位姿列表 Modified_First_Position 赋值给 target_pose,作为目标位姿使用 else: # 如果文件为空,打印错误信息 print("Pose_6.txt为空,无法获取初始位姿") # 关闭文件 pose_file.close() # 退出函数 return # 关闭Pose_1.txt文件 pose_file.close() # 定义关节位置文件的文件名 file_name = '/home/raspberrypi/robot1/joint_positions_assembled_6.txt' # 打开关节位置文件,以只读模式读取内容 fo = open(file_name, "r") # 循环读取文件中的每一行内容 while 1: # 读取一行内容 line = fo.readline() # 如果读取到空行(文件结束),则跳出循环 if not line: break # 去除行首尾的空白字符 line_list = line.strip() # 将字符串按逗号分割,并转换为浮点数列表,得到当前行的关节位置数据 line_list = list(map(float, line_list.split(','))) # 判断是否是第一组关节位置(reference_pos尚未赋值) if reference_pos is None: # 将第一组关节位置赋值给reference_pos,作为参考位置 reference_pos = line_list # A组关节位置赋值给参考位置 # 调用逆解函数计算新的关节位置:发送"inverseKinematic"命令,参数为目标位姿和参考位置 suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) # 判断逆解是否成功且返回了有效的关节角度 if suc and angle is not None: # 将逆解得到的关节位置保存到new_joint_pos new_joint_pos = angle # 保存逆解得到的新关节位置 else: # 如果逆解失败,打印提示信息 print("逆解计算失败,使用原有关节位置") # 使用原始关节位置作为新关节位置 new_joint_pos = line_list # 失败时使用原位置 # 确定当前行的修改后关节位置:优先使用逆解得到的新位置,若不存在则使用原始位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list # 打印当前行的修改后关节位置 #print(modified_list) # 判断是否是第一行关节位置(i=0表示首次处理) if (i == 0): # 发送关节运动命令,将机器人移动到起始点,速度设置为100 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) # 打印运动命令的执行结果 #print(result) # 等待机器人停止运动 wait_stop(sock) # 等待机器人停止 # 打印提示信息,标识已完成起始点运动 #print(1) # 初始化透传服务:设置前瞻距离400、时间5、平滑度1、禁用响应 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 400, "t": 5, "smoothness": 1, "response_enable": 0}) # 打印透传初始化的结果 #print(result) # 发送当前关节点信息到透传缓存,用于后续的伺服控制 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) # 打印上一条命令的执行结果(注:此处result可能还是透传初始化的结果,需注意是否需要更新) #print(result) # 暂停0.01秒,控制数据发送频率 time.sleep(0.01) # 循环计数器加1,记录已处理的行数 i = i + 1 # 关闭关节位置文件 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") # 打印提示信息,标识第三段程序执行完毕 print("第十段程序执行完") def program11(sock): start_time = time.time() # 返回当前时间的时间戳(秒) # 初始化循环计数器i,用于记录处理的关节位置行数 i = 0 ###大臂车补偿量 # 定义大臂车的位姿补偿量,前3个值为空间位置补偿,后3个为姿态补偿(此处姿态不补偿,均为0) increments = [0, 0, 0, 0, 0, 0] # 大臂车的位姿误差只补偿空间位置不补偿姿态,所有后面3个都是0; ###大臂车补偿量 # 初始化参考关节位置变量,用于存储第一组关节位置 reference_pos = None # 初始化参考位置 # 初始化新关节位置变量,用于存储逆解计算得到的关节位置 new_joint_pos = None # 初始化逆解得到的新关节位置 # 打开Pose_1.txt文件,以只读模式获取第一组末端位姿并计算目标位姿 pose_file = open('/home/raspberrypi/robot1/Pose_7.txt', "r") # 读取文件第一行内容,并去除首尾的空白字符(包括换行符) first_pose_line = pose_file.readline().strip() # 读取第一组位姿 # 判断是否成功读取到第一行位姿数据 if first_pose_line: # 将读取到的字符串按逗号分割,并转换为浮点数列表,得到原始路径的第一个位置 Path_First_Position = list(map(float, first_pose_line.split(','))) # 将原始位置与补偿量逐元素相加,得到修正后的第一个位置 Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] # 将修正后的位置赋值给target_pose,作为后续逆解计算的目标位姿 target_pose = Modified_First_Position # 将修改后的位姿列表 Modified_First_Position 赋值给 target_pose,作为目标位姿使用 else: # 如果文件为空,打印错误信息 print("Pose_7.txt为空,无法获取初始位姿") # 关闭文件 pose_file.close() # 退出函数 return # 关闭Pose_1.txt文件 pose_file.close() # 定义关节位置文件的文件名 file_name = '/home/raspberrypi/robot1/joint_positions_assembled_7.txt' # 打开关节位置文件,以只读模式读取内容 fo = open(file_name, "r") # 循环读取文件中的每一行内容 while 1: # 读取一行内容 line = fo.readline() # 如果读取到空行(文件结束),则跳出循环 if not line: break # 去除行首尾的空白字符 line_list = line.strip() # 将字符串按逗号分割,并转换为浮点数列表,得到当前行的关节位置数据 line_list = list(map(float, line_list.split(','))) # 判断是否是第一组关节位置(reference_pos尚未赋值) if reference_pos is None: # 将第一组关节位置赋值给reference_pos,作为参考位置 reference_pos = line_list # A组关节位置赋值给参考位置 # 调用逆解函数计算新的关节位置:发送"inverseKinematic"命令,参数为目标位姿和参考位置 suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) # 判断逆解是否成功且返回了有效的关节角度 if suc and angle is not None: # 将逆解得到的关节位置保存到new_joint_pos new_joint_pos = angle # 保存逆解得到的新关节位置 else: # 如果逆解失败,打印提示信息 print("逆解计算失败,使用原有关节位置") # 使用原始关节位置作为新关节位置 new_joint_pos = line_list # 失败时使用原位置 # 确定当前行的修改后关节位置:优先使用逆解得到的新位置,若不存在则使用原始位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list # 打印当前行的修改后关节位置 #print(modified_list) # 判断是否是第一行关节位置(i=0表示首次处理) if (i == 0): # 发送关节运动命令,将机器人移动到起始点,速度设置为100 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) # 打印运动命令的执行结果 #print(result) # 等待机器人停止运动 wait_stop(sock) # 等待机器人停止 # 打印提示信息,标识已完成起始点运动 #print(1) # 初始化透传服务:设置前瞻距离400、时间5、平滑度1、禁用响应 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 400, "t": 5, "smoothness": 1, "response_enable": 0}) # 打印透传初始化的结果 #print(result) # 发送当前关节点信息到透传缓存,用于后续的伺服控制 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) # 打印上一条命令的执行结果(注:此处result可能还是透传初始化的结果,需注意是否需要更新) #print(result) # 暂停0.01秒,控制数据发送频率 time.sleep(0.01) # 循环计数器加1,记录已处理的行数 i = i + 1 # 关闭关节位置文件 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") # 打印提示信息,标识第三段程序执行完毕 print("第十一段程序执行完") # 第十二段程序封装为函数 def program12(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 turn_off_relay() print("关闭喷枪")#关闭喷枪 ###大臂车补偿量 # 定义大臂车的位姿补偿量,前3个值为空间位置补偿,后3个为姿态补偿(此处姿态不补偿,均为0) increments = [increments_x, increments_y, increments_z, 0, 0, 0] # 初始化参考关节位置变量,用于存储第一组关节位置 reference_pos = None # 初始化新关节位置变量,用于存储逆解计算得到的关节位置 new_joint_pos = None # 打开Pose_1.txt文件,以只读模式获取第一组末端位姿并计算目标位姿 pose_file = open('/home/raspberrypi/robot1/Pose_1.txt', "r") # 读取文件第一行内容,并去除首尾的空白字符(包括换行符) first_pose_line = pose_file.readline().strip() # 读取第一组位姿 # 判断是否成功读取到第一行位姿数据 if first_pose_line: # 将读取到的字符串按逗号分割,并转换为浮点数列表,得到原始路径的第一个位置 Path_First_Position = list(map(float, first_pose_line.split(','))) # 将原始位置与补偿量逐元素相加,得到修正后的第一个位置 Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] # 将修正后的位置赋值给target_pose,作为后续逆解计算的目标位姿 target_pose = Modified_First_Position else: print("Pose_1.txt为空,无法获取初始位姿") pose_file.close() return pose_file.close() file_name = '/home/raspberrypi/robot1/joint_positions_assembled_1.txt' # 打开关节位置文件,以只读模式读取内容 fo = open(file_name, "r") # 循环读取文件中的每一行内容 while 1: # 读取一行内容 line = fo.readline() # 如果读取到空行(文件结束),则跳出循环 if not line: break # 去除行首尾的空白字符 line_list = line.strip() # 将字符串按逗号分割,并转换为浮点数列表,得到当前行的关节位置数据 line_list = list(map(float, line_list.split(','))) # 判断是否是第一组关节位置(reference_pos尚未赋值) if reference_pos is None: # 将第一组关节位置赋值给reference_pos,作为参考位置 reference_pos = line_list # 调用逆解函数计算新的关节位置:发送"inverseKinematic"命令,参数为目标位姿和参考位置 suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) # 判断逆解是否成功且返回了有效的关节角度 if suc and angle is not None: # 将逆解得到的关节位置保存到new_joint_pos new_joint_pos = angle # 保存逆解得到的新关节位置 else: # 如果逆解失败,打印提示信息 print("逆解计算失败,使用原有关节位置") # 使用原始关节位置作为新关节位置 new_joint_pos = line_list # 失败时使用原位置 # 确定当前行的修改后关节位置:优先使用逆解得到的新位置,若不存在则使用原始位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list # 打印当前行的修改后关节位置 #print(modified_list) # 判断是否是第一行关节位置(i=0表示首次处理) if (i == 0): # 发送关节运动命令,将机器人移动到起始点,速度设置为100 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) # 打印运动命令的执行结果 #print(result) # 等待机器人停止运动 wait_stop(sock) # 初始化透传服务:设置前瞻距离400、时间5、平滑度1、禁用响应 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 400, "t": sample_time_test, "smoothness": 1, "response_enable": 0}) #print(result) # 发送当前关节点信息到透传缓存,用于后续的伺服控制 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) #print(result) # 暂停0.01秒,控制数据发送频率 time.sleep(sleep_time_test) # 循环计数器加1,记录已处理的行数 i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第十二段程序执行完") # 第十三段程序封装为函数 def program13(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 # 清空透传缓存 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) print("第二段程序开始执行") file_name = '/home/raspberrypi/robot1/Pose_1.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = [increments_x, increments_y, increments_z, 0, 0, 0] modified_list = [val + inc for val, inc in zip(line_list, increments)] #print(modified_list) if (i == 0): # 关节运动到起始点 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) #print(result) wait_stop(sock) #print(1) # 初始化透传服务 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 100, "t": sample_time_test, "smoothness": 1, "response_enable": 0}) #print(result) # 添加透传伺服目标关节点信息到缓存中 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) #print(result) time.sleep(sleep_time_test) #if(i >= turn_on_relay_start and i <= turn_on_relay_end): #turn_on_relay() #print("打开喷枪")#开喷枪 #if(i >= turn_off_relay_start and i <= turn_off_relay_end): #t#urn_off_relay() #print("关闭喷枪")#关喷枪 i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第十三段程序执行完") # 第十四段程序封装为函数,hjb改 def program14(sock): # 记录开始时间 start_time = time.time() i = 0 turn_off_relay() print("关闭喷枪") increments = [increments_x, increments_y, increments_z, 0, 0, 0] reference_pos = None new_joint_pos = None pose_file = open('/home/raspberrypi/robot1/Pose_2.txt', "r") first_pose_line = pose_file.readline().strip() if first_pose_line: Path_First_Position = list(map(float, first_pose_line.split(','))) Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] target_pose = Modified_First_Position else: print("Pose_2.txt为空,无法获取初始位姿") pose_file.close() return pose_file.close() file_name = '/home/raspberrypi/robot1/joint_positions_assembled_2.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) if reference_pos is None: reference_pos = line_list suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) if suc and angle is not None: new_joint_pos = angle else: print("逆解计算失败,使用原有关节位置") new_joint_pos = line_list modified_list = new_joint_pos if new_joint_pos is not None else line_list #print(modified_list) if (i == 0): suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": move_speed_test}) time.sleep(1) suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": move_speed_test}) time.sleep(1) suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": move_speed_test}) #print(result) wait_stop(sock) suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 100, "t": sample_time_test, "smoothness": 1, "response_enable": 0}) #print(result) send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) #print(result) time.sleep(sleep_time_test) i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第十四段程序执行完") # 第十五段程序封装为函数 def program15(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 # 清空透传缓存 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) print("第四段程序开始执行") file_name = '/home/raspberrypi/robot1/Pose_2.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = [increments_x, increments_y, increments_z, 0, 0, 0] modified_list = [val + inc for val, inc in zip(line_list, increments)] #print(modified_list) if (i == 0): # 关节运动到起始点 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) #print(result) wait_stop(sock) # 等待机器人停止 #print(1) # 初始化透传服务 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 100, "t": sample_time_test, "smoothness": 1, "response_enable": 0}) #print(result) # 添加透传伺服目标关节点信息到缓存中 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) #print(result) time.sleep(sleep_time_test) #if(i >= turn_on_relay_start and i <= turn_on_relay_end): #turn_on_relay() #print("打开喷枪")#开喷枪 #if(i >= turn_off_relay_start and i <= turn_off_relay_end): #turn_off_relay() #print("关闭喷枪")#关喷枪 i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第十五段程序执行完") if __name__ == "__main__": init_gpio() ser = serial_init() # 严格检查串口初始化结果 if not ser or not isinstance(ser, serial.Serial) or not ser.is_open: print("串口初始化失败,无法继续运行") # 确保在函数内部返回 exit() # 数据缓冲区 buffer = b'' time.sleep(20) #time.sleep(20) # 机器人IP地址 robot_ip = "192.168.1.100" result = connectETController(robot_ip) sock = None if len(result) == 2: conSuc, sock = result if conSuc: print("成功连接到机器人") ##########################################################################################################################新添加 # 1.上电设置 ret, result, id = sendCMD(sock, "set_robot_power_status", {"status": 1}) print("开始上电") time.sleep(20) print("开始检测上电状态") ret, result, id = sendCMD(sock, "get_robot_power_status") if result == 0: ret, result, id = sendCMD(sock, "set_robot_power_status", {"status": 1}) time.sleep(20) else : print("已上电") # 2. 清除报警 print("清除报警") ret, result, id = sendCMD(sock,"clearAlarm") print("清除报警返回值") print(result) time.sleep(5) print("报警清除完成") # 获取同步状态 suc, result , id = sendCMD(sock, "getMotorStatus") if result == 0: # 同步伺服编码器数据 suc,result,id = sendCMD(sock, "syncMotorStatus") if result: print("同步伺服编码器数据成功") else: print("同步伺服编码器数据失败") time. sleep (2) # 3. 编码器零位校准 suc, result, id = sendCMD(sock, "getServoStatus") print("获取伺服状态返回值") print(result) time.sleep(2) if result == 0: # 设置机械臂伺服状态为“开”(status=1) suc, result, id = sendCMD(sock, "set_servo_status", {"status": 1}) print("设置机械臂伺服状态返回值") print(result) time.sleep(5) suc, result, id = sendCMD(sock, "calibrate_encoder_zero_position") print("编码器校准返回值") print(result) time.sleep(5) if result == 1: print("编码器校准成功") else: print("编码器校准失败") ################################################################################################################################ # 初始化透传服务 suc, result, id = sendCMD(sock, "get_transparent_transmission_state") if id == 1: print(suc, result, id) if result == 1: suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") print("已清空透传缓存") time.sleep(0.5) else: print("无缓存") # 在此处做检测等相关操作 suc, result, id = sendCMD(sock, "getRobotState") if (result == 4): # 清除报警 suc, result, id = sendCMD(sock, "clearAlarm") time.sleep(0.5) # 获取同步状态 suc, result, id = sendCMD(sock, "getMotorStatus") if (result == 0): # 同步伺服编码器数据 suc, result, id = sendCMD(sock, "syncMotorStatus") time.sleep(0.5) else: print("同步状态获取失败") # 获取机械臂伺服状态 suc, result, id = sendCMD(sock, "getServoStatus") if (result == 0): # 设置机械臂伺服状态ON suc, result, id = sendCMD(sock, "set_servo_status", {"status": 1}) time.sleep(1) else: print("机械臂伺服取失败") # 获取当前机器人是否处于透传状态 suc, result, id = sendCMD(sock, "get_transparent_transmission_state") print(result) if (result == 1): # 清空透传缓存 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0.5) else: print("非缓存状态") time.sleep(0.01) print("初始化完成,等待指令...") # 用字典模拟switch-case逻辑 program_switch = { 1: program1, 2: program2, 3: program3, # 保留第三段程序的选择入口 4: program4, 5: program5, 6: program6, 7: program7, 8: program8, 9: program9,#停机 10:program10,#洗枪位置1 11:program11,#洗枪位置2 12:program12,#以下为试运行程序 13:program13, 14:program14, 15:program15, } # 需要在执行前接收额外数据的程序 need_extra_data_programs = {1, 3, 5, 7, 12, 14} # 定义四种循环序列 sequence_1 = [2, 3, 4, 1] # 情况1的循环序列 sequence_2 = [4, 1, 2, 3] # 情况2的循环序列 sequence_3 = [6, 7, 8, 5] # 情况3的循环序列 sequence_4 = [8, 5, 6, 7] # 情况4的循环序列 sequence_5 = [13, 14, 15, 12] # 情况4的循环序列 #坐标位置补偿 increments_x = 0 increments_y = 0 increments_z = 0 delay_time_1357 = 5 #在衔接处轨迹延时时间 delay_time_2468 = 0.5 #在上升、下降轨迹延时时间 delay_time_10_13579_30 = 5 # 外层主循环 while True: try: # 等待串口指令,先验证帧头0xAA print("\n等待串口指令...") buffer = b'' frame_header = 0xAA # 定义帧头为0xAA # 循环等待,直到收到帧头0xAA while True: buffer = serial_receive_data(ser, buffer) # 检查是否收到了帧头 if len(buffer) > 0: if buffer[0] == frame_header: print(f"收到帧头: 0x{frame_header:02X}") # 确保有足够的数据(帧头+至少1字节指令) while len(buffer) < 2: buffer = serial_receive_data(ser, buffer) time.sleep(0.01) # 解析指令 cmd = buffer[1] # 帧头后的第一个字节作为指令 print(f"收到串口指令: 0x{cmd:02X}") # 清除已处理的数据,保留可能的剩余数据 buffer = buffer[2:] break else: print(f"收到无效帧头: 0x{buffer[0]:02X},继续等待帧头0x{frame_header:02X}") # 移除无效的帧头字节,继续等待 buffer = buffer[1:] time.sleep(0.01) # 情况1: 接收到1 if cmd == 1: # 接收额外一帧串口数据(0xFF为帧头,后跟6字节坐标数据) print("program1执行前,接收额外串口数据...") success, x_coord, y_coord, z_coord = parse_frame(ser) if success: # 执行矩阵转换 increments_x, increments_y, increments_z = transform_data(x_coord, y_coord, z_coord) else: print("偏移数据接收失败,继续执行程序") #continue print("触发情况1: 执行program1") program_switch[1](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program1结束,已发送响应0x80 0x80") # 等待继续指令 print("等待0xAA 0x20开始循环或其他指令...") sub_buffer = b'' while len(sub_buffer) == 0: sub_buffer = serial_receive_data(ser, sub_buffer) time.sleep(0.01) sub_cmd = sub_buffer[0] sub_cmd1 = sub_buffer[1] print(f"收到子指令: 0x{sub_cmd:02X}") if sub_cmd == 0xAA: if sub_cmd1 == 0x20: print("开始情况1循环序列: program2->3->4->1...") current_index = 0 running = True while running: # 执行当前序列中的程序 current_program = sequence_1[current_index] print(f"执行序列程序: {current_program}") # 检查是否需要在执行前接收额外数据 if current_program in need_extra_data_programs: print(f"program{current_program}执行前,接收额外串口数据...") success, x_coord, y_coord, z_coord = parse_frame(ser) if success: # 执行矩阵转换 increments_x, increments_y, increments_z = transform_data(x_coord, y_coord, z_coord) else: print(f"偏移数据接收失败,继续执行程序") #current_index = (current_index + 1) % len(sequence_1) #continue program_switch[current_program](sock) print(current_program) # 检查是否需要延时 if current_program == 2: print("program2执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 3: print("program3执行完成,延时5秒...") time.sleep(delay_time_1357) elif current_program == 4: print("program4执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 1: print("program1执行完成,延时5秒...") time.sleep(delay_time_1357) else: print(f"当前执行程序是: {current_program}") # 检查是否有停止指令(0xAA 0x30) check_buffer = b'' while ser.in_waiting > 0: check_buffer += ser.read(ser.in_waiting) # 确保有足够的数据再检查 if len(check_buffer) >= 2 and check_buffer[0] == 0xAA and check_buffer[1] == 0x30: print("收到0xAA 0x30,停止循环") ser.write(b'\x80\x80') ser.flush() running = False time.sleep(0.5) else: # 收到其他数据,清空缓冲区 if len(check_buffer) > 0: print(f"收到其他数据: {check_buffer.hex()},清空缓冲区") check_buffer = b'' # 移动到下一个程序索引 current_index = (current_index + 1) % len(sequence_1) # 注意这里要换成当前使用的 sequence time.sleep(0.1) # 检查是否是其他程序指令 elif sub_cmd1 == 0x0a: print(f"接收到指令0xAA 0x{sub_cmd1:02X},切换执行对应程序") program_switch[10](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program10结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") # 跳出等待循环,回到主指令处理 #break else: print(f"收到未知子指令: 0x{sub_cmd:02X},继续等待") sub_buffer = sub_buffer[2:] # 移除已处理的指令 # 情况2: 接收到3 elif cmd == 3: # 接收额外一帧串口数据 print("program3执行前,接收额外串口数据...") success, x_coord, y_coord, z_coord = parse_frame(ser) if success: # 执行矩阵转换 increments_x, increments_y, increments_z = transform_data(x_coord, y_coord, z_coord) else: print("偏移数据接收失败,继续执行程序") #continue print("触发情况2: 执行program3") program_switch[3](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program3结束,已发送响应0x80 0x80") # 等待继续指令 print("等待0x20开始循环或其他指令...") sub_buffer = b'' while len(sub_buffer) == 0: sub_buffer = serial_receive_data(ser, sub_buffer) time.sleep(0.01) sub_cmd = sub_buffer[0] sub_cmd1 = sub_buffer[1] print(f"收到子指令: 0x{sub_cmd:02X}") if sub_cmd == 0xAA: if sub_cmd1 == 0x20: print("开始情况2循环序列: program4->1->2->3...") current_index = 0 running = True while running: current_program = sequence_2[current_index] print(f"执行序列程序: {current_program}") # 检查是否需要在执行前接收额外数据 if current_program in need_extra_data_programs: print(f"program{current_program}执行前,接收额外串口数据...") success, x_coord, y_coord, z_coord = parse_frame(ser) if success: # 执行矩阵转换 increments_x, increments_y, increments_z = transform_data(x_coord, y_coord, z_coord) ser.write(b'\x61\x61') ser.flush() else: print(f"偏移数据接收失败,继续执行程序") #current_index = (current_index + 1) % len(sequence_2) #continue program_switch[current_program](sock) # 检查是否需要延时 if current_program == 4: print("program4执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 1: print("program1执行完成,延时5秒...") time.sleep(delay_time_1357) elif current_program == 2: print("program2执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 3: print("program3执行完成,延时5秒...") time.sleep(delay_time_1357) # 检查是否有停止指令(0xAA 0x30) check_buffer = b'' while ser.in_waiting > 0: check_buffer += ser.read(ser.in_waiting) # 确保有足够的数据再检查 if len(check_buffer) >= 2 and check_buffer[0] == 0xAA and check_buffer[1] == 0x30: print("收到0xAA 0x30,停止循环") ser.write(b'\x80\x80') ser.flush() running = False time.sleep(0.5) else: # 收到其他数据,清空缓冲区 if len(check_buffer) > 0: print(f"收到其他数据: {check_buffer.hex()},清空缓冲区") check_buffer = b'' # 移动到下一个程序索引 current_index = (current_index + 1) % len(sequence_1) # 注意这里要换成当前使用的 sequence time.sleep(0.1) elif sub_cmd1 == 0x0a: print(f"接收到指令0xAA 0x{sub_cmd:02X},切换执行对应程序") program_switch[10](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program10结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") # 跳出等待循环,回到主指令处理 #break else: print(f"收到未知子指令: 0x{sub_cmd1:02X},继续等待") sub_buffer = sub_buffer[2:] # 移除已处理的指令 # 情况3: 接收到5 elif cmd == 5: # 接收额外一帧串口数据 print("program5执行前,接收额外串口数据...") success, x_coord, y_coord, z_coord = parse_frame(ser) if success: # 执行矩阵转换 increments_x, increments_y, increments_z = transform_data(x_coord, y_coord, z_coord) else: print("偏移数据接收失败,继续执行程序") #continue print("触发情况3: 执行program5") program_switch[5](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program5结束,已发送响应0x80 0x80") # 等待继续指令 print("等待0x20开始循环或其他指令...") sub_buffer = b'' while len(sub_buffer) == 0: sub_buffer = serial_receive_data(ser, sub_buffer) time.sleep(0.01) sub_cmd = sub_buffer[0] sub_cmd1 = sub_buffer[1] print(f"收到子指令: 0x{sub_cmd:02X}") if sub_cmd == 0xAA: if sub_cmd1 == 0x20: print("开始情况3循环序列: program6->7->8->5...") current_index = 0 running = True while running: current_program = sequence_3[current_index] print(f"执行序列程序: {current_program}") # 检查是否需要在执行前接收额外数据 if current_program in need_extra_data_programs: print(f"program{current_program}执行前,接收额外串口数据...") success, x_coord, y_coord, z_coord = parse_frame(ser) if success: # 执行矩阵转换 increments_x, increments_y, increments_z = transform_data(x_coord, y_coord, z_coord) ser.write(b'\x61\x61') ser.flush() else: print(f"偏移数据接收失败,继续执行程序") #current_index = (current_index + 1) % len(sequence_3) #continue program_switch[current_program](sock) # 检查是否需要延时 # 检查是否需要延时 if current_program == 6: print("program6执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 7: print("program7执行完成,延时5秒...") time.sleep(delay_time_1357) elif current_program == 8: print("program8执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 5: print("program5执行完成,延时5秒...") time.sleep(delay_time_1357) # 检查是否有停止指令(0xAA 0x30) check_buffer = b'' while ser.in_waiting > 0: check_buffer += ser.read(ser.in_waiting) # 确保有足够的数据再检查 if len(check_buffer) >= 2 and check_buffer[0] == 0xAA and check_buffer[1] == 0x30: print("收到0xAA 0x30,停止循环") ser.write(b'\x80\x80') ser.flush() running = False time.sleep(0.5) else: # 收到其他数据,清空缓冲区 if len(check_buffer) > 0: print(f"收到其他数据: {check_buffer.hex()},清空缓冲区") check_buffer = b'' # 移动到下一个程序索引 current_index = (current_index + 1) % len(sequence_1) # 注意这里要换成当前使用的 sequence time.sleep(0.1) elif sub_cmd1 == 0x0a: print(f"接收到指令0xAA 0x{sub_cmd1:02X},切换执行对应程序") program_switch[10](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program10结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") # 跳出等待循环,回到主指令处理 #break else: print(f"收到未知子指令: 0x{sub_cmd:02X},继续等待") sub_buffer = sub_buffer[2:] # 移除已处理的指令 # 情况4: 接收到7 elif cmd == 7: # 接收额外一帧串口数据 print("program7执行前,接收额外串口数据...") success, x_coord, y_coord, z_coord = parse_frame(ser) if success: # 执行矩阵转换 increments_x, increments_y, increments_z = transform_data(x_coord, y_coord, z_coord) ser.write(b'\x61\x61') ser.flush() else: print("偏移数据接收失败,继续执行程序") #continue print("触发情况4: 执行program7") program_switch[7](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program7结束,已发送响应0x80 0x80") # 等待继续指令 print("等待0x20开始循环或其他指令...") sub_buffer = b'' while len(sub_buffer) == 0: sub_buffer = serial_receive_data(ser, sub_buffer) time.sleep(0.01) sub_cmd = sub_buffer[0] sub_cmd1 = sub_buffer[1] print(f"收到子指令: 0x{sub_cmd:02X}") if sub_cmd == 0xAA: if sub_cmd1 == 0x20: print("开始情况4循环序列: program8->5->6->7...") current_index = 0 running = True while running: current_program = sequence_4[current_index] print(f"执行序列程序: {current_program}") # 检查是否需要在执行前接收额外数据 if current_program in need_extra_data_programs: print(f"program{current_program}执行前,接收额外串口数据...") success, x_coord, y_coord, z_coord = parse_frame(ser) if success: # 执行矩阵转换 increments_x, increments_y, increments_z = transform_data(x_coord, y_coord, z_coord) ser.write(b'\x61\x61') ser.flush() else: print(f"偏移数据接收失败,继续执行程序") #current_index = (current_index + 1) % len(sequence_4) #continue program_switch[current_program](sock) # 检查是否需要延时 if current_program == 8: print("program8执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 5: print("program5执行完成,延时5秒...") time.sleep(delay_time_1357) elif current_program == 6: print("program6执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 7: print("program7执行完成,延时5秒...") time.sleep(delay_time_1357) # 检查是否有停止指令(0xAA 0x30) check_buffer = b'' while ser.in_waiting > 0: check_buffer += ser.read(ser.in_waiting) # 确保有足够的数据再检查 if len(check_buffer) >= 2 and check_buffer[0] == 0xAA and check_buffer[1] == 0x30: print("收到0xAA 0x30,停止循环") ser.write(b'\x80\x80') ser.flush() running = False time.sleep(0.5) else: # 收到其他数据,清空缓冲区 if len(check_buffer) > 0: print(f"收到其他数据: {check_buffer.hex()},清空缓冲区") check_buffer = b'' # 移动到下一个程序索引 current_index = (current_index + 1) % len(sequence_1) # 注意这里要换成当前使用的 sequence time.sleep(0.1) elif sub_cmd1 == 0x0a: print(f"接收到指令0xAA 0x{sub_cmd1:02X},切换执行对应程序") program_switch[10](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program10结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") # 跳出等待循环,回到主指令处理 #break else: print(f"收到未知子指令: 0x{sub_cmd:02X},继续等待") sub_buffer = sub_buffer[2:] # 移除已处理的指令 # 试运行程序段,与情况1相同,只不过速度较慢 elif cmd == 12: # 接收额外一帧串口数据(0xFF为帧头,后跟6字节坐标数据) print("program12执行前,接收额外串口数据...") success, x_coord, y_coord, z_coord = parse_frame(ser) if success: # 执行矩阵转换 increments_x, increments_y, increments_z = transform_data(x_coord, y_coord, z_coord) else: print("偏移数据接收失败,继续执行程序") #continue print("触发情况12: 执行program12") program_switch[12](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program12结束,已发送响应0x80 0x80") # 等待继续指令 print("等待0xAA 0x20开始循环或其他指令...") sub_buffer = b'' while len(sub_buffer) == 0: sub_buffer = serial_receive_data(ser, sub_buffer) time.sleep(0.01) sub_cmd = sub_buffer[0] sub_cmd1 = sub_buffer[1] print(f"收到子指令: 0x{sub_cmd:02X}") if sub_cmd == 0xAA and sub_cmd1 == 0x20: print("开始情况1循环序列: program2->3->4->1...") current_index = 0 running = True while running: # 执行当前序列中的程序 current_program = sequence_5[current_index] print(f"执行序列程序: {current_program}") # 检查是否需要在执行前接收额外数据 if current_program in need_extra_data_programs: print(f"program{current_program}执行前,接收额外串口数据...") success, x_coord, y_coord, z_coord = parse_frame(ser) if success: # 执行矩阵转换 increments_x, increments_y, increments_z = transform_data(x_coord, y_coord, z_coord) else: print(f"偏移数据接收失败,继续执行程序") #current_index = (current_index + 1) % len(sequence_1) #continue program_switch[current_program](sock) print(current_program) # 检查是否需要延时 if current_program == 13: print("program13执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 14: print("program14执行完成,延时5秒...") time.sleep(delay_time_1357) elif current_program == 15: print("program15执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 12: print("program12执行完成,延时5秒...") time.sleep(delay_time_1357) # 检查是否有停止指令(0xAA 0x30) check_buffer = b'' while ser.in_waiting > 0: check_buffer += ser.read(ser.in_waiting) # 确保有足够的数据再检查 if len(check_buffer) >= 2 and check_buffer[0] == 0xAA and check_buffer[1] == 0x30: print("收到0xAA 0x30,停止循环") ser.write(b'\x80\x80') ser.flush() running = False time.sleep(0.5) else: # 收到其他数据,清空缓冲区 if len(check_buffer) > 0: print(f"收到其他数据: {check_buffer.hex()},清空缓冲区") check_buffer = b'' # 移动到下一个程序索引 current_index = (current_index + 1) % len(sequence_1) # 注意这里要换成当前使用的 sequence time.sleep(0.1) # 情况5: 接收到9 elif cmd == 9: print("触发情况5: 执行program9") program_switch[9](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program9结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") # 情况6: 接收到10 elif cmd == 10: print("触发情况6: 执行program10") program_switch[10](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program10结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") # 情况7: 接收到11 elif cmd == 11: print("触发情况7: 执行program11") program_switch[11](sock) print("返回最外层等待串口数据") # 退出指令 elif cmd == 0x00: print("收到退出指令,程序终止") break elif cmd == 16: #开喷枪 turn_on_relay() print("打开喷枪") elif cmd == 17: #关喷枪 turn_off_relay() print("关闭喷枪") # 未知指令 else: print(f"未知指令: 0x{cmd:02X},等待下一条指令...") time.sleep(0.01) except KeyboardInterrupt: print("程序被手动中断") break except Exception as e: print(f"主循环错误: {e}") time.sleep(0.5) else: print("连接机器人失败") else: print("无法连接到机器人") # 资源清理 if sock: sock.close() if ser and isinstance(ser, serial.Serial) and ser.is_open: ser.close() print("串口已关闭") GPIO.cleanup()